精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Go語言如何操縱Kafka保證無消息丟失

開發 后端 Kafka
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。

[[423396]]

背景

目前一些互聯網公司會使用消息隊列來做核心業務,因為是核心業務,所以對數據的最后一致性比較敏感,如果中間出現數據丟失,就會引來用戶的投訴,年底績效就變成325了。之前和幾個朋友聊天,他們的公司都在用kafka來做消息隊列,使用kafka到底會不會丟消息呢?如果丟消息了該怎么做好補償措施呢?本文我們就一起來分析一下,并介紹如何使用Go操作Kafka可以不丟失數據。

本文操作kafka基于:https://github.com/Shopify/sarama

初識kafka架構

維基百科對kafka的介紹:

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。此外,Kafka可以通過Kafka Connect連接到外部系統(用于數據輸入/輸出),并提供了Kafka Streams——一個Java]流式處理庫。該設計受事務日志的影響較大。

kafka的整體架構比較簡單,主要由producer、broker、consumer組成:

截屏2021-09-12 上午10.00.13

針對架構圖我們解釋一個各個模塊:

  • Producer:數據的生產者,可以將數據發布到所選擇的topic中。
  • Consumer:數據的消費者,使用Consumer Group進行標識,在topic中的每條記錄都會被分配給訂閱消費組中的一個消費者實例,消費者實例可以分布在多個進程中或者多個機器上。
  • Broker:消息中間件處理節點(服務器),一個節點就是一個broker,一個Kafka集群由一個或多個broker組成。

還有些概念我們也介紹一下:

  • topic:可以理解為一個消息的集合,topic存儲在broker中,一個topic可以有多個partition分區,一個topic可以有多個Producer來push消息,一個topic可以有多個消費者向其pull消息,一個topic可以存在一個或多個broker中。
  • partition:其是topic的子集,不同分區分配在不同的broker上進行水平擴展從而增加kafka并行處理能力,同topic下的不同分區信息是不同的,同一分區信息是有序的;每一個分區都有一個或者多個副本,其中會選舉一個leader,fowller從leader拉取數據更新自己的log(每個分區邏輯上對應一個log文件夾),消費者向leader中pull信息。

kafka丟消息的三個節點

生產者push消息節點

先看一下producer的大概寫入流程:

  • producer先從kafka集群找到該partition的leader
  • producer將消息發送給leader,leader將該消息寫入本地
  • follwers從leader pull消息,寫入本地log后leader發送ack
  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加high watermark,并向 producer 發送 ack

截屏2021-09-12 上午11.16.43

通過這個流程我們可以看到kafka最終會返回一個ack來確認推送消息結果,這里kafka提供了三種模式:

  1. NoResponse RequiredAcks = 0 
  2. WaitForLocal RequiredAcks = 1 
  3. WaitForAll RequiredAcks = -1 
  • NoResponse RequiredAcks = 0:這個代表的就是數據推出的成功與否都與我無關了
  • WaitForLocal RequiredAcks = 1:當local(leader)確認接收成功后,就可以返回了
  • WaitForAll RequiredAcks = -1:當所有的leader和follower都接收成功時,才會返回

所以根據這三種模式我們就能推斷出生產者在push消息時有一定幾率丟失的,分析如下:

  • 如果我們選擇了模式1,這種模式丟失數據的幾率很大,無法重試
  • 如果我們選擇了模式2,這種模式下只要leader不掛,就可以保證數據不丟失,但是如果leader掛了,follower還沒有同步數據,那么就會有一定幾率造成數據丟失
  • 如果選擇了模式3,這種情況不會造成數據丟失,但是有可能會造成數據重復,假如leader與follower同步數據是網絡出現問題,就有可能造成數據重復的問題。

所以在生產環境中我們可以選擇模式2或者模式3來保證消息的可靠性,具體需要根據業務場景來進行選擇,在乎吞吐量就選擇模式2,不在乎吞吐量,就選擇模式3,要想完全保證數據不丟失就選擇模式3是最可靠的。

kafka集群自身故障造成

kafka集群接收到數據后會將數據進行持久化存儲,最終數據會被寫入到磁盤中,在寫入磁盤這一步也是有可能會造成數據損失的,因為寫入磁盤的時候操作系統會先將數據寫入緩存,操作系統將緩存中數據寫入磁盤的時間是不確定的,所以在這種情況下,如果kafka機器突然宕機了,也會造成數據損失,不過這種概率發生很小,一般公司內部kafka機器都會做備份,這種情況很極端,可以忽略不計。

消費者pull消息節點

push消息時會把數據追加到Partition并且分配一個偏移量,這個偏移量代表當前消費者消費到的位置,通過這個Partition也可以保證消息的順序性,消費者在pull到某個消息后,可以設置自動提交或者手動提交commit,提交commit成功,offset就會發生偏移:

截屏2021-09-12 下午3.37.33

所以自動提交會帶來數據丟失的問題,手動提交會帶來數據重復的問題,分析如下:

  • 在設置自動提交的時候,當我們拉取到一個消息后,此時offset已經提交了,但是我們在處理消費邏輯的時候失敗了,這就會導致數據丟失了
  • 在設置手動提交時,如果我們是在處理完消息后提交commit,那么在commit這一步發生了失敗,就會導致重復消費的問題。

比起數據丟失,重復消費是符合業務預期的,我們可以通過一些冪等性設計來規避這個問題。

實戰

完整代碼已經上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/kafka_demo

解決push消息丟失問題

主要是通過兩點來解決:

  • 通過設置RequiredAcks模式來解決,選用WaitForAll可以保證數據推送成功,不過會影響時延時
  • 引入重試機制,設置重試次數和重試間隔

因此我們寫出如下代碼(摘出創建client部分):

  1. func NewAsyncProducer() sarama.AsyncProducer { 
  2.  cfg := sarama.NewConfig() 
  3.  version, err := sarama.ParseKafkaVersion(VERSION) 
  4.  if err != nil{ 
  5.   log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error()) 
  6.   return nil 
  7.  } 
  8.  cfg.Version = version 
  9.  cfg.Producer.RequiredAcks = sarama.WaitForAll // 三種模式任君選擇 
  10.  cfg.Producer.Partitioner = sarama.NewHashPartitioner 
  11.  cfg.Producer.Return.Successes = true 
  12.  cfg.Producer.Return.Errors = true 
  13.  cfg.Producer.Retry.Max = 3 // 設置重試3次 
  14.  cfg.Producer.Retry.Backoff = 100 * time.Millisecond 
  15.  cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg) 
  16.  if err != nil{ 
  17.   log.Fatal("NewAsyncProducer failed", err.Error()) 
  18.   return nil 
  19.  } 
  20.  return cli 

解決pull消息丟失問題

這個解決辦法就比較粗暴了,直接使用自動提交的模式,在每次真正消費完之后在自己手動提交offset,但是會產生重復消費的問題,不過很好解決,使用冪等性操作即可解決。

代碼示例:

  1. func NewConsumerGroup(group string) sarama.ConsumerGroup { 
  2.  cfg := sarama.NewConfig() 
  3.  version, err := sarama.ParseKafkaVersion(VERSION) 
  4.  if err != nil{ 
  5.   log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error()) 
  6.   return nil 
  7.  } 
  8.  
  9.  cfg.Version = version 
  10.  cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange 
  11.  cfg.Consumer.Offsets.Initial = sarama.OffsetOldest 
  12.  cfg.Consumer.Offsets.Retry.Max = 3 
  13.  cfg.Consumer.Offsets.AutoCommit.Enable = true // 開啟自動提交,需要手動調用MarkMessage才有效 
  14.  cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 間隔 
  15.  client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg) 
  16.  if err != nil { 
  17.   log.Fatal("NewConsumerGroup failed", err.Error()) 
  18.  } 
  19.  return client 

上面主要是創建ConsumerGroup部分,細心的讀者應該看到了,我們這里使用的是自動提交,說好的使用手動提交呢?這是因為我們這個kafka庫的特性不同,這個自動提交需要與MarkMessage()方法配合使用才會提交(有疑問的朋友可以實踐一下,或者看一下源碼),否則也會提交失敗,因為我們在寫消費邏輯時要這樣寫:

  1. func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 
  2.  for msg := range claim.Messages() { 
  3.   var data common.KafkaMsg 
  4.   if err := json.Unmarshal(msg.Value, &data); err != nil { 
  5.    return errors.New("failed to unmarshal message err is " + err.Error()) 
  6.   } 
  7.   // 操作數據,改用打印 
  8.   log.Print("consumerClaim data is "
  9.  
  10.   // 處理消息成功后標記為處理, 然后會自動提交 
  11.   session.MarkMessage(msg,""
  12.  } 
  13.  return nil 

或者直接使用手動提交方法來解決,只需兩步:

第一步:關閉自動提交:

  1. consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // 禁用自動提交,改為手動 

第二步:消費邏輯中添加如下代碼,手動提交模式下,也需要先進行標記,在進行commit

  1. session.MarkMessage(msg,""
  2. session.Commit() 

完整代碼可以到github上下載并進行驗證!

總結

本文我們主要說明了兩個知識點:

Kafka會產生消息丟失

使用Go操作Kafka如何配置可以不丟失數據

 

日常業務開發中,很多公司都喜歡拿消息隊列進行解耦,那么你就要注意了,使用Kafka做消息隊列無法保證數據不丟失,需要我們自己手動配置補償,別忘記了,要不又是一場P0事故。

 

責任編輯:武曉燕 來源: Golang夢工廠
相關推薦

2024-06-18 08:26:22

2024-08-06 09:55:25

2021-08-04 07:47:18

Kafka消息框架

2021-03-08 10:19:59

MQ消息磁盤

2021-10-22 08:37:13

消息不丟失rocketmq消息隊列

2022-08-26 05:24:04

中間件技術Kafka

2019-03-13 09:27:57

宕機Kafka數據

2025-07-21 09:02:45

2023-11-27 17:29:43

Kafka全局順序性

2024-11-11 07:05:00

Redis哨兵模式主從復制

2024-02-26 08:10:00

Redis數據數據庫

2025-11-11 09:05:09

2022-03-31 08:26:44

RocketMQ消息排查

2023-09-13 08:14:57

RocketMQ次數機制

2025-01-06 00:00:01

KratosGo微服務

2024-01-16 08:24:59

消息隊列KafkaRocketMQ

2022-07-11 08:01:55

Kafka服務器宕機

2024-05-09 08:04:23

RabbitMQ消息可靠性

2020-10-14 08:36:10

RabbitMQ消息

2024-12-18 07:43:49

點贊
收藏

51CTO技術棧公眾號

亚洲欧洲一区二区| 国产精品69久久| 无套内谢丰满少妇中文字幕| 黄色免费在线看| 国产福利一区二区三区在线视频| 欧美激情视频在线观看| 一区二区三区免费在线观看视频| av在线日韩| 亚洲精品国产a| 欧美日韩一区二| 国产又黄又粗又长| 亚洲精品孕妇| 日韩在线中文视频| www.88av| 精品999日本久久久影院| 午夜久久久久久| 亚洲精品一区二区三区av| 亚洲精品久久久久久无码色欲四季| 免费视频一区二区三区在线观看| www.美女亚洲精品| 日本黄色片在线播放| 青青伊人久久| 色婷婷av久久久久久久| 成人性生活视频免费看| 欧美三级黄网| 国产亚洲制服色| 成人在线视频网址| 国产又粗又黄视频| 日日摸夜夜添夜夜添亚洲女人| 久久夜精品香蕉| 中字幕一区二区三区乱码| 成人18夜夜网深夜福利网| 欧美日韩不卡在线| 亚洲 中文字幕 日韩 无码| a级大胆欧美人体大胆666| 国产精品免费视频网站| 欧美亚洲丝袜| 五月婷婷在线观看视频| 国产成人高清在线| 91久久久久久国产精品| 中文字幕精品在线观看| 久久久久看片| 欧美有码在线观看视频| 日本在线视频免费| 亚洲视频综合| 久久精视频免费在线久久完整在线看| 国产视频三区四区| 久久99国内| 亚洲乱码av中文一区二区| 精品人妻伦一二三区久| 草草视频在线一区二区| 欧美成人精品福利| 中文字幕无人区二| 日韩成人在线看| 日韩欧美一区中文| 日本黄色www| 国产精品麻豆| 欧美一区二区三区在线视频| www.污污视频| 懂色av色香蕉一区二区蜜桃| 欧美高清性hdvideosex| 波多野结衣国产精品| 亚洲欧美在线综合| 欧美一区二区不卡视频| 在线观看一区二区三区视频| 中文无码日韩欧| 亚洲国产日韩精品在线| 国产精品一级黄片| 亚洲精品亚洲人成在线| 亚洲视频在线免费看| 国产传媒国产传媒| 99久久激情| 色综合久久88色综合天天看泰| 久久国产露脸精品国产| 影音先锋亚洲电影| 欧美一级淫片丝袜脚交| 特级西西444www高清大视频| 九色porny丨国产精品| 成人h视频在线| 国产高中女学生第一次| 成人美女视频在线观看| 欧美精品一区二区视频| av网站无病毒在线| 亚洲婷婷综合久久一本伊一区| 国产激情片在线观看| 国产乱码午夜在线视频| 欧美一a一片一级一片| 亚洲男人天堂2021| 亚洲电影影音先锋| 精品福利在线看| xxav国产精品美女主播| 中文字幕网站在线观看| 四虎成人av| 欧美高清在线观看| 中文字幕亚洲精品在线| 石原莉奈在线亚洲三区| 成人福利在线观看| 少妇一区二区三区四区| 国产午夜一区二区三区| 一级黄色片播放| 在线女人免费视频| 欧美日韩免费高清一区色橹橹 | 欧美1区3d| 91精品国产高清久久久久久| 6—12呦国产精品| 99亚偷拍自图区亚洲| 午夜精品亚洲一区二区三区嫩草 | 亚洲aⅴ怡春院| av视屏在线播放| 澳门久久精品| 久久精品中文字幕一区| 黄瓜视频在线免费观看| 国产成人av电影在线观看| 日本在线视频一区| 超碰97免费在线| 欧美久久久久久久久中文字幕| 成人免费看片载| 色综合天天综合网中文字幕| 琪琪第一精品导航| 成人免费公开视频| 综合久久一区二区三区| av网址在线观看免费| 精品五月天堂| 欧美激情网友自拍| 国产精品国产一区二区三区四区| 久久久久9999亚洲精品| 日日摸日日碰夜夜爽无码| 99亚洲男女激情在线观看| 亚洲欧美成人一区二区在线电影| 久久这里只有精品国产| 国产又黄又大久久| 亚洲国产欧洲综合997久久| 涩涩视频网站在线观看| 亚洲激情久久久| 精品一区二区三区人妻| 国产精品一二三四五| 亚洲视频在线观看日本a| 日韩欧美另类一区二区| 精品无码久久久久久国产| 国产精品99无码一区二区| 国精产品一区一区三区mba桃花 | 欧美综合在线视频| 国产精品高清无码在线观看| 在线综合视频| 久久99精品久久久久久水蜜桃| 欧洲在线视频| 精品久久国产老人久久综合| 欧美黄色免费看| 国产**成人网毛片九色 | 日本理论片午伦夜理片在线观看| 91麻豆精品国产| 国产色无码精品视频国产| 久久综合综合久久综合| 二级片在线观看| 精品国产亚洲一区二区三区| 欧美成人免费小视频| 午夜精品久久久久久久91蜜桃| 亚洲综合在线第一页| 国产清纯白嫩初高中在线观看性色| 欧美日韩国产探花| 国产欧美日韩综合一区在线观看 | 五月天丁香社区| 亚洲精品专区| 欧美久久电影| 成人国产网站| 欧美成人免费在线观看| 亚洲h视频在线观看| 亚洲一区二区三区激情| 国产精品300页| 免费在线看一区| 国产大尺度在线观看| 日韩欧美中文在线观看| 高清欧美性猛交xxxx| 欧美挠脚心网站| 欧美日韩一级大片网址| 无码黑人精品一区二区| 粉嫩av一区二区三区在线播放| 欧美成人三级在线视频| 国产精品嫩模av在线| 国产在线精品一区免费香蕉| 青青青国内视频在线观看软件| 亚洲福利视频免费观看| 少妇又紧又色又爽又刺激视频| 亚洲美女视频在线观看| 香港三日本8a三级少妇三级99| 美女久久一区| 特色特色大片在线| 日韩深夜影院| 成人精品久久一区二区三区| 乱人伦视频在线| www.国产精品一二区| 天天摸夜夜添狠狠添婷婷| 欧美在线观看禁18| 国产第一页在线播放| 国产视频一区二区在线| aaa黄色大片| 久久99精品国产麻豆婷婷| 日本欧美视频在线观看| 97精品97| 日本一区二区三区视频在线观看| 色妞ww精品视频7777| 国产99在线|中文| 色呦呦在线视频| 日韩在线视频二区| 欧美黄色小说| 精品毛片乱码1区2区3区| 欧美一级黄视频| 欧美日韩免费在线观看| 日本a级片视频| 欧美极品aⅴ影院| 亚洲精品女人久久久| 国产美女av一区二区三区| 在线观看的毛片| 国产欧美91| 国产片侵犯亲女视频播放| 日韩在线理论| 任我爽在线视频精品一| 久久亚洲道色| 成人在线看片| 一区二区三区高清在线观看| 国产日韩综合一区二区性色av| 在线中文字幕播放| 久久欧美在线电影| 97影院秋霞午夜在线观看| 在线性视频日韩欧美| 青青草免费在线| 亚洲第一天堂无码专区| 精品人妻无码一区二区| 在线不卡中文字幕播放| 在线观看毛片网站| 91国产福利在线| 亚洲综合久久网| 亚洲成人在线观看视频| 久久精品国产亚洲av无码娇色| 亚洲女性喷水在线观看一区| fc2ppv在线播放| 国产精品美女久久福利网站| 欧美人与性囗牲恔配| 国产日产精品1区| 性欧美13一14内谢| 久久蜜桃一区二区| 中文字幕丰满孑伦无码专区| 99久久亚洲一区二区三区青草| 黑人玩弄人妻一区二区三区| 成人综合婷婷国产精品久久| 激情小说欧美色图| www.久久久久久久久| 亚洲国产精品第一页| 成人99免费视频| 天天躁日日躁狠狠躁免费麻豆| 福利电影一区二区三区| 国产一级免费片| 99久久久国产精品免费蜜臀| 色综合久久五月| 91欧美激情一区二区三区成人| 最近日本中文字幕| 国产午夜精品一区二区三区视频 | 丁香六月天婷婷| 亚洲第一天堂无码专区| 日韩国产福利| 中文字幕亚洲一区二区三区五十路| 在线看的av网站| 欧美成人精品在线视频| f2c人成在线观看免费视频| 欧美性在线视频| 少妇精品视频一区二区免费看| 成人激情视频免费在线| 2020国产精品极品色在线观看| 国产欧美韩日| 国产99亚洲| 亚洲永久激情精品| 欧美国产激情| 欧美激情国产精品日韩| 美女视频第一区二区三区免费观看网站| 中文字幕亚洲欧洲| 成人aaaa免费全部观看| 日本欧美一区二区三区不卡视频| 亚洲欧美日韩国产成人精品影院| 国产一级特黄视频| 色婷婷国产精品久久包臀| 国产孕妇孕交大片孕| 精品国产乱码久久久久久浪潮 | 国产精品国产三级国产aⅴ中文 | 91久久线看在观草草青青| 国产又黄又大又爽| 日韩电影中文字幕在线观看| av在线免费观看网| 欧美疯狂做受xxxx高潮| 久久sese| av资源站久久亚洲| 欧美一区2区| 成人免费观看在线| 青娱乐精品在线视频| 美女伦理水蜜桃4| 国产精品国产三级国产普通话三级| 精品在线视频免费| 欧美日韩一级视频| 神马精品久久| 久久精品电影网站| 625成人欧美午夜电影| 91精品国产91久久久久青草| 国产精品美女久久久久久不卡| 欧美 亚洲 视频| 日产欧产美韩系列久久99| 日本久久久久久久久久| 国产精品狼人久久影院观看方式| 亚洲国产精品午夜在线观看| 538prom精品视频线放| 毛片在线能看| 国语自产精品视频在免费| 91久久青草| 性刺激综合网| 国产精品乱看| 屁屁影院国产第一页| 亚洲欧美日韩国产综合| 免费看av在线| 亚洲人av在线影院| 在线观看网站免费入口在线观看国内 | 成人av资源网站| 日本一二三区在线观看| 欧美日韩中文字幕精品| 精品欧美不卡一区二区在线观看| 欧美黄色性视频| 国产视频一区二| 一区二区三区四区欧美| 日韩高清不卡一区二区| 久久久久久久久免费看无码 | 天堂av免费在线观看| 亚洲国产精品字幕| 国产美女一区视频| 99视频网站| 午夜精品婷婷| jizz欧美性11| 国产精品久久久久久久久晋中 | 日韩精品久久久| 亚洲一区二区成人| 国产不卡一二三| 亚洲高清视频在线| 东京干手机福利视频| 欧美福利视频网站| 亚洲一区二区三区四区电影 | 日韩午夜精品电影| 在线观看免费视频你懂的| 99精品欧美一区二区三区| 欧美精品aa| 国产xxx在线观看 | 国产精品乱码一区二区视频| 亚洲乱码国产乱码精品精| 国偷自产一区二区免费视频| 欧美一区二区三区在线播放 | 欧美日韩中文在线观看| 日韩欧美电影在线观看| 日韩av色在线| 青青草国产成人a∨下载安卓| 三级视频中文字幕| 中文字幕一区二区三区蜜月 | 国产精品一区二区久激情瑜伽 | 超碰97在线资源| 最新亚洲视频| 亚洲第一香蕉网| 欧美性生活影院| 麻豆传媒免费在线观看| 91久色国产| 99精品视频免费观看| 精品成人av一区二区三区| 欧美午夜精品免费| av毛片在线免费| 国产一区在线免费观看| 久久性色av| 裸体武打性艳史| 亚洲国产精品免费| 国产精品99精品一区二区三区∴| 在线无限看免费粉色视频| 国产成人在线色| 久久国产视频一区| 久久久国产一区二区| 国产精品久久久久av蜜臀 | 国产一区二区精品在线观看| 国产午夜福利一区二区| 亚洲性日韩精品一区二区| 亚州欧美在线| 欧美又粗又长又爽做受| 国产日韩欧美精品电影三级在线| 国产熟女一区二区三区五月婷| 韩国三级电影久久久久久| 精品国产一区二区三区四区| 久久精品无码一区二区三区毛片| 欧美日韩国产综合新一区| 国产高清一区在线观看| 99re国产视频| 日本视频在线一区| 国产午夜小视频| 色偷偷888欧美精品久久久| 国产ts一区| 中文字幕亚洲影院| 日本韩国视频一区二区| a级片免费在线观看| 一区二区三区四区五区视频 | 国产又粗又长免费视频| 亚洲国产精品99久久|