精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

萬字長文講透 RocketMQ 的消費邏輯

開發 項目管理
名字服務是是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態注冊與發現。

RocketMQ 是筆者非常喜歡的消息隊列,4.9.X 版本是目前使用最廣泛的版本,但它的消費邏輯相對較重,很多同學學習起來沒有頭緒。

這篇文章,筆者梳理了 RocketMQ 的消費邏輯,希望對大家有所啟發。

圖片

一、架構概覽

在展開集群消費邏輯細節前,我們先對 RocketMQ 4.9.X 架構做一個概覽。

圖片

整體架構中包含四種角色 :

1、NameServer

名字服務是是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態注冊與發現。

2、BrokerServer

Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證 。

3、Producer

消息發布的角色,Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。

4、Consumer

消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。

RocketMQ 集群工作流程:

1、啟動 NameServer,NameServer 起來后監聽端口,等待 Broker、Producer 、Consumer 連上來,相當于一個路由控制中心。

2、Broker 啟動,跟所有的 NameServer 保持長連接,定時發送心跳包。心跳包中包含當前 Broker信息( IP+端口等 )以及存儲所有 Topic 信息。注冊成功后,NameServer 集群中就有 Topic 跟 Broker 的映射關系。

3、收發消息前,先創建 Topic,創建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動創建 Topic。

4、Producer 發送消息,啟動時先跟 NameServer 集群中的其中一臺建立長連接,并從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的 Broker 建立長連接從而向 Broker 發消息。

5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連接,獲取當前訂閱 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立連接通道,開始消費消息。

二、發布訂閱

RocketMQ 的傳輸模型是:發布訂閱模型 。

發布訂閱模型具有如下特點:

  • 消費獨立相比隊列模型的匿名消費方式,發布訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關系),不同訂閱組之間相互獨立不會相互影響。
  • 一對多通信基于獨立身份的設計,同一個主題內的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發布訂閱模型可以實現一對多通信。

RocketMQ 支持兩種消息模式:集群消費( Clustering )和廣播消費( Broadcasting )。

集群消費:同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。

圖片

廣播消費:當使用廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。

圖片

為了實現這種發布訂閱模型 , RocketMQ 精心設計了它的存儲模型。先進入 Broker 的文件存儲目錄。

圖片

RocketMQ 采用的是混合型的存儲結構。

1、Broker 單個實例下所有的隊列共用一個數據文件(commitlog)來存儲

生產者發送消息至 Broker 端,然后 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那么生產者發送的消息就不會丟失。

單個文件大小默認 1G , 文件名長度為 20 位,左邊補零,剩余為起始偏移量,比如 00000000000000000000 代表了第一個文件,起始偏移量為 0 ,文件大小為1 G = 1073741824 。

圖片

commitlog 目錄

這種設計有兩個優點:

  • 充分利用順序寫,大大提升寫入數據的吞吐量;
  • 快讀定位消息。因為消息是一條一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據消息的物理偏移量通過二分查找,定位消息位于那個文件中,并獲取到消息實體數據。

2、Broker 端的后臺服務線程會不停地分發請求并異步構建 consumequeue(消費文件)和 indexfile(索引文件)

進入索引文件存儲目錄 :

圖片

1、消費文件按照主題存儲,每個主題下有不同的隊列,圖中主題 my-mac-topic 有 16 個隊列 (0 到 15) ;

2、每個隊列目錄下 ,存儲 consumequeue 文件,每個 consumequeue 文件也是順序寫入,數據格式見下圖。

圖片

每個 consumequeue 文件包含 30 萬個條目,每個條目大小是 20 個字節,每個文件的大小是 30 萬 * 20 = 60萬字節,每個文件大小約 5.72M 。

和 commitlog 文件類似,consumequeue 文件的名稱也是以偏移量來命名的,可以通過消息的邏輯偏移量定位消息位于哪一個文件里。

消費文件按照主題-隊列來保存 ,這種方式特別適配發布訂閱模型。

消費者從 Broker 獲取訂閱消息數據時,不用遍歷整個 commitlog 文件,只需要根據邏輯偏移量從 consumequeue 文件查詢消息偏移量 ,  最后通過定位到 commitlog 文件, 獲取真正的消息數據。

要實現發布訂閱模型,還需要一個重要文件:消費進度文件。原因有兩點:

  • 不同消費組之間相互獨立,不會相互影響 ;
  • 消費者下次拉取數據時,需要知道從哪個進度開始拉取 ,就像我們小時候玩單機游戲存盤一樣。

因此消費進度文件需要保存消費組所訂閱主題的消費進度。

我們瀏覽下集群消費場景下的 Broker 端的消費進度文件 consumerOffset.json 。

圖片

圖片

在進度文件 consumerOffset.json 里,數據以 key-value 的結構存儲,key 表示:主題@消費者組 , value 是 consumequeue 中每個隊列對應的邏輯偏移量 。

寫到這里,我們粗糙模擬下 RocketMQ 存儲模型如何滿足發布訂閱模型(集群模式) 。

圖片

1、發送消息:生產者發送消息到 Broker ;

2、保存消息:Broker 將消息存儲到 commitlog 文件 ,異步線程會構建消費文件 consumequeue ;

3、消費流程:消費者啟動后,會通過負載均衡分配對應的隊列,然后向 Broker 發送拉取消息請求。Broker 收到消費者拉取請求之后,根據訂閱組,消費者編號,主題,隊列名,邏輯偏移量等參數 ,從該主題下的 consumequeue 文件查詢消息消費條目,然后從 commitlog 文件中獲取消息實體。消費者在收到消息數據之后,執行消費監聽器,消費完消息;

4、保存進度:消費者將消費進度提交到 Broker ,Broker 會將該消費組的消費進度存儲在進度文件里。

三、消費流程

我們重點講解下集群消費的消費流程 ,因為集群消費是使用最普遍的消費模式,理解了集群消費,廣播消費也就能順理成章的掌握了。

圖片

集群消費示例代碼里,啟動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、消息監聽器,最后調用 start 方法啟動。

消費者啟動后,我們可以將整個流程簡化成:

圖片

四、負載均衡

消費端的負載均衡是指將 Broker 端中多個隊列按照某種算法分配給同一個消費組中的不同消費者,負載均衡是客戶端開始消費的起點。

RocketMQ 負載均衡的核心設計理念是

  • 消費隊列在同一時間只允許被同一消費組內的一個消費者消費
  • 一個消費者能同時消費多個消息隊列

負載均衡是每個客戶端獨立進行計算,那么何時觸發呢 ?

  • 消費端啟動時,立即進行負載均衡;
  • 消費端定時任務每隔 20 秒觸發負載均衡;
  • 消費者上下線,Broker 端通知消費者觸發負載均衡。

負載均衡流程如下:

1、發送心跳

消費者啟動后,它就會通過定時任務不斷地向 RocketMQ 集群中的所有 Broker 實例發送心跳包(消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端實例編號等信息)。

Broker 端在收到消費者的心跳消息后,會將它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量 channelInfoTable 中,為之后做消費端的負載均衡提供可以依據的元數據信息。

2、啟動負載均衡服務

負載均衡服務會根據消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:

(1) 獲取該主題下的消息消費隊列集合;

(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;

(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列;

平均分配算法

這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當前消費端應該分配到的記錄。

(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作。

消費者實例內 ,processQueueTable 對象存儲著當前負載均衡的隊列 ,以及該隊列的處理隊列 processQueue (消費快照)。

  1. 標紅的 Entry 部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性為 true , 然后從 processQueueTable 對象中移除。
  2. 綠色的 Entry 部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。
  3. 黃色的 Entry 部分表示這些隊列需要添加到 processQueueTable 對象中,為每個分配的新隊列創建一個消息拉取請求  pullRequest  ,  在消息拉取請求中保存一個處理隊列 processQueue (隊列消費快照),內部是紅黑樹(TreeMap),用來保存拉取到的消息。

最后創建拉取消息請求列表,并將請求分發到消息拉取服務,進入拉取消息環節。

五、長輪詢

在負載均衡這一小節,我們已經知道負載均衡觸發了拉取消息的流程。

消費者啟動的時候,會創建一個拉取消息服務 PullMessageService ,它是一個單線程的服務。

核心流程如下:

1、負載均衡服務將消息拉取請求放入到拉取請求隊列 pullRequestQueue , 拉取消息服務從隊列中獲取拉取消息請求 ;

2、拉取消息服務向 Brorker 服務發送拉取請求 ,拉取請求的通訊模式是異步回調模式 ;

消費者的拉取消息服務本身就是一個單線程,使用異步回調模式,發送拉取消息請求到 Broker 后,拉取消息線程并不會阻塞 ,可以繼續處理隊列 pullRequestQueue 中的其他拉取任務。

3、Broker 收到消費者拉取消息請求后,從存儲中查詢出消息數據,然后返回給消費者;

4、消費者的網絡通訊層會執行拉取回調函數相關邏輯,首先會將消息數據存儲在隊列消費快照 processQueue 里;

消費快照使用紅黑樹 msgTreeMap 存儲拉取服務拉取到的消息 。

5、回調函數將消費請求提交到消息消費服務 ,而消息消費服務會異步的消費這些消息;

6、回調函數會將處理中隊列的拉取請放入到定時任務中;

7、定時任務再次將消息拉取請求放入到隊列 pullRequestQueue 中,形成了閉環:負載均衡后的隊列總會有任務執行拉取消息請求,不會中斷。

細心的同學肯定有疑問:既然消費端是拉取消息,為什么是長輪詢呢 ?

雖然拉模式的主動權在消費者這一側,但是缺點很明顯。

因為消費者并不知曉 Broker 端什么時候有新的消息 ,所以會不停地去 Broker 端拉取消息,但拉取頻率過高, Broker 端壓力就會很大,頻率過低則會導致消息延遲。

所以要想消費消息的延遲低,服務端的推送必不可少。

下圖展示了 RocketMQ 如何通過長輪詢減小拉取消息的延遲。

核心流程如下:

1、Broker 端接收到消費者的拉取消息請求后,拉取消息處理器開始處理請求,根據拉取請求查詢消息存儲 ;

2、從消息存儲中獲取消息數據 ,若存在新消息 ,則將消息數據通過網絡返回給消費者。若無新消息,則將拉取請求放入到拉取請求表 pullRequestTable 。

3、長輪詢請求管理服務 pullRequestHoldService 每隔 5 秒從拉取請求表中判斷拉取消息請求的隊列是否有新的消息。

判定標準是:拉取消息請求的偏移量是否小于當前消費隊列最大偏移量,如果條件成立則說明有新消息了。

若存在新的消息 ,  長輪詢請求管理服務會觸發拉取消息處理器重新處理該拉取消息請求。

4、當 commitlog 中新增了新的消息,消息分發服務會構建消費文件和索引文件,并且會通知長輪詢請求管理服務,觸發拉取消息處理器重新處理該拉取消息請求。

六、消費消息

在拉取消息的流程里, Broker 端返回消息數據,消費者的通訊框架層會執行回調函數。

回調線程會將數據存儲在隊列消費快照 processQueue(內部使用紅黑樹 msgTreeMap)里,然后將消息提交到消費消息服務,消費消息服務會異步消費這些消息。

消息消費服務有兩種類型:并發消費服務和順序消費服務 。

6.1 并發消費

并發消費是指消費者將并發消費消息,消費的時候可能是無序的。

消費消息并發服務啟動后,會初始化三個組件:消費線程池、清理過期消息定時任務、處理失敗消息定時任務。

核心流程如下:

0、通訊框架回調線程會將數據存儲在消費快照里,然后將消息列表 msgList 提交到消費消息服務

1、 消息列表 msgList 組裝成消費對象

2、將消費對象提交到消費線程池

我們看到10 條消息被組裝成三個消費請求對象,不同的消費線程會執行不同的消費請求對象。

3、消費線程執行消息監聽器

執行完消費監聽器,會返回消費結果。

4、處理異常消息

當消費異常時,異常消息將重新發回 Broker 端的重試隊列( RocketMQ 會為每個 topic 創建一個重試隊列,以 %RETRY% 開頭),達到重試時間后將消息投遞到重試隊列中進行消費重試。

我們將在重試機制這一節重點講解 RocketMQ 如何實現延遲消費功能 。

假如異常的消息發送到 Broker 端失敗,則重新將這些失敗消息通過處理失敗消息定時任務重新提交到消息消費服務。

5、更新本地消費進度

消費者消費一批消息完成之后,需要保存消費進度到進度管理器的本地內存。

首先我們會從隊列消費快照 processQueue 中移除消息,返回消費快照 msgTreeMap 第一個偏移量 ,然后調用消費消息進度管理器 offsetStore 更新消費進度。

待更新的偏移量是如何計算的呢?

  • 場景1:快照中1001(消息1)到1010(消息10)消費了,快照中沒有了消息,返回已消費的消息最大偏移量 + 1 也就是1011。
  • 場景2:快照中1001(消息1)到1008(消息8)消費了,快照中只剩下兩條消息了,返回最小的偏移量 1009。
  • 場景3:1001(消息1)在消費對象中因為某種原因一直沒有被消費,即使后面的消息1005-1010都消費完成了,返回的最小偏移量是1001。

在場景3,RocketMQ 為了保證消息肯定被消費成功,消費進度只能維持在1001(消息1),直到1001也被消費完,本地的消費進度才會一下子更新到1011。

假設1001(消息1)還沒有消費完成,消費者實例突然退出(機器斷電,或者被 kill ),就存在重復消費的風險。

因為隊列的消費進度還是維持在1001,當隊列重新被分配給新的消費者實例的時候,新的實例從 Broker 上拿到的消費進度還是維持在1001,這時候就會又從1001開始消費,1001-1010這批消息實際上已經被消費過還是會投遞一次。

所以業務必須要保證消息消費的冪等性。

寫到這里,我們會有一個疑問:假設1001(消息1)因為加鎖或者消費監聽器邏輯非常耗時,導致極長時間沒有消費完成,那么消費進度就會一直卡住 ,怎么解決呢 ?

RocketMQ 提供兩種方式一起配合解決:

  • 拉取服務根據并發消費間隔配置限流拉取消息服務在拉取消息時候,會判斷當前隊列的 processQueue 消費快照里消息的最大偏移量 - 消息的最小偏移量大于消費并發間隔(2000)的時候 , 就會觸發流控 ,  這樣就可以避免消費者無限循環的拉取新的消息。
  • 清理過期消息消費消息并發服務啟動后,會定期掃描所有消費的消息,若當前時間減去開始消費的時間大于消費超時時間,首先會將過期消息發送 sendMessageBack 命令發送到 Broker ,然后從快照中刪除該消息。

6.2 順序消費

順序消息是指對于一個指定的 Topic ,消息嚴格按照先進先出(FIFO)的原則進行消息發布和消費,即先發布的消息先消費,后發布的消息后消費。

順序消息分為分區順序消息和全局順序消息。

1、分區順序消息

對于指定的一個 Topic ,所有消息根據 Sharding Key 進行區塊分區,同一個分區內的消息按照嚴格的先進先出(FIFO)原則進行發布和消費。同一分區內的消息保證順序,不同分區之間的消息順序不做要求。

  • 適用場景:適用于性能要求高,以 Sharding Key 作為分區字段,在同一個區塊中嚴格地按照先進先出(FIFO)原則進行消息發布和消費的場景。
  • 示例:電商的訂單創建,以訂單 ID 作為 Sharding Key ,那么同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發布的先后順序來消費。

2、全局順序消息

對于指定的一個 Topic ,所有消息按照嚴格的先入先出(FIFO)的順序來發布和消費。

  • 適用場景:適用于性能要求不高,所有的消息嚴格按照 FIFO 原則來發布和消費的場景。
  • 示例:在證券處理中,以人民幣兌換美元為 Topic,在價格相同的情況下,先出價者優先處理,則可以按照 FIFO 的方式發布和消費全局順序消息。

全局順序消息實際上是一種特殊的分區順序消息,即 Topic 中只有一個分區,因此全局順序和分區順序的實現原理相同。

因為分區順序消息有多個分區,所以分區順序消息比全局順序消息的并發度和性能更高。

消息的順序需要由兩個階段保證:

  • 消息發送如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產生的順序,業務上要求同一訂單的消息保持順序,例如訂單 A 的消息發送和消費都按照 A1、A2、A3 的順序。如果是普通消息,訂單A 的消息可能會被輪詢發送到不同的隊列中,不同隊列的消息將無法保持順序,而順序消息發送時 RocketMQ 支持將 Sharding Key 相同(例如同一訂單號)的消息序路由到同一個隊列中。下圖是生產者發送順序消息的封裝,原理是發送消息時,實現 MessageQueueSelector 接口, 根據 Sharding Key 使用 Hash 取模法來選擇待發送的隊列。生產者順序發送消息封裝
  • 消息消費消費者消費消息時,需要保證單線程消費每個隊列的消息數據,從而實現消費順序和發布順序的一致。

順序消費服務的類是 ConsumeMessageOrderlyService ,在負載均衡階段,并發消費和順序消費并沒有什么大的差別。

最大的差別在于:順序消費會向 Borker 申請鎖 。消費者根據分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔20秒會重新嘗試。

順序消費核心流程如下:

1、 組裝成消費對象

2、 將請求對象提交到消費線程池

和并發消費不同的是,這里的消費請求包含消費快照 processQueue ,消息隊列 messageQueue 兩個對象,并不對消息列表做任何處理。

3、 消費線程內,對消費隊列加鎖

順序消費也是通過線程池消費的,synchronized 鎖用來保證同一時刻對于同一個隊列只有一個線程去消費它

4、 從消費快照中取得待消費的消息列表

消費快照 processQueue 對象里,創建了一個紅黑樹對象 consumingMsgOrderlyTreeMap 用于臨時存儲的待消費的消息。

5、 執行消息監聽器

消費快照的消費鎖 consumeLock 的作用是:防止負載均衡線程把當前消費的 MessageQueue 對象移除掉。

6、 處理消費結果

消費成功時,首先計算需要提交的偏移量,然后更新本地消費進度。

消費失敗時,分兩種場景:

  • 假如已消費次數小于最大重試次數,則將對象 consumingMsgOrderlyTreeMap 中臨時存儲待消費的消息,重新加入到消費快照紅黑樹 msgTreeMap 中,然后使用定時任務嘗試重新消費。
  • 假如已消費次數大于等于最大重試次數,則將失敗消息發送到 Broker ,Broker 接收到消息后,會加入到死信隊列里 , 最后計算需要提交的偏移量,然后更新本地消費進度。

我們做一個關于順序消費的總結 :

  1. 順序消費需要由兩個階段消息發送和消息消費協同配合,底層支撐依靠的是 RocketMQ 的存儲模型;
  2. 順序消費服務啟動后,隊列的數據都會被消費者實例單線程的執行消費;
  3. 假如消費者擴容,消費者重啟,或者 Broker 宕機 ,順序消費也會有一定幾率較短時間內亂序,所以消費者的業務邏輯還是要保障冪等。

七、保存進度

RocketMQ 消費者消費完一批數據后, 會將隊列的進度保存在本地內存,但還需要將隊列的消費進度持久化。

1、 集群模式

圖片

集群模式下,分兩種場景:

  • 拉取消息服務會在拉取消息時,攜帶該隊列的消費進度,提交給 Broker 的拉取消息處理器。
  • 消費者定時任務,每隔5秒將本地緩存中的消費進度提交到 Broker 的消費者管理處理器。

Broker 的這兩個處理器都調用消費者進度管理器 consumerOffsetManager 的 commitOffset 方法,定時任務異步將消費進度持久化到消費進度文件 consumerOffset.json 中。

圖片

2、 廣播模式

廣播模式消費進度存儲在消費者本地,定時任務每隔 5 秒通過 LocalFileOffsetStore 持久化到本地文件offsets.json ,數據格式為 MessageQueue:Offset 。

圖片

廣播模式下,消費進度和消費組沒有關系,本地文件 offsets.json 存儲在配置的目錄,文件中包含訂閱主題中所有的隊列以及隊列的消費進度。

八、重試機制

集群消費下,重試機制的本質是 RocketMQ 的延遲消息功能。

消費消息失敗后,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。

Broker 端會為每個 topic 創建一個重試隊列 ,隊列名稱是:%RETRY% + 消費者組名 ,達到重試時間后將消息投遞到重試隊列中進行消費重試(消費者組會自動訂閱重試 Topic)。最多重試消費 16 次,重試的時間間隔逐漸變長,若達到最大重試次數后消息還沒有成功被消費,則消息將被投遞至死信隊列。

第幾次重試

與上次重試的間隔時間

第幾次重試

與上次重試的間隔時間

1

10 秒

9

7 分鐘

2

30 秒

10

8 分鐘

3

1 分鐘

11

9 分鐘

4

2 分鐘

12

10 分鐘

5

3 分鐘

13

20 分鐘

6

4 分鐘

14

30 分鐘

7

5 分鐘

15

1 小時

8

6 分鐘

16

2 小時

圖片

開源 RocketMQ 4.X 支持延遲消息,默認支持18 個 level 的延遲消息,這是通過 broker 端的 messageDelayLevel 配置項確定的,如下:

圖片

Broker 在啟動時,內部會創建一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲 level 的個數,創建對應數量的隊列,也就是說18個 level 對應了18個隊列。

我們先梳理下延遲消息的實現機制。

1、生產者發送延遲消息

Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設置延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);

2、Broker端存儲延遲消息

延遲消息在 RocketMQ Broker 端的流轉如下圖所示:

圖片

第一步:修改消息 Topic 名稱和隊列信息

Broker 端接收到生產者的寫入消息請求后,首先都會將消息寫到 commitlog 中。假如是正常非延遲消息,MessageStore 會根據消息中的 Topic 信息和隊列信息,將其轉發到目標 Topic 的指定隊列 consumequeue 中。

但由于消息一旦存儲到 consumequeue 中,消費者就能消費到,而延遲消息不能被立即消費,所以 RocketMQ 將 Topic 的名稱修改為SCHEDULE_TOPIC_XXXX,并根據延遲級別確定要投遞到哪個隊列下。

同時,還會將消息原來要發送到的目標 Topic 和隊列信息存儲到消息的屬性中。

圖片

第二步:構建 consumequeue 文件時,計算并存儲投遞時間

圖片

圖片

上圖是 consumequeue 文件一條消息的格式,最后 8 個字節存儲 Tag 的哈希值,此時存儲消息的投遞時間。

第三步:定時調度服務啟動

ScheduleMessageService 類是一個定時調度服務,讀取 SCHEDULE_TOPIC_XXXX 隊列的消息,并將消息投遞到目標 Topic 中。

定時調度服務啟動時,創建一個定時調度線程池 ,并根據延遲級別的個數,啟動對應數量的 HandlePutResultTask ,每個 HandlePutResultTask 負責一個延遲級別的消費與投遞。

圖片

第四步:投遞時間到了,將消息數據重新寫入到 commitlog

消息到期后,需要投遞到目標 Topic 。第一步已經記錄了原來的 Topic 和隊列信息,這里需要重新設置,再存儲到 commitlog 中。

第五步:將消息投遞到目標 Topic 中

Broker 端的后臺服務線程會不停地分發請求并異步構建 consumequeue(消費文件)和 indexfile(索引文件)。因此消息會直接投遞到目標 Topic 的 consumequeue 中,之后消費者就可以消費到這條消息。

回顧了延遲消息的機制,消費消息失敗后,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。

Broker 端 SendMessageProcessor 處理器會調用 asyncConsumerSendMsgBack 方法。

圖片

首先判斷消息的當前重試次數是否大于等于最大重試次數,如果達到最大重試次數,或者配置的重試級別小于0,則重新創建 Topic ,規則是 %DLQ% + consumerGroup,后續處理消息發送到死信隊列。

正常的消息會進入 else 分支,對于首次重試的消息,默認的 delayLevel 是 0 ,RocketMQ 會將 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的配置延時級別,消息消費重試首次,是延遲了第三個級別發起的重試,也就是距離首次發送 10s 后重試,其主題的默認規則是 %RETRY% + consumerGroup。

當延時級別設置完成,刷新消息的重試次數為當前次數加 1 ,Broker 端將該消息刷盤,邏輯如下:

圖片

延遲消息寫入到 commitlog 里 ,這里其實和延遲消息機制的第一步類似,后面按照延遲消息機制的流程執行即可(第二步到第六步)。

九、總結

下圖展示了集群模式下消費者并發消費流程 :

圖片

核心流程如下:

  1. 消費者啟動后,觸發負載均衡服務 ,負載均衡服務為消費者實例分配對應的隊列 ;
  2. 分配完隊列后,負載均衡服務會為每個分配的新隊列創建一個消息拉取請求  pullRequest  ,  拉取請求保存一個處理隊列 processQueue,內部是紅黑樹(TreeMap),用來保存拉取到的消息 ;
  3. 拉取消息服務單線程從拉取請求隊列  pullRequestQueue 中彈出拉取消息,執行拉取任務 ,拉取請求是異步回調模式,將拉取到的消息放入到處理隊列;
  4. 拉取請求在一次拉取消息完成之后會復用,重新被放入拉取請求隊列 pullRequestQueue 中 ;
  5. 拉取完成后,調用消費消息服務  consumeMessageService 的  submitConsumeRequest 方法 ,消費消息服務內部有一個消費線程池;
  6. 消費線程池的消費線程從消費任務隊列中獲取消費請求,執行消費監聽器  listener.consumeMessage ;
  7. 消費完成后,若消費成功,則更新偏移量 updateOffset,先更新到內存 offsetTable,定時上報到 Broker ;若消費失敗,則將失敗消費發送到 Broker 。
  8. Broker 端接收到請求后, 調用消費進度管理器的 commitOffset 方法修改內存的消費進度,定時刷盤到  consumerOffset.json。

RocketMQ 4.X 的消費邏輯有兩個非常明顯的特點:

  1. 客戶端代碼邏輯較重。假如要支持一種新的編程語言,那么客戶端就必須實現完整的負載均衡邏輯,此外還需要實現拉消息、位點管理、消費失敗后將消息發回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
  2. 保證冪等非常重要。當客戶端升級或者下線時,或者 Broker 宕機,都要進行負載均衡操作,可能造成消息堆積,同時有一定幾率造成重復消費。

參考資料:

1、RocketMQ 4.9.4 Github 文檔

https://github.com/apache/rocketmq/tree/rocketmq-all-4.9.4/docs

2、RocketMQ 技術內幕

3、消息隊列核心知識點

https://mp.weixin.qq.com/s/v7_ih9X5mG3X4E4ecfgYXA

4、消息ACK機制及消費進度管理

https://zhuanlan.zhihu.com/p/25265380

責任編輯:武曉燕 來源: 勇哥java實戰分享
相關推薦

2021-08-26 05:02:50

分布式設計

2021-10-18 11:58:56

負載均衡虛擬機

2022-09-06 08:02:40

死鎖順序鎖輪詢鎖

2021-01-19 05:49:44

DNS協議

2022-09-14 09:01:55

shell可視化

2024-03-07 18:11:39

Golang采集鏈接

2020-07-15 08:57:40

HTTPSTCP協議

2020-11-16 10:47:14

FreeRTOS應用嵌入式

2022-07-19 16:03:14

KubernetesLinux

2020-07-09 07:54:35

ThreadPoolE線程池

2022-10-10 08:35:17

kafka工作機制消息發送

2024-05-10 12:59:58

PyTorch人工智能

2024-01-11 09:53:31

面試C++

2022-09-08 10:14:29

人臉識別算法

2024-01-05 08:30:26

自動駕駛算法

2022-07-15 16:31:49

Postman測試

2021-06-04 07:27:24

sourcemap前端技術

2022-04-25 10:56:33

前端優化性能

2022-02-15 18:45:35

Linux進程調度器

2024-09-09 05:00:00

RedisString數據庫
點贊
收藏

51CTO技術棧公眾號

超碰超碰超碰超碰| 美女流白浆视频| 98在线视频| 国内精品自线一区二区三区视频| 久久大大胆人体| 男人的天堂免费| 天天综合av| 国产精品久99| 国产一区二区三区免费不卡| www.com国产| 91久久夜色精品国产按摩| 日韩欧美国产成人一区二区| 国产免费一区二区三区视频| 日本在线观看视频| 99久久精品国产一区二区三区 | av3级在线| 久久精品视频一区二区三区| 亚洲精品欧美日韩| 亚洲va在线观看| 五月天久久网站| 亚洲男女性事视频| 在线免费看污网站| 爱啪啪综合导航| 日本一区二区久久| 精品国产乱码一区二区三区四区 | 日韩精品在线网站| 免费国产成人av| 国产污视频在线播放| ...中文天堂在线一区| 麻豆精品传媒视频| 超碰在线观看av| 麻豆久久一区二区| 欧洲精品毛片网站| 九九九在线视频| 欧美3p在线观看| 日韩风俗一区 二区| 国产一级二级av| 久久青草视频| 色婷婷久久久亚洲一区二区三区| 黄色激情在线视频| 好吊日视频在线观看| 中文字幕精品三区| 欧美另类一区| 香蕉视频免费网站| av软件在线观看| 欧美国产视频在线| 欧美日韩最好看的视频| 人妻妺妺窝人体色www聚色窝| 国产在线播精品第三| 国产精品一区=区| 久久国产乱子伦精品| 亚洲欧美日韩视频二区| 欧美丰满片xxx777| 青青草成人免费| 亚洲五月综合| 欧美成人高清视频| 国产1区2区3区4区| 欧美粗暴jizz性欧美20| 久久综合电影一区| 青春草免费视频| 国产综合激情| 久久久久久久久久久91| 免费在线观看av网址| 亚洲图片在线| 97精品久久久| 国产区一区二区三| 久久综合九色| 国产精品久久77777| 无码人妻丰满熟妇奶水区码| 日韩高清不卡在线| 国产裸体写真av一区二区| 在线视频1卡二卡三卡| 久久国产生活片100| 亚洲va欧美va国产综合久久| 国产成人精品av在线观| www.性欧美| 蜜桃传媒视频麻豆第一区免费观看| 日韩毛片在线一区二区毛片| 国产午夜精品在线观看| 一区二区三区我不卡| 在线中文字幕电影| 午夜精品久久久久久久久久| 黄色国产精品视频| 激情久久一区二区| 日韩欧美一区二区视频| 视频免费在线观看| 欧美理论电影大全| 久久深夜福利免费观看| 亚洲一区 视频| 久久精品午夜| 96国产粉嫩美女| 亚洲欧美日韩成人在线| 国产情人综合久久777777| 在线视频精品一区| 国产伦久视频在线观看| 欧美性69xxxx肥| 自拍偷拍一区二区三区四区| 一区二区三区自拍视频| 日韩精品免费一线在线观看| 国产精品麻豆免费版现看视频| 午夜性色一区二区三区免费视频| 日本亚洲欧洲色| 一本到在线视频| a亚洲天堂av| 中文字幕一区二区三区5566| ****av在线网毛片| 欧美在线观看视频一区二区三区 | 91浏览器在线视频| 综合视频在线观看| 自拍偷拍欧美视频| 日韩视频永久免费| 9.1片黄在线观看| 伊人成年综合电影网| 国产精品美女呻吟| www.国产高清| 久久精品999| 欧美lavv| av影视在线| 欧美色视频一区| 亚洲色图欧美日韩| 99欧美视频| 欧洲美女7788成人免费视频| 国产丰满美女做爰| 久久美女艺术照精彩视频福利播放| 亚洲区成人777777精品| 日韩av电影资源网| 亚洲国产精品久久91精品| 亚洲不卡在线播放| 免费精品视频在线| 欧美日韩大片一区二区三区| 成年网站在线视频网站| 日韩女优毛片在线| 亚洲女人久久久| 蜜臀99久久精品久久久久久软件| 九九九九精品| gogo久久| 亚洲成人av在线| 九九热视频精品| 国产福利一区二区三区视频在线 | 成人福利网站| 欧美视频一区在线| 免费福利视频网站| 久久综合婷婷| 日本成人三级| 26uuu亚洲电影| 亚洲欧美成人一区二区在线电影| 日韩av在线电影| 成人免费视频网站在线观看| 国产日韩欧美大片| 成人亚洲精品| 欧美老少做受xxxx高潮| 亚洲一区二区天堂| 欧美高清在线一区二区| 蜜臀av免费观看| 色97色成人| 精品久久久91| 伊人22222| 国产精品色一区二区三区| 亚洲欧美激情网| 色婷婷亚洲mv天堂mv在影片| 国产久一一精品| 麻豆传媒在线观看| 日韩欧美一二三区| 成人免费看片98| 成人福利在线看| 久久免费视频3| 免费观看不卡av| 美女福利精品视频| 国产福利资源在线| 香蕉久久一区二区不卡无毒影院| 中文字幕人妻一区二区三区| 亚洲在线电影| 亚洲国产日韩综合一区| 国产精品1区| 国产69精品99久久久久久宅男| 天堂中文在线资源| 欧美午夜www高清视频| 久久久久久成人网| 国产伦精品一区二区三区视频青涩| 国产资源第一页| 久久人人爽人人爽人人片av不| 青青精品视频播放| 1769在线观看| 欧美成人激情免费网| 欧美三日本三级少妇99| 国产日韩在线不卡| 黄色片免费网址| 日韩视频不卡| 图片区小说区区亚洲五月| 亚洲精品三区| 性欧美视频videos6一9| se在线电影| 欧美成人精品福利| 波多野结衣视频在线看| 亚洲美女区一区| 性久久久久久久久久| 精品一区二区三区日韩| 97超碰在线人人| 欧美www视频在线观看| 国产精品国产亚洲精品看不卡15 | 色噜噜狠狠色综合欧洲selulu| 91香蕉国产视频| 成人毛片老司机大片| 亚洲精品高清无码视频| 欧美激情自拍| 日韩免费一区二区三区| 91蝌蚪精品视频| 国产精品久久久久久久天堂| 成人性生交大片免费看网站| 中文字幕亚洲欧美日韩2019| 三级在线观看网站| 91精品午夜视频| 午夜精品一区二| 亚洲最大的成人av| 日本在线一级片| 久久久青草青青国产亚洲免观| 红桃视频一区二区三区免费| 视频一区在线播放| 日韩人妻无码精品久久久不卡| 9999国产精品| 日本一区免费| 林ゆな中文字幕一区二区| 成人黄色av网站| 欧美日韩国产网站| 欧美一级电影久久| 亚洲精品白浆| 久久久国产精品亚洲一区| 噜噜噜噜噜在线视频| 亚洲变态欧美另类捆绑| 国产丰满美女做爰| 5月丁香婷婷综合| 中文在线观看av| 色av成人天堂桃色av| 久久久久久久久久影院| 亚洲高清免费观看高清完整版在线观看| 国产农村妇女精品一区| 国产偷国产偷精品高清尤物| 人妻少妇精品视频一区二区三区| 成人丝袜高跟foot| 女同性αv亚洲女同志| 国产一区亚洲一区| 中文字幕亚洲影院| 国内精品写真在线观看| 999精彩视频| 麻豆成人免费电影| 天天爽人人爽夜夜爽| 男人的j进女人的j一区| 国产超碰在线播放| 日本人妖一区二区| 欧美女同在线观看| 黄色小说综合网站| 无套内谢丰满少妇中文字幕| 韩国v欧美v日本v亚洲v| 婷婷激情小说网| 国产精品中文字幕日韩精品| 日韩精品aaa| 国产精品综合在线视频| 四虎国产精品免费| 国产 欧美在线| av黄色一级片| 93久久精品日日躁夜夜躁欧美| 国产精品1000部啪视频| 久久亚洲综合av| 亚洲天堂最新地址| 亚洲人成小说网站色在线| 黄色录像二级片| 一区二区三区日本| 亚洲国产成人精品激情在线| 色综合久久中文字幕综合网| 国内av在线播放| 欧美精品日韩一本| 精品人妻一区二区三区浪潮在线 | av资源中文在线天堂| 51久久精品夜色国产麻豆| av日韩电影| 国产精品亚洲综合天堂夜夜| 国产精品亚洲一区二区在线观看| www日韩av| 网红女主播少妇精品视频| 亚洲精品国产一区| 欧美成人高清| 50路60路老熟妇啪啪| 紧缚捆绑精品一区二区| www.美色吧.com| 久久综合久久99| 天天操夜夜操av| 午夜免费久久看| 制服丝袜在线一区| 欧美精品一区二区蜜臀亚洲| 国产区视频在线| 色综合久综合久久综合久鬼88 | 亚洲aⅴ男人的天堂在线观看 | 日韩国产网站| 亚洲自拍欧美另类| 偷拍自拍一区| www.黄色网址.com| 久久久精品日韩| 无套白嫩进入乌克兰美女| 91麻豆精东视频| 搜索黄色一级片| 欧美视频在线视频| 精品人妻aV中文字幕乱码色欲| 日韩电影中文字幕一区| caopo在线| 国产精品99一区| 国产精品x8x8一区二区| 午夜久久资源| 一本色道久久综合亚洲精品高清 | 欧美国产一区视频在线观看| 日干夜干天天干| 制服丝袜亚洲播放| 免费一级毛片在线观看| 欧美老肥婆性猛交视频| 韩国精品主播一区二区在线观看| 99久久精品无码一区二区毛片| 精品国产91| 免费毛片网站在线观看| 国产美女精品一区二区三区| 小早川怜子久久精品中文字幕| 亚洲综合精品自拍| 国产毛片一区二区三区va在线 | 亚洲妇熟xxxx妇色黄| 国产精品男人的天堂| 五月国产精品| 999在线观看视频| 国产成人丝袜美腿| 极品魔鬼身材女神啪啪精品| 欧美午夜片在线观看| 日韩精品视频无播放器在线看 | 日本高清不卡码| 欧美精品一区二区三区久久久| 影音先锋在线播放| 成人网页在线免费观看| 黄色一级大片在线免费看国产一 | 污污视频在线免费看| 国产精品视区| 久久精品最新地址| 色欲av无码一区二区三区| av有声小说一区二区三区| 国产精品美日韩| 美女视频久久| 国产精品午夜福利| 黄色另类av| 日韩久久久久久久久久久久| 久久99精品久久久久婷婷| 欧美人与禽zoz0善交| 欧美午夜影院一区| av电影在线观看一区二区三区| 国产成人综合精品| 免费精品国产| 污污视频网站免费观看| 日本一区二区在线不卡| 亚洲视频在线观看免费视频| 中文字幕亚洲在线| 久久精品黄色| 国产欧美自拍视频| 成人一区二区视频| 国产成人在线观看网站| 日韩久久精品成人| 成人涩涩视频| 日本特级黄色大片| 丰满白嫩尤物一区二区| 黄网在线观看视频| 亚洲最新视频在线| 色成人综合网| 国产欧美精品aaaaaa片| 91在线观看一区二区| 成人免费视频国产免费| 日韩中文在线不卡| 亚洲91网站| 国产成人a亚洲精v品无码| 亚洲国产精品t66y| 99视频国产精品免费观看a | 超碰人人干人人| 欧美一区二区在线播放| 波多野结衣在线播放| 日韩福利二区| 狠狠色综合播放一区二区| 精品无码av在线| 亚洲欧洲激情在线| 欧美123区| 又大又硬又爽免费视频| 日本sm残虐另类| 欧美精品乱码视频一二专区| 亚洲国产高清福利视频| 黄色精品视频| 国产日产欧美一区二区| 91丨porny丨首页| 国产精品sm调教免费专区| 欧美成人全部免费| 女人抽搐喷水高潮国产精品| 99re国产视频| 一区二区视频欧美| 熟女丰满老熟女熟妇| 欧美精品在线观看播放| 三级网站视频在在线播放| 青娱乐国产91| 国模一区二区三区白浆| 日韩精品一区二区亚洲av| 社区色欧美激情 |