品 RocketMQ 源碼,學習并發編程三大神器
筆者是 RocketMQ 的忠實粉絲,在閱讀源碼的過程中,學習到了很多編程技巧。
這篇文章,筆者結合 RocketMQ 源碼,分享并發編程三大神器的相關知識點。

1 CountDownLatch 實現網絡同步請求
CountDownLatch 是一個同步工具類,用來協調多個線程之間的同步,它能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續執行。
下圖是 CountDownLatch 的核心方法:

我們可以認為它內置一個計數器,構造函數初始化計數值。每當線程執行 countDown 方法,計數器的值就會減一,當計數器的值為 0 時,表示所有的任務都執行完成,然后在 CountDownLatch 上等待的線程就可以恢復執行接下來的任務。
舉例,數據庫有100萬條數據需要處理,單線程執行比較慢,我們可以將任務分為5個批次,線程池按照每個批次執行,當5個批次整體執行完成后,打印出任務執行的時間 。
溫習完 CountDownLatch 的知識點,回到 RocketMQ 源碼。
筆者在沒有接觸網絡編程之前,一直很疑惑,網絡同步請求是如何實現的?
同步請求指:客戶端線程發起調用后,需要在指定的超時時間內,等到響應結果,才能完成本次調用。如果超時時間內沒有得到結果,那么會拋出超時異常。
RocketMQ 的同步發送消息接口見下圖:

追蹤源碼,真正發送請求的方法是通訊模塊的同步請求方法 invokeSyncImpl 。

整體流程:
發送消息線程 Netty channel 對象調用 writeAndFlush 方法后 ,它的本質是通過 Netty 的讀寫線程將數據包發送到內核 , 這個過程本身就是異步的;
ResponseFuture 類中內置一個 CountDownLatch 對象 ,responseFuture 對象調用 waitRepsone 方法,發送消息線程會阻塞 ;

客戶端收到響應命令后, 執行 processResponseCommand 方法,核心邏輯是執行 ResponseFuture 的 putResponse 方法。

該方法的本質就是填充響應對象,并調用 countDownLatch 的 countDown 方法 , 這樣發送消息線程就不再阻塞。
CountDownLatch 實現網絡同步請求是非常實用的技巧,在很多開源中間件里,比如 Metaq ,Xmemcached 都有類似的實現。
2 ReadWriteLock 名字服務路由管理
讀寫鎖是一把鎖分為兩部分:讀鎖和寫鎖,其中讀鎖允許多個線程同時獲得,而寫鎖則是互斥鎖。
它的規則是:讀讀不互斥,讀寫互斥,寫寫互斥,適用于讀多寫少的業務場景。
我們一般都使用 ReentrantReadWriteLock ,該類實現了 ReadWriteLock 。ReadWriteLock 接口也很簡單,其內部主要提供了兩個方法,分別返回讀鎖和寫鎖 。
讀寫鎖的使用方式如下所示:
創建 ReentrantReadWriteLock 對象 , 當使用 ReadWriteLock 的時候,并不是直接使用,而是獲得其內部的讀鎖和寫鎖,然后分別調用 lock / unlock 方法 ;
讀取共享數據 ;
寫入共享數據;
RocketMQ架構上主要分為四部分,如下圖所示 :

Producer :消息發布的角色,Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。
Consumer :消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。
BrokerServer :Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證。
NameServer :名字服務是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的zookeeper,支持Broker的動態注冊與發現。
NameServer 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。Broker 啟動之后會向所有 NameServer 定期(每 30s)發送心跳包(路由信息),NameServer 會定期掃描 Broker 存活列表,如果超過 120s 沒有心跳則移除此 Broker 相關信息,代表下線。
那么 NameServer 如何保存路由信息呢?

路由信息通過幾個 HashMap 來保存,當 Broker 向 Nameserver 發送心跳包(路由信息),Nameserver 需要對 HashMap 進行數據更新,但我們都知道 HashMap 并不是線程安全的,高并發場景下,容易出現 CPU 100% 問題,所以更新 HashMap 時需要加鎖,RocketMQ 使用了 JDK 的讀寫鎖 ReentrantReadWriteLock 。
更新路由信息,操作寫鎖

查詢主題信息,操作讀鎖

讀寫鎖適用于讀多寫少的場景,比如名字服務,配置服務等。
3 CompletableFuture 異步消息處理
RocketMQ 主從架構中,主節點與從節點之間數據同步/復制的方式有同步雙寫和異步復制兩種模式。
異步復制是指消息在主節點落盤成功后就告訴客戶端消息發送成功,無需等待消息從主節點復制到從節點,消息的復制由其他線程完成。
同步雙寫是指主節點將消息成功落盤后,需要等待從節點復制成功,再告訴客戶端消息發送成功。
同步雙寫模式是阻塞的,筆者按照 RocketMQ 4.6.1 源碼,整理出主節點處理一個發送消息的請求的時序圖。

整體流程:
生產者將消息發送到 Broker , Broker 接收到消息后,發送消息處理器 SendMessageProcessor 的執行線程池SendMessageExecutor 線程池來處理發送消息命令;
執行 ComitLog 的 putMessage 方法;
ComitLog 內部先執行 appendMessage 方法;
然后提交一個 GroupCommitRequest 到同步復制服務 HAService ,等待 HAService 通知 GroupCommitRequest 完成;
返回寫入結果并響應客戶端 。
我們可以看到:發送消息的執行線程需要等待消息復制從節點 , 并將消息返回給生產者才能開始處理下一個消息。
RocketMQ 4.6.1 源碼中,執行線程池的線程數量是 1 ,假如線程處理主從同步速度慢了,系統在這一瞬間無法處理新的發送消息請求,造成 CPU 資源無法被充分利用 , 同時系統的吞吐量也會降低。
那么優化同步雙寫呢 ?
從 RocketMQ 4.7 開始,RocketMQ 引入了 CompletableFuture 實現了異步消息處理 。
發送消息的執行線程不再等待消息復制到從節點后再處理新的請求,而是提前生成 CompletableFuture 并返回 ;
HAService 中的線程在復制成功后,調用 CompletableFuture 的 complete 方法,通知 remoting 模塊響應客戶端(線程池:PutMessageExecutor ) 。
我們分析下 RocketMQ 4.9.4 核心代碼:
Broker 接收到消息后,發送消息處理器 SendMessageProcessor 的執行線程池SendMessageExecutor 線程池來處理發送消息命令;
調用 SendMessageProcessor 的 asyncProcessRequest 方法;

調用 Commitlog 的 aysncPutMessage 方法寫入消息 ;

這段代碼中,當 commitLog 執行完 appendMessage 后, 需要執行刷盤任務和同步復制兩個任務。但這兩個任務并不是同步執行,而是異步的方式。
復制線程復制消息后,喚醒 future ;

組裝響應命令 ,并將響應命令返回給客戶端。
為了便于理解這一段消息發送處理過程的線程模型,筆者在 RocketMQ 源碼中做了幾處埋點,修改 Logback 的日志配置,發送一條普通的消息,觀察服務端日志。

從日志中,我們可以觀察到:
發送消息的執行線程(圖中紅色)在執行完創建刷盤 Future 和同步復制 future 之后,并沒有等待這兩個任務執行完成,而是在結束 asyncProcessRequest 方法后就可以處理發送消息請求了 ;
刷盤線程和復制線程執行完各自的任務后,喚醒 future,然后通過刷盤線程組裝存儲結果,最后通過 PutMessageExecutor 線程池(圖中黃色)將響應命令返回給客戶端。
筆者一直認為:異步是更細粒度的使用系統資源的一種方式,在異步消息處理的過程中,通過 CompletableFuture 這個神器,各個線程各司其職,優雅且高效的提升了 RocketMQ 的性能。





























