精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案

發布于 2025-10-24 17:05
瀏覽
0收藏

作者 | 崔皓

審校 | 重樓

整體思路

在數據庫運維場景中,“慢查詢報警” 往往是最讓工程師頭疼的問題之一 —— 就像我的運維學員所遭遇的:頻繁接到告警通知,卻始終難以快速定位問題根源是系統資源瓶頸、數據庫配置缺陷,還是應用層 SQL 寫法不規范。

為徹底解決這一 “定位難、分析繁” 的痛點,本文將通過 Fluent-Bit + Fluentd + DeepSeek 的實戰方案,構建一套 “從慢查詢信息采集到智能分析” 的全流程自動化體系。這套方案的核心價值在于:用工具鏈解決 “日志采集與處理” 的重復性工作,用大模型替代 “人工排查與分析” 的經驗依賴,讓 MySQL 慢查詢問題從 “被動響應” 轉變為 “主動診斷 + 精準解決”,大幅降低運維成本,提升數據庫性能穩定性。

我們將該項目實戰的整體思路整理為下圖所示:

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

具體思路可拆解為三大核心環節:

日志采集:當業務應用向 MySQL 數據庫發起查詢請求時,若查詢執行時間超出預設閾值(即觸發慢查詢條件),系統將先通過輕量級日志采集工具 Fluent-Bit,實時捕獲慢查詢的完整日志信息(含原始 SQL 語句、執行時間戳等基礎數據),再將采集到的日志穩定傳遞至 Fluentd 進行后續處理,確保慢查詢數據不丟失、不延遲。

提取信息:Fluentd 作為日志處理中樞,將對原始慢查詢日志進行格式化解析:通過正則匹配與字段提取邏輯,從非結構化日志中拆解出 “查詢耗時”“鎖定時間”“返回行數”“掃描行數” 等核心性能指標,同時識別 SQL 語句中涉及的數據庫表名,完成慢查詢信息從 “雜亂文本” 到 “結構化數據” 的轉化,為后續分析奠定基礎。

智能分析:為避免僅依賴日志數據導致的分析片面性,方案將進一步關聯 MySQL 數據庫的元數據信息 —— 通過數據庫連接工具獲取目標表的 “表結構定義”“索引配置”“表數據量” 等關鍵元數據,再將 “結構化慢查詢指標 + 表元數據 + 原始 SQL” 三部分信息整合為統一的分析素材,傳遞至 DeepSeek 大模型。大模型將基于數據庫性能優化知識,自動診斷慢查詢根源(如 “缺少關鍵索引導致全表掃描”“SQL 關聯邏輯冗余” 等),并生成包含具體優化建議(索引創建語句、SQL 改寫方案等)的結構化報告,最終幫助運維工程師快速定位問題類型(系統/數據庫/應用層),并直接落地優化操作。

實戰步驟

說完了整體思路,就到了項目實戰的環節,我們通過一張圖讓大家快速了解要經歷的實戰步驟,如下圖所示。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

  • 安裝 MySQL 數據庫,為后續的查詢及慢查詢日志提供準備。
  • 配置慢查詢日志文件與觸發條件。指定慢查詢日志的存儲文件,同時設定觸發慢查詢的閾值(比如執行時間超過多少秒的查詢會被記錄為慢查詢)。
  • 配置 Fluent - Bit 和 FluentD。Fluent - Bit 負責從 MySQL 端采集慢查詢日志,FluentD 則對采集到的日志進行過濾、解析等處理操作,它們共同構成了日志采集與初步處理的管道。
  • 安裝 Fluent - Bit 與 FluentD,確保這兩個工具能在環境中正常運行,為后續的日志采集和處理提供工具支持。
  • 生成日志分析程序(process_slow_log.py)。該程序會基于 FluentD 處理后的結構化日志,結合數據庫的表結構、索引等元數據,進一步深入分析慢查詢的原因,為后續生成報告提供分析依據。
  • 生成日志查看應用(show_slow_report.py)。這個應用主要用于將日志分析程序處理后的結果,以更直觀、易讀的形式展示出來,方便用戶查看慢查詢的分析報告。
  • 測試功能環節。通過模擬或實際的慢查詢場景,驗證從 MySQL 慢查詢日志的產生、采集、處理,到分析程序運行、報告展示這一整個流程是否正常工作。

數據庫安裝與配置

介紹完畢項目的執行步驟之后,我們就開始動手實現了,首先在 MySQL 的官網下載安裝文件,地址如下:

??https://dev.mysql.com/downloads/??

由于我所用的操作系統是 Windows ,所以點擊下圖“MySQL Installer for Windows”下載 MySQL,大家請根據自身系統情況下載對應的版本。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

接著選擇“5.7.44”的 MySQL 版本。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

下載完成后,雙擊mysql-installer-community-5.7.44.0.msi 文件進行安裝。

如下圖所示,在安裝時會提示更新安裝程序,選擇“No”。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

如下圖所示,選擇“Custom”可以自定義安裝目錄,否則默認安裝到C盤。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

接著選擇安裝“MySQL Server 5.7.44 - X64”版本,選中后點擊“Next”。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

這里可以選擇安裝目錄,默認是C盤,也可以修改安裝目錄。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

選擇“Execute”執行安裝。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

勾選Show Advanced and Logging Options,其他配置按默認不變。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

填寫root用戶的密碼,為了方便我直接使用“root”作為密碼,大家不用效仿,自行設置密碼即可。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

后面就是一頓 Next 安裝,不過到了下面這個界面的時候請注意。

如下圖所示,修改Slow Query Log下的FilePath和Seconds。

  • FilePath修改為mysql-slow.log(慢查詢日志文件);
  • Seconds修改為3(sql執行超過3秒記錄到慢查詢日志中),當然也可以改成希望的時間,這個時間后面還可以通過配置文件修改。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

接著,點擊“Execute”執行安裝。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

最后,點擊“Finish”完成安裝。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

安裝完成后打開MySQL 5.7 Command Line Client,按照下圖操作在“Windows”中搜索“mysql”,打開 MySQL 的客戶端。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

在打開的客戶端中,輸入在安裝時就輸入過的密碼, 我這里輸入“root”密碼。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

如果看到如下圖所示的內容,說明安裝成功。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

設置慢查詢日志權限

這里需要注意的是,我們在安裝時定義過了慢查詢日志文件:mysql-slow.log,如果沒有更改安裝目錄會在c:\ProgramData\MySQL\MySQL Server 5.7\Data目錄下,在后續的使用中發現需要給mysql-slow.log文件賦予權限,否則 Fluent-bit 解析日志時會報錯。所以,在安裝完 MySQL 之后最后手動刪除這個文件,再重新啟動 MySQL 的服務,該服務會自動生成mysql-slow.log,并賦予它正確的權限。

當然如果沒有權限問題的小伙伴可以跳過這個步驟。

這里如下圖所示,在“Windows”中搜索“服務”,并打開。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

在服務列表中選擇“MySQL57”的服務,并且點擊“重啟此服務”。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

慢查詢配置

至此,MySQL 的安裝就完成了,接著再啰嗦一下慢查詢日志和閾值的配置。假設 MySQL 安裝在 c:\ProgramData\MySQL\MySQL Server 5.7(默認會在C盤)目錄下,如下圖所示我們可以打開該目錄下的 my.ini 文件。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

并且,編輯文件內容如下:

# 啟用慢查詢的配置一定要放在[mysqld]下面
# 默認配置文件中如果有[mysqld]這里不要重復加[mysqld]
[mysqld]
# 啟用慢查詢日志
slow-query-log=1

# 慢查詢日志名稱,生成的文件路徑 c:\ProgramData\MySQL\MySQL Server 5.7\Data,如果
slow_query_log_file="mysql-slow.log"

# 定義慢查詢時間(秒)
long_query_time=2

該文件可以啟用慢查詢,保存配置后需要重啟MySQL服務。

  • slow-query-log=1

當設置為 1 時,MySQL 會將執行時間超過指定閾值的查詢操作記錄到慢查詢日志文件中;如果設置為 0,則關閉慢查詢日志功能。

  • slow_query_log_file="mysql-slow.log"

設置的日志文件名為 mysql-slow.log,MySQL 會將慢查詢相關的日志內容寫入到這個文件中。

  • long_query_time=2

定義了慢查詢的時間閾值,單位是秒。當一條 SQL 查詢語句的執行時間超過這個值(為了方便演示,我設置的是 2 秒)時,這條查詢就會被認定為慢查詢,進而被記錄到上述指定的慢查詢日志文件中。也就是說,只有執行時間超過 2 秒的查詢才會被 MySQL 記錄到慢查詢日志里,方便 DBA(數據庫管理員)后續分析和優化這些執行效率低下的查詢。

配置 fluent-bit

完成 MySQL 安裝以及慢查詢日志配置之后,就需要配置 Fluent-bit 采集日志信息。個人建議在本地創建目錄用來存放 Fluent-bit 的配置文件,這個文件用來解析慢日志的內容。由于后期我們會使用 Docker 安裝 Fluent-bit,所以也需要將這個目錄下的配置文件與 Docker 中安裝的 Fluent-bit 文件進行掛載,這樣方便修改配置信息。

我在本地“d:\docker\EFK” 下面分別建立 fluent-bit 和 fluentd 兩個目錄,大家可以根據具體情況創建相似的目錄。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

配置日志解析

通過前面內容的介紹可以得知,fluent-bit 的主要任務是采集并解析慢日志的內容,采集屬于基本功能,而解析需要按照 MySQL 慢日志的格式對文本進行處理。先通過如下慢日志文本觀察其結構。

C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe, Version: 5.7.44-log (MySQL Community Server (GPL)). started with:
TCP Port: 3306, Named Pipe: (null)
Time                 Id Command    Argument
# Time: 2025-09-30T10:35:46.687011Z
# User@Host: root[root] @ localhost [::1]  Id:     2
# Query_time: 4.063656  Lock_time: 0.017691 Rows_sent: 100694  Rows_examined: 1814434
use after_sale;
SET timestamp=1759228546;
SELECT 
u.UserId, u.Name, u.Email,
i.IssueId, i.Description, i.Status,
cs.CustomerServiceId, cs.Name,
s.SolutionId, s.SolutionDescription,
si.ActionTime
FROM SupportIssue si
JOIN Issue i ON si.IssueId = i.IssueId
JOIN User u ON i.UserId = u.UserId
JOIN CustomerService cs ON si.CustomerServiceId = cs.CustomerServiceId
JOIN Solution s ON si.SolutionId = s.SolutionId
WHERE s.IsValid = 1
AND i.Status = 'unsolve'
ORDER BY si.ActionTime DESC;

雖然,日志的內容比較多,可以從如下幾個方面進行分析,慢查詢日志由 5 部分組成,呈現 “起始行 + 元數據行 + SQL 語句行” 的多行結構:

1. 起始行:# Time: 2025-09-30T10:35:46.687011Z以# Time: 開頭,包含精確到微秒的時間戳,是每條慢查詢日志的唯一起始標識。

2. 用戶與連接信息行:# User@Host: root[root] @ localhost [::1] Id: 2記錄執行查詢的用戶、主機和連接 ID,是日志的元數據部分。

3. 性能指標行:# Query_time: 4.063656 Lock_time: 0.017691 Rows_sent: 100694 Rows_examined: 1814434包含查詢耗時(4.06 秒)、鎖定時間、返回行數、掃描行數等核心性能數據,是后續分析的關鍵指標。

4. 環境與時間設置行:use after_sale; 和 SET timestamp=1759228546;記錄查詢執行的數據庫環境(切換到after_sale庫)和時間戳設置,屬于 SQL 執行的前置操作。

5. SQL 語句行:從SELECT到最后的;完整的查詢語句,包含多表關聯(JOIN)、條件篩選(WHERE)和排序(ORDER BY),是慢查詢分析的核心對象。

接著,我們就需要根據日志“特征”對其進行解析。先來一波分析,如下:

第一步:找到 “唯一起始標志”—— 確定一條日志的開端

所有結構化日志(包括 MySQL 慢查詢日志)都有一個共同特點:每條完整日志都有唯一的 “開頭標識”,這是區分不同日志的關鍵。在示例慢查詢日志中,# Time: 2025-09-30T10:35:46.687011Z 就是典型的起始標志 —— 它以固定前綴 # Time: 開頭,后跟標準化的時間戳(年 - 月 - 日 T 時:分: 秒。毫秒 Z),且每條慢查詢日志必然以此行開始。因此,第一步要做的就是:用正則表達式精準匹配這個起始標志,告訴 Fluent Bit“遇到這樣的行,就意味著一條新的慢查詢日志開始了”。因此需要用正則表達式 /^# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/ :^ 錨定行首,\d{4}-\d{2}-\d{2}T... 匹配時間戳格式,確保只命中起始行。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

第二步:定義 “延續行規則”—— 合并日志的后續內容

一條完整的慢查詢日志除了起始行,還包含用戶信息(# User@Host: ...)、性能指標(# Query_time: ...)、SQL 語句等多行內容。這些行的共同特點是:它們不是 “新日志的起始行”,而是當前日志的一部分。因此,第二步的思路是:用 “否定邏輯” 匹配所有 “不是起始行” 的內容,告訴 Fluent Bit“只要不是新日志的開頭,就都歸到當前日志里”。因此,需要通過正則表達式: /^(?!\# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).*/ 采用了正則的 “否定前瞻” 語法(?!),意思是 “匹配所有不以 # Time: 時間戳 開頭的行”。無論這些行是# User@Host這樣的元數據,還是SELECT ...這樣的 SQL 語句(哪怕 SQL 跨多行),都會被合并到當前日志中。

第三步:設置 “超時兜底”—— 避免日志截斷或阻塞

實際場景中,日志可能因系統延遲、寫入中斷等原因出現 “不完整” 的情況(比如一條日志寫到一半突然停了)。這時需要一個 “超時機制”:如果一段時間內沒有新內容加入,就強制結束當前日志的合并,避免數據一直卡在緩沖區。示例中的 Flush_Timeout 5000 就是這個作用(5000 毫秒 = 5 秒),確保即使日志不完整,也能在 5 秒后輸出已收集的內容,平衡數據完整性和處理效率。

好了,有了上面的分析之后,我們就可以在 fluent-bit 目錄下面創建 etc 目錄,同時創建“parsers_mysql_slow.conf” 文件,它用來對慢查詢日志進行解析。如下:

[MULTILINE_PARSER]
# 多行解析器名稱,用于在輸入插件中引用
Name          mysql_slow
# 解析器類型,regex 表示使用正則表達式匹配
Type          regex
# 超時時間(毫秒),當多行日志間隔超過此時間時強制刷新緩沖區
Flush_Timeout 5000

# 第一條規則:定義多行日志的起始行
# "start_state" - 初始狀態名稱
# 正則表達式匹配以 "# Time:" 開頭的行(慢查詢日志的開始)
# "cont" - 匹配后跳轉到的下一個狀態
Rule          "start_state" "/^# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/" "cont"

# 第二條規則:定義多行日志的延續行
# "cont" - 當前狀態名稱(與上一條規則的跳轉狀態對應)
# 正則表達式匹配不以 "# Time:" 開頭的行(即延續行)
# "cont" - 匹配后保持當前狀態,繼續收集多行內容
Rule          "cont"       "/^(?!\# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).*/" "cont"

這個文件是用來解析慢日志內容的,遵循了幾個原則:

  • 找 “開頭”:觀察日志格式,找到每條日志唯一的起始標志(如特定前綴、固定格式的時間戳等),用正則精準匹配;
  • 定 “延續”:用 “否定起始標志” 的邏輯,匹配所有屬于當前日志的后續行,確保不遺漏任何內容;
  • 設 “超時”:添加超時配置,應對日志不完整的極端情況。

這里我們介紹解析日志的部分,是希望大家遇到類似的日志時,具備基本的分析能力。當然,還有一個彎道超車的方法,就是丟給大模型讓它幫你分析日志,自動生成MULTILINE_PARSER 的內容。

引用日志解析

在“\fluent-bit\etc”目錄下創建fluent-bit.conf 文件, 加入如下內容:

##############################################
# Fluent Bit 配置文件 (INI格式)
# 注意:注釋必須獨占一行,不能與配置項同行
##############################################

[SERVICE]
# 指定自定義解析器配置文件路徑
# 該文件包含專門的多行日志解析規則(如mysql慢查詢)
Parsers_File  /fluent-bit/etc/parsers_mysql_slow.conf
##############################################
# 輸入插件配置(收集日志)
##############################################
# 輸入源:采集mysql慢查詢日志(支持多行處理)
[INPUT]
# 使用 tail 插件監控文件變化
Name                 tail
# 自定義標簽,標識為mysql慢查詢日志
Tag                  mysql.slow
# mysql慢查詢日志文件路徑
Path                 /var/log/mysql-slow.log
Read_from_Head       On
# 跳過超長行,避免解析錯誤導致進程崩潰
Skip_Long_Lines      On
# 檢查文件變化的間隔時間(秒)
Refresh_Interval     10
# 指定多行解析器名稱(需在parsers_mysql_slow.conf中定義)
# 用于mysql慢查詢跨多行的日志條目
multiline.parser     mysql_slow


##############################################
# 輸出插件(轉發到 Fluentd)
##############################################
[OUTPUT]
# 使用 forward 插件將日志轉發到Fluentd聚合器
Name            forward
# 匹配所有標簽的日志(* 是通配符,表示所有輸入源)
Match           *
# Fluentd 服務地址(使用Docker Compose服務名進行服務發現)
Host            fluentd
# Fluentd 監聽端口(forward插件的默認端口)
Port            24224
# 網絡故障時的最大重試次數,防止無限重試消耗資源
Retry_Limit     10

# 輸出源2:同時輸出到控制臺(用于調試和監控)
[OUTPUT]
# 使用 stdout 插件在控制臺打印日志
Name          stdout
# 匹配所有標簽的日志
Match         *
# 注意:生產環境通常應注釋或移除此輸出,避免日志重復和性能開銷

該文件配置聚焦 MySQL 慢查詢日志的采集與解析,通過 [SERVICE] 模塊加載存放慢查詢多行解析規則的parsers_mysql_slow.conf文件,再以 [INPUT] 模塊的 tail 插件,監控/var/log/mysql-slow.log路徑下的慢查詢日志文件,同時啟用mysql_slow多行解析器確??缍嘈腥罩就暾喜ⅲ詈笸ㄟ^ [OUTPUT] 模塊的 forward 插件,將標記為mysql.slow的日志轉發到 Fluentd(地址為 fluentd,端口 24224)做進一步處理,全程僅保留慢查詢采集與解析的關鍵配置。

配置 Fluentd

Fluent-bit 的配置完成之后,就輪到 Fluentd 了, 前者主要任務是日志的采集和解析,后者則需要完成日志結構化、重要信息提取以及調用智能報告分析服務等功能。

在“\fluentd\conf” 創建新文件fluent.conf

文件內容比較多,我們聚焦如下幾個內容:

核心,通過 4 個filter插件對原始慢查詢日志進行 “解析→提取→轉換→精簡”,最終輸出結構化數據,僅針對mysql.slow標簽的日志生效:

第 1 個 filter:解析原始日志(parser)

<filter mysql.slow>
  @type parser               # 日志解析插件
  key_name log               # 待解析的字段(Fluent Bit轉發的原始日志存放在"log"字段中)
  reserve_data true          # 保留原始數據,避免解析后丟失信息
  <parse>
    @type regexp             # 用正則表達式解析
    # 正則提?。喝罩绢^部(時間、用戶、查詢耗時等)和SQL主體
    expression /^(?<header># Time: (?<time>.+)\n# User@Host: (?<user_host>.+)\n# Query_time: (?<query_time>\d+\.\d+)\s+Lock_time: (?<lock_time>\d+\.\d+)\s+Rows_sent: (?<rows_sent>\d+)\s+Rows_examined: (?<rows_examined>\d+))(?<sql_body>[\s\S]*)$/
  </parse>
</filter>

將非結構化的 “log” 字段拆分為結構化字段,最終生成time(慢查詢時間)、user_host(執行用戶與主機)、query_time(查詢耗時)、lock_time(鎖定時間)、rows_sent(返回行數)、rows_examined(掃描行數)、sql_body(完整 SQL 語句)等字段,為后續處理打基礎。

第 2 個 filter:提取涉及的表名(record_transformer)

<filter mysql.slow>
  @type record_transformer   # 日志字段轉換插件
  enable_ruby true           # 啟用Ruby腳本,實現復雜邏輯
  <record>
    # Ruby腳本:從sql_body中提取SQL涉及的表名(支持帶反引號/帶庫名的表名格式)
    tables ${ sql = record["sql_body"].to_s; pattern = /(?:FROM|JOIN)\s+(?:(?<schema1>[a-zA-Z_][a-zA-Z0-9_]*)\.(?<table1>[a-zA-Z_][a-zA-Z0-9_]*)|(?<schema2>`[^`]+`)\.(?<table2>`[^`]+`)|(?<schema3>[a-zA-Z_][a-zA-Z0-9_]*)\.(?<table3>`[^`]+`)|(?<schema4>`[^`]+`)\.(?<table4>[a-zA-Z_][a-zA-Z0-9_]*)|(?<table5>[a-zA-Z_][a-zA-Z0-9_]*)|(?<table6>`[^`]+`))(?:\s|$)/; matches = sql.scan(pattern); tables = []; matches.each { |m| schema1, table1, schema2, table2, schema3, table3, schema4, table4, table5, table6 = m; if schema1 && table1; tables << "#{schema1}.#{table1}"; elsif schema2 && table2; tables << "#{schema2}.#{table2}"; elsif schema3 && table3; tables << "#{schema3}.#{table3}"; elsif schema4 && table4; tables << "#{schema4}.#{table4}"; elsif table5 && !["si","i","u","cs","s"].include?(table5); tables << table5; elsif table6; tables << table6; end }; tables.uniq }
  </record>
</filter>

作用:通過 Ruby 腳本和正則,從sql_body的FROM/JOIN關鍵字后提取涉及的表名(支持db.table、`db`.`table`等多種格式),去重后生成tables字段(表名數組,如["after_sale.SupportIssue", "after_sale.Issue"]),方便后續關聯表結構元數據。

第 3 個 filter:衍生字段與類型轉換(record_transformer)

<filter mysql.slow>
  @type record_transformer
  enable_ruby true
  <record>
    tables_str ${ record["tables"].join(",") }  # 將表名數組轉為字符串(如"table1,table2")
    table_count ${ record["tables"].size }      # 計算涉及的表數量
    # 類型轉換:將字符串格式的數值轉為浮點數/整數(便于后續分析計算)
    query_time_float ${ record["query_time"].to_f }
    lock_time_float ${ record["lock_time"].to_f }
    rows_sent_int ${ record["rows_sent"].to_i }
    rows_examined_int ${ record["rows_examined"].to_i }
  </record>
</filter>

作用:基于已有字段生成衍生信息(表名字符串、表數量),并將 “查詢耗時”“行數” 等字符串字段轉為數值類型(避免后續分析時因類型錯誤導致問題)。

第 4 個 filter:精簡字段(record_transformer)

<filter mysql.slow>
  @type record_transformer
  remove_keys sql_body,header  # 刪除無用字段(sql_body、header已完成使命,精簡數據)
</filter>

作用:刪除解析過程中生成的臨時字段(sql_body、header),減少后續輸出的數據量,避免冗余。

四個過濾器之后接著的是“match”,將結構化后的慢查詢日志分兩路輸出:

<match *.**>                 # 匹配所有標簽的日志(此處主要匹配mysql.slow)
  @type copy                 # 復制插件:將同一份日志同時發送到多個目的地
  # 輸出目標1:控制臺(調試用)
  <store>
    @id output
    @type stdout             # 輸出到Fluentd的控制臺
  </store>
  # 輸出目標2:HTTP接口(對接慢查詢分析服務)
  <store>
    @id http_output
    @type http               # HTTP輸出插件
    # 發送到宿主機的5001端口(host.docker.internal是Docker專屬地址,指向宿主機)
    endpoint http://host.docker.internal:5001/analyze-slow-query
    http_method post         # 用POST方法發送
    <format>
      @type json             # 日志格式轉為JSON(便于后端API解析)
    </format>
    <buffer>
      # 緩沖配置:平衡實時性與可靠性
      flush_interval 2s      # 每2秒刷新一次緩沖區(即使數據未滿)
      retry_type exponential_backoff  # 指數退避重試(避免網絡故障時頻繁重試)
      retry_wait 1s          # 首次重試等待1秒
      retry_max_interval 30s # 最大重試間隔30秒(避免等待過久)
      retry_timeout 10m      # 10分鐘后放棄重試(防止無限重試)
    </buffer>
  </store>
</match>

作用:

控制臺輸出:用于調試(查看日志是否正確處理),生產環境可關閉;

HTTP 接口輸出:將 JSON 格式的結構化日志發送到宿主機的analyze-slow-query接口(后續提供該接口實現),調用大模型生成優化報告。

安裝 Fluent-Bit 和 FluentD

由于本案例需要安裝 fluent-bit、fluentd等應用,為了方便安裝與調試,計劃使用 docker 方式對他們進行安裝。于是 docker compose 的安裝方式就成了最佳選擇,它可以用于定義和管理多容器 Docker 應用的 YAML 配置文件,能將多個關聯的容器(如應用服務、數據庫、緩存等)的配置(鏡像、端口映射、數據卷、環境變量、依賴關系等)集中整合,通過 docker compose 命令一鍵實現多容器的創建、啟動、停止、重啟等操作。其核心益處在于簡化了多容器應用的部署與管理流程。選擇??Fluentd官網的docker-compose?? 文件,并在其基礎上進行修改,從而適應安裝需求。

??https://docs.fluentd.org/container-deployment/docker-compose#step-0-create-docker-compose.yml??

我們可以直接下載并修改 yml 文件,在文件中找到服務名fluent-bit增加掛載配置volumes,添加如下內容:

# 數據卷掛載:將宿主機的目錄或文件掛載到容器內,實現數據持久化或配置注入
    volumes:
      # Fluent Bit 會監控這個目錄下的日志文件變化,并收集新產生的日志。
      - C:\ProgramData\MySQL\MySQL Server 5.7\Data\mysql-slow.log:/var/log/mysql-slow.log
      # 將宿主機的自定義解析器配置文件掛載到容器內,用于解析特定格式的日志(如 mysql慢查詢日志)。
      - D:\docker\EFK\fluent-bit\etc\parsers_mysql_slow.conf:/fluent-bit/etc/parsers_mysql_slow.conf
      # 將宿主機的 Fluent Bit 主配置文件掛載到容器內,替代鏡像內的默認配置。
      # 這個文件定義了數據輸入(Input)、處理(Parser, Filter)和輸出(Output)的規則。
      - D:\docker\EFK\fluent-bit\etc\fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf

在配置中找到服務名fluentd(在services下一級)增加掛載配置volumes,要在container_name同級。

volumes:
      # 將宿主機上的 Fluentd 配置目錄掛載到容器內,使配置變更無需重新構建鏡像。
      - D:\docker\EFK\fluentd\conf\fluent.conf:/fluentd/etc

需要注意的是,這里的 MySQL 慢日志文件目錄、Fluent-bit 以及 FluentD 配置文件目錄都在前面的配置中出現過了,按照你本地的配置填寫就好了,其目的就是方便修改 Docker 容器中的配置文件以及對日志文件的監聽。

如下圖所示,就是我的目錄結構,在 EFK 目錄下面放的就是 docker-compose.yml 文件。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

配置完成來到 docker-compose.yml 文件所在目錄,執行如下命令安裝容器:

docker compose -f  docker-compose.yml up -d

此時打開 docker desktop,如下圖所示,可以看到 fluent-bit、fluent 都啟動了,另外還有 kibana 和 es 也啟動了,本例中后面兩個容器用不到,可以關閉節省資源。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

編寫“生成日志分析程序”

好了,到這里我們已完成整個實例 50% 以上的工作了,回到大圖看看進展。如下圖所示,在安裝 Fluent-Bit 和 FluentD 之后,就要編寫日志分析程序了。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

在編寫程序之前執行 pip 命令安裝必要依賴如下:

pip install dotenv "flask[async]" openai

接著在工作目錄下創建環境變量文件 .env 文件,我的工作目錄如下圖所示,我在 D 盤的 docker 目錄下創建了 mysql 目錄,將所有與本次實踐相關的代碼和配置都放這里了。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

.env 的具體文件內容如下:

# DeepSeek API配置
DEEPSEEK_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

# 數據庫配置
DB_USER=root
DB_PASSWORD=root

需要注意的是,確保 .env 文件與 Python 腳本在同一目錄即可。

好了,再創建process_slow_log.py文件用來處理慢查詢日志,這個文件的代碼比較長,其實一句話就可以概括:把慢日志基本信息、 SQL、表結構和記錄數都丟給大模型(DeepSeek),讓它分析并產生解決方案,最后生成報表。

具體代碼如下:

import os
import re
import json
import mysql.connector
import threading
import asyncio
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Dict, Any, List

from flask import Flask, request, Response
from openai import AsyncOpenAI, APIError, APIConnectionError, Timeout

# 應用配置 - 非敏感配置集中管理
@dataclass
class AppConfig:
    # 服務器配置
    HOST: str = "0.0.0.0"
    PORT: int = 5001
    DEBUG: bool = False
    
    # 路徑配置
    CURRENT_DIR: str = os.path.dirname(os.path.abspath(__file__))
    LOG_DIR: str = os.path.join(CURRENT_DIR, "logs")
    REPORT_DIR: str = os.path.join(CURRENT_DIR, "reports")
    SLOW_REPORT_PATH: str = os.path.join(REPORT_DIR, "slow_report.json")  # 統一報告文件路徑
    
    # 日志文件路徑
    OPERATION_LOG_PATH: str = os.path.join(LOG_DIR, "api_operation.log")
    ERROR_LOG_PATH: str = os.path.join(LOG_DIR, "api_error.log")
    
    # LLM配置
    LLM_BASE_URL: str = "https://api.deepseek.com"
    LLM_MODEL: str = "deepseek-chat"
    LLM_TEMPERATURE: float = 0.7
    LLM_MAX_TOKENS: int = 2000
    
    # 數據庫連接配置
    DB_CONNECT_TIMEOUT: int = 10
    DB_CHARSET: str = "utf8mb4"
    DEFAULT_DB_SCHEMA: str = "after_sale"  # 無schema時使用的默認數據庫
    
    # 內容類型配置
    SUPPORTED_CONTENT_TYPE: str = "application/x-ndjson"

# 初始化配置與目錄
config = AppConfig()
os.makedirs(config.LOG_DIR, exist_ok=True)
os.makedirs(config.REPORT_DIR, exist_ok=True)

# 初始化報告文件(如果不存在)
if not os.path.exists(config.SLOW_REPORT_PATH):
    with open(config.SLOW_REPORT_PATH, 'w', encoding='utf-8') as f:
        json.dump([], f, ensure_ascii=False, indent=2)

# 初始化Flask應用與LLM客戶端(延遲初始化)
app = Flask(__name__)
llm_client: Optional[AsyncOpenAI] = None


def init_llm_client() -> AsyncOpenAI:
    """初始化LLM客戶端,檢查API密鑰"""
    global llm_client
    if llm_client is None:
        api_key = os.getenv("DEEPSEEK_API_KEY")
        if not api_key:
            raise ValueError("未找到DEEPSEEK_API_KEY環境變量,請配置")
        
        llm_client = AsyncOpenAI(
            api_key=api_key,
            base_url=config.LLM_BASE_URL
        )
    return llm_client


def get_db_config(schema: Optional[str] = None) -> Dict[str, Any]:
    """從環境變量獲取數據庫配置,支持指定schema"""
    config_dict = {
        'host': os.getenv('DB_HOST', 'localhost'),
        'user': os.getenv('DB_USER', 'root'),
        'password': os.getenv('DB_PASSWORD', ''),
        'port': int(os.getenv('DB_PORT', 3306)),
        'connect_timeout': config.DB_CONNECT_TIMEOUT,
        'charset': config.DB_CHARSET,
        'autocommit': True
    }
    
    # 如果指定了schema,則添加到配置
    if schema:
        config_dict['database'] = schema
        
    return config_dict


def write_operation_log(message: str, request_id: Optional[str] = None) -> None:
    """記錄操作日志,包含時間戳和請求ID"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    request_id = request_id or "N/A"
    log_entry = f"[{timestamp}] [REQUEST={request_id}] OPERATION: {message}\n"
    try:
        with open(config.OPERATION_LOG_PATH, 'a', encoding='utf-8') as f:
            f.write(log_entry)
        print(log_entry.strip())
    except Exception as e:
        print(f"[日志系統錯誤] 寫入操作日志失敗: {str(e)} | 原始消息: {message}")


def write_error_log(error_message: str, request_id: Optional[str] = None) -> None:
    """記錄錯誤日志,包含時間戳和請求ID"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    request_id = request_id or "N/A"
    log_entry = f"[{timestamp}] [REQUEST={request_id}] ERROR: {error_message}\n"
    try:
        with open(config.ERROR_LOG_PATH, 'a', encoding='utf-8') as f:
            f.write(log_entry)
        print(log_entry.strip())
    except Exception as e:
        print(f"[日志系統錯誤] 寫入錯誤日志失敗: {str(e)} | 原始錯誤: {error_message}")


def clean_table_name(table_name: str) -> tuple[str, str]:
    """
    清理表名中的反引號和提取數據庫前綴
    優化點:無schema時使用配置的默認數據庫
    """
    # 移除反引號
    cleaned = re.sub(r'`', '', table_name)
    # 分割數據庫和表名(如果存在)
    parts = cleaned.split('.')
    if len(parts) == 2:
        return parts[0].strip(), parts[1].strip()  # (數據庫名, 表名)
    # 無schema時使用默認數據庫
    return config.DEFAULT_DB_SCHEMA, cleaned.strip()  # (默認數據庫, 表名)


def get_table_info(table_name: str, request_id: str) -> Dict[str, Any]:
    """獲取表的 DDL 和行數,優化無schema表名的處理"""
    db_name, clean_name = clean_table_name(table_name)
    write_operation_log(f"開始獲取表 {table_name} 的信息 (清理后: {db_name}.{clean_name})", request_id)
    
    try:
        # 獲取數據庫配置,指定數據庫名
        db_config = get_db_config(db_name)
        write_operation_log(f"使用數據庫配置: host={db_config['host']}, port={db_config['port']}, database={db_name}, user={db_config['user']}", request_id)
        
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor(dictionary=True)
        
        # 獲取表的 DDL
        cursor.execute(f"SHOW CREATE TABLE `{clean_name}`")
        create_table = cursor.fetchone()
        ddl = create_table['Create Table'] if create_table else None
        write_operation_log(f"成功獲取表 {db_name}.{clean_name} 的DDL信息", request_id)
        
        # 獲取表的行數(快速近似值)
        cursor.execute(f"SHOW TABLE STATUS LIKE '{clean_name}'")
        table_status = cursor.fetchone()
        row_count = table_status['Rows'] if table_status else None
        write_operation_log(f"成功獲取表 {db_name}.{clean_name} 的行數信息: {row_count}", request_id)
        
        result = {
            'table_name': table_name,
            'cleaned_name': f"{db_name}.{clean_name}",
            'database': db_name,
            'table': clean_name,
            'ddl': ddl,
            'row_count': row_count,
            'error': None
        }
        
        cursor.close()
        conn.close()
        write_operation_log(f"完成表 {table_name} 的信息獲取", request_id)
        return result
        
    except mysql.connector.Error as e:
        # 更詳細的數據庫錯誤處理
        error_msg = f"數據庫錯誤 (代碼: {e.errno}): {e.msg}"
        write_error_log(f"獲取表 {db_name}.{clean_name} 信息失敗: {error_msg}", request_id)
        return {
            'table_name': table_name,
            'cleaned_name': f"{db_name}.{clean_name}",
            'database': db_name,
            'table': clean_name,
            'error': error_msg,
            'ddl': None,
            'row_count': None
        }
    except Exception as e:
        error_msg = f"獲取表信息失敗: {str(e)}"
        write_error_log(f"獲取表 {db_name}.{clean_name} 信息失敗: {error_msg}", request_id)
        return {
            'table_name': table_name,
            'cleaned_name': f"{db_name}.{clean_name}",
            'database': db_name,
            'table': clean_name,
            'error': error_msg,
            'ddl': None,
            'row_count': None
        }


def generate_report_title(log_data: Dict[str, Any]) -> str:
    """生成報告標題,包含關鍵信息"""
    query_time = log_data.get('query_time', '未知')
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    tables_str = log_data.get('tables_str', '多個表')
    
    # 提取SQL中的主要操作(SELECT/UPDATE/DELETE等)
    sql_body = log_data.get('log', '').upper()
    operation = '查詢'
    if 'UPDATE' in sql_body:
        operation = '更新'
    elif 'DELETE' in sql_body:
        operation = '刪除'
    elif 'INSERT' in sql_body:
        operation = '插入'
    
    return f"{timestamp} - 慢{operation}分析報告(耗時: {query_time}秒,涉及表: {tables_str[:50]})"


def generate_prompt(log_data: Dict[str, Any], table_info_list: List[Dict[str, Any]], request_id: str) -> str:
    """生成發送給大模型的提示詞"""
    write_operation_log("開始生成提示詞", request_id)
    
    prompt = f"""請分析以下MySQL慢查詢日志,并提供優化建議。

慢查詢日志詳情:
{log_data['log']}

查詢時間: {log_data['query_time']} 秒
鎖定時間: {log_data['lock_time']} 秒
返回行數: {log_data['rows_sent']}
掃描行數: {log_data['rows_examined']}

涉及表結構及行數:
"""
    for table_info in table_info_list:
        if table_info['error']:
            prompt += f"- 表 {table_info['cleaned_name']}: 錯誤 - {table_info['error']}\n"
        else:
            prompt += f"- 表 {table_info['cleaned_name']} (約 {table_info['row_count']} 行):\n"
            prompt += f"  DDL: {table_info['ddl'][:500]}...\n\n"
    
    prompt += """
請基于以上信息,分析該慢查詢的性能問題原因,并提供具體的優化建議,包括但不限于:
1. 索引優化建議
2. SQL語句改寫建議
3. 表結構優化建議
4. 其他可能的性能改進方案

請提供詳細且可操作的建議,避免泛泛而談。
"""
    
    write_operation_log(f"提示詞生成完成,長度: {len(prompt)}字符", request_id)
    return prompt


async def call_deepseek_api(prompt: str, request_id: str) -> str:
    """調用DeepSeek API生成分析報告"""
    write_operation_log("開始調用DeepSeek API", request_id)
    
    try:
        client = init_llm_client()
        response = await client.chat.completions.create(
            model=config.LLM_MODEL,
            messages=[
                {"role": "system", "content": "你是一位數據庫性能優化專家,擅長分析MySQL慢查詢并提供優化建議。"},
                {"role": "user", "content": prompt}
            ],
            temperature=config.LLM_TEMPERATURE,
            max_tokens=config.LLM_MAX_TOKENS,
            stream=False
        )
        
        write_operation_log(f"API調用成功,響應ID: {response.id}", request_id)
        write_operation_log(
            f"Token使用情況: 輸入={response.usage.prompt_tokens}, 輸出={response.usage.completion_tokens}",
            request_id
        )
        
        return response.choices[0].message.content
        
    except APIError as e:
        error_msg = f"LLM API錯誤 (狀態碼: {e.status_code}): {e.message}"
        write_error_log(error_msg, request_id)
        return f"分析失敗: {error_msg}"
    except APIConnectionError as e:
        error_msg = f"LLM連接錯誤: {str(e)}"
        write_error_log(error_msg, request_id)
        return f"分析失敗: {error_msg}"
    except Timeout as e:
        error_msg = f"LLM調用超時: {str(e)}"
        write_error_log(error_msg, request_id)
        return f"分析失敗: {error_msg}"
    except Exception as e:
        error_msg = f"LLM調用未知錯誤: {str(e)}"
        write_error_log(error_msg, request_id)
        return f"分析失敗: {error_msg}"


def append_to_report(report_data: Dict[str, Any], request_id: str) -> bool:
    """將報告數據追加到slow_report.json數組中"""
    write_operation_log(f"開始將報告追加到 {config.SLOW_REPORT_PATH}", request_id)
    
    try:
        # 讀取現有報告
        existing_reports = []
        if os.path.exists(config.SLOW_REPORT_PATH):
            try:
                with open(config.SLOW_REPORT_PATH, 'r', encoding='utf-8') as f:
                    existing_reports = json.load(f)
                write_operation_log(f"成功讀取現有報告,共{len(existing_reports)}條記錄", request_id)
            except json.JSONDecodeError:
                write_error_log("報告文件損壞,將重新初始化", request_id)
                existing_reports = []
        
        # 添加新報告
        existing_reports.append(report_data)
        
        # 寫入更新后的報告
        with open(config.SLOW_REPORT_PATH, 'w', encoding='utf-8') as f:
            json.dump(existing_reports, f, ensure_ascii=False, indent=2)
        
        write_operation_log(f"報告已成功追加到 {config.SLOW_REPORT_PATH}", request_id)
        return True
        
    except Exception as e:
        error_msg = f"追加報告到文件失敗: {str(e)}"
        write_error_log(error_msg, request_id)
        return False


def extract_valid_log_records(ndjson_data: List[str], request_id: str) -> List[Dict[str, Any]]:
    """解析NDJSON數據并提取有效的日志記錄"""
    write_operation_log(f"開始解析NDJSON數據,共{len(ndjson_data)}行", request_id)
    records = []
    
    for i, line in enumerate(ndjson_data):
        line = line.strip()
        if not line:
            write_operation_log(f"第{i+1}行是空白行,已忽略", request_id)
            continue
            
        try:
            record = json.loads(line)
            if isinstance(record, dict) and 'log' in record and 'tables' in record:
                records.append(record)
                write_operation_log(f"第{i+1}行NDJSON解析成功,包含有效日志記錄", request_id)
            else:
                missing_fields = []
                if 'log' not in record:
                    missing_fields.append('log')
                if 'tables' not in record:
                    missing_fields.append('tables')
                write_operation_log(
                    f"第{i+1}行NDJSON缺少必要字段: {', '.join(missing_fields)},已忽略", 
                    request_id
                )
        except json.JSONDecodeError as e:
            error_msg = f"第{i+1}行NDJSON解析失敗: {str(e)}"
            write_error_log(error_msg, request_id)
    
    write_operation_log(f"NDJSON解析完成,共獲取{len(records)}條有效日志記錄", request_id)
    return records


async def process_analysis_background(log_records: List[Dict[str, Any]], request_id: str) -> None:
    """后臺處理分析流程"""
    try:
        write_operation_log(f"開始后臺分析流程,共處理{len(log_records)}條記錄", request_id)
        
        for record_idx, log_data in enumerate(log_records):
            write_operation_log(f"開始處理第{record_idx+1}/{len(log_records)}條記錄", request_id)
            
            # 1. 生成報告標題
            report_title = generate_report_title(log_data)
            write_operation_log(f"生成報告標題: {report_title}", request_id)
            
            # 2. 獲取所有表的信息
            table_info_list = []
            for table_name in log_data.get('tables', []):
                table_info = get_table_info(table_name, request_id)
                table_info_list.append(table_info)
            
            # 3. 生成提示詞并調用API
            prompt = generate_prompt(log_data, table_info_list, request_id)
            analysis_report = await call_deepseek_api(prompt, request_id)
            
            # 4. 準備報告數據
            report_data = {
                "id": f"{request_id}-record-{record_idx+1}",
                "title": report_title,
                "request_id": request_id,
                "record_index": record_idx + 1,
                "total_records": len(log_records),
                "timestamp": datetime.now().isoformat(),
                "original_data": {
                    "query_time": log_data.get('query_time'),
                    "lock_time": log_data.get('lock_time'),
                    "rows_sent": log_data.get('rows_sent'),
                    "rows_examined": log_data.get('rows_examined'),
                    "log": log_data.get('log'),
                    "tables": log_data.get('tables')
                },
                "table_info": table_info_list,
                "analysis_report": analysis_report
            }
            
            # 5. 追加到報告文件
            if append_to_report(report_data, request_id):
                write_operation_log(f"第{record_idx+1}條記錄分析報告已追加到匯總文件", request_id)
            else:
                write_error_log(f"第{record_idx+1}條記錄分析報告追加失敗", request_id)
        
        write_operation_log("所有記錄分析完成", request_id)
        
    except Exception as e:
        write_error_log(f"后臺分析流程失敗: {str(e)}", request_id)


def run_async_task(log_records: List[Dict[str, Any]], request_id: str) -> None:
    """線程包裝器,運行異步任務"""
    try:
        write_operation_log("啟動異步處理線程", request_id)
        asyncio.run(process_analysis_background(log_records, request_id))
    except Exception as e:
        write_error_log(f"異步任務執行失敗: {str(e)}", request_id)


@app.route('/analyze-slow-query', methods=['POST'])
def analyze_slow_query():
    """分析慢查詢日志的API端點"""
    # 生成唯一請求ID
    request_id = datetime.now().strftime("%Y%m%d%H%M%S") + f"-{os.urandom(4).hex()}"
    write_operation_log("收到慢查詢分析請求", request_id)
    
    try:
        # 記錄請求基本信息
        call_timestamp = datetime.now().isoformat()
        write_operation_log(
            f"請求信息 - 方法: {request.method}, 客戶端IP: {request.remote_addr}, Content-Type: {request.content_type}",
            request_id
        )
        
        # 驗證內容類型
        if request.content_type != config.SUPPORTED_CONTENT_TYPE:
            error_msg = f"不支持的Content-Type: {request.content_type},僅接受{config.SUPPORTED_CONTENT_TYPE}"
            write_error_log(error_msg, request_id)
            return Response(
                json.dumps({
                    "status": "error",
                    "message": error_msg,
                    "request_id": request_id
                }, ensure_ascii=False, indent=2),
                mimetype="application/json",
                status=415
            )
        
        # 解析NDJSON數據
        write_operation_log("開始讀取并解析NDJSON數據", request_id)
        try:
            ndjson_data = request.data.decode('utf-8').splitlines()
            write_operation_log(f"成功讀取{len(ndjson_data)}行NDJSON數據", request_id)
        except UnicodeDecodeError as e:
            error_msg = f"數據解碼失敗: {str(e)}"
            write_error_log(error_msg, request_id)
            return Response(
                json.dumps({
                    "status": "error",
                    "message": error_msg,
                    "request_id": request_id
                }, ensure_ascii=False, indent=2),
                mimetype="application/json",
                status=400
            )
        
        # 提取有效日志記錄
        log_records = extract_valid_log_records(ndjson_data, request_id)
        
        # 未找到有效日志時返回正常響應
        if not log_records:
            write_operation_log("未找到有效日志記錄,返回正常響應", request_id)
            return Response(
                json.dumps({
                    "status": "completed",
                    "message": "未找到需要處理的有效日志記錄",
                    "request_id": request_id,
                    "total_lines_received": len(ndjson_data),
                    "valid_records_found": 0
                }, ensure_ascii=False, indent=2),
                mimetype="application/json",
                status=200
            )
        
        # 有有效日志時啟動后臺處理
        thread = threading.Thread(
            target=run_async_task,
            args=(log_records, request_id),
            daemon=True
        )
        thread.start()
        write_operation_log("后臺處理線程已啟動", request_id)
        
        # 返回接受響應
        response_data = {
            "status": "accepted",
            "message": "請求已接收,正在后臺處理",
            "request_id": request_id,
            "timestamp": call_timestamp,
            "total_records": len(log_records),
            "total_lines_received": len(ndjson_data)
        }
        
        # write_operation_log("請求處理完成,返回響應", request_id)
        return Response(
            json.dumps(response_data, ensure_ascii=False, indent=2),
            mimetype="application/json",
            status=200
        )
        
    except Exception as e:
        # 僅邏輯錯誤返回錯誤響應
        error_msg = f"請求處理失敗: {str(e)}"
        write_error_log(error_msg, request_id)
        return Response(
            json.dumps({
                "status": "error",
                "message": error_msg,
                "request_id": request_id
            }, ensure_ascii=False, indent=2),
            mimetype="application/json",
            status=500
        )


@app.route('/health', methods=['GET'])
def health_check():
    """健康檢查接口"""
    request_id = f"health-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    write_operation_log("收到健康檢查請求", request_id)
    
    # 檢查LLM客戶端
    llm_status = "healthy"
    llm_details = "未初始化"
    try:
        init_llm_client()
        llm_status = "healthy"
        llm_details = "客戶端已初始化"
    except Exception as e:
        llm_status = "degraded"
        llm_details = f"初始化失敗: {str(e)}"
    
    # 檢查數據庫連接
    db_status = "healthy"
    db_details = "連接成功"
    try:
        conn = mysql.connector.connect(** get_db_config())
        conn.close()
    except Exception as e:
        db_status = "degraded"
        db_details = f"連接失敗: {str(e)}"
    
    # 檢查報告文件
    report_status = "healthy"
    report_details = "文件正常"
    try:
        if not os.path.exists(config.SLOW_REPORT_PATH):
            report_status = "warning"
            report_details = "報告文件不存在,將在首次分析后創建"
        else:
            with open(config.SLOW_REPORT_PATH, 'r') as f:
                pass  # 僅檢查文件是否可讀取
    except Exception as e:
        report_status = "degraded"
        report_details = f"報告文件訪問失敗: {str(e)}"
    
    # 整體狀態
    overall_status = "healthy"
    if llm_status != "healthy" or db_status != "healthy" or report_status == "degraded":
        overall_status = "degraded"
    
    response = {
        "status": overall_status,
        "services": {
            "llm_client": {
                "status": llm_status,
                "details": llm_details
            },
            "database": {
                "status": db_status,
                "details": db_details
            },
            "report_file": {
                "status": report_status,
                "details": report_details,
                "path": config.SLOW_REPORT_PATH
            }
        },
        "supported_content_type": config.SUPPORTED_CONTENT_TYPE,
        "default_schema": config.DEFAULT_DB_SCHEMA,
        "timestamp": datetime.now().isoformat(),
        "request_id": request_id
    }
    
    write_operation_log("健康檢查完成", request_id)
    return Response(
        json.dumps(response, ensure_ascii=False, indent=2),
        mimetype="application/json"
    )


if __name__ == '__main__':
    print("慢查詢分析API啟動中...")
    print(f"服務器配置: {config.HOST}:{config.PORT} (調試模式: {'開啟' if config.DEBUG else '關閉'})")
    print(f"內容類型支持: {config.SUPPORTED_CONTENT_TYPE}")
    print(f"默認數據庫schema: {config.DEFAULT_DB_SCHEMA}")
    print(f"報告文件路徑: {config.SLOW_REPORT_PATH}")
    print(f"LLM配置: 模型={config.LLM_MODEL}, 基礎URL={config.LLM_BASE_URL}")
    print(f"日志目錄: {config.LOG_DIR}")
    print("環境變量檢查:")
    print(f"  - DEEPSEEK_API_KEY: {'已配置' if os.getenv('DEEPSEEK_API_KEY') else '未配置'}")
    print(f"  - 數據庫配置: HOST={os.getenv('DB_HOST', 'localhost')}, USER={os.getenv('DB_USER', 'root')}")
    print("可用接口:")
    print("  - POST /analyze-slow-query - 分析慢查詢日志")
    print("  - GET  /health - 健康檢查")
    
    app.run(host=config.HOST, port=config.PORT, debug=config.DEBUG)

這里我把代碼中重要的部分做一個總結性講解:

  • 數據庫交互模塊

A.提供數據庫配置獲取函數,支持指定數據庫 Schema,默認使用預設的after_sale庫。

B.實現表信息提取功能:清理表名中的反引號、拆分庫表結構,通過SHOW CREATE TABLE獲取表 DDL、SHOW TABLE STATUS獲取表行數,同時處理數據庫連接錯誤、表不存在等異常。

  • 大模型交互模塊

A.封裝 DeepSeek API 調用邏輯:初始化客戶端時校驗 API 密鑰,支持設置模型參數(溫度、最大 tokens),處理 API 錯誤(連接超時、狀態碼異常等)并返回友好提示。

B.提供提示詞生成函數:整合慢查詢詳情(執行時間、掃描行數等)、表結構元信息,生成結構化提示,引導大模型輸出索引優化、SQL 改寫等可操作建議。

  • 報告生成與管理模塊

A.自動生成報告標題:包含查詢類型(SELECT/UPDATE 等)、執行耗時、涉及表名和時間戳,確保報告辨識度。

B.實現報告持久化:將分析結果(原始慢查詢數據、表元信息、大模型報告)追加到 JSON 格式的報告文件,支持讀取現有報告并處理文件損壞的情況。

  • API 接口模塊

A.慢查詢分析接口(POST /analyze-slow-query):接收 NDJSON 格式的結構化慢查詢數據,驗證內容類型,解析并過濾有效記錄(需包含log和tables字段),啟動后臺線程異步處理分析流程,避免前端等待超時。

編寫“生成日志查看應用”

這里依舊使用我們的老朋友 streamlit 來充當 Web UI 界面,如下命令安裝它:

pip install streamlit

在工作目錄創建,創建show_slow_report.py文件,方便查看生成的慢查詢分析報告,代碼如下:

import streamlit as st
import json
import os
from datetime import datetime

# 頁面基礎配置
st.set_page_config(
    page_title="慢查詢分析報告查看器",
    page_icon="??",
    layout="wide"
)

# 自定義CSS - 按鈕樣式和間隔
st.markdown("""
<style>
    .report-btn {
        width: 100%;
        text-align: left;
        margin-bottom: 8px;
        padding: 8px 12px;
        border-radius: 4px;
        border: none;
        cursor: pointer;
        transition: all 0.2s ease;
    }
    .report-btn:not(.selected) {
        background-color: #f0f2f6;
        color: #333;
    }
    .report-btn:not(.selected):hover {
        background-color: #e6e9ed;
    }
    .report-btn.selected {
        background-color: #0066cc;
        color: white;
    }
    .report-container {
        max-height: 600px;
        overflow-y: auto;
        padding-right: 8px;
    }
</style>
""", unsafe_allow_html=True)

# 報告文件路徑
REPORT_PATH = os.path.join("reports", "slow_report.json")

# ----------------------
# 回調函數 - 用于即時更新選中狀態
# ----------------------
def update_selected_report(report_id):
    """更新選中的報告ID(回調函數)"""
    st.session_state.selected_report_id = report_id

# ----------------------
# 工具函數
# ----------------------
def load_reports():
    """加載并排序報告(按時間倒序),處理文件異常"""
    try:
        if not os.path.exists(REPORT_PATH):
            st.warning(f"報告文件不存在:{REPORT_PATH}")
            return []
        
        with open(REPORT_PATH, 'r', encoding='utf-8') as f:
            reports = json.load(f)
        
        # 按時間戳倒序排序(最新報告在前)
        reports.sort(
            key=lambda x: x.get('timestamp', ''),
            reverse=True
        )
        return reports
    
    except json.JSONDecodeError:
        st.error("報告文件格式錯誤,無法解析(可能是JSON損壞)")
        return []
    except PermissionError:
        st.error(f"無權限訪問報告文件:{REPORT_PATH}")
        return []
    except Exception as e:
        st.error(f"加載報告失?。簕str(e)}")
        return []

def format_timestamp(timestamp_str):
    """格式化ISO時間戳為易讀格式"""
    try:
        dt = datetime.fromisoformat(timestamp_str)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except:
        return timestamp_str

def search_reports(reports, query):
    """根據搜索詞過濾報告(支持標題、表名、內容)"""
    if not query:
        return reports
        
    query_lower = query.lower()
    return [
        report for report in reports
        if (
            query_lower in report.get('title', '').lower() or
            any(query_lower in str(table).lower() for table in report.get('original_data', {}).get('tables', [])) or
            query_lower in report.get('analysis_report', '').lower() or
            query_lower in report.get('original_data', {}).get('log', '').lower()
        )
    ]

# ----------------------
# 主邏輯
# ----------------------
reports = load_reports()
st.title("?? 慢查詢分析報告查看器")

# 初始化會話狀態
if 'selected_report_id' not in st.session_state:
    if reports:
        st.session_state.selected_report_id = reports[0]['id']
    else:
        st.session_state.selected_report_id = None

# 側邊欄 - 報告列表和搜索
with st.sidebar:
    st.header("報告列表")
    
    # 搜索框
    search_query = st.text_input("搜索報告", placeholder="輸入標題、表名或內容關鍵詞...")
    filtered_reports = search_reports(reports, search_query)
    
    # 顯示統計信息
    st.caption(f"顯示 {len(filtered_reports)} 份報告(共 {len(reports)} 份)")
    
    # 報告列表
    if filtered_reports:
        # 確保選中的ID在當前過濾列表中
        valid_ids = [r['id'] for r in filtered_reports]
        if (st.session_state.selected_report_id is None or 
            st.session_state.selected_report_id not in valid_ids):
            st.session_state.selected_report_id = valid_ids[0]
        
        # 報告容器(帶滾動)
        st.markdown('<div class="report-container">', unsafe_allow_html=True)
        
        # 逐個顯示報告按鈕,使用回調函數更新狀態
        for report in filtered_reports:
            report_id = report['id']
            timestamp = format_timestamp(report['timestamp'])
            title = report['title']
            
            # 按鈕狀態判斷
            is_selected = (report_id == st.session_state.selected_report_id)
            
            # 使用回調函數確保狀態即時更新
            st.button(
                label=f"[{timestamp}] {title}",
                key=f"btn_{report_id}",
                use_container_width=True,
                type="primary" if is_selected else "secondary",
                on_click=update_selected_report,
                args=(report_id,)
            )
        
        st.markdown('</div>', unsafe_allow_html=True)
        
        # 獲取選中的報告
        selected_report = next(
            (r for r in filtered_reports if r['id'] == st.session_state.selected_report_id),
            None
        )
    else:
        st.info("沒有找到匹配的報告")
        selected_report = None

# 主內容區 - 顯示報告詳情
if selected_report:
    st.subheader(selected_report['title'])
    
    # 基本信息卡片
    col1, col2, col3 = st.columns(3)
    with col1:
        st.info(f"**報告ID**\n\n{selected_report['id']}")
    with col2:
        st.info(f"**生成時間**\n\n{format_timestamp(selected_report['timestamp'])}")
    with col3:
        st.info(f"**請求ID**\n\n{selected_report['request_id']}")
    
    # 性能指標
    st.subheader("查詢性能指標")
    original_data = selected_report.get('original_data', {})
    metrics_col1, metrics_col2, metrics_col3, metrics_col4 = st.columns(4)
    with metrics_col1:
        st.metric("查詢時間", f"{original_data.get('query_time', 'N/A')} 秒")
    with metrics_col2:
        st.metric("鎖定時間", f"{original_data.get('lock_time', 'N/A')} 秒")
    with metrics_col3:
        st.metric("返回行數", original_data.get('rows_sent', 'N/A'))
    with metrics_col4:
        st.metric("掃描行數", original_data.get('rows_examined', 'N/A'))
    
    # 涉及表名
    st.subheader("涉及表名")
    tables = original_data.get('tables', [])
    if tables:
        for table in tables:
            st.text(f"- {table}")
    else:
        st.text("無表信息")
    
    # 原始SQL
    with st.expander("查看原始SQL", expanded=False):
        st.code(original_data.get('log', '無SQL日志'), language="sql")
    
    # 表結構信息
    st.subheader("表結構詳情")
    table_info_list = selected_report.get('table_info', [])
    for table_info in table_info_list:
        with st.expander(f"表: {table_info.get('cleaned_name', '未知表')}"):
            if table_info.get('error'):
                st.error(f"獲取表信息失敗: {table_info['error']}")
            else:
                st.text(f"預估行數: {table_info.get('row_count', '未知')}")
                st.code(table_info.get('ddl', '無DDL信息'), language="sql")
    
    # 分析報告
    st.subheader("優化建議")
    st.markdown(selected_report.get('analysis_report', '無分析內容'))

else:
    if reports:
        st.info("請從側邊欄選擇一份報告查看詳情")
    else:
        st.info("暫無報告數據,請先運行分析任務生成報告")

至此,所有準備工作已經完成,我們開始測試功能。

功能測試

由于是對慢查詢記錄進行采集、處理、分析、生成報告。所以測試的基本思路是:創建 MySQL 數據庫表,插入數據,啟動 Fluent-Bit,FluentD 服務準備采集慢查詢日志,啟動 process_slow_log.py 生成分析報告,然后執行一條 SQL 語句觸發慢查詢日志的生成,最后通過show_slow_report.py 查看分析報告的內容。

創建數據庫

在mysql5.7下執行sql生成對應的表,打開MySQL 5.7 Command Line Client,輸入如下 sql 并執行。

-- 創建數據庫
CREATE DATABASE IF NOT EXISTS after_sale
  CHARACTER SET utf8mb4
  COLLATE utf8mb4_general_ci;

USE after_sale;

-- 刪除現有表,如果存在
DROP TABLE IF EXISTS SupportIssue;
DROP TABLE IF EXISTS CustomerService;
DROP TABLE IF EXISTS Solution;
DROP TABLE IF EXISTS Issue;
DROP TABLE IF EXISTS User;

-- 創建用戶表
CREATE TABLE User
(
    UserId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 用戶ID
    Name VARCHAR(60) NOT NULL, -- 姓名
    Email VARCHAR(60) NOT NULL, -- 電子郵件
    Phone VARCHAR(24) -- 電話
);

-- 創建問題表
CREATE TABLE Issue
(
    IssueId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 問題ID
    UserId INTEGER NOT NULL, -- 用戶ID
    Description TEXT NOT NULL, -- 問題描述
    Status VARCHAR(20) NOT NULL, -- 狀態
    FOREIGN KEY (UserId) REFERENCES User (UserId)
        ON DELETE NO ACTION ON UPDATE NO ACTION
);

-- 創建解決方案表
CREATE TABLE Solution
(
    SolutionId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 解決方案ID
    SolutionDescription TEXT NOT NULL, -- 解決方案描述
    IsValid TINYINT -- 是否有效(MySQL中用TINYINT表示布爾值,1為真,0為假)
);

-- 創建客服表
CREATE TABLE CustomerService
(
    CustomerServiceId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 客服ID
    Name VARCHAR(40) NOT NULL, -- 姓名
    Email VARCHAR(60), -- 電子郵件
    Phone VARCHAR(24) -- 電話
);

-- 創建客服問題關聯表
CREATE TABLE SupportIssue
(
    SupportIssueId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 客服問題關聯ID
    IssueId INTEGER NOT NULL, -- 問題ID
    CustomerServiceId INTEGER NOT NULL, -- 客服ID
    SolutionId INTEGER NOT NULL, -- 解決方案ID
    ActionTime DATETIME NOT NULL, -- 操作時間
    FOREIGN KEY (IssueId) REFERENCES Issue (IssueId)
        ON DELETE NO ACTION ON UPDATE NO ACTION,
    FOREIGN KEY (CustomerServiceId) REFERENCES CustomerService (CustomerServiceId)
        ON DELETE NO ACTION ON UPDATE NO ACTION,
    FOREIGN KEY (SolutionId) REFERENCES Solution (SolutionId)
        ON DELETE NO ACTION ON UPDATE NO ACTION
);

插入測試數據

數據庫和表結構有了,接著插入一些測試數據,為后面慢查詢提供條件。為了保證數據量,我這里利用 python 腳本生成數據,先執行如下命令安裝數據庫連接依賴,方便 python 訪問數據庫。

pip install mysql-connector-python

然后,在工作目錄創建 generate_data.py 文件,插入數據。

import mysql.connector
from faker import Faker
import random
from datetime import datetime, timedelta

# 數據庫連接配置 - 請修改為你的數據庫信息
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'root',
    'database': 'after_sale',
    'autocommit': True
}

# 生成數據量 (可根據需要調整)
NUM_USERS = 100000          # 10萬用戶
NUM_ISSUES = 500000         # 50萬問題
NUM_SOLUTIONS = 10000       # 1萬解決方案
NUM_CUSTOMERSERVICE = 500   # 500客服
NUM_SUPPORTISSUES = 1000000 # 100萬客服問題關聯記錄

# 初始化Faker
fake = Faker('zh_CN')

def get_db_connection():
    """獲取數據庫連接"""
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        return conn
    except mysql.connector.Error as err:
        print(f"數據庫連接錯誤: {err}")
        raise

def generate_users():
    """生成用戶數據"""
    print(f"開始生成 {NUM_USERS} 條用戶數據...")
    conn = get_db_connection()
    cursor = conn.cursor()

    # 批量插入大小
    batch_size = 1000
    insert_query = """
    INSERT INTO User (Name, Email, Phone)
    VALUES (%s, %s, %s)
    """

    try:
        for i in range(0, NUM_USERS, batch_size):
            batch_data = []
            for _ in range(min(batch_size, NUM_USERS - i)):
                name = fake.name()
                email = fake.email()
                phone = fake.phone_number()
                batch_data.append((name, email, phone))

            cursor.executemany(insert_query, batch_data)
            print(f"已插入 {i + len(batch_data)} 條用戶數據")
    except mysql.connector.Error as err:
        print(f"插入用戶數據錯誤: {err}")
        raise
    finally:
        cursor.close()
        conn.close()
    print(f"用戶數據生成完成,共 {NUM_USERS} 條")

def generate_solutions():
    """生成解決方案數據 - 修復了字符串格式化問題"""
    print(f"開始生成 {NUM_SOLUTIONS} 條解決方案數據...")
    conn = get_db_connection()
    cursor = conn.cursor()

    # 重新設計解決方案模板,確保每個模板的占位符數量明確
    solution_templates = [
        ["檢查%s并重啟%s", 2],  # 包含2個占位符
        ["更新%s至最新版本", 1],  # 包含1個占位符
        ["清除%s緩存并重新嘗試", 1],
        ["聯系%s獲取技術支持", 1],
        ["檢查%s設置是否正確", 1],
        ["卸載并重新安裝%s", 1],
        ["重啟%s后再試", 1],
        ["檢查網絡連接并確保%s可訪問", 1],
        ["使用%s工具進行診斷", 1],
        ["恢復%s至默認設置", 1]
    ]

    objects = ["網絡", "系統", "應用", "設備", "軟件", "驅動", "賬戶", "服務"]

    batch_size = 1000
    insert_query = """
    INSERT INTO Solution (SolutionDescription, IsValid)
    VALUES (%s, %s)
    """

    try:
        for i in range(0, NUM_SOLUTIONS, batch_size):
            batch_data = []
            for _ in range(min(batch_size, NUM_SOLUTIONS - i)):
                # 隨機選擇一個模板及其占位符數量
                template, param_count = random.choice(solution_templates)

                # 根據占位符數量提供相應數量的參數
                if param_count == 1:
                    obj = random.choice(objects)
                    description = template % obj
                elif param_count == 2:
                    obj1 = random.choice(objects)
                    obj2 = random.choice(objects)
                    description = template % (obj1, obj2)
                else:
                    # 默認為1個參數的情況
                    obj = random.choice(objects)
                    description = template % obj
                    
                is_valid = random.choice([0, 1])
                batch_data.append((description, is_valid))
            
            cursor.executemany(insert_query, batch_data)
            print(f"已插入 {i + len(batch_data)} 條解決方案數據")
    except mysql.connector.Error as err:
        print(f"插入解決方案數據錯誤: {err}")
        raise
    except Exception as e:
        print(f"生成解決方案描述時出錯: {e},模板: {template}, 參數數量: {param_count}")
        raise
    finally:
        cursor.close()
        conn.close()
    print(f"解決方案數據生成完成,共 {NUM_SOLUTIONS} 條")

def generate_customer_service():
    """生成客服數據"""
    print(f"開始生成 {NUM_CUSTOMERSERVICE} 條客服數據...")
    conn = get_db_connection()
    cursor = conn.cursor()
    
    batch_size = 100
    insert_query = """
    INSERT INTO CustomerService (Name, Email, Phone)
    VALUES (%s, %s, %s)
    """
    
    try:
        for i in range(0, NUM_CUSTOMERSERVICE, batch_size):
            batch_data = []
            for _ in range(min(batch_size, NUM_CUSTOMERSERVICE - i)):
                name = fake.name()
                email = f"{name.replace(' ', '')}.{random.randint(100, 999)}@support.com"
                phone = fake.phone_number()
                batch_data.append((name, email, phone))
            
            cursor.executemany(insert_query, batch_data)
            print(f"已插入 {i + len(batch_data)} 條客服數據")
    except mysql.connector.Error as err:
        print(f"插入客服數據錯誤: {err}")
        raise
    finally:
        cursor.close()
        conn.close()
    print(f"客服數據生成完成,共 {NUM_CUSTOMERSERVICE} 條")

def generate_issues():
    """生成問題數據"""
    print(f"開始生成 {NUM_ISSUES} 條問題數據...")
    conn = get_db_connection()
    cursor = conn.cursor()
    
    # 獲取最大用戶ID
    cursor.execute("SELECT MAX(UserId) FROM User")
    max_user_id = cursor.fetchone()[0]
    if not max_user_id:
        raise Exception("請先生成用戶數據")
    
    # 問題描述模板
    issue_templates = [
        ["%s無法正常工作", 1],
        ["無法%s", 1],
        ["%s出現錯誤", 1],
        ["%s時遇到問題", 1],
        ["%s功能異常", 1],
        ["關于%s的問題", 1],
        ["%s失敗", 1],
        ["%s導致系統異常", 1]
    ]
    
    issue_objects = ["電腦", "手機", "應用", "軟件", "網絡", "賬戶", "訂單", "支付", "登錄", "文件"]
    actions = ["連接網絡", "發送消息", "上傳文件", "下載數據", "安裝軟件", "卸載程序", "更新系統", "登錄賬戶"]
    statuses = ["未解決", "處理中", "已解決", "已關閉", "重新打開"]
    
    batch_size = 1000
    insert_query = """
    INSERT INTO Issue (UserId, Description, Status)
    VALUES (%s, %s, %s)
    """
    
    try:
        for i in range(0, NUM_ISSUES, batch_size):
            batch_data = []
            for _ in range(min(batch_size, NUM_ISSUES - i)):
                user_id = random.randint(1, max_user_id)
                template, param_count = random.choice(issue_templates)
                
                if "無法%s" in template:
                    desc = template % random.choice(actions)
                else:
                    desc = template % random.choice(issue_objects)
                    
                status = random.choice(statuses)
                batch_data.append((user_id, desc, status))
            
            cursor.executemany(insert_query, batch_data)
            print(f"已插入 {i + len(batch_data)} 條問題數據")
    except mysql.connector.Error as err:
        print(f"插入問題數據錯誤: {err}")
        raise
    finally:
        cursor.close()
        conn.close()
    print(f"問題數據生成完成,共 {NUM_ISSUES} 條")

def generate_support_issues():
    """生成客服問題關聯數據"""
    print(f"開始生成 {NUM_SUPPORTISSUES} 條客服問題關聯數據...")
    conn = get_db_connection()
    cursor = conn.cursor()
    
    # 獲取各表最大ID
    cursor.execute("SELECT MAX(IssueId) FROM Issue")
    max_issue_id = cursor.fetchone()[0]
    cursor.execute("SELECT MAX(CustomerServiceId) FROM CustomerService")
    max_cs_id = cursor.fetchone()[0]
    cursor.execute("SELECT MAX(SolutionId) FROM Solution")
    max_solution_id = cursor.fetchone()[0]
    
    if not all([max_issue_id, max_cs_id, max_solution_id]):
        raise Exception("請先生成所有基礎表數據")
    
    # 生成時間范圍:過去1年
    end_date = datetime.now()
    start_date = end_date - timedelta(days=365)
    
    batch_size = 1000
    insert_query = """
    INSERT INTO SupportIssue (IssueId, CustomerServiceId, SolutionId, ActionTime)
    VALUES (%s, %s, %s, %s)
    """
    
    try:
        for i in range(0, NUM_SUPPORTISSUES, batch_size):
            batch_data = []
            for _ in range(min(batch_size, NUM_SUPPORTISSUES - i)):
                issue_id = random.randint(1, max_issue_id)
                cs_id = random.randint(1, max_cs_id)
                solution_id = random.randint(1, max_solution_id)
                
                # 隨機生成時間
                time_delta = end_date - start_date
                random_days = random.randint(0, time_delta.days)
                random_seconds = random.randint(0, 86399)  # 一天的秒數
                action_time = start_date + timedelta(days=random_days, seconds=random_seconds)
                
                batch_data.append((issue_id, cs_id, solution_id, action_time.strftime('%Y-%m-%d %H:%M:%S')))
            
            cursor.executemany(insert_query, batch_data)
            print(f"已插入 {i + len(batch_data)} 條客服問題關聯數據")
    except mysql.connector.Error as err:
        print(f"插入客服問題關聯數據錯誤: {err}")
        raise
    finally:
        cursor.close()
        conn.close()
    print(f"客服問題關聯數據生成完成,共 {NUM_SUPPORTISSUES} 條")

if __name__ == "__main__":
    # 按順序生成數據(有依賴關系)
    try:
        generate_users()
        generate_solutions()
        generate_customer_service()
        generate_issues()
        generate_support_issues()
        print("所有數據生成完成!")
    except Exception as e:
        print(f"生成數據時出錯: {str(e)}")

由于在代碼中使用到了假數據,所以需要安裝 faker 組件,執行如下代碼:

pip install faker
python generate_data.py

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

確保 Fluent-Bit 與 FluentD 執行

在docker desktop 中可以看到容器運行狀態。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

啟動 “生成日志分析程序”

工作目錄執行如下命令:

python Process_slow_log.py

執行慢查詢 SQL

MySQL 5.7 Command Line Client執行如下 sql。

use after_sale;

SELECT 
    u.UserId, u.Name, u.Email,
    i.IssueId, i.Description, i.Status,
    cs.CustomerServiceId, cs.Name,
    s.SolutionId, s.SolutionDescription,
    si.ActionTime
FROM SupportIssue si
JOIN Issue i ON si.IssueId = i.IssueId
JOIN User u ON i.UserId = u.UserId
JOIN CustomerService cs ON si.CustomerServiceId = cs.CustomerServiceId
JOIN Solution s ON si.SolutionId = s.SolutionId
WHERE s.IsValid = 1
AND i.Status = 'unsolve'
ORDER BY si.ActionTime DESC;

返回結果如下:

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

查看fluent-bit容器日志,容器日志中會顯示采集到的內容。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

啟動“生成日志查看應用”

執行如下命令:

streamlit run show_slow_report.py

啟動后的控制臺會顯示調用日志和生成報告。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

此時通過瀏覽器頁面可以查看慢查詢日志分析報告,如下圖所示。

數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案-AI.x社區

作者介紹

崔皓,51CTO社區編輯,資深架構師,擁有18年的軟件開發和架構經驗,10年分布式架構經驗。

收藏
回復
舉報
回復
相關推薦
91久久精品国产91性色| 天堂日韩电影| 亚洲福利久久| 欧美日韩专区在线| 精品卡一卡二| jizz亚洲少妇| 2019中文字幕在线视频| 乱亲女h秽乱长久久久| 国产精品久久久久久亚洲毛片| 国内精品久久久久久久| 6080国产精品| 午夜免费播放观看在线视频| 亚洲一区二区伦理| 精品久久免费看| 国产卡一卡二在线| 亚洲在线精品视频| 欧美军人男男激情gay| 欧美日韩国产在线看| 国产高清一区二区三区| 青娱乐国产在线视频| 只有精品亚洲| 国产精品美女久久久久aⅴ| 97人人模人人爽人人少妇| 国产免费久久久久| 亚洲人体在线| 欧美日韩性视频| av不卡在线免费观看| 在线视频欧美亚洲| 一本一道久久综合狠狠老精东影业| 欧美不卡一区二区| 538任你躁在线精品免费| 成人在线免费电影| 久久国内精品视频| 久久影视免费观看| youjizz.com日本| 黑人玩欧美人三根一起进| 国产不卡在线一区| 91国产视频在线| 插吧插吧综合网| 欧美日韩女优| 亚洲欧美日韩国产综合在线| 国产成人精品日本亚洲| 手机看片福利视频| 在线播放成人| 欧美日韩综合不卡| 别急慢慢来1978如如2| av资源网站在线观看| 99热99精品| 国产精品十八以下禁看| 九九精品视频免费| 欧美电影《睫毛膏》| 欧美一级高清片| 日本www在线视频| 国产高清免费在线播放| 国产一区二区三区日韩| 国模叶桐国产精品一区| 欧美又粗又大又长| 欧美一级色片| 欧美精品在线一区二区| 永久免费看av| 日韩大片b站免费观看直播| 日韩经典中文字幕一区| 乱亲女秽乱长久久久| 国产成人一区二区在线观看| 精品产国自在拍| 亚洲无亚洲人成网站77777| 99日在线视频| 亚洲天堂导航| 亚洲精品免费看| 麻豆av一区| www五月婷婷| 日韩成人dvd| 国产精品丝袜高跟| 欧美日韩人妻精品一区二区三区| 伊人色**天天综合婷婷| 亚洲欧美999| 日韩欧美中文在线视频| 免费看av不卡| 中文字幕欧美一区| 久久久久久国产精品一区| 一区二区三区免费在线| 99国产精品久久久久久久| 久久久久久久香蕉网| 日本黄色激情视频| 亚洲精品进入| 精品成人一区二区三区四区| 第一区免费在线观看| 亚洲黄色免费av| 一区二区三区免费| 在线国产精品网| 免费在线黄色网址| 99久久综合精品| 欧美激情专区| 少妇无码一区二区三区| 国产在线精品国自产拍免费| 99在线视频播放| 国产精品欧美综合亚洲| 日韩高清在线不卡| 91麻豆精品秘密入口| 神马午夜一区二区| 国产精品理论片| 日本一区午夜艳熟免费| 1769免费视频在线观看| 中文av一区特黄| 五月天综合网| 日av在线播放| 亚洲视频一区二区免费在线观看| 久久这里只有精品23| 在线中文字幕视频观看| 红桃av永久久久| 日本午夜激情视频| av在线一区不卡| 精品国产网站在线观看| 超碰人人干人人| 激情五月色综合国产精品| 国产视频精品va久久久久久| 中文字幕乱码一区| 欧美变态挠脚心| 深夜福利日韩在线看| 91麻豆精品国产91久久综合| 国产精品地址| 性色av一区二区三区| 激情综合网五月婷婷| 狠狠色狠狠色综合日日tαg| 国产精品99导航| 男人天堂手机在线观看| 99麻豆久久久国产精品免费优播| 亚洲一区二区在线看| 毛片在线播放a| 亚洲欧美日韩国产综合在线| 国产精品99久久免费黑人人妻| 欧美va在线观看| 欧美日韩情趣电影| 一级 黄 色 片一| 国产成人3p视频免费观看| 欧美激情精品久久久久久久变态| 九九视频在线免费观看| 久久精品久久99精品久久| 久久99精品久久久久久久久久| 91香蕉在线观看| 欧美日韩日本视频| 欧美激情 一区| 亚洲免费一区二区| 国产精品美乳一区二区免费 | 粉色视频免费看| 亚洲aaa级| 18久久久久久| 欧美一级做性受免费大片免费| 日韩毛片一二三区| 天天干天天爽天天射| 久久91超碰青草在哪里看| 欧美日韩大陆一区二区| 国产1区2区在线观看| 久热re这里精品视频在线6| 国产日韩在线观看av| av老司机久久| 91免费看片在线观看| 日日噜噜噜噜夜夜爽亚洲精品| 免费成人在线电影| 欧美老肥妇做.爰bbww| 亚洲精品国产精品国自| 麻豆精品久久精品色综合| 亚洲成色www久久网站| 国产色婷婷在线| 欧美videossexotv100| 久久艹精品视频| 成人午夜av电影| 亚洲欧美综合一区| 九九久久国产| 欧美日本在线视频中文字字幕| av片免费观看| 国产成人在线观看| 亚洲精品视频一二三| 九九热这里有精品| 九九热精品在线| 天堂网在线观看视频| 色综合欧美在线| 亚洲午夜久久久久久久久| 成人在线电影在线观看视频| 欧美精品18videos性欧| 日本高清视频免费看| 精品久久久久久国产91| 欧美偷拍一区二区三区| 亚洲精品综合| 亚洲a在线播放| 午夜在线视频播放| 日韩天堂在线观看| 久久99精品波多结衣一区| 国产高清在线观看免费不卡| 岛国大片在线播放| 深爱激情综合| 欧美自拍大量在线观看| 亚洲国产精品欧美久久| 中文字幕日韩精品一区| 精品少妇人妻av一区二区三区| 久久国产欧美| 久久久99爱| 国产精品一区二区免费福利视频| 欧美另类在线播放| 欧美老女人性开放| 欧美一卡在线观看| 三上悠亚作品在线观看| 免费观看在线色综合| 日韩国产美国| 精品视频一区二区三区四区五区| 萌白酱国产一区二区| 国产精品久久免费| 精品久久在线播放| 粉嫩av性色av蜜臀av网站| 91免费国产在线| 亚洲精品在线网址| 日本成人超碰在线观看| 日韩中文字幕一区| 超碰精品在线| 午夜精品久久久久久久99黑人 | 国产精品xxx| 在线丨暗呦小u女国产精品| 亚洲精品久久久久久久蜜桃| 久久一夜天堂av一区二区三区| 欧美一区二区三区爽大粗免费| 久久人人爽人人爽人人片av不| 国产精品久久久久久久电影| 成人欧美一区| 亚洲国产精品人久久电影| 日本在线观看中文字幕| 99精品视频在线观看| 日韩在线综合网| 欧美福利影院| 久久本道综合色狠狠五月| 国产一区2区在线观看| 精品中文字幕视频| 秋霞a级毛片在线看| 日韩理论片久久| 免费观看黄色av| 日韩欧美你懂的| 亚洲黄色一区二区| 国产网站一区二区| 男生操女生视频在线观看 | 欧美第一页浮力影院| 免费在线播放第一区高清av| 免费超爽大片黄| 国产精品啊啊啊| 韩国无码av片在线观看网站| 久久免费视频66| 国产91精品一区二区绿帽| 2019年精品视频自拍| 国产成一区二区| 忘忧草在线www成人影院| 人九九综合九九宗合| 一区二区精品伦理... | 国产黄色片免费看| 天天影视色香欲综合网老头| 国产午夜福利片| 天天色天天操综合| 国产美女激情视频| 国产精品国产三级国产| 国产av一区二区三区传媒| 国产综合色在线| 亚洲视频在线不卡| 国产aⅴ精品一区二区三区色成熟| 婷婷激情小说网| 国产精品小仙女| 欧美成人黑人猛交| 日韩av在线免费观看不卡| 欧美伦理片在线看| 狠狠网亚洲精品| 日韩 欧美 高清| 久久天堂精品| 91极品尤物在线播放国产| 精品一区二区免费| 99福利在线观看| 亚洲视屏一区| 91猫先生在线| 国产一区视频在线观看免费| 无码熟妇人妻av在线电影| 亚洲香蕉网站| av免费观看网| 日本不卡视频在线| 超碰91在线播放| 99这里都是精品| 美国美女黄色片| 亚洲女爱视频在线| 五月天综合激情网| 欧美日本一道本| 东京干手机福利视频| 91精品国产麻豆| 黄色片网站免费在线观看| 亚洲乱码一区av黑人高潮| 色影院视频在线| 高清欧美性猛交| 国产精品亚洲一区二区三区在线观看| 91麻豆国产语对白在线观看| 国内毛片久久| 国产精品久久久久久久久久直播| 高清不卡一区| 久久精品国产精品青草色艺| 久久在线视频| 妺妺窝人体色777777| 久久99精品国产.久久久久久| 精品无码av一区二区三区不卡| 韩国v欧美v日本v亚洲v| 国产精品无码永久免费不卡| 成人动漫一区二区三区| 超碰人人cao| 国产精品一区二区视频| 性色av蜜臀av色欲av| 亚洲三级在线免费观看| 中文字幕精品无码一区二区| 色综合欧美在线| www.成人免费视频| 在线观看视频亚洲| 国产玉足榨精视频在线观看| 亚洲少妇中文在线| 国产福利免费在线观看| 欧美黄色www| 黑人一区二区三区| 鲁片一区二区三区| 在线欧美福利| 国产成人精品综合久久久久99 | 国产亚洲精品自拍| 4438x全国最大成人| 国产精品人人做人人爽人人添| 日本少妇性生活| 欧美成人精品高清在线播放| 最新真实国产在线视频| 国产va免费精品高清在线观看| 96sao在线精品免费视频| 正在播放91九色| 视频精品一区二区| 国产伦精品一区二区三区妓女| 一区二区三区视频在线观看| 91影院在线播放| 尤物tv国产一区| 黑人精品一区| 欧美精品国产精品久久久 | 精品少妇无遮挡毛片| 91玉足脚交白嫩脚丫在线播放| 久久久久久久久久综合 | 在线能看的av| 亚洲国产精品99| 九九在线视频| 奇米一区二区三区四区久久| 日本久久成人网| 久久成人免费观看| 91视频在线观看免费| 精品久久免费视频| 亚洲成色www8888| 丰满大乳少妇在线观看网站| 99r国产精品视频| 欧美精品国产| 毛片av免费在线观看| 91丨国产丨九色丨pron| 国产午夜免费福利| 亚洲欧美精品一区二区| 偷拍视频一区二区三区| 日韩精品福利视频| 捆绑调教美女网站视频一区| 欧美日韩国产一二三区| 91精品综合久久久久久| 天堂在线中文资源| 久久亚洲综合国产精品99麻豆精品福利| 国产毛片精品久久| 精品一区二区三区毛片| 国产传媒日韩欧美成人| 日本三级片在线观看| 亚洲精品第一页| 四虎4545www国产精品| 天天综合色天天综合色hd| 精品一区二区三区久久久| 亚洲国产成人精品综合99| 亚洲成avwww人| 伊人久久视频| 中国成人在线视频| 国产成人av网站| 欧美特黄aaaaaa| 日韩在线视频观看正片免费网站| av成人在线网站| 亚洲熟妇国产熟妇肥婆| 日本一区二区三区久久久久久久久不 | 久久精品无码中文字幕| 91丨九色丨黑人外教| 亚洲一区二区天堂| 久久久久国产视频| 欧美男gay| 1314成人网| 精品日本高清在线播放| av资源网在线观看| 国产成人av一区二区三区| 久久福利毛片| 国产精品国产三级国产传播| 日韩黄在线观看| heyzo高清国产精品| 亚洲a成v人在线观看| 日韩视频久久| av黄色免费在线观看| 日韩高清a**址| 九九99久久精品在免费线bt| 亚洲高清乱码| 成人h精品动漫一区二区三区| 免费看av在线|