Pulsar存儲計算分離架構設計之消息副本同步和故障轉移機制
一、前言
我們繼續來講Pulsar存儲計算分離架構設計系列,這篇我們來說說消息副本和故障轉移機制。
在分布式消息系統的高可用性架構中,消息副本機制與故障轉移能力是確保數據可靠性和服務連續性的核心支柱,基本上每款消息中間件都有消息副本機制與故障轉移能力,包括但不限于Kafka、RocketMQ、RabbitMQ。
Apache Pulsar通過多副本存儲和智能故障轉移機制,構建了多層次容錯體系:其基于BookKeeper的持久化存儲層采用多副本(通常3副本)機制,將消息數據分散存儲在獨立的Bookie節點上,通過Quorum協議保證數據強一致性;計算層Broker則通過ZooKeeper協調實現故障自動檢測與流量切換,當主節點宕機時,備用節點能在秒級內接管服務,確保生產者/消費者的連接不中斷。這種存儲與計算解耦的架構設計,使Pulsar既能避免單點故障導致的數據丟失,又能實現業務無感知的故障恢復,特別適合金融交易、物聯網等對可靠性要求嚴苛的場景。
二、核心類圖

Pulsar的持久化復制機制核心流程如下:PersistentTopic作為消息處理的中心樞紐,通過ManagedLedger接口管理消息的持久化存儲,當需要跨集群復制時,PersistentReplicator組件會創建與遠程集群的連接并使用ProducerImpl發送消息。
復制流程從ManagedCursor讀取Entry消息開始,經過MessageImpl反序列化處理,通過ProducerImpl的sendAsync方法異步發送到目標集群,發送完成后通過回調機制更新游標的消費位置并觸發下一批消息的讀取,形成了一個完整的復制閉環。整個過程中DispatchRateLimiter負責控制復制速率防止網絡擁塞,而PersistentMessageExpiryMonitor則負責處理消息過期邏輯,確保系統資源的有效利用。
三、核心操作時序圖

Pulsar消息副本和故障轉移機制的核心流程如下:
- 消息發布與本地存儲:客戶端向本地 PersistentTopic 發布消息,消息首先被持久化存儲到本地 ManagedLedger 中,存儲完成后向客戶端返回確認。
- 消息復制流程:PersistentReplicator 通過 ManagedCursor 從本地 ManagedLedger 中讀取待復制的消息條目,然后通過 ProducerImpl 將消息異步發送到遠程集群,遠程集群將其存儲到自己的 ManagedLedger 中并返回確認。
- 復制狀態更新:消息成功復制到遠程集群后,PersistentReplicator 會更新本地 ManagedCursor 的位置,標記已復制的消息,避免重復復制。
- 故障處理與恢復:當與遠程集群的連接失敗時,PersistentReplicator 會檢測到連接異常并進行故障處理,包括重試機制,在連接恢復后重新建立連接并從上次中斷的位置繼續復制流程。
整個機制通過異步非阻塞的方式實現高效的消息復制,同時具備故障檢測和自動恢復能力,確保消息在跨集群環境中的可靠傳輸。
四、源碼分析
4.1 消息副本同步機制
當Entry成功添加到ManagedLedger后,會通過AddEntryCallback的addComplete方法回調通知調用方。

這是通過CountDownLatch機制同步等待異步操作完成的結果,這里的邏輯對應上面時序圖中的前兩步。
我們這里主要來看下PersistentReplicator讀取消息的機制,對應的時序圖中的Replicator。
PersistentTopic中的addProducer重寫了父類AbstractTopic的addProducer,其它的子類沒有重新,這是因為PersistentTopic有以下兩個功能要保障。

public class Policies {
// 需要復制的集群信息
public Set<String> replication_clusters = new HashSet<>();
// ...
}
核心方法 readMoreEntries:

readMoreEntries 方法是 PersistentReplicator 中控制消息復制流程的核心實現,主要負責根據系統狀態和資源配置動態調整消息讀取策略。該方法首先通過 getAvailablePermits() 計算當前可讀取的消息數量,考慮了生產者隊列容量和限流設置等因素。如果允許讀取(availablePermits > 0),則根據可用許可和 readBatchSize 確定實際讀取消息數,并通過原子操作 HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE) 確保同一時間只有一個讀取操作在進行,避免重復讀取。當生產者不可寫時,會將讀取批量減小到1條消息以實現流量控制。如果觸發限流(返回-1),則延遲 MESSAGE_RATE_BACKOFF_MS 毫秒后重新嘗試讀取。最終通過 cursor.asyncReadEntriesOrWait() 異步從本地 ManagedLedger 讀取消息,讀取完成后回調 readEntriesComplete 方法進行處理,形成完整的復制循環。
接著看這個readEntriesComplete方法,上面readMoreEntries完成后,會回掉到readEntriesComplete方法里來。
org.apache.pulsar.broker.service.persistent.PersistentReplicator#readEntriesComplete
public void readEntriesComplete(List<Entry> entries, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Read entries complete of {} messages", topicName, localCluster, remoteCluster,
entries.size());
}
int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
if (readBatchSize < maxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize);
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster,
remoteCluster, readBatchSize, newReadBatchSize);
}
readBatchSize = newReadBatchSize;
}
readFailureBackoff.reduceToHalf();
boolean atLeastOneMessageSentForReplication = false;
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
try {
// This flag is set to true when we skip atleast one local message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
int length = entry.getLength();
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
continue;
}
if (isEnableReplicatedSubscriptions) {
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
}
if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}
if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", topicName,
localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}
if (msg.isExpired(messageTTLInSeconds)) {
msgExpired.recordEvent(0/* no value stat */);
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName,
localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}
if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", topicName,
localCluster, remoteCluster, entry.getPosition());
}
isLocalMessageSkippedOnce = true;
entry.release();
msg.recycle();
continue;
}
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
msgOut.recordEvent(headersAndPayload.readableBytes());
msg.setReplicatedFrom(localCluster);
headersAndPayload.retain();
getSchemaInfo(msg).thenAccept(schemaInfo -> {
msg.setSchemaInfoForReplicator(schemaInfo);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
}).exceptionally(ex -> {
log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
localCluster, remoteCluster, ex);
returnnull;
});
atLeastOneMessageSentForReplication = true;
}
} catch (Exception e) {
log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(),
e);
}
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
if (atLeastOneMessageSentForReplication && !isWritable()) {
// Don't read any more entries until the current pending entries are persisted
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", topicName,
localCluster, remoteCluster, atLeastOneMessageSentForReplication, isWritable());
}
} else {
readMoreEntries();
}
}readEntriesComplete 方法是 PersistentReplicator 中處理從本地集群讀取消息完成后的核心回調方法。當從本地 ManagedLedger 讀取到消息后,該方法會遍歷所有讀取到的 Entry 條目,對每條消息進行反序列化和一系列過濾檢查,包括檢查是否為已復制的消息、是否應該復制到目標集群、是否過期等。對于需要復制的消息,會通過 producer.sendAsync 方法異步發送到遠程集群,發送時會附帶 ProducerSendCallback 回調來處理發送結果。成功發送到遠程集群的消息會被標記刪除,避免重復復制,同時根據發送狀態和系統負載情況決定是否繼續讀取更多消息進行復制,從而形成完整的消息復制循環流程。
這樣就完成了消息副本的同步機制了!!!
4.2 故障轉移機制
當與遠程集群的連接失敗時,PersistentReplicator 會檢測到連接異常并進行故障處理,包括重試機制,在連接恢復后重新建立連接并從上次中斷的位置繼續復制流程。
4.2.1 ProducerSendCallback.sendComplete 方法中處理發送異常的情況


當發送消息到遠程集群失敗時,會記錄錯誤日志并將游標回退,以便在連接恢復后重新發送消息。
4.2.2 readEntriesFailed 方法處理讀取條目失敗的情況

4.2.3 readMoreEntries 方法中的重試機制
之前上面分析過,我這里就不重復了。
} else if (availablePermits == -1) {
// no permits from rate limit
topic.getBrokerService().executor().schedule(
() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}當遇到限流等情況時,會安排延遲重試。
4.2.4 ProducerImpl 內部的連接管理和重試機制
ProducerImpl 內部的連接管理和重試機制主要通過 ClientCnx 對象管理與 broker 的連接,使用 getState() 方法檢查生產者狀態(如 Ready、Connecting、Closing、Closed 等),并在連接斷開時觸發 connectionClosed() 回調進行重連。在 sendAsync() 方法中,當發送失敗時會檢查是否需要重試,并使用 pendingMessages 隊列暫存待發送消息,通過 grabCnx() 方法獲取或重新建立連接。連接恢復處理通過 reconnectLater() 方法實現延遲重連邏輯,使用 backoff 策略控制重連間隔,并通過 connectionOpened() 回調處理連接成功建立后的操作。消息確認和重傳機制通過 ackReceived() 處理 broker 的確認響應,對于未確認的消息,在重連后會重新發送,并使用 pendingMessages 隊列跟蹤待確認消息,確保了 ProducerImpl 在網絡波動或 broker 重啟等異常情況下能夠自動恢復連接并保證消息的可靠傳輸。
以上四點的機制共同確保了在與遠程集群連接失敗時能夠檢測到異常并進行適當的故障處理和重試。




























