RocketMQ順序消息解析!
順序消息是消息隊列 RocketMQ 提供的一種高級消息類型。
對于一個指定的Topic,消息嚴格按照先進先出(FIFO)的原則進行消息發布和消費。
- 即先發送的消息先消費,后發送的消息后消費。
順序消息適用于對消息發送和消費順序有嚴格要求的情況。
應用場景
順序消息和普通消息的對比如下:
消息類型 | 消費順序 | 性能 | 適用場景 |
普通消息 | 無順序 | 高 | 適用于對吞吐量要求高,且對生產和消費順序無要求 |
順序消息 | 指定的 Topic 內的消息遵循先入先出(FIFO)規則 | 一般 | 吞吐量要求一般 但是要求特定的 |
訂單創建場景:
在一些電商系統中,同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息。
必須嚴格按照先后順序來進行生產或者消費,否則消費中傳遞訂單狀態會發生紊亂,影響業務的正常進行。
因此,該訂單的消息必須按照一定的順序在客戶端和消息隊列中進行生產和消費。
- 同時消息之間有先后的依賴關系,后一條消息需要依賴于前一條消息的處理結果。
順序消息分為全局有序和局部有序。
全局有序
可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費。
從而保證消息的全局有序,但是這種方式效率低,一般不使用。

局部有序
假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由ID。
- 比如想保證一個訂單的相關消息有序,那么就使用訂單ID當做路由ID。
在發送消息的時候,通過訂單ID對消息隊列的個數取余,根據取余結果選擇消息隊列。
- 這樣同一個訂單的數據就可以保證發送到一個消息隊列中。
消費者端使用MessageListenerOrderly處理有序消息。
這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。
圖片
實現原理
消費者在啟動時會調用DefaultMQPushConsumerImpl的start方法。
圖片
在DefaultMQPushConsumerImpl的start方法中,對消息監聽器類型進行了判斷。
如果類型是MessageListenerOrderly表示要進行順序消費。
此時使用ConsumeMessageOrderlyService對ConsumeMessageService進行實例化。
- 然后調用它的start方法進行啟動。
圖片
加鎖定時任務
進入到ConsumeMessageOrderlyService的start方法中。
可以看到,如果是集群模式,會啟動一個定時加鎖的任務,周期性的對訂閱的消息隊列進行加鎖。
具體是通過調用RebalanceImpl的lockAll方法實現的。
圖片
為什么集群模式下需要加鎖?
因為廣播模式下,消息隊列會分配給消費者下的每一個消費者。
而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行。
- 所以在廣播模式下不存在競爭關系,也就不需要對消息隊列進行加鎖。
而在集群模式下,有可能因為負載均衡等原因將某一個消息隊列分配到了另外一個消費者中。
- 因此在集群模式下就要加鎖,當某個消息隊列被鎖定時,其他的消費者不能進行消費。
整個順序消費過程涉及了三把鎖,它們分別對應不同的情況。
向Broker申請的消息隊列鎖
集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行。
為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對消息隊列進行加鎖。
如果加鎖成功,記錄到消息隊列對應的ProcessQueue中的locked變量中,它是boolean類型的。
public class ProcessQueue {
private volatile boolean locked = false;
}消費者處理拉取消息時的消息隊列鎖
消費者在處理拉取到的消息時,由于可以開啟多線程進行處理。
所以處理消息前通過MessageQueueLock中的mqLockTable獲取到了消息隊列對應的鎖。
鎖住要處理的消息隊列,這里加消息隊列鎖主要是處理多線程之間的競爭。
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();消息消費鎖
消費者在調用consumeMessage方法之前會加消費鎖。
主要是為了避免在消費消息時,由于負載均衡等原因,ProcessQueue被刪除。
public class ProcessQueue {
private final Lock consumeLock = new ReentrantLock();
}
圖片
順序消息缺陷
消費順序消息的并行度依賴于隊列的數量。
隊列熱點問題,個別隊列由于哈希不均導致消息過多,消費速度跟不上,產生消息堆積問題。
遇到消息失敗的消息,無法跳過,當前隊列消費暫停。
熱點問題,只能通過拆分MessageQueue和優化路由方法來盡量均衡的將消息分配到不同的MessageQueue。
消費并行度理論上不會有太大問題,因為MessageQueue的數量可以調整。
消費失敗的無法跳過是不可避免的。
因為跳過可能導致后續的數據處理都是錯誤的。
不過可以提供一些策略,由用戶根據錯誤類型來決定是否跳過,并且提供重試隊列之類的功能。
- 在跳過之后用戶可以在其他地方重新消費到這條消息。
資料分享:
參考:
丁威、周繼鋒《RocketMQ技術內幕》
https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage/


































