精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink 異步 Checkpoint 機制詳解

大數(shù)據(jù)
在流處理場景中,數(shù)據(jù)源源不斷流入,系統(tǒng)可能因節(jié)點故障、網(wǎng)絡問題等導致任務中斷,若無有效的容錯機制,數(shù)據(jù)可能丟失或重復處理。

一、引言:Flink容錯機制與Checkpoint的核心作用

Apache Flink作為分布式流處理引擎,其核心優(yōu)勢之一是“Exactly-Once”(精確一次)的容錯保證。在流處理場景中,數(shù)據(jù)源源不斷流入,系統(tǒng)可能因節(jié)點故障、網(wǎng)絡問題等導致任務中斷,若無有效的容錯機制,數(shù)據(jù)可能丟失或重復處理。Flink通過Checkpoint機制實現(xiàn)容錯:定期為作業(yè)狀態(tài)(State)創(chuàng)建快照(Snapshot),并將快照持久化到可靠存儲(如HDFS、S3)。當任務失敗時,從最近一次成功的Checkpoint快照恢復狀態(tài),確保數(shù)據(jù)處理的連續(xù)性和一致性。

1. 同步Checkpoint的瓶頸

早期的Checkpoint機制多為同步模式:在觸發(fā)Checkpoint時,任務暫停數(shù)據(jù)處理,等待所有狀態(tài)快照完成并持久化到遠程存儲后,再繼續(xù)處理數(shù)據(jù)。這種模式實現(xiàn)簡單,但存在明顯缺陷:

  • 處理延遲增加:狀態(tài)快照和持久化過程可能耗時較長(尤其狀態(tài)量大時),導致數(shù)據(jù)處理暫停,延遲飆升。
  • 吞吐量下降:頻繁的Checkpoint會占用大量處理時間,降低整體吞吐。

2. 異步Checkpoint的誕生

為解決同步Checkpoint的性能問題,F(xiàn)link引入了異步Checkpoint機制。其核心思想是:將狀態(tài)快照的生成與持久化操作從主數(shù)據(jù)處理流程中剝離,主線程僅負責快照的“準備”工作(如觸發(fā)狀態(tài)快照、生成快照元數(shù)據(jù)),而耗時的“持久化”操作(如將快照數(shù)據(jù)寫入遠程存儲)交由獨立線程池異步執(zhí)行。這樣,主數(shù)據(jù)處理流程幾乎不被阻塞,實現(xiàn)“低延遲”與“高吞吐”的平衡。

二、異步Checkpoint的核心原理與設計目標

1. 異步Checkpoint的定義

異步Checkpoint是指:在Checkpoint觸發(fā)過程中,任務(Task)的主數(shù)據(jù)處理線程不等待狀態(tài)快照完全持久化到遠程存儲,而是快速生成快照“句柄”(如文件句柄、內存指針)后立即恢復數(shù)據(jù)處理,快照的持久化操作由后臺線程異步完成。當所有任務的快照句柄生成并持久化完成后,Checkpoint才被標記為“成功”。

2. 核心設計目標

異步Checkpoint的設計需滿足以下目標:

  • 低延遲:主數(shù)據(jù)處理線程阻塞時間盡可能短(毫秒級),避免Checkpoint對正常數(shù)據(jù)處理延遲的影響。
  • 高吞吐:異步持久化不占用主線程資源,確保數(shù)據(jù)處理能力不受Checkpoint頻率影響。
  • 一致性:即使異步持久化失敗,也能保證Checkpoint的“原子性”——要么所有任務快照成功,要么全部失敗,避免狀態(tài)不一致。
  • 可恢復性:快照數(shù)據(jù)需完整、可靠地存儲到遠程存儲,故障恢復時可正確加載。

三、異步Checkpoint的核心架構與組件

異步Checkpoint的實現(xiàn)涉及Flink作業(yè)的多個核心組件,整體架構如下圖所示(簡化版):

┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│  JobManager │       │  TaskManager │       │ Remote Storage│
│ (Checkpoint│       │ (Task,       │       │ (HDFS/S3)    │
│  Coordinator)│       │  StateBackend)│       │              │
└──────┬──────┘       └───────┬──────┘       └───────┬──────┘
       │                      │                      │
       │ 1. Trigger Checkpoint│                      │
       │─────────────────────>│                      │
       │                      │ 2. Inject Barrier     │
       │                      │─────────────────────>│
       │                      │ 3. Async Snapshot     │
       │                      │ (主線程生成快照句柄)    │
       │                      │                      │
       │ 4. Acknowledge       │ 5. Async Persist      │
       │<─────────────────────│ (后臺線程持久化)       │
       │                      │─────────────────────>│
       │ 6. Complete Checkpoint│                      │
       │ (所有Ack收到)         │                      │
       │                      │                      │

1. 核心組件角色

(1) JobManager:CheckpointCoordinator

角色:Checkpoint的“總指揮”,負責觸發(fā)、協(xié)調、監(jiān)控整個作業(yè)的Checkpoint流程。

核心職責:

  • 定期觸發(fā)Checkpoint(基于時間間隔或手動觸發(fā))。
  • 向所有Task發(fā)送Checkpoint觸發(fā)請求(攜帶CheckpointID)。
  • 接收各Task的Checkpoint Ack(確認)或 Nack(失敗)消息。
  • 當所有Task均Ack時,標記Checkpoint為“成功”,并清理舊Checkpoint;若收到Nack,標記為“失敗”。

(2) TaskManager:Task與StateBackend

Task:作業(yè)的基本執(zhí)行單元,包含一個或多個算子(Operator)。

每個Task負責:

  • 接收JobManager的Checkpoint觸發(fā)請求。
  • 在數(shù)據(jù)流中注入Checkpoint Barrier(特殊數(shù)據(jù)事件,標記Checkpoint的起始位置)。
  • 協(xié)調算子進行狀態(tài)快照(通過StateBackend)。
  • 向JobManager上報Checkpoint結果(Ack/Nack)。

StateBackend:狀態(tài)存儲的后端實現(xiàn),負責狀態(tài)的快照與恢復。異步Checkpoint的核心實現(xiàn)依賴StateBackend的異步能力:

  • MemoryStateBackend:狀態(tài)存儲在TaskManager內存,快照時同步序列化到JobManager內存(僅適用于測試,不支持異步)。
  • FsStateBackend:狀態(tài)存儲在本地文件系統(tǒng),快照時異步將本地文件上傳到遠程存儲(如HDFS)。
  • RocksDBStateBackend:狀態(tài)存儲在本地RocksDB,快照時異步生成RocksDB快照并上傳到遠程存儲(生產環(huán)境常用,支持大狀態(tài)異步快照)。

(3) Remote Storage:可靠存儲系統(tǒng)

  • 角色:持久化存儲Checkpoint快照數(shù)據(jù)(如HDFS、S3、Oss等)。
  • 要求:高可靠、高持久化,確保快照數(shù)據(jù)不丟失。

(4) Checkpoint Barrier:數(shù)據(jù)流的“同步信號”

  • 定義:一種特殊的數(shù)據(jù)事件,由Source算子注入,隨數(shù)據(jù)流流向下游算子。
  • 作用:標記Checkpoint的“邊界”——Barrier之前的數(shù)據(jù)屬于當前Checkpoint,Barrier之后的數(shù)據(jù)屬于下一個Checkpoint。
  • 對齊機制:下游算子收到多輸入流的Barrier時,需等待所有輸入流的Barrier到達(稱為“Barrier對齊”),確保快照包含“一致性的狀態(tài)”(即所有輸入流在Barrier之前的數(shù)據(jù)均已處理)。對齊完成后,才觸發(fā)狀態(tài)快照。

四、異步Checkpoint詳細流程與源碼剖析

異步Checkpoint的完整流程可分為6個階段,下面結合Flink 1.18源碼詳細剖析每個階段的實現(xiàn)。

1. 階段1:JobManager觸發(fā)Checkpoint

(1) 觸發(fā)條件

Checkpoint的觸發(fā)分為兩類:

  • 周期性觸發(fā):基于execution.checkpointing.interval配置(如1分鐘),由CheckpointCoordinator的定時任務觸發(fā)。
  • 手動觸發(fā):通過Rest API或StreamExecutionEnvironment.executeCheckpoint()手動觸發(fā)。

(2) 核心流程與源碼

CheckpointCoordinator的triggerCheckpoint()方法是觸發(fā)Checkpoint的入口,核心邏輯如下:

// org.apache.flink.runtime.checkpoint.CheckpointCoordinator
publicvoidtriggerCheckpoint(boolean isPeriodic) {
    // 1. 檢查是否允許觸發(fā)Checkpoint(如作業(yè)狀態(tài)、并發(fā)Checkpoint限制等)
    if (!canTriggerCheckpoint()) {
        return;
    }

    // 2. 生成CheckpointID(全局唯一,遞增)
    longcheckpointID= checkpointIdCounter.getAndIncrement();

    // 3. 創(chuàng)建PendingCheckpoint(記錄Checkpoint的元數(shù)據(jù),如觸發(fā)時間、參與Task等)
    PendingCheckpointpendingCheckpoint=newPendingCheckpoint(
        job,
        checkpointID,
        getTimestamp(),
        getCheckpointStorageLocation(checkpointID),
        tasksToTrigger, // 需要觸發(fā)Checkpoint的所有Task
        getCheckpointConfiguration());

    // 4. 將PendingCheckpoint加入待處理隊列
    pendingCheckpoints.put(checkpointID, pendingCheckpoint);

    // 5. 向所有Task發(fā)送Checkpoint觸發(fā)請求(通過RpcTaskManagerGateway)
    for (ExecutionVertex task : tasksToTrigger) {
        task.getCurrentExecutionAttempt().triggerCheckpointAtSource(
            checkpointID,
            getTimestamp(),
            checkpointOptions);
    }
}

關鍵點解析:

  • CheckpointID:全局唯一標識,用于區(qū)分不同Checkpoint,恢復時通過ID加載對應快照。
  • PendingCheckpoint:記錄Checkpoint的“中間狀態(tài)”,包含參與Task、觸發(fā)時間、存儲位置等。當所有Task Ack后,PendingCheckpoint轉為CompletedCheckpoint。
  • 觸發(fā)請求發(fā)送:通過RpcTaskManagerGateway向TaskManager的Task發(fā)送TriggerCheckpoint消息,攜帶CheckpointID、時間戳等。

2. 階段2:Task注入Checkpoint Barrier

(1) Barrier的作用與注入時機

Barrier是Checkpoint的“同步信號”,由Source算子注入,隨數(shù)據(jù)流流向下游。其核心作用是:

  • 分割數(shù)據(jù)流:Barrier之前的數(shù)據(jù)屬于“當前Checkpoint”,Barrier之后的數(shù)據(jù)屬于“下一個Checkpoint”。
  • 觸發(fā)對齊:下游算子需等待所有輸入流的Barrier到達,確保狀態(tài)快照的“一致性”。

(2) 核心流程與源碼

當Task收到JobManager的TriggerCheckpoint消息后,由StreamTask(流任務基類)處理,核心邏輯如下:

// org.apache.flink.streaming.runtime.tasks.StreamTask
publicvoidtriggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions) {

    // 1. 異步觸發(fā)Checkpoint(避免阻塞主線程)
    mailboxProcessor.getMainMailboxExecutor().execute(
        () -> triggerCheckpoint(checkpointMetaData, checkpointOptions),
        "Trigger Checkpoint");
}

privatevoidtriggerCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions) {

    // 2. 檢查是否允許觸發(fā)Checkpoint(如Task狀態(tài)、Barrier對齊狀態(tài)等)
    if (!isRunning) {
        return;
    }

    // 3. 向所有輸出流注入Checkpoint Barrier
    for (RecordWriterOutput<?> output : getRecordWriterOutputs()) {
        output.emitWatermark(newCheckpointBarrier(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions));
    }

    // 4. 觸發(fā)本Task的狀態(tài)快照(見階段3)
    checkpointStateManager.triggerCheckpoint(
        checkpointMetaData,
        checkpointOptions,
        newCheckpointMetrics());
}

關鍵點解析:

  • 異步觸發(fā):通過mailboxProcessor將Checkpoint觸發(fā)任務提交到主線程的郵箱隊列,避免阻塞IO線程(Netty線程)。
  • Barrier注入:RecordWriterOutput向每個輸出流寫入CheckpointBarrier事件。Barrier隨數(shù)據(jù)流動,下游算子通過InputChannel接收。
  • 觸發(fā)狀態(tài)快照:注入Barrier后,立即調用checkpointStateManager.triggerCheckpoint()啟動本Task的狀態(tài)快照流程。

3. 階段3:Task異步生成狀態(tài)快照(核心異步邏輯)

(1) 異步快照的核心設計

異步快照的關鍵是“主線程快照 + 后臺持久化”:

  • 主線程:快速生成狀態(tài)的“輕量級快照”(如RocksDB的快照句柄、內存狀態(tài)的序列化字節(jié)數(shù)組),不等待持久化完成。
  • 后臺線程:將輕量級快照持久化到遠程存儲(如HDFS),持久化完成后通知主線程。

(2) 核心流程與源碼

checkpointStateManager.triggerCheckpoint()最終會調用每個算子的snapshotState()方法,而算子的狀態(tài)快照由StateBackend完成。以RocksDBStateBackend為例,其異步快照邏輯如下:

// org.apache.flink.contrib.streaming.state.RocksDBStateBackend
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions)throws Exception {

    // 1. 主線程:生成RocksDB快照(輕量級操作)
    RocksDBSnapshotsnapshot= db.getSnapshot();
    List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = metaInfoSnapshot();

    // 2. 創(chuàng)建異步任務:將快照持久化到遠程存儲
    returnnewFutureTask<>(
        () -> {
            try (CheckpointStreamFactory.CheckpointStateOutputStreamout= streamFactory.createCheckpointStateOutputStream()) {
                // 2.1 將RocksDB快照數(shù)據(jù)寫入輸出流(同步操作,但已在后臺線程執(zhí)行)
                snapshot.writeTo(out);
                stateMetaInfoSnapshots.writeTo(out);

                // 2.2 獲取持久化后的狀態(tài)句柄(包含遠程存儲路徑、文件大小等)
                StreamStateHandlestateHandle= out.closeAndGetHandle();
                return SnapshotResult.of(stateHandle);
            } catch (Exception e) {
                // 持久化失敗,返回失敗結果
                return SnapshotResult.of(e);
            } finally {
                // 釋放RocksDB快照資源
                db.releaseSnapshot(snapshot);
            }
        });
}

關鍵點解析:

① 主線程操作:db.getSnapshot()生成RocksDB的快照句柄(僅記錄當前數(shù)據(jù)文件的指針,不復制數(shù)據(jù)),metaInfoSnapshot()獲取狀態(tài)元信息(如列族名稱、序列化器等)。這兩步是輕量級的,耗時極短(毫秒級)。

② 異步任務封裝:通過FutureTask將持久化邏輯封裝為異步任務,F(xiàn)utureTask實現(xiàn)了RunnableFuture接口,可提交到線程池執(zhí)行。

③ 后臺持久化:FutureTask的run()方法在后臺線程執(zhí)行,核心邏輯包括:

  • 創(chuàng)建CheckpointStateOutputStream(連接遠程存儲的輸出流)。
  • 將RocksDB快照數(shù)據(jù)(通過snapshot.writeTo())和元信息寫入輸出流。
  • 調用out.closeAndGetHandle()獲取遠程存儲的句柄(如HDFS文件路徑)。
  • 釋放RocksDB快照資源(避免內存泄漏)。

④ 結果返回:FutureTask的get()方法可獲取持久化結果(成功返回SnapshotResult,包含狀態(tài)句柄;失敗返回異常)。但主線程不會立即調用get(),而是將FutureTask提交到線程池后繼續(xù)執(zhí)行其他任務。

(3) 異步任務的執(zhí)行線程池

Flink使用ExecutorService執(zhí)行異步持久化任務,線程池配置如下:

  • 線程池類型:ForkJoinPool(默認)或ThreadPoolExecutor,可通過taskmanager.network.netty.io.numThreads配置線程數(shù)(默認為CPU核心數(shù))。
  • 任務提交:StreamTask在生成異步快照后,將FutureTask提交到線程池:
// org.apache.flink.streaming.runtime.tasks.StreamTask
privatevoidtriggerCheckpointOnExecutor(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) {
    // 生成異步快照(FutureTask)
    RunnableFuture<SnapshotResult<?>> snapshotFuture = operatorChain.snapshotState(checkpointMetaData, checkpointOptions, checkpointMetrics);
    
    // 提交到異步線程池執(zhí)行
    asyncOperationsThreadPool.submit(() -> {
        try {
            // 等待持久化完成(后臺線程執(zhí)行)
            SnapshotResult<?> snapshotResult = snapshotFuture.get();
            // 持久化成功,向JobManager發(fā)送Ack
            acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), snapshotResult);
        } catch (Exception e) {
            // 持久化失敗,向JobManager發(fā)送Nack
            declineCheckpoint(checkpointMetaData.getCheckpointId(), e);
        }
    });
}

4. 階段4:Task向JobManager確認Checkpoint結果

(1) 確認時機

Task的異步持久化任務完成后(成功或失敗),需向JobManager發(fā)送確認消息:

  • Ack:持久化成功,攜帶狀態(tài)句柄(SnapshotResult)。
  • Nack:持久化失敗,攜帶異常信息。

(2) 核心流程與源碼

確認邏輯由TaskExecutorGateway的acknowledgeCheckpoint()方法實現(xiàn),核心如下:

// org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
publicvoidacknowledgeCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        SnapshotResult<?> snapshotResult) {

    // 1. 根據(jù)ExecutionAttemptID找到對應的Task
    Tasktask= taskSlotTable.getTask(executionAttemptID);
    if (task != null) {
        // 2. 通知Task Checkpoint完成
        task.acknowledgeCheckpoint(checkpointId, checkpointMetrics, snapshotResult);
    }
}

// org.apache.flink.streaming.runtime.tasks.StreamTask
publicvoidacknowledgeCheckpoint(
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        SnapshotResult<?> snapshotResult) {

    // 3. 構建Ack消息(包含狀態(tài)句柄)
    AcknowledgeCheckpointmessage=newAcknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        snapshotResult.getStateHandles());

    // 4. 向JobManager發(fā)送Ack消息
    jobManagerGateway.acknowledgeCheckpoint(message);
}

關鍵點解析:

  • 狀態(tài)句柄傳遞:SnapshotResult.getStateHandles()包含狀態(tài)快照的遠程存儲句柄(如StreamStateHandle,包含HDFS文件路徑、文件大小等),JobManager通過句柄定位快照數(shù)據(jù)。
  • 異步發(fā)送:通過jobManagerGateway異步發(fā)送Ack消息,避免阻塞Task主線程。

5. 階段5:JobManager匯總確認并完成Checkpoint

(1) 匯總邏輯

JobManager的CheckpointCoordinator接收所有Task的Ack消息后,需檢查:

  • 完整性:所有參與Checkpoint的Task均已Ack。
  • 一致性:所有Task的狀態(tài)句柄均有效(無異常)。

若滿足條件,則標記Checkpoint為“成功”,并將PendingCheckpoint轉為CompletedCheckpoint;否則標記為“失敗”。

(2) 核心流程與源碼

CheckpointCoordinator.receiveAcknowledgeMessage()是處理Ack消息的入口,核心邏輯如下:

// org.apache.flink.runtime.checkpoint.CheckpointCoordinator
publicvoidreceiveAcknowledgeMessage(AcknowledgeCheckpoint message)throws Exception {
    longcheckpointId= message.getCheckpointId();
    PendingCheckpointpendingCheckpoint= pendingCheckpoints.get(checkpointId);

    if (pendingCheckpoint != null) {
        // 1. 記錄Task的Ack結果(包含狀態(tài)句柄)
        pendingCheckpoint.acknowledgeTask(
            message.getTaskExecutionId(),
            message.getStateHandles(),
            message.getCheckpointMetrics());

        // 2. 檢查是否所有Task均已Ack
        if (pendingCheckpoint.isFullyAcknowledged()) {
            // 3. 將PendingCheckpoint轉為CompletedCheckpoint
            CompletedCheckpointcompletedCheckpoint= pendingCheckpoint.toCompletedCheckpoint();

            // 4. 將CompletedCheckpoint加入已完成的Checkpoint隊列
            completedCheckpoints.add(completedCheckpoint);

            // 5. 清理PendingCheckpoint
            pendingCheckpoints.remove(checkpointId);

            // 6. 通知所有監(jiān)聽器(如RestEndpoint)Checkpoint完成
            notifyCheckpointComplete(checkpointId);
        }
    }
}

關鍵點解析:

  • PendingCheckpoint.acknowledgeTask():將Task的狀態(tài)句柄存儲到PendingCheckpoint中,并更新已Ack的Task數(shù)量。
  • isFullyAcknowledged():檢查所有參與Checkpoint的Task均已Ack(通過比較已Ack數(shù)量與總Task數(shù)量)。
  • CompletedCheckpoint:存儲已完成的Checkpoint元數(shù)據(jù),包括狀態(tài)句柄、完成時間、持久化路徑等,用于故障恢復。
  • 通知監(jiān)聽器:通過notifyCheckpointComplete()通知RestEndpoint、Web UI等組件Checkpoint完成,更新作業(yè)狀態(tài)。

6. 階段6:Checkpoint完成后的清理與恢復準備

(1) 清理邏輯

Checkpoint完成后,需清理以下資源:

  • 舊Checkpoint:根據(jù)execution.checkpointing.max-retained-checkpoints配置,保留最新的N個Checkpoint,刪除舊的Checkpoint(釋放遠程存儲空間)。
  • 臨時資源:Task在快照過程中生成的臨時文件(如RocksDB的臨時快照文件)。

(2) 恢復準備

CompletedCheckpoint被存儲到CompletedCheckpointStore(默認為DefaultCompletedCheckpointStore,基于內存或ZooKeeper存儲),故障恢復時,CheckpointCoordinator從CompletedCheckpointStore中獲取最新的CompletedCheckpoint,通過狀態(tài)句柄加載狀態(tài)數(shù)據(jù),重啟Task。

五、異步Checkpoint的關鍵問題與優(yōu)化

1. Barrier對齊延遲與非對齊Checkpoint

(1) Barrier對齊的問題

在異步Checkpoint中,Barrier對齊是導致延遲的主要原因:下游算子需等待所有輸入流的Barrier到達,若某個輸入流的數(shù)據(jù)處理較慢,會導致其他輸入流的數(shù)據(jù)緩沖在內存中,無法處理,從而增加端到端延遲。

(2) 非對齊Checkpoint(Unaligned Checkpoint)

為解決Barrier對齊延遲,F(xiàn)link 1.11引入了非對齊Checkpoint:

核心思想:不等所有輸入流的Barrier到達,立即觸發(fā)狀態(tài)快照,并將緩沖區(qū)中的數(shù)據(jù)(包括Barrier之前和之后的數(shù)據(jù))作為快照的一部分。

實現(xiàn)原理:

  • 算子收到第一個Barrier時,立即停止處理該輸入流的數(shù)據(jù),并將緩沖區(qū)中的數(shù)據(jù)(包括未處理的Barrier)寫入快照。
  • 其他輸入流繼續(xù)處理數(shù)據(jù),直到Barrier到達,重復上述過程。
  • 快照完成后,算子繼續(xù)處理緩沖區(qū)中的數(shù)據(jù)。

適用場景:適用于“背壓”嚴重的作業(yè)(如數(shù)據(jù)傾斜、下游處理慢),可顯著降低Checkpoint延遲。

(3) 源碼實現(xiàn)

非對齊Checkpoint的開關由execution.checkpointing.unaligned.enabled控制,核心邏輯在CheckpointBarrierHandler的processBarrier()方法:

// org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
publicvoidprocessBarrier(CheckpointBarrier barrier)throws Exception {
    if (checkpointOptions.isUnalignedCheckpointEnabled()) {
        // 非對齊Checkpoint:立即處理Barrier,不等待對齊
        processBarrierUnaligned(barrier);
    } else {
        // 對齊Checkpoint:等待所有輸入流的Barrier到達
        processBarrierAligned(barrier);
    }
}

privatevoidprocessBarrierUnaligned(CheckpointBarrier barrier)throws Exception {
    // 1. 停止當前輸入流的數(shù)據(jù)處理
    blockCurrentInput();

    // 2. 將緩沖區(qū)中的數(shù)據(jù)(包括Barrier)寫入快照
    Buffer[] bufferedData = getBufferedData();
    for (Buffer buffer : bufferedData) {
        checkpointStorage.writeBuffer(buffer);
    }

    // 3. 觸發(fā)狀態(tài)快照
    triggerCheckpoint(barrier);

    // 4. 恢復當前輸入流的數(shù)據(jù)處理
    unblockCurrentInput();
}

2. 異步任務失敗處理

(1) 失敗場景

異步持久化任務可能因以下原因失敗:

  • 遠程存儲不可用(如HDFS宕機)。
  • 網(wǎng)絡中斷(無法上傳快照數(shù)據(jù))。
  • 本地磁盤故障(無法讀取RocksDB快照)。

(2) 處理機制

Task級失敗:若某個Task的異步持久化失敗,Task會向JobManager發(fā)送DeclineCheckpoint消息(Nack),攜帶異常信息。

作業(yè)級失敗:JobManager收到Nack后,立即標記當前Checkpoint為“失敗”,并:

  • 丟棄所有Task的本次快照數(shù)據(jù)(避免狀態(tài)不一致)。
  • 若失敗次數(shù)超過閾值(execution.checkpointing.tolerable-failed-checkpoints),觸發(fā)作業(yè)失敗(Failover)。

恢復策略:作業(yè)失敗后,CheckpointCoordinator從最新的CompletedCheckpoint恢復狀態(tài),重啟Task。

(3) 源碼實現(xiàn)

Task的異步持久化失敗處理邏輯如下:

// org.apache.flink.streaming.runtime.tasks.StreamTask
privatevoidtriggerCheckpointOnExecutor(...) {
    asyncOperationsThreadPool.submit(() -> {
        try {
            SnapshotResult<?> snapshotResult = snapshotFuture.get();
            acknowledgeCheckpoint(checkpointId, snapshotResult);
        } catch (Exception e) {
            // 持久化失敗,發(fā)送Nack
            declineCheckpoint(checkpointId, e);
        }
    });
}

privatevoiddeclineCheckpoint(long checkpointId, Throwable cause) {
    DeclineCheckpointmessage=newDeclineCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        cause);
    jobManagerGateway.declineCheckpoint(message);
}

3. StateBackend選擇對異步性能的影響

(1) StateBackend對比

StateBackend

狀態(tài)存儲位置

異步支持

適用場景

MemoryStateBackend

TaskManager內存

不支持

測試、小狀態(tài)作業(yè)

FsStateBackend

本地文件系統(tǒng)+遠程存儲

支持

中等狀態(tài)作業(yè)(GB級)

RocksDBStateBackend

本地RocksDB+遠程存儲

支持

大狀態(tài)作業(yè)(TB級)、生產環(huán)境

(2) RocksDBStateBackend的異步優(yōu)化

RocksDBStateBackend是生產環(huán)境最常用的StateBackend,其異步優(yōu)化點包括:

  • 增量Checkpoint:僅上傳上次Checkpoint后變化的數(shù)據(jù)(通過RocksDB的SST文件差異),減少持久化數(shù)據(jù)量。
  • 本地恢復:優(yōu)先從本地磁盤加載快照(若本地未刪除),避免遠程存儲讀取延遲。
  • 快照壓縮:對快照數(shù)據(jù)進行壓縮(如Snappy),減少網(wǎng)絡傳輸和存儲開銷。

4. 線程池配置優(yōu)化

異步持久化任務的性能依賴線程池配置,關鍵參數(shù)如下:

線程數(shù):taskmanager.network.netty.io.numThreads(默認為CPU核心數(shù)),需根據(jù)作業(yè)特點調整:

  • 若狀態(tài)大、持久化耗時長,可增加線程數(shù)(如CPU核心數(shù)×2)。
  • 若狀態(tài)小、持久化快,保持默認值即可。

隊列容量:taskmanager.network.netty.io.queueCapacity(默認為Integer.MAX_VALUE),避免任務被拒絕。

拒絕策略:默認為AbortPolicy(拋出異常),可改為CallerRunsPolicy(由提交線程執(zhí)行任務),避免任務丟失。

六、總結:異步Checkpoint的價值與未來方向

1. 核心價值

異步Checkpoint是Flink實現(xiàn)“高吞吐、低延遲、Exactly-Once”容錯的核心機制,其價值體現(xiàn)在:

  • 性能提升:主數(shù)據(jù)處理線程幾乎不被阻塞,Checkpoint對作業(yè)延遲和吞吐的影響降至最低。
  • 可靠性保證:通過異步持久化到遠程存儲,確保狀態(tài)快照的可靠性,故障時可快速恢復。
  • 靈活性:支持對齊/非對齊Checkpoint、增量Checkpoint等特性,適應不同作業(yè)場景。

2. 結語

異步Checkpoint機制通過“主線程快照 + 后臺持久化”的設計,巧妙地平衡了容錯與性能的關系。深入理解其原理與源碼,不僅有助于優(yōu)化Flink作業(yè)的性能,更能為分布式系統(tǒng)的容錯設計提供借鑒。隨著Flink的持續(xù)發(fā)展,異步Checkpoint將進一步演進,為實時流處理提供更強大的支撐。

責任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關推薦

2022-01-14 07:56:38

Checkpoint機制Flink

2021-09-06 18:55:57

MySQLCheckpoint機制

2025-05-26 09:05:00

2025-07-08 08:57:29

2024-02-27 08:05:32

Flink分區(qū)機制數(shù)據(jù)傳輸

2010-09-29 13:52:33

PostgreSQL

2025-10-31 07:25:00

2025-04-27 08:15:00

FlinkSavepointCheckpoint

2023-03-22 18:34:30

Flink調度部署

2016-09-07 20:43:36

Javascript異步編程

2009-07-08 15:01:00

Servlet Ses

2025-08-22 14:05:00

RSTP網(wǎng)絡端口

2024-07-16 08:38:06

2021-06-06 16:56:49

異步編程Completable

2021-11-02 06:58:55

FlinkWindow機制

2015-10-26 09:25:42

2024-04-09 07:50:59

Flink語義Watermark

2025-08-27 06:00:00

2009-09-23 16:30:01

Hibernate f

2011-03-17 09:20:05

異常處理機制
點贊
收藏

51CTO技術棧公眾號

日本中文字幕中出在线| 97成人资源站| 日本欧美一区| 中文字幕永久在线不卡| 亚洲在线观看视频网站| 日韩av无码中文字幕| 欧美美女在线| 日韩一区二区三区视频在线| 国产一区二区视频播放| 国产三级视频在线| 国产a区久久久| 国产精品欧美一区二区| 精品人妻在线播放| 久久国产精品亚洲人一区二区三区 | 亚洲区欧美区| 最近2019中文免费高清视频观看www99| 日韩av影视大全| 97成人资源| 一区二区三区成人| 亚洲精品视频一二三| 刘亦菲久久免费一区二区| 免费精品视频最新在线| 91精品国产色综合| 91日韩中文字幕| 日韩精品一区二区久久| www.成人av| 日韩精品一区不卡| 国产综合自拍| 中文字幕一区二区久久人妻网站 | 久久成人av少妇免费| 高清在线视频日韩欧美| 精品无码一区二区三区蜜臀| 色综合综合色| 亚洲欧美制服第一页| 亚洲一二三四五| 久久69av| 9191国产精品| 日本三级黄色网址| av在线日韩| 色综合久久久久综合体| www.av片| av中文在线资源库| 亚洲一区二区三区四区中文字幕| 一区二区三区四区久久| av在线播放网| 国产亚洲一区二区三区四区| 精品一区2区三区| 人妻与黑人一区二区三区| 欧美日韩国产另类不卡| 91亚洲精华国产精华| 五月婷婷丁香在线| 欧美一级二区| 26uuu另类亚洲欧美日本一| 丰满少妇高潮久久三区| 亚洲国产精品久久久天堂| 少妇精69xxtheporn| 精品亚洲aⅴ无码一区二区三区| 亚洲三级网页| 亚洲毛片一区二区| 97人妻精品一区二区三区免| 性欧美lx╳lx╳| 亚洲国内高清视频| 久久精品欧美| 日韩一区二区三区在线观看视频| 成人中文字幕电影| 午夜视频福利在线观看| 一本久道综合久久精品| 欧美精品久久久久a| 久久国产精品波多野结衣| 欧美在线三级| 久久久久久午夜| 国产精品黄色网| 性色一区二区三区| 亚洲国产91视频| 亚洲丝袜制服诱惑| 欧美三级午夜理伦三级老人| 超碰免费在线播放| 亚洲一区二区在线免费观看视频| 午夜裸体女人视频网站在线观看| 2020国产精品久久精品美国| 欧美18视频| 成人在线免费电影| 亚洲人成精品久久久久久| 国产精品免费看久久久无码| av资源中文在线天堂| 色婷婷综合久色| 中文字幕亚洲乱码| 亚洲不卡视频| 亚洲欧美中文字幕| 情侣偷拍对白清晰饥渴难耐| 伊人蜜桃色噜噜激情综合| 国产va免费精品高清在线观看| 在线播放成人av| 成人蜜臀av电影| 亚洲精品中文字幕在线| 第四色日韩影片| 欧美综合亚洲图片综合区| 色哟哟免费视频| 日本福利一区| 久久亚洲精品小早川怜子66| 亚洲欧美在线视频免费| 看片网站欧美日韩| 韩国一区二区三区美女美女秀| 草碰在线视频| 天涯成人国产亚洲精品一区av| 九九热免费精品视频| 一区二区三区四区精品视频| 这里只有精品视频| 国产精品50页| 国产一区二区三区四区在线观看| 久久久久久99| 欧美hdxxxx| 欧美日本一区二区三区四区| 亚洲狠狠婷婷综合久久久久图片| 国产精品成人a在线观看| 91精品国产免费久久久久久 | 国产精品二区三区四区| 8888四色奇米在线观看| 欧美日韩一区二区精品| av影片在线播放| 久久高清免费| 国产精品日韩专区| 涩爱av在线播放一区二区| 一级精品视频在线观看宜春院| 久久久国产欧美| 西野翔中文久久精品字幕| 欧美激情网站在线观看| 国产又粗又猛又黄又爽无遮挡| 久久精品一二三| 免费无码av片在线观看| y111111国产精品久久久| 三级久久三级久久| 国产精品高精视频免费| 天天射天天色天天干| 夜夜嗨av一区二区三区中文字幕| 午夜激情av在线| 国产一区二区观看| 久久久久久久电影| 91大学生片黄在线观看| 欧美成人黄色| 中文字幕9999| 中文字幕在线一| 国产精品视频一区二区三区不卡| 国产免费成人在线| 小说区图片区色综合区| 2018日韩中文字幕| 香蕉视频国产在线| 午夜a成v人精品| 懂色av粉嫩av蜜乳av| 日韩一区二区免费看| 国内视频一区| 欧美性受xxxx黑人| 欧美午夜影院| 粉嫩av一区二区三区免费观看 | 国产一级特黄a大片免费| 波多野结衣二区三区| 粉嫩av亚洲一区二区图片| 4444亚洲人成无码网在线观看 | 国产一区二区自拍| 9999精品成人免费毛片在线看| 精品免费国产二区三区| 欧美黄色一区二区三区| 国产91精品一区二区麻豆网站 | 欧美成人一区二区三区片免费 | 91视频免费进入| 男男gaygays亚洲| 亚洲第一网站免费视频| 亚洲久久在线观看| 欧美经典三级视频一区二区三区| 中文字幕在线观看第三页| 婷婷色综合网| av资源一区二区| 不卡专区在线| 一个色综合导航| 97国产精品久久久| 亚洲专区一二三| 亚洲片av在线| 久久久黄色大片| 国产精品午夜电影| 一级黄色免费毛片| 日韩亚洲国产精品| 先锋影音欧美| 91九色鹿精品国产综合久久香蕉| 午夜精品一区二区三区在线 | 国产99精品| 伊人久久大香线蕉无限次| 欧美激情99| 国产成人黄色| 一区二区三区在线电影| 亚洲一卡久久| 亚洲成人一区二区在线观看| 国产老头和老头xxxx×| 国产午夜久久| 亚洲欧美综合一区| 91精品啪在线观看国产爱臀 | 欧美mv日韩mv国产| 日韩欧美国产另类| 亚洲精选在线视频| 黄色a一级视频| 国产乱码精品一区二区三| 男人和女人啪啪网站| 一区二区三区在线观看免费| 欧美不卡在线一区二区三区| 日韩综合久久| 欧美夜福利tv在线| 色呦呦在线免费观看| 亚洲网站视频福利| 亚洲欧美强伦一区二区| 欧美日韩卡一卡二| 91精品久久| 亚洲美女av电影| a视频免费在线观看| 日韩欧美中文免费| 精品在线视频免费观看| 啦啦啦中文在线观看日本| 亚洲视频专区在线| 亚洲第一黄色片| 欧美色倩网站大全免费| 久草手机在线观看| 亚洲一卡二卡三卡四卡五卡| 蜜桃视频最新网址| 国产人成一区二区三区影院| 中文字幕免费在线播放| 国产一区二区三区免费观看| av丝袜天堂网| 久久精品官网| 欧美亚洲国产成人| 今天的高清视频免费播放成人| www.亚洲一区二区| 色综合久久网| 一个色的综合| 色综合天天综合网中文字幕| 日韩免费av一区二区三区| 欧美亚洲tv| 精品久久久久久中文字幕动漫| 亚洲视频精选| 99re在线观看视频| 欧美h版在线观看| 成人有码视频在线播放| av日韩在线免费观看| 国产玖玖精品视频| 国产精品诱惑| 国产日韩欧美中文在线播放| 国产电影一区二区三区爱妃记| 秋霞成人午夜鲁丝一区二区三区| 成人影院在线视频| 午夜精品久久17c| 黄视频免费在线看| 欧美与欧洲交xxxx免费观看 | 少妇欧美激情一区二区三区| 久99久精品视频免费观看| 国产精品区在线| 免费观看在线综合| 中文字幕在线综合| 国产在线播放一区| 国产精品91av| www.亚洲激情.com| 一级性生活毛片| 国产女人aaa级久久久级| 欧美亚洲色综久久精品国产| 亚洲一区导航| 日韩免费不卡av| 欧美aaa大片视频一二区| 国产欧美精品xxxx另类| 国产精品美女久久久久| 成人一区二区三区四区| 成人影院中文字幕| 免费av一区二区三区| 成久久久网站| 日本一级淫片演员| 99精品国产在热久久| www.xxx亚洲| 久久99久久99| 在线观看欧美一区二区| 99久久久久久| 蜜桃av.com| 亚洲成人免费电影| 波多野结衣小视频| 欧美一区二区女人| 天堂在线免费av| 高清中文字幕一区二区三区| 欧美性猛交xxxxxxxx| 亚洲一卡二卡在线| 日韩欧美区一区二| 五月婷婷六月丁香综合| 伊人青青综合网站| 欧美日韩经典丝袜| 亚洲同志男男gay1069网站| 久久91亚洲人成电影网站| av日韩中文| 国产伦精品一区二区三区精品视频| 国产美女视频一区二区| 久久久久九九九| 亚洲精品成人影院| 日本在线观看a| 国产一区不卡精品| 自拍偷拍亚洲天堂| 亚洲男人的天堂av| 午夜久久久久久久久久影院| 日韩欧美三级在线| 思思99re6国产在线播放| 久久久久亚洲精品| 色999韩欧美国产综合俺来也| 精品国产二区在线| 亚洲人metart人体| 欧美 国产 小说 另类| 国产在线播放一区| 丰满少妇高潮一区二区| 亚洲精品欧美二区三区中文字幕| 日本三级中文字幕| 欧美一级搡bbbb搡bbbb| 国产天堂在线| 91精品国产乱码久久久久久蜜臀| 国产免费av国片精品草莓男男| 欧美日韩在线精品一区二区三区| 欧美成人久久| 不卡的在线视频| 国产日韩精品一区二区三区 | 亚洲第一主播视频| 国产美女www爽爽爽视频| 国产亚洲欧美日韩一区二区| 成年女人在线看片| 91视频免费进入| 欧美永久精品| 又黄又爽又色的视频| 中文字幕制服丝袜成人av| 奴色虐av一区二区三区| 亚洲男人天堂2019| 特黄毛片在线观看| 精品不卡在线| 国产日本精品| 亚洲观看黄色网| 婷婷六月综合亚洲| 天天色天天操天天射| 久久人人爽人人| 亚洲一区二区三区免费| 91精品国产毛片武则天| 狠狠色丁香久久婷婷综| 999精品在线视频| 欧美猛男超大videosgay| 日本成人在线播放| 国产一区红桃视频| 国产精品国产一区| 久久久久久久久久久久久久久国产 | 午夜久久美女| www.偷拍.com| 亚洲制服丝袜在线| 高清毛片aaaaaaaaa片| 久久免费成人精品视频| 精品精品国产三级a∨在线| 青青草国产免费| 99久久精品国产网站| 人人干人人干人人干| 日韩成人中文字幕| 在线中文字幕播放| 日本不卡一区二区三区视频| 视频一区在线播放| 国产欧美一区二区三区在线观看视频 | 99久re热视频精品98| 国产伦精品一区二区三区免费迷| 国产少妇在线观看| 精品国产免费人成电影在线观看四季 | 日韩欧美电影| 在线a免费观看| 亚洲国产精品久久不卡毛片| 五月婷婷狠狠干| 国产精品老牛影院在线观看| 日韩伦理一区| 免费国偷自产拍精品视频| 午夜天堂影视香蕉久久| 能在线看的av| 91深夜福利视频| 亚洲国产精品一区制服丝袜| 波多野结衣片子| 欧美一区二区黄色| 日韩脚交footjobhd| 午夜一区二区三区| 成人国产精品免费网站| 亚洲天堂男人av| 欧美床上激情在线观看| 日韩av影院| 亚洲va在线va天堂va偷拍| 亚洲国产视频网站| av一区在线观看| 国产精品自拍首页| 青娱乐精品在线视频| 久久亚洲成人av| 国产一区二区三区在线观看网站| 中文成人在线| 国内性生活视频| ●精品国产综合乱码久久久久 | 成人av在线一区二区三区| 亚洲精品久久久久久久蜜桃| 精品中文字幕在线2019| 欧美禁忌电影| 欧美极品jizzhd欧美仙踪林| 在线一区二区视频| 成人影音在线| 中文字幕一区二区三区乱码| 91麻豆精品在线观看|