從RocketMQ的Broker源碼層面驗證一下這兩個點
本篇博客會從源碼層面,驗證在RocketMQ基礎概念剖析,并分析一下Producer的底層源碼中提到的結論,分別是:
- Broker在啟動時,會將自己注冊到所有的NameServer上
- Broker在啟動之后,會每隔30S向NameServer發送心跳
之前的文章中,我們知道了RocketMQ中的一些核心概念,例如Broker、NameServer、Topic和Tag等等。Producer從啟動到發送消息的整個過程,從源碼級別分析了Producer在發送消息到Broker的時候,是如何拿到Broker的數據的,如何從多個MessageQueue中選擇對應的Queue發送消息。
但是由于篇幅原因,文章開頭提到的兩個已知結論在上篇博客里并沒沒有對其進行驗證,這次就從源碼層面來驗證一下。
一開頭就看到Broker主從架構相關的源碼
在上篇博客中提到過,Broker為了保證自身的高可用,會采取一主一從的架構。即使Master Broker因為意外原因掛了,Slave Broker上還有一份完整的數據,Broker可以繼續提供服務。
isEnableDLegerCommitLog中提到的DLeger可以先不管,我們目前只需要知道其默認返回的結果是false。所以Broker首次啟動的時候,就會執行被If包裹住的邏輯。
RocketMQ本身是有主從架構的,但是功能不夠完善,如果Master Broker出現了故障,需要人工的將Slave Broker切換成Master。
就有點類似于手動的將一臺Redis設置成另一臺Redis的Slave節點,如果此時Redis的Master掛了,還需要手動的進行切換一樣。為了解決這個問題,Redis搞出了Sentinel,可以在發生故障的時候自動的實現故障轉移。所以RocketMQ在4.5版本之后推出的Dleger差不多也是這么個東西,除此之外,Dleger還可以實現多副本。
不使用Dleger時,主從數據如何進行同步
先給出結論,在RocketMQ的主從架構下,主從同步采取的是Slave主動拉取的方式。
如果當前執行注冊的Broker角色是Slave,那就會使用ScheduledExecutorService啟動一個周期性的定時任務,每隔10秒就會去Master同步一次,同步的數據包括Topic的相關配置、Consumer的消費偏移量、延遲消息的Offset、訂閱組的相關數據和配置。
ScheduledExecutorService的作用和原理下面會做簡單介紹。
首次啟動時強制進行Broker注冊
因為是首次啟動,所以參數forceRegister被直接設置成了true。
使用ScheduledExecutorService啟動定時任務
通過入口進來之后,Broker會啟動一個定時任務,周期性的去注冊。ScheduledExecutorService底層就是一個newSingleThreadScheduledExecutor,只有一個線程的線程池,其關鍵的參數corePoolSize值為1,然后按照指定的頻率周期性的執行某個任務。
ScheduledExecutorService主要的功能有兩個,分別是:
- ScheduledExecutorService 以固定的頻率執行任務
- ScheduledExecutorService 執行完之后,間隔制定的時間后再執行下一個任務
使用scheduleAtFixedRate實現心跳機制
此處我們使用的是scheduleAtFixedRate,如下圖。
至于執行的頻率,我們能夠配置的范圍最大不能超過一分鐘,也就是說這個范圍是在10-60秒之間,默認30秒執行一次,這也就驗證了每30秒,Broker會向NameServer發送一次心跳。
獲取執行頻率的這個判斷有點意思,甚至看起來有那么一絲絲簡潔,但是理解其具體可配置的時間范圍可能需要花點時間。在實際業務性代碼中,個人建議還是不要這么寫,業務中代碼的可讀性和可維護性我認為是需要放在首位的。
值得注意的是,此處啟動心跳,給了一個10秒的延遲,因為在不使用Dleger的情況下,在之前的邏輯中已經執行過一次注冊了。如果不做延遲,那么幾乎是同一個時間就會有兩次注冊操作,而這明顯是不符合預期的;同時forceRegister也從true變成了通過函數isForceRegister來進行獲取。
調用registerBrokerAll注冊
定時任務注冊完成之后,之后的每次觸發都會執行registerBrokerAll方法來執行注冊,你可能會有疑問,我當前不就是一個Broker嗎,怎么名字有個后綴All?那是因為NameServer會有多個,Broker啟動的時候會將自己注冊到所有的NameServer上去。當然,口說無憑,我們繼續看下去。
繼續往里走,如果當前滿足注冊條件,則會實際的執行注冊操作。那具體滿足什么條件呢?由變量forceRegister和一個needRegister方法來決定,forceRegister默認是true,所以當第一執行這個邏輯的時候是一定會執行注冊操作的。
通過對比數據版本判斷當前Broker是否需要進行注冊
感興趣的話,可以繼續跟隨文章了解一下,needRegister是根據什么來判斷是否需要注冊的。
首先,Broker一旦注冊到了NameServer之后,由于Producer不停的在寫入數據,Consumer也在不停的消費數據,Broker也可能因為故障導致MessageQueue等關鍵路由信息發生變動,NameServer中的數據和Broker中實際的數據就會不一致,如果不及時更新,Producer拉取到的路由數據就可能有誤。
所以每次定時任務觸發的時候會去對比NameServer和Broker的數據,如果發現數據版本不一致,Broker會重新進行注冊,將最新的數據更新到NameServer。說直白一點,就是做一個數據定時更新。以下紅框中的代碼就是數據對比的核心代碼。
當Broker和所有的NameServer節點一一完成數據對比之后,就會進行結果判定,但凡有一個節點數據不一致,都需要進行重新注冊,把最新的數據更新到NameServer,核心判斷邏輯同樣用紅框標出。
至此,其實我們就已經完成了 Broker在啟動的時候會向所有NameServer進行注冊 的驗證。但是由于后續仍然有值得關注發光點,我們繼續后續的源碼閱讀。
使用CountDownLatch獲取所有注冊異步任務的返回結果
除此之外,還值得注意的是在needRegister中,對于和多個NameServer的交互,RocketMQ是通過線程池異步實現的,同時使用了CountDownLatch來等待所有的請求結束,返回結果給主線程。
既然聊到了CountDownLatch,就順帶提一下。假設我們有5個互不依賴的計算任務,如果快速的計算出結果并返回呢?那當然是5個任務并發執行,這就需要通過新開線程實現,結果就無法一起返回了。
而CountDownLatch可以讓主線程等待,等待這5個計算任務全部結束之后,喚醒主線程再繼續后面的邏輯。這就是CountDownLatch的作用,如果平時只是單純的CRUD功能的話,可能連CountDownLatch是什么都做不知道,這也是為什么大廠面試會問這些問題,因為在大廠的復雜業務背景下,你必須要會使用它們。
指定需要注冊之后,接下來就是核心的注冊方法了,核心邏輯由registerBrokerAll來實現。Broker同樣會去每一個NameServer節點上注冊自己,并且為了提前執行的效率,同樣開線程采用了異步的方式。在獲取所有結果時,同樣的使用了CountDownLatch。
使用CopyOnWriteArrayList存儲注冊請求的返回
除此之外,用于保存注冊結果的列表,使用的是CopyOnWriteArrayList,被面試虐過的同學應該熟悉。我們知道此處開啟了多線程去不同的NameServer注冊,寫入注冊結果的時候,多線程對同一個列表進行寫入,會產生線程安全的問題。
而我們知道ArrayList是非線程安全的,這也是為什么此處要使用CopyOnWriteArrayList來保存注冊結果。為什么CopyOnWriteArrayList能夠保證線程安全?
這歸功于COW(Copy On Write),讀請求時共用同一個List,涉及到寫請求時,會復制出一個List,并在寫入數據的時候加入獨占鎖。比起直接對所有操作加鎖,讀寫鎖的形式分離了讀、寫請求,使其互不影響,只對寫請求加鎖,降低了加鎖的消耗,提升了整體操作的并發。
上面并發執行的注冊操作,具體做了哪些事情呢?先看代碼。
上面就是單個注冊的所有邏輯,可以看到在構建完請求之后,有一個oneway的判斷。
oneway值為false,表示單向通信,Broker不關心NameServer的返回,也不會觸發任何回調函數。接下來Broker就會把已經寫進request body的所有數據發送給NameServer。請求數據統一由一個叫TopicConfigSerializeWrapper的Wrapper給包裹住。
其可以看為兩部分:
- 存在該Broker節點上的所有Topic的數據
- 數據版本
然后帶著這些數據,Broker會同步的調用invokeSync發送請求給NameServe,并且在執行之后觸發實現特定功能的回調函數。
EOF
至此,我們完成了對開篇所提結論的驗證,同時也發現了RocketMQ的主從架構、Master和Slave同步數據的方式、心跳機制的實現等等,也基本從源碼中看完了Broker啟動的所有流程。看這些老哥寫的源碼還是挺有意思的,之后有時間隨緣再看看NameServer端相關的源碼吧。







































