數據庫慢查詢破局指南:從日志采集到智能診斷的全流程自動化方案
作者 | 崔皓
審校 | 重樓
整體思路
在數據庫運維場景中,“慢查詢報警” 往往是最讓工程師頭疼的問題之一 —— 就像我的運維學員所遭遇的:頻繁接到告警通知,卻始終難以快速定位問題根源是系統資源瓶頸、數據庫配置缺陷,還是應用層 SQL 寫法不規范。
為徹底解決這一 “定位難、分析繁” 的痛點,本文將通過 Fluent-Bit + Fluentd + DeepSeek 的實戰方案,構建一套 “從慢查詢信息采集到智能分析” 的全流程自動化體系。這套方案的核心價值在于:用工具鏈解決 “日志采集與處理” 的重復性工作,用大模型替代 “人工排查與分析” 的經驗依賴,讓 MySQL 慢查詢問題從 “被動響應” 轉變為 “主動診斷 + 精準解決”,大幅降低運維成本,提升數據庫性能穩定性。
我們將該項目實戰的整體思路整理為下圖所示:

具體思路可拆解為三大核心環節:
日志采集:當業務應用向 MySQL 數據庫發起查詢請求時,若查詢執行時間超出預設閾值(即觸發慢查詢條件),系統將先通過輕量級日志采集工具 Fluent-Bit,實時捕獲慢查詢的完整日志信息(含原始 SQL 語句、執行時間戳等基礎數據),再將采集到的日志穩定傳遞至 Fluentd 進行后續處理,確保慢查詢數據不丟失、不延遲。
提取信息:Fluentd 作為日志處理中樞,將對原始慢查詢日志進行格式化解析:通過正則匹配與字段提取邏輯,從非結構化日志中拆解出 “查詢耗時”“鎖定時間”“返回行數”“掃描行數” 等核心性能指標,同時識別 SQL 語句中涉及的數據庫表名,完成慢查詢信息從 “雜亂文本” 到 “結構化數據” 的轉化,為后續分析奠定基礎。
智能分析:為避免僅依賴日志數據導致的分析片面性,方案將進一步關聯 MySQL 數據庫的元數據信息 —— 通過數據庫連接工具獲取目標表的 “表結構定義”“索引配置”“表數據量” 等關鍵元數據,再將 “結構化慢查詢指標 + 表元數據 + 原始 SQL” 三部分信息整合為統一的分析素材,傳遞至 DeepSeek 大模型。大模型將基于數據庫性能優化知識,自動診斷慢查詢根源(如 “缺少關鍵索引導致全表掃描”“SQL 關聯邏輯冗余” 等),并生成包含具體優化建議(索引創建語句、SQL 改寫方案等)的結構化報告,最終幫助運維工程師快速定位問題類型(系統/數據庫/應用層),并直接落地優化操作。
實戰步驟
說完了整體思路,就到了項目實戰的環節,我們通過一張圖讓大家快速了解要經歷的實戰步驟,如下圖所示。

- 安裝 MySQL 數據庫,為后續的查詢及慢查詢日志提供準備。
- 配置慢查詢日志文件與觸發條件。指定慢查詢日志的存儲文件,同時設定觸發慢查詢的閾值(比如執行時間超過多少秒的查詢會被記錄為慢查詢)。
- 配置 Fluent - Bit 和 FluentD。Fluent - Bit 負責從 MySQL 端采集慢查詢日志,FluentD 則對采集到的日志進行過濾、解析等處理操作,它們共同構成了日志采集與初步處理的管道。
- 安裝 Fluent - Bit 與 FluentD,確保這兩個工具能在環境中正常運行,為后續的日志采集和處理提供工具支持。
- 生成日志分析程序(process_slow_log.py)。該程序會基于 FluentD 處理后的結構化日志,結合數據庫的表結構、索引等元數據,進一步深入分析慢查詢的原因,為后續生成報告提供分析依據。
- 生成日志查看應用(show_slow_report.py)。這個應用主要用于將日志分析程序處理后的結果,以更直觀、易讀的形式展示出來,方便用戶查看慢查詢的分析報告。
- 測試功能環節。通過模擬或實際的慢查詢場景,驗證從 MySQL 慢查詢日志的產生、采集、處理,到分析程序運行、報告展示這一整個流程是否正常工作。
數據庫安裝與配置
介紹完畢項目的執行步驟之后,我們就開始動手實現了,首先在 MySQL 的官網下載安裝文件,地址如下:
??https://dev.mysql.com/downloads/??
由于我所用的操作系統是 Windows ,所以點擊下圖“MySQL Installer for Windows”下載 MySQL,大家請根據自身系統情況下載對應的版本。

接著選擇“5.7.44”的 MySQL 版本。

下載完成后,雙擊mysql-installer-community-5.7.44.0.msi 文件進行安裝。
如下圖所示,在安裝時會提示更新安裝程序,選擇“No”。

如下圖所示,選擇“Custom”可以自定義安裝目錄,否則默認安裝到C盤。

接著選擇安裝“MySQL Server 5.7.44 - X64”版本,選中后點擊“Next”。

這里可以選擇安裝目錄,默認是C盤,也可以修改安裝目錄。

選擇“Execute”執行安裝。

勾選Show Advanced and Logging Options,其他配置按默認不變。

填寫root用戶的密碼,為了方便我直接使用“root”作為密碼,大家不用效仿,自行設置密碼即可。

后面就是一頓 Next 安裝,不過到了下面這個界面的時候請注意。
如下圖所示,修改Slow Query Log下的FilePath和Seconds。
- FilePath修改為mysql-slow.log(慢查詢日志文件);
- Seconds修改為3(sql執行超過3秒記錄到慢查詢日志中),當然也可以改成希望的時間,這個時間后面還可以通過配置文件修改。

接著,點擊“Execute”執行安裝。

最后,點擊“Finish”完成安裝。

安裝完成后打開MySQL 5.7 Command Line Client,按照下圖操作在“Windows”中搜索“mysql”,打開 MySQL 的客戶端。

在打開的客戶端中,輸入在安裝時就輸入過的密碼, 我這里輸入“root”密碼。

如果看到如下圖所示的內容,說明安裝成功。

設置慢查詢日志權限
這里需要注意的是,我們在安裝時定義過了慢查詢日志文件:mysql-slow.log,如果沒有更改安裝目錄會在c:\ProgramData\MySQL\MySQL Server 5.7\Data目錄下,在后續的使用中發現需要給mysql-slow.log文件賦予權限,否則 Fluent-bit 解析日志時會報錯。所以,在安裝完 MySQL 之后最后手動刪除這個文件,再重新啟動 MySQL 的服務,該服務會自動生成mysql-slow.log,并賦予它正確的權限。
當然如果沒有權限問題的小伙伴可以跳過這個步驟。
這里如下圖所示,在“Windows”中搜索“服務”,并打開。

在服務列表中選擇“MySQL57”的服務,并且點擊“重啟此服務”。

慢查詢配置
至此,MySQL 的安裝就完成了,接著再啰嗦一下慢查詢日志和閾值的配置。假設 MySQL 安裝在 c:\ProgramData\MySQL\MySQL Server 5.7(默認會在C盤)目錄下,如下圖所示我們可以打開該目錄下的 my.ini 文件。

并且,編輯文件內容如下:
# 啟用慢查詢的配置一定要放在[mysqld]下面
# 默認配置文件中如果有[mysqld]這里不要重復加[mysqld]
[mysqld]
# 啟用慢查詢日志
slow-query-log=1
# 慢查詢日志名稱,生成的文件路徑 c:\ProgramData\MySQL\MySQL Server 5.7\Data,如果
slow_query_log_file="mysql-slow.log"
# 定義慢查詢時間(秒)
long_query_time=2該文件可以啟用慢查詢,保存配置后需要重啟MySQL服務。
- slow-query-log=1:
當設置為 1 時,MySQL 會將執行時間超過指定閾值的查詢操作記錄到慢查詢日志文件中;如果設置為 0,則關閉慢查詢日志功能。
- slow_query_log_file="mysql-slow.log":
設置的日志文件名為 mysql-slow.log,MySQL 會將慢查詢相關的日志內容寫入到這個文件中。
- long_query_time=2:
定義了慢查詢的時間閾值,單位是秒。當一條 SQL 查詢語句的執行時間超過這個值(為了方便演示,我設置的是 2 秒)時,這條查詢就會被認定為慢查詢,進而被記錄到上述指定的慢查詢日志文件中。也就是說,只有執行時間超過 2 秒的查詢才會被 MySQL 記錄到慢查詢日志里,方便 DBA(數據庫管理員)后續分析和優化這些執行效率低下的查詢。
配置 fluent-bit
完成 MySQL 安裝以及慢查詢日志配置之后,就需要配置 Fluent-bit 采集日志信息。個人建議在本地創建目錄用來存放 Fluent-bit 的配置文件,這個文件用來解析慢日志的內容。由于后期我們會使用 Docker 安裝 Fluent-bit,所以也需要將這個目錄下的配置文件與 Docker 中安裝的 Fluent-bit 文件進行掛載,這樣方便修改配置信息。
我在本地“d:\docker\EFK” 下面分別建立 fluent-bit 和 fluentd 兩個目錄,大家可以根據具體情況創建相似的目錄。

配置日志解析
通過前面內容的介紹可以得知,fluent-bit 的主要任務是采集并解析慢日志的內容,采集屬于基本功能,而解析需要按照 MySQL 慢日志的格式對文本進行處理。先通過如下慢日志文本觀察其結構。
C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe, Version: 5.7.44-log (MySQL Community Server (GPL)). started with:
TCP Port: 3306, Named Pipe: (null)
Time Id Command Argument
# Time: 2025-09-30T10:35:46.687011Z
# User@Host: root[root] @ localhost [::1] Id: 2
# Query_time: 4.063656 Lock_time: 0.017691 Rows_sent: 100694 Rows_examined: 1814434
use after_sale;
SET timestamp=1759228546;
SELECT
u.UserId, u.Name, u.Email,
i.IssueId, i.Description, i.Status,
cs.CustomerServiceId, cs.Name,
s.SolutionId, s.SolutionDescription,
si.ActionTime
FROM SupportIssue si
JOIN Issue i ON si.IssueId = i.IssueId
JOIN User u ON i.UserId = u.UserId
JOIN CustomerService cs ON si.CustomerServiceId = cs.CustomerServiceId
JOIN Solution s ON si.SolutionId = s.SolutionId
WHERE s.IsValid = 1
AND i.Status = 'unsolve'
ORDER BY si.ActionTime DESC;雖然,日志的內容比較多,可以從如下幾個方面進行分析,慢查詢日志由 5 部分組成,呈現 “起始行 + 元數據行 + SQL 語句行” 的多行結構:
1. 起始行:# Time: 2025-09-30T10:35:46.687011Z以# Time: 開頭,包含精確到微秒的時間戳,是每條慢查詢日志的唯一起始標識。
2. 用戶與連接信息行:# User@Host: root[root] @ localhost [::1] Id: 2記錄執行查詢的用戶、主機和連接 ID,是日志的元數據部分。
3. 性能指標行:# Query_time: 4.063656 Lock_time: 0.017691 Rows_sent: 100694 Rows_examined: 1814434包含查詢耗時(4.06 秒)、鎖定時間、返回行數、掃描行數等核心性能數據,是后續分析的關鍵指標。
4. 環境與時間設置行:use after_sale; 和 SET timestamp=1759228546;記錄查詢執行的數據庫環境(切換到after_sale庫)和時間戳設置,屬于 SQL 執行的前置操作。
5. SQL 語句行:從SELECT到最后的;完整的查詢語句,包含多表關聯(JOIN)、條件篩選(WHERE)和排序(ORDER BY),是慢查詢分析的核心對象。
接著,我們就需要根據日志“特征”對其進行解析。先來一波分析,如下:
第一步:找到 “唯一起始標志”—— 確定一條日志的開端
所有結構化日志(包括 MySQL 慢查詢日志)都有一個共同特點:每條完整日志都有唯一的 “開頭標識”,這是區分不同日志的關鍵。在示例慢查詢日志中,# Time: 2025-09-30T10:35:46.687011Z 就是典型的起始標志 —— 它以固定前綴 # Time: 開頭,后跟標準化的時間戳(年 - 月 - 日 T 時:分: 秒。毫秒 Z),且每條慢查詢日志必然以此行開始。因此,第一步要做的就是:用正則表達式精準匹配這個起始標志,告訴 Fluent Bit“遇到這樣的行,就意味著一條新的慢查詢日志開始了”。因此需要用正則表達式 /^# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/ :^ 錨定行首,\d{4}-\d{2}-\d{2}T... 匹配時間戳格式,確保只命中起始行。

第二步:定義 “延續行規則”—— 合并日志的后續內容
一條完整的慢查詢日志除了起始行,還包含用戶信息(# User@Host: ...)、性能指標(# Query_time: ...)、SQL 語句等多行內容。這些行的共同特點是:它們不是 “新日志的起始行”,而是當前日志的一部分。因此,第二步的思路是:用 “否定邏輯” 匹配所有 “不是起始行” 的內容,告訴 Fluent Bit“只要不是新日志的開頭,就都歸到當前日志里”。因此,需要通過正則表達式: /^(?!\# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).*/ 采用了正則的 “否定前瞻” 語法(?!),意思是 “匹配所有不以 # Time: 時間戳 開頭的行”。無論這些行是# User@Host這樣的元數據,還是SELECT ...這樣的 SQL 語句(哪怕 SQL 跨多行),都會被合并到當前日志中。
第三步:設置 “超時兜底”—— 避免日志截斷或阻塞
實際場景中,日志可能因系統延遲、寫入中斷等原因出現 “不完整” 的情況(比如一條日志寫到一半突然停了)。這時需要一個 “超時機制”:如果一段時間內沒有新內容加入,就強制結束當前日志的合并,避免數據一直卡在緩沖區。示例中的 Flush_Timeout 5000 就是這個作用(5000 毫秒 = 5 秒),確保即使日志不完整,也能在 5 秒后輸出已收集的內容,平衡數據完整性和處理效率。
好了,有了上面的分析之后,我們就可以在 fluent-bit 目錄下面創建 etc 目錄,同時創建“parsers_mysql_slow.conf” 文件,它用來對慢查詢日志進行解析。如下:
[MULTILINE_PARSER]
# 多行解析器名稱,用于在輸入插件中引用
Name mysql_slow
# 解析器類型,regex 表示使用正則表達式匹配
Type regex
# 超時時間(毫秒),當多行日志間隔超過此時間時強制刷新緩沖區
Flush_Timeout 5000
# 第一條規則:定義多行日志的起始行
# "start_state" - 初始狀態名稱
# 正則表達式匹配以 "# Time:" 開頭的行(慢查詢日志的開始)
# "cont" - 匹配后跳轉到的下一個狀態
Rule "start_state" "/^# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/" "cont"
# 第二條規則:定義多行日志的延續行
# "cont" - 當前狀態名稱(與上一條規則的跳轉狀態對應)
# 正則表達式匹配不以 "# Time:" 開頭的行(即延續行)
# "cont" - 匹配后保持當前狀態,繼續收集多行內容
Rule "cont" "/^(?!\# Time: \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).*/" "cont"這個文件是用來解析慢日志內容的,遵循了幾個原則:
- 找 “開頭”:觀察日志格式,找到每條日志唯一的起始標志(如特定前綴、固定格式的時間戳等),用正則精準匹配;
- 定 “延續”:用 “否定起始標志” 的邏輯,匹配所有屬于當前日志的后續行,確保不遺漏任何內容;
- 設 “超時”:添加超時配置,應對日志不完整的極端情況。
這里我們介紹解析日志的部分,是希望大家遇到類似的日志時,具備基本的分析能力。當然,還有一個彎道超車的方法,就是丟給大模型讓它幫你分析日志,自動生成MULTILINE_PARSER 的內容。
引用日志解析
在“\fluent-bit\etc”目錄下創建fluent-bit.conf 文件, 加入如下內容:
##############################################
# Fluent Bit 配置文件 (INI格式)
# 注意:注釋必須獨占一行,不能與配置項同行
##############################################
[SERVICE]
# 指定自定義解析器配置文件路徑
# 該文件包含專門的多行日志解析規則(如mysql慢查詢)
Parsers_File /fluent-bit/etc/parsers_mysql_slow.conf
##############################################
# 輸入插件配置(收集日志)
##############################################
# 輸入源:采集mysql慢查詢日志(支持多行處理)
[INPUT]
# 使用 tail 插件監控文件變化
Name tail
# 自定義標簽,標識為mysql慢查詢日志
Tag mysql.slow
# mysql慢查詢日志文件路徑
Path /var/log/mysql-slow.log
Read_from_Head On
# 跳過超長行,避免解析錯誤導致進程崩潰
Skip_Long_Lines On
# 檢查文件變化的間隔時間(秒)
Refresh_Interval 10
# 指定多行解析器名稱(需在parsers_mysql_slow.conf中定義)
# 用于mysql慢查詢跨多行的日志條目
multiline.parser mysql_slow
##############################################
# 輸出插件(轉發到 Fluentd)
##############################################
[OUTPUT]
# 使用 forward 插件將日志轉發到Fluentd聚合器
Name forward
# 匹配所有標簽的日志(* 是通配符,表示所有輸入源)
Match *
# Fluentd 服務地址(使用Docker Compose服務名進行服務發現)
Host fluentd
# Fluentd 監聽端口(forward插件的默認端口)
Port 24224
# 網絡故障時的最大重試次數,防止無限重試消耗資源
Retry_Limit 10
# 輸出源2:同時輸出到控制臺(用于調試和監控)
[OUTPUT]
# 使用 stdout 插件在控制臺打印日志
Name stdout
# 匹配所有標簽的日志
Match *
# 注意:生產環境通常應注釋或移除此輸出,避免日志重復和性能開銷該文件配置聚焦 MySQL 慢查詢日志的采集與解析,通過 [SERVICE] 模塊加載存放慢查詢多行解析規則的parsers_mysql_slow.conf文件,再以 [INPUT] 模塊的 tail 插件,監控/var/log/mysql-slow.log路徑下的慢查詢日志文件,同時啟用mysql_slow多行解析器確??缍嘈腥罩就暾喜ⅲ詈笸ㄟ^ [OUTPUT] 模塊的 forward 插件,將標記為mysql.slow的日志轉發到 Fluentd(地址為 fluentd,端口 24224)做進一步處理,全程僅保留慢查詢采集與解析的關鍵配置。
配置 Fluentd
Fluent-bit 的配置完成之后,就輪到 Fluentd 了, 前者主要任務是日志的采集和解析,后者則需要完成日志結構化、重要信息提取以及調用智能報告分析服務等功能。
在“\fluentd\conf” 創建新文件fluent.conf
文件內容比較多,我們聚焦如下幾個內容:
核心,通過 4 個filter插件對原始慢查詢日志進行 “解析→提取→轉換→精簡”,最終輸出結構化數據,僅針對mysql.slow標簽的日志生效:
第 1 個 filter:解析原始日志(parser)
<filter mysql.slow>
@type parser # 日志解析插件
key_name log # 待解析的字段(Fluent Bit轉發的原始日志存放在"log"字段中)
reserve_data true # 保留原始數據,避免解析后丟失信息
<parse>
@type regexp # 用正則表達式解析
# 正則提?。喝罩绢^部(時間、用戶、查詢耗時等)和SQL主體
expression /^(?<header># Time: (?<time>.+)\n# User@Host: (?<user_host>.+)\n# Query_time: (?<query_time>\d+\.\d+)\s+Lock_time: (?<lock_time>\d+\.\d+)\s+Rows_sent: (?<rows_sent>\d+)\s+Rows_examined: (?<rows_examined>\d+))(?<sql_body>[\s\S]*)$/
</parse>
</filter>將非結構化的 “log” 字段拆分為結構化字段,最終生成time(慢查詢時間)、user_host(執行用戶與主機)、query_time(查詢耗時)、lock_time(鎖定時間)、rows_sent(返回行數)、rows_examined(掃描行數)、sql_body(完整 SQL 語句)等字段,為后續處理打基礎。
第 2 個 filter:提取涉及的表名(record_transformer)
<filter mysql.slow>
@type record_transformer # 日志字段轉換插件
enable_ruby true # 啟用Ruby腳本,實現復雜邏輯
<record>
# Ruby腳本:從sql_body中提取SQL涉及的表名(支持帶反引號/帶庫名的表名格式)
tables ${ sql = record["sql_body"].to_s; pattern = /(?:FROM|JOIN)\s+(?:(?<schema1>[a-zA-Z_][a-zA-Z0-9_]*)\.(?<table1>[a-zA-Z_][a-zA-Z0-9_]*)|(?<schema2>`[^`]+`)\.(?<table2>`[^`]+`)|(?<schema3>[a-zA-Z_][a-zA-Z0-9_]*)\.(?<table3>`[^`]+`)|(?<schema4>`[^`]+`)\.(?<table4>[a-zA-Z_][a-zA-Z0-9_]*)|(?<table5>[a-zA-Z_][a-zA-Z0-9_]*)|(?<table6>`[^`]+`))(?:\s|$)/; matches = sql.scan(pattern); tables = []; matches.each { |m| schema1, table1, schema2, table2, schema3, table3, schema4, table4, table5, table6 = m; if schema1 && table1; tables << "#{schema1}.#{table1}"; elsif schema2 && table2; tables << "#{schema2}.#{table2}"; elsif schema3 && table3; tables << "#{schema3}.#{table3}"; elsif schema4 && table4; tables << "#{schema4}.#{table4}"; elsif table5 && !["si","i","u","cs","s"].include?(table5); tables << table5; elsif table6; tables << table6; end }; tables.uniq }
</record>
</filter>作用:通過 Ruby 腳本和正則,從sql_body的FROM/JOIN關鍵字后提取涉及的表名(支持db.table、`db`.`table`等多種格式),去重后生成tables字段(表名數組,如["after_sale.SupportIssue", "after_sale.Issue"]),方便后續關聯表結構元數據。
第 3 個 filter:衍生字段與類型轉換(record_transformer)
<filter mysql.slow>
@type record_transformer
enable_ruby true
<record>
tables_str ${ record["tables"].join(",") } # 將表名數組轉為字符串(如"table1,table2")
table_count ${ record["tables"].size } # 計算涉及的表數量
# 類型轉換:將字符串格式的數值轉為浮點數/整數(便于后續分析計算)
query_time_float ${ record["query_time"].to_f }
lock_time_float ${ record["lock_time"].to_f }
rows_sent_int ${ record["rows_sent"].to_i }
rows_examined_int ${ record["rows_examined"].to_i }
</record>
</filter>作用:基于已有字段生成衍生信息(表名字符串、表數量),并將 “查詢耗時”“行數” 等字符串字段轉為數值類型(避免后續分析時因類型錯誤導致問題)。
第 4 個 filter:精簡字段(record_transformer)
<filter mysql.slow>
@type record_transformer
remove_keys sql_body,header # 刪除無用字段(sql_body、header已完成使命,精簡數據)
</filter>作用:刪除解析過程中生成的臨時字段(sql_body、header),減少后續輸出的數據量,避免冗余。
四個過濾器之后接著的是“match”,將結構化后的慢查詢日志分兩路輸出:
<match *.**> # 匹配所有標簽的日志(此處主要匹配mysql.slow)
@type copy # 復制插件:將同一份日志同時發送到多個目的地
# 輸出目標1:控制臺(調試用)
<store>
@id output
@type stdout # 輸出到Fluentd的控制臺
</store>
# 輸出目標2:HTTP接口(對接慢查詢分析服務)
<store>
@id http_output
@type http # HTTP輸出插件
# 發送到宿主機的5001端口(host.docker.internal是Docker專屬地址,指向宿主機)
endpoint http://host.docker.internal:5001/analyze-slow-query
http_method post # 用POST方法發送
<format>
@type json # 日志格式轉為JSON(便于后端API解析)
</format>
<buffer>
# 緩沖配置:平衡實時性與可靠性
flush_interval 2s # 每2秒刷新一次緩沖區(即使數據未滿)
retry_type exponential_backoff # 指數退避重試(避免網絡故障時頻繁重試)
retry_wait 1s # 首次重試等待1秒
retry_max_interval 30s # 最大重試間隔30秒(避免等待過久)
retry_timeout 10m # 10分鐘后放棄重試(防止無限重試)
</buffer>
</store>
</match>作用:
控制臺輸出:用于調試(查看日志是否正確處理),生產環境可關閉;
HTTP 接口輸出:將 JSON 格式的結構化日志發送到宿主機的analyze-slow-query接口(后續提供該接口實現),調用大模型生成優化報告。
安裝 Fluent-Bit 和 FluentD
由于本案例需要安裝 fluent-bit、fluentd等應用,為了方便安裝與調試,計劃使用 docker 方式對他們進行安裝。于是 docker compose 的安裝方式就成了最佳選擇,它可以用于定義和管理多容器 Docker 應用的 YAML 配置文件,能將多個關聯的容器(如應用服務、數據庫、緩存等)的配置(鏡像、端口映射、數據卷、環境變量、依賴關系等)集中整合,通過 docker compose 命令一鍵實現多容器的創建、啟動、停止、重啟等操作。其核心益處在于簡化了多容器應用的部署與管理流程。選擇??Fluentd官網的docker-compose?? 文件,并在其基礎上進行修改,從而適應安裝需求。
??https://docs.fluentd.org/container-deployment/docker-compose#step-0-create-docker-compose.yml??
我們可以直接下載并修改 yml 文件,在文件中找到服務名fluent-bit增加掛載配置volumes,添加如下內容:
# 數據卷掛載:將宿主機的目錄或文件掛載到容器內,實現數據持久化或配置注入
volumes:
# Fluent Bit 會監控這個目錄下的日志文件變化,并收集新產生的日志。
- C:\ProgramData\MySQL\MySQL Server 5.7\Data\mysql-slow.log:/var/log/mysql-slow.log
# 將宿主機的自定義解析器配置文件掛載到容器內,用于解析特定格式的日志(如 mysql慢查詢日志)。
- D:\docker\EFK\fluent-bit\etc\parsers_mysql_slow.conf:/fluent-bit/etc/parsers_mysql_slow.conf
# 將宿主機的 Fluent Bit 主配置文件掛載到容器內,替代鏡像內的默認配置。
# 這個文件定義了數據輸入(Input)、處理(Parser, Filter)和輸出(Output)的規則。
- D:\docker\EFK\fluent-bit\etc\fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf在配置中找到服務名fluentd(在services下一級)增加掛載配置volumes,要在container_name同級。
volumes:
# 將宿主機上的 Fluentd 配置目錄掛載到容器內,使配置變更無需重新構建鏡像。
- D:\docker\EFK\fluentd\conf\fluent.conf:/fluentd/etc需要注意的是,這里的 MySQL 慢日志文件目錄、Fluent-bit 以及 FluentD 配置文件目錄都在前面的配置中出現過了,按照你本地的配置填寫就好了,其目的就是方便修改 Docker 容器中的配置文件以及對日志文件的監聽。
如下圖所示,就是我的目錄結構,在 EFK 目錄下面放的就是 docker-compose.yml 文件。

配置完成來到 docker-compose.yml 文件所在目錄,執行如下命令安裝容器:
docker compose -f docker-compose.yml up -d此時打開 docker desktop,如下圖所示,可以看到 fluent-bit、fluent 都啟動了,另外還有 kibana 和 es 也啟動了,本例中后面兩個容器用不到,可以關閉節省資源。

編寫“生成日志分析程序”
好了,到這里我們已完成整個實例 50% 以上的工作了,回到大圖看看進展。如下圖所示,在安裝 Fluent-Bit 和 FluentD 之后,就要編寫日志分析程序了。

在編寫程序之前執行 pip 命令安裝必要依賴如下:
pip install dotenv "flask[async]" openai接著在工作目錄下創建環境變量文件 .env 文件,我的工作目錄如下圖所示,我在 D 盤的 docker 目錄下創建了 mysql 目錄,將所有與本次實踐相關的代碼和配置都放這里了。

.env 的具體文件內容如下:
# DeepSeek API配置
DEEPSEEK_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# 數據庫配置
DB_USER=root
DB_PASSWORD=root需要注意的是,確保 .env 文件與 Python 腳本在同一目錄即可。
好了,再創建process_slow_log.py文件用來處理慢查詢日志,這個文件的代碼比較長,其實一句話就可以概括:把慢日志基本信息、 SQL、表結構和記錄數都丟給大模型(DeepSeek),讓它分析并產生解決方案,最后生成報表。
具體代碼如下:
import os
import re
import json
import mysql.connector
import threading
import asyncio
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from flask import Flask, request, Response
from openai import AsyncOpenAI, APIError, APIConnectionError, Timeout
# 應用配置 - 非敏感配置集中管理
@dataclass
class AppConfig:
# 服務器配置
HOST: str = "0.0.0.0"
PORT: int = 5001
DEBUG: bool = False
# 路徑配置
CURRENT_DIR: str = os.path.dirname(os.path.abspath(__file__))
LOG_DIR: str = os.path.join(CURRENT_DIR, "logs")
REPORT_DIR: str = os.path.join(CURRENT_DIR, "reports")
SLOW_REPORT_PATH: str = os.path.join(REPORT_DIR, "slow_report.json") # 統一報告文件路徑
# 日志文件路徑
OPERATION_LOG_PATH: str = os.path.join(LOG_DIR, "api_operation.log")
ERROR_LOG_PATH: str = os.path.join(LOG_DIR, "api_error.log")
# LLM配置
LLM_BASE_URL: str = "https://api.deepseek.com"
LLM_MODEL: str = "deepseek-chat"
LLM_TEMPERATURE: float = 0.7
LLM_MAX_TOKENS: int = 2000
# 數據庫連接配置
DB_CONNECT_TIMEOUT: int = 10
DB_CHARSET: str = "utf8mb4"
DEFAULT_DB_SCHEMA: str = "after_sale" # 無schema時使用的默認數據庫
# 內容類型配置
SUPPORTED_CONTENT_TYPE: str = "application/x-ndjson"
# 初始化配置與目錄
config = AppConfig()
os.makedirs(config.LOG_DIR, exist_ok=True)
os.makedirs(config.REPORT_DIR, exist_ok=True)
# 初始化報告文件(如果不存在)
if not os.path.exists(config.SLOW_REPORT_PATH):
with open(config.SLOW_REPORT_PATH, 'w', encoding='utf-8') as f:
json.dump([], f, ensure_ascii=False, indent=2)
# 初始化Flask應用與LLM客戶端(延遲初始化)
app = Flask(__name__)
llm_client: Optional[AsyncOpenAI] = None
def init_llm_client() -> AsyncOpenAI:
"""初始化LLM客戶端,檢查API密鑰"""
global llm_client
if llm_client is None:
api_key = os.getenv("DEEPSEEK_API_KEY")
if not api_key:
raise ValueError("未找到DEEPSEEK_API_KEY環境變量,請配置")
llm_client = AsyncOpenAI(
api_key=api_key,
base_url=config.LLM_BASE_URL
)
return llm_client
def get_db_config(schema: Optional[str] = None) -> Dict[str, Any]:
"""從環境變量獲取數據庫配置,支持指定schema"""
config_dict = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD', ''),
'port': int(os.getenv('DB_PORT', 3306)),
'connect_timeout': config.DB_CONNECT_TIMEOUT,
'charset': config.DB_CHARSET,
'autocommit': True
}
# 如果指定了schema,則添加到配置
if schema:
config_dict['database'] = schema
return config_dict
def write_operation_log(message: str, request_id: Optional[str] = None) -> None:
"""記錄操作日志,包含時間戳和請求ID"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
request_id = request_id or "N/A"
log_entry = f"[{timestamp}] [REQUEST={request_id}] OPERATION: {message}\n"
try:
with open(config.OPERATION_LOG_PATH, 'a', encoding='utf-8') as f:
f.write(log_entry)
print(log_entry.strip())
except Exception as e:
print(f"[日志系統錯誤] 寫入操作日志失敗: {str(e)} | 原始消息: {message}")
def write_error_log(error_message: str, request_id: Optional[str] = None) -> None:
"""記錄錯誤日志,包含時間戳和請求ID"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
request_id = request_id or "N/A"
log_entry = f"[{timestamp}] [REQUEST={request_id}] ERROR: {error_message}\n"
try:
with open(config.ERROR_LOG_PATH, 'a', encoding='utf-8') as f:
f.write(log_entry)
print(log_entry.strip())
except Exception as e:
print(f"[日志系統錯誤] 寫入錯誤日志失敗: {str(e)} | 原始錯誤: {error_message}")
def clean_table_name(table_name: str) -> tuple[str, str]:
"""
清理表名中的反引號和提取數據庫前綴
優化點:無schema時使用配置的默認數據庫
"""
# 移除反引號
cleaned = re.sub(r'`', '', table_name)
# 分割數據庫和表名(如果存在)
parts = cleaned.split('.')
if len(parts) == 2:
return parts[0].strip(), parts[1].strip() # (數據庫名, 表名)
# 無schema時使用默認數據庫
return config.DEFAULT_DB_SCHEMA, cleaned.strip() # (默認數據庫, 表名)
def get_table_info(table_name: str, request_id: str) -> Dict[str, Any]:
"""獲取表的 DDL 和行數,優化無schema表名的處理"""
db_name, clean_name = clean_table_name(table_name)
write_operation_log(f"開始獲取表 {table_name} 的信息 (清理后: {db_name}.{clean_name})", request_id)
try:
# 獲取數據庫配置,指定數據庫名
db_config = get_db_config(db_name)
write_operation_log(f"使用數據庫配置: host={db_config['host']}, port={db_config['port']}, database={db_name}, user={db_config['user']}", request_id)
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
# 獲取表的 DDL
cursor.execute(f"SHOW CREATE TABLE `{clean_name}`")
create_table = cursor.fetchone()
ddl = create_table['Create Table'] if create_table else None
write_operation_log(f"成功獲取表 {db_name}.{clean_name} 的DDL信息", request_id)
# 獲取表的行數(快速近似值)
cursor.execute(f"SHOW TABLE STATUS LIKE '{clean_name}'")
table_status = cursor.fetchone()
row_count = table_status['Rows'] if table_status else None
write_operation_log(f"成功獲取表 {db_name}.{clean_name} 的行數信息: {row_count}", request_id)
result = {
'table_name': table_name,
'cleaned_name': f"{db_name}.{clean_name}",
'database': db_name,
'table': clean_name,
'ddl': ddl,
'row_count': row_count,
'error': None
}
cursor.close()
conn.close()
write_operation_log(f"完成表 {table_name} 的信息獲取", request_id)
return result
except mysql.connector.Error as e:
# 更詳細的數據庫錯誤處理
error_msg = f"數據庫錯誤 (代碼: {e.errno}): {e.msg}"
write_error_log(f"獲取表 {db_name}.{clean_name} 信息失敗: {error_msg}", request_id)
return {
'table_name': table_name,
'cleaned_name': f"{db_name}.{clean_name}",
'database': db_name,
'table': clean_name,
'error': error_msg,
'ddl': None,
'row_count': None
}
except Exception as e:
error_msg = f"獲取表信息失敗: {str(e)}"
write_error_log(f"獲取表 {db_name}.{clean_name} 信息失敗: {error_msg}", request_id)
return {
'table_name': table_name,
'cleaned_name': f"{db_name}.{clean_name}",
'database': db_name,
'table': clean_name,
'error': error_msg,
'ddl': None,
'row_count': None
}
def generate_report_title(log_data: Dict[str, Any]) -> str:
"""生成報告標題,包含關鍵信息"""
query_time = log_data.get('query_time', '未知')
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
tables_str = log_data.get('tables_str', '多個表')
# 提取SQL中的主要操作(SELECT/UPDATE/DELETE等)
sql_body = log_data.get('log', '').upper()
operation = '查詢'
if 'UPDATE' in sql_body:
operation = '更新'
elif 'DELETE' in sql_body:
operation = '刪除'
elif 'INSERT' in sql_body:
operation = '插入'
return f"{timestamp} - 慢{operation}分析報告(耗時: {query_time}秒,涉及表: {tables_str[:50]})"
def generate_prompt(log_data: Dict[str, Any], table_info_list: List[Dict[str, Any]], request_id: str) -> str:
"""生成發送給大模型的提示詞"""
write_operation_log("開始生成提示詞", request_id)
prompt = f"""請分析以下MySQL慢查詢日志,并提供優化建議。
慢查詢日志詳情:
{log_data['log']}
查詢時間: {log_data['query_time']} 秒
鎖定時間: {log_data['lock_time']} 秒
返回行數: {log_data['rows_sent']}
掃描行數: {log_data['rows_examined']}
涉及表結構及行數:
"""
for table_info in table_info_list:
if table_info['error']:
prompt += f"- 表 {table_info['cleaned_name']}: 錯誤 - {table_info['error']}\n"
else:
prompt += f"- 表 {table_info['cleaned_name']} (約 {table_info['row_count']} 行):\n"
prompt += f" DDL: {table_info['ddl'][:500]}...\n\n"
prompt += """
請基于以上信息,分析該慢查詢的性能問題原因,并提供具體的優化建議,包括但不限于:
1. 索引優化建議
2. SQL語句改寫建議
3. 表結構優化建議
4. 其他可能的性能改進方案
請提供詳細且可操作的建議,避免泛泛而談。
"""
write_operation_log(f"提示詞生成完成,長度: {len(prompt)}字符", request_id)
return prompt
async def call_deepseek_api(prompt: str, request_id: str) -> str:
"""調用DeepSeek API生成分析報告"""
write_operation_log("開始調用DeepSeek API", request_id)
try:
client = init_llm_client()
response = await client.chat.completions.create(
model=config.LLM_MODEL,
messages=[
{"role": "system", "content": "你是一位數據庫性能優化專家,擅長分析MySQL慢查詢并提供優化建議。"},
{"role": "user", "content": prompt}
],
temperature=config.LLM_TEMPERATURE,
max_tokens=config.LLM_MAX_TOKENS,
stream=False
)
write_operation_log(f"API調用成功,響應ID: {response.id}", request_id)
write_operation_log(
f"Token使用情況: 輸入={response.usage.prompt_tokens}, 輸出={response.usage.completion_tokens}",
request_id
)
return response.choices[0].message.content
except APIError as e:
error_msg = f"LLM API錯誤 (狀態碼: {e.status_code}): {e.message}"
write_error_log(error_msg, request_id)
return f"分析失敗: {error_msg}"
except APIConnectionError as e:
error_msg = f"LLM連接錯誤: {str(e)}"
write_error_log(error_msg, request_id)
return f"分析失敗: {error_msg}"
except Timeout as e:
error_msg = f"LLM調用超時: {str(e)}"
write_error_log(error_msg, request_id)
return f"分析失敗: {error_msg}"
except Exception as e:
error_msg = f"LLM調用未知錯誤: {str(e)}"
write_error_log(error_msg, request_id)
return f"分析失敗: {error_msg}"
def append_to_report(report_data: Dict[str, Any], request_id: str) -> bool:
"""將報告數據追加到slow_report.json數組中"""
write_operation_log(f"開始將報告追加到 {config.SLOW_REPORT_PATH}", request_id)
try:
# 讀取現有報告
existing_reports = []
if os.path.exists(config.SLOW_REPORT_PATH):
try:
with open(config.SLOW_REPORT_PATH, 'r', encoding='utf-8') as f:
existing_reports = json.load(f)
write_operation_log(f"成功讀取現有報告,共{len(existing_reports)}條記錄", request_id)
except json.JSONDecodeError:
write_error_log("報告文件損壞,將重新初始化", request_id)
existing_reports = []
# 添加新報告
existing_reports.append(report_data)
# 寫入更新后的報告
with open(config.SLOW_REPORT_PATH, 'w', encoding='utf-8') as f:
json.dump(existing_reports, f, ensure_ascii=False, indent=2)
write_operation_log(f"報告已成功追加到 {config.SLOW_REPORT_PATH}", request_id)
return True
except Exception as e:
error_msg = f"追加報告到文件失敗: {str(e)}"
write_error_log(error_msg, request_id)
return False
def extract_valid_log_records(ndjson_data: List[str], request_id: str) -> List[Dict[str, Any]]:
"""解析NDJSON數據并提取有效的日志記錄"""
write_operation_log(f"開始解析NDJSON數據,共{len(ndjson_data)}行", request_id)
records = []
for i, line in enumerate(ndjson_data):
line = line.strip()
if not line:
write_operation_log(f"第{i+1}行是空白行,已忽略", request_id)
continue
try:
record = json.loads(line)
if isinstance(record, dict) and 'log' in record and 'tables' in record:
records.append(record)
write_operation_log(f"第{i+1}行NDJSON解析成功,包含有效日志記錄", request_id)
else:
missing_fields = []
if 'log' not in record:
missing_fields.append('log')
if 'tables' not in record:
missing_fields.append('tables')
write_operation_log(
f"第{i+1}行NDJSON缺少必要字段: {', '.join(missing_fields)},已忽略",
request_id
)
except json.JSONDecodeError as e:
error_msg = f"第{i+1}行NDJSON解析失敗: {str(e)}"
write_error_log(error_msg, request_id)
write_operation_log(f"NDJSON解析完成,共獲取{len(records)}條有效日志記錄", request_id)
return records
async def process_analysis_background(log_records: List[Dict[str, Any]], request_id: str) -> None:
"""后臺處理分析流程"""
try:
write_operation_log(f"開始后臺分析流程,共處理{len(log_records)}條記錄", request_id)
for record_idx, log_data in enumerate(log_records):
write_operation_log(f"開始處理第{record_idx+1}/{len(log_records)}條記錄", request_id)
# 1. 生成報告標題
report_title = generate_report_title(log_data)
write_operation_log(f"生成報告標題: {report_title}", request_id)
# 2. 獲取所有表的信息
table_info_list = []
for table_name in log_data.get('tables', []):
table_info = get_table_info(table_name, request_id)
table_info_list.append(table_info)
# 3. 生成提示詞并調用API
prompt = generate_prompt(log_data, table_info_list, request_id)
analysis_report = await call_deepseek_api(prompt, request_id)
# 4. 準備報告數據
report_data = {
"id": f"{request_id}-record-{record_idx+1}",
"title": report_title,
"request_id": request_id,
"record_index": record_idx + 1,
"total_records": len(log_records),
"timestamp": datetime.now().isoformat(),
"original_data": {
"query_time": log_data.get('query_time'),
"lock_time": log_data.get('lock_time'),
"rows_sent": log_data.get('rows_sent'),
"rows_examined": log_data.get('rows_examined'),
"log": log_data.get('log'),
"tables": log_data.get('tables')
},
"table_info": table_info_list,
"analysis_report": analysis_report
}
# 5. 追加到報告文件
if append_to_report(report_data, request_id):
write_operation_log(f"第{record_idx+1}條記錄分析報告已追加到匯總文件", request_id)
else:
write_error_log(f"第{record_idx+1}條記錄分析報告追加失敗", request_id)
write_operation_log("所有記錄分析完成", request_id)
except Exception as e:
write_error_log(f"后臺分析流程失敗: {str(e)}", request_id)
def run_async_task(log_records: List[Dict[str, Any]], request_id: str) -> None:
"""線程包裝器,運行異步任務"""
try:
write_operation_log("啟動異步處理線程", request_id)
asyncio.run(process_analysis_background(log_records, request_id))
except Exception as e:
write_error_log(f"異步任務執行失敗: {str(e)}", request_id)
@app.route('/analyze-slow-query', methods=['POST'])
def analyze_slow_query():
"""分析慢查詢日志的API端點"""
# 生成唯一請求ID
request_id = datetime.now().strftime("%Y%m%d%H%M%S") + f"-{os.urandom(4).hex()}"
write_operation_log("收到慢查詢分析請求", request_id)
try:
# 記錄請求基本信息
call_timestamp = datetime.now().isoformat()
write_operation_log(
f"請求信息 - 方法: {request.method}, 客戶端IP: {request.remote_addr}, Content-Type: {request.content_type}",
request_id
)
# 驗證內容類型
if request.content_type != config.SUPPORTED_CONTENT_TYPE:
error_msg = f"不支持的Content-Type: {request.content_type},僅接受{config.SUPPORTED_CONTENT_TYPE}"
write_error_log(error_msg, request_id)
return Response(
json.dumps({
"status": "error",
"message": error_msg,
"request_id": request_id
}, ensure_ascii=False, indent=2),
mimetype="application/json",
status=415
)
# 解析NDJSON數據
write_operation_log("開始讀取并解析NDJSON數據", request_id)
try:
ndjson_data = request.data.decode('utf-8').splitlines()
write_operation_log(f"成功讀取{len(ndjson_data)}行NDJSON數據", request_id)
except UnicodeDecodeError as e:
error_msg = f"數據解碼失敗: {str(e)}"
write_error_log(error_msg, request_id)
return Response(
json.dumps({
"status": "error",
"message": error_msg,
"request_id": request_id
}, ensure_ascii=False, indent=2),
mimetype="application/json",
status=400
)
# 提取有效日志記錄
log_records = extract_valid_log_records(ndjson_data, request_id)
# 未找到有效日志時返回正常響應
if not log_records:
write_operation_log("未找到有效日志記錄,返回正常響應", request_id)
return Response(
json.dumps({
"status": "completed",
"message": "未找到需要處理的有效日志記錄",
"request_id": request_id,
"total_lines_received": len(ndjson_data),
"valid_records_found": 0
}, ensure_ascii=False, indent=2),
mimetype="application/json",
status=200
)
# 有有效日志時啟動后臺處理
thread = threading.Thread(
target=run_async_task,
args=(log_records, request_id),
daemon=True
)
thread.start()
write_operation_log("后臺處理線程已啟動", request_id)
# 返回接受響應
response_data = {
"status": "accepted",
"message": "請求已接收,正在后臺處理",
"request_id": request_id,
"timestamp": call_timestamp,
"total_records": len(log_records),
"total_lines_received": len(ndjson_data)
}
# write_operation_log("請求處理完成,返回響應", request_id)
return Response(
json.dumps(response_data, ensure_ascii=False, indent=2),
mimetype="application/json",
status=200
)
except Exception as e:
# 僅邏輯錯誤返回錯誤響應
error_msg = f"請求處理失敗: {str(e)}"
write_error_log(error_msg, request_id)
return Response(
json.dumps({
"status": "error",
"message": error_msg,
"request_id": request_id
}, ensure_ascii=False, indent=2),
mimetype="application/json",
status=500
)
@app.route('/health', methods=['GET'])
def health_check():
"""健康檢查接口"""
request_id = f"health-{datetime.now().strftime('%Y%m%d%H%M%S')}"
write_operation_log("收到健康檢查請求", request_id)
# 檢查LLM客戶端
llm_status = "healthy"
llm_details = "未初始化"
try:
init_llm_client()
llm_status = "healthy"
llm_details = "客戶端已初始化"
except Exception as e:
llm_status = "degraded"
llm_details = f"初始化失敗: {str(e)}"
# 檢查數據庫連接
db_status = "healthy"
db_details = "連接成功"
try:
conn = mysql.connector.connect(** get_db_config())
conn.close()
except Exception as e:
db_status = "degraded"
db_details = f"連接失敗: {str(e)}"
# 檢查報告文件
report_status = "healthy"
report_details = "文件正常"
try:
if not os.path.exists(config.SLOW_REPORT_PATH):
report_status = "warning"
report_details = "報告文件不存在,將在首次分析后創建"
else:
with open(config.SLOW_REPORT_PATH, 'r') as f:
pass # 僅檢查文件是否可讀取
except Exception as e:
report_status = "degraded"
report_details = f"報告文件訪問失敗: {str(e)}"
# 整體狀態
overall_status = "healthy"
if llm_status != "healthy" or db_status != "healthy" or report_status == "degraded":
overall_status = "degraded"
response = {
"status": overall_status,
"services": {
"llm_client": {
"status": llm_status,
"details": llm_details
},
"database": {
"status": db_status,
"details": db_details
},
"report_file": {
"status": report_status,
"details": report_details,
"path": config.SLOW_REPORT_PATH
}
},
"supported_content_type": config.SUPPORTED_CONTENT_TYPE,
"default_schema": config.DEFAULT_DB_SCHEMA,
"timestamp": datetime.now().isoformat(),
"request_id": request_id
}
write_operation_log("健康檢查完成", request_id)
return Response(
json.dumps(response, ensure_ascii=False, indent=2),
mimetype="application/json"
)
if __name__ == '__main__':
print("慢查詢分析API啟動中...")
print(f"服務器配置: {config.HOST}:{config.PORT} (調試模式: {'開啟' if config.DEBUG else '關閉'})")
print(f"內容類型支持: {config.SUPPORTED_CONTENT_TYPE}")
print(f"默認數據庫schema: {config.DEFAULT_DB_SCHEMA}")
print(f"報告文件路徑: {config.SLOW_REPORT_PATH}")
print(f"LLM配置: 模型={config.LLM_MODEL}, 基礎URL={config.LLM_BASE_URL}")
print(f"日志目錄: {config.LOG_DIR}")
print("環境變量檢查:")
print(f" - DEEPSEEK_API_KEY: {'已配置' if os.getenv('DEEPSEEK_API_KEY') else '未配置'}")
print(f" - 數據庫配置: HOST={os.getenv('DB_HOST', 'localhost')}, USER={os.getenv('DB_USER', 'root')}")
print("可用接口:")
print(" - POST /analyze-slow-query - 分析慢查詢日志")
print(" - GET /health - 健康檢查")
app.run(host=config.HOST, port=config.PORT, debug=config.DEBUG)這里我把代碼中重要的部分做一個總結性講解:
- 數據庫交互模塊
A.提供數據庫配置獲取函數,支持指定數據庫 Schema,默認使用預設的after_sale庫。
B.實現表信息提取功能:清理表名中的反引號、拆分庫表結構,通過SHOW CREATE TABLE獲取表 DDL、SHOW TABLE STATUS獲取表行數,同時處理數據庫連接錯誤、表不存在等異常。
- 大模型交互模塊
A.封裝 DeepSeek API 調用邏輯:初始化客戶端時校驗 API 密鑰,支持設置模型參數(溫度、最大 tokens),處理 API 錯誤(連接超時、狀態碼異常等)并返回友好提示。
B.提供提示詞生成函數:整合慢查詢詳情(執行時間、掃描行數等)、表結構元信息,生成結構化提示,引導大模型輸出索引優化、SQL 改寫等可操作建議。
- 報告生成與管理模塊
A.自動生成報告標題:包含查詢類型(SELECT/UPDATE 等)、執行耗時、涉及表名和時間戳,確保報告辨識度。
B.實現報告持久化:將分析結果(原始慢查詢數據、表元信息、大模型報告)追加到 JSON 格式的報告文件,支持讀取現有報告并處理文件損壞的情況。
- API 接口模塊
A.慢查詢分析接口(POST /analyze-slow-query):接收 NDJSON 格式的結構化慢查詢數據,驗證內容類型,解析并過濾有效記錄(需包含log和tables字段),啟動后臺線程異步處理分析流程,避免前端等待超時。
編寫“生成日志查看應用”
這里依舊使用我們的老朋友 streamlit 來充當 Web UI 界面,如下命令安裝它:
pip install streamlit在工作目錄創建,創建show_slow_report.py文件,方便查看生成的慢查詢分析報告,代碼如下:
import streamlit as st
import json
import os
from datetime import datetime
# 頁面基礎配置
st.set_page_config(
page_title="慢查詢分析報告查看器",
page_icon="??",
layout="wide"
)
# 自定義CSS - 按鈕樣式和間隔
st.markdown("""
<style>
.report-btn {
width: 100%;
text-align: left;
margin-bottom: 8px;
padding: 8px 12px;
border-radius: 4px;
border: none;
cursor: pointer;
transition: all 0.2s ease;
}
.report-btn:not(.selected) {
background-color: #f0f2f6;
color: #333;
}
.report-btn:not(.selected):hover {
background-color: #e6e9ed;
}
.report-btn.selected {
background-color: #0066cc;
color: white;
}
.report-container {
max-height: 600px;
overflow-y: auto;
padding-right: 8px;
}
</style>
""", unsafe_allow_html=True)
# 報告文件路徑
REPORT_PATH = os.path.join("reports", "slow_report.json")
# ----------------------
# 回調函數 - 用于即時更新選中狀態
# ----------------------
def update_selected_report(report_id):
"""更新選中的報告ID(回調函數)"""
st.session_state.selected_report_id = report_id
# ----------------------
# 工具函數
# ----------------------
def load_reports():
"""加載并排序報告(按時間倒序),處理文件異常"""
try:
if not os.path.exists(REPORT_PATH):
st.warning(f"報告文件不存在:{REPORT_PATH}")
return []
with open(REPORT_PATH, 'r', encoding='utf-8') as f:
reports = json.load(f)
# 按時間戳倒序排序(最新報告在前)
reports.sort(
key=lambda x: x.get('timestamp', ''),
reverse=True
)
return reports
except json.JSONDecodeError:
st.error("報告文件格式錯誤,無法解析(可能是JSON損壞)")
return []
except PermissionError:
st.error(f"無權限訪問報告文件:{REPORT_PATH}")
return []
except Exception as e:
st.error(f"加載報告失?。簕str(e)}")
return []
def format_timestamp(timestamp_str):
"""格式化ISO時間戳為易讀格式"""
try:
dt = datetime.fromisoformat(timestamp_str)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except:
return timestamp_str
def search_reports(reports, query):
"""根據搜索詞過濾報告(支持標題、表名、內容)"""
if not query:
return reports
query_lower = query.lower()
return [
report for report in reports
if (
query_lower in report.get('title', '').lower() or
any(query_lower in str(table).lower() for table in report.get('original_data', {}).get('tables', [])) or
query_lower in report.get('analysis_report', '').lower() or
query_lower in report.get('original_data', {}).get('log', '').lower()
)
]
# ----------------------
# 主邏輯
# ----------------------
reports = load_reports()
st.title("?? 慢查詢分析報告查看器")
# 初始化會話狀態
if 'selected_report_id' not in st.session_state:
if reports:
st.session_state.selected_report_id = reports[0]['id']
else:
st.session_state.selected_report_id = None
# 側邊欄 - 報告列表和搜索
with st.sidebar:
st.header("報告列表")
# 搜索框
search_query = st.text_input("搜索報告", placeholder="輸入標題、表名或內容關鍵詞...")
filtered_reports = search_reports(reports, search_query)
# 顯示統計信息
st.caption(f"顯示 {len(filtered_reports)} 份報告(共 {len(reports)} 份)")
# 報告列表
if filtered_reports:
# 確保選中的ID在當前過濾列表中
valid_ids = [r['id'] for r in filtered_reports]
if (st.session_state.selected_report_id is None or
st.session_state.selected_report_id not in valid_ids):
st.session_state.selected_report_id = valid_ids[0]
# 報告容器(帶滾動)
st.markdown('<div class="report-container">', unsafe_allow_html=True)
# 逐個顯示報告按鈕,使用回調函數更新狀態
for report in filtered_reports:
report_id = report['id']
timestamp = format_timestamp(report['timestamp'])
title = report['title']
# 按鈕狀態判斷
is_selected = (report_id == st.session_state.selected_report_id)
# 使用回調函數確保狀態即時更新
st.button(
label=f"[{timestamp}] {title}",
key=f"btn_{report_id}",
use_container_width=True,
type="primary" if is_selected else "secondary",
on_click=update_selected_report,
args=(report_id,)
)
st.markdown('</div>', unsafe_allow_html=True)
# 獲取選中的報告
selected_report = next(
(r for r in filtered_reports if r['id'] == st.session_state.selected_report_id),
None
)
else:
st.info("沒有找到匹配的報告")
selected_report = None
# 主內容區 - 顯示報告詳情
if selected_report:
st.subheader(selected_report['title'])
# 基本信息卡片
col1, col2, col3 = st.columns(3)
with col1:
st.info(f"**報告ID**\n\n{selected_report['id']}")
with col2:
st.info(f"**生成時間**\n\n{format_timestamp(selected_report['timestamp'])}")
with col3:
st.info(f"**請求ID**\n\n{selected_report['request_id']}")
# 性能指標
st.subheader("查詢性能指標")
original_data = selected_report.get('original_data', {})
metrics_col1, metrics_col2, metrics_col3, metrics_col4 = st.columns(4)
with metrics_col1:
st.metric("查詢時間", f"{original_data.get('query_time', 'N/A')} 秒")
with metrics_col2:
st.metric("鎖定時間", f"{original_data.get('lock_time', 'N/A')} 秒")
with metrics_col3:
st.metric("返回行數", original_data.get('rows_sent', 'N/A'))
with metrics_col4:
st.metric("掃描行數", original_data.get('rows_examined', 'N/A'))
# 涉及表名
st.subheader("涉及表名")
tables = original_data.get('tables', [])
if tables:
for table in tables:
st.text(f"- {table}")
else:
st.text("無表信息")
# 原始SQL
with st.expander("查看原始SQL", expanded=False):
st.code(original_data.get('log', '無SQL日志'), language="sql")
# 表結構信息
st.subheader("表結構詳情")
table_info_list = selected_report.get('table_info', [])
for table_info in table_info_list:
with st.expander(f"表: {table_info.get('cleaned_name', '未知表')}"):
if table_info.get('error'):
st.error(f"獲取表信息失敗: {table_info['error']}")
else:
st.text(f"預估行數: {table_info.get('row_count', '未知')}")
st.code(table_info.get('ddl', '無DDL信息'), language="sql")
# 分析報告
st.subheader("優化建議")
st.markdown(selected_report.get('analysis_report', '無分析內容'))
else:
if reports:
st.info("請從側邊欄選擇一份報告查看詳情")
else:
st.info("暫無報告數據,請先運行分析任務生成報告")至此,所有準備工作已經完成,我們開始測試功能。
功能測試
由于是對慢查詢記錄進行采集、處理、分析、生成報告。所以測試的基本思路是:創建 MySQL 數據庫表,插入數據,啟動 Fluent-Bit,FluentD 服務準備采集慢查詢日志,啟動 process_slow_log.py 生成分析報告,然后執行一條 SQL 語句觸發慢查詢日志的生成,最后通過show_slow_report.py 查看分析報告的內容。
創建數據庫
在mysql5.7下執行sql生成對應的表,打開MySQL 5.7 Command Line Client,輸入如下 sql 并執行。
-- 創建數據庫
CREATE DATABASE IF NOT EXISTS after_sale
CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci;
USE after_sale;
-- 刪除現有表,如果存在
DROP TABLE IF EXISTS SupportIssue;
DROP TABLE IF EXISTS CustomerService;
DROP TABLE IF EXISTS Solution;
DROP TABLE IF EXISTS Issue;
DROP TABLE IF EXISTS User;
-- 創建用戶表
CREATE TABLE User
(
UserId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 用戶ID
Name VARCHAR(60) NOT NULL, -- 姓名
Email VARCHAR(60) NOT NULL, -- 電子郵件
Phone VARCHAR(24) -- 電話
);
-- 創建問題表
CREATE TABLE Issue
(
IssueId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 問題ID
UserId INTEGER NOT NULL, -- 用戶ID
Description TEXT NOT NULL, -- 問題描述
Status VARCHAR(20) NOT NULL, -- 狀態
FOREIGN KEY (UserId) REFERENCES User (UserId)
ON DELETE NO ACTION ON UPDATE NO ACTION
);
-- 創建解決方案表
CREATE TABLE Solution
(
SolutionId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 解決方案ID
SolutionDescription TEXT NOT NULL, -- 解決方案描述
IsValid TINYINT -- 是否有效(MySQL中用TINYINT表示布爾值,1為真,0為假)
);
-- 創建客服表
CREATE TABLE CustomerService
(
CustomerServiceId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 客服ID
Name VARCHAR(40) NOT NULL, -- 姓名
Email VARCHAR(60), -- 電子郵件
Phone VARCHAR(24) -- 電話
);
-- 創建客服問題關聯表
CREATE TABLE SupportIssue
(
SupportIssueId INTEGER PRIMARY KEY AUTO_INCREMENT, -- 客服問題關聯ID
IssueId INTEGER NOT NULL, -- 問題ID
CustomerServiceId INTEGER NOT NULL, -- 客服ID
SolutionId INTEGER NOT NULL, -- 解決方案ID
ActionTime DATETIME NOT NULL, -- 操作時間
FOREIGN KEY (IssueId) REFERENCES Issue (IssueId)
ON DELETE NO ACTION ON UPDATE NO ACTION,
FOREIGN KEY (CustomerServiceId) REFERENCES CustomerService (CustomerServiceId)
ON DELETE NO ACTION ON UPDATE NO ACTION,
FOREIGN KEY (SolutionId) REFERENCES Solution (SolutionId)
ON DELETE NO ACTION ON UPDATE NO ACTION
);插入測試數據
數據庫和表結構有了,接著插入一些測試數據,為后面慢查詢提供條件。為了保證數據量,我這里利用 python 腳本生成數據,先執行如下命令安裝數據庫連接依賴,方便 python 訪問數據庫。
pip install mysql-connector-python然后,在工作目錄創建 generate_data.py 文件,插入數據。
import mysql.connector
from faker import Faker
import random
from datetime import datetime, timedelta
# 數據庫連接配置 - 請修改為你的數據庫信息
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'root',
'database': 'after_sale',
'autocommit': True
}
# 生成數據量 (可根據需要調整)
NUM_USERS = 100000 # 10萬用戶
NUM_ISSUES = 500000 # 50萬問題
NUM_SOLUTIONS = 10000 # 1萬解決方案
NUM_CUSTOMERSERVICE = 500 # 500客服
NUM_SUPPORTISSUES = 1000000 # 100萬客服問題關聯記錄
# 初始化Faker
fake = Faker('zh_CN')
def get_db_connection():
"""獲取數據庫連接"""
try:
conn = mysql.connector.connect(**DB_CONFIG)
return conn
except mysql.connector.Error as err:
print(f"數據庫連接錯誤: {err}")
raise
def generate_users():
"""生成用戶數據"""
print(f"開始生成 {NUM_USERS} 條用戶數據...")
conn = get_db_connection()
cursor = conn.cursor()
# 批量插入大小
batch_size = 1000
insert_query = """
INSERT INTO User (Name, Email, Phone)
VALUES (%s, %s, %s)
"""
try:
for i in range(0, NUM_USERS, batch_size):
batch_data = []
for _ in range(min(batch_size, NUM_USERS - i)):
name = fake.name()
email = fake.email()
phone = fake.phone_number()
batch_data.append((name, email, phone))
cursor.executemany(insert_query, batch_data)
print(f"已插入 {i + len(batch_data)} 條用戶數據")
except mysql.connector.Error as err:
print(f"插入用戶數據錯誤: {err}")
raise
finally:
cursor.close()
conn.close()
print(f"用戶數據生成完成,共 {NUM_USERS} 條")
def generate_solutions():
"""生成解決方案數據 - 修復了字符串格式化問題"""
print(f"開始生成 {NUM_SOLUTIONS} 條解決方案數據...")
conn = get_db_connection()
cursor = conn.cursor()
# 重新設計解決方案模板,確保每個模板的占位符數量明確
solution_templates = [
["檢查%s并重啟%s", 2], # 包含2個占位符
["更新%s至最新版本", 1], # 包含1個占位符
["清除%s緩存并重新嘗試", 1],
["聯系%s獲取技術支持", 1],
["檢查%s設置是否正確", 1],
["卸載并重新安裝%s", 1],
["重啟%s后再試", 1],
["檢查網絡連接并確保%s可訪問", 1],
["使用%s工具進行診斷", 1],
["恢復%s至默認設置", 1]
]
objects = ["網絡", "系統", "應用", "設備", "軟件", "驅動", "賬戶", "服務"]
batch_size = 1000
insert_query = """
INSERT INTO Solution (SolutionDescription, IsValid)
VALUES (%s, %s)
"""
try:
for i in range(0, NUM_SOLUTIONS, batch_size):
batch_data = []
for _ in range(min(batch_size, NUM_SOLUTIONS - i)):
# 隨機選擇一個模板及其占位符數量
template, param_count = random.choice(solution_templates)
# 根據占位符數量提供相應數量的參數
if param_count == 1:
obj = random.choice(objects)
description = template % obj
elif param_count == 2:
obj1 = random.choice(objects)
obj2 = random.choice(objects)
description = template % (obj1, obj2)
else:
# 默認為1個參數的情況
obj = random.choice(objects)
description = template % obj
is_valid = random.choice([0, 1])
batch_data.append((description, is_valid))
cursor.executemany(insert_query, batch_data)
print(f"已插入 {i + len(batch_data)} 條解決方案數據")
except mysql.connector.Error as err:
print(f"插入解決方案數據錯誤: {err}")
raise
except Exception as e:
print(f"生成解決方案描述時出錯: {e},模板: {template}, 參數數量: {param_count}")
raise
finally:
cursor.close()
conn.close()
print(f"解決方案數據生成完成,共 {NUM_SOLUTIONS} 條")
def generate_customer_service():
"""生成客服數據"""
print(f"開始生成 {NUM_CUSTOMERSERVICE} 條客服數據...")
conn = get_db_connection()
cursor = conn.cursor()
batch_size = 100
insert_query = """
INSERT INTO CustomerService (Name, Email, Phone)
VALUES (%s, %s, %s)
"""
try:
for i in range(0, NUM_CUSTOMERSERVICE, batch_size):
batch_data = []
for _ in range(min(batch_size, NUM_CUSTOMERSERVICE - i)):
name = fake.name()
email = f"{name.replace(' ', '')}.{random.randint(100, 999)}@support.com"
phone = fake.phone_number()
batch_data.append((name, email, phone))
cursor.executemany(insert_query, batch_data)
print(f"已插入 {i + len(batch_data)} 條客服數據")
except mysql.connector.Error as err:
print(f"插入客服數據錯誤: {err}")
raise
finally:
cursor.close()
conn.close()
print(f"客服數據生成完成,共 {NUM_CUSTOMERSERVICE} 條")
def generate_issues():
"""生成問題數據"""
print(f"開始生成 {NUM_ISSUES} 條問題數據...")
conn = get_db_connection()
cursor = conn.cursor()
# 獲取最大用戶ID
cursor.execute("SELECT MAX(UserId) FROM User")
max_user_id = cursor.fetchone()[0]
if not max_user_id:
raise Exception("請先生成用戶數據")
# 問題描述模板
issue_templates = [
["%s無法正常工作", 1],
["無法%s", 1],
["%s出現錯誤", 1],
["%s時遇到問題", 1],
["%s功能異常", 1],
["關于%s的問題", 1],
["%s失敗", 1],
["%s導致系統異常", 1]
]
issue_objects = ["電腦", "手機", "應用", "軟件", "網絡", "賬戶", "訂單", "支付", "登錄", "文件"]
actions = ["連接網絡", "發送消息", "上傳文件", "下載數據", "安裝軟件", "卸載程序", "更新系統", "登錄賬戶"]
statuses = ["未解決", "處理中", "已解決", "已關閉", "重新打開"]
batch_size = 1000
insert_query = """
INSERT INTO Issue (UserId, Description, Status)
VALUES (%s, %s, %s)
"""
try:
for i in range(0, NUM_ISSUES, batch_size):
batch_data = []
for _ in range(min(batch_size, NUM_ISSUES - i)):
user_id = random.randint(1, max_user_id)
template, param_count = random.choice(issue_templates)
if "無法%s" in template:
desc = template % random.choice(actions)
else:
desc = template % random.choice(issue_objects)
status = random.choice(statuses)
batch_data.append((user_id, desc, status))
cursor.executemany(insert_query, batch_data)
print(f"已插入 {i + len(batch_data)} 條問題數據")
except mysql.connector.Error as err:
print(f"插入問題數據錯誤: {err}")
raise
finally:
cursor.close()
conn.close()
print(f"問題數據生成完成,共 {NUM_ISSUES} 條")
def generate_support_issues():
"""生成客服問題關聯數據"""
print(f"開始生成 {NUM_SUPPORTISSUES} 條客服問題關聯數據...")
conn = get_db_connection()
cursor = conn.cursor()
# 獲取各表最大ID
cursor.execute("SELECT MAX(IssueId) FROM Issue")
max_issue_id = cursor.fetchone()[0]
cursor.execute("SELECT MAX(CustomerServiceId) FROM CustomerService")
max_cs_id = cursor.fetchone()[0]
cursor.execute("SELECT MAX(SolutionId) FROM Solution")
max_solution_id = cursor.fetchone()[0]
if not all([max_issue_id, max_cs_id, max_solution_id]):
raise Exception("請先生成所有基礎表數據")
# 生成時間范圍:過去1年
end_date = datetime.now()
start_date = end_date - timedelta(days=365)
batch_size = 1000
insert_query = """
INSERT INTO SupportIssue (IssueId, CustomerServiceId, SolutionId, ActionTime)
VALUES (%s, %s, %s, %s)
"""
try:
for i in range(0, NUM_SUPPORTISSUES, batch_size):
batch_data = []
for _ in range(min(batch_size, NUM_SUPPORTISSUES - i)):
issue_id = random.randint(1, max_issue_id)
cs_id = random.randint(1, max_cs_id)
solution_id = random.randint(1, max_solution_id)
# 隨機生成時間
time_delta = end_date - start_date
random_days = random.randint(0, time_delta.days)
random_seconds = random.randint(0, 86399) # 一天的秒數
action_time = start_date + timedelta(days=random_days, seconds=random_seconds)
batch_data.append((issue_id, cs_id, solution_id, action_time.strftime('%Y-%m-%d %H:%M:%S')))
cursor.executemany(insert_query, batch_data)
print(f"已插入 {i + len(batch_data)} 條客服問題關聯數據")
except mysql.connector.Error as err:
print(f"插入客服問題關聯數據錯誤: {err}")
raise
finally:
cursor.close()
conn.close()
print(f"客服問題關聯數據生成完成,共 {NUM_SUPPORTISSUES} 條")
if __name__ == "__main__":
# 按順序生成數據(有依賴關系)
try:
generate_users()
generate_solutions()
generate_customer_service()
generate_issues()
generate_support_issues()
print("所有數據生成完成!")
except Exception as e:
print(f"生成數據時出錯: {str(e)}")由于在代碼中使用到了假數據,所以需要安裝 faker 組件,執行如下代碼:
pip install faker
python generate_data.py
確保 Fluent-Bit 與 FluentD 執行
在docker desktop 中可以看到容器運行狀態。

啟動 “生成日志分析程序”
工作目錄執行如下命令:
python Process_slow_log.py執行慢查詢 SQL
MySQL 5.7 Command Line Client執行如下 sql。
use after_sale;
SELECT
u.UserId, u.Name, u.Email,
i.IssueId, i.Description, i.Status,
cs.CustomerServiceId, cs.Name,
s.SolutionId, s.SolutionDescription,
si.ActionTime
FROM SupportIssue si
JOIN Issue i ON si.IssueId = i.IssueId
JOIN User u ON i.UserId = u.UserId
JOIN CustomerService cs ON si.CustomerServiceId = cs.CustomerServiceId
JOIN Solution s ON si.SolutionId = s.SolutionId
WHERE s.IsValid = 1
AND i.Status = 'unsolve'
ORDER BY si.ActionTime DESC;返回結果如下:

查看fluent-bit容器日志,容器日志中會顯示采集到的內容。

啟動“生成日志查看應用”
執行如下命令:
streamlit run show_slow_report.py啟動后的控制臺會顯示調用日志和生成報告。

此時通過瀏覽器頁面可以查看慢查詢日志分析報告,如下圖所示。

作者介紹
崔皓,51CTO社區編輯,資深架構師,擁有18年的軟件開發和架構經驗,10年分布式架構經驗。

















