精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

分布式計(jì)算引擎 Flink/Spark on k8s 的實(shí)現(xiàn)對(duì)比以及實(shí)踐

網(wǎng)絡(luò) 分布式 Spark
以 Flink 和 Spark 為代表的分布式流批計(jì)算框架的下層資源管理平臺(tái)逐漸從 Hadoop 生態(tài)的 YARN 轉(zhuǎn)向 Kubernetes 生態(tài)的 k8s 原生 scheduler 以及周邊資源調(diào)度器,比如 Volcano 和 Yunikorn 等。這篇文章簡(jiǎn)單比較一下兩種計(jì)算框架在 Native Kubernetes 的支持和實(shí)現(xiàn)上的異同,以及對(duì)于應(yīng)用到生產(chǎn)環(huán)境我們還需要做些什么。

以 Flink 和 Spark 為代表的分布式流批計(jì)算框架的下層資源管理平臺(tái)逐漸從 Hadoop 生態(tài)的 YARN 轉(zhuǎn)向 Kubernetes 生態(tài)的 k8s 原生 scheduler 以及周邊資源調(diào)度器,比如 Volcano 和 Yunikorn 等。這篇文章簡(jiǎn)單比較一下兩種計(jì)算框架在 Native Kubernetes 的支持和實(shí)現(xiàn)上的異同,以及對(duì)于應(yīng)用到生產(chǎn)環(huán)境我們還需要做些什么。

1. 什么是 Native

這里的 native 其實(shí)就是計(jì)算框架直接向 Kubernetes 申請(qǐng)資源。比如很多跑在 YARN 上面的計(jì)算框架,需要自己實(shí)現(xiàn)一個(gè) AppMaster 來(lái)想 YARN 的 ResourceManager 來(lái)申請(qǐng)資源。Native K8s 相當(dāng)于計(jì)算框架自己實(shí)現(xiàn)一個(gè)類(lèi)似 AppMaster 的角色向 k8s 去申請(qǐng)資源,當(dāng)然和 AppMaster 還是有差異的 (AppMaster 需要按 YARN 的標(biāo)準(zhǔn)進(jìn)行實(shí)現(xiàn))。

2. Spark on k8s 使用

提交作業(yè)

向 k8s 集群提交作業(yè)和往 YARN 上面提交很類(lèi)似,命令如下,主要區(qū)別包括:

--master 參數(shù)指定 k8s 集群的 ApiServer
需要通過(guò)參數(shù) spark.kubernetes.container.image 指定在 k8s 運(yùn)行作業(yè)的 image,
指定 main jar,需要 driver 進(jìn)程可訪(fǎng)問(wèn):如果 driver 運(yùn)行在 pod 中,jar 包需要包含在鏡像中;如果 driver 運(yùn)行在本地,那么 jar 需要在本地。
通過(guò) --name 或者 spark.app.name 指定 app 的名字,作業(yè)運(yùn)行起來(lái)之后的 driver 命名會(huì)以 app 名字為前綴。當(dāng)然也可以通過(guò)參數(shù) spark.kubernetes.driver.pod.name 直接指定 dirver 的名字

  1. $ ./bin/spark-submit \    --master k8s://https://: \    --deploy-mode cluster \    --name spark-pi \    --class org.apache.spark.examples.SparkPi \    --conf spark.executor.instances=5 \    --conf spark.kubernetes.container.image= \    local:///path/to/examples.jar 

提交完該命令之后,spark-submit 會(huì)創(chuàng)建一個(gè) driver pod 和一個(gè)對(duì)應(yīng)的 servcie,然后由 driver 創(chuàng)建 executor pod 并運(yùn)行作業(yè)。

deploy-mode

和在 YARN 上面使用 Spark 一樣,在 k8s 上面也支持 cluster 和 client 兩種模式:

cluster mode: driver 在 k8s 集群上面以 pod 形式運(yùn)行。
client mode: driver 運(yùn)行在提交作業(yè)的地方,然后 driver 在 k8s 集群上面創(chuàng)建 executor。為了保證 executor 能夠注冊(cè)到 driver 上面,還需要提交作業(yè)的機(jī)器可以和 k8s 集群內(nèi)部的 executor 網(wǎng)絡(luò)連通(executor 可以訪(fǎng)問(wèn)到 driver,需要注冊(cè))。
資源清理

這里的資源指的主要是作業(yè)的 driver 和 executor pod。spark 通過(guò) k8s 的 onwer reference 機(jī)制將作業(yè)的各種資源連接起來(lái),這樣當(dāng) driver pod 被刪除的時(shí)候,關(guān)聯(lián)的 executor pod 也會(huì)被連帶刪除。但是如果沒(méi)有 driver pod,也就是以 client 模式運(yùn)行作業(yè)的話(huà),如下兩種情況涉及到資源清理:

作業(yè)運(yùn)行完成,driver 進(jìn)程退出,executor pod 運(yùn)行完自動(dòng)退出
driver 進(jìn)程被殺掉,executor pod 連不上 driver 也會(huì)自行退出
可以參考:https://kubernetes.io/docs/concepts/architecture/garbage-collection/

依賴(lài)管理

前面說(shuō)到 main jar 包需要在 driver 進(jìn)程可以訪(fǎng)問(wèn)到的地方,如果是 cluster 模式就需要將 main jar 打包到 spark 鏡像中。但是在日常開(kāi)發(fā)和調(diào)試中,每次重新 build 一個(gè)鏡像的 effort 實(shí)在是太大了。spark 支持提交的時(shí)候使用本地的文件,然后使用 s3 等作為中轉(zhuǎn):先上傳上去,然后作業(yè)運(yùn)行的時(shí)候再?gòu)?s3 上面下載下來(lái)。下面是一個(gè)實(shí)例。

  1. ...--packages org.apache.hadoop:hadoop-aws:3.2.0--conf spark.kubernetes.file.upload.path=s3a:///path--conf spark.hadoop.fs.s3a.access.key=...--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem--conf spark.hadoop.fs.s3a.fast.upload=true--conf spark.hadoop.fs.s3a.secret.key=....--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmpfile:///full/path/to/app.jar 

Pod Template

k8s 的 controller (比如 Deployment,Job)創(chuàng)建 Pod 的時(shí)候根據(jù) spec 中的 pod template 來(lái)創(chuàng)建。下面是一個(gè) Job 的示例。

  1. apiVersion: batch/v1kind: Jobmetadata:  name: hellospec:  template:    # 下面的是一個(gè) pod template    spec:      containers:      - name: hello        image: busybox        command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']      restartPolicy: OnFailure    # The pod template ends here 

由于我們通過(guò) spark-submit 提交 spark 作業(yè)的時(shí)候,最終的 k8s 資源(driver/executor pod)是由 spark 內(nèi)部邏輯構(gòu)建出來(lái)的。但是有的時(shí)候我們想要在 driver/executor pod 上做一些額外的工作,比如增加 sidecar 容器做一些日志收集的工作。這種場(chǎng)景下 PodTemplate 就是一個(gè)比較好的選擇,同時(shí) PodTemplate 也將 spark 和底層基礎(chǔ)設(shè)施(k8s)解耦開(kāi)。比如 k8s 發(fā)布新版本支持一些新的特性,那么我們只要修改我們的 PodTemplate 即可,而不涉及到 spark 的內(nèi)部改動(dòng)。

RBAC

RBAC 全稱(chēng)是 Role-based access control,是 k8s 中的一套權(quán)限控制機(jī)制。通俗來(lái)說(shuō):

RBAC 中包含了一系列的權(quán)限設(shè)置,比如 create/delete/watch/list pod 等,這些權(quán)限集合的實(shí)體叫 Role 或者 ClusterRole
同時(shí) RBAC 還包含了角色綁定關(guān)系(Role Binding),用于將 Role/ClusterRole 賦予一個(gè)或者一組用戶(hù),比如 Service Account 或者 UserAccount
為了將 Spark 作業(yè)在 k8s 集群中運(yùn)行起來(lái),我們還需要一套 RBAC 資源:

指定 namespace 下的 serviceaccount
定義了權(quán)限規(guī)則的 Role 或者 ClusterRole,我們可以使用常見(jiàn)的 ClusterRole "edit"(對(duì)幾乎所有資源具有操作權(quán)限,比如 create/delete/watch 等)
綁定關(guān)系
下面命令在 spark namespace 下為 serviceaccount spark 賦予了操作同 namespace 下其他資源的權(quán)限,那么只要 spark 的 driver pod 掛載了該 serviceaccount,它就可以創(chuàng)建 executor pod 了。

  1. $ kubectl create serviceaccount spark$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark 

下面做一個(gè)簡(jiǎn)單的演示:

通過(guò)如下命令提交作業(yè) SparkPiSleep 到 k8s 集群中。

  1. $ spark-submit --master k8s://https://: --deploy-mode cluster --class org.apache.spark.examples.SparkPiSleep --conf spark.executor.memory=2g --conf spark.driver.memory=2g --conf spark.driver.core=1 --conf spark.app.name=test12 --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.executor.core=1 --conf spark.kubernetes.container.image= --conf spark.eventLog.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.executor.instances=1 --conf spark.dynamicAllocation.enabled=false --conf sparkspark.kubernetes.namespace=spark --conf sparkspark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.executor.core=1  local:///path/to/main/jar 

查看 k8s 集群中的資源

  1. $ kubectl get po -n sparkNAME                               READY   STATUS              RESTARTS   AGEspark-pi-5b88a27b576050dd-exec-1   0/1     ContainerCreating   0          2stest12-9fd3c27b576039ae-driver     1/1     Running             0          8s 

其中第一個(gè)就是 executor pod,第二個(gè)是 driver 的 pod。除此之外還創(chuàng)建了一個(gè) service,可以通過(guò)該 service 訪(fǎng)問(wèn)到 driver pod,比如 Spark UI 都可以這樣訪(fǎng)問(wèn)到。

  1. $ kubectl get svc -n sparkNAME                                 TYPE           CLUSTER-IP     EXTERNAL-IP     PORT(S)                                       AGEtest12-9fd3c27b576039ae-driver-svc   ClusterIP      None                     7078/TCP,7079/TCP,4040/TCP                    110s 

下面再看一下 service owner reference,executor pod 也是類(lèi)似的。

  1. $ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyamlapiVersion: v1kind: Servicemetadata:  creationTimestamp: "2021-08-18T03:48:50Z"  name: test12-9fd3c27b576039ae-driver-svc  namespace: spark  # service 的 ownerReference 指向了 driver pod,只要 driver pod 被刪除,該 service 也會(huì)被刪除  ownerReferences:  - apiVersion: v1    controller: true    kind: Pod    name: test12-9fd3c27b576039ae-driver    uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95  resourceVersion: "9975441"  uid: 06c1349f-be52-4133-80d9-07af34419b1f 

3. Flink on k8s 使用

Flink on k8s native 的實(shí)現(xiàn)支持兩種模式:

application mode:在遠(yuǎn)程 k8s 集群中啟動(dòng)一個(gè) flink 集群(jm 和 tm),driver 運(yùn)行在 jm 中,也就是只支持 detached 模式,不支持 attached 模式。
session mode:在遠(yuǎn)程 k8s 集群?jiǎn)?dòng)一個(gè)常駐的 flink 集群(只有 jm),然后向上面提交作業(yè),根據(jù)實(shí)際情況決定啟動(dòng)多少個(gè) tm。
在生產(chǎn)上面使用一般不太建議使用 session mode,所以下面主要討論的是 application mode。

Flink 的 native k8s 模式是不需要指定 tm 個(gè)數(shù)的,jm 會(huì)根據(jù)用戶(hù)的代碼計(jì)算需要多少 tm。

提交作業(yè)

下面是一個(gè)簡(jiǎn)單的提交命令,需要包含:

參數(shù) run-application 指定是 application 模式
參數(shù) --target 指定運(yùn)行在 k8s 上
參數(shù) kubernetes.container.image 指定作業(yè)運(yùn)行使用的 flink 鏡像
最后需要指定 main jar,路徑是鏡像中的路徑

  1. $ ./bin/flink run-application \    --target kubernetes-application \    -Dkubernetes.cluster-id=my-first-application-cluster \    -Dkubernetes.container.image=custom-image-name \    local:///opt/flink/usrlib/my-flink-job.jar 

資源清理

Flink 的 native 模式會(huì)先創(chuàng)建一個(gè) JobManager 的 deployment,并將其托管給 k8s。同一個(gè)作業(yè)所有的相關(guān)資源的 owner reference 都指向該 Deployment,也就是說(shuō)刪除了該 deployment,所有相關(guān)的資源都會(huì)被清理掉。下面根據(jù)作業(yè)的運(yùn)行情況討論一下資源如何清理。

作業(yè)運(yùn)行到終態(tài)(SUCCESS,F(xiàn)AILED,CANCELED 等)之后,F(xiàn)link 會(huì)清理掉所有作業(yè)
JobManager 進(jìn)程啟動(dòng)失敗(pod 中的 jm 容器啟動(dòng)失敗),由于控制器是 Deployment,所以會(huì)一直重復(fù)拉起
運(yùn)行過(guò)程中,如果 JobManager 的 pod 被刪除,Deployment 會(huì)重新拉起
運(yùn)行過(guò)程中,如果 JobManager 的 Deployment 被刪除,那么關(guān)聯(lián)的所有 k8s 資源都會(huì)被刪除

Pod Template

Flink native 模式也支持 Pod Template,類(lèi)似 Spark。

RBAC

類(lèi)似 Spark。

依賴(lài)文件管理

Flink 暫時(shí)只支持 main jar 以及依賴(lài)文件在鏡像中。也就是說(shuō)用戶(hù)要提交作業(yè)需要自己定制化鏡像,體驗(yàn)不是很好。一種 workaroud 的方式是結(jié)合 PodTemplate:

如果依賴(lài)是本地文件,需要 upload 到一個(gè) remote 存儲(chǔ)做中轉(zhuǎn),比如各大云廠商的對(duì)象存儲(chǔ)。
如果依賴(lài)是遠(yuǎn)端文件,不需要 upload。
運(yùn)行時(shí)在 template 中使用 initContainer 將用戶(hù)的 jar 以及依賴(lài)文件下載到 Flink 容器中,并加到 classpath 下運(yùn)行。
Flink 的作業(yè) demo 就不在演示了。

4. Spark on Kubernetes 實(shí)現(xiàn)

Spark on Kubernetes 的實(shí)現(xiàn)比較簡(jiǎn)單:

Spark Client 創(chuàng)建一個(gè) k8s pod 運(yùn)行 driver
driver 創(chuàng)建 executor pod,然后開(kāi)始運(yùn)行作業(yè)
作業(yè)運(yùn)行結(jié)束之后 driver pod 進(jìn)入到 Completed 狀態(tài),executor pod 會(huì)被清理掉。作業(yè)結(jié)束之后通過(guò) driver pod 我們還是可以查看 driver pod 的。

代碼實(shí)現(xiàn)

Spark 的 native k8s 實(shí)現(xiàn)代碼在 resource-managers/kubernetes module 中。我們可以從 SparkSubmit 的代碼開(kāi)始分析。我們主要看一下 deploy-mode 為 cluster 模式的代碼邏輯。

  1. // Set the cluster manager    val clusterManager: Int = args.master match {      case "yarn" => YARN      case m if m.startsWith("spark") => STANDALONE      case m if m.startsWith("mesos") => MESOS      case m if m.startsWith("k8s") => KUBERNETES      case m if m.startsWith("local") => LOCAL      case _ =>        error("Master must either be yarn or start with spark, mesos, k8s, or local")        -1    } 

首先根據(jù) spark.master 配置中 scheme 來(lái)判斷是不是 on k8s。我們上面也看到這個(gè)配置的形式為 --master k8s://https://: 。如果是 on k8s 的 cluster 模式,則去加載 Class org.apache.spark.deploy.k8s.submit.KubernetesClientApplication,并運(yùn)行其中的 start 方法。childArgs 方法的核心邏輯簡(jiǎn)單來(lái)說(shuō)就是根據(jù) spark-submit 提交的參數(shù)構(gòu)造出 driver pod 提交到 k8s 運(yùn)行。

  1. private[spark] class KubernetesClientApplication extends SparkApplication {  override def start(args: Array[String], conf: SparkConf): Unit = {    val parsedArguments = ClientArguments.fromCommandLineArgs(args)    run(parsedArguments, conf)  }  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {    // For constructing the app ID, we can't use the Spark application name, as the app ID is going    // to be added as a label to group resources belonging to the same application. Label values are    // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate    // a unique app ID (captured by spark.app.id) in the format below.    val kubernetesAppId = KubernetesConf.getKubernetesAppId()    val kubernetesConf = KubernetesConf.createDriverConf(      sparkConf,      kubernetesAppId,      clientArguments.mainAppResource,      clientArguments.mainClass,      clientArguments.driverArgs,      clientArguments.proxyUser)    // The master URL has been checked for validity already in SparkSubmit.    // We just need to get rid of the "k8s://" prefix here.    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(      master,      Some(kubernetesConf.namespace),      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,      SparkKubernetesClientFactory.ClientType.Submission,      sparkConf,      None,      None)) { kubernetesClient =>        val client = new Client(          kubernetesConf,          new KubernetesDriverBuilder(),          kubernetesClient,          watcher)        client.run()    }  }} 

上面的代碼的核心就是最后創(chuàng)建 Client 并運(yùn)行。這個(gè) Client 是 Spark 封裝出來(lái)的 Client,內(nèi)置了 k8s client。

  1. private[spark] class Client(    conf: KubernetesDriverConf,    builder: KubernetesDriverBuilder,    kubernetesClient: KubernetesClient,    watcher: LoggingPodStatusWatcher) extends Logging {  def run(): Unit = {    // 構(gòu)造 Driver 的 Pod    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)    val configMapName = KubernetesClientUtils.configMapNameDriver    val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,      conf.sparkConf, resolvedDriverSpec.systemProperties)    val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap)    // 修改 Pod 的 container spec:增加 SPARK_CONF_DIR    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)      .addNewEnv()        .withName(ENV_SPARK_CONF_DIR)        .withValue(SPARK_CONF_DIR_INTERNAL)        .endEnv()      .addNewVolumeMount()        .withName(SPARK_CONF_VOLUME_DRIVER)        .withMountPath(SPARK_CONF_DIR_INTERNAL)        .endVolumeMount()      .build()    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)      .editSpec()        .addToContainers(resolvedDriverContainer)        .addNewVolume()          .withName(SPARK_CONF_VOLUME_DRIVER)          .withNewConfigMap()            .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava)            .withName(configMapName)            .endConfigMap()          .endVolume()        .endSpec()      .build()    val driverPodName = resolvedDriverPod.getMetadata.getName    var watch: Watch = null    var createdDriverPod: Pod = null    try {      // 通過(guò) k8s client 創(chuàng)建 Driver Pod      createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)    } catch {      case NonFatal(e) =>        logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")        throw e    }    try {      // 創(chuàng)建其他資源,修改 owner reference 等      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)      addOwnerReference(createdDriverPod, otherKubernetesResources)      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()    } catch {      case NonFatal(e) =>        kubernetesClient.pods().delete(createdDriverPod)        throw e    }    val sId = Seq(conf.namespace, driverPodName).mkString(":")    // watch pod    breakable {      while (true) {        val podWithName = kubernetesClient          .pods()          .withName(driverPodName)        // Reset resource to old before we start the watch, this is important for race conditions        watcher.reset()        watch = podWithName.watch(watcher)        // Send the latest pod state we know to the watcher to make sure we didn't miss anything        watcher.eventReceived(Action.MODIFIED, podWithName.get())        // Break the while loop if the pod is completed or we don't want to wait        // 根據(jù)參數(shù) "spark.kubernetes.submission.waitAppCompletion" 判斷是否需要退出        if(watcher.watchOrStop(sId)) {          watch.close()          break        }      }    }  } 

下面再簡(jiǎn)單介紹一下 Driver 如何管理 Executor 的流程。當(dāng) Spark Driver 運(yùn)行 main 函數(shù)時(shí),會(huì)創(chuàng)建一個(gè) SparkSession,SparkSession 中包含了 SparkContext,SparkContext 需要?jiǎng)?chuàng)建一個(gè) SchedulerBackend 會(huì)管理 Executor 的生命周期。對(duì)應(yīng)到 k8s 上的 SchedulerBackend 其實(shí)就是 KubernetesClusterSchedulerBackend,下面主要看一下這個(gè) backend 是如何創(chuàng)建出來(lái)的。大膽猜想一下,大概率也是根據(jù) spark.master 的 url 的 scheme "k8s" 創(chuàng)建的。

下面是 SparkContext 創(chuàng)建 SchedulerBackend 的核心代碼邏輯。

  1. private def createTaskScheduler(...) = {  case masterUrl =>    // 創(chuàng)建出 KubernetesClusterManager    val cm = getClusterManager(masterUrl) match {      case Some(clusterMgr) => clusterMgr      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")    }    try {      val scheduler = cm.createTaskScheduler(sc, masterUrl)      // 上面創(chuàng)建出來(lái)的 KubernetesClusterManager 這里會(huì)創(chuàng)建出 KubernetesClusterSchedulerBackend      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)      cm.initialize(scheduler, backend)      (backend, scheduler)    } catch {      case se: SparkException => throw se      case NonFatal(e) =>        throw new SparkException("External scheduler cannot be instantiated", e)    }}// 方法 getClsuterManager 會(huì)通過(guò) ServiceLoader 加載所有實(shí)現(xiàn) ExternalClusterManager 的 ClusterManager (KubernetesClusterManager 和 YarnClusterManager),然后通過(guò) master url 進(jìn)行 filter,選出 KubernetesClusterManagerprivate def getClusterManager(url: String): Option[ExternalClusterManager] = {  val loader = Utils.getContextOrSparkClassLoader  val serviceLoaders =    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))  if (serviceLoaders.size > 1) {    throw new SparkException(      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")  }  serviceLoaders.headOption} 

后面就是 KubernetesClusterSchedulerBackend 管理 Executor 的邏輯了。

可以簡(jiǎn)單看一下創(chuàng)建 Executor 的代碼邏輯。

  1. private def requestNewExecutors(      expected: Int,      running: Int,      applicationId: String,      resourceProfileId: Int,      pvcsInUse: Seq[String]): Unit = {    val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +      s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")    // Check reusable PVCs for this executor allocation batch    val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)    for ( _ <- 0 until numExecutorsToAllocate) {      val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()      val executorConf = KubernetesConf.createExecutorConf(        conf,        newExecutorId.toString,        applicationId,        driverPod,        resourceProfileId)      // 構(gòu)造 Executor 的 Pod Spec      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,        kubernetesClient, rpIdToResourceProfile(resourceProfileId))      val executorPod = resolvedExecutorSpec.pod      val podWithAttachedContainer = new PodBuilder(executorPod.pod)        .editOrNewSpec()        .addToContainers(executorPod.container)        .endSpec()        .build()      val resources = replacePVCsIfNeeded(        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)      // 創(chuàng)建 Executor Pod      val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)      try {        // 增加 owner reference        addOwnerReference(createdExecutorPod, resources)        resources          .filter(_.getKind == "PersistentVolumeClaim")          .foreach { resource =>            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {              addOwnerReference(driverPod.get, Seq(resource))            }            val pvc = resource.asInstanceOf[PersistentVolumeClaim]            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +              s"StorageClass ${pvc.getSpec.getStorageClassName}")            kubernetesClient.persistentVolumeClaims().create(pvc)          }        newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")      } catch {        case NonFatal(e) =>          kubernetesClient.pods().delete(createdExecutorPod)          throw e      }    }  } 

5. Flink on Kubernetes 實(shí)現(xiàn)

Flink 的 Native K8s 實(shí)現(xiàn):

Flink Client 創(chuàng)建 JobManager 的 Deployment,然后將 Deployment 托管給 k8s
k8s 的 Deployment Controller 創(chuàng)建 JobManager 的 Pod
JobManager 內(nèi)的 ResourceManager 負(fù)責(zé)先 Kubernetes Scheduler 請(qǐng)求資源并創(chuàng)建 TaskManager 等相關(guān)資源并創(chuàng)建相關(guān)的 TaskManager Pod 并開(kāi)始運(yùn)行作業(yè)
當(dāng)作業(yè)運(yùn)行到終態(tài)之后所有相關(guān)的 k8s 資源都被清理掉
代碼(基于分支 release-1.13)實(shí)現(xiàn)主要如下:

CliFrontend 作為 Flink Client 的入口根據(jù)命令行參數(shù) run-application 判斷通過(guò)方法 runApplication 去創(chuàng)建 ApplicationCluster
KubernetesClusterDescriptor 通過(guò)方法 deployApplicationCluster 創(chuàng)建 JobManager 相關(guān)的 Deployment 和一些必要的資源
JobManager 的實(shí)現(xiàn)類(lèi) JobMaster 通過(guò) ResourceManager 調(diào)用類(lèi) KubernetesResourceManagerDriver 中的方法 requestResource 創(chuàng)建 TaskManager 等資源
其中 KubernetesClusterDescriptor 實(shí)現(xiàn)自 interface ClusterDescriptor ,用來(lái)描述對(duì) Flink 集群的操作。根據(jù)底層的資源使用不同, ClusterDescriptor 有不同的實(shí)現(xiàn),包括 KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor。

  1. public interface ClusterDescriptor<T> extends AutoCloseable {    /* Returns a String containing details about the cluster (NodeManagers, available memory, ...). */    String getClusterDescription();    /* 查詢(xún)已存在的 Flink 集群. */    ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;    /** 創(chuàng)建 Flink Session 集群 */    ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)            throws ClusterDeploymentException;    /** 創(chuàng)建 Flink Application 集群 **/    ClusterClientProvider<T> deployApplicationCluster(            final ClusterSpecification clusterSpecification,            final ApplicationConfiguration applicationConfiguration)            throws ClusterDeploymentException;    /** 創(chuàng)建 Per-job 集群 **/    ClusterClientProvider<T> deployJobCluster(            final ClusterSpecification clusterSpecification,            final JobGraph jobGraph,            final boolean detached)            throws ClusterDeploymentException;    /** 刪除集群 **/    void killCluster(T clusterId) throws FlinkException;    @Override    void close();} 

下面簡(jiǎn)單看一下 KubernetesClusterDescriptor 的核心邏輯:創(chuàng)建 Application 集群。

  1. public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {    private final Configuration flinkConfig;      // 內(nèi)置 k8s client    private final FlinkKubeClient client;    private final String clusterId;      @Override    public ClusterClientProvider<String> deployApplicationCluster(            final ClusterSpecification clusterSpecification,            final ApplicationConfiguration applicationConfiguration)            throws ClusterDeploymentException {        // 查詢(xún) flink 集群在 k8s 中是否存在        if (client.getRestService(clusterId).isPresent()) {            throw new ClusterDeploymentException(                    "The Flink cluster " + clusterId + " already exists.");        }        final KubernetesDeploymentTarget deploymentTarget =                KubernetesDeploymentTarget.fromConfig(flinkConfig);        if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) {            throw new ClusterDeploymentException(                    "Couldn't deploy Kubernetes Application Cluster."                            + " Expected deployment.target="                            + KubernetesDeploymentTarget.APPLICATION.getName()                            + " but actual one was \""                            + deploymentTarget                            + "\"");        }                // 設(shè)置 application 參數(shù):$internal.application.program-args 和 $internal.application.main        applicationConfiguration.applyToConfiguration(flinkConfig);              // 創(chuàng)建集群        final ClusterClientProvider<String> clusterClientProvider =                deployClusterInternal(                        KubernetesApplicationClusterEntrypoint.class.getName(),                        clusterSpecification,                        false);        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {            LOG.info(                    "Create flink application cluster {} successfully, JobManager Web Interface: {}",                    clusterId,                    clusterClient.getWebInterfaceURL());        }        return clusterClientProvider;    }      // 創(chuàng)建集群邏輯    private ClusterClientProvider<String> deployClusterInternal(            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)            throws ClusterDeploymentException {        final ClusterEntrypoint.ExecutionMode executionMode =                detached                        ? ClusterEntrypoint.ExecutionMode.DETACHED                        : ClusterEntrypoint.ExecutionMode.NORMAL;        flinkConfig.setString(                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());        flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.        // 將端口指定為固定值,方便 k8s 的資源構(gòu)建。因?yàn)?nbsp;pod 的隔離性,所以沒(méi)有端口沖突        KubernetesUtils.checkAndUpdatePortConfigOption(                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);        KubernetesUtils.checkAndUpdatePortConfigOption(                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);        KubernetesUtils.checkAndUpdatePortConfigOption(                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);        // HA 配置        if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {            flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);            KubernetesUtils.checkAndUpdatePortConfigOption(                    flinkConfig,                    HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,                    flinkConfig.get(JobManagerOptions.PORT));        }        try {            final KubernetesJobManagerParameters kubernetesJobManagerParameters =                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);            // 補(bǔ)充 PodTemplate 邏輯            final FlinkPod podTemplate =                    kubernetesJobManagerParameters                            .getPodTemplateFilePath()                            .map(                                    file ->                                            KubernetesUtils.loadPodFromTemplateFile(                                                    client, file, Constants.MAIN_CONTAINER_NAME))                            .orElse(new FlinkPod.Builder().build());            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(                            podTemplate, kubernetesJobManagerParameters);                      // 核心邏輯:在 k8s 中創(chuàng)建包括 JobManager Deployment 在內(nèi) k8s 資源,比如 Service 和 ConfigMap            client.createJobManagerComponent(kubernetesJobManagerSpec);            return createClusterClientProvider(clusterId);        } catch (Exception e) {            //...        }    }} 

上面代碼中需要說(shuō)的在構(gòu)建 JobManager 的時(shí)候補(bǔ)充 PodTemplate。簡(jiǎn)單來(lái)說(shuō) PodTemplate 就是一個(gè) Pod 文件。

第三步的 TaskManager 創(chuàng)建就不再贅述了。

7. 生態(tài)

這里生態(tài)這個(gè)詞可能也不太合適,這里主要指的的如果要在生產(chǎn)上面使用該功能還有哪些可以做的。下面主要討論在生產(chǎn)環(huán)境上面用來(lái)做 trouble-shooting 的兩個(gè)功能:日志和監(jiān)控。

日志

日志收集對(duì)于線(xiàn)上系統(tǒng)是非常重要的一環(huán),毫不夸張地說(shuō),80% 的故障都可以通過(guò)日志查到原因。但是前面也說(shuō)過(guò),F(xiàn)link 作業(yè)在作業(yè)運(yùn)行到終態(tài)之后會(huì)清理掉所有資源,Spark 作業(yè)運(yùn)行完只會(huì)保留 Driver Pod 的日志,那么我們?nèi)绾问占酵暾淖鳂I(yè)日志呢?

有幾種方案可供選擇:

DaemonSet。每個(gè) k8s 的 node 上面以 DaemonSet 形式部署日志收集 agent,對(duì) node 上面運(yùn)行的所有容器日志進(jìn)行統(tǒng)一收集,并存儲(chǔ)到類(lèi)似 ElasticSearch 的統(tǒng)一日志搜索平臺(tái)。
SideCar。使用 Flink/Spark 提供的 PodTemplate 功能在主容器側(cè)配置一個(gè) SideCar 容器用來(lái)進(jìn)行日志收集,最后存儲(chǔ)到統(tǒng)一的日志服務(wù)里面。
這兩種方式都有一個(gè)前提是有其他的日志服務(wù)提供存儲(chǔ)、甚至搜索的功能,比如 ELK,或者各大云廠商的日志服務(wù)。

除此之外還有一種簡(jiǎn)易的方式可以考慮:利用 log4j 的擴(kuò)展機(jī)制,自定義 log appender,在 appender 中定制化 append 邏輯,將日志直接收集并存儲(chǔ)到 remote storage,比如 hdfs,對(duì)象存儲(chǔ)等。這種方案需要將自定義的 log appender 的 jar 包放到運(yùn)行作業(yè)的 ClassPath 下,而且這種方式有可能會(huì)影響作業(yè)主流程的運(yùn)行效率,對(duì)性能比較敏感的作業(yè)并不太建議使用這種方式。

監(jiān)控

目前 Prometheus 已經(jīng)成為 k8s 生態(tài)的監(jiān)控事實(shí)標(biāo)準(zhǔn),下面我們的討論也是討論如何將 Flink/Spark 的作業(yè)的指標(biāo)對(duì)接到 Prometheus。下面先看一下 Prometheus 的架構(gòu)。

其中的核心在于 Prometheus Servier 收集指標(biāo)的方式是 pull 還是 push:

對(duì)于常駐的進(jìn)程,比如在線(xiàn)服務(wù),一般由 Prometheus Server 主動(dòng)去進(jìn)程暴露出來(lái)的 api pull 指標(biāo)。
對(duì)于會(huì)結(jié)束的進(jìn)程指標(biāo)收集,比如 batch 作業(yè),一般使用進(jìn)程主動(dòng) push 的方式。詳細(xì)流程是進(jìn)程將指標(biāo) push 到常駐的 PushGateway,然后 Prometheus Server 去 PushGateway pull 指標(biāo)。
上面兩種使用方式也是 Prometheus 官方建議的使用方式,但是看完描述不難發(fā)現(xiàn)其實(shí)第一種場(chǎng)景也可以使用第二種處理方式。只不過(guò)第二種方式由于 PushGateway 是常駐的,對(duì)其穩(wěn)定性要求會(huì)比較高。

Flink

Flink 同時(shí)提供了 PrometheusReporter (將指標(biāo)通過(guò) api 暴露,由 Prometheus Server 來(lái)主動(dòng) pull 數(shù)據(jù)) 和 PrometheusPushGatewayReporter (將指標(biāo)主動(dòng) push 給 PushGateway,Prometheus Server 不需要感知 Flink 作業(yè))。

這兩種方式中 PrometheusPushGatewayReporter 會(huì)更簡(jiǎn)單一點(diǎn),但是 PushGateway 可能會(huì)成為瓶頸。如果使用 PrometheusReporter 的方式,需要引入服務(wù)發(fā)現(xiàn)機(jī)制幫助 Prometheus Server 自動(dòng)發(fā)現(xiàn)運(yùn)行的 Flink 作業(yè)的 Endpoint。Prometheus 目前支持的主流的服務(wù)發(fā)現(xiàn)機(jī)制主要有:

基于 Consul。Consul 是基于 etcd 的一套完整的服務(wù)注冊(cè)與發(fā)現(xiàn)解決方案,要使用這種方式,我們需要 Flink 對(duì)接 Consul。比如我們?cè)谔峤蛔鳂I(yè)的時(shí)候,將作業(yè)對(duì)應(yīng)的 Service 進(jìn)行捕獲并寫(xiě)入 Consul。
基于文件。文件也就是 Prometheus 的配置文件,里面配置需要拉取 target 的 endpoint。文件這種方式本來(lái)是比較雞肋的,因?yàn)樗枰?Prometheus Server 和 Flink 作業(yè)同時(shí)都可以訪(fǎng)問(wèn),但是需要文件是 local 的。但是在 k8s 環(huán)境中,基于文件反而變的比較簡(jiǎn)單,我們可以將 ConfigMap 掛載到 Prometheus Server 的 Pod 上面,F(xiàn)link 作業(yè)修改 ConfigMap 就可以了。
基于 Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制。Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制簡(jiǎn)單來(lái)說(shuō)就是 label select。可以參考

  1. https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config 

關(guān)于 Prometheus 支持的更多服務(wù)發(fā)現(xiàn)機(jī)制,可以參考:https://prometheus.io/docs/prometheus/latest/configuration/configuration/ ,簡(jiǎn)單羅列包括:

azure
consul
digitalocean
docker
dockerswarm
dns
ec2
eureka
file
gce
hetzner
http
kubernetes
...
Spark

以批計(jì)算為代表的 Spark 使用 PushGateway 的方式來(lái)對(duì)接 Prometheus 是比較好的方式,但是 Spark 官方并沒(méi)有提供對(duì) PushGateway 的支持,只支持了 Prometheus 的 Exporter,需要 Prometheus Server 主動(dòng)去 pull 數(shù)據(jù)。

這里推薦使用基于 Kubernetes 的服務(wù)發(fā)現(xiàn)機(jī)制。

需要注意的是 Prometheus Server 拉取指標(biāo)是按固定時(shí)間間隔進(jìn)行拉取的,對(duì)于持續(xù)時(shí)間比較短的批作業(yè),有可能存在還沒(méi)有拉取指標(biāo),作業(yè)就結(jié)束的情況。

8. 缺陷

雖然 Spark 和 Flink 都實(shí)現(xiàn)了 native k8s 的模式,具體實(shí)現(xiàn)略有差異。但是在實(shí)際使用上發(fā)現(xiàn)兩者的實(shí)現(xiàn)在某些場(chǎng)景下還是略有缺陷的。

Spark

pod 不具有容錯(cuò)性 spark-submit 會(huì)先構(gòu)建一個(gè) k8s 的 driver pod,然后由 driver pod 啟動(dòng) executor 的 pod。但是在 k8s 環(huán)境中并不太建議直接構(gòu)建 pod 資源,因?yàn)?pod 不具有容錯(cuò)性,pod 所在節(jié)點(diǎn)掛了之后 pod 就掛了。熟悉 k8s scheduler 的同學(xué)應(yīng)該知道 pod 有一個(gè)字段叫 podName,scheduler 的核心是為 pod 填充這個(gè)字段,也就是為 pod 選擇一個(gè)合適的 node。一旦調(diào)度完成之后 pod 的該字段就固定下來(lái)了。這也是 pod 不具有 node 容錯(cuò)的原因。

Flink

Deployment 語(yǔ)義。 Deployment 可以認(rèn)為是 ReplicaSet 的增強(qiáng)版,而 ReplicaSet 的官方定義如下。

A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

簡(jiǎn)單來(lái)說(shuō),ReplicaSet 的目的是保證幾個(gè)相同的 Pod 副本可以不間斷的運(yùn)行,說(shuō)是為了線(xiàn)上服務(wù)量身定制的也不為過(guò)(線(xiàn)上服務(wù)最好是無(wú)狀態(tài)且支持原地重啟,比如 WebService)。但是盡管 Flink 以流式作業(yè)為主,但是我們并不能簡(jiǎn)單地將流式作業(yè)等同于無(wú)狀態(tài)的 WebService。比如 Flink 作業(yè)的 Main Jar 如果寫(xiě)的有問(wèn)題,會(huì)導(dǎo)致 JobManager 的 Pod 一直啟動(dòng)失敗,但是由于是 Deployment 語(yǔ)義的問(wèn)題會(huì)不斷被重啟。這個(gè)可能是 ByDesign 的,但是感覺(jué)并不太好。

Batch 作業(yè)處理。 由于 Flink 作業(yè)運(yùn)行完所有資源包括 Deployment 都會(huì)被清理掉,拿不到最終的作業(yè)狀態(tài),不知道成功有否(流作業(yè)的話(huà)停止就可以認(rèn)為是失敗了)。對(duì)于這個(gè)問(wèn)題可以利用 Flink 本身的歸檔功能,將結(jié)果歸檔到外部的文件系統(tǒng)(兼容 s3 協(xié)議,比如阿里云對(duì)象存儲(chǔ) oss)中。涉及到的配置如下:

s3.access-key
s3.secret-key
s3.region
s3.endpoint
jobmanager.archive.fs.dir
如果不想引入外部系統(tǒng)的話(huà),需要改造 Flink 代碼在作業(yè)運(yùn)行完成之后將數(shù)據(jù)寫(xiě)到 k8s 的 api object 中,比如 ConfigMap 或者 Secret。

作業(yè)日志。 Spark 作業(yè)運(yùn)行結(jié)束之后 Executor Pod 被清理掉,Driver Pod 被保留,我們可以通過(guò)它查看到 Driver 的日志。Flink 作業(yè)結(jié)束之后就什么日志都查看不到了。

9. 總結(jié)

本文從使用方式、源碼實(shí)現(xiàn)以及在生產(chǎn)系統(tǒng)上面如何補(bǔ)足周邊系統(tǒng)地介紹了 Spark 和 Flink 在 k8s 生態(tài)上的實(shí)現(xiàn)、實(shí)踐以及對(duì)比。但是限于篇幅,很多內(nèi)容來(lái)不及討論了,比如 shuffle 如何處理。如果你們公司也在做這方面的工作,相信還是有很多參考價(jià)值的,也歡迎留言交流。

另外,YARN 的時(shí)代已經(jīng)過(guò)去了,以后 on k8s scheduler 將成為大數(shù)據(jù)計(jì)算以及 AI 框架的標(biāo)配。但是 k8s scheduler 這種天生為在線(xiàn)服務(wù)設(shè)計(jì)的調(diào)度器在吞吐上面有很大的不足,并不是很契合大數(shù)據(jù)作業(yè)。k8s 社區(qū)的批調(diào)度器 kube-batch,以及基于 kube-batch 衍生出來(lái)的 Volcano 調(diào)度器,基于 YARN 的調(diào)度算法實(shí)現(xiàn)的 k8s 生態(tài)調(diào)度器 Yunikorn 也逐漸在大數(shù)據(jù) on k8s 場(chǎng)景下嶄露頭角,不過(guò)這些都是后話(huà)了,后面有時(shí)間再專(zhuān)門(mén)寫(xiě)文章進(jìn)行分析對(duì)比。

責(zé)任編輯:梁菲 來(lái)源: 阿里云云棲號(hào)
相關(guān)推薦

2021-11-29 08:48:00

K8S KubernetesAirflow

2022-04-02 09:57:51

技術(shù)京東實(shí)踐

2023-12-25 07:35:40

數(shù)據(jù)集成FlinkK8s

2022-08-15 14:56:30

搜索引擎分布式

2022-08-21 07:25:09

Flink云原生K8S

2019-02-26 09:51:52

分布式鎖RedisZookeeper

2019-06-27 09:12:43

FlinkStorm框架

2024-03-01 09:53:34

2017-09-01 05:35:58

分布式計(jì)算存儲(chǔ)

2021-07-05 09:28:11

Flink分布式程序

2013-09-11 16:02:00

Spark分布式計(jì)算系統(tǒng)

2022-10-10 12:54:00

Flink運(yùn)維

2010-06-03 19:46:44

Hadoop

2020-06-02 14:45:48

PostgreSQL架構(gòu)分布式

2009-06-19 14:23:41

RMIJava分布式計(jì)算

2021-10-30 19:30:23

分布式Celery隊(duì)列

2023-02-28 07:01:11

分布式緩存平臺(tái)

2018-10-16 14:26:22

分布式塊存儲(chǔ)引擎

2024-09-27 09:19:30

2022-03-21 19:44:30

CitusPostgreSQ執(zhí)行器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

欧美mv日韩mv国产| 国内成人精品2018免费看| 欧美午夜精品久久久久久超碰| 99久久国产免费免费| 国产成人av一区二区三区不卡| 在线视频观看国产| 黑人精品欧美一区二区蜜桃| 亚洲片国产一区一级在线观看| 免费看黄色a级片| 亚洲一区二区影视| 精品国产精品| 日韩欧美高清视频| 精品视频导航| 久久精品视频国产| 一区二区三区四区精品视频| 亚洲精品日韩专区silk| 亚洲一区二区三区乱码aⅴ| 日本成人免费在线观看| 日本另类视频| 欧美国产精品久久| 国产精品高潮呻吟久久av无限| 右手影院亚洲欧美| 欧美黑人疯狂性受xxxxx野外| 99久久精品费精品国产一区二区| 高清一区二区三区四区五区| 精品人妻一区二区三区日产| 黄视频网站在线观看| 97超碰欧美中文字幕| 91高清免费视频| 免费看黄色aaaaaa 片| 天堂av在线网| 国产女同互慰高潮91漫画| 国产精品福利网站| 二区三区四区视频| 在线精品自拍| 欧美视频二区36p| 婷婷四房综合激情五月| 伊人精品在线视频| 国产精品99久久精品| 91精品国模一区二区三区| 日本福利视频在线观看| 免费观看a视频| 亚洲一区自拍| 亚洲人在线视频| 少妇网站在线观看| 影音先锋男人资源在线| 91一区二区在线观看| 国产精品成人免费电影| 欧美手机在线观看| 国产精品chinese在线观看| 日韩欧美在线观看| 宅男av一区二区三区| 精品人妻一区二区三区换脸明星 | 免费在线观看国产精品| 精品女人视频| 91国产成人在线| 综合一区中文字幕| 日本高清视频在线| 日韩福利视频网| 美女av一区二区三区| 欧类av怡春院| 日本免费成人| 性做久久久久久| 天堂一区二区三区| 丰满人妻一区二区三区四区53| 免费视频一区二区三区在线观看| 色哟哟亚洲精品一区二区| 色哟哟在线观看视频| 中文字幕在线直播| 亚洲女厕所小便bbb| 久久人人九九| 精品久久久久成人码免费动漫| 男人的天堂亚洲| 欧美精品免费在线观看| 国产三级av在线播放| 97一区二区国产好的精华液| 欧美网站一区二区| 日本a在线免费观看| 蜜桃av在线免费观看| 99久久精品免费看国产| 成人免费淫片视频软件| 无码人妻av免费一区二区三区| 亚洲欧美综合| 中文字幕日韩专区| 国产传媒第一页| 亚洲欧美日本国产| 欧美高清性hdvideosex| 国产在线青青草| 98色花堂精品视频在线观看| 亚洲欧美区自拍先锋| 亚洲国产精品一区二区第四页av | 国产传媒日韩欧美成人| 国产精品十八以下禁看| www五月天com| 一区二区三区导航| 另类视频在线观看| 18啪啪污污免费网站| 中文字幕亚洲影视| 亚洲精品美女久久久| 久草福利在线观看| 99re8精品视频在线观看| 91豆麻精品91久久久久久| 日韩av三级在线| 女人天堂av在线播放| 综合久久综合久久| 一区二区视频国产| av女优在线| 亚洲国产高清aⅴ视频| 热re99久久精品国产99热| 无码国产精品高潮久久99| 国产精品主播直播| 91久久大香伊蕉在人线| 国产熟女一区二区丰满| 国内精品视频一区二区三区八戒| 国产精品自产拍在线观| 在线观看黄色国产| 久久精品二区亚洲w码| 国产日韩精品入口| 国产一区二区波多野结衣 | 91视频福利网| 欧美一级片网址| 91麻豆精品久久久久蜜臀| 男人添女人下面免费视频| 成人免费在线观看视频| 欧美日韩三级一区| 奇米视频7777| 日本免费一区二区视频| 欧美mv日韩mv国产网站| 日本一卡二卡在线| 久久av导航| 在线视频精品一| 国产精品精品软件男同| 国产一区日韩欧美| 97香蕉久久超级碰碰高清版| 亚洲va在线观看| 免费久久精品视频| 91美女高潮出水| 亚洲男女视频在线观看| 91在线视频播放| 日韩欧美在线观看强乱免费| 欧美人xxx| 一区二区成人在线| 亚洲中文字幕无码中文字| 国产精品一区二区av影院萌芽| 在线日韩一区二区| 日韩欧美理论片| 午夜视频在线观看精品中文| 亚洲成人网久久久| 男人的天堂官网| 亚洲国产不卡| 97在线视频观看| 中文精品久久久久人妻不卡| 国产一区二区精品久久91| 国产精品久久久久av福利动漫| 午夜视频在线播放| 国产精品久久久久久久蜜臀| 久久久天堂国产精品| 伊人久久国产| 欧美一区二区三区视频免费| 三级电影在线看| 欧美激情黄色片| 97国产一区二区精品久久呦 | 日韩丝袜美女视频| 免费成人深夜夜行p站| 小小影院久久| 亲子乱一区二区三区电影| 国产又粗又长又大视频| av在线播放成人| 欧美 另类 交| 欧美日韩成人影院| 精品免费一区二区三区| 日韩福利在线视频| 日韩视频一区二区三区在线播放免费观看 | 国产成人免费电影| 男人天堂久久久| 日韩欧美国产网站| 欧美日韩一区二区区| 欧美日韩在线播放视频| 97在线观看免费| 国内精品偷拍视频| 国产欧美日韩麻豆91| 欧美这里只有精品| 999精品嫩草久久久久久99| 亚洲欧美自拍一区| 日本亚洲色大成网站www久久| 老司机免费视频一区二区| 久久精品国产第一区二区三区最新章节 | 92看片淫黄大片一级| 亚洲精品国产九九九| 色多多国产成人永久免费网站| 亚洲免费在线视频观看| 丁香天五香天堂综合| 老汉色影院首页| 日本亚洲欧洲无免费码在线| 国产亚洲精品va在线观看| 日韩少妇高潮抽搐| 成人国产亚洲欧美成人综合网| 国产日本欧美在线| 亚洲我射av| 久久久91精品国产| 一本到在线视频| 国产女主播在线一区二区| 国产aaa一级片| 天堂资源在线亚洲| 97avcom| 日本毛片在线观看| 午夜精品视频在线观看| 三级视频网站在线观看| 亚洲激情在线| 国产亚洲二区| 麻豆国产在线| 亚洲国产毛片完整版| 日产精品久久久久久久| 成人久久18免费网站麻豆| 国产玉足脚交久久欧美| 国产一区调教| 97成人精品区在线播放| 四虎精品成人影院观看地址| 欧美日韩国产一区二区三区| 欧美丰满少妇人妻精品| 久久男女视频| 亚洲精品在线视频观看| 四虎影视国产精品| 久久人人爽亚洲精品天堂| www.蜜臀av| 亚洲高清免费视频| 久久人妻少妇嫩草av无码专区| 国产日韩欧美一区| 欧美日韩精品久久| 国产精品亚洲成在人线| 操日韩av在线电影| 亚洲高清视频网站| 欧美日韩国产影院| 91精品久久久久久久久久久久| 毛片av中文字幕一区二区| 99re8这里只有精品| 99热这里只有精品首页| 国产91色在线播放| 男人资源在线播放| 亚洲精品xxx| 国产精品熟女视频| 亚洲女厕所小便bbb| 荫蒂被男人添免费视频| 蜜臀av亚洲一区中文字幕| 麻豆一区二区三区在线观看| 日韩精品免费一区二区三区竹菊| 国产精品久久久久久久久久久久久| 中文字幕日本在线观看| 欧美大片一区二区三区| 国产区一区二区三| 中文字幕亚洲电影| 漂亮人妻被黑人久久精品| 麻豆免费精品视频| 黄页免费在线观看视频| 日韩伦理一区| 国产一区二区三区色淫影院| 99精品在免费线偷拍| 久久久久国产视频| 8888四色奇米在线观看| 亚洲国产精品成人av| 中文字幕欧美在线观看| 亚洲午夜一区二区| а天堂中文在线资源| 91伊人久久大香线蕉| 人妻换人妻仑乱| 三级在线观看一区二区| 天堂8在线天堂资源bt| 成人精品影视| 久久久精彩视频| 精品中文字幕一区二区三区| 日韩av免费在线看| www.8ⅹ8ⅹ羞羞漫画在线看| 日韩中文字幕精品| 黄色小视频在线免费观看| 精品日产卡一卡二卡麻豆| 中文字幕有码视频| 欧美视频裸体精品| 日韩高清精品免费观看| 亚洲卡通动漫在线| 亚欧洲乱码视频| zzijzzij亚洲日本少妇熟睡| 亚洲第一成肉网| 美女一区二区视频| 日韩中文字幕免费在线| 国产欧美二区| 免费一级特黄毛片| 欧美二区视频| 久久精品国产精品亚洲精品色| 国产亚洲精品美女久久久久久久久久| 国产无套精品一区二区| 97视频一区| 豆国产97在线| 一本一道久久a久久| 69堂成人精品视频免费| 电影91久久久| 成人在线小视频| 91精品一久久香蕉国产线看观看| 国产精品欧美风情| 香蕉伊大人中文在线观看| 高清亚洲成在人网站天堂| 好久没做在线观看| 久久久人成影片一区二区三区| 26uuu亚洲电影在线观看| 久久精品国产一区| 国产原创在线观看| 精品国产欧美一区二区五十路| av在线资源网| 在线视频欧美日韩| 欧美三级理伦电影| 久久av.com| 在线欧美三级| 久久免费视频观看| 18video性欧美19sex高清| 国产+人+亚洲| 天堂8中文在线最新版在线| 日本一区二区在线播放| 制服诱惑亚洲| 国产精品午夜一区二区欲梦| 伊人久久综合网另类网站| 91久久精品国产91性色| 欧美电影在线观看一区| av成人观看| 视频福利一区| 神马影院一区二区| 日本一本不卡| 亚洲av综合色区| 一区二区亚洲| 大肉大捧一进一出好爽视频| 久久综合九色| 九九热免费在线观看| 国产精品亚洲第一区在线暖暖韩国 | 美女福利视频在线观看| 一区二区欧美精品| 欧美bbbbbbbbbbbb精品| 在线观看亚洲a| 99久久精品国产一区二区成人| 欧美刺激午夜性久久久久久久| 污视频在线免费观看| 国产一区二区三区18| 爆操欧美美女| 91国产精品电影| 99riav视频一区二区| 99高清视频有精品视频| 视频福利一区| 女女同性女同一区二区三区按摩| 在线看片日韩| 国产高清视频网站| 国产91色综合久久免费分享| 级毛片内射视频| 亚洲品质自拍视频网站| 日日噜噜噜噜人人爽亚洲精品| 欧美日韩精品一区二区天天拍小说 | 日本免费观看视| 欧美日韩一级视频| 天堂网2014av| 日韩有码在线电影| 一个人www视频在线免费观看| 国产精品久久久久久久天堂| 亚洲不卡在线| 亚洲精品欧美精品| 99精品视频网| 美女又黄又免费的视频| 国产日本一区二区| 久久精品视频国产| 69p69国产精品| 国产一级片在线| 久久久久久国产精品| 四虎在线精品| 日本视频一区二区在线观看| 激情视频一区二区三区| 日韩av一卡二卡三卡| 久久精品一区二区三区不卡| 久久婷婷综合国产| 欧美日韩一区久久| 青青操视频在线| 欧美激情按摩在线| 疯狂欧洲av久久成人av电影| 欧美日韩免费高清| 亚洲高清久久| 亚洲精品久久久久久| 国产精品久久久久久久久久免费看| 亚洲国产综合久久| 日韩欧美在线综合网| 毛片在线视频| 国产精品日日做人人爱| 亚洲动漫在线观看| 男人日女人逼逼| 粉嫩嫩av羞羞动漫久久久 | 日韩在线视屏| 男人的天堂日韩| 久久网站热最新地址| 五月婷婷激情网| 亚洲第一精品夜夜躁人人爽| 在线观看av免费| 亚洲va男人天堂| 国产精品久久久久久久久久10秀| 最新中文字幕2018| 欧美国产丝袜视频| 中文字幕精品无码亚| 中国人与牲禽动交精品|