使用DaemonSet實現heapdump文件自動化管理
一、引言
1、為什么要獲取heapdump文件
heapdump文件是Java應用遭遇OOM后的診斷報告,記錄了某一時刻 JVM 堆中對象的詳細使用情況,是 JVM 堆內存的一個快照。通過分析 heapdump 文件,我們可以深入了解到內存中究竟存在哪些對象,它們占用了多少內存空間,以及對象之間的引用關系如何。這對于定位內存泄漏問題至關重要。
2、為什么使用DaemonSet實現
之前在SRE運維筆記公眾號中看到一篇文章《運維救星!一鍵開啟k8s微服務OOM heapdump自動化之旅》,其實現思路通過在應用容器中增加dump腳本,然后通過java參數-XX:OnOutOfMemoryError配置腳本,它的作用是當內存溢出的時候,會調用這個參數配置的腳本做一些后續處理,比如文章中的dump腳本,也可以是重啟應用的腳本等。
上述方法對應用有一定的侵入性,另外,如果文件太大,會出現容器退出導致上傳失敗的情況。結合實際情況,準備使用DaemonSet部署一個heapdump-watcher應用,通過它來監聽heapdump.prof文件實現自動化管理。
Tips:該方法僅適合將heapdump.prof持久化到K8s節點的場景。但是具有一定的參考意義。
3、實施前提
該方案需要以下前提:
- heapdump.prof文件持久化到K8s節點。
- 持久化的目錄具備相同規則,比如:/mnt/logs/<APP_NAME>/logs/heapdump.prof,如果需要避免沖突,目錄可以改造成/mnt/logs/<APP_NAME>/logs/<POD_NAME>heapdump.prof。
- 具備阿里云OSS操作權限。
- 具備一個可用的企業微信機器人。
二、整體思路
圖片
OOM事件觸發通過Java啟動參數配置,增加-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/logs/heapdump.hprof,當應用觸發OOM,則會在/mnt/logs目錄下自動生成heapdump.prof文件。
我們通過fsnotify來監聽文件的變化,當heapdump.prof生成完后,fsnotify就會迅速捕捉到這個事件,我們通過阿里云OSS的SDK實現文件上傳,將heapdump.prof文件壓縮后上傳到阿里云OSS。為了節約節點磁盤空間,當heapdump.prof文件上傳完成后清理本地文件。
為了讓相關開發人員了解到新的heapdump.prof文件已經生成,我們通過企業微信機器人通知到對應的開發群。
三、具體實現
(1)初始化部分
func init() {
// 獲取環境
env = getEnv("ENV", "prod")
var err error
watcher, err = fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Failed to create fsnotify watcher: %v", err)
}
// 加載配置文件
config, err = loadConfig(configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// 初始化OSS客戶端
ossClient, err := oss.New(config.OSS.Endpoint, config.OSS.AccessID, config.OSS.AccessKey)
if err != nil {
log.Fatalf("Failed to create OSS client: %v", err)
}
client, _ = ossClient.Bucket(config.OSS.Bucket)
if config.WatchPods {
// 初始化Kubernetes客戶端
kubeClient, err = createKubeClient()
if err != nil {
log.Fatalf("Failed to create Kubernetes client: %v", err)
}
// 獲取當前節點的IP
nodeIP, err = getNodeIP()
if err != nil {
log.Fatalf("Failed to get node IP: %v", err)
}
}
// 初始化信號通道
signalChan = make(chan os.Signal, 1)
stopChan = make(chan struct{})
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
}在這段初始化代碼中,首先通過getEnv函數獲取環境變量ENV的值,如果未設置則默認為prod。接著創建一個fsnotify.Watcher,用于監聽文件系統的變化。然后從指定路徑configPath加載配置文件,配置文件中包含了 OSS、企業微信 Webhook 以及白名單等相關配置信息。
隨后,利用配置信息初始化阿里云 OSS 客戶端,通過提供的Endpoint、AccessID和AccessKey創建ossClient,并獲取指定的Bucket,以便后續進行文件上傳操作。
如果配置中WatchPods字段為true,表示會監聽Pod的變化(因為Pod會重建,如果日志目錄包含POD_NAME,重建后就不應該再監聽原來Pod目錄),則會初始化 Kubernetes 客戶端。通過createKubeClient函數創建kubeClient,用于與 Kubernetes 集群進行交互。還會獲取當前節點的 IP 地址,以便后續監聽該節點上的 Pod 變化。
最后,初始化兩個通道signalChan和stopChan。signalChan用于接收操作系統發送的信號,如SIGINT(中斷信號,通常由用戶按下 Ctrl+C 觸發)和SIGTERM(終止信號,用于正常終止進程),以便程序能夠在接收到這些信號時進行優雅退出;stopChan則用于停止 Informer,當程序接收到終止信號時,通過關閉stopChan來通知 Informer 停止工作。
(2)文件監聽
func watchFiles() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Create == fsnotify.Create {
// 檢測到新文件創建
if strings.HasSuffix(event.Name, "heapdump.prof") {
log.Printf("New heapdump file detected: %s", event.Name)
// 等待文件寫入完成
if err := waitForFileCompletion(event.Name); err != nil {
log.Printf("Failed to wait for file completion: %v", err)
continue
}
// 上傳文件到OSS
appName := filepath.Base(filepath.Dir(filepath.Dir(event.Name)))
err := uploadFileToOSS(event.Name, appName)
if err != nil {
log.Printf("Failed to upload file to OSS: %v", err)
} else {
log.Printf("File uploaded to OSS successfully: %s", event.Name)
// 發送企業微信告警通知
err = sendWechatAlert(appName)
if err != nil {
log.Printf("Failed to send WeChat alert: %v", err)
}
}
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Printf("Error: %v", err)
}
}
}watchFiles函數是實現文件監聽的核心部分。它通過一個無限循環for { }和select語句來監聽watcher.Events通道和watcher.Errors通道。
當watcher.Events通道有事件發生時,會檢查事件類型是否為文件創建(event.Op&fsnotify.Create == fsnotify.Create)。如果是新文件創建,且文件后綴為heapdump.prof,則表示檢測到了新的 heapdump 文件。
此時,會調用waitForFileCompletion函數等待文件寫入完成。該函數通過不斷檢查文件大小是否變化來判斷文件是否寫入完成,設置了最大檢查時長為 30 秒,檢查間隔為 2 秒。如果文件在規定時間內大小不再變化,則認為文件寫入完成;否則,返回錯誤并繼續監聽下一個事件。
文件寫入完成后,獲取文件所在目錄的應用名稱,然后調用uploadFileToOSS函數將文件上傳到 OSS。上傳成功后,會調用sendWechatAlert函數發送企業微信告警通知,告知相關人員新的 heapdump 文件已生成并上傳。
(3)Pod狀態監聽
該方法主要是針對heapdump.prof所存放的目錄有POD_NAME變量,希望實現的是當原Pod銷毀會取消監聽原Pod目錄,當新Pod創建會監聽新Pod目錄。
func watchPods() {
// 獲取當前節點上的Pod列表
for _, appName := range config.Whitelist {
pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", appName),
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeIP),
})
if err != nil {
log.Printf("Failed to list pods for app %s: %v", appName, err)
continue
}
for _, pod := range pods.Items {
addPodWatch(appName, pod.Name)
}
}
// 監聽Pod變化
_, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)
return kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)
return kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
appName := pod.Labels["app"]
if isWhitelisted(appName) {
log.Printf("Pod added: %s/%s", pod.Namespace, pod.Name)
addPodWatch(appName, pod.Name)
}
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
appName := pod.Labels["app"]
if isWhitelisted(appName) {
log.Printf("Pod deleted: %s/%s", pod.Namespace, pod.Name)
removePodWatch(appName, pod.Name)
}
},
},
)
controller.Run(stopChan) // 使用 stopChan 來停止 Informer}watchPods函數負責監聽 Pod 的變化。首先,遍歷配置中的白名單應用名稱,通過 Kubernetes 客戶端kubeClient獲取當前節點上屬于這些應用的 Pod 列表。使用LabelSelector來篩選出特定應用的 Pod,FieldSelector來指定只獲取當前節點上的 Pod。
對于獲取到的每個 Pod,調用addPodWatch函數為其添加文件監聽。addPodWatch函數會根據應用名稱和 Pod 名稱構建日志目錄路徑,并使用watcher.Add方法將該目錄添加到文件監聽列表中,以便后續能及時監聽到該 Pod 生成的 heapdump 文件。
然后,通過cache.NewInformer創建一個 Informer,用于監聽 Pod 的變化。Informer是 Kubernetes 客戶端中的一個重要組件,它通過ListWatch機制定期從 Kubernetes API Server 獲取 Pod 列表,并監聽 Pod 的變化事件。
ListFunc和WatchFunc分別定義了獲取 Pod 列表和監聽 Pod 變化的方法,都通過kubeClient.CoreV1().Pods(metav1.NamespaceAll)來操作所有命名空間下的 Pod,并根據當前節點 IP 進行篩選。
ResourceEventHandlerFuncs定義了 Informer 在接收到 Pod 添加和刪除事件時的處理邏輯。當有新 Pod 添加時,如果該 Pod 的應用名稱在白名單中,會調用addPodWatch函數為其添加文件監聽;當有 Pod 被刪除時,如果應用名稱在白名單中,會調用removePodWatch函數移除對該 Pod 的文件監聽。
最后,啟動 Informer 并傳入stopChan,當stopChan被關閉時,Informer 會停止運行,實現了優雅停止的功能。
(4)文件上傳
func uploadFileToOSS(filePath string, appName string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// 創建臨時文件用于存儲壓縮后的文件
tempFile, err := os.CreateTemp("", "heapdump-*.zip")
if err != nil {
return err
}
defer tempFile.Close()
defer os.Remove(tempFile.Name()) // 刪除臨時文件
// 創建 zip.Writer
zipWriter := zip.NewWriter(tempFile)
defer zipWriter.Close()
// 添加文件到 zip
zipFileWriter, err := zipWriter.Create(filepath.Base(filePath))
if err != nil {
return err
}
_, err = io.Copy(zipFileWriter, file)
if err != nil {
return err
}
// 確保 zip 文件寫入完成
err = zipWriter.Close()
if err != nil {
return err
}
// 重新打開臨時文件用于上傳
tempFile.Seek(0, 0)
tempFileReader := io.Reader(tempFile)
// 構建上傳路徑
timestamp := time.Now().Format("20060102150405")
objectName := fmt.Sprintf("heapdump/%s/heapdump_%s.zip", appName, timestamp)
// 設置文件元數據
expires := time.Now().Add(24 * time.Hour) // 設置過期時間為24小時后
options := []oss.Option{
oss.Expires(expires),
}
err = client.PutObject(objectName, tempFileReader, options...)
if err != nil {
return err
}
// 生成預簽名URL
ossURL, err = client.SignURL(objectName, oss.HTTPGet, expires.Unix()-time.Now().Unix())
if err != nil {
log.Fatalf("Failed to generate presigned URL: %v", err)
}
// 文件上傳成功后,刪除本地文件
log.Printf("Deleting local file: %s", filePath)
if err := os.Remove(filePath); err != nil {
log.Printf("Failed to delete local file: %v", err)
}
return nil
}這一步先將heapdump.prof進行zip壓縮,然后再將其上傳到OSS,上傳成功后刪除本地文件。
(5)發送通知
func sendWechatAlert(appName string) error {
// 構建 Markdown 格式的消息
markdownContent := fmt.Sprintf(`# JAVA OOM DUMP 文件生成
> 應用:%s
> 環境:%s
> 文件:[下載地址](%s)
> *Tips*: 文件只保留1天,請及時下載`, appName, env, ossURL)
payload := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]string{
"content": markdownContent,
},
}
_, body, errs := gorequest.New().Post(config.Wechat.WebhookURL).Send(payload).End()
if errs != nil {
return fmt.Errorf("failed to send WeChat alert: %v", errs)
}
log.Printf("WeChat alert response: %s", body)
return nil
}該步驟將產生heapdump的信息發送到對應的告警群。
四、部署驗證
(1)制作鏡像
將應用打包成Docker鏡像。
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /heapdump-watcher
FROM alpine:3.18
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /heapdump-watcher ./heapdump-watcher
CMD ["/heapdump-watcher"](2)在K8s中部署應用
apiVersion: v1
kind: ServiceAccount
metadata:
name: heapdump-watcher
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
name: heapdump-watcher-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ConfigMap
metadata:
name: heapdump-config
namespace: default
data:
config.yaml: |
oss:
endpoint: your-oss-endpoint
bucket: your-oss-bucket
accessID: your-oss-access-id
accessKey: your-oss-access-key
wechat:
webhookURL: your-wechat-webhook-url
whitelist:
- app1
- app2
- app3
watchPods: false # 控制是否監聽 Pod 變化
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: heapdump-watcher
namespace: default
spec:
selector:
matchLabels:
app: heapdump-watcher
template:
metadata:
labels:
app: heapdump-watcher
spec:
serviceAccountName: heapdump-watcher
containers:
- name: heapdump-watcher
image: your-docker-image:latest
volumeMounts:
- name: logs
mountPath: /mnt/logs
readOnly: false
- name: config
mountPath: /app/config.yaml
subPath: config.yaml
readOnly: true
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ENV
value: prod
volumes:
- name: logs
hostPath:
path: /mnt/logs
type: Directory
- name: config
configMap:
name: heapdump-config
items:
- key: config.yaml
path: config.yaml(3)驗證
當應用產生告警后會通知到對應的企業微信,如下:
圖片
五、最后
當前功能已經初步實現,但仍有許多可以優化和擴展的方向。可以考慮擴展支持更多類型的云存儲,如騰訊云 COS、AWS S3 等,以滿足不同用戶的需求。這樣一來,用戶可以根據自己的實際情況和偏好,選擇最適合自己的云存儲服務,提高方案的通用性和靈活性。
另外在通知內容和方式上,可以進一步豐富通知內容,不僅包含應用名稱、環境和文件下載鏈接,還可以增加更多關于內存問題的詳細信息,如內存使用峰值、OOM 發生的時間點等。在通知方式上,可以增加對其他通信工具的支持,如釘釘、飛書等,讓用戶能夠根據自己團隊的使用習慣選擇合適的通知方式,確保通知能夠及時、準確地傳達給相關人員。
還可以引入更智能的分析功能,在上傳 heapdump 文件后,自動對文件進行初步分析,提取關鍵信息,如內存泄漏的疑似對象、內存占用過高的類等,并將分析結果一并通知給相關人員。這樣可以幫助開發人員更快地定位問題,提高問題解決的效率,為 Java 應用的穩定運行提供更強大的支持。






















