EFK+DeepSeek 智能運維方案:技術架構與實施步驟 原創
開篇
在日常 IT 運營場景里,EFK(Elasticsearch +Fluent Bit + Fluentd + Kibana )組合是日志管理與分析的經典方案,被廣泛應用于各類系統運維中。它憑借 Fluent Bit 輕量高效的日志采集能力,快速獲取多源日志;依托 Fluentd 靈活的日志處理機制,完成過濾、格式化等操作;借由 Elasticsearch 強大的分布式存儲與檢索特性,實現日志的高效存儲和快速查詢;再通過 Kibana 直觀的可視化界面,讓運維人員能清晰洞察日志數據背后的系統狀態,以 “采集 - 處理 - 存儲 - 可視化” 的完整鏈路,助力運維團隊及時發現系統問題,成為保障 IT 系統穩定運行的有力工具。
然而,在實際運維過程中,面對日志里海量的錯誤異常信息,僅靠傳統 EFK 方案仍存在短板。IT 運維人員受限于精力和時間,面對繁雜的異常日志,難以逐一對其深入拆解、分析根源,常常錯過最佳修復時機,導致小故障演變成影響業務的大問題。為突破這一困境,我們嘗試引入 AI 運維模式,借助 AI 大模型對 EFK 采集到的異常信息進行智能分析,讓機器替代人工完成繁瑣的異常診斷、根因定位等工作,以此提升運維效率與質量。
如下圖所示,本文將圍繞 EFK(Fluent Bit + Fluentd + Elasticsearch + Kibana)結合 AI 大模型的日志智能運維方案展開。先利用 Fluent Bit 從指定位置獲取日志,依解析規則處理后推送給 Fluentd;Fluentd 監聽日志,經過濾器提取、解析并標記異常,再輸出給 Elasticsearch 存儲,同時調用大模型處理服務;Elasticsearch 存儲的數據可在 Kibana 可視化展示;大模型處理服務則對異常日志解析、分析并生成報表。以 CPU 使用率異常場景為例,演示從日志采集、處理到智能診斷的全流程。

安裝環境
工欲善其事必先利其器,在開始案例之前我們先把需要用到的應用和環境安裝上,首先保證 docker 安裝完成,然后從文件夾到容器的安裝,按照如下流程進行。
創建文件夾
準備好 fluent-bit、fluentd 以及 web-logs 文件夾。分別用來放置容器的配置文件和對應的日志文件。
請注意,我這里以/Users/cuihao/docker 為基礎目錄, 在這個目錄下創建文件夾和文件,大家可以按照自己的操作系統和目錄情況規劃目錄以及存放文件。
如下圖所示, 我們可以看到 fluent-bit、fluentd 以及 weblogs 三個目錄。稍后我們會分別在這三個目錄下面放置對應應用的配置文件,用來完成日志采集、分析、過濾、輸出等操作。

在完成文件夾的創建之后,接著在 fluent-bit 文件夾下創建 etc 目錄,后面會在 etc 下面創建 fluent-bit.conf 文件,用來配置日志采集的輸入和輸出信息。

完成 fluent-bit 文件夾創建之后,接著在 fluentd 文件夾下面創建 conf 文件夾,為 fluent.conf 的創建做好準備。這里可以劇透一下,在fluent.conf 會有日志采集、過濾、標記、調用智能報表等配置信息。

最后,就是保證創建一個 web-logs 目錄,下面的 metric_log 文件是我們用來模擬 CPU 使用率數據的日志文件。也是案例的起點, fluent-bit 會從這里采集數據。

創建 FluentD Docker 文件
完成目錄布局之后,我們大致知道完成案例大致需要的配置信息,接著在目錄/Users/cuihao/docker/EFK/fluentd/ 下創建 Dockerfile 文件。這個文件用來安裝 fluentd 的基礎鏡像以及對應的插件。
# 使用的基礎鏡像
FROM fluent/fluentd:edge-debian
# 切換用戶為root方便接下來執行安裝命令
USER root
# 安裝系統依賴
RUN apt-get update && \
apt-get install -y curl && \
rm -rf /var/lib/apt/lists/*
# 卸載可能存在的高版本 Elasticsearch gem
RUN gem uninstall elasticsearch elasticsearch-api -a -x || true
# 安裝指定版本 gem 和 Fluentd Elasticsearch 插件
RUN gem install elasticsearch -v 8.17.1 --no-document && \
gem install elasticsearch-api -v 8.17.1 --no-document && \
gem install fluent-plugin-elasticsearch -v 5.4.3 --no-document
USER fluent上述 Dockerfile 文件以官方 fluentd 的 edge-debian 版本,在這個版本的基礎上 fluentd 還需要配置數據轉發或聚合操作,比如本例中需要轉發到ES,就需要安裝對應的插件(fluent-plugin-elasticsearch)。這些插件就需要通過 Dockerfile 文件的方式安裝。
從文件中可以看到,首先切換到 root 用戶安裝系統依賴 curl,再清理可能存在的高版本 Elasticsearch 相關組件,隨后安裝 8.17.1 版本的 elasticsearch(注意這里使用的 ES 版本)、elasticsearch-api gem 包及 5.4.3 版本的 fluent-plugin-elasticsearch 插件,確保與目標 Elasticsearch 服務兼容,最后切換回 fluent 普通用戶以遵循最小權限原則,最終生成一個可直接用于 EFK 日志棧中收集并向 Elasticsearch 發送日志的定制化鏡像。
創建 EFK 組件docker-compose 文件
由于本案例需要安裝 fluent-bit、fluentd、elasticsearch、kibana 等應用,為了方便安裝與調試,我們計劃使用 docker 方式對他們進行安裝。于是 docker compose 的安裝方式就成了最佳選擇,它可以用于定義和管理多容器 Docker 應用的 YAML 配置文件,能將多個關聯的容器(如應用服務、數據庫、緩存等)的配置(鏡像、端口映射、數據卷、環境變量、依賴關系等)集中整合,通過 docker compose 命令一鍵實現多容器的創建、啟動、停止、重啟等操作。其核心益處在于簡化了多容器應用的部署與管理流程,避免了手動逐個操作容器的繁瑣;通過統一配置文件確保了環境一致性,同時清晰的依賴關系定義保證了容器按正確順序啟動。
為了保證安裝的順利進行我們選擇???Fluentd官網的docker-compose??? 文件,并在其基礎上進行修改,從而適應安裝需求。
由于該文件內容比較長,這里我們通過一張大圖將文件的內容進行描述,如下:

該文件用于搭建 EFK(Fluent Bit + Fluentd + Elasticsearch + Kibana)日志管理系統。通過定義 fluent-bit(輕量采集容器日志,依賴 fluentd 健康后啟動,配置掛載日志目錄與配置文件)、fluentd(基于自定義 Dockerfile 構建,格式化日志,依賴 elasticsearch 健康后啟動,配置掛載、端口及健康檢查 )、elasticsearch(存儲日志,單節點模式、關閉安全功能,配置健康檢查與端口 )、kibana(可視化日志,依賴 elasticsearch 健康后啟動,映射 Web 端口 )四個服務,利用 Docker Compose 實現多容器協同,讓日志從采集、處理、存儲到可視化全流程自動化部署與管理,各服務間通過健康檢查依賴保障啟動順序與運行狀態,方便快速搭建日志分析環境。
文件內容如下:
# 定義本文件中所有要啟動的服務(容器)
services:
# Fluent Bit:輕量收集容器日志
# 【服務名】是的,'fluent-bit' 就是這個服務的名稱。它在 Docker Compose 網絡內部被識別為此名。
fluent-bit:
# 使用的鏡像:從 Docker Hub 拉取最新的 Fluent Bit 官方鏡像
image: fluent/fluent-bit:latest
# 指定容器啟動后的名稱,通過 `docker ps` 等命令可以看到這個名字
container_name: fluent-bit
# 依賴關系:指定此服務的啟動依賴于另一個服務 'fluentd'
depends_on:
fluentd:
# 條件:只有當 'fluentd' 服務通過健康檢查(healthy)后,才會啟動 fluent-bit
condition: service_healthy
# 端口映射:將宿主機的端口映射到容器內的端口
# 格式 - "宿主機端口:容器端口"
ports:
- "2020:2020" # 將容器內的 2020 端口(Fluent Bit 的 HTTP Server,常用于健康檢查或監控)映射到宿主機的 2020 端口
# 數據卷掛載:將宿主機的目錄或文件掛載到容器內,實現數據持久化或配置注入
volumes:
# 將宿主機的 '/Users/cuihao/docker/EFK/web-logs' 目錄掛載到容器內的 '/var/log/' 目錄。
# Fluent Bit 會監控這個目錄下的日志文件變化,并收集新產生的日志。
#/var/log/metric_log
#/Users/cuihao/docker/EFK/web-logs
- /Users/cuihao/docker/EFK/web-logs:/var/log/
# 將宿主機的 Fluent Bit 主配置文件掛載到容器內,替代鏡像內的默認配置。
# 這個文件定義了數據輸入(Input)、處理(Parser, Filter)和輸出(Output)的規則。
- /Users/cuihao/docker/EFK/fluent-bit/etc/:/fluent-bit/etc/
# Fluentd:格式化
# 【服務名】'fluentd' 是這個服務的名稱。
fluentd:
container_name: fluent
# 不是使用現成的鏡像,而是通過指定構建上下文路徑(E:\EFK\fluentd)來構建自定義鏡像。
# 該路徑下應該有一個名為 'Dockerfile' 的文件,默認讀取 'Dockerfile' 文件。
build: /Users/cuihao/docker/EFK/fluentd
volumes:
# 將宿主機上的 Fluentd 配置目錄掛載到容器內,使配置變更無需重新構建鏡像。
- /Users/cuihao/docker/EFK/fluentd/conf:/fluentd/etc
depends_on:
elasticsearch:
condition: service_healthy
ports:
- "24224:24224" # Fluentd 默認的 TCP 端口,用于接收來自 Fluent Bit 或其他客戶端轉發來的日志,這個端口會配置到fluent-bit的OUTPUT中
- "24224:24224/udp" # Fluentd 默認的 UDP 端口,用途同上
- "24220:24220" # Fluentd 的健康檢查 API 端口
# 健康檢查配置:Docker 會根據此規則判斷容器是否正常啟動
healthcheck:
test: ["CMD-SHELL", "curl -fs http://localhost:24220/api/plugins.json || exit 1"] # 檢查健康檢查端點是否返回成功
interval: 5s # 每 5 秒檢查一次
timeout: 3s # 每次檢查超時時間為 3 秒
retries: 5 # 連續失敗 5 次才標記為不健康
start_period: 10s # 容器啟動后,等待 10 秒再進行第一次健康檢查
# Elasticsearch:存儲日志
# 【服務名】'elasticsearch' 是這個服務的名稱。
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.1 # 使用 Elastic 官方的 8.17.1 版本鏡像
container_name: elasticsearch
hostname: elasticsearch # 設置容器內部的主機名,在集群中很有用
environment:
- discovery.type=single-node # 設置為單節點模式,適合開發和測試
- xpack.security.enabled=false # 關閉 X-Pack 安全功能(用戶認證、HTTPS等)。生產環境必須開啟,但測試時關閉更簡單。
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9200/_cluster/health"] # 檢查 ES 集群健康狀態 API
interval: 10s
retries: 5
timeout: 5s
ports:
- "9200:9200" # 將 ES 的 HTTP REST API 端口映射到宿主機,方便通過瀏覽器或命令訪問
# Kibana:日志可視化
# 【服務名】'kibana' 是這個服務的名稱。
kibana:
container_name: kibana
image: docker.elastic.co/kibana/kibana:8.17.1 # Kibana 版本需要與 Elasticsearch 版本一致
depends_on:
elasticsearch:
condition: service_healthy # 等待 Elasticsearch 健康后再啟動
ports:
- "5601:5601" # 將 Kibana 的 Web 界面端口映射到宿主機,通過 http://localhost:5601 訪問啟動 EFK 組件
有了前面的準備, EFK 都通過 Docker 鏡像的方式進行了定義,接著只需要執行 docker compose 命令就可以安裝了。
執行如下命令啟動容器安裝 EFK 組件:
docker compose -f /Users/cuihao/docker/EFK/docker-compose.yml up -d如果出現如下錯誤:
=> ERROR [fluentd internal] load metadata for docker.io/fluent/fluentd:edge-debian 31.1s
------
> [fluentd internal] load metadata for docker.io/fluent/fluentd:edge-debian:
------
failed to solve: DeadlineExceeded: DeadlineExceeded: DeadlineExceeded: fluent/fluentd:edge-debian: failed to resolve source metadata for docker.io/fluent/fluentd:edge-debian: failed to authorize: DeadlineExceeded: failed to fetch anonymous token: Get "https://auth.docker.io/token?scope=repository%3Afluent%2Ffluentd%3Apull&service=registry.docker.io": dial tcp 75.126.124.162:443: i/o timeout說明需要手動拉取 fluentd 的鏡像,執行如下命令:
docker pull fluent/fluentd:edge-debian再次執行如下命令:
docker compose -f /Users/cuihao/docker/EFK/docker-compose.yml up -d完成安裝之后,可以通過 docker desktop 看到容器服務正常運行。如下圖所示,

配置日志采集與分析
完成 EFK 的安裝之后,接下來就開始應用之間的配置了。在配置之前,我們先回顧一下案例的整體思路,fluent-bit 是日志采集的第一步,它會從日志文件中采集日志的信息,這里需要定義日志目錄。如圖所示,綠色區域中我們需要配置“日志目錄”,同時還需要制定fluent-bit 采集之后需要將日志信息輸出到 fluentd 中,這里需要填入“fluentd”作為輸出的服務名。

配置 Fluent Bit
從上面的描述,我們清楚需要對 fluent-bit 的輸入和輸出進行配置,接下來就是編寫配置文件了。在/Users/cuihao/docker/EFK/fluent-bit/etc 創建 fluent-bit.conf 文件如下:
##############################################
# 輸入插件配置(收集日志)
##############################################
# 輸入源1:采集cpu日志文件
[INPUT]
# 使用 tail 插件監控文件變化
Name tail
# 自定義標簽,標識為系統指標類日志
Tag sys.metric
# 系統指標日志文件路徑(可能是由其他工具生成的指標數據)
Path /var/log/metric_log
# 檢查文件變化的間隔時間(秒)
Refresh_Interval 10
##############################################
# 輸出插件(轉發到 Fluentd)
##############################################
[OUTPUT]
# 使用 forward 插件將日志轉發到Fluentd聚合器
Name forward
# 匹配所有標簽的日志(* 是通配符,表示所有輸入源)
Match *
# Fluentd 服務地址(使用Docker Compose服務名進行服務發現)
Host fluentd
# Fluentd 監聽端口(forward插件的默認端口)
Port 24224
# 網絡故障時的最大重試次數,防止無限重試消耗資源
Retry_Limit 10
# 輸出源2:同時輸出到控制臺(用于調試和監控)
[OUTPUT]
# 使用 stdout 插件在控制臺打印日志
Name stdout
# 匹配所有標簽的日志
Match *
# 注意:生產環境通常應注釋或移除此輸出,避免日志重復和性能開銷由于文件中的配置信息都用了備注,比較容易理解就不逐一解釋了。需要注意的是配置文件中的 Host 對應的 fluentd 是 docker 容器的服務名,由于這幾個應用是通過 docker compose 安裝的能夠保證在相同的網絡內,所以可以通過服務名進行訪問,通過下圖的方式查詢。


配置 Fluentd
完成 fluent bit 的配置之后,緊接著就需要對 fluentd 進行配置了,它是整個配置環節的重頭戲,這里還是以案例整體大圖為例。將 fluentd 的部分進行展開說明, 如下圖所示。Fluent Bit 從日志文件獲取日志并推送給 Fluentd,Fluentd 經 24224 端口監聽接收。先由過濾器按規則提取日志(過濾指定 Tag、正則提取 JSON )、解析日志(自定義 JSON 解析器處理原始日志),若 CPU 使用率超 80% 則標記異常;接著通過多路輸出,將日志存至 Elasticsearch、輸出到控制臺,還會對標記異常的日志,調用大模型處理服務(process_log.py )進一步分析。

在/Users/cuihao/docker/EFK/fluentd/conf 目錄下創建 fluent.conf 文件,內容如下:
###############################
# Fluentd 健康檢查(monitor_agent)
###############################
<source>
@type monitor_agent # 啟用監控代理插件,用于收集Fluentd自身運行指標
bind 0.0.0.0 # 監聽所有網絡接口
port 24220 # 監控服務端口,可通過此端口獲取Fluentd運行狀態信息
</source>
###############################
# Fluentd 日志輸入(forward)
###############################
<source>
@type forward # 啟用forward輸入插件,接收來自Fluent-bit或其他Fluentd節點的日志
port 24224 # 轉發協議監聽端口
bind 0.0.0.0 # 監聽所有網絡接口
</source>
###############################
# 處理 collectd 采集的CPU指標數據
###############################
# 第一步:提取log字段中write_log后的JSON部分(現有配置)
<filter sys.metric> # 過濾系統指標數據
@type parser # 使用解析器插件
key_name log # 解析log字段內容
reserve_data true # 保留所有原始字段
remove_key_name_field false # 不移除原始的key_name字段
<parse>
@type regexp # 使用正則表達式解析
# 正則表達式匹配collectd的CPU數據格式,使用命名捕獲組提取CPU信息
expression /write_log values:(?:#012|\s)*(?<log>\[\{.*?\}\])/ # 非貪婪匹配確保只取一個數組
time_key time # 指定時間字段
time_type float # 時間格式為浮點數(Unix時間戳)
</parse>
</filter>
# 第二步:將log字段的JSON字符串解析為JSON對象
<filter sys.metric>
@type parser
key_name log # 解析第一步提取的log字段(此時是JSON字符串)
reserve_data true # 保留所有字段
remove_key_name_field false # 保留解析后的log字段
<parse>
@type json # 用JSON解析器處理
json_parser json # 使用默認JSON解析器
array true # 強制將JSON數組字符串解析為數組對象
</parse>
</filter>
# 第三步:標記異常日志(增加詳細調試字段)
<filter sys.metric>
@type record_transformer
enable_ruby true
<record>
# 直接判斷 values[0] 是否大于 80
status ${(record["plugin"] == "cpu" && record["type"] == "percent" && record["values"].is_a?(Array) && !record["values"].empty? && record["values"][0].to_f > 80) ? "abnormal" : "normal"}
debug_plugin ${record["plugin"].to_s}
debug_type ${record["type"].to_s}
debug_values_content ${record["values"].inspect}
</record>
</filter>
###############################
# 輸出到 Elasticsearch 8、stdout 和 HTTP端點
###############################
<match *.**> # 匹配所有標簽的日志Tag
@type copy # 復制插件,將日志同時發送到多個輸出目的地
# 第一個輸出目標:Elasticsearch
<store>
@id es_output
@type elasticsearch # 輸出到Elasticsearch
host elasticsearch # ES主機地址(可以是主機名或IP)
port 9200 # ES服務端口
scheme http # 使用HTTP協議
logstash_format true # 使用Logstash格式索引命名
logstash_prefix fluentd-${tag} # 索引前綴加上標簽名
logstash_dateformat %Y%m%d # 索引日期格式年月日
include_tag_key true # 在輸出中包含標簽字段
tag_key @log_name # 標簽字段的鍵名
flush_interval 1s # 刷新間隔1秒
</store>
<store>
@id output
@type stdout # 輸出到控制臺/stdout
key status # 字段
pattern ^normal$ # 正則匹配
</store>
<store>
@type relabel # 使用 relabel 輸出插件進行標簽重路由
@label @abnormal # 將所有匹配到的數據重新路由到 @abnormal 標簽處理流程
</store>
</match>
###############################
# 異常日志輸出到 HTTP端點
###############################
<label @abnormal>
# 這個 filter 塊會對所有進入 @abnormal 標簽的數據進行過濾
# 只有通過這個過濾器的數據才會繼續流向后面的 match 塊
<filter **>
@type grep # 使用 grep 過濾器
<regexp>
key status # 檢查每條記錄的 status 字段
pattern ^abnormal$ # 只保留 status 值為 "abnormal" 的記錄
</regexp>
</filter>
# 這個 match 塊會接收到經過上面 filter 過濾后的數據
<match **>
@id http_output
@type http # HTTP輸出插件
endpoint http://host.docker.internal:5001/analyze-fluentd-log # 日志分析API端點,host.docker.internal是Docker特殊主機名,指向宿主機
http_method post # 使用POST方法發送
<format>
@type json # 數據格式為JSON
</format>
<buffer>
# 緩沖配置區塊:控制數據如何緩沖和重試發送
# 用于提高網絡輸出的可靠性和性能,避免頻繁的小數據包發送
# 緩沖刷新間隔:每2秒強制刷新一次緩沖區
# - 即使緩沖區未滿,也會每2秒發送一次累積的數據
# - 平衡實時性和網絡效率:太短會增加網絡請求,太長會降低實時性
flush_interval 2s
# 重試策略:使用指數退避算法
# - 第一次重試等待:基礎等待時間
# - 第二次重試等待:基礎時間 × 2
# - 第三次重試等待:基礎時間 × 4,依此類推
# - 避免網絡擁塞時的大量重試導致雪崩效應
retry_type exponential_backoff
# 基礎重試等待時間:第一次重試前等待1秒
# - 首次重試的初始等待間隔
# - 后續重試會根據指數退避算法遞增
retry_wait 1s
# 最大重試間隔:單次重試最多等待30秒
# - 防止指數增長后的等待時間過長
# - 即使使用指數退避,也不會超過30秒的間隔
retry_max_interval 30s
# 總重試超時時間:10分鐘后放棄重試
# - 從第一次失敗開始計算,10分鐘后停止重試
# - 防止因為長期不可用的目標服務導致無限重試
# - 超時后數據可能會被丟棄(取決于配置)
retry_timeout 10m
</buffer>
</match>
</label>生成智能報表
在完成配置 fluent bit 與 fluentd 的配置之后,我們來到生成智能報表的環節, 這里需要根據 fluentd 傳入的告警信息(CPU 使用率>80%),進行分析并生成報表。
在/Users/cuihao/docker/EFK/目錄下創建process_log.py 文件, 寫入如下內容:
from flask import Flask, request, jsonify, Response
from openai import AsyncOpenAI
import os
import json
import threading
import asyncio
from datetime import datetime
from dotenv import load_dotenv
# 加載環境變量
load_dotenv()
# 創建Flask應用實例
app = Flask(__name__)
# 初始化客戶端
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not DEEPSEEK_API_KEY:
raise ValueError("未找到DEEPSEEK_API_KEY,請在.env文件中配置")
client = AsyncOpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com"
)
# 路徑配置
current_dir = os.path.dirname(os.path.abspath(__file__))
report_dir = os.path.join(current_dir, "report")
OUTPUT_REPORT_PATH = os.path.join(report_dir, "incident_report.json")
ERROR_INFO_PATH = os.path.join(current_dir, "api_error.log")
OPERATION_LOG_PATH = os.path.join(current_dir, "api_operation.log") # 新增操作日志文件
os.makedirs(report_dir, exist_ok=True)
def write_operation_log(message, request_id=None):
"""記錄操作日志,包含時間戳和可選的請求ID"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 精確到毫秒
request_id = request_id or "N/A"
log_entry = f"[{timestamp}] [REQUEST={request_id}] OPERATION: {message}\n"
try:
with open(OPERATION_LOG_PATH, 'a', encoding='utf-8') as f:
f.write(log_entry)
print(log_entry.strip()) # 同時輸出到控制臺
except Exception as e:
print(f"[日志系統錯誤] 寫入操作日志失敗: {str(e)} | 原始消息: {message}")
def write_error_log(error_message, request_id=None):
"""記錄錯誤日志,包含時間戳和請求ID"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
request_id = request_id or "N/A"
log_entry = f"[{timestamp}] [REQUEST={request_id}] ERROR: {error_message}\n"
try:
with open(ERROR_INFO_PATH, 'a', encoding='utf-8') as f:
f.write(log_entry)
print(log_entry.strip()) # 同時輸出到控制臺
except Exception as e:
print(f"[日志系統錯誤] 寫入錯誤日志失敗: {str(e)} | 原始錯誤: {error_message}")
def append_to_json_file(data, timestamp, request_id):
"""將數據追加到JSON文件,帶日志記錄"""
try:
write_operation_log("開始寫入分析報告", request_id)
data_with_timestamp = {
"timestamp": timestamp,
"request_id": request_id, # 新增請求ID便于追蹤
**data
}
existing_data = []
if os.path.exists(OUTPUT_REPORT_PATH):
try:
with open(OUTPUT_REPORT_PATH, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
write_operation_log(f"成功讀取現有報告({len(existing_data)}條記錄)", request_id)
except (json.JSONDecodeError, FileNotFoundError):
error_msg = "報告文件損壞或不存在,將重新初始化"
write_error_log(error_msg, request_id)
existing_data = []
existing_data.append(data_with_timestamp)
with open(OUTPUT_REPORT_PATH, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, ensure_ascii=False, indent=2)
write_operation_log("分析報告寫入成功", request_id)
return True
except Exception as e:
error_msg = f"寫入JSON報告文件時出錯: {str(e)}"
write_error_log(error_msg, request_id)
return False
async def process_llm_background(log_content, call_timestamp, request_id):
"""后臺處理LLM調用,帶詳細日志"""
try:
write_operation_log("開始后臺LLM處理流程", request_id)
# 構造提示詞
write_operation_log("開始構建提示詞", request_id)
prompt = f"""
你是一名系統運維專家。以下是異常日志,請生成根因分析報告:
日志內容:
{log_content}
請輸出格式:
【標題】
...
【異常原因分析】
...
【修復建議】
...
"""
write_operation_log("提示詞構建完成,準備調用LLM", request_id)
# 調用LLM
write_operation_log("開始調用DeepSeek API", request_id)
response = await client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是經驗豐富的運維專家,負責分析日志并提供修復建議"},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=800,
stream=False
)
write_operation_log("DeepSeek API調用成功", request_id)
# 處理結果
report_text = response.choices[0].message.content
result_data = {
"raw_log": log_content,
"root_cause_report": report_text
}
write_operation_log("LLM返回結果解析完成", request_id)
# 保存結果
if append_to_json_file(result_data, call_timestamp, request_id):
write_operation_log("分析結果已成功保存到報告文件", request_id)
else:
write_error_log("分析結果保存失敗", request_id)
write_operation_log("后臺LLM處理流程完成", request_id)
except Exception as e:
error_msg = f"LLM調用失敗: {str(e)}"
write_error_log(error_msg, request_id)
def run_async_task(log_content, call_timestamp, request_id):
"""線程包裝器,帶日志"""
try:
write_operation_log("啟動異步任務處理線程", request_id)
asyncio.run(process_llm_background(log_content, call_timestamp, request_id))
except Exception as e:
write_error_log(f"異步任務線程執行失敗: {str(e)}", request_id)
def build_log_content_from_records(records, request_id):
"""構建日志內容,帶日志記錄"""
write_operation_log("開始構建日志內容", request_id)
lines = []
for i, record in enumerate(records or []):
if isinstance(record, dict) and ("log" in record):
value = record.get("log")
if value is not None and str(value).strip() != "":
lines.append(str(value))
write_operation_log(f"成功提取第{i+1}條記錄的log字段", request_id)
else:
write_operation_log(f"第{i+1}條記錄的log字段為空,已忽略", request_id)
else:
write_operation_log(f"第{i+1}條記錄不包含log字段,已忽略", request_id)
log_content = "\n".join(lines).strip()
write_operation_log(f"日志內容構建完成,共包含{len(lines)}條有效記錄", request_id)
return log_content
@app.route('/analyze-fluentd-log', methods=['POST'])
def analyze_fluentd_log():
"""主接口:帶詳細日志記錄的請求處理"""
# 生成唯一請求ID,便于全流程追蹤
request_id = datetime.now().strftime("%Y%m%d%H%M%S") + f"-{os.urandom(4).hex()}"
write_operation_log("收到新的請求", request_id)
try:
# 1. 記錄請求基本信息
call_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
write_operation_log(f"請求方法: {request.method}, 客戶端IP: {request.remote_addr}, Content-Type: {request.content_type}", request_id)
# 2. 驗證請求類型
if request.content_type != "application/x-ndjson":
error_msg = f"不支持的Content-Type: {request.content_type},僅接受application/x-ndjson"
write_error_log(error_msg, request_id)
return jsonify({
"error": error_msg,
"request_id": request_id
}), 415
# 3. 解析NDJSON數據
write_operation_log("開始解析NDJSON數據", request_id)
ndjson_data = request.data.decode().splitlines()
records = []
for i, line in enumerate(ndjson_data):
if line.strip():
try:
record = json.loads(line)
records.append(record)
write_operation_log(f"成功解析第{i+1}行NDJSON數據", request_id)
except json.JSONDecodeError as e:
error_msg = f"第{i+1}行NDJSON解析失敗: {str(e)}"
write_error_log(error_msg, request_id)
records.append({"message": line, "parse_error": str(e)})
else:
write_operation_log(f"第{i+1}行是空白行,已忽略", request_id)
write_operation_log(f"NDJSON數據解析完成,共{len(records)}條記錄", request_id)
# 4. 過濾有效日志記錄
write_operation_log("開始過濾有效日志記錄", request_id)
log_records = [r for r in records if isinstance(r, dict) and 'log' in r and str(r.get('log', '')).strip()]
write_operation_log(f"有效日志記錄過濾完成: {len(log_records)}/{len(records)}", request_id)
if not log_records:
error_msg = "未找到有效日志記錄(缺少log字段)"
write_error_log(error_msg, request_id)
return Response(
json.dumps({
"status": "ignored",
"reason": error_msg,
"raw_records_count": len(records),
"request_id": request_id
}, ensure_ascii=False, indent=2),
mimetype="application/json"
)
# 5. 構建日志內容
log_content = build_log_content_from_records(log_records, request_id)
if not log_content:
error_msg = "日志內容為空"
write_error_log(error_msg, request_id)
return jsonify({
"error": error_msg,
"request_id": request_id
}), 400
# 6. 參數驗證通過,準備啟動后臺任務
write_operation_log("參數驗證通過,準備啟動后臺處理任務", request_id)
# 7. 啟動后臺線程處理LLM調用
thread = threading.Thread(
target=run_async_task,
args=(log_content, call_timestamp, request_id),
daemnotallow=True
)
thread.start()
write_operation_log("后臺處理線程已啟動", request_id)
# 8. 立即返回響應
response_data = {
"status": "accepted",
"message": "請求已接收,正在后臺處理",
"timestamp": call_timestamp,
"log_records_count": len(log_records),
"request_id": request_id # 返回請求ID便于追蹤
}
write_operation_log("請求處理完成,已返回響應", request_id)
return Response(
json.dumps(response_data, ensure_ascii=False, indent=2),
mimetype="application/json",
status=202
)
except Exception as e:
error_msg = f"參數驗證失敗: {str(e)}"
write_error_log(error_msg, request_id)
return jsonify({
"error": error_msg,
"request_id": request_id
}), 400
@app.route('/health', methods=['GET'])
def health_check():
"""健康檢查接口,帶日志"""
request_id = f"health-{datetime.now().strftime('%Y%m%d%H%M%S')}"
write_operation_log("收到健康檢查請求", request_id)
response = jsonify({
"status": "healthy",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"request_id": request_id
})
write_operation_log("健康檢查響應已返回", request_id)
return response
if __name__ == '__main__':
print("Flask API 啟動中...")
print("環境變量配置:")
print(f" - 從 .env 文件加載 DEEPSEEK_API_KEY: {'已配置' if DEEPSEEK_API_KEY else '未配置'}")
print("日志文件路徑:")
print(f" - 操作日志: {OPERATION_LOG_PATH}")
print(f" - 錯誤日志: {ERROR_INFO_PATH}")
print("可用的端點:")
print(" - POST /analyze-fluentd-log - 接收日志并在后臺處理")
print(" - GET /health - 服務健康檢查")
app.run(host="0.0.0.0", port=5001, debug=True)
代碼是基于 Flask 框架的日志分析 API 服務,用于接收并處理 EFK 體系中 Fluentd 推送的異常日志,結合 AI 大模型實現智能運維分析。代碼首先加載環境變量獲取 DeepSeek API 密鑰,初始化異步客戶端;通過定義日志記錄函數,實現操作日志、錯誤日志的詳細追蹤,以及分析報告的 JSON 持久化存儲。核心接口/analyze-fluentd-log接收 NDJSON 格式的日志數據,生成唯一請求 ID 用于全流程追蹤,解析日志后過濾出含有效log字段的記錄,構建日志內容并啟動后臺線程,異步調用 DeepSeek 大模型生成根因分析報告(含標題、異常原因、修復建議),最終將結果寫入 JSON 文件。
測試
接下來需要將整個案例進行測試,從日志文件生成到采集、分析、調用大模型。
日志生成與報表展示
由于需要生成測試的日志文件以及查看生成的智能日志,所以需要在 EFK 目錄下創建log_generator.py 和log_ui.py 分別完成上述功能。
import streamlit as st
import json
import re
import os
from log_generator import LogGenerator
# 初始化日志生成器
log_generator = LogGenerator()
# ========================
# 讀取 JSON 文件
# ========================
#file_path = r"report\incident_report.json"
file_path = os.path.join("report", "incident_report.json")
# 初始化reports變量
reports = []
# 檢查文件是否存在和讀取數據
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
try:
with open(file_path, "r", encoding="utf-8") as f:
reports = json.load(f)
except json.JSONDecodeError as e:
st.sidebar.error(f"JSON文件格式錯誤: {e}")
except Exception as e:
st.sidebar.error(f"讀取文件時發生錯誤: {e}")
else:
if not os.path.exists(file_path):
st.sidebar.warning(f"?? 文件 {file_path} 不存在")
else:
st.sidebar.warning(f"?? 文件 {file_path} 為空")
# ========================
# 提取標題(標題在【標題】后的下一行)
# ========================
def extract_title(report_text):
# 去掉空行,逐行處理
lines = [line.strip() for line in report_text.splitlines() if line.strip()]
for i, line in enumerate(lines):
if line.startswith("【標題】"):
# 找到【標題】,取下一行作為真正的標題
if i + 1 < len(lines):
return lines[i + 1].strip()
else:
return "未命名報告"
return "未命名報告"
# ========================
# 按【字段】分段解析報告
# 例如:分出 "異常原因分析"、"修復建議"、"異常日志"
# ========================
def split_sections(report_text):
sections = {}
# 用正則把【xxx】作為分隔符拆開
parts = re.split(r"(【.*?】)", report_text)
current_key = None
for part in parts:
if not part.strip():
continue
if part.startswith("【") and part.endswith("】"):
# 當前是一個小節標題,例如【異常原因分析】
current_key = part.strip("【】")
sections[current_key] = ""
else:
# 當前是小節內容
if current_key:
sections[current_key] += part.strip() + "\n"
return sections
# ========================
# 側邊欄:搜索功能
# ========================
st.sidebar.title("異常報告列表")
# 如果沒有報告數據,顯示提示信息
if not reports:
st.sidebar.info("?? 暫無報告數據")
# 右側主內容區域顯示友好提示
st.title("?? 異常報告系統")
st.info("""
## 歡迎使用異常報告系統!
### ?? 當前狀態
- 報告數據文件未找到或為空
- 請確保 `report/incident_report.json` 文件存在且包含有效數據
### ?? 如何添加數據
1. 在 `report` 文件夾中創建 `incident_report.json` 文件
2. 按照以下格式添加報告數據:
```json
[
{
"timestamp": "2024-01-01 10:00:00",
"raw_log": "日志內容...",
"root_cause_report": "【標題】\n報告標題\n【異常原因分析】\n分析內容..."
}
]
```
### ?? 功能說明
- 支持按時間倒序查看報告
- 支持搜索報告內容
- 自動解析報告結構并展示
""")
st.stop()
# 倒序排序:最新的報告排在最前面
reports_sorted = sorted(reports, key=lambda x: x["timestamp"], reverse=True)
# 添加搜索輸入框
search_query = st.sidebar.text_input("?? 搜索報告內容(標題/日志/分析/建議)").strip()
# 判斷一條報告是否匹配搜索條件
def report_matches(report, query):
if not query:
return True
query = query.lower()
if query in extract_title(report["root_cause_report"]).lower():
return True
if query in report["raw_log"].lower():
return True
if query in report["root_cause_report"].lower():
return True
return False
# 根據搜索條件過濾報告
filtered_reports = [r for r in reports_sorted if report_matches(r, search_query)]
# 構建側邊欄顯示內容(報告生成時間 | 標題)
sidebar_items = [
f"{r['timestamp']} | {extract_title(r['root_cause_report'])}" for r in filtered_reports
]
# 沒有結果時提示
if not sidebar_items:
st.sidebar.warning("未找到匹配的報告")
st.stop()
# 側邊欄選擇框
selected_item = st.sidebar.radio("選擇報告", sidebar_items)
# ========================
# 側邊欄:測試區域
# ========================
st.sidebar.markdown("---")
st.sidebar.subheader("?? 測試數據生成")
# 日志類型選擇
log_type = st.sidebar.radio(
"選擇日志類型",
["CPU"],
help="CPU: 生成CPU使用率日志",
)
# CPU使用率輸入(僅在選擇CPU類型時顯示)
cpu_usage = None
if log_type == "CPU":
cpu_usage = st.sidebar.number_input(
"CPU使用率 (%)",
min_value=0.1,
max_value=100.0,
value=85.0,
step=0.1,
help="輸入0.1-100之間的CPU使用率值"
)
# 生成按鈕
if st.sidebar.button(f"生成{log_type}日志", type="primary"):
with st.spinner("正在生成日志..."):
result = log_generator.generate_and_save_log(log_type.lower(), cpu_usage)
if result['status'] == 'success':
st.sidebar.success(result['message'])
st.sidebar.info(f"日志文件: {result['log_file']}")
if 'log_content_preview' in result:
st.sidebar.code(result['log_content_preview'], language="text")
else:
st.sidebar.error(result['message'])
# 根據選擇找到對應的報告
selected_index = sidebar_items.index(selected_item)
selected_report = filtered_reports[selected_index]
# 提取報告標題
report_title = extract_title(selected_report["root_cause_report"])
# ========================
# 右側內容展示
# ========================
st.title(report_title)
st.subheader(f"**報告生成時間**: {selected_report['timestamp']}")
# ---- 原始日志(默認折疊) ----
with st.expander("原始日志", expanded=False):
st.code(selected_report["raw_log"], language="text")
# ---- 根因分析報告 ----
sections = split_sections(selected_report["root_cause_report"])
# 去掉【標題】部分,避免重復展示
sections.pop("標題", None)
for key, value in sections.items():
st.subheader(key) # 小節標題,例如 “異常原因分析” st.markdown(value.strip()) # 小節內容啟動 EFK 服務
按照下圖所示,啟動 EFK 服務,如果已經啟動,建議重啟一下,保證配置文件修改之后可以生效。

啟動分析服務
當大模型(DeepSeek)接收到告警信息之后會生成對應的報告。
執行如下指令:
python process_log.py看到如下內容的時候, 說明該服務啟動了。
Flask API 啟動中...
環境變量配置:
- 從 .env 文件加載 DEEPSEEK_API_KEY: 已配置
日志文件路徑:
- 操作日志: /Users/cuihao/docker/EFK/api_operation.log
- 錯誤日志: /Users/cuihao/docker/EFK/api_error.log
可用的端點:
- POST /analyze-fluentd-log - 接收日志并在后臺處理
- GET /health - 服務健康檢查
* Serving Flask app "process_log" (lazy loading)生成測試數據
通過如下命令執行啟動測試和報告查看界面。
streamlit run log_ui.py如下圖所示,在打開的 Web UI 界面的左側,輸入 CPU 使用率為 85.5%,并點擊“生成 CPU 日志”。

觀察結果
由于日志文件寫入了內容, 日志采集服務、過濾服務以及處理服務都啟動了,接下來就可以觀察日志處理的過程了。通過在容器中執行 Fluent-bit 的日志文件可以發現,它已經采集到了對應的日志信息,如下:

通過是在 Fluent-D 中也可以看到接收到的日志信息,如下:

ES + Kibana 日志展示
這里只是展示日志內容,包含了正常和異常兩類數據,但是沒有對智能日志的顯示。
配置確認
ES和Kibana的鏡像版本都是使用的8.17.1,版本保持要一致,ES和Kibana服務的配置在??docker-compse??中。?

ES的數據來自fluentd轉發,由于轉發功能依賴fluentd的插件fluent-plugin-elasticsearch,?fluentd的??dockerfile??文件中配置有安裝插件的命令,在通過docker compose生成容器啟動時會執行dockerfile中的命令安裝插件到fluentd容器中。?

轉發到ES需要修改配置文件??fluent.conf??,通過服務名進行轉發到ES。?

KIbana鏡像中默認配置連接到ES(ES默認啟動的端口是9200,如果ES的端口需要手動修改,這里也需要改成相應的端口)。

日志展示
我在 fluentd 中完成了到ES的轉發配置,并成功轉發才會生成索引。通過配置文件可以看出,Kibana的訪問端口為 5601。通過如下地址:http://localhost:5601/app/discover#/ 訪問,如果首次Kibana打開會提示創建數據視圖。

如下圖所示,在彈窗中配置名稱和匹配索引的規則。fluentd-*表示匹配所有fluentd-開頭的索引,在右邊列表會顯示匹配的索引。

視圖創建完成后會跳轉到展示頁面,默認顯示最近15分鐘的數據。

修改時間范圍,這里切換到15天,右邊的Refresh按鈕會變成Update。

點擊Update,就可以看到數據列表。


作者介紹
崔皓,51CTO社區編輯,資深架構師,擁有18年的軟件開發和架構經驗,10年分布式架構經驗。

















