精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Master 分配資源并在 Worker上啟動 Executor ,逐行代碼注釋版

開發(fā) 前端
這里有個假設(shè)是:Spark 集群以 Standalone 的方式來啟動的,作業(yè)也是提交到 Spark standalone 集群。

[[432016]]

本文轉(zhuǎn)載自微信公眾號「KK架構(gòu)」,作者wangkai。轉(zhuǎn)載本文請聯(lián)系KK架構(gòu)公眾號。

一、回顧一下之前的內(nèi)容

上一次閱讀到了 SparkContext 初始化,繼續(xù)往下之前,先溫故一下之前的內(nèi)容。

這里有個假設(shè)是:Spark 集群以 Standalone 的方式來啟動的,作業(yè)也是提交到 Spark standalone 集群。

首先需要啟動 Spark 集群,使用 start-all.sh 腳本依次啟動 Master (主備) 和多個 Worker。

啟動好之后,開始提交作業(yè),使用 spark-submit 命令來提交。

  • 首先在提交任務(wù)的機(jī)器上使用 java 命令啟動了一個虛擬機(jī),并且執(zhí)行了主類 SparkSubmit 的 main 方法作為入口。
  • 然后根據(jù)提交到不同的集群,來 new 不同的客戶端類,如果是 standalone 的話,就 new 了一個 ClientApp;然后把 java DriverWrapper 這個命令封裝到 RequestSubmmitDriver 消息中,把這個消息發(fā)送給 Master;
  • Master 隨機(jī)找一個滿足資源條件的 Worker 來啟動 Driver,實(shí)際上是在虛擬機(jī)里執(zhí)行 DriverWrapper 的 main 方法;
  • 然后 Worker 開始啟動 Driver,啟動的時候會執(zhí)行用戶提交的 java 包里的 main 方法,然后開始執(zhí)行 SparkContext 的初始化,依次在 Driver 中創(chuàng)建了 DAGScheduler、TaskScheduler、SchedulerBackend 三個重要的實(shí)例。并且啟動了 DriverEndpoint 和 ClientEndpoint ,用來和 Worker、Master 通信。

二、Master 處理應(yīng)用的注冊

接著上次 ClientEndpoint 啟動之后,會向 Master 發(fā)送一個 RegisterApplication 消息,Master 開始處理這個消息。

然后看到 Matster 類處理 RegisterApplication 消息的地方:

可以看到,用應(yīng)用程序的描述和 Driver 的引用創(chuàng)建了一個 Application,然后開始注冊這個 Application。

注冊 Application 很簡單,就是往 Master 的內(nèi)存中加入各種信息,重點(diǎn)來了,把 ApplicationInfo 加入到了 waitingApps 這個結(jié)構(gòu)里,然后 schedule() 方法會遍歷這個列表,為 Application 分配資源,并調(diào)度起來。

然后往 zk 中寫入了 Application 的信息,并且往 Driver 發(fā)送了一個 RegisteredApplication 應(yīng)用已經(jīng)注冊的消息。

接著開始 schedule(),這個方法上次講過,它會遍歷兩個列表,一個是遍歷 waitingDrivers 來啟動 Driver,一個是遍歷 waitingApps,來啟動 Application。

waitingDrivers 列表在客戶端請求啟動 Driver 的時候就處理過了,本次重點(diǎn)看這個方法:

  1. startExecutorsOnWorkers() 

三、Master 對資源的調(diào)度

有以下幾個步驟:

  • 遍歷 waitingApps 的所有 app;
  • 如果 app 需要的核數(shù)小于一個 Executor 可以提供的核數(shù),就不為 app 分配新的 Executor;
  • 過濾出還有可供調(diào)度的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作為 usableWorkers;
  • 計(jì)算所有 usableWorkers 上要分配多少 CPU;
  • 然后遍歷可用的 Workers,分配資源并執(zhí)行調(diào)度,啟動 Executor。

源碼從 Master 類的 schedule() 方法的最后一行 startExecutorsOnWorkers() 開始:

這個方法主要作用是計(jì)算 worker 的 executor 數(shù)量和分配的資源并啟動 executor。

  1. /** 
  2.  * Schedule and launch executors on workers 
  3.  */ 
  4. private def startExecutorsOnWorkers(): Unit = { 
  5.     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app 
  6.     // in the queue, then the second app, etc. 
  7.  
  8.     for (app <- waitingApps) { 
  9.         val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) 
  10.         // If the cores left is less than the coresPerExecutor,the cores left will not be allocated 
  11.         if (app.coresLeft >= coresPerExecutor) { 
  12.             // 1. 剩余內(nèi)存大于單個 executor 需要的內(nèi)存 
  13.             // 2. 剩余的內(nèi)核數(shù)大于單個 executor 需要的內(nèi)核數(shù) 
  14.             // 3. 按照內(nèi)核數(shù)從大到小排序 
  15.             // Filter out workers that don't have enough resources to launch an executor 
  16.             val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) 
  17.                 .filter(canLaunchExecutor(_, app.desc)) 
  18.                 .sortBy(_.coresFree).reverse 
  19.             val appMayHang = waitingApps.length == 1 && 
  20.                 waitingApps.head.executors.isEmpty && usableWorkers.isEmpty 
  21.             if (appMayHang) { 
  22.                 logWarning(s"App ${app.id} requires more resource than any of Workers could have."
  23.             } 
  24.             // 計(jì)算每個 Worker 上可用的 cores 
  25.             val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 
  26.              
  27.             // Now that we've decided how many cores to allocate on each worker, let's allocate them 
  28.             for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 
  29.                 allocateWorkerResourceToExecutors( 
  30.                     app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) 
  31.             } 
  32.         } 
  33.     } 

(1)遍歷 waitingApps,如果 app 還需要的 cpu 核數(shù)大于每個執(zhí)行器的核數(shù),才繼續(xù)分配。

(2)過濾可用的 worker,條件一:該 worker 剩余內(nèi)存大于單個 executor 需要的內(nèi)存;條件二:該 worker 剩余 cpu 核數(shù)大于單個 executor 需要的核數(shù);然后按照可用 cpu核數(shù)從大到小排序。

(3)下面兩個方法是關(guān)鍵的方法

scheduleExecutorsOnWorkers(),用來計(jì)算每個 Worker 上可用的 cpu 核數(shù);

allocateWorkerResourceToExecutors() 用來真正在 Worker 上分配 Executor。

四、scheduleExecutorsOnWorkers 計(jì)算每個 Worker 可用的核數(shù)

這個方法很長,首先看方法注釋,大致翻譯了一下:

當(dāng)執(zhí)行器分配的 cpu 核數(shù)(spark.executor.cores)被顯示設(shè)置的時候,如果這個 worker 上有足夠的核數(shù)和內(nèi)存的話,那么每個 worker 上可以執(zhí)行多個執(zhí)行器;反之,沒有設(shè)置的時候,每個 worker 上只能啟動一個執(zhí)行器;并且,這個執(zhí)行器會使用 worker 能提供出來的盡可能多的核數(shù);

appA 和 appB 都有一個執(zhí)行器運(yùn)行在 worker1 上。但是 appA 還需要一些 cpu 核,當(dāng) appB 執(zhí)行結(jié)束,釋放了它在 worker1 上的核數(shù)時, 下一次調(diào)度的時候,appA 會新啟動一個 executor 獲得了 worker1 上所有的可用的核心,因此 appA 就在 worker1 上啟動了多個執(zhí)行器。

設(shè)置 coresPerExecutor (spark.executor.cores)很重要,考慮下面的例子:集群有4個worker,每個worker有16核;用戶請求 3 個執(zhí)行器(spark.cores.max = 48,spark.executor.cores=16)。如果不設(shè)置這個參數(shù),那么每次分配 1 個 cpu核心,每個 worker 輪流分配一個 cpu核,最終 4 個執(zhí)行器分配 12 個核心給每個 executor,4 個 worker 也同樣分配了48個核心,但是最終每個 executor 只有 12核 < 16 核,所以最終沒有執(zhí)行器被啟動。

如果看我的翻譯還是很費(fèi)勁,我就再精簡下:

  • 如果沒有設(shè)置 spark.executor.cores,那么每個 Worker 只能啟動一個 Executor,并且這個 Executor 會占用所有 Worker 能提供的 cpu核數(shù);
  • 如果顯示設(shè)置了,那么每個 Worker 可以啟動多個 Executor;

下面是源碼,每句都有挨個注釋過,中間有一個方法是判斷這個 Worker 上還能不能再分配 Executor 了。

重點(diǎn)是中間方法后面那一段,遍歷每個 Worker 分配 cpu,如果不是 Spend Out 模式,則在一個 Worker 上一直分配,直到 Worker 資源分配完畢。

  1. private def scheduleExecutorsOnWorkers( 
  2.     app: ApplicationInfo, 
  3.     usableWorkers: Array[WorkerInfo], 
  4.     spreadOutApps: Boolean): Array[Int] = { 
  5.     // 每個 executor 的核數(shù) 
  6.     val coresPerExecutor = app.desc.coresPerExecutor 
  7.     // 每個 executor 的最小核數(shù) 為1 
  8.     val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 
  9.     //  每個Worker分配一個Executor? 這個參數(shù)可以控制這個行為 
  10.     val oneExecutorPerWorker = coresPerExecutor.isEmpty 
  11.     //  每個Executor的內(nèi)存 
  12.     val memoryPerExecutor = app.desc.memoryPerExecutorMB 
  13.     val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor 
  14.     // 可用 Worker 的總數(shù) 
  15.      
  16.     val numUsable = usableWorkers.length 
  17.     // 給每個Worker的cores數(shù) 
  18.     val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker 
  19.     // 給每個Worker上新的Executor數(shù) 
  20.     val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker 
  21.     // app 需要的核心數(shù) 和 所有 worker 能提供的核心總數(shù),取最小值 
  22.     var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum
  23.  
  24.     //  判斷指定的worker是否可以為這個app啟動一個executor 
  25.     /** Return whether the specified worker can launch an executor for this app. */ 
  26.     def canLaunchExecutorForApp(pos: Int): Boolean = { 
  27.         // 如果能提供的核心數(shù) 大于等 executor 需要的最小核心數(shù),則繼續(xù)分配 
  28.         val keepScheduling = coresToAssign >= minCoresPerExecutor 
  29.         // 是否有足夠的核心:當(dāng)前 worker 能提供的核數(shù) 減去 每個 worker 已分配的核心數(shù) ,大于每個 executor最小的核心數(shù) 
  30.         val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor 
  31.         // 當(dāng)前 worker 新分配的 executor 個數(shù) 
  32.         val assignedExecutorNum = assignedExecutors(pos) 
  33.  
  34.         //  如果每個worker允許多個executor,就能一直在啟動新的的executor 
  35.         //  如果在這個worker上已經(jīng)有executor,則給這個executor更多的core 
  36.         // If we allow multiple executors per worker, then we can always launch new executors. 
  37.         // Otherwise, if there is already an executor on this worker, just give it more cores. 
  38.  
  39.         // 如果一個 worker 上可以啟動多個 executor  或者 這個 worker 還沒分配 executor 
  40.         val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0 
  41.         if (launchingNewExecutor) { 
  42.             // 總共已經(jīng)分配的內(nèi)存 
  43.             val assignedMemory = assignedExecutorNum * memoryPerExecutor 
  44.             // 是否有足夠的內(nèi)存:當(dāng)前worker 的剩余內(nèi)存 減去 已分配的內(nèi)存 大于每個 executor需要的內(nèi)存 
  45.             val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor 
  46.             // 
  47.             val assignedResources = resourceReqsPerExecutor.map { 
  48.                 req => req.resourceName -> req.amount * assignedExecutorNum 
  49.             }.toMap 
  50.             val resourcesFree = usableWorkers(pos).resourcesAmountFree.map { 
  51.                 case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0)) 
  52.             } 
  53.             val enoughResources = ResourceUtils.resourcesMeetRequirements( 
  54.                 resourcesFree, resourceReqsPerExecutor) 
  55.             // 所有已分配的核數(shù)+app需要的核數(shù)  小于 app的核數(shù)限制 
  56.             val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit 
  57.             keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit 
  58.         } else { 
  59.             // We're adding cores to an existing executor, so no need 
  60.             // to check memory and executor limits 
  61.             keepScheduling && enoughCores 
  62.         } 
  63.     } 
  64.  
  65.     // 不斷的啟動executor,直到不再有Worker可以容納任何Executor,或者達(dá)到了這個Application的要求 
  66.     // Keep launching executors until no more workers can accommodate any 
  67.     // more executors, or if we have reached this application's limits 
  68.     // 過濾出可以啟動 executor 的 workers 
  69.     var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp) 
  70.  
  71.     while (freeWorkers.nonEmpty) { 
  72.         // 遍歷每個 worker 
  73.         freeWorkers.foreach { pos => 
  74.             var keepScheduling = true 
  75.             while (keepScheduling && canLaunchExecutorForApp(pos)) { 
  76.                 coresToAssign -= minCoresPerExecutor 
  77.                 assignedCores(pos) += minCoresPerExecutor 
  78.  
  79.                 //  如果我們在每個worker上啟動一個executor,每次迭代為每個executor增加一個core 
  80.                 //  否則,每次迭代都會為新的executor分配cores 
  81.                 // If we are launching one executor per worker, then every iteration assigns 1 core 
  82.                 // to the executor. Otherwise, every iteration assigns cores to a new executor. 
  83.                 if (oneExecutorPerWorker) { 
  84.                     assignedExecutors(pos) = 1 
  85.                 } else { 
  86.                     assignedExecutors(pos) += 1 
  87.                 } 
  88.  
  89.                 //  如果不使用Spreading out方法,我們會在這個worker上繼續(xù)調(diào)度executor,直到使用它所有的資源 
  90.                 //  否則,就跳轉(zhuǎn)到下一個worker 
  91.                 // Spreading out an application means spreading out its executors across as 
  92.                 // many workers as possible. If we are not spreading outthen we should keep 
  93.                 // scheduling executors on this worker until we use all of its resources. 
  94.                 // Otherwise, just move on to the next worker. 
  95.                 if (spreadOutApps) { 
  96.                     keepScheduling = false 
  97.                 } 
  98.             } 
  99.         } 
  100.         freeWorkers = freeWorkers.filter(canLaunchExecutorForApp) 
  101.     } 
  102.     assignedCores 

接著真正開始在 Worker 上啟動 Executor:

在 launchExecutor 在方法里:

  1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 
  2.     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 
  3.     worker.addExecutor(exec
  4.     worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, 
  5.         exec.application.descexec.cores, exec.memory, exec.resources)) 
  6.     exec.application.driver.send( 
  7.         ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 

給 Worker 發(fā)送了一個 LaunchExecutor 消息。

然后給執(zhí)行器對應(yīng)的 Driver 發(fā)送了 ExecutorAdded 消息。

五、總結(jié)

本次我們講了 Master 處理應(yīng)用的注冊,重點(diǎn)是把 app 信息加入到 waitingApps 列表中,然后調(diào)用 schedule() 方法,計(jì)算每個 Worker 可用的 cpu核數(shù),并且在 Worker 上啟動執(zhí)行器。

 

責(zé)任編輯:武曉燕 來源: KK架構(gòu)
相關(guān)推薦

2011-04-19 13:32:52

2009-12-24 11:04:59

固定分配資源動態(tài)分配資源

2015-04-17 10:28:02

無線頻譜移動通信頻譜

2021-08-31 23:09:27

Spark資源分配

2012-06-05 08:59:35

Hadoop架構(gòu)服務(wù)器

2012-03-09 17:38:17

ibmdw

2014-12-26 10:58:35

托管云托管私有云公共云

2010-04-07 15:55:17

無線接入頻段

2022-12-12 08:42:06

Java對象棧內(nèi)存

2022-06-06 12:02:23

代碼注釋語言

2013-04-17 15:10:07

銳捷寬帶寬帶網(wǎng)絡(luò)

2011-04-19 13:48:55

vCloud Dire

2024-10-09 14:25:21

2013-05-21 09:08:24

服務(wù)器虛擬化網(wǎng)卡

2019-12-20 08:50:21

LinuxKsnip截圖

2021-06-22 16:40:32

鴻蒙HarmonyOS應(yīng)用

2022-04-19 07:47:13

數(shù)據(jù)中心末端資源分配

2016-03-21 18:56:54

物聯(lián)網(wǎng)IoTIT基礎(chǔ)架構(gòu)

2011-01-26 11:01:37

虛擬機(jī)負(fù)載管理資源分配

2023-10-24 07:25:10

容器資源云分級
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

国产 欧美 在线| 成年人在线看片| 蜜桃视频污在线观看| 亚洲啪啪91| 一区二区三区亚洲| www.成人黄色| 日本在线影院| 中文字幕制服丝袜成人av| 国产欧美日韩综合精品二区| 久久精品视频5| 这里只有精品在线| 亚洲欧洲国产精品| 男人操女人下面视频| 亚洲优女在线| 亚洲欧美日韩国产综合在线| 精品国产一区二区三区麻豆小说| 一级黄色录像大片| 噜噜噜91成人网| 欧美疯狂做受xxxx高潮| 91麻豆制片厂| 欧美美女黄色| 日韩欧美中文一区| 欧美三级理论片| 超碰在线中文字幕| 中文字幕亚洲一区二区va在线| 国产美女精品久久久| 91精品中文字幕| 日韩中文欧美在线| 18性欧美xxxⅹ性满足| 私库av在线播放| 日韩一级毛片| 亚洲视频第一页| 香港三级日本三级| 99ri日韩精品视频| 欧美一区二区三区在| 无码内射中文字幕岛国片| 2018av在线| 依依成人综合视频| 一道本在线观看视频| 国产黄在线看| 久久久久久久国产精品影院| 国产伦精品一区二区三区免| 国产不卡av在线播放| 激情图片小说一区| 成人欧美一区二区三区黑人| 在线观看亚洲黄色| 丝袜诱惑制服诱惑色一区在线观看| 国外成人性视频| 欧美国产精品一二三| 99久久久久国产精品| 中文字幕精品一区二区精品| 中文字幕被公侵犯的漂亮人妻| 精品中国亚洲| 欧美精品一区二区三区一线天视频| 中文字幕55页| 日韩精品视频中文字幕| 日韩一二三区视频| 成人免费黄色av| 日本一区二区三区视频在线看| 91精品国产黑色紧身裤美女| 午夜天堂在线视频| 国产精品一区二区精品视频观看| 欧美美女网站色| 一区二区三区四区毛片| 国产精品一区二区三区av| 日韩一区二区在线免费观看| 制服下的诱惑暮生| 国产精品jk白丝蜜臀av小说| 亚洲成人在线网| 黄色工厂在线观看| 精品一区在线| 中文字幕欧美精品日韩中文字幕| 国产白丝一区二区三区 | 国精一区二区三区| 亚洲午夜av在线| 国产h视频在线播放| 中文字幕这里只有精品| 91福利在线观看| 国产成年人视频网站| 欧美中文高清| 亚洲精品电影网| 色婷婷av777| 久久精品国产亚洲夜色av网站| 久久国产加勒比精品无码| 久久免费黄色网址| 久久国产直播| 国产一区二中文字幕在线看| 亚洲国产一二三区| 久久一区二区视频| 国产又粗又硬又长| 免费高潮视频95在线观看网站| 色噜噜狠狠成人网p站| 天堂av2020| 激情小说亚洲图片| 在线亚洲午夜片av大片| 久久黄色小视频| 视频一区二区三区在线| 亚洲综合在线中文字幕| 亚洲av成人精品毛片| 一区免费观看视频| 欧美综合在线播放| 免费一区二区三区四区| 日韩av在线网| 在线观看美女av| 久久精品123| 成人片在线免费看| 国产人成在线观看| 亚洲图片欧美色图| 在线观看免费的av| 欧美亚洲国产日韩| 欧美成人精品在线| 进去里视频在线观看| 不卡一二三区首页| 天天综合五月天| 日韩精品麻豆| 亚洲黄在线观看| 色欲人妻综合网| 日产国产高清一区二区三区| 国产精品播放| 黄网站免费在线观看| 色伊人久久综合中文字幕| 无码国产精品一区二区高潮| 国产探花一区在线观看| 91av在线网站| 少妇高潮久久久| 国产精品久久久久久亚洲伦| 国产日产欧美视频| ady日本映画久久精品一区二区| 日韩中文娱乐网| 一级一片免费看| www.成人网.com| 蜜臀av性久久久久蜜臀av| 日韩黄色三级| 在线观看免费高清视频97| 国产成人免费看| 成人app下载| 日韩一级片免费视频| 日本亚州欧洲精品不卡| 久久躁狠狠躁夜夜爽| 亚洲系列第一页| 国产女主播在线一区二区| 免费日韩视频在线观看| 欧美激情久久久久久久久久久| 欧美—级a级欧美特级ar全黄| 99riav国产| 亚洲免费观看高清| 国内av一区二区| 9191国语精品高清在线| 91情侣偷在线精品国产| 免费在线观看av| 精品视频123区在线观看| 色欲狠狠躁天天躁无码中文字幕| 久久午夜电影| 视频一区免费观看| 欧美日韩亚洲国产| 中文字幕亚洲精品| 夜夜嗨av禁果av粉嫩avhd| 国产日韩欧美综合一区| 天天操天天摸天天爽| 日本不卡免费一区| 91精品久久久久久久久青青| 超碰在线免费播放| 精品国产一区二区三区忘忧草| 青娱乐免费在线视频| 国产91富婆露脸刺激对白| 国产欧美日韩网站| 美女精品一区最新中文字幕一区二区三区| 欧美一级黄色网| 国产视频第一区| 欧美日本视频在线| 澳门黄色一级片| 成人aa视频在线观看| 波多野结衣作品集| 99精品在线| 国产精品久久国产三级国电话系列 | 中文字幕免费在线观看视频| 久久众筹精品私拍模特| 免费涩涩18网站入口| 亚洲综合中文| 精品国产一区二区三| 国产一区一一区高清不卡| www.色综合| 神马久久久久久久久久| 91福利资源站| 久久久久97国产| 91麻豆免费观看| 亚洲欧美手机在线| 中国女人久久久| 亚洲第一导航| 高清日韩欧美| 国产精品亚洲网站| 91福利在线尤物| 少妇精69xxtheporn| 成人免费一级视频| 欧美视频一区二| 国产一级在线播放| 国产精品你懂的| 理论片大全免费理伦片| 蜜芽一区二区三区| 国产av麻豆mag剧集| 99视频精品全国免费| 国产日韩亚洲精品| 国产乱码精品一区二区三区亚洲人 | 国产精品久久久免费看| 波多野结衣中文一区| 天堂av在线8| 久久婷婷麻豆| 成年人午夜免费视频| 天天射综合网视频| 欧美日韩日本网| 狠狠久久伊人| 91网在线免费观看| 国产精品久久久久久妇女| 午夜欧美不卡精品aaaaa| 国内精品久久久久国产| 亚洲欧美日韩在线一区| 色噜噜一区二区三区| 911精品产国品一二三产区| 亚洲熟女综合色一区二区三区| 一区二区三区在线影院| 亚洲女同二女同志奶水| 久久久久久久精| 网站免费在线观看| 成人精品鲁一区一区二区| 亚洲妇熟xx妇色黄蜜桃| 蜜臀精品久久久久久蜜臀| www国产精品内射老熟女| 欧美激情aⅴ一区二区三区| 亚洲一区二区三区午夜| 欧美日韩国产高清电影| 欧美精品成人一区二区在线观看| 中文字幕日韩在线| 99中文视频在线| 嫩呦国产一区二区三区av| 国产精品香蕉在线观看| 欧美xnxx| 国产精品久久久久久久app| 综合日韩av| 欧美夜福利tv在线| 日韩理论视频| 欧洲日韩成人av| 性感美女一区二区在线观看| 91精品国产色综合久久不卡98口 | 亚洲夂夂婷婷色拍ww47| 精品国产欧美日韩不卡在线观看 | 亚洲三级免费看| 五十路在线视频| 亚洲精品美女免费| 可以在线观看的av网站| 亚洲女同精品视频| 精品视频一二区| 亚洲区免费影片| 懂色av中文在线| 深夜福利日韩在线看| 黄色网址免费在线观看| 精品激情国产视频| a视频在线观看| 欧美激情中文字幕在线| 国产白浆在线免费观看| 高清欧美性猛交xxxx黑人猛交| fc2ppv国产精品久久| 欧美夫妻性生活xx| 24小时免费看片在线观看| 欧美在线免费看| 日韩在线免费| 亚洲999一在线观看www| 视频一区中文字幕精品| 精品欧美一区二区精品久久| 四虎884aa成人精品最新| 日本亚洲导航| 亚洲a一区二区三区| 欧美久久久久久久久久久久久久| 在线日韩电影| 男人亚洲天堂网| 美国十次了思思久久精品导航| 欧美丝袜在线观看| 成人的网站免费观看| 亚洲狠狠婷婷综合久久久久图片| 亚洲国产精品国自产拍av| 国产成人自拍网站| 午夜精品福利一区二区蜜股av| 久久精品视频7| 91精品国产欧美日韩| 日本又骚又刺激的视频在线观看| 一区二区三区视频在线 | 欧美综合在线观看| 亚洲影视资源| 国产一区二区三区高清视频| 欧美日韩在线播放视频| 国产精品va在线观看无码| 久久亚洲图片| 欧美性猛交xx| 久久久99精品久久| 四虎免费在线视频| 色哟哟在线观看一区二区三区| 国产特级aaaaaa大片| 亚洲欧美精品伊人久久| 羞羞视频在线观看免费| 国产成人一区二区| av成人资源网| 一区不卡视频| 噜噜噜91成人网| 极品人妻一区二区| 中文字幕不卡一区| 亚洲免费在线观看av| 日韩欧美精品在线视频| jzzjzzjzz亚洲成熟少妇| 久久久久久午夜| 国产精品日本一区二区不卡视频| 欧美精品亚洲| 国产一在线精品一区在线观看| 污片在线免费看| 久久久www成人免费无遮挡大片| 精品视频一区二区在线观看| 欧美日韩一区二区在线观看视频| 欧洲一级在线观看| 高清一区二区三区四区五区| 国产不卡精品| 视频一区免费观看| 久久久久看片| 在线观看国产网站| 亚洲午夜激情网页| 精品国产va久久久久久久| 久久精品最新地址| 韩日精品一区| 日本不卡二区高清三区| 一区二区日本视频| 在线观看亚洲免费视频| 亚洲精品免费在线播放| 国产三级按摩推拿按摩| 最新中文字幕亚洲| 成人影院在线免费观看| 欧美一区二区福利| 性欧美长视频| 国产精品1000部啪视频| 亚洲国产aⅴ成人精品无吗| 国产白浆在线观看| 欧美高清激情视频| 亚洲一级大片| 成人免费a级片| 国产成人精品影视| 久久黄色免费视频| 精品国产百合女同互慰| 9999在线视频| 久久国产精品一区二区三区| 国产精品普通话对白| 亚洲精品乱码久久久久久不卡| 亚洲成人黄色影院| 污污网站免费在线观看| 91国产视频在线| 亚洲理论电影片| 免费裸体美女网站| 欧美激情综合五月色丁香| 在线免费a视频| 欧美成人午夜视频| av动漫精品一区二区| 久久综合色视频| 国产午夜亚洲精品午夜鲁丝片| 亚洲精品国产精品乱码视色| 色婷婷久久av| 日韩精品一区二区三区中文字幕| 日韩一级特黄毛片| a在线播放不卡| av一级在线观看| 色偷偷av一区二区三区| 午夜视频一区二区在线观看| 精品丰满人妻无套内射| 久久久亚洲午夜电影| 中文字幕日韩三级| 不卡av电影在线观看| 国产精品自在| 日本激情视频在线| 亚洲天堂成人网| 成人免费视频国产| 国产精品扒开腿爽爽爽视频| 999国产精品| 捆绑裸体绳奴bdsm亚洲| 在线亚洲免费视频| sm国产在线调教视频| 精品国产第一页| 奇米精品一区二区三区四区| 四虎精品免费视频| 日韩av网址在线| 欧美特黄色片| 91动漫在线看| 欧美韩国日本综合| 亚洲第一天堂网| 日本a级片电影一区二区| 色婷婷亚洲mv天堂mv在影片| 欧美人与性动交α欧美精品| 高跟丝袜欧美一区| 成人av黄色| 欧美综合激情| 国产成人日日夜夜| 久久久精品毛片| 久久的精品视频| 亚洲三级性片| 国产高潮失禁喷水爽到抽搐 | 国产极品美女高潮无套久久久| 中文字幕一区二区三区色视频|