精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Spark源碼走讀之3:Task運行期之函數調用

數據庫 Spark
本篇主要闡述在TaskRunner中執行的task其業務邏輯是如何被調用到的,另外試圖講清楚運行著的task其輸入的數據從哪獲取,處理的結果返回到哪里,如何返回。

準備

  1. spark已經安裝完畢

  2. spark運行在local mode或local-cluster mode

local-cluster mode

local-cluster模式也稱為偽分布式,可以使用如下指令運行

  1. MASTER=local[1,2,1024] bin/spark-shell 

[1,2,1024] 分別表示,executor number, core number和內存大小,其中內存大小不應小于默認的512M

Driver Programme的初始化過程分析

初始化過程的涉及的主要源文件

  1. SparkContext.scala       整個初始化過程的入口

  2. SparkEnv.scala          創建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager

  3. DAGScheduler.scala       任務提交的入口,即將Job劃分成各個stage的關鍵

  4. TaskSchedulerImpl.scala 決定每個stage可以運行幾個task,每個task分別在哪個executor上運行

  5. SchedulerBackend

    1. 最簡單的單機運行模式的話,看LocalBackend.scala

    2. 如果是集群模式,看源文件SparkDeploySchedulerBackend

初始化過程步驟詳解

步驟1: 根據初始化入參生成SparkConf,再根據SparkConf來創建SparkEnv, SparkEnv中主要包含以下關鍵性組件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

  1. private[spark] val env = SparkEnv.create( 
  2.     conf, 
  3.     "", conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt, isDriver = true
  4.     isLocalisLocal = isLocal) 
  5.   SparkEnv.set(env) 

步驟2:創建TaskScheduler,根據Spark的運行模式來選擇相應的SchedulerBackend,同時啟動taskscheduler,這一步至為關鍵

  1. private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName) 
  2.  taskScheduler.start() 

TaskScheduler.start目的是啟動相應的SchedulerBackend,并啟動定時器進行檢測

  1. override def start() { 
  2.     backend.start() 
  3.  
  4.     if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher 
  5.       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, 
  6.             SPECULATION_INTERVAL milliseconds) { 
  7.         checkSpeculatableTasks() 
  8.       } 
  9.     } 
  10.   } 

步驟3:以上一步中創建的TaskScheduler實例為入參創建DAGScheduler并啟動運行

  1. @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) 
  2.   dagScheduler.start() 

步驟4:啟動WEB UI

  1. ui.start() 

RDD的轉換過程

還是以最簡單的wordcount為例說明rdd的轉換過程

  1. sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) 

上述一行簡短的代碼其實發生了很復雜的RDD轉換,下面仔細解釋每一步的轉換過程和轉換結果

步驟1:val rawFile = sc.textFile("README.md")

textFile先是生成hadoopRDD,然后再通過map操作生成MappedRDD,如果在spark-shell中執行上述語句,得到的結果可以證明所做的分析

  1. scala> sc.textFile("README.md") 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; 
  2. assuming yes 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0maxMem=311387750 14/04/23 13:11:48 INFO MemoryStore: 
  3. Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB) 14/04/23 13:11:48 DEBUG BlockManager: 
  4. Put block broadcast_0 locally took 277 ms 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms res0:
  5.  org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13 

步驟2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap將原來的MappedRDD轉換成為FlatMappedRDD

  1. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =   
  2. new FlatMappedRDD(this, sc.clean(f)) 

步驟3:val wordCount = splittedText.map(word => (word, 1))

利用word生成相應的鍵值對,上一步的FlatMappedRDD被轉換成為MappedRDD

步驟4:val reduceJob = wordCount.reduceByKey(_ + _),這一步最復雜

步驟2,3中使用到的operation全部定義在RDD.scala中,而這里使用到的reduceByKey卻在RDD.scala中見不到蹤跡。reduceByKey的定義出現在源文件PairRDDFunctions.scala

細心的你一定會問reduceByKey不是MappedRDD的屬性和方法啊,怎么能被MappedRDD調用呢?其實這背后發生了一個隱式的轉換,該轉換將MappedRDD轉換成為PairRDDFunctions

  1. implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = 
  2.     new PairRDDFunctions(rdd) 

這種隱式的轉換是scala的一個語法特征,如果想知道的更多,請用關鍵字"scala implicit method"進行查詢,會有不少的文章對此進行詳盡的介紹。

接下來再看一看reduceByKey的定義

  1. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { 
  2.   reduceByKey(defaultPartitioner(self), func) 
  3.  
  4. def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { 
  5.   combineByKey[V]((v: V) => v, func, func, partitioner) 
  6.  
  7. def combineByKey[C](createCombiner: V => C, 
  8.     mergeValue: (C, V) => C, 
  9.     mergeCombiners: (C, C) => C, 
  10.     partitioner: Partitioner, 
  11.     mapSideCombine: Boolean = true
  12.     serializerClass: String = null): RDD[(K, C)] = { 
  13.   if (getKeyClass().isArray) { 
  14.     if (mapSideCombine) { 
  15.       throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner])
  16.  { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  17.  if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => 
  18. { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } 
  19. else if (mapSideCombine) 
  20. { val combined = self.mapPartitionsWithContext((context, iter) => 
  21. { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) =>
  22.  { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) }
  23.  else { // Don't apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) =>
  24.  { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true
  25.   } 

reduceByKey最終會調用combineByKey, 在這個函數中PairedRDDFunctions會被轉換成為ShuffleRDD,當調用mapPartitionsWithContext之后,shuffleRDD被轉換成為MapPartitionsRDD

Log輸出能證明我們的分析

  1. res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13 

RDD轉換小結

小結一下整個RDD轉換過程

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

整個轉換過程好長啊,這一切的轉換都發生在任務提交之前。

運行過程分析

數據集操作分類

在對任務運行過程中的函數調用關系進行分析之前,我們也來探討一個偏理論的東西,作用于RDD之上的Transformantion為什么會是這個樣子?

對這個問題的解答和數學搭上關系了,從理論抽象的角度來說,任務處理都可歸結為“input->processing->output"。input和output對應于數據集dataset.

在此基礎上作一下簡單的分類

  1. one-one 一個dataset在轉換之后還是一個dataset,而且dataset的size不變,如map

  2. one-one 一個dataset在轉換之后還是一個dataset,但size發生更改,這種更改有兩種可能:擴大或縮小,如flatMap是size增大的操作,而subtract是size變小的操作

  3. many-one 多個dataset合并為一個dataset,如combine, join

  4. one-many 一個dataset分裂為多個dataset, 如groupBy

Task運行期的函數調用

task的提交過程參考本系列中的第二篇文章。本節主要講解當task在運行期間是如何一步步調用到作用于RDD上的各個operation

TaskRunner.run

Task.run

Task.runTask (Task是一個基類,有兩個子類,分別為ShuffleMapTask和ResultTask)

RDD.iterator

RDD.computeOrReadCheckpoint

RDD.compute 

或許當看到RDD.compute函數定義時,還是覺著f沒有被調用,以MappedRDD的compute定義為例

  1. override def compute(split: Partition, context: TaskContext) =                                                                                                       
  2.   firstParent[T].iterator(split, context).map(f)   

注意,這里最容易產生錯覺的地方就是map函數,這里的map不是RDD中的map,而是scala中定義的iterator的成員函數map, 請自行參考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

堆棧輸出

  1. 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) 
  2. 81         at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 
  3. 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 
  4. 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 
  5. 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  6. 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  7. 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
  8. 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  9. 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  10. 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 
  11. 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  12. 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  13. 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
  14. 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  15. 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  16. 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 
  17. 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  18. 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  19. 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 
  20. 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) 
  21. 100         at org.apache.spark.scheduler.Task.run(Task.scala:53) 
  22. 101         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) 

ResultTask

compute的計算過程對于ShuffleMapTask比較復雜,繞的圈圈比較多,對于ResultTask就直接許多。

  1. override def runTask(context: TaskContext): U = { 
  2.     metrics = Some(context.taskMetrics) 
  3.     try { 
  4.       func(context, rdd.iterator(split, context)) 
  5.     } finally { 
  6.       context.executeOnCompleteCallbacks() 
  7.     } 
  8.   }  

 計算結果的傳遞

上面的分析知道,wordcount這個job在最終提交之后,被DAGScheduler分為兩個stage,***個Stage是shuffleMapTask,第二個Stage是ResultTask.

那么ShuffleMapTask的計算結果是如何被ResultTask取得的呢?這個過程簡述如下

  1. ShffuleMapTask將計算的狀態(注意不是具體的數據)包裝為MapStatus返回給DAGScheduler

  2. DAGScheduler將MapStatus保存到MapOutputTrackerMaster中

  3. ResultTask在執行到ShuffleRDD時會調用BlockStoreShuffleFetcher的fetch方法去獲取數據

    1. ***件事就是咨詢MapOutputTrackerMaster所要取的數據的location

    2. 根據返回的結果調用BlockManager.getMultiple獲取真正的數據

BlockStoreShuffleFetcher的fetch函數偽碼

  1. val blockManager = SparkEnv.get.blockManager 
  2.  
  3. val startTime = System.currentTimeMillis 
  4. val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) 
  5. logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) 
  6. val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock)  

注意上述代碼中的getServerStatusesgetMultiple,一個是詢問數據的位置,一個是去獲取真正的數據。

有關Shuffle的詳細解釋,請參考”詳細探究Spark的shuffle實現一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/

原文鏈接:http://www.cnblogs.com/hseagle/p/3673132.html

 

責任編輯:彭凡 來源: 博客園
相關推薦

2014-07-04 10:58:47

Apache Spar

2014-07-03 15:40:09

Apache Spar

2014-07-15 10:59:58

Spark代碼跟讀

2022-08-27 08:02:09

SQL函數語法

2021-08-09 09:00:00

Kubernetes云計算架構

2021-08-30 18:09:57

鴻蒙HarmonyOS應用

2012-02-22 22:56:19

開源Apache

2011-03-21 10:49:33

LAMPApache

2014-02-14 15:43:16

ApacheSpark

2010-06-04 09:11:10

.NET并行編程

2015-03-31 18:26:43

陌陌社交

2010-09-16 09:35:17

SQL函數

2021-04-15 08:45:25

Zabbix 5.2Apache監控

2011-03-21 11:33:09

LAMPApache

2022-05-19 15:08:43

技術函數調用棧Linux

2011-06-23 14:27:48

QT QLibrary 動態庫

2010-06-08 08:41:08

.NET 4并行編程

2010-06-07 08:43:46

.NET 4并行編程

2024-03-14 08:17:33

JVMJava對象

2017-08-04 10:58:55

RDDSpark算子
點贊
收藏

51CTO技術棧公眾號

中文字幕中文字幕在线十八区 | 国产理论视频在线观看| 日本久久精品| 91精品中文字幕一区二区三区| 只有这里有精品| 神马久久高清| 免费观看在线综合| 久久久久久有精品国产| ass极品国模人体欣赏| 91精品国产自产在线丝袜啪| 91激情五月电影| 人人妻人人澡人人爽欧美一区双 | www.欧美精品| 北岛玲一区二区| 伊人久久大香| 日韩欧美第一页| 狠狠精品干练久久久无码中文字幕| 午夜小视频免费| 国产一区不卡在线| 精品亚洲一区二区三区在线观看| 欧美国产极速在线| 亚洲调教欧美在线| 亚洲福利影视| 免费高潮视频95在线观看网站| 欧美午夜大胆人体| 99久久免费精品高清特色大片| 国产精品偷伦一区二区| 亚洲精品视频在线观看免费视频| 99精品视频在线| 亚洲乱码国产乱码精品精天堂| 香蕉视频在线观看黄| 国产精品99精品一区二区三区∴| 欧美日韩国产一区在线| 亚洲啊啊啊啊啊| 日本在线看片免费人成视1000| 91在线云播放| 国产日韩久久| 亚洲国产欧美另类| 狠狠狠色丁香婷婷综合激情 | 一本久道久久综合| 精品三级久久久久久久电影聊斋| 波多野洁衣一区| 亚洲一区二区三区成人在线视频精品| 涩涩视频在线观看| 日日夜夜精品视频免费| 欧美亚洲另类制服自拍| 一区二区三区视频免费看| 伊人久久综合| 欧美激情亚洲激情| 国产成人亚洲精品| 欧美激情 亚洲| 高清不卡一区| 91麻豆精品久久久久蜜臀| 182午夜在线观看| 国产91在线精品| 欧美三日本三级三级在线播放| 农村妇女精品一二区| 国产精品专区免费| 91久久精品国产91性色tv| 免费大片在线观看| 成人性生交大片免费观看网站| 粉嫩老牛aⅴ一区二区三区| 乱妇乱女熟妇熟女网站| 美女搞黄视频在线观看| 欧美天天综合色影久久精品| 国产91在线免费| 香蕉视频亚洲一级| 在线看国产一区| www.超碰97.com| 精品一区二区三区中文字幕| 日韩视频一区在线观看| 国产综合内射日韩久| 理论片一区二区在线| 亚洲乱亚洲乱妇无码| 欧美丰满老妇熟乱xxxxyyy| 国产精品videosex性欧美| 久久亚洲精品一区二区| 精品人妻在线播放| 国产精品亚洲综合色区韩国| 国产成人久久精品| 伊人网免费视频| 国产成人精品1024| 蜜桃av噜噜一区二区三区| 成人精品福利| 亚洲六月丁香色婷婷综合久久 | av毛片在线免费| 亚洲国产精品麻豆| 免费看黄色一级大片| 精品成人18| 亚洲精品mp4| 国产黄色录像视频| 精品成人免费| 国产精品偷伦一区二区| 亚洲第一天堂影院| 国产亚洲欧美日韩在线一区| 亚洲3p在线观看| 久久久久免费看黄a片app| 性xxxxfreexxxxx欧美丶| 欧美日韩免费在线视频| 精品人妻二区中文字幕| 精品99久久| 久久久久久国产精品三级玉女聊斋| 日本视频免费在线| 美腿丝袜亚洲三区| 久久精品国产综合精品| 国产二区三区在线| 欧美性猛交xxxx免费看久久久| 99re6在线观看| 精品综合久久88少妇激情| 亚洲香蕉成视频在线观看| 精品国产乱码久久久久久鸭王1 | 91淫黄看大片| 北条麻妃在线一区二区免费播放 | 美国黄色特级片| 亚洲一级网站| 91老司机在线| www免费网站在线观看| 性欧美疯狂xxxxbbbb| 青青草久久伊人| 国产精品日韩精品中文字幕| 久久久久久18| 国产精品久久久久毛片| 国产网站一区二区三区| 少妇av一区二区三区无码| 高清久久精品| www.久久色.com| 日韩欧美国产另类| 久久综合给合久久狠狠狠97色69| 国产欧美123| av一级久久| 这里只有精品在线观看| 日韩综合在线观看| 日韩激情精品| 亚洲卡通动漫在线| 激情五月俺来也| 国模精品一区| 欧美在线一级视频| 少妇喷水在线观看| 亚洲香肠在线观看| 日韩精品视频网址| 天天做天天爱天天综合网| 国产精品户外野外| 国产高清一区在线观看| 色综合久久中文字幕| 免费日本黄色网址| 99精品久久久| 国产欧美在线一区二区| 精品丝袜在线| 亚洲精品动漫100p| 国产无人区码熟妇毛片多| 成人动漫av在线| 国产中文字幕乱人伦在线观看| 99热这里只有精品首页 | 欧美精品一区二区三区在线看午夜| 欧美xxxxhdvideosex| 精品免费国产一区二区三区四区| 国产女人被狂躁到高潮小说| 国产suv一区二区三区88区| av日韩在线看| 久久大胆人体视频| 欧美亚洲成人免费| 电影av一区| 欧美高清视频不卡网| 国产一区二区视频在线观看免费| 国产一区二区三区香蕉| www.男人天堂网| 国内毛片久久| 国产mv久久久| 色影视在线观看| 91精品国产综合久久福利软件| 18岁成人毛片| av高清不卡在线| 日韩精品无码一区二区三区免费| 日本不卡二三区| 亚洲一区二区少妇| free性m.freesex欧美| 亚洲欧美一区二区三区在线| 中文字幕欧美色图| 在线观看免费网站黄| 欧美日韩激情网| 日韩毛片无码永久免费看| 精品一区二区国语对白| 国产日韩欧美大片| 偷拍亚洲色图| 国产在线不卡精品| 成人一级福利| 中文字幕日韩在线观看| 亚洲国产精彩视频| 一本一道久久a久久精品| 欧美h片在线观看| 成人中文字幕合集| 浓精h攵女乱爱av| 欧美日本二区| 日本在线观看一区| 91精品国产自产在线丝袜啪| 日韩av理论片| 免费在线国产视频| 中文字幕久久精品| 亚洲aⅴ在线观看| 欧美精品亚洲二区| 国产精品一区二区三区四| 亚洲丝袜制服诱惑| 精品无码一区二区三区 | 亚洲一区二区三区久久| 日本久久免费| 久久久久五月天| 日韩av中文| 亚洲免费高清视频| 亚洲成人一二三区| 欧美狂野另类xxxxoooo| 依依成人综合网| 亚洲综合久久久| 秋霞欧美一区二区三区视频免费| 26uuuu精品一区二区| wwwxxxx在线观看| 美女视频黄频大全不卡视频在线播放 | 日本精品一二区| 91精品国产一区二区| 精产国品一区二区| 婷婷丁香激情综合| 久久久精品国产sm调教| 国产精品久久久久久久久免费丝袜| 亚洲精品第二页| 国产一区二区三区精品欧美日韩一区二区三区| 日日橹狠狠爱欧美超碰| 欧美三级黄美女| 91嫩草国产丨精品入口麻豆| 日韩欧美字幕| 欧美一级二级三级| 羞羞色国产精品网站| 国严精品久久久久久亚洲影视| 精品中文在线| 91九色精品视频| 色哟哟免费视频| 精品三级久久久| 成人精品一区二区三区| yiren22亚洲综合| 国产不卡视频在线| 日韩电影免费观| 7m精品福利视频导航| 99热99re6国产在线播放| 超碰97人人做人人爱少妇| 免费成人黄色| www.亚洲免费视频| 麻豆传媒视频在线观看免费| 日韩亚洲综合在线| 成人短视频在线观看| 欧美精品少妇videofree| 中文字幕伦理免费在线视频 | 久久精品国产亚洲av高清色欲| 亚洲日本在线观看| 在线免费观看亚洲视频| 亚洲激情在线激情| 欧美日韩免费做爰视频| 夜色激情一区二区| 国产无码精品在线观看| 欧美日韩国产精品一区| 亚洲天堂一区在线观看| 一本大道综合伊人精品热热| 日本三级一区二区三区| 欧美日韩成人一区| 国产熟女精品视频| 精品国产一区二区三区忘忧草| 日韩中文字幕观看| 精品亚洲一区二区| 最新97超碰在线| 欧美二区在线播放| 国产污视频在线播放| 国产成人精品综合| 57pao成人永久免费| 岛国视频一区| 欧美极品在线观看| 吴梦梦av在线| 亚洲久色影视| 激情 小说 亚洲 图片: 伦| 国产真实乱偷精品视频免| 久久无码专区国产精品s| 99免费精品在线观看| 亚洲无人区码一码二码三码的含义 | 欧美日本在线看| 黑人精品一区二区| 国产亚洲一区二区在线| a视频在线免费看| 97超碰蝌蚪网人人做人人爽| www.一区| 韩国一区二区三区美女美女秀| 欧美丝袜激情| 免费在线看黄色片| 日韩国产精品久久| 啊v视频在线一区二区三区| 加勒比av在线播放| 欧美日韩美女在线观看| 91超薄丝袜肉丝一区二区| 亚洲第一精品电影| 在线看的av网站| 国内精品久久久久| 四虎精品在线观看| 久久精品日韩精品| 欧美在线精品一区| 波多野结衣天堂| 成人福利视频在线| 国产精品免费人成网站酒店| 欧美午夜激情在线| www.久久久久久| 深夜福利日韩在线看| 欧美少妇网站| caoporn国产精品免费公开| 欧美精品一区二区三区精品| 亚洲理论电影在线观看| 精品一区二区三区在线观看国产 | 国产日韩精品久久久| 国产主播在线播放| 欧美一区二区三区四区视频| 久蕉在线视频| 久久久噜噜噜久久| 另类视频一区二区三区| 午夜精品美女久久久久av福利| 亚洲精品男同| 99国产精品免费视频| 国产精品短视频| 日韩不卡高清视频| 亚洲欧美一区二区激情| 蜜桃视频m3u8在线观看| 国产精品青青草| 欧美久久综合| 亚洲在线观看网站| 国产精品美女久久久久高潮| 337p粉嫩色噜噜噜大肥臀| 日韩激情视频在线播放| 毛片电影在线| 精品午夜一区二区三区| 在线成人国产| 中文字幕一区二区三区乱码不卡| 伊人夜夜躁av伊人久久| 国产精品久久久久久久成人午夜| 中文字幕成人在线| 深夜视频一区二区| 欧美一二三四五区| 日韩高清在线不卡| 亚洲v国产v欧美v久久久久久| 欧美日韩综合视频| 日本a一级在线免费播放| 91精品国产99| 亚洲区小说区| 97在线播放视频| 久久久久久久久伊人| 国产一级免费视频| 亚洲色图国产精品| 99亚洲伊人久久精品影院| 亚洲ai欧洲av| 国内一区二区视频| 91日韩中文字幕| 日韩精品中文字幕一区二区三区| av在线播放国产| 成人av网站观看| 99在线精品免费视频九九视 | 99视频在线免费| 青青青手机在线视频观看| 欧美成人免费在线视频| 国产美女视频一区二区| 国产精品久久久影院| 国产成人激情av| 日韩黄色在线视频| 亚洲欧美精品伊人久久| 成人四虎影院| 天天干天天色天天爽| 国产丶欧美丶日本不卡视频| 国产一级在线免费观看| 日韩精品一区二区视频| 羞羞影院欧美| 中文字幕黄色大片| 成人激情小说乱人伦| 亚洲 欧美 中文字幕| 色妞欧美日韩在线| 91成人午夜| 亚洲一区二区蜜桃| 亚洲女与黑人做爰| 天天操天天干天天| 国产精品久久久久久久久久| 91精品国产自产在线观看永久∴| 在线xxxxx| 在线观看日韩av先锋影音电影院| 蜜桃视频网站在线观看| 国产精品v欧美精品∨日韩| 久久一区亚洲| 国产在线一卡二卡| 亚洲人成电影网站色www| 国产激情一区| 91精品91久久久中77777老牛| 国产精品久久看| 性欧美18一19性猛交| 国产成人综合精品| 国产精品v欧美精品v日本精品动漫| v8888av| 日韩一区二区免费视频| 欧美最新精品| a级黄色片免费| 国产精品嫩草影院com| 四虎免费在线观看| 成人写真福利网|