精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink執(zhí)行流程與源碼分析

大數(shù)據(jù)
整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實(shí)現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象。現(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

[[422512]]

本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)左右手」,作者王了個博。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)左右手公眾號。

Flink主要組件

作業(yè)管理器(JobManager)

(1) 控制一個應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說,每個應(yīng)用程序 都會被一個不同的Jobmanager所控制執(zhí)行

(2) Jobmanager會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:作業(yè)圖( Job Graph)、邏輯數(shù)據(jù)流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。

(3) Jobmanager會把 Jobgraph轉(zhuǎn)換成一個物理層面的 數(shù)據(jù)流圖,這個圖被叫做 “執(zhí)行圖”(Executiongraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。Job Manager會向資源管理器( Resourcemanager)請求執(zhí)行任務(wù)必要的資源,也就是 任務(wù)管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運(yùn)行它們的 Taskmanager上。而在運(yùn)行過程中Jobmanagera會負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(diǎn)(checkpoints)的協(xié)調(diào)。

任務(wù)管理器(Taskmanager)

(1) Flink中的工作進(jìn)程。通常在 Flink中會有多個 Taskmanageria運(yùn)行, 每個 Taskmanageri都包含了一定數(shù)量的插槽( slots)。插槽的數(shù)量限制了Taskmanageri能夠執(zhí)行的任務(wù)數(shù)量。

(2) 啟動之后, Taskmanager會向資源管理器注冊它的插槽;收到資源管理器的指令后, Taskmanageri就會將一個或者多個插槽提供給Jobmanageri調(diào)用。Jobmanager就可以向插槽分配任務(wù)( tasks)來執(zhí)行了。

(3) 在執(zhí)行過程中, 一個 Taskmanagera可以跟其它運(yùn)行同一應(yīng)用程序的Taskmanager交換數(shù)據(jù)。

資源管理器(Resource Manager)

(1) 主要負(fù)責(zé)管理任務(wù)管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。

(2) Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 當(dāng) Jobmanager申請插槽資源時, Resourcemanager會將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒有足夠的插槽來滿足 Jobmanager的請求, 它還可以向資源提供平臺發(fā)起會話,以提供啟動 Taskmanager進(jìn)程的容器。

分發(fā)器(Dispatcher)

(1) 可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了REST接口。

(2)當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給Jobmanage

(3) Dispatcher他會啟動一個 WebUi,用來方便地 展示和監(jiān)控作業(yè)執(zhí)行的信息。

任務(wù)提交流程

  1. 提交應(yīng)用
  2. 啟動并提交應(yīng)用
  3. 請求slots
  4. 任務(wù)啟動
  5. 注冊slots
  6. 發(fā)出提供slot的指令
  7. 提供slots
  8. 提交要在slots中執(zhí)行的任務(wù)
  9. 交換數(shù)據(jù)

任務(wù)提交流程(YARN)

a. Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置

b. 隨后向 Yarn ResourceManager提交任務(wù)ResourceManager分配 Container資源并通知對應(yīng)的NodeManager啟動

c. ApplicationMaster,ApplicationMaster 啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境

d. 然后啟動JobManager , 之后ApplicationMaster 向ResourceManager 申請資源啟動TaskManager

e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動TaskManager

f. NodeManager 加載 Flink 的 Jar 包和配置構(gòu)建環(huán)境并啟動 TaskManager

g. TaskManager 啟動后向 JobManager 發(fā)送心跳包,并等待 JobManager 向其分配任務(wù)。

源碼分析--集群啟動 JobManager 啟動分析

JobManager 的內(nèi)部包含非常重要的三大組件

  • WebMonitorEndpoint
  • ResourceManager
  • Dispatcher

入口,啟動主類:StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各種服務(wù) 
  7.  /** 
  8.   * 初始化了 主節(jié)點(diǎn)對外提供服務(wù)的時候所需要的 三大核心組件啟動時所需要的基礎(chǔ)服務(wù) 
  9.   *  初始化服務(wù),如 JobManager 的 Akka RPC 服務(wù),HA 服務(wù),心跳檢查服務(wù),metric service 
  10.   *  這些服務(wù)都是 Master 節(jié)點(diǎn)要使用到的一些服務(wù) 
  11.   *  1、commonRpcService:  基于 Akka 的 RpcService 實(shí)現(xiàn)。RPC 服務(wù)啟動 Akka 參與者來接收從 RpcGateway 調(diào)用 RPC 
  12.   *  2、haServices:    提供對高可用性所需的所有服務(wù)的訪問注冊,分布式計數(shù)器和領(lǐng)導(dǎo)人選舉 
  13.   *  3、blobServer:    負(fù)責(zé)偵聽傳入的請求生成線程來處理這些請求。它還負(fù)責(zé)創(chuàng)建要存儲的目錄結(jié)構(gòu) blob 或臨時緩存它們 
  14.   *  4、heartbeatServices:  提供心跳所需的所有服務(wù)。這包括創(chuàng)建心跳接收器和心跳發(fā)送者。 
  15.   *  5、metricRegistry:   跟蹤所有已注冊的 Metric,它作為連接 MetricGroup 和 MetricReporter 
  16.   *  6、archivedExecutionGraphStore:   存儲執(zhí)行圖ExecutionGraph的可序列化形式。 
  17. */ 
  18. initializeServices(configuration, pluginManager); 
  19.  
  20. // 創(chuàng)建 DispatcherResourceManagerComponentFactory, 初始化各種組件的 
  21. 工廠實(shí)例 
  22. // 其實(shí)內(nèi)部包含了三個重要的成員變量: 
  23. // 創(chuàng)建 ResourceManager 的工廠實(shí)例 
  24. // 創(chuàng)建 Dispatcher 的工廠實(shí)例 
  25. // 創(chuàng)建 WebMonitorEndpoint 的工廠實(shí)例 
  26. createDispatcherResourceManagerComponentFactory(configuration); 
  27.  
  28. // 創(chuàng)建 集群運(yùn)行需要的一些組件:Dispatcher, ResourceManager 等 
  29. // 創(chuàng) 建 ResourceManager 
  30. // 創(chuàng) 建 Dispatcher 
  31. // 創(chuàng) 建 WebMonitorEndpoint 
  32. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各種服務(wù)

  1. // 初 始 化 和 啟 動 AkkaRpcService, 內(nèi) 部 其 實(shí) 包 裝 了 一 個 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一個負(fù)責(zé) IO 的線程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服務(wù)組件,負(fù)責(zé) HA 服務(wù)的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服務(wù)端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服務(wù)組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一個用來存儲 ExecutionGraph 的 Store, 實(shí)現(xiàn)是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實(shí)例

  1. 1、DispatcherRunnerFactory,默認(rèn)實(shí)現(xiàn):DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默認(rèn)實(shí)現(xiàn):StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默認(rèn)實(shí)現(xiàn):SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 創(chuàng)建 WebMonitorEndpoint

  1. /************************************************* 
  2.   *  創(chuàng)建 WebMonitorEndpoint 實(shí)例, 在 Standalone 模式下:DispatcherRestEndpoint 
  3.   *  1、restEndpointFactory = SessionRestEndpointFactory 
  4.   *  2、webMonitorEndpoint = DispatcherRestEndpoint 
  5.   *  3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService 
  6.   *  當(dāng)前這個 DispatcherRestEndpoint 的作用是: 
  7.   *  1、初始化的過程中,會一大堆的 Handler 
  8.   *  2、啟動一個 Netty 的服務(wù)端,綁定了這些 Handler 
  9.   *  3、當(dāng) client 通過 flink 命令執(zhí)行了某些操作(發(fā)起 restful 請求), 服務(wù)端由 webMonitorEndpoint 來執(zhí)行處理 
  10.   *  4、舉個例子: 如果通過 flink run 提交一個 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來執(zhí)行處理 
  11.   *  5、補(bǔ)充一個:job 由 JobSubmitHandler 執(zhí)行完畢之后,轉(zhuǎn)交給 Dispatcher 去調(diào)度執(zhí)行 
  12.   */ 
  13.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  14.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  15.   blobServer, executor, metricFetcher, 
  16.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  17.   fatalErrorHandler 
  18.  ); 

4. 創(chuàng)建 resourceManager

  1. /************************************************* 
  2.  *  創(chuàng)建 StandaloneResourceManager 實(shí)例對象 
  3.  *  1、resourceManager = StandaloneResourceManager 
  4.  *  2、resourceManagerFactory = StandaloneResourceManagerFactory 
  5. */ 
  6. resourceManager = resourceManagerFactory.createResourceManager( 
  7.  configuration, ResourceID.generate(), 
  8.  rpcService, highAvailabilityServices, heartbeatServices, 
  9.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  10.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  11. ); 
  1. protected ResourceManager<ResourceID> createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.  /************************************************* 
  16.   *  注釋: 得到一個 StandaloneResourceManager 實(shí)例對象 
  17.   */ 
  18.  return new StandaloneResourceManager( 
  19.   rpcService, 
  20.   resourceId, 
  21.   highAvailabilityServices, 
  22.   heartbeatServices, 
  23.   resourceManagerRuntimeServices.getSlotManager(), 
  24.   ResourceManagerPartitionTrackerImpl::new, 
  25.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  26.   clusterInformation, 
  27.   fatalErrorHandler, 
  28.   resourceManagerMetricGroup, 
  29.   standaloneClusterStartupPeriodTime, 
  30.   AkkaUtils.getTimeoutAsTime(configuration) 
  31.  ); 
  32.  
  33.  } 
  34.   
  1. /** 
  2. requestSlot():接受 solt請求 
  3. sendSlotReport(..): 將solt請求發(fā)送TaskManager 
  4. registerJobManager(...): 注冊job管理者。 該job指的是 提交給flink的應(yīng)用程序 
  5. registerTaskExecutor(...): 注冊task執(zhí)行者。 
  6. **/ 
  7. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  8.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  9.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  11.  
  12.  /************************************************* 
  13.   *  注釋: 當(dāng)執(zhí)行完畢這個構(gòu)造方法的時候,會觸發(fā)調(diào)用 onStart() 方法執(zhí)行 
  14.   */ 
  15.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.  /************************************************* 
  6.   *  注釋:ResourceManager 或者 TaskExecutor 中的 RpcServer 實(shí)現(xiàn) 
  7.   *  以 ResourceManager 為例說明: 
  8.   *  啟動 ResourceManager 的 RPCServer 服務(wù) 
  9.   *  這里啟動的是 ResourceManager 的 Rpc 服務(wù)端。 
  10.   *  接收 TaskManager 啟動好了而之后, 進(jìn)行注冊和心跳,來匯報 Taskmanagaer 的資源情況 
  11.   *  通過動態(tài)代理的形式構(gòu)建了一個Server 
  12.   */ 
  13.  this.rpcServer = rpcService.startServer(this); 

5. 在創(chuàng)建resourceManager同級:啟動任務(wù)接收器Starting Dispatcher

  1. /************************************************* 
  2.  
  3.  *  創(chuàng)建 并啟動 Dispatcher 
  4.  *  1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager 
  5.  *  2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory 
  6.  *  第一個參數(shù):ZooKeeperLeaderElectionService 
  7.  *  - 
  8.  *  老版本: 這個地方是直接創(chuàng)建一個 Dispatcher 對象然后調(diào)用 dispatcher.start() 來啟動 
  9.  *  新版本: 直接創(chuàng)建一個 DispatcherRunner, 內(nèi)部就是要創(chuàng)建和啟動 Dispatcher 
  10.  *  - 
  11.  *  DispatcherRunner 是對 Dispatcher 的封裝。 
  12.  *  DispatcherRunner被創(chuàng)建的代碼的內(nèi)部,會創(chuàng)建 Dispatcher并啟動 
  13.  */ 
  14. log.debug("Starting Dispatcher."); 
  15. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  16.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  17.  // TODO_ZYM 注釋: 注意第三個參數(shù) 
  18.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  19.  ioExecutor, rpcService, partialDispatcherServices 
  20. ); 

Dispatcher 啟動后,將會等待任務(wù)提交,如果有任務(wù)提交,則會經(jīng)過submitJob(...)函數(shù)進(jìn)入后續(xù)處理。

提交(一個Flink應(yīng)用的提交必須經(jīng)過三個graph的轉(zhuǎn)換)

首先看下一些名詞

StreamGraph

是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。可以用一個 DAG 來表示),DAG 的頂點(diǎn)是 StreamNode,邊是 StreamEdge,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。

  • StreamNode:用來代表 operator 的類,并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
  • StreamEdge:表示連接兩個StreamNode的邊。

DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream Transformation了解更多)。這些transformation會構(gòu)造出一棵 StreamTransformation 樹,通過這棵樹轉(zhuǎn)換成 StreamGraph

以map方法為例,看看源碼

  1. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { 
  2.   // 通過java reflection抽出mapper的返回值類型 
  3.   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一個新的DataStream,SteramMap 為 StreamOperator 的實(shí)現(xiàn)類 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation會連接上當(dāng)前DataStream中的transformation,從而構(gòu)建成一棵樹 
  14.   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都會存到 env 中,調(diào)用execute時遍歷該list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map轉(zhuǎn)換將用戶自定義的函數(shù)MapFunction包裝到StreamMap這個Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當(dāng)調(diào)用env.execute時,遍歷其中的transformation集合構(gòu)造出StreamGraph

JobGraph

(1) StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點(diǎn) chain 在一起作為一個節(jié)點(diǎn)。

  • 將并不涉及到 shuffle 的算子進(jìn)行合并。
  • 對于同一個 operator chain 里面的多個算子,會在同一個 task 中執(zhí)行。
  • 對于不在同一個 operator chain 里的算子,會在不同的 task 中執(zhí)行。

(2) JobGraph 用來由 JobClient 提交給 JobManager,是由頂點(diǎn)(JobVertex)、中間結(jié)果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。

(3) JobGraph 定義作業(yè)級別的配置,而每個頂點(diǎn)和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設(shè)置。

JobVertex

JobVertex 相當(dāng)于是 JobGraph 的頂點(diǎn)。經(jīng)過優(yōu)化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。

IntermediateDataSet

JobVertex的輸出,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。

JobEdge

job graph中的一條數(shù)據(jù)傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex。

(1) 首先是通過API會生成transformations,通過transformations會生成StreamGraph。

(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉(zhuǎn)換都是在客戶端完成。

(3)最后會將JobGraph轉(zhuǎn)換為ExecutionGraph,相比JobGraph會增加并行度的概念,這一步是在Jobmanager里完成。

ExecutionJobVertex

ExecutionJobVertex一一對應(yīng)JobGraph中的JobVertex

ExecutionVertex

一個ExecutionJobVertex對應(yīng)n個ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務(wù)的一個子任務(wù)

Execution

Execution 是對 ExecutionVertex 的一次執(zhí)行,通過 ExecutionAttemptId 來唯一標(biāo)識。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對外輸出,一個 JobGraph 可能有 n(n >=0) 個輸出。在 ExecutionGraph 中,與此對應(yīng)的就是 IntermediateResult。每一個 IntermediateResult 就有 numParallelProducers(并行度) 個生產(chǎn)者,每個生產(chǎn)者的在相應(yīng)的 IntermediateResult 上的輸出對應(yīng)一個 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個輸出分區(qū)

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的輸入,通過 ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來,進(jìn)而在不同的 ExecutionVertex 之間建立聯(lián)系。

ExecutionGraph的構(gòu)建

  1. 構(gòu)建JobInformation
  2. 構(gòu)建ExecutionGraph
  3. 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點(diǎn)集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 構(gòu)建JobInformation 
  7.    
  8.   // 構(gòu)建ExecutionGraph 
  9.    
  10.   // 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點(diǎn)集合 
  11.   List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

構(gòu)建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 構(gòu)建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 連接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { 
  20.   List<JobEdge> inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分計劃(可執(zhí)行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一個并行的ExecutionVertex節(jié)點(diǎn)都會鏈接到源節(jié)點(diǎn)產(chǎn)生的所有中間結(jié)果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并發(fā)數(shù)等于partition數(shù),則一對一進(jìn)行連接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并發(fā)數(shù)大于partition數(shù),則一對多進(jìn)行連接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并發(fā)數(shù)小于partition數(shù),則多對一進(jìn)行連接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

返回ExecutionGraph

TaskManager

TaskManager啟動

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一個org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //調(diào)用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接著的重要函數(shù)是shumitTask()函數(shù),該函數(shù)會通過AKKA機(jī)制,向TaskManager發(fā)出一個submitTask的消息請求,TaskManager收到消息請求后,會執(zhí)行submitTask()方法。(省略了部分代碼)。

  1. public CompletableFuture<Acknowledge> submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一個Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //啟動任務(wù) 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后創(chuàng)建執(zhí)行Task的線程,然后調(diào)用startTaskThread()來啟動具體的執(zhí)行線程,Task線程內(nèi)部的run()方法承載了被執(zhí)行的核心邏輯。

Task是執(zhí)行在TaskExecutor進(jìn)程里的一個線程,下面來看看其run方法

(1) 檢測當(dāng)前狀態(tài),正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態(tài)將拋異常。

(2) 讀取DistributedCache文件。

(3) 啟動ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注冊partitionWriter。

(5) 根據(jù)nameOfInvokableClass加載對應(yīng)的類并實(shí)例化。

(6) 將狀態(tài)置為RUNNING并執(zhí)行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

總結(jié)

整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實(shí)現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象。現(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

本篇的主題是"Flink架構(gòu)與執(zhí)行流程",做下小結(jié),F(xiàn)link on Yarn的提交執(zhí)行流程:

1 Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置。

2 向Yarn ResourceManager提交任務(wù)。

3 ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster。

4 ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境。

5 啟動JobManager之后ApplicationMaster向ResourceManager申請資源啟動TaskManager。

6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動TaskManager。

7 NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager。

8 TaskManager啟動后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)左右手
相關(guān)推薦

2022-04-05 12:59:07

源碼線程onEvent

2022-08-27 08:02:09

SQL函數(shù)語法

2016-10-21 13:03:18

androidhandlerlooper

2012-08-30 09:48:02

Struts2Java

2024-07-15 09:58:03

OpenRestyNginx日志

2016-11-25 13:26:50

Flume架構(gòu)源碼

2016-11-29 09:38:06

Flume架構(gòu)核心組件

2016-11-25 13:14:50

Flume架構(gòu)源碼

2020-07-13 09:09:23

Sentinel源碼Bucket

2025-05-26 09:05:00

2022-06-07 10:33:29

Camera組件鴻蒙

2015-01-14 13:22:36

OpenStack創(chuàng)建快照glance api

2009-12-22 13:36:39

Linux Sysfs

2017-08-22 13:45:27

2009-07-08 10:30:57

WebWork

2022-07-15 08:52:03

Linux優(yōu)化

2024-10-21 10:45:52

2017-04-19 15:32:46

ReactRouter構(gòu)建源碼

2016-11-29 16:59:46

Flume架構(gòu)源碼

2011-03-15 11:33:18

iptables
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

国产剧情久久久久久| 日韩av影视综合网| 一本一生久久a久久精品综合蜜 | 99精品视频精品精品视频| 欧美乱熟臀69xxxxxx| 女人被男人躁得好爽免费视频| 亚洲国产精彩视频| 久久婷婷久久| 久久国产精品久久精品| 四虎成人免费视频| 九九热线视频只有这里最精品| 亚洲欧美综合色| 精品不卡一区二区三区| 中日韩av在线| 99热免费精品| 免费不卡欧美自拍视频| 五月天综合视频| 99re8这里有精品热视频8在线| 欧美性xxxxx| 加勒比海盗1在线观看免费国语版| 日韩私人影院| 国产精品主播直播| 国产精品久久久久久久app| 久久伊人成人网| 91综合网人人| 亚洲嫩模很污视频| 初高中福利视频网站| 欧美123区| 欧美性生交xxxxxdddd| 一区二区免费在线观看| 欧美男男激情freegay| 高清不卡在线观看| 7777奇米亚洲综合久久| 精品久久久久久久久久久国产字幕 | av激情在线观看| 操欧美老女人| 亚洲欧美变态国产另类| 无码一区二区精品| 136国产福利精品导航网址应用| 欧美日韩国产成人在线91 | 国产主播一区二区| 国产精品国产自产拍高清av水多| 久久一区二区三区视频| 一区在线视频观看| 欧美激情第一页xxx| 青青草精品在线视频| 天天精品视频| 色先锋资源久久综合5566| www.狠狠爱| 国产精品一区二区99| 精品小视频在线| 三上悠亚ssⅰn939无码播放| 欧美成人基地| 亚洲精品永久免费精品| 草草地址线路①屁屁影院成人| 国产精品115| 欧美α欧美αv大片| 国产裸体视频网站| 午夜电影一区| 精品国产成人系列| 天天插天天射天天干| 精品国产一区二区三区成人影院 | 成人美女免费网站视频| 97国产精品久久久| 黑人巨大精品欧美黑白配亚洲| 国产免费久久av| 国产毛片毛片毛片毛片毛片| 国产精品99久久久久久宅男| 不卡日韩av| 特黄视频在线观看| 久久久久一区二区三区四区| 日本一区二区在线视频观看| 啊v在线视频| 亚洲欧洲av在线| 日产精品久久久久久久蜜臀| 啊啊啊久久久| 色婷婷精品久久二区二区蜜臀av | 亚洲四虎影院| 91精品国产综合久久精品麻豆| 亚洲一二区在线观看| 66精品视频在线观看| 亚洲国产精品资源| 制服 丝袜 综合 日韩 欧美| 欧美大片aaaa| 久久久久国产精品免费网站| av大片在线免费观看| 日韩av中文字幕一区二区三区| 国产精品永久在线| 亚洲精华国产精华精华液网站| av电影天堂一区二区在线| 奇米精品在线| 1区2区在线观看| 欧美午夜精品久久久久久久| 色婷婷激情视频| 巨人精品**| 日韩在线视频国产| 国产精品一区二区6| 麻豆一区二区在线| 国产精品乱码一区二区三区| 成av人电影在线观看| 亚洲美女免费视频| 国产精品第12页| 黄色精品视频网站| 亚洲电影免费观看高清| 国产jizz18女人高潮| 亚洲每日更新| 91精品视频播放| 同心难改在线观看| 亚洲免费av网站| 国产超碰在线播放| 福利片一区二区| 日韩在线视频一区| 亚洲成熟少妇视频在线观看| 国产一区二区三区蝌蚪| 欧美亚洲另类久久综合| 大香伊人久久| 91精品国产欧美一区二区成人| 波多野结衣办公室33分钟| 欧美成人午夜| 国产精品欧美风情| 青青操视频在线| 亚洲va韩国va欧美va| 国产乱女淫av麻豆国产| 国产在线观看91一区二区三区 | 欧美日韩中文视频| 韩国v欧美v日本v亚洲v| 日韩激情视频| 在线看片国产福利你懂的| 日韩一区二区三区视频在线观看 | 日韩激情一区| 日本在线精品视频| 五月婷婷免费视频| 亚洲一区二区三区四区在线观看| 亚洲精品永久视频| 欧美日韩国产在线观看网站| 欧洲中文字幕国产精品| 人妻无码一区二区三区久久99| 亚洲欧美自拍偷拍色图| 日本人69视频| 五月开心六月丁香综合色啪| 国产精品一久久香蕉国产线看观看| 奇米影视888狠狠狠777不卡| 五月综合激情日本mⅴ| 麻豆传媒在线看| 中文字幕免费一区二区三区| 成人久久久久久| а√资源新版在线天堂| 91精品中文字幕一区二区三区| 国产jizz18女人高潮| 美洲天堂一区二卡三卡四卡视频| 色噜噜狠狠色综合网| 欧美日韩视频免费观看| 国产亚洲欧洲高清| 久久久999久久久| 国产精品乱子久久久久| 色www免费视频| 99久久精品费精品国产| 91欧美视频网站| 天堂亚洲精品| 亚洲国产精品大全| 久久人妻免费视频| 欧美韩国一区二区| 天天干天天操天天做| 中文字幕免费一区二区| 大波视频国产精品久久| a国产在线视频| 亚洲天堂av图片| ,亚洲人成毛片在线播放| 亚洲视频 欧洲视频| 亚洲熟女乱综合一区二区| 亚洲无中文字幕| 国产日韩二区| 欧美aa视频| 精品国产欧美成人夜夜嗨| 国产哺乳奶水91在线播放| 亚洲国产乱码最新视频| 醉酒壮男gay强迫野外xx| 日本aⅴ亚洲精品中文乱码| 一本色道婷婷久久欧美| 成人台湾亚洲精品一区二区| 日本三级韩国三级久久| 秋霞a级毛片在线看| 精品区一区二区| 日韩 国产 欧美| 亚洲精选视频免费看| 国产精品日日摸夜夜爽| 麻豆亚洲精品| 可以免费看的黄色网址| 日韩高清三区| 91久久综合亚洲鲁鲁五月天| heyzo高清国产精品| 在线看国产精品| 亚洲第一精品网站| 欧美中文字幕一区二区三区亚洲| 一区二区视频免费看| 久久你懂得1024| 久草福利在线观看| 久久久久免费| 日韩久久久久久久久久久久| 中文字幕av一区二区三区人| 91欧美精品成人综合在线观看| 亚洲精华液一区二区三区| 伦理中文字幕亚洲| 欧洲免费在线视频| 日韩美女视频在线| 中文字幕在线观看第二页| 亚洲网友自拍偷拍| 日本爱爱小视频| 91免费视频大全| 成人三级做爰av| 奇米一区二区三区| 成人毛片视频网站| 午夜亚洲福利| 一区二区三区电影| 亚洲深夜福利在线观看| 丁香五月网久久综合| 精品176极品一区| 欧美性受xxx| 欧美四级在线| 久久精品一区中文字幕| 国产高清在线看| 亚洲精品国产精品国自产在线 | 亚洲综合视频一区| 免费观看久久av| 精品欧美国产| aiss精品大尺度系列| 国产精品视频999| 欧美性suv| 人九九综合九九宗合| 爱看av在线入口| 欧美激情一区二区久久久| а天堂中文在线官网| 两个人的视频www国产精品| 国产三级视频在线| 亚洲欧美精品伊人久久| 性感美女一级片| 亚洲成人黄色在线观看| 成人av免费播放| 欧美大片在线观看一区| 国产免费一区二区三区最新不卡 | 欧美疯狂party性派对| 日韩黄色影视| 欧美色婷婷久久99精品红桃| 日韩欧美精品久久| 国产99久久精品一区二区300| 免费国产一区二区| 国产欧美日韩在线一区二区| 蜜桃视频在线观看91| 日韩激情网站| 欧美性bbwbbwbbwhd| 精品视频日韩| 在线免费观看成人网| 国产精品99视频| 免费看污污视频| 国产综合激情| 少妇高潮毛片色欲ava片| 亚洲国产婷婷| 无码精品a∨在线观看中文| 97人妻精品视频一区| 亚洲特级片在线| 久久久久久久久毛片| 亚洲国产一区在线观看| 在线观看亚洲欧美| 色国产综合视频| 在线观看免费视频一区| 911精品国产一区二区在线| 国产色片在线观看| 日韩你懂的电影在线观看| 日本成人动漫在线观看| 亚洲欧美国产一本综合首页| 国产毛片av在线| 美女视频久久黄| 成入视频在线观看| 国产精品久久久久77777| 性欧美video另类hd尤物| 成人看片在线| 综合国产视频| 天天成人综合网| 亚洲一级黄色| 天天操天天摸天天爽| 国产老肥熟一区二区三区| 亚洲av成人片无码| 国产欧美日韩久久| 久久免费少妇高潮99精品| 色偷偷88欧美精品久久久| 一二三四区在线| 亚洲国产精品久久| 在线日本中文字幕| 久久久在线视频| 韩日精品一区| 精品九九九九| 91精品一区二区三区综合在线爱| 成人午夜免费在线| 久久丁香综合五月国产三级网站| 免费黄色a级片| 国产欧美日韩亚州综合| 国产在线视频二区| 欧美日韩日本视频| 日韩在线观看视频网站| 日韩中文字幕精品视频| 男女啪啪免费视频网站| 日本不卡视频在线| 岛国精品一区二区三区| 国产精品午夜在线观看| 国产精品 欧美 日韩| 欧美军同video69gay| 亚洲色图21p| 欧美精品一区在线播放| 97欧美成人| 久久精品国产精品青草色艺| 91精品国产乱码久久久久久| 日韩在线第三页| 国产91对白在线观看九色| 九一在线免费观看| 精品福利在线看| 精品久久久无码中文字幕| 在线观看视频99| 中文字幕在线看片| 国产伦精品一区二区三| 全球成人免费直播| 国产第一页视频| 成人免费的视频| 538任你躁在线精品视频网站| 欧美熟乱第一页| 欧美新色视频| 4388成人网| 国产精品网址| 很污的网站在线观看| 国产精品自产自拍| 成人高潮免费视频| 欧美另类久久久品| av免费观看一区二区| 国产精品 欧美在线| 欧美欧美黄在线二区| 欧美日韩一道本| 99久久国产免费看| 日韩欧美大片在线观看| 精品国产三级a在线观看| 在线h片观看| 91精品婷婷国产综合久久蝌蚪| 综合天堂久久久久久久| www.51色.com| 一区二区三区四区视频精品免费| 国产精品久久久久久久久久久久久久久久 | 亚洲综合欧美在线| 中文字幕av一区二区三区免费看 | 在线国产成人影院| 日韩av一级大片| 欧美a一区二区| av黄色免费在线观看| 91精品国产综合久久久蜜臀粉嫩| 在线免费观看黄色网址| 91九色蝌蚪国产| 欧美黄色免费| 中文字幕第九页| 精品日本美女福利在线观看| 四虎精品在永久在线观看| 欧美中文在线观看| 成人精品影院| 九一精品久久久| 亚洲一卡二卡三卡四卡五卡| 麻豆tv在线播放| 色男人天堂综合再现| www.com黄色片| 一区二区在线观看免费| 成人无码一区二区三区| 91精品国产九九九久久久亚洲| 在线观看欧美理论a影院| 午夜精品在线免费观看| 亚洲色图清纯唯美| 隣の若妻さん波多野结衣| 538国产精品视频一区二区| 制服丝袜日韩| 亚洲五月激情网| 性做久久久久久免费观看欧美| 四虎影院在线播放| 成人精品在线观看| 国产日韩一区| av在线播放中文字幕| 欧美v日韩v国产v| 欧美freesex| 国产成人亚洲综合无码| 久久色在线视频| 国产农村妇女毛片精品久久| 69国产精品成人在线播放| 精品视频亚洲| 国产精品入口麻豆| 欧美日韩中字一区| 日本天码aⅴ片在线电影网站| 久久久久久久免费| 精品一区二区三区影院在线午夜 | 亚洲伊人成综合成人网| 99国内精品| 久久人妻无码aⅴ毛片a片app| 亚洲精美色品网站| 亚洲在线资源| 97视频在线免费播放| 亚洲欧美视频在线观看视频| 日本中文字幕电影在线观看| 91影视免费在线观看|