聊聊Flink:Flink的運行時架構
一、運行時架構
上一篇我們可以看到Flink的核心組件的Deploy層,該層主要涉及了Flink的部署模式,Flink支持多種部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。
圖片
- Local(本地):單機模式,一般本地開發調試使用,像我們程序寫的WordCountStream那個例子,直接運行main方法啟動。
- Cluster(集群)
- Standalone(獨立模式):Flink自帶集群,自己管理資源調度,生產環境也會有所應用。
- YARN(YARN模式):計算資源統一由Hadoop YARN管理,生產環境應用較多。
- Cloud(云端):AliCloud Realtime Compute、Amazon EMR、Huawei Cloud Stream Service 等。
我們這里主要來介紹Cluster集群的兩種模式Standalone、YARN。
二、YARN集群架構
在講解Flink集群架構之前,我們先了解一下YARN集群架構,我覺得是很有必要的。YARN集群總體上是經典的主/從(Master/Slave)架構,主要由ResourceManager、NodeManager、ApplicationMaster和Container等幾個組件構成。
圖片
2.1 ResourceManager
以后臺進程的形式運行,負責對集群資源進行統一管理和任務調度。ResourceManager的主要職責如下:
- 接收來自客戶端的請求。
- 啟動和管理各個應用程序的ApplicationMaster。
- 接收來自ApplicationMaster的資源申請,并為其分配Container。
- 管理NodeManager,接收來自NodeManager的資源和節點健康情況匯報。
2.2 NodeManager
集群中每個節點上的資源和任務管理器,以后臺進程的形式運行。它會定時向ResourceManager匯報本節點上的資源(內存、CPU)使用情況和各個Container的運行狀態,同時會接收并處理來自ApplicationMaster的Container啟動/停止等請求。NodeManager不會監視任務,它僅監視Container中的資源使用情況,例如。如果一個Container消耗的內存比最初分配的更多,就會結束該Container。
2.3 Task
應用程序具體執行的任務。一個應用程序可能有多個任務,例如一個MapReduce程序可以有多個Map任務和多個Reduce任務。
2.4 Container
YARN中資源分配的基本單位,封裝了CPU和內存資源的一個容器,相當于一個Task運行環境的抽象。從實現上看,Container是一個Java抽象類,定義了資源信息。應用程序的Task將會被發布到Container中運行,從而限定了Task使用的資源量。
一個應用程序所需的Container分為兩類:運行ApplicationMaster的Container和運行各類Task的Container。前者是由ResourceManager向內部的資源調度器申請和啟動的,后者是由ApplicationMaster向ResourceManager申請的,并由ApplicationMaster請求NodeManager進行啟動。
我們可以將Container類比成數據庫連接池中的連接,需要的時候進行申請,使用完畢后進行釋放,而不需要每次獨自創建。
2.5 ApplicationMaster
ApplicationMaster可在Container內運行任何類型的Task。例如,MapReduce ApplicationMaster請求一個容器來啟動Map Task或Reduce Task。也可以實現一個自定義的ApplicationMaster來運行特定的Task,以便任何分布式框架都可以受YARN支持,只要實現了相應的ApplicationMaster即可。
我們可以這樣認為:ResourceManager管理整個集群,NodeManager管理集群中的單個節點,ApplicationMaster管理單個應用程序(集群中可能同時有多個應用程序在運行,每個應用程序都有各自的ApplicationMaster)。
YARN集群中應用程序的執行流程如下圖所示:
- 客戶端提交應用程序(可以是MapReduce程序、Spark程序等)到ResourceManager。
- ResourceManager分配用于運行ApplicationMaster的Container,然后與NodeManager通信,要求它在該Container中啟動ApplicationMaster。ApplicationMaster啟動后,它將負責此應用程序的整個生命周期。
- ApplicationMaster向ResourceManager注冊(注冊后可以通過ResourceManager查看應用程序的運行狀態)并請求運行應用程序各個Task所需的Container(資源請求是對一些Container的請求)。如果符合條件,ResourceManager會分配給ApplicationMaster所需的Container(表達為Container ID和主機名)。
- ApplicationMaster請求NodeManager使用這些Container來運行應用程序的相應Task(即將Task發布到指定的Container中運行)。
此外,各個運行中的Task會通過RPC協議向ApplicationMaster匯報自己的狀態和進度,這樣一旦某個Task運行失敗,ApplicationMaster就可以對其重新啟動。當應用程序運行完成時,ApplicationMaster會向ResourceManager申請注銷自己。
圖片
三、Flink Standalone模式
Flink Standalone模式為經典的主從(Master/Slave)架構,資源調度是Flink自己實現的。集群啟動后,主節點上會啟動一個JobManager進程,類似YARN集群的ResourceManager,因此主節點也稱為JobManager節點;各個從節點上會啟動一個TaskManager進程,類似YARN集群的NodeManager,因此從節點也稱為TaskManager節點。從Flink 1.6版本開始,將主節點上的進程名稱改為了StandaloneSessionClusterEntrypoint,從節點的進程名稱改為了TaskManagerRunner,在這里為了方便使用,仍然沿用之前版本的稱呼,即JobManager和TaskManager。
Client接收到Flink應用程序后,將作業提交給JobManager。JobManager要做的第一件事就是分配Task(任務)所需的資源。完成資源分配后,Task將被JobManager提交給相應的TaskManager,TaskManager會啟動線程開始執行。在執行過程中,TaskManager會持續向JobManager匯報狀態信息,例如開始執行、進行中或完成等狀態。作業執行完成后,結果將通過JobManager發送給Client。
Flink所有組件之間的通信使用的是Akka框架,組件之間的數據交互使用的是Netty框架。
圖片
Client 不是運行時和程序執行的一部分,而是用于準備數據流并將其發送給 JobManager。之后,客戶端可以斷開連接(分離模式),或保持連接來接收進程報告(附加模式)。客戶端可以作為觸發執行 Java/Scala 程序的一部分運行,也可以在命令行進程./bin/flink run …中運行。
可以通過多種方式啟動 JobManager 和 TaskManager:直接在機器上作為standalone 集群啟動、在容器中啟動、或者通過YARN等資源框架管理并啟動。TaskManager 連接到 JobManagers,宣布自己可用,并被分配工作。
3.1 JobManager
JobManager 具有許多與協調 Flink 應用程序的分布式執行有關的職責:它決定何時調度下一個 task(或一組 task)、對完成的 task 或執行失敗做出反應、協調 checkpoint、并且協調從失敗中恢復等等。這個進程由三個不同的組件組成:
- ResourceManager
ResourceManager 負責 Flink 集群中的資源提供、回收、分配 - 它管理 task slots,這是 Flink 集群中資源調度的單位(請參考TaskManagers)。Flink 為不同的環境和資源提供者(例如 YARN、Kubernetes 和 standalone 部署)實現了對應的 ResourceManager。在 standalone 設置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行啟動新的 TaskManager。 - Dispatcher
Dispatcher 提供了一個 REST 接口,用來提交 Flink 應用程序執行,并為每個提交的作業啟動一個新的 JobMaster。它還運行 Flink WebUI 用來提供作業執行信息。 - JobMaster
JobMaster 負責管理單個JobGraph的執行。Flink 集群中可以同時運行多個作業,每個作業都有自己的 JobMaster。
始終至少有一個 JobManager。高可用(HA)設置中可能有多個 JobManager,其中一個始終是 leader,其他的則是 standby。
3.2 TaskManager
TaskManager是Flink集群的工作進程。Task被調度到TaskManager上執行。TaskManager相互通信,只為在后續的Task之間交換數據。
TaskManager的主要作用如下:
- 接收JobManager分配的任務,負責具體的任務執行。
- TaskManager會在同一個JVM進程內以多線程的方式執行任務。· 負責對應任務在每個節點上的資源申請,管理任務的啟動、停止、銷毀、異常恢復等生命周期。
- 負責對數據進行緩存。TaskManager之間采用數據流的形式進行數據交互。
3.3 Tasks 和算子鏈
對于分布式執行,Flink 將算子的 subtasks 鏈接成 tasks。每個 task 由一個線程執行。將算子鏈接成 task 是個有用的優化:它減少線程間切換、緩沖的開銷,并且減少延遲的同時增加整體吞吐量。鏈行為是可以配置的。
下圖中樣例數據流用 5 個 subtask 執行,因此有 5 個并行線程。

3.4 Task Slots 和資源
每個 worker(TaskManager)都是一個 JVM 進程,可以在單獨的線程中執行一個或多個 subtask。為了控制一個 TaskManager 中接受多少個 task,就有了所謂的 task slots(至少一個)。
每個 task slot 代表 TaskManager 中資源的固定子集。例如,具有 3 個 slot 的 TaskManager,會將其托管內存 1/3 用于每個 slot。分配資源意味著 subtask 不會與其他作業的 subtask 競爭托管內存,而是具有一定數量的保留托管內存。注意此處沒有 CPU 隔離;當前 slot 僅分離 task 的托管內存。
通過調整 task slot 的數量,用戶可以定義 subtask 如何互相隔離。每個 TaskManager 有一個 slot,這意味著每個 task 組都在單獨的 JVM 中運行(例如,可以在單獨的容器中啟動)。具有多個 slot 意味著更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 連接(通過多路復用)和心跳信息。它們還可以共享數據集和數據結構,從而減少了每個 task 的開銷。
圖片
默認情況下,Flink 允許 subtask 共享 slot,即便它們是不同的 task 的 subtask,只要是來自于同一作業即可。結果就是一個 slot 可以持有整個作業管道。允許 slot 共享有兩個主要優點:
- Flink 集群所需的 task slot 和作業中使用的最大并行度恰好一樣。無需計算程序總共包含多少個 task(具有不同并行度)。
- 容易獲得更好的資源利用。如果沒有 slot 共享,非密集 subtask(source/map())將阻塞和密集型 subtask(window) 一樣多的資源。通過 slot 共享,我們示例中的基本并行度從 2 增加到 6,可以充分利用分配的資源,同時確保繁重的 subtask 在 TaskManager 之間公平分配。
圖片
四、Flink On YARN模式
Flink On YARN模式遵循YARN的官方規范,YARN只負責資源的管理和調度,運行哪種應用程序由用戶自己實現,因此可能在YARN上同時運行MapReduce程序、Spark程序、Flink程序等。YARN很好地對每一個程序實現了資源的隔離,這使得Spark、MapReduce、Flink等可以運行于同一個集群中,共享集群存儲資源與計算資源。Flink On YARN模式的運行架構如下圖所示。
圖片
- 當啟動一個Client(客戶端)會話時,Client首先會上傳Flink應用程序JAR包和配置文件到HDFS。
- Client向ResourceManager申請用于運行ApplicationMaster的Container。
- ResourceManager分配用于運行ApplicationMaster的Container,然后與NodeManager通信,要求它在該Container中啟動ApplicationMaster(ApplicationMaster與Flink JobManager運行于同一Container中,這樣ApplicationMaster就能知道Flink JobManager的地址)。ApplicationMaster啟動后,它將負責此應用程序的整個生命周期。另外,ApplicationMaster還提供了Flink的WebUI服務。
- ApplicationMaster向ResourceManager注冊(注冊后可以通過ResourceManager查看應用程序的運行狀態)并請求運行Flink TaskManager所需的Container(資源請求是對一些Container的請求)。如果符合條件,ResourceManager會分配給ApplicationMaster所需的Container(表達為Container ID和主機名)。ApplicationMaster請求NodeManager使用這些Container來運行Flink TaskManager。各個NodeManager從HDFS中下載Flink JAR包和配置文件。至此,Flink相關任務就可以運行了。
此外,各個運行中的Flink TaskManager會通過RPC協議向ApplicationMaster匯報自己的狀態和進度。
































