肝了很久的字節(jié)跳動(dòng)消息隊(duì)列面經(jīng),我不信你能看完!!
作者個(gè)人研發(fā)的在高并發(fā)場(chǎng)景下,提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,具有精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能。自開源半年多以來(lái),已成功為十幾家中小型企業(yè)提供了精準(zhǔn)定時(shí)調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗(yàn)。為使更多童鞋受益,現(xiàn)給出開源框架地址:https://github.com/sunshinelyz/mykit-delay
寫在前面
又到了年底跳槽高峰季,很多小伙伴出去面試時(shí),不少面試官都會(huì)問(wèn)到消息隊(duì)列的問(wèn)題,不少小伙伴回答的不是很完美,有些小伙伴是心里知道答案,嘴上卻沒(méi)有很好的表達(dá)出來(lái),究其根本原因,還是對(duì)相關(guān)的知識(shí)點(diǎn)理解的不夠透徹。今天,我們就一起來(lái)探討下這個(gè)話題。注:文章有點(diǎn)長(zhǎng),你說(shuō)你能一鼓作氣看完,我有點(diǎn)不信!!
文章已收錄到:
https://github.com/sunshinelyz/technology-binghe
https://gitee.com/binghe001/technology-binghe
什么是消息隊(duì)列?
消息隊(duì)列(Message Queue)是在消息的傳輸過(guò)程中保存消息的容器,是應(yīng)用間的通信方式。消息發(fā)送后可以立即返回,由消息系統(tǒng)保證消息的可靠傳輸,消息發(fā)布者只管把消息寫到隊(duì)列里面而不用考慮誰(shuí)需要消息,而消息的使用者也不需要知道誰(shuí)發(fā)布的消息,只管到消息隊(duì)列里面取,這樣生產(chǎn)和消費(fèi)便可以做到分離。
為什么要使用消息隊(duì)列?
優(yōu)點(diǎn):
- 異步處理:例如短信通知、終端狀態(tài)推送、App推送、用戶注冊(cè)等
- 數(shù)據(jù)同步:業(yè)務(wù)數(shù)據(jù)推送同步
- 重試補(bǔ)償:記賬失敗重試
- 系統(tǒng)解耦:通訊上下行、終端異常監(jiān)控、分布式事件中心
- 流量消峰:秒殺場(chǎng)景下的下單處理
- 發(fā)布訂閱:HSF的服務(wù)狀態(tài)變化通知、分布式事件中心
- 高并發(fā)緩沖:日志服務(wù)、監(jiān)控上報(bào)
使用消息隊(duì)列比較核心的作用就是:解耦、異步、削峰。
缺點(diǎn):
系統(tǒng)可用性降低 系統(tǒng)引入的外部依賴越多,越容易掛掉?如何保證消息隊(duì)列的高可用?
系統(tǒng)復(fù)雜度提高 怎么保證消息沒(méi)有重復(fù)消費(fèi)?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?
一致性問(wèn)題 A 系統(tǒng)處理完了直接返回成功了,人都以為你這個(gè)請(qǐng)求就成功了;但是問(wèn)題是,要是 BCD 三個(gè)系統(tǒng)那里,BD 兩個(gè)系統(tǒng)寫庫(kù)成功了,結(jié)果 C 系統(tǒng)寫庫(kù)失敗了,咋整?你這數(shù)據(jù)就不一致了。
以下主要討論的RabbitMQ和Kafka兩種消息隊(duì)列。
如何保證消息隊(duì)列的高可用?
RabbitMQ的高可用
RabbitMQ的高可用是基于主從(非分布式)做高可用性。RabbitMQ 有三種模式:?jiǎn)螜C(jī)模式(Demo級(jí)別)、普通集群模式(無(wú)高可用性)、鏡像集群模式(高可用性)。
- 普通集群模式
普通集群模式,意思就是在多臺(tái)機(jī)器上啟動(dòng)多個(gè) RabbitMQ 實(shí)例,每個(gè)機(jī)器啟動(dòng)一個(gè)。你創(chuàng)建的 queue,只會(huì)放在一個(gè) RabbitMQ 實(shí)例上,但是每個(gè)實(shí)例都同步 queue 的元數(shù)據(jù)(元數(shù)據(jù)可以認(rèn)為是 queue 的一些配置信息,通過(guò)元數(shù)據(jù),可以找到 queue 所在實(shí)例)。你消費(fèi)的時(shí)候,實(shí)際上如果連接到了另外一個(gè)實(shí)例,那么那個(gè)實(shí)例會(huì)從 queue 所在實(shí)例上拉取數(shù)據(jù)過(guò)來(lái)。
這種方式確實(shí)很麻煩,也不怎么好,沒(méi)做到所謂的分布式,就是個(gè)普通集群。因?yàn)檫@導(dǎo)致你要么消費(fèi)者每次隨機(jī)連接一個(gè)實(shí)例然后拉取數(shù)據(jù),要么固定連接那個(gè) queue 所在實(shí)例消費(fèi)數(shù)據(jù),前者有數(shù)據(jù)拉取的開銷,后者導(dǎo)致單實(shí)例性能瓶頸。
而且如果那個(gè)放 queue 的實(shí)例宕機(jī)了,會(huì)導(dǎo)致接下來(lái)其他實(shí)例就無(wú)法從那個(gè)實(shí)例拉取,如果你開啟了消息持久化,讓 RabbitMQ 落地存儲(chǔ)消息的話,消息不一定會(huì)丟,得等這個(gè)實(shí)例恢復(fù)了,然后才可以繼續(xù)從這個(gè) queue 拉取數(shù)據(jù)。
所以這個(gè)事兒就比較尷尬了,這就沒(méi)有什么所謂的高可用性,這方案主要是提高吞吐量的,就是說(shuō)讓集群中多個(gè)節(jié)點(diǎn)來(lái)服務(wù)某個(gè) queue 的讀寫操作。
- 鏡像集群模式
這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是,在鏡像集群模式下,你創(chuàng)建的 queue,無(wú)論元數(shù)據(jù)還是 queue 里的消息都會(huì)存在于多個(gè)實(shí)例上,就是說(shuō),每個(gè) RabbitMQ 節(jié)點(diǎn)都有這個(gè) queue 的一個(gè)完整鏡像,包含 queue 的全部數(shù)據(jù)的意思。然后每次你寫消息到 queue 的時(shí)候,都會(huì)自動(dòng)把消息同步到多個(gè)實(shí)例的 queue 上。
那么如何開啟這個(gè)鏡像集群模式呢?其實(shí)很簡(jiǎn)單,RabbitMQ 有很好的管理控制臺(tái),就是在后臺(tái)新增一個(gè)策略,這個(gè)策略是鏡像集群模式的策略,指定的時(shí)候是可以要求數(shù)據(jù)同步到所有節(jié)點(diǎn)的,也可以要求同步到指定數(shù)量的節(jié)點(diǎn),再次創(chuàng)建 queue 的時(shí)候,應(yīng)用這個(gè)策略,就會(huì)自動(dòng)將數(shù)據(jù)同步到其他的節(jié)點(diǎn)上去了。
這樣的話,好處在于,你任何一個(gè)機(jī)器宕機(jī)了,沒(méi)事兒,其它機(jī)器(節(jié)點(diǎn))還包含了這個(gè) queue 的完整數(shù)據(jù),別的 consumer 都可以到其它節(jié)點(diǎn)上去消費(fèi)數(shù)據(jù)。壞處在于,第一,這個(gè)性能開銷也太大了吧,消息需要同步到所有機(jī)器上,導(dǎo)致網(wǎng)絡(luò)帶寬壓力和消耗很重!第二,這么玩兒,不是分布式的,就沒(méi)有擴(kuò)展性可言了,如果某個(gè) queue 負(fù)載很重,你加機(jī)器,新增的機(jī)器也包含了這個(gè) queue 的所有數(shù)據(jù),并沒(méi)有辦法線性擴(kuò)展你的 queue。你想,如果這個(gè) queue 的數(shù)據(jù)量很大,大到這個(gè)機(jī)器上的容量無(wú)法容納了,此時(shí)該怎么辦呢?
Kafka的高可用
Kafka 一個(gè)最基本的架構(gòu)認(rèn)識(shí):由多個(gè) broker 組成,每個(gè) broker 是一個(gè)節(jié)點(diǎn);你創(chuàng)建一個(gè) topic,這個(gè) topic 可以劃分為多個(gè) partition,每個(gè) partition 可以存在于不同的 broker 上,每個(gè) partition 就放一部分?jǐn)?shù)據(jù)。
這就是天然的分布式消息隊(duì)列,就是說(shuō)一個(gè) topic 的數(shù)據(jù),是分散放在多個(gè)機(jī)器上的,每個(gè)機(jī)器就放一部分?jǐn)?shù)據(jù)。
實(shí)際上 RabbmitMQ 之類的,并不是分布式消息隊(duì)列,它就是傳統(tǒng)的消息隊(duì)列,只不過(guò)提供了一些集群、HA(High Availability, 高可用性) 的機(jī)制而已,因?yàn)闊o(wú)論怎么玩兒,RabbitMQ 一個(gè) queue 的數(shù)據(jù)都是放在一個(gè)節(jié)點(diǎn)里的,鏡像集群下,也是每個(gè)節(jié)點(diǎn)都放這個(gè) queue 的完整數(shù)據(jù)。
Kafka 0.8 以前,是沒(méi)有 HA 機(jī)制的,就是任何一個(gè) broker 宕機(jī)了,那個(gè) broker 上的 partition 就廢了,沒(méi)法寫也沒(méi)法讀,沒(méi)有什么高可用性可言。
比如說(shuō),我們假設(shè)創(chuàng)建了一個(gè) topic,指定其 partition 數(shù)量是 3 個(gè),分別在三臺(tái)機(jī)器上。但是,如果第二臺(tái)機(jī)器宕機(jī)了,會(huì)導(dǎo)致這個(gè) topic 的 1/3 的數(shù)據(jù)就丟了,因此這個(gè)是做不到高可用的。
Kafka 0.8 以后,提供了 HA 機(jī)制,就是 replica(復(fù)制品) 副本機(jī)制。每個(gè) partition 的數(shù)據(jù)都會(huì)同步到其它機(jī)器上,形成自己的多個(gè) replica 副本。所有 replica 會(huì)選舉一個(gè) leader 出來(lái),那么生產(chǎn)和消費(fèi)都跟這個(gè) leader 打交道,然后其他 replica 就是 follower。寫的時(shí)候,leader 會(huì)負(fù)責(zé)把數(shù)據(jù)同步到所有 follower 上去,讀的時(shí)候就直接讀 leader 上的數(shù)據(jù)即可。只能讀寫 leader?很簡(jiǎn)單,要是你可以隨意讀寫每個(gè) follower,那么就要 care 數(shù)據(jù)一致性的問(wèn)題,系統(tǒng)復(fù)雜度太高,很容易出問(wèn)題。Kafka 會(huì)均勻地將一個(gè) partition 的所有 replica 分布在不同的機(jī)器上,這樣才可以提高容錯(cuò)性。
這么搞,就有所謂的高可用性了,因?yàn)槿绻硞€(gè) broker 宕機(jī)了,沒(méi)事兒,那個(gè) broker上面的 partition 在其他機(jī)器上都有副本的。如果這個(gè)宕機(jī)的 broker 上面有某個(gè) partition 的 leader,那么此時(shí)會(huì)從 follower 中重新選舉一個(gè)新的 leader 出來(lái),大家繼續(xù)讀寫那個(gè)新的 leader 即可。這就有所謂的高可用性了。
寫數(shù)據(jù)的時(shí)候,生產(chǎn)者就寫 leader,然后 leader 將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動(dòng)從 leader 來(lái) pull 數(shù)據(jù)。一旦所有 follower 同步好數(shù)據(jù)了,就會(huì)發(fā)送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會(huì)返回寫成功的消息給生產(chǎn)者。(當(dāng)然,這只是其中一種模式,還可以適當(dāng)調(diào)整這個(gè)行為)
消費(fèi)的時(shí)候,只會(huì)從 leader 去讀,但是只有當(dāng)一個(gè)消息已經(jīng)被所有 follower 都同步成功返回 ack 的時(shí)候,這個(gè)消息才會(huì)被消費(fèi)者讀到。
如何保證消息不重復(fù)消費(fèi)(冪等性)?
首先,所有的消息隊(duì)列都會(huì)有這樣重復(fù)消費(fèi)的問(wèn)題,因?yàn)檫@是不MQ來(lái)保證,而是我們自己開發(fā)保證的,我們使用Kakfa來(lái)討論是如何實(shí)現(xiàn)的。
Kakfa有個(gè)offset的概念,就是每個(gè)消息寫進(jìn)去都會(huì)有一個(gè)offset值,代表消費(fèi)的序號(hào),然后consumer消費(fèi)了數(shù)據(jù)之后,默認(rèn)每隔一段時(shí)間會(huì)把自己消費(fèi)過(guò)的消息的offset值提交,表示我已經(jīng)消費(fèi)過(guò)了,下次要是我重啟啥的,就讓我從當(dāng)前提交的offset處來(lái)繼續(xù)消費(fèi)。
但是凡事總有意外,比如我們之前生產(chǎn)經(jīng)常遇到的,就是你有時(shí)候重啟系統(tǒng),看你怎么重啟了,如果碰到點(diǎn)著急的,直接 kill 進(jìn)程了,再重啟。這會(huì)導(dǎo)致 consumer 有些消息處理了,但是沒(méi)來(lái)得及提交 offset,尷尬了。重啟之后,少數(shù)消息會(huì)再次消費(fèi)一次。
其實(shí)重復(fù)消費(fèi)不可怕,可怕的是你沒(méi)考慮到重復(fù)消費(fèi)之后,怎么保證冪等性。
舉個(gè)例子吧。假設(shè)你有個(gè)系統(tǒng),消費(fèi)一條消息就往數(shù)據(jù)庫(kù)里插入一條數(shù)據(jù),要是你一個(gè)消息重復(fù)兩次,你不就插入了兩條,這數(shù)據(jù)不就錯(cuò)了?但是你要是消費(fèi)到第二次的時(shí)候,自己判斷一下是否已經(jīng)消費(fèi)過(guò)了,若是就直接扔了,這樣不就保留了一條數(shù)據(jù),從而保證了數(shù)據(jù)的正確性。一條數(shù)據(jù)重復(fù)出現(xiàn)兩次,數(shù)據(jù)庫(kù)里就只有一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性。冪等性,通俗點(diǎn)說(shuō),就一個(gè)數(shù)據(jù),或者一個(gè)請(qǐng)求,給你重復(fù)來(lái)多次,你得確保對(duì)應(yīng)的數(shù)據(jù)是不會(huì)改變的,不能出錯(cuò)。
所以第二個(gè)問(wèn)題來(lái)了,怎么保證消息隊(duì)列消費(fèi)的冪等性?
其實(shí)還是得結(jié)合業(yè)務(wù)來(lái)思考,我這里給幾個(gè)思路:
- 比如你拿個(gè)數(shù)據(jù)要寫庫(kù),你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
- 比如你是寫 Redis,那沒(méi)問(wèn)題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個(gè)場(chǎng)景,那做的稍微復(fù)雜一點(diǎn),你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時(shí)候,里面加一個(gè)全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費(fèi)到了之后,先根據(jù)這個(gè) id 去比如 Redis 里查一下,之前消費(fèi)過(guò)嗎?如果沒(méi)有消費(fèi)過(guò),你就處理,然后這個(gè) id 寫 Redis。如果消費(fèi)過(guò)了,那你就別處理了,保證別重復(fù)處理相同的消息即可。
- 比如基于數(shù)據(jù)庫(kù)的唯一鍵來(lái)保證重復(fù)數(shù)據(jù)不會(huì)重復(fù)插入多條。因?yàn)橛形ㄒ绘I約束了,重復(fù)數(shù)據(jù)插入只會(huì)報(bào)錯(cuò),不會(huì)導(dǎo)致數(shù)據(jù)庫(kù)中出現(xiàn)臟數(shù)據(jù)。
當(dāng)然,如何保證 MQ 的消費(fèi)是冪等性的,需要結(jié)合具體的業(yè)務(wù)來(lái)看。
如何保證消息的可靠傳輸(不丟失)?
這個(gè)是肯定的,MQ的基本原則就是數(shù)據(jù)不能多一條,也不能少一條,不能多其實(shí)就是我們前面重復(fù)消費(fèi)的問(wèn)題。不能少,就是數(shù)據(jù)不能丟,像計(jì)費(fèi),扣費(fèi)的一些信息,是肯定不能丟失的。
數(shù)據(jù)的丟失問(wèn)題,可能出現(xiàn)在生產(chǎn)者、MQ、消費(fèi)者中,咱們從 RabbitMQ 和 Kafka 分別來(lái)分析一下吧。
RabbitMQ如何保證消息的可靠
生產(chǎn)者丟數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了,因?yàn)榫W(wǎng)絡(luò)問(wèn)題啥的,都有可能。
此時(shí)可以選擇用 RabbitMQ 提供的事務(wù)功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)channel.txSelect,然后發(fā)送消息,如果消息沒(méi)有成功被 RabbitMQ 接收到,那么生產(chǎn)者會(huì)收到異常報(bào)錯(cuò),此時(shí)就可以回滾事務(wù)channel.txRollback,然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)channel.txCommit。
- // 開啟事務(wù)
- channel.txSelect
- try {
- // 這里發(fā)送消息
- } catch (Exception e) {
- channel.txRollback
- // 這里再次重發(fā)這條消息
- }
- // 提交事務(wù)
- channel.txCommit
但是問(wèn)題是,RabbitMQ 事務(wù)機(jī)制(同步)一搞,基本上吞吐量會(huì)下來(lái),因?yàn)樘男阅堋?/p>
所以一般來(lái)說(shuō),如果你要確保說(shuō)寫 RabbitMQ 的消息別丟,可以開啟 confirm 模式,在生產(chǎn)者那里設(shè)置開啟 confirm 模式之后,你每次寫的消息都會(huì)分配一個(gè)唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè) ack 消息,告訴你說(shuō)這個(gè)消息 ok 了。如果 RabbitMQ 沒(méi)能處理這個(gè)消息,會(huì)回調(diào)你的一個(gè) nack 接口,告訴你這個(gè)消息接收失敗,你可以重試。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息 id 的狀態(tài),如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào),那么你可以重發(fā)。
事務(wù)機(jī)制和 confirm 機(jī)制最大的不同在于,事務(wù)機(jī)制是同步的,你提交一個(gè)事務(wù)之后會(huì)阻塞在那兒,但是 confirm 機(jī)制是異步的,你發(fā)送個(gè)消息之后就可以發(fā)送下一個(gè)消息,然后那個(gè)消息 RabbitMQ 接收了之后會(huì)異步回調(diào)你的一個(gè)接口通知你這個(gè)消息接收到了。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用 confirm 機(jī)制的。
RabbitMQ丟數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù),這個(gè)你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會(huì)持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟。除非極其罕見(jiàn)的是,RabbitMQ 還沒(méi)持久化,自己就掛了,可能導(dǎo)致少量數(shù)據(jù)丟失,但是這個(gè)概率較小。
設(shè)置持久化有兩個(gè)步驟:
- 創(chuàng)建 queue 的時(shí)候?qū)⑵湓O(shè)置為持久化 這樣就可以保證 RabbitMQ 持久化 queue 的元數(shù)據(jù),但是它是不會(huì)持久化 queue 里的數(shù)據(jù)的。
- 第二個(gè)是發(fā)送消息的時(shí)候?qū)⑾⒌?deliveryMode 設(shè)置為 2 就是將消息設(shè)置為持久化的,此時(shí) RabbitMQ 就會(huì)將消息持久化到磁盤上去。
必須要同時(shí)設(shè)置這兩個(gè)持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會(huì)從磁盤上重啟恢復(fù) queue,恢復(fù)這個(gè) queue 里的數(shù)據(jù)。
注意,哪怕是你給 RabbitMQ 開啟了持久化機(jī)制,也有一種可能,就是這個(gè)消息寫到了 RabbitMQ 中,但是還沒(méi)來(lái)得及持久化到磁盤上,結(jié)果不巧,此時(shí) RabbitMQ 掛了,就會(huì)導(dǎo)致內(nèi)存里的一點(diǎn)點(diǎn)數(shù)據(jù)丟失。
所以,持久化可以跟生產(chǎn)者那邊的 confirm 機(jī)制配合起來(lái),只有消息被持久化到磁盤之后,才會(huì)通知生產(chǎn)者 ack 了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到 ack,你也是可以自己重發(fā)的。
消費(fèi)者丟數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù),主要是因?yàn)槟阆M(fèi)的時(shí)候,剛消費(fèi)到,還沒(méi)處理,結(jié)果進(jìn)程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認(rèn)為你都消費(fèi)了,這數(shù)據(jù)就丟了。
這個(gè)時(shí)候得用 RabbitMQ 提供的 ack 機(jī)制,簡(jiǎn)單來(lái)說(shuō),就是你必須關(guān)閉 RabbitMQ 的自動(dòng)ack,可以通過(guò)一個(gè) api 來(lái)調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再在程序里ack 一把。這樣的話,如果你還沒(méi)處理完,不就沒(méi)有 ack 了?那 RabbitMQ 就認(rèn)為你還沒(méi)處理完,這個(gè)時(shí)候 RabbitMQ 會(huì)把這個(gè)消費(fèi)分配給別的 consumer 去處理,消息是不會(huì)丟的。
Kakfa如何保證消息的可靠
- 消費(fèi)者丟數(shù)據(jù)
唯一可能導(dǎo)致消費(fèi)者弄丟數(shù)據(jù)的情況,就是說(shuō),你消費(fèi)到了這個(gè)消息,然后消費(fèi)者那邊自動(dòng)提交了 offset,讓 Kafka 以為你已經(jīng)消費(fèi)好了這個(gè)消息,但其實(shí)你才剛準(zhǔn)備處理這個(gè)消息,你還沒(méi)處理,你自己就掛了,此時(shí)這條消息就丟咯。
這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會(huì)自動(dòng)提交 offset,那么只要關(guān)閉自動(dòng)提交 offset,在處理完之后自己手動(dòng)提交 offset,就可以保證數(shù)據(jù)不會(huì)丟。但是此時(shí)確實(shí)還是可能會(huì)有重復(fù)消費(fèi),比如你剛處理完,還沒(méi)提交 offset,結(jié)果自己掛了,此時(shí)肯定會(huì)重復(fù)消費(fèi)一次,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個(gè)問(wèn)題,就是說(shuō)我們的 Kafka 消費(fèi)者消費(fèi)到了數(shù)據(jù)之后是寫到一個(gè)內(nèi)存的 queue 里先緩沖一下,結(jié)果有的時(shí)候,你剛把消息寫入內(nèi)存 queue,然后消費(fèi)者會(huì)自動(dòng)提交 offset。然后此時(shí)我們重啟了系統(tǒng),就會(huì)導(dǎo)致內(nèi)存 queue 里還沒(méi)來(lái)得及處理的數(shù)據(jù)就丟失了。
- Kafka丟數(shù)據(jù)
這塊比較常見(jiàn)的一個(gè)場(chǎng)景,就是 Kafka 某個(gè) broker 宕機(jī),然后重新選舉 partition 的 leader。大家想想,要是此時(shí)其他的 follower 剛好還有些數(shù)據(jù)沒(méi)有同步,結(jié)果此時(shí) leader 掛了,然后選舉某個(gè) follower 成 leader 之后,不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊。
生產(chǎn)環(huán)境也遇到過(guò),我們也是,之前 Kafka 的 leader 機(jī)器宕機(jī)了,將 follower 切換為 leader 之后,就會(huì)發(fā)現(xiàn)說(shuō)這個(gè)數(shù)據(jù)就丟了。
所以此時(shí)一般是要求起碼設(shè)置如下 4 個(gè)參數(shù):
我們生產(chǎn)環(huán)境就是按照上述要求配置的,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發(fā)生故障,進(jìn)行 leader 切換時(shí),數(shù)據(jù)不會(huì)丟失。
- 給 topic 設(shè)置 replication.factor 參數(shù):這個(gè)值必須大于 1,要求每個(gè) partition 必須有至少 2 個(gè)副本。
- 在 Kafka 服務(wù)端設(shè)置 min.insync.replicas 參數(shù):這個(gè)值必須大于 1,這個(gè)是要求一個(gè) leader 至少感知到有至少一個(gè) follower 還跟自己保持聯(lián)系,沒(méi)掉隊(duì),這樣才能確保 leader 掛了還有一個(gè) follower 吧。
- 在 producer 端設(shè)置 acks=all:這個(gè)是要求每條數(shù)據(jù),必須是寫入所有 replica 之后,才能認(rèn)為是寫成功了。
- 在 producer 端設(shè)置 retries=MAX(很大很大很大的一個(gè)值,無(wú)限次重試的意思):這個(gè)是要求一旦寫入失敗,就無(wú)限重試,卡在這里了。
- 生產(chǎn)者丟數(shù)據(jù)
如果按照上述的思路設(shè)置了 acks=all,一定不會(huì)丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才認(rèn)為本次寫成功了。如果沒(méi)滿足這個(gè)條件,生產(chǎn)者會(huì)自動(dòng)不斷的重試,重試無(wú)限次。
如何保證消息的順序性?
我舉個(gè)例子,我們以前做過(guò)一個(gè) mysql binlog 同步的系統(tǒng),壓力還是非常大的,日同步數(shù)據(jù)要達(dá)到上億,就是說(shuō)數(shù)據(jù)從一個(gè) mysql 庫(kù)原封不動(dòng)地同步到另一個(gè) mysql 庫(kù)里面去(mysql -> mysql)。常見(jiàn)的一點(diǎn)在于說(shuō)比如大數(shù)據(jù) team,就需要同步一個(gè) mysql 庫(kù)過(guò)來(lái),對(duì)公司的業(yè)務(wù)系統(tǒng)的數(shù)據(jù)做各種復(fù)雜的操作。
你在 mysql 里增刪改一條數(shù)據(jù),對(duì)應(yīng)出來(lái)了增刪改 3 條 binlog 日志,接著這三條 binlog發(fā)送到 MQ 里面,再消費(fèi)出來(lái)依次執(zhí)行,起碼得保證人家是按照順序來(lái)的吧?不然本來(lái)是:增加、修改、刪除;你楞是換了順序給執(zhí)行成刪除、修改、增加,不全錯(cuò)了么。
本來(lái)這個(gè)數(shù)據(jù)同步過(guò)來(lái),應(yīng)該最后這個(gè)數(shù)據(jù)被刪除了;結(jié)果你搞錯(cuò)了這個(gè)順序,最后這個(gè)數(shù)據(jù)保留下來(lái)了,數(shù)據(jù)同步就出錯(cuò)了。
先看看順序會(huì)錯(cuò)亂的倆場(chǎng)景:
- RabbitMQ:一個(gè) queue,多個(gè) consumer。比如,生產(chǎn)者向 RabbitMQ 里發(fā)送了三條數(shù)據(jù),順序依次是 data1/data2/data3,壓入的是 RabbitMQ 的一個(gè)內(nèi)存隊(duì)列。有三個(gè)消費(fèi)者分別從 MQ 中消費(fèi)這三條數(shù)據(jù)中的一條,結(jié)果消費(fèi)者2先執(zhí)行完操作,把 data2 存入數(shù)據(jù)庫(kù),然后是 data1/data3。這不明顯亂了。
- Kafka:比如說(shuō)我們建了一個(gè) topic,有三個(gè) partition。生產(chǎn)者在寫的時(shí)候,其實(shí)可以指定一個(gè) key,比如說(shuō)我們指定了某個(gè)訂單 id 作為 key,那么這個(gè)訂單相關(guān)的數(shù)據(jù),一定會(huì)被分發(fā)到同一個(gè) partition 中去,而且這個(gè) partition 中的數(shù)據(jù)一定是有順序的。消費(fèi)者從 partition 中取出來(lái)數(shù)據(jù)的時(shí)候,也一定是有順序的。到這里,順序還是 ok 的,沒(méi)有錯(cuò)亂。接著,我們?cè)谙M(fèi)者里可能會(huì)搞多個(gè)線程來(lái)并發(fā)處理消息。因?yàn)槿绻M(fèi)者是單線程消費(fèi)處理,而處理比較耗時(shí)的話,比如處理一條消息耗時(shí)幾十 ms,那么 1 秒鐘只能處理幾十條消息,這吞吐量太低了。而多個(gè)線程并發(fā)跑的話,順序可能就亂掉了。
RabbitMQ解決方案
拆分多個(gè) queue,每個(gè) queue 一個(gè) consumer,就是多一些 queue 而已,確實(shí)是麻煩點(diǎn);或者就一個(gè) queue 但是對(duì)應(yīng)一個(gè) consumer,然后這個(gè) consumer 內(nèi)部用內(nèi)存隊(duì)列做排隊(duì),然后分發(fā)給底層不同的 worker 來(lái)處理。
Kafka解決方案
- 一個(gè) topic,一個(gè) partition,一個(gè) consumer,內(nèi)部單線程消費(fèi),單線程吞吐量太低,一般不會(huì)用這個(gè)。
- 寫 N 個(gè)內(nèi)存 queue,具有相同 key 的數(shù)據(jù)都到同一個(gè)內(nèi)存 queue;然后對(duì)于 N 個(gè)線程,每個(gè)線程分別消費(fèi)一個(gè)內(nèi)存 queue 即可,這樣就能保證順序性。
如何處理消息推積?
大量消息在 mq 里積壓了幾個(gè)小時(shí)了還沒(méi)解決
一個(gè)消費(fèi)者一秒是 1000 條,一秒 3 個(gè)消費(fèi)者是 3000 條,一分鐘就是 18 萬(wàn)條。所以如果你積壓了幾百萬(wàn)到上千萬(wàn)的數(shù)據(jù),即使消費(fèi)者恢復(fù)了,也需要大概 1 小時(shí)的時(shí)間才能恢復(fù)過(guò)來(lái)。
一般這個(gè)時(shí)候,只能臨時(shí)緊急擴(kuò)容了,具體操作步驟和思路如下:
- 先修復(fù) consumer 的問(wèn)題,確保其恢復(fù)消費(fèi)速度,然后將現(xiàn)有 consumer 都停掉。
- 新建一個(gè) topic,partition 是原來(lái)的 10 倍,臨時(shí)建立好原先 10 倍的 queue 數(shù)量。
- 然后寫一個(gè)臨時(shí)的分發(fā)數(shù)據(jù)的 consumer 程序,這個(gè)程序部署上去消費(fèi)積壓的數(shù)據(jù),消費(fèi)之后不做耗時(shí)的處理,直接均勻輪詢寫入臨時(shí)建立好的 10 倍數(shù)量的 queue。
- 接著臨時(shí)征用 10 倍的機(jī)器來(lái)部署 consumer,每一批 consumer 消費(fèi)一個(gè)臨時(shí) queue 的數(shù)據(jù)。這種做法相當(dāng)于是臨時(shí)將 queue 資源和 consumer 資源擴(kuò)大 10 倍,以正常的 10 倍速度來(lái)消費(fèi)數(shù)據(jù)。
- 等快速消費(fèi)完積壓數(shù)據(jù)之后,得恢復(fù)原先部署的架構(gòu),重新用原先的 consumer 機(jī)器來(lái)消費(fèi)消息。
mq 中的消息過(guò)期失效了
假設(shè)你用的是 RabbitMQ,RabbtiMQ 是可以設(shè)置過(guò)期時(shí)間的,也就是 TTL。如果消息在 queue 中積壓超過(guò)一定的時(shí)間就會(huì)被 RabbitMQ 給清理掉,這個(gè)數(shù)據(jù)就沒(méi)了。那這就是第二個(gè)坑了。這就不是說(shuō)數(shù)據(jù)會(huì)大量積壓在 mq 里,而是大量的數(shù)據(jù)會(huì)直接搞丟。
這個(gè)情況下,就不是說(shuō)要增加 consumer 消費(fèi)積壓的消息,因?yàn)閷?shí)際上沒(méi)啥積壓,而是丟了大量的消息。我們可以采取一個(gè)方案,就是批量重導(dǎo),這個(gè)我們之前線上也有類似的場(chǎng)景干過(guò)。就是大量積壓的時(shí)候,我們當(dāng)時(shí)就直接丟棄數(shù)據(jù)了,然后等過(guò)了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點(diǎn)以后,用戶都睡覺(jué)了。這個(gè)時(shí)候我們就開始寫程序,將丟失的那批數(shù)據(jù),寫個(gè)臨時(shí)程序,一點(diǎn)一點(diǎn)的查出來(lái),然后重新灌入 mq 里面去,把白天丟的數(shù)據(jù)給他補(bǔ)回來(lái)。也只能是這樣了。
假設(shè) 1 萬(wàn)個(gè)訂單積壓在 mq 里面,沒(méi)有處理,其中 1000 個(gè)訂單都丟了,你只能手動(dòng)寫程序把那 1000 個(gè)訂單給查出來(lái),手動(dòng)發(fā)到 mq 里去再補(bǔ)一次。
mq 都快寫滿了
如果消息積壓在 mq 里,你很長(zhǎng)時(shí)間都沒(méi)有處理掉,此時(shí)導(dǎo)致 mq 都快寫滿了,咋辦?這個(gè)還有別的辦法嗎?沒(méi)有,誰(shuí)讓你第一個(gè)方案執(zhí)行的太慢了,你臨時(shí)寫程序,接入數(shù)據(jù)來(lái)消費(fèi),消費(fèi)一個(gè)丟棄一個(gè),都不要了,快速消費(fèi)掉所有的消息。然后走第二個(gè)方案,到了晚上再補(bǔ)數(shù)據(jù)吧。
參考資料:
- Kafa深度解析
- RabbitMQ源碼解析
本文轉(zhuǎn)載自微信公眾號(hào)「冰河技術(shù)」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系冰河技術(shù)公眾號(hào)。







































