小紅書一面:Kafka 是如何選擇 Leader 的?
Kafka作為一款優秀的分布式消息中間件,內部也存在一些選舉機制,這篇文章,我們將詳細地分析 Kafka如何實現選擇 Leader?

一、Kafka集群整體架構
Kafka集群是由多個 Kafka Broker通過連同一個 Zookeeper組成,整個架構可以抽象成下圖:

在 Kafka中,數據以 Topic的形式組織,每個主題又被劃分為多個分區(Partition),每個分區的數據在 Broker之間有多個副本(Replica),保證數據的高可用和持久性。
二、Controller的作用
Kafka Controller是一個特殊的 Broker實例,它負責 Kafka集群中的領導者選舉、分區的分配、以及在 Broker上下線期間重新分配 Leader和副本。Controller通過與 Zookeeper交互來感知集群狀態的變化,從而進行必要的領導者重新選舉。
三、選主的原理分析
1.Leader的概念
在 Kafka中,每個分區會有多個副本,其中只有一個副本是Leader,其他副本為Follower。Producer和 Consumer會向分區的Leader寫入或讀取數據,Follower從 Leader復制數據。這樣設計實現了高吞吐量的同時保證了數據的冗余。
2.選主過程
選主過程主要包括兩個方面:Controller選舉和分區Leader選舉。
(1) Controller選舉
在 Kafka啟動時,會注冊到 Zookeeper的/brokers/ids的路徑下,其中會有一個 Broker節點通過與 Zookeeper的交互被選舉為 Controller。具體而言,Brokers通過在 Zookeeper的/controller路徑嘗試創建一個臨時節點(ephemeral node)來競爭成為 Controller,選舉規則也很簡單,誰先注冊到 Zookeeper中的/controller節點,誰就是 Controller。
當當前Controller失效(如宕機或網絡問題)時,Zookeeper會刪除/controller節點,其他Broker會再次競爭,該過程保證了Controller的高可用。
(2) 分區Leader選舉
一旦一個 Broker成為 Controller,它會獲取所有分區的最新信息,并基于持久化在 Zookeeper的數據進行當前各分區 Leader的選舉。Controller使用 ISR(In-Sync Replica)列表,即當前與 Leader保持同步的所有Follower副本進行選主。默認情況下,ISR中第一個副本被選為新的 Leader。
比如上圖中 TopicA中的 Partition0號分區,選擇 broker0作為 Leader, 然后會將選擇的節點信息注冊到 Zookeeper的/brokers/topics路徑下,記錄誰是 Leader,有哪些服務器可用。
Kafka 在實現 Controller選舉方面采用了一種基于 Zookeeper的機制,這種機制充分利用了 Zookeeper的特性來確保集群的高可用性和一致性,接下來,我將深入解析這兩種選主的機制。
四、Controller選舉機制詳解
1. Zookeeper的Role
Zookeeper在Kafka中作為一個分布式協調服務,其負責維護集群的元數據信息,包括Kafka節點的活動狀態和每個分區的Replica信息。在Controller選舉過程中,Zookeeper充當著協調者的角色,利用其特有的臨時節點機制來實現一個分布式的鎖。
2. 臨時節點
在Zookeeper中,臨時節點(Ephemeral Znode)是一個重要的特性,這種節點在客戶端會話結束時自動被刪除。Kafka利用這一特性實現Controller的自動化選舉。
3. Controller選舉過程
Kafka的Controller選舉過程主要分為以下幾個步驟:
- 初始化: 當Kafka Broker啟動時,所有Broker都試圖成為Controller。每個Broker會進行一次自檢,初始化必要的Controller管理器和相關結構。
- 創建Zookeeper路徑: 每個Broker嘗試在Zookeeper的特定路徑(通常是/controller)下創建一個臨時節點。該節點的路徑即為Zookeeper中控制選舉的關鍵路徑。
- 競爭鎖: 因為臨時節點的特性,只有第一個成功創建的節點會存在于Zookeeper。因此,能創建成功的Broker就會成為當前集群的Controller。這相當于分布式鎖機制,誰獲取到鎖誰成為Controller。
- 故障處理與重新選舉: 如果當前的Controller(持有Zookeeper節點的Broker)崩潰或因網絡問題與Zookeeper斷開連接,Zookeeper會自動刪除該Broker創建的臨時節點。剩余的Broker會監聽這個節點的變化(通過Zookeeper的Watcher機制),當節點被刪除時,會重新發起競爭,確保能夠快速選出一個新的Controller。
4. 實現細節
從實現的角度來看,我們可以看看 Kafka的相關主要類和方法涉及的過程:
- Zookeeper客戶端初始化: 初始化時,Kafka的KafkaController類通過Zookeeper客戶端來與ZooKeeper服務建立連接,這是基礎。
- Controller路徑定義: 在Kafka源碼中,通常由ControllerZNodePaths.CONTROLLER_PATH常量定義Controller路徑。
在ControllerEventManager類中,核心的方法參與Zookeeper節點的創建與監聽:
public void onControllerFailover() {
try {
// 嘗試在Zookeeper創建臨時節點
zkClient.createEphemeralPathExpectConflictHandleZnode(
ControllerZNodePaths.CONTROLLER_PATH,
controllerString(),
onControllerFailover);
// 設置Controller監聽器
zkClient.subscribeDataChanges(ControllerZNodePaths.CONTROLLER_PATH, new ControllerChangeListener());
} catch (Exception e) {
// 異常處理
}
}在上述代碼段中,展示了當一個 Broker準備競選為Controller時,他會在Zookeeper的/controller路徑創建一個臨時節點,并設置對該節點變化的監聽器。
5. 監聽機制
每個Broker通過設置Watcher來監聽/controller節點的刪除事件。一旦現有Controller的連接丟失,所有的Broker都會收到這個事件通知。這個機制確保了在現有Controller失效時,能夠迅速選出新的Controller。
6. Leader和集群的穩定性
一旦新的Controller被選出,它就會獲取集群的元數據,并開始執行其職責,包括領導者重新選舉和分區管理等操作。為了確保集群狀態的一致性和穩定性,Controller必須在全面獲取并更新當前集群狀態后才能完全上線。
五、分區Leader選舉詳解
當然,Kafka中的分區Leader選舉是確保數據高可用性和一致性的關鍵機制之一。讓我們更詳細地探討一下這一過程,包括其觸發條件、具體步驟和相關代碼實現細節。
1.分區Leader選舉的觸發條件
分區Leader選舉主要在以下幾種情況下被觸發:
- Broker新增或宕機:當一個Broker加入集群或者從集群中失聯(掉線)時,需要重新分配分區的Leader。
- ISR(In-Sync Replica)變化:ISR列表中的Replica發生變化,比如某個Replica落后過多或恢復同步。
- Controller切換:如果當前的Controller失效,新Controller上線后需要重新確認并分配分區的Leader。
2.選舉的具體步驟
分區Leader選舉過程主要涉及以下幾步:
(1) 獲取分區信息
一旦選舉觸發,Controller需獲取每個分區的元數據信息,包括:
- 當前Leader。
- ISR列表(保持與Leader同步的副本集合)。
- 所有分區的Replica列表。
(2) 確定新Leader
Controller根據ISR列表來選擇新的Leader,通常選擇第一個Replica作為新的Leader,這樣保證選擇的是同步的且相對最新的副本。此外,Kafka允許通過配置參數自定義選舉策略,確保更靈活地處理特殊場景。
(3) 更新Zookeeper
選出新的Leader后,需將這個新的Leader信息更新到Zookeeper,這包括更新分區的Leader和ISR信息。此步驟確保其他Broker即使在Controller切換的情況下,也能從Zookeeper獲取到正確的分區Leader信息。
(4) 通知各Broker
更新完Zookeeper后,Controller通過向集群中其他Broker發送Leader和ISR更新信息,通知它們該分區的Leader已發生改變。這涉及使用Kafka的內部API向其他Broker推送集群狀態變更。
3.核心代碼分析
以下是分區Leader選舉過程中的一些核心代碼實現示例:
(1) 獲取ISR列表
public List<Integer> getIsrForPartition(Partition partition) {
// 獲取分區的ISR列表
return partition.getIsr();
}ISR列表的獲取是選舉過程中的基礎步驟,確保后續的Leader選舉從一致的數據集合中挑選。
(2) 選舉新Leader
下面的代碼展示了如何選擇 Leader,通過從 ISR中挑選第一個節點作為新 Leader,確保選擇的 Leader始終是最新同步過的一個。
public int selectNewLeader(/* some parameters */) {
List<Integer> isr = currentIsr(partition);
if (isr.isEmpty()) {
throw new IllegalStateException("ISR should not be empty");
}
// 默認選擇ISR列表中的第一個
int newLeader = isr.get(0);
// 更新新Leader信息到Zookeeper
zkClient.updateLeaderAndIsr(newLeader, isr);
return newLeader;
}(3) 更新到Zookeeper
public void updateLeaderAndISR(int newLeader, List<Integer> newIsrList, Partition partition) {
zkClient.setData(ControllerZNodePaths.getTopicPartitionStatePath(partition),
new LeaderAndIsrZNodeData(newLeader, newIsrList));
}這段代碼表示將新的 Leader和 ISR信息更新到Zookeeper,確保全局一致性。
總結
本文,我們分析了 Kafka的 Leader選舉機制原理,它通過巧妙利用 Zookeeper和 ISR列表,提升了 Kafka的可靠性和可用性,但是,因為重度依賴 Zookeeper,因此使得 Kafka也存在很多風險。作為程序員,了解 Kafka的機制,可以幫助我們更好地使用和運維 Kafka。

































