RocketMQ如何保證消息的可靠性投遞?
介紹
要想保證消息的可靠型投遞,無非保證如下3個階段的正常執行即可。
- 生產者將消息成功投遞到broker
- broker將投遞過程的消息持久化下來
- 消費者能從broker消費到消息
發送端消息重試
producer向broker發送消息后,沒有收到broker的ack時,rocketmq會自動重試。重試的次數可以設置,默認為2次
- DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
- // 同步發送設置重試次數為5次
- producer.setRetryTimesWhenSendFailed(5);
- // 異步發送設置重試次數為5次
- producer.setRetryTimesWhenSendAsyncFailed(5);
消息持久化
我們先來了解一下消息的存儲流程,這個知識對后面分析消費端消息重試非常重要。
和消息相關的文件有如下幾種
- CommitLog:存儲消息的元數據
- ConsumerQueue:存儲消息在CommitLog的索引
- IndexFile:可以通過Message Key,時間區間快速查找到消息
整個消息的存儲流程如下
- Producer將消息順序寫到CommitLog中
- 有一個線程根據消息的隊列信息,寫入到相關的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當前消費到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
- Consumer從ConsumerQueue的consumerOffset讀取到當前應該消費的消息在CommitLog中的偏移量,到CommitLog中找到對應的消息,消費成功后移動consumerOffset
刷盤機制
「異步刷盤」:消息被寫入內存的PAGECACHE,返回寫成功狀態,當內存里的消息量積累到一定程度時,統一觸發寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息
「同步刷盤」:消息寫入內存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執行完成后喚醒等待的線程,給應用返回消息寫成功的狀態。吞吐量低,但不會造成消息丟失
主從復制
如果一個broker有master和slave時,就需要將master上的消息復制到slave上,復制的方式有兩種
- 「同步復制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數據不丟失,但是同步復制會增加數據寫入延遲,降低吞吐量
- 「異步復制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當master出現故障后,有可能造成數據丟失
消費端消息重試
順序消息的重試
對于順序消息,當消費者消費消息失敗后,消息隊列RocketMQ版會自動不斷地進行消息重試(每次間隔時間為1秒),這時,應用會出現消息消費被阻塞的情況。所以一定要做好監控,避免阻塞現象的發生
「順序消息消費失敗后不會消費下一條消息而是不斷重試這條消息,應該是考慮到如果跨過這條消息消費后面的消息會對業務邏輯產生影響」
「順序消息暫時僅支持集群消費模式,不支持廣播消費模式」
無序消息的重試
對于無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。
「無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息」
「消費時候后,重試的配置方式有如下三種」
- 返回Action.ReconsumeLater(推薦)
- 返回Null
- 拋出異常
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- //消息處理邏輯拋出異常,消息將重試。
- doConsumeMessage(message);
- //方式1:返回Action.ReconsumeLater,消息將重試。
- return Action.ReconsumeLater;
- //方式2:返回null,消息將重試。
- return null;
- //方式3:直接拋出異常,消息將重試。
- throw new RuntimeException("Consumer Message exception");
- }
- }
「消費失敗后,無需重試的配置方式」
集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會再重試。
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- try {
- doConsumeMessage(message);
- } catch (Throwable e) {
- //捕獲消費邏輯中的所有異常,并返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- //消息處理正常,直接返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- }
「消息重試次數」
「RocketMQ默認允許每條消息最多重試16次,每次消費失敗發送一條延時消息到重試隊列,同一條消息失敗一次將延時等級提高一次,然后再放到重試隊列。重試16次后如果還沒有消費成功,則將消息放到死信隊列中。」
「注意:重試隊列和死信隊列都是按照Consumer Group劃分的」
重試隊列topic名字:%RETRY% + consumerGroup
死信隊列topic名字:%DLQ% + consumerGroup
「為什么重試隊列和死信隊列要按照Consumer Group來進行劃分?」
「因為在RocketMQ的時候使用一定要保持訂閱關系一致。即一個Consumer Group訂閱的topic和tag要完全一致,不然可能會導致消費邏輯混亂,消息丟失」
如下任意一種情況都表現為訂閱關系不一致
- 相同ConsumerGroup下的Consumer實例訂閱了不同的Topic。
- 相同ConsumerGroup下的Consumer實例訂閱了相同的Topic,但訂閱的Tag不一致。
我們可以通過控制臺查看各種類型的主題
消息每次重試的間隔時間如下
第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
| 第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
|---|---|---|---|
| 1 | 10 秒 | 9 | 7 分鐘 |
| 2 | 30 秒 | 10 | 8 分鐘 |
| 3 | 1 分鐘 | 11 | 9 分鐘 |
| 4 | 2 分鐘 | 12 | 10 分鐘 |
| 5 | 3 分鐘 | 13 | 20 分鐘 |
| 6 | 4 分鐘 | 14 | 30 分鐘 |
| 7 | 5 分鐘 | 15 | 1 小時 |
| 8 | 6 分鐘 | 16 | 2 小時 |
「前面說到RocketMQ的消息重試是通過往重試隊列發送定時消息來實現的。」 RocketMQ支持18個級別的定時延時,每個級別定時消息的延時時間如下。
- // MessageStoreConfig.java
- private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
消息重試只是把定時消息的前2個級別去掉,每次發送下一個級別的定時消息
我們可以設置消費端消息重試次數
- 最大重試次數小于等于16次,則重試時間間隔同上表描述。
- 最大重試次數大于16次,超過16次的重試時間間隔均為每次2小時。
- Properties properties = new Properties();
- // 配置對應Group ID的最大消息重試次數為20次,最大重試次數為字符串類型。
- properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
- Consumer consumer =ONSFactory.createConsumer(properties);
「那么重試隊列中的消息是如何被消費的?」
消息消費者在啟動的時候,會訂閱正常的topic和重試隊列的topic
定時消息的實現邏輯也比較簡單,可以歸納為如下幾步
1.發送延時消息
1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發到對應的consumeQueue中,則消息會被立馬消費)
1.2 將消息原來的topic,queueId放到消息擴展屬性中
1.3 將消息應該執行的時間放到tagsCode中
將消息順序寫到CommitLog中
將消息對應的信息分發到對應的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個queue,對應18個延遲級別)
定時任務不斷判斷消息是否到達投遞時間,沒有到達則后續執行投遞
如果到達投遞時間,則從commitLog中拉取消息的內容,重新設置消息topic,queueId為原來的(原來的topic,queueId在消息擴展屬性中),然后將消息投遞到commitLog中,此時消息就會被分發到對應的隊列中,然后被消費。
本文轉載自微信公眾號「Java識堂」,可以通過以下二維碼關注。轉載本文請聯系Java識堂公眾號。


































