精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

PySpark源碼解析,教你用Python調用高效Scala接口,搞定大規模數據分析

新聞 前端 Spark
相較于Scala語言而言,Python具有其獨有的優勢及廣泛應用性,因此Spark也推出了PySpark,在框架上提供了利用Python語言的接口,為數據科學家使用該框架提供了便利。

相較于Scala語言而言,Python具有其獨有的優勢及廣泛應用性,因此Spark也推出了PySpark,在框架上提供了利用Python語言的接口,為數據科學家使用該框架提供了便利。

[[286753]]

眾所周知,Spark 框架主要是由 Scala 語言實現,同時也包含少量 Java 代碼。Spark 面向用戶的編程接口,也是 Scala。然而,在數據科學領域,Python 一直占據比較重要的地位,仍然有大量的數據工程師在使用各類 Python 數據處理和科學計算的庫,例如 numpy、Pandas、scikit-learn 等。同時,Python 語言的入門門檻也顯著低于 Scala。

為此,Spark 推出了 PySpark,在 Spark 框架上提供一套 Python 的接口,方便廣大數據科學家使用。本文主要從源碼實現層面解析 PySpark 的實現原理,包括以下幾個方面:

  • PySpark 的多進程架構;
  • Python 端調用 Java、Scala 接口;
  • Python Driver 端 RDD、SQL 接口;
  • Executor 端進程間通信和序列化;
  • Pandas UDF;
  • 總結。 

PySpark項目地址:https://github.com/apache/spark/tree/master/python 

1、PySpark 的多進程架構

PySpark 采用了 Python、JVM 進程分離的多進程架構,在 Driver、Executor 端均會同時有 Python、JVM 兩個進程。當通過 spark-submit 提交一個 PySpark 的 Python 腳本時,Driver 端會直接運行這個 Python 腳本,并從 Python 中啟動 JVM;而在 Python 中調用的 RDD 或者 DataFrame 的操作,會通過 Py4j 調用到 Java 的接口。

在 Executor 端恰好是反過來,首先由 Driver 啟動了 JVM 的 Executor 進程,然后在 JVM 中去啟動 Python 的子進程,用以執行 Python 的 UDF,這其中是使用了 socket 來做進程間通信。總體的架構圖如下所示:

PySpark源码解析,教你用Python调用高效Scala接口,搞定大规模数据分析

2、Python Driver 如何調用 Java 的接口

上面提到,通過 spark-submit 提交 PySpark 作業后,Driver 端首先是運行用戶提交的 Python 腳本,然而 Spark 提供的大多數 API 都是 Scala 或者 Java 的,那么就需要能夠在 Python 中去調用 Java 接口。這里 PySpark 使用了 Py4j 這個開源庫。當創建 Python 端的 SparkContext 對象時,實際會啟動 JVM,并創建一個 Scala 端的 SparkContext 對象。代碼實現在 python/pyspark/context.py:

  1. def _ensure_initialized(cls, instance=None, gateway=None, conf=None): 
  2.     ""
  3.     Checks whether a SparkContext is initialized or not. 
  4.     Throws error if a SparkContext is already running. 
  5.     ""
  6.     with SparkContext._lock: 
  7.         if not SparkContext._gateway: 
  8.             SparkContext._gateway = gateway or launch_gateway(conf) 
  9.             SparkContext._jvm = SparkContext._gateway.jvm 

在 launch_gateway (python/pyspark/java_gateway.py) 中,首先啟動 JVM 進程:

  1. SPARK_HOME = _find_spark_home() 
  2. # Launch the Py4j gateway using Spark's run command so that we pick up the 
  3. # proper classpath and settings from spark-env.sh 
  4. on_windows = platform.system() == "Windows" 
  5. script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" 
  6. command = [os.path.join(SPARK_HOME, script)] 

然后創建 JavaGateway 并 import 一些關鍵的 class:

  1. gateway = JavaGateway( 
  2.         gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret, 
  3.                                              auto_convert=True)) 
  4. # Import the classes used by PySpark 
  5. java_import(gateway.jvm, "org.apache.spark.SparkConf"
  6. java_import(gateway.jvm, "org.apache.spark.api.java.*"
  7. java_import(gateway.jvm, "org.apache.spark.api.python.*"
  8. java_import(gateway.jvm, "org.apache.spark.ml.python.*"
  9. java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*"
  10. # TODO(davies): move into sql 
  11. java_import(gateway.jvm, "org.apache.spark.sql.*"
  12. java_import(gateway.jvm, "org.apache.spark.sql.api.python.*"
  13. java_import(gateway.jvm, "org.apache.spark.sql.hive.*"
  14. java_import(gateway.jvm, "scala.Tuple2"
  15. 拿到 JavaGateway 對象,即可以通過它的 jvm 屬性,去調用 Java 的類了,例如: 
  16. gateway = JavaGateway() 
  17.  
  18. gateway = JavaGateway() 
  19. jvm = gateway.jvm 
  20. l = jvm.java.util.ArrayList() 

然后會繼續創建 JVM 中的 SparkContext 對象:

  1. def _initialize_context(self, jconf): 
  2.     ""
  3.     Initialize SparkContext in function to allow subclass specific initialization 
  4.     ""
  5.     return self._jvm.JavaSparkContext(jconf) 
  6.  
  7. # Create the Java SparkContext through Py4J 
  8. self._jsc = jsc or self._initialize_context(self._conf._jconf) 

3、Python Driver 端的 RDD、SQL 接口

在 PySpark 中,繼續初始化一些 Python 和 JVM 的環境后,Python 端的 SparkContext 對象就創建好了,它實際是對 JVM 端接口的一層封裝。和 Scala API 類似,SparkContext 對象也提供了各類創建 RDD 的接口,和 Scala API 基本一一對應,我們來看一些例子。

  1. def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, 
  2.                      valueConverter=None, conf=None, batchSize=0): 
  3.     jconf = self._dictToJavaMap(conf) 
  4.     jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, 
  5.                                                 valueClass, keyConverter, valueConverter, 
  6.                                                 jconf, batchSize) 
  7.     return RDD(jrdd, self) 

可以看到,這里 Python 端基本就是直接調用了 Java/Scala 接口。而 PythonRDD (core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala),則是一個 Scala 中封裝的伴生對象,提供了常用的 RDD IO 相關的接口。另外一些接口會通過 self._jsc 對象去創建 RDD。其中 self._jsc 就是 JVM 中的 SparkContext 對象。拿到 RDD 對象之后,可以像 Scala、Java API 一樣,對 RDD 進行各類操作,這些大部分都封裝在 python/pyspark/rdd.py 中。

這里的代碼中出現了 jrdd 這樣一個對象,這實際上是 Scala 為提供 Java 互操作的 RDD 的一個封裝,用來提供 Java 的 RDD 接口,具體實現在 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala 中。可以看到每個 Python 的 RDD 對象需要用一個 JavaRDD 對象去創建。

對于 DataFrame 接口,Python 層也同樣提供了 SparkSession、DataFrame 對象,它們也都是對 Java 層接口的封裝,這里不一一贅述。

4、Executor 端進程間通信和序列化

對于 Spark 內置的算子,在 Python 中調用 RDD、DataFrame 的接口后,從上文可以看出會通過 JVM 去調用到 Scala 的接口,最后執行和直接使用 Scala 并無區別。而對于需要使用 UDF 的情形,在 Executor 端就需要啟動一個 Python worker 子進程,然后執行 UDF 的邏輯。那么 Spark 是怎樣判斷需要啟動子進程的呢?

在 Spark 編譯用戶的 DAG 的時候,Catalyst Optimizer 會創建 BatchEvalPython 或者 ArrowEvalPython 這樣的 Logical Operator,隨后會被轉換成 PythonEvals 這個 Physical Operator。在 PythonEvals(sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:

  1. object PythonEvals extends Strategy { 
  2.   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { 
  3.     case ArrowEvalPython(udfs, output, child, evalType) => 
  4.       ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil 
  5.     case BatchEvalPython(udfs, output, child) => 
  6.       BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil 
  7.     case _ => 
  8.       Nil 
  9.   } 

創建了 ArrowEvalPythonExec 或者 BatchEvalPythonExec,而這二者內部會創建 ArrowPythonRunner、PythonUDFRunner 等類的對象實例,并調用了它們的 compute 方法。由于它們都繼承了 BasePythonRunner,基類的 compute 方法中會去啟動 Python 子進程:

  1. def compute( 
  2.       inputIterator: Iterator[IN], 
  3.       partitionIndex: Int, 
  4.       context: TaskContext): Iterator[OUT] = { 
  5.   // ...... 
  6.  
  7.   val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) 
  8.   // Start a thread to feed the process input from our parent's iterator 
  9.   val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context) 
  10.   writerThread.start() 
  11.   val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) 
  12.  
  13.   val stdoutIterator = newReaderIterator( 
  14.     stream, writerThread, startTime, env, worker, releasedOrClosed, context) 
  15.   new InterruptibleIterator(context, stdoutIterator) 

這里 env.createPythonWorker 會通過 PythonWorkerFactory(core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去啟動 Python 進程。Executor 端啟動 Python 子進程后,會創建一個 socket 與 Python 建立連接。所有 RDD 的數據都要序列化后,通過 socket 發送,而結果數據需要同樣的方式序列化傳回 JVM。

對于直接使用 RDD 的計算,或者沒有開啟 spark.sql.execution.arrow.enabled 的 DataFrame,是將輸入數據按行發送給 Python,可想而知,這樣效率極低。

在 Spark 2.2 后提供了基于 Arrow 的序列化、反序列化的機制(從 3.0 起是默認開啟),從 JVM 發送數據到 Python 進程的代碼在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。這個類主要是重寫了 newWriterThread 這個方法,使用了 ArrowWriter 向 socket 發送數據:

  1. val arrowWriter = ArrowWriter.create(root) 
  2. val writer = new ArrowStreamWriter(root, null, dataOut) 
  3. writer.start() 
  4.  
  5. while (inputIterator.hasNext) { 
  6. val nextBatch = inputIterator.next() 
  7.  
  8. while (nextBatch.hasNext) { 
  9.     arrowWriter.write(nextBatch.next()) 
  10.  
  11. arrowWriter.finish() 
  12. writer.writeBatch() 
  13. arrowWriter.reset() 

可以看到,每次取出一個 batch,填充給 ArrowWriter,實際數據會保存在 root 對象中,然后由 ArrowStreamWriter 將 root 對象中的整個 batch 的數據寫入到 socket 的 DataOutputStream 中去。ArrowStreamWriter 會調用 writeBatch 方法去序列化消息并寫數據,代碼參考 ArrowWriter.java#L131。

  1. protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException { 
  2.   ArrowBlock block = MessageSerializer.serialize(out, batch, option); 
  3.   LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}"
  4.       block.getOffset(), block.getMetadataLength(), block.getBodyLength()); 
  5.   return block; 

在 MessageSerializer 中,使用了 flatbuffer 來序列化數據。flatbuffer 是一種比較高效的序列化協議,它的主要優點是反序列化的時候,不需要解碼,可以直接通過裸 buffer 來讀取字段,可以認為反序列化的開銷為零。我們來看看 Python 進程收到消息后是如何反序列化的。

Python 子進程實際上是執行了 worker.py 的 main 函數 (python/pyspark/worker.py):

  1. if __name__ == '__main__'
  2.     # Read information about how to connect back to the JVM from the environment. 
  3.     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) 
  4.     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"
  5.     (sock_file, _) = local_connect_and_auth(java_port, auth_secret) 
  6.     main(sock_file, sock_file) 

這里會去向 JVM 建立連接,并從 socket 中讀取指令和數據。對于如何進行序列化、反序列化,是通過 UDF 的類型來區分:

  1. eval_type = read_int(infile) 
  2. if eval_type == PythonEvalType.NON_UDF: 
  3.     func, profiler, deserializer, serializer = read_command(pickleSer, infile) 
  4. else
  5.     func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type) 

在 read_udfs 中,如果是 PANDAS 類的 UDF,會創建 ArrowStreamPandasUDFSerializer,其余的 UDF 類型創建 BatchedSerializer。我們來看看 ArrowStreamPandasUDFSerializer(python/pyspark/serializers.py):

  1. def dump_stream(self, iterator, stream): 
  2.     import pyarrow as pa 
  3.     writer = None 
  4.     try
  5.         for batch in iterator: 
  6.             if writer is None: 
  7.                 writer = pa.RecordBatchStreamWriter(stream, batch.schema) 
  8.             writer.write_batch(batch) 
  9.     finally
  10.         if writer is not None: 
  11.             writer.close() 
  12.  
  13. def load_stream(self, stream): 
  14.     import pyarrow as pa 
  15.     reader = pa.ipc.open_stream(stream) 
  16.     for batch in reader: 
  17.         yield batch 

可以看到,這里雙向的序列化、反序列化,都是調用了 PyArrow 的 ipc 的方法,和前面看到的 Scala 端是正好對應的,也是按 batch 來讀寫數據。對于 Pandas 的 UDF,讀到一個 batch 后,會將 Arrow 的 batch 轉換成 Pandas Series。

  1. def arrow_to_pandas(self, arrow_column): 
  2.     from pyspark.sql.types import _check_series_localize_timestamps 
  3.  
  4.     # If the given column is a date type column, creates a series of datetime.date directly 
  5.     # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by 
  6.     # datetime64[ns] type handling. 
  7.     s = arrow_column.to_pandas(date_as_object=True) 
  8.  
  9.     s = _check_series_localize_timestamps(s, self._timezone) 
  10.     return s 
  11.  
  12. def load_stream(self, stream): 
  13.     ""
  14.     Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series. 
  15.     ""
  16.     batches = super(ArrowStreamPandasSerializer, self).load_stream(stream) 
  17.     import pyarrow as pa 
  18.     for batch in batches: 
  19.         yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()] 

5、Pandas UDF

前面我們已經看到,PySpark 提供了基于 Arrow 的進程間通信來提高效率,那么對于用戶在 Python 層的 UDF,是不是也能直接使用到這種高效的內存格式呢?答案是肯定的,這就是 PySpark 推出的 Pandas UDF。區別于以往以行為單位的 UDF,Pandas UDF 是以一個 Pandas Series 為單位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 這個參數來控制。這是一個來自官方文檔的示例:

  1. def multiply_func(a, b): 
  2.     return a * b 
  3.  
  4. multiply = pandas_udf(multiply_func, returnType=LongType()) 
  5.  
  6. df.select(multiply(col("x"), col("x"))).show() 

上文已經解析過,PySpark 會將 DataFrame 以 Arrow 的方式傳遞給 Python 進程,Python 中會轉換為 Pandas Series,傳遞給用戶的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 來完成計算,在易用性和性能上都得到了很大的提升。

6、總結

PySpark 為用戶提供了 Python 層對 RDD、DataFrame 的操作接口,同時也支持了 UDF,通過 Arrow、Pandas 向量化的執行,對提升大規模數據處理的吞吐是非常重要的,一方面可以讓數據以向量的形式進行計算,提升 cache 命中率,降低函數調用的開銷,另一方面對于一些 IO 的操作,也可以降低網絡延遲對性能的影響。

然而 PySpark 仍然存在著一些不足,主要有:

  • 進程間通信消耗額外的 CPU 資源;
  • 編程接口仍然需要理解 Spark 的分布式計算原理;
  • Pandas UDF 對返回值有一定的限制,返回多列數據不太方便。 

Databricks 提出了新的 Koalas 接口來使得用戶可以以接近單機版 Pandas 的形式來編寫分布式的 Spark 計算作業,對數據科學家會更加友好。而 Vectorized Execution 的推進,有望在 Spark 內部一切數據都是用 Arrow 的格式來存放,對跨語言支持將會更加友好。同時也能看到,在這里仍然有很大的性能、易用性的優化空間,這也是我們平臺近期的主要發力方向之一。

作者介紹

陳緒,匯量科技(Mobvista)高級算法科學家,負責匯量科技大規模數據智能計算引擎和平臺的研發工作。在此之前陳緒是阿里巴巴高級技術專家,負責阿里集團大規模機器學習平臺的研發。

 

責任編輯:張燕妮 來源: 機器之心
相關推薦

2021-08-25 08:23:51

AI數據機器學習

2023-10-26 01:26:04

Vaex數據數據集

2013-04-27 09:09:07

大數據全球技術峰會

2020-10-30 11:09:30

Pandas數據代碼

2024-04-02 14:29:12

網絡安全數據泄露

2023-10-07 08:30:07

B+樹數據庫管理系統

2020-08-23 12:27:39

測試接口技巧

2023-10-05 12:43:48

數據處理

2016-10-12 09:22:51

數據分析技術Apache Kyli

2020-07-23 14:03:09

數據中心數據網絡

2022-06-24 09:00:00

數據管理數據卷數據存儲

2024-08-21 15:14:21

2023-12-15 10:16:51

容器工具數據

2020-12-11 19:52:06

數據中心超大規模數據中心

2020-06-10 10:00:53

Serverless數據處理函數

2023-02-14 11:24:36

2022-12-30 14:14:51

數據中心服務器

2016-05-30 12:08:14

2021-09-24 11:34:44

MaxCompute Python 數據分析

2020-12-07 16:20:53

Python 開發編程語言
點贊
收藏

51CTO技術棧公眾號

国产精品久久久免费| 天然素人一区二区视频| www.欧美.com| 国产精品电影观看| 成人免费视频国产免费观看| 91精品丝袜国产高跟在线| 日韩欧美中文免费| 青青草免费在线视频观看| 少妇高潮久久久| 麻豆精品一区二区综合av| 色综合久久悠悠| 精品人妻无码一区二区三区| 国产精品一区二区三区四区在线观看 | 亚洲精品第二页| 精品国产九九九| 久久激情中文| 久久国产精品久久久| 免费观看av网站| 综合中文字幕| 欧美日本视频在线| 国产91美女视频| 密臀av在线| 亚洲欧洲美洲综合色网| 美女主播视频一区| 老牛影视av牛牛影视av| 国产一区美女在线| 国产精品第一区| 欧美在线观看不卡| 亚洲二区精品| 欧美日韩xxx| 青青操在线播放| 免费短视频成人日韩| 精品91自产拍在线观看一区| 日本一二三区在线| 国产91在线精品| 日本久久精品电影| 久久久一本二本三本| 成年人视频免费在线播放| 成人免费一区二区三区在线观看| 欧美日韩一区二区三区在线观看免| av网站免费播放| 久久国产尿小便嘘嘘| 国产精品成人品| 日本久久综合网| 国产欧美大片| 91视频精品| 日本久久一区二区| 久久精品香蕉视频| 欧美xx视频| 一本大道久久精品懂色aⅴ| 秋霞无码一区二区| 国产探花视频在线观看| 亚洲女厕所小便bbb| 中文字幕日韩精品久久| 麻豆传媒视频在线观看免费| 国产精品毛片无遮挡高清| 日韩国产伦理| 日本黄色片在线观看| 国产精品国产a| 天天干天天色天天爽| 国产视频中文字幕在线观看| 亚洲人成网站影音先锋播放| 日本免费黄色小视频| 在线观看操人| 午夜久久福利影院| 99色精品视频| 日本在线视频一区二区| 欧美日韩免费高清一区色橹橹| 99re精彩视频| 欧美不卡在线观看| 精品福利av导航| 欧美无人区码suv| 精品国产1区| 久久亚洲精品成人| 免费人成视频在线| 男人天堂网在线观看| 国产成人精品一区二三区在线观看 | 青青草伊人久久| 国产精品香蕉在线观看| 99精品在线视频观看| 岛国一区二区三区| 欧美精品二区三区四区免费看视频| 伦理片一区二区三区| 国产精品久久久久天堂| 免费cad大片在线观看| 日韩精品美女| 欧美日韩mp4| 亚洲精品国产成人av在线| 亚洲人和日本人hd| 久久影视免费观看| 99精品视频99| 极品少妇xxxx精品少妇| 国产激情美女久久久久久吹潮| 免费在线稳定资源站| 综合亚洲深深色噜噜狠狠网站| 国产黄色激情视频| av成人在线观看| 日韩免费视频一区二区| 91精品人妻一区二区| 99精品综合| 97香蕉久久超级碰碰高清版| 91麻豆一区二区| caoporn97在线视频| 天堂久久一区二区三区| 91在线观看欧美日韩| 三级理论午夜在线观看| 136国产福利精品导航| 国产在线青青草| 国产一区二区三区精品在线观看| 亚洲毛片在线观看.| 人妻少妇精品一区二区三区| 久久一二三区| 国产精品裸体一区二区三区| 一级毛片视频在线| 色香色香欲天天天影视综合网| 亚洲区 欧美区| 日韩欧美字幕| 青青草99啪国产免费| 人妻精品一区二区三区| 综合中文字幕亚洲| 三上悠亚在线一区| 免费成人网www| 97**国产露脸精品国产| www.蜜臀av| 亚洲欧洲www| 欧在线一二三四区| 色爱综合av| 欧美精品精品精品精品免费| 国产精品视频a| 国产精品情趣视频| www.日本xxxx| 综合国产视频| 亚洲91精品在线观看| 亚洲AV午夜精品| 欧美亚洲综合视频| 色偷偷成人一区二区三区91| 欧美久久久久久久久久久| 亚洲成av人电影| 国产日产久久高清欧美一区| 成人激情电影在线看| 日韩欧美在线看| 国产熟妇搡bbbb搡bbbb| 国产欧美精品久久| 精品久久精品久久| 蜜桃视频动漫在线播放| 亚洲成人av在线| 日韩乱码一区二区| 97久久超碰精品国产| 国产日韩av网站| 老司机aⅴ在线精品导航| 久久久伊人日本| 后进极品白嫩翘臀在线视频| 亚洲大尺度视频在线观看| 国产日韩视频一区| 亚洲激情视频| 免费av一区二区三区| 免费亚洲电影| 中文字幕精品一区二区精品| 亚洲影院一区二区三区| 中文字幕亚洲区| 日本精品一区在线| 午夜电影亚洲| 狠狠色综合一区二区| 黑人巨大亚洲一区二区久| 国产午夜精品视频| 91精品人妻一区二区三区果冻| 国产精品超碰97尤物18| 日本美女久久久| 激情久久中文字幕| 欧美成人在线免费观看| 成人一区视频| 欧美成人一区二区三区电影| 懂色av成人一区二区三区| 天天色综合成人网| 免费看裸体网站| 粉嫩av在线播放| 中文字幕国产一区| 久久6免费视频| 亚洲视频一区| 日本一区二区三区四区在线观看 | 国产精品网站入口| 18av在线播放| 亚洲精品白浆高清久久久久久| 国产剧情在线视频| 日韩理论在线观看| a天堂视频在线观看| 日韩精品亚洲一区| 超碰超碰超碰超碰超碰| 亚洲精华一区二区三区| 91精品久久久久久久久久久久久| 日本动漫理论片在线观看网站| 日韩二区三区在线| 91无套直看片红桃| 午夜精品一区二区三区电影天堂 | 亚洲理论在线观看| 亚洲欧美色图视频| 国产麻豆精品一区二区| 少妇性饥渴无码a区免费| 香蕉视频官网在线观看日本一区二区| 国产成人av一区二区三区| 欧美日韩激情电影| 欧美激情精品久久久久久免费印度| 国产在线观看免费| 精品国内二区三区| 亚洲图片欧美在线| 精品久久久久久中文字幕大豆网 | 久久精品蜜桃| 亚洲成av人乱码色午夜| 日本欧美www| 亚洲mv在线观看| 日韩欧美综合视频| 中文字幕欧美激情| 香蕉视频黄色在线观看| 国产福利电影一区二区三区| 日韩福利视频在线| 国产欧美二区| 国产日韩亚洲欧美在线| 欧美国产一级| 视频一区不卡| 亚洲黄页网站| 九色一区二区| 91亚洲无吗| 亚洲已满18点击进入在线看片| 怡红院成人在线| 91精品国产91久久久久| 欧美极品少妇videossex| 色偷偷888欧美精品久久久 | 国产亚洲视频系列| 人妻无码中文久久久久专区| 国产成人欧美日韩在线电影| 亚洲美女性囗交| 麻豆91在线看| 污污动漫在线观看| 日本欧美一区二区| 黑人粗进入欧美aaaaa| 久久这里只有| 亚洲国产精品毛片av不卡在线| 国产一区导航| 色综合久久久久无码专区| 黄色日韩在线| 国产美女在线一区| 亚洲裸体俱乐部裸体舞表演av| 国产女主播自拍| 国产在线日韩| 欧美在线观看视频免费| 国产主播精品| 欧美人成在线观看| 亚洲欧洲一区| 欧美视频在线播放一区| 亚洲综合精品| 欧美亚洲日本在线观看| 日韩成人免费电影| 中文av一区二区三区| 美日韩一区二区三区| www.久久av.com| 国产精品综合二区| 日本不卡视频一区| 99久久婷婷国产综合精品电影| 亚洲欧美日韩偷拍| 久久久av毛片精品| xxxxx99| 亚洲特级片在线| 欧美成欧美va| 欧美日韩免费观看中文| 91久久国产综合久久91| 欧美性一级生活| 国产精品自拍电影| 欧美精品一区二区在线播放| 深夜福利视频一区| 中文字幕欧美日韩精品| www.在线视频| 8x拔播拔播x8国产精品| 欧美性片在线观看| 亚洲最大福利视频网| 日韩在线你懂的| 亚洲免费不卡| 海角社区69精品视频| 免费在线激情视频| 久久99国产精品尤物| 中文字幕99页| 中文字幕国产一区| 精品无码免费视频| 欧洲av一区二区嗯嗯嗯啊| 国产麻豆精品一区| 亚洲国产天堂久久综合网| 搞黄视频在线观看| 久久久免费电影| 免费在线观看一区| 国产精品日韩欧美一区二区| 国内黄色精品| 国产一区二区片| 欧美aa在线视频| 少妇搡bbbb搡bbb搡打电话| 亚洲国产精品成人久久综合一区| 农村黄色一级片| 91九色最新地址| 性生活免费网站| 亚洲人成欧美中文字幕| 欧美xxxx黑人又粗又长| 国产精品免费观看在线| 久久人人爽人人爽人人片av不| 婷婷久久伊人| 日韩一区二区免费看| 在线一区二区不卡| 久久免费电影网| 精品一区在线视频| 欧美日本高清视频在线观看| 水中色av综合| 欧美大荫蒂xxx| 久久婷婷五月综合色丁香| 国产欧美视频在线观看| 国产ts在线观看| 国产精品高清亚洲| 欧美h在线观看| 精品裸体舞一区二区三区| 8888四色奇米在线观看| 欧美一区二区三区免费视| 97久久亚洲| 日本一区二区免费高清视频| 日韩精品一区第一页| 国产麻豆剧传媒精品国产av| 亚洲美女免费在线| 在线观看国产黄| 国产一区二区三区直播精品电影| 激情在线视频播放| 亚洲精品免费网站| 久久成人综合| 欧美成人黄色网址| 国产午夜精品一区二区三区嫩草 | 欧美日韩精品一区二区三区 | www.av中文字幕| 国产福利不卡视频| 青草影院在线观看| 91麻豆精品91久久久久久清纯| 9色在线视频网站| 国产精品1234| 欧州一区二区| 波多野结衣天堂| 亚洲国产成人私人影院tom| 天天干天天插天天射| 亚洲少妇激情视频| 欧洲一级精品| 日本精品一区二区三区不卡无字幕| 国产精品久久国产愉拍| 国产老熟女伦老熟妇露脸| 亚洲国产精品影院| 欧美 日韩 中文字幕| 91福利精品第一导航| 999这里有精品| 中文成人综合网| 911美女片黄在线观看游戏| 最近更新的2019中文字幕| 日韩大陆av| 97在线免费视频观看| 成人午夜av影视| 日韩成人免费在线观看| 日韩精品在线观| 中文字幕av一区二区三区佐山爱| 欧美综合77777色婷婷| 日韩国产欧美视频| 永久免费看片视频教学| 日韩亚洲欧美成人一区| 超碰在线公开| 欧美日韩综合另类| 美腿丝袜在线亚洲一区| 欧美精品乱码视频一二专区| 精品免费一区二区三区| 欧洲一区精品| 色之综合天天综合色天天棕色| 久久精品国产99国产| 国产波霸爆乳一区二区| 日韩av在线免费播放| 欧美三区四区| 亚洲成人动漫在线| 波多野洁衣一区| 一级久久久久久| 欧美日韩国产999| 日韩高清影视在线观看| 日本中文字幕精品—区二区| 一区二区三区四区中文字幕| 亚洲欧美自偷自拍| 国产日产欧美a一级在线| 激情久久久久久| 国产传媒在线看| 精品久久一区二区三区| 高潮一区二区| 日本道在线视频| 久久久另类综合| 精品女同一区二区三区| 日本久久久a级免费| 亚洲综合色站| 91精品人妻一区二区三区蜜桃欧美| 制服丝袜中文字幕一区| 精品三级久久| 潘金莲一级淫片aaaaa免费看| 久久在线观看免费| 亚洲va天堂va欧美ⅴa在线| 日韩av电影在线播放| 亚洲无线视频| 久久99久久99精品免费看小说|