精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Spark Delta Lake寫數(shù)據(jù)使用及實現(xiàn)原理代碼解析

大數(shù)據(jù) Spark
Delta Lake 寫數(shù)據(jù)是其最基本的功能,而且其使用和現(xiàn)有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實現(xiàn)原理之前先來看看如何使用它,具體使用如下。

[[278252]]

Delta Lake 寫數(shù)據(jù)是其最基本的功能,而且其使用和現(xiàn)有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實現(xiàn)原理之前先來看看如何使用它,具體使用如下: 

  1. df.write.format("delta").save("/data/yangping.wyp/delta/test/"
  2.   
  3. //數(shù)據(jù)按照 dt 分區(qū) 
  4. df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/"
  5.   
  6. // 覆蓋之前的數(shù)據(jù) 
  7. df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/"

大家可以看出,使用寫 Delta 數(shù)據(jù)是非常簡單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。

Delta Lake 寫數(shù)據(jù)原理

前面簡單了解了如何使用 Delta Lake 來寫數(shù)據(jù),本小結(jié)我們將深入介紹 Delta Lake 是如何保證寫數(shù)據(jù)的基本原理以及如何保證事務(wù)性。

得益于 Apache Spark 強(qiáng)大的數(shù)據(jù)源 API,我們可以很方便的給 Spark 添加任何數(shù)據(jù)源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實現(xiàn)的一種新的數(shù)據(jù)源,我們調(diào)用 df.write.format("delta") 其實底層調(diào)用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類。為了簡單起見,本文介紹的是 Delta Lake 批量寫的實現(xiàn),實時流寫 Delta Lake 本文不涉及,后面有機(jī)會再介紹。 Delta Lake 批量寫擴(kuò)展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質(zhì),并實現(xiàn)了其中的方法。我們調(diào)用上面的寫數(shù)據(jù)方法首先會調(diào)用 DeltaDataSource 類的 createRelation 方法,它的具體實現(xiàn)如下: 

  1. override def createRelation( 
  2.     sqlContext: SQLContext, 
  3.     mode: SaveMode, 
  4.     parameters: Map[String, String], 
  5.     data: DataFrame): BaseRelation = { 
  6.   
  7.   // 寫數(shù)據(jù)的路徑 
  8.   val path = parameters.getOrElse("path", { 
  9.     throw DeltaErrors.pathNotSpecifiedException 
  10.   }) 
  11.   
  12.   // 分區(qū)字段 
  13.   val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY) 
  14.     .map(DeltaDataSource.decodePartitioningColumns) 
  15.     .getOrElse(Nil) 
  16.   
  17.   
  18.   // 事務(wù)日志對象 
  19.   val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) 
  20.   
  21.   // 真正的寫操作過程 
  22.   WriteIntoDelta( 
  23.     deltaLog = deltaLog, 
  24.     mode = mode, 
  25.     new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf), 
  26.     partitionColumns = partitionColumns, 
  27.     configuration = Map.empty, 
  28.     data = data).run(sqlContext.sparkSession) 
  29.   
  30.   deltaLog.createRelation() 

其中 mode 就是保持?jǐn)?shù)據(jù)的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個傳遞的參數(shù),比如分區(qū)字段、數(shù)據(jù)保存路徑以及 Delta 支持的一些參數(shù)(replaceWhere、mergeSchema、overwriteSchema 等,具體參見 org.apache.spark.sql.delta.DeltaOptions);data 就是我們需要保存的數(shù)據(jù)。

createRelation 方法緊接著就是獲取數(shù)據(jù)保存的路徑,分區(qū)字段等信息。然后初始化 deltaLog,deltaLog 的初始化會做很多事情,比如會讀取磁盤所有的事務(wù)日志(_delta_log 目錄下),并構(gòu)建最新事務(wù)日志的最新快照,里面可以拿到最新數(shù)據(jù)的版本。由于 deltaLog 的初始化成本比較高,所以 deltaLog 初始化完之后會緩存到 deltaLogCache 中,這是一個使用 Guava 的 CacheBuilder 類實現(xiàn)的一個緩存,緩存的數(shù)據(jù)保持一小時,緩存大小可以通過 delta.log.cacheSize 參數(shù)進(jìn)行設(shè)置。只要寫數(shù)據(jù)的路徑是一樣的,就只需要初始化一次 deltaLog,后面直接從緩存中拿即可。除非之前緩存的 deltaLog 被清理了,或者無效才會再次初始化。DeltaLog 類是 Delta Lake 中最重要的類之一,涉及的內(nèi)容非常多,所以我們會單獨(dú)使用一篇文章進(jìn)行介紹。

緊接著初始化 WriteIntoDelta,WriteIntoDelta 擴(kuò)展自 RunnableCommand,Delta Lake 中的更新、刪除、合并都是擴(kuò)展這個類的。初始化完 WriteIntoDelta 之后,就會調(diào)用 run 方法執(zhí)行真正的寫數(shù)據(jù)操作。WriteIntoDelta 的 run 方法實現(xiàn)如下: 

  1. override def run(sparkSession: SparkSession): Seq[Row] = { 
  2.     deltaLog.withNewTransaction { txn => 
  3.       val actions = write(txn, sparkSession) 
  4.       val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere) 
  5.       txn.commit(actions, operation) 
  6.     } 
  7.     Seq.empty 

Delta Lake 所有的更新操作都是在事務(wù)中進(jìn)行的,deltaLog.withNewTransaction 就是一個事務(wù),withNewTransaction 的實現(xiàn)如下: 

  1. def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { 
  2.   try { 
  3.     // 更新當(dāng)前表事務(wù)日志的快照 
  4.     update() 
  5.     // 初始化樂觀事務(wù)鎖對象 
  6.     val txn = new OptimisticTransaction(this) 
  7.     // 開啟事務(wù) 
  8.     OptimisticTransaction.setActive(txn) 
  9.     // 執(zhí)行寫數(shù)據(jù)操作 
  10.     thunk(txn) 
  11.   } finally { 
  12.     // 關(guān)閉事務(wù) 
  13.     OptimisticTransaction.clearActive() 
  14.   } 

在開啟事務(wù)之前,需要更新當(dāng)前表事務(wù)的快照,因為在執(zhí)行寫數(shù)據(jù)之前,這張表可能已經(jīng)被修改了,執(zhí)行 update 操作之后,就可以拿到當(dāng)前表的最新版本,緊接著開啟樂觀事務(wù)鎖。thunk(txn) 就是需要執(zhí)行的事務(wù)操作,對應(yīng) deltaLog.withNewTransaction 里面的所有代碼。

我們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執(zhí)行寫數(shù)據(jù)的操作,它的實現(xiàn)如下: 

  1.   def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { 
  2.     import sparkSession.implicits._ 
  3.     // 如果不是第一次往表里面寫數(shù)據(jù),需要判斷寫數(shù)據(jù)的模式是否符合條件 
  4.     if (txn.readVersion > -1) { 
  5.       // This table already exists, check if the insert is valid. 
  6.       if (mode == SaveMode.ErrorIfExists) { 
  7.         throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) 
  8.       } else if (mode == SaveMode.Ignore) { 
  9.         return Nil 
  10.       } else if (mode == SaveMode.Overwrite) { 
  11.         deltaLog.assertRemovable() 
  12.       } 
  13.     } 
  14.   
  15.     // 更新表的模式,比如是否覆蓋現(xiàn)有的模式,是否和現(xiàn)有的模式進(jìn)行 merge 
  16.     updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) 
  17.   
  18.     // 是否定義分區(qū)過濾條件 
  19.     val replaceWhere = options.replaceWhere 
  20.     val partitionFilters = if (replaceWhere.isDefined) { 
  21.       val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) 
  22.       if (mode == SaveMode.Overwrite) { 
  23.         verifyPartitionPredicates( 
  24.           sparkSession, txn.metadata.partitionColumns, predicates) 
  25.       } 
  26.       Some(predicates) 
  27.     } else { 
  28.       None 
  29.     } 
  30.   
  31.     // 第一次寫數(shù)據(jù)初始化事務(wù)日志的目錄 
  32.     if (txn.readVersion < 0) { 
  33.       // Initialize the log path 
  34.       deltaLog.fs.mkdirs(deltaLog.logPath) 
  35.     } 
  36.   
  37.     // 寫數(shù)據(jù)到文件系統(tǒng)中 
  38.     val newFiles = txn.writeFiles(data, Some(options)) 
  39.       
  40.     val deletedFiles = (mode, partitionFilters) match { 
  41.        // 全量覆蓋,直接拿出緩存在內(nèi)存中最新事務(wù)日志快照里面的所有 AddFile 文件 
  42.       case (SaveMode.Overwrite, None) => 
  43.         txn.filterFiles().map(_.remove) 
  44.       // 從事務(wù)日志快照中獲取對應(yīng)分區(qū)里面的所有 AddFile 文件 
  45.       case (SaveMode.Overwrite, Some(predicates)) => 
  46.         // Check to make sure the files we wrote out were actually valid. 
  47.         val matchingFiles = DeltaLog.filterFileList( 
  48.           txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() 
  49.         val invalidFiles = newFiles.toSet -- matchingFiles 
  50.         if (invalidFiles.nonEmpty) { 
  51.           val badPartitions = invalidFiles 
  52.             .map(_.partitionValues) 
  53.             .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } 
  54.             .mkString(", "
  55.           throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) 
  56.         } 
  57.   
  58.         txn.filterFiles(predicates).map(_.remove) 
  59.       case _ => Nil 
  60.     } 
  61.   
  62.     newFiles ++ deletedFiles 
  63.   } 

如果 txn.readVersion == -1,說明是第一次寫數(shù)據(jù)到 Delta Lake 表,所以當(dāng)這個值大于 -1 的時候,需要判斷一下寫數(shù)據(jù)的操作是否合法。

由于 Delta Lake 底層使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,這就是 updateMetadata 函數(shù)對應(yīng)的操作。

因為 Delta Lake 表支持分區(qū),所以我們可能在寫數(shù)據(jù)的時候指定某個分區(qū)進(jìn)行覆蓋。

真正寫數(shù)據(jù)的操作是 txn.writeFiles 函數(shù)執(zhí)行的,具體實現(xiàn)如下: 

  1. def writeFiles( 
  2.       data: Dataset[_], 
  3.       writeOptions: Option[DeltaOptions], 
  4.       isOptimize: Boolean): Seq[AddFile] = { 
  5.     hasWritten = true 
  6.   
  7.     val spark = data.sparkSession 
  8.     val partitionSchema = metadata.partitionSchema 
  9.     val outputPath = deltaLog.dataPath 
  10.   
  11.     val (queryExecution, output) = normalizeData(data, metadata.partitionColumns) 
  12.     val partitioningColumns = 
  13.       getPartitioningColumns(partitionSchema, outputoutput.length < data.schema.size
  14.   
  15.     // 獲取 DelayedCommitProtocol,里面可以設(shè)置寫文件的名字, 
  16.     // commitTask 和 commitJob 等做一些事情 
  17.     val committer = getCommitter(outputPath) 
  18.   
  19.     val invariants = Invariants.getFromSchema(metadata.schema, spark) 
  20.   
  21.     SQLExecution.withNewExecutionId(spark, queryExecution) { 
  22.       val outputSpec = FileFormatWriter.OutputSpec( 
  23.         outputPath.toString, 
  24.         Map.empty, 
  25.         output
  26.   
  27.       val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants) 
  28.   
  29.       FileFormatWriter.write( 
  30.         sparkSession = spark, 
  31.         plan = physicalPlan, 
  32.         fileFormat = snapshot.fileFormat, 
  33.         committer = committer, 
  34.         outputSpec = outputSpec, 
  35.         hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), 
  36.         partitionColumns = partitioningColumns, 
  37.         bucketSpec = None, 
  38.         statsTrackers = Nil, 
  39.         options = Map.empty) 
  40.     } 
  41.   
  42.     // 返回新增的文件 
  43.     committer.addedStatuses 

Delta Lake 寫操作最終調(diào)用 Spark 的 FileFormatWriter.write 方法進(jìn)行的,通過這個方法的復(fù)用將我們真正的數(shù)據(jù)寫入到 Delta Lake 表里面去了。

在 Delta Lake 中,如果是新增文件則會在事務(wù)日志中使用 AddFile 類記錄相關(guān)的信息,AddFile 持久化到事務(wù)日志里面的內(nèi)容如下:

  1. {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}} 

可以看出 AddFile 里面記錄了新增文件的保存路徑,分區(qū)信息,新增的文件大小,修改時間等信息。如果是刪除文件,也會在事務(wù)日志里面記錄這個刪除操作,對應(yīng)的就是使用 RemoveFile 類存儲,RemoveFile 持久化到事務(wù)日志里面的內(nèi)容如下:

  1. {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}} 

RemoveFile 里面保存了刪除文件的路徑,刪除時間等信息。如果新增一個文件,再刪除一個文件,那么最新的事務(wù)日志快照里面只會保存刪除這個文件的記錄。從這里面也可以看出, Delta Lake 刪除、新增 ACID 是針對文件級別的。

上面的寫操作肯定會產(chǎn)生新的文件,所以寫操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要刪除的文件(RemoveFile)。針對那些文件需要刪除需要做一些判斷,主要分兩種情況(具體參見 write 方法里面的):

  • 如果是全表覆蓋,則直接從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出所有 AddFile 文件,然后將其標(biāo)記為 RemoveFile;
  • 如果是分區(qū)內(nèi)的覆蓋,則從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出對應(yīng)分區(qū)下的 AddFile 文件,然后將其標(biāo)記為 RemoveFile。

最后 write 方法返回新增的文件和需要刪除的文件(newFiles ++ deletedFiles),這些文件最終需要記錄到事務(wù)日志里面去。關(guān)于事務(wù)日志是如何寫進(jìn)去的請參見這篇文章的詳細(xì)分析。

 

責(zé)任編輯:未麗燕 來源: 阿里云棲社區(qū)
相關(guān)推薦

2022-07-06 09:53:04

開源數(shù)據(jù)湖

2023-06-30 07:40:31

數(shù)據(jù)分析Delta lake

2018-04-09 12:25:11

2015-03-10 13:55:31

JavaScript預(yù)解析原理及實現(xiàn)

2022-03-17 08:55:43

本地線程變量共享全局變量

2020-12-04 14:31:45

大數(shù)據(jù)Spark

2023-12-22 13:58:00

C++鏈表開發(fā)

2021-07-07 10:13:56

大數(shù)據(jù)Delta Lake 湖倉一體

2016-12-20 09:47:38

Apache SparLambda架構(gòu)

2018-05-22 09:47:07

2014-02-14 15:43:16

ApacheSpark

2018-04-10 10:32:07

NginxApache服務(wù)器

2022-06-01 13:52:11

開源大數(shù)據(jù)

2019-10-08 17:38:17

開源技術(shù) 趨勢

2018-07-27 08:39:44

負(fù)載均衡算法實現(xiàn)

2012-04-11 15:41:48

JavaNIO

2015-10-16 09:21:13

SparkMySQL數(shù)據(jù)分析

2017-06-26 15:00:17

2020-07-10 09:04:55

HTTPS瀏覽器網(wǎng)絡(luò)協(xié)議

2011-08-04 15:52:48

Objective-C HTML
點贊
收藏

51CTO技術(shù)棧公眾號

日本成人在线免费视频| 国产乱码精品一区二区三区不卡| 精品伦精品一区二区三区视频密桃| 素人啪啪色综合| 国产精品国产馆在线真实露脸| 3d动漫啪啪精品一区二区免费| 日本熟妇毛茸茸丰满| 欧美人与牛zoz0性行为| 91精品一区二区三区久久久久久| 日本一本中文字幕| 国产精品无码2021在线观看| 国产福利91精品一区二区三区| 性欧美xxxx交| 希岛爱理中文字幕| 亚洲精品亚洲人成在线观看| 欧美一区二区成人6969| 99爱视频在线| 羞羞电影在线观看www| 久久久蜜臀国产一区二区| 亚洲va久久久噜噜噜| 一级片在线观看免费| 欧美黄色aaaa| 社区色欧美激情 | 疯狂揉花蒂控制高潮h| 高清一区二区三区av| 一本色道久久综合亚洲91| 乱熟女高潮一区二区在线| 黄色国产在线| 99久久99久久精品国产片果冻 | 日韩黄色一级视频| 激情91久久| 久久影视电视剧免费网站| 日本丰满少妇裸体自慰 | 久久一区激情| 性欧美xxxx交| 久久在线视频精品| 五月天久久久| 最新国产成人av网站网址麻豆| 少妇一级淫免费观看| 亚洲一区二区三区中文字幕在线观看| 欧美性猛交xxxx乱大交退制版 | 亚洲国产午夜伦理片大全在线观看网站 | 免费精品视频一区| 天堂在线视频免费| 丁香亚洲综合激情啪啪综合| 亚洲伊人第一页| 亚洲一区二区色| 免费观看在线综合| 国产精品流白浆视频| www.久久久久久久| 久久国产福利| 国产97在线亚洲| 黄色污污网站在线观看| 男女精品网站| 国产成人一区二区| 免费精品一区二区| 免费久久精品视频| 成人高清视频观看www| 亚洲性在线观看| 麻豆国产91在线播放| 国产区精品在线观看| 亚洲在线视频播放| 久久精品99国产精品日本| 国产精品免费看久久久香蕉| 国产乡下妇女三片| 蜜臀精品久久久久久蜜臀| 国产精品96久久久久久| 成人毛片一区二区三区| 日本不卡一区二区| 成人网在线免费观看| jizz中国女人| 成人av午夜电影| 裸体丰满少妇做受久久99精品| 青青九九免费视频在线| 久久精品日产第一区二区三区高清版 | 亚洲区小说区| 国产亚洲激情在线| 亚洲视频重口味| 激情av一区| 91精品国产亚洲| 特级西西444www大胆免费看| 麻豆精品视频在线观看视频| 亚洲a成v人在线观看| 成人免费视频国产免费麻豆| av在线不卡观看免费观看| 欧美一级爱爱| mm1313亚洲国产精品美女| 亚洲一区二区三区三| 久久免费视频3| 成人免费一区| 欧美成va人片在线观看| 欧美色图亚洲激情| 婷婷另类小说| 国产91精品不卡视频| 国产偷人爽久久久久久老妇app| 国产一区二区在线观看视频| 久久综合九色99| 男人的天堂在线视频免费观看 | 四虎影视在线播放| 国产精品成人免费| 日日橹狠狠爱欧美超碰| 中韩乱幕日产无线码一区| 日韩午夜小视频| 在线免费观看日韩av| 围产精品久久久久久久| 日本久久91av| 亚洲精品久久久久久无码色欲四季| 久久婷婷色综合| 免费观看国产视频在线| 巨茎人妖videos另类| 欧美一卡2卡三卡4卡5免费| 一二三不卡视频| 欧美福利影院| 国产精品视频久久久久| 天堂在线资源8| 亚洲三级在线看| 日韩亚洲在线视频| eeuss鲁片一区二区三区| 在线播放国产一区中文字幕剧情欧美| 国产第一页在线播放| 久久电影国产免费久久电影| 麻豆av福利av久久av| 麻豆av在线免费观看| 欧美乱熟臀69xxxxxx| 中文字幕在线看高清电影| 亚洲无毛电影| 亚洲中国色老太| 伊人免费在线| 日本久久精品电影| 人妻熟女aⅴ一区二区三区汇编| 午夜精品婷婷| 国产免费一区二区三区在线观看| 欧美女同网站| 大伊人狠狠躁夜夜躁av一区| 中文字幕视频观看| 自拍日韩欧美| 成人午夜激情免费视频| av电影在线观看一区二区三区| 日韩欧美成人区| 欧美深性狂猛ⅹxxx深喉| 激情久久久久久| 成人自拍网站| 大香伊人中文字幕精品| 日韩精品一区二区三区四区| 欧美一区二区三区爽爽爽| 精品亚洲国产成人av制服丝袜 | 欧美日韩一本到| 精品欧美一区二区久久久| 日韩天天综合| 久久精精品视频| 五月天av在线| 亚洲免费电影一区| 日韩精品成人免费观看视频| 26uuu亚洲综合色| 国产主播在线看| 国产aⅴ精品一区二区三区久久| 57pao精品| 国产午夜精品一区理论片| 色94色欧美sute亚洲13| 九一在线免费观看| 六月丁香婷婷色狠狠久久| 亚洲国产欧美日韩| 麻豆国产一区| 国语自产精品视频在线看一大j8 | 欧美精品激情在线| 色婷婷中文字幕| 日韩欧美国产一区二区| 精品人妻中文无码av在线| 蜜桃av一区二区在线观看| 一区二区91美女张开腿让人桶| 丁香婷婷久久| 欧美巨猛xxxx猛交黑人97人| 蜜臀av午夜精品| 日韩人在线观看| 免费看91的网站| 激情成人午夜视频| a级免费在线观看| 三级小说欧洲区亚洲区| 国产精品久久久久久久久久| 好了av在线| 亚洲精品www久久久| 草莓视频18免费观看| 亚洲天堂av一区| 日本性生活一级片| 日韩成人精品在线观看| 中文字幕乱码免费| 任你弄精品视频免费观看| 国产精品日韩精品| 成人三级小说| 一区二区三区日韩在线| www.av网站| 狠狠躁夜夜躁人人躁婷婷91 | 亚洲成人午夜电影| www色com| 成人黄页毛片网站| 亚洲77777| 亚洲久久视频| 一区不卡视频| 在线日韩网站| 91嫩草视频在线观看| 欧美特黄aaaaaaaa大片| 欧美成人全部免费| 国产精品99999| 精品国产乱码久久| 91影院在线播放| 黑人巨大精品欧美一区二区| 亚洲不卡在线播放| 久久久久久久网| 亚洲成年人在线观看| 美女在线观看视频一区二区| 欧美 日韩 国产 高清| 91精品啪在线观看国产81旧版| 久久国产一区| 国产伦精品一区二区三区在线播放| 国产精品一区二区久久久久| 国产夫妻在线| 欧美激情乱人伦| 国产美女在线观看| 中文字幕亚洲一区二区三区| 天天综合永久入口| 欧美成人艳星乳罩| 国产剧情久久久| 欧美揉bbbbb揉bbbbb| 国产www在线| 亚洲午夜在线视频| 欧美黑人猛猛猛| 国产精品伦理在线| 少妇av片在线观看| 久久久99精品久久| 国产精品一区二区入口九绯色| 国产一区激情在线| 999热精品视频| 紧缚捆绑精品一区二区| 五月天亚洲视频| 日本伊人色综合网| 人妻无码视频一区二区三区| 午夜亚洲性色福利视频| 精品国产一二三四区| 99riav1国产精品视频| 精品视频在线观看一区| 欧美日韩国产探花| 久久久天堂国产精品| 牛夜精品久久久久久久99黑人| 樱空桃在线播放| 亚洲深深色噜噜狠狠爱网站| 一区二区三区四区久久| 欧美高清在线| 手机福利在线视频| 在线精品国产| 国产午夜精品视频一区二区三区| 欧美三级视频| 国产不卡一区二区视频| 亚洲一区二区伦理| 成人久久久久久久久| 丝袜诱惑亚洲看片| 亚欧在线免费观看| 久久精品国产免费| 毛毛毛毛毛毛毛片123| 国产成人亚洲综合a∨猫咪| 男男受被啪到高潮自述| 成人晚上爱看视频| 朝桐光av一区二区三区| 久久精品亚洲精品国产欧美kt∨| 中文字幕在线观看免费高清| 久久精品视频一区二区| 国产破处视频在线观看| 亚洲日本在线看| 久久精品国产亚洲av高清色欲| 午夜日韩在线电影| 成人黄色片在线观看| 91精品国产黑色紧身裤美女| 懂色av成人一区二区三区| 日韩精品一二三四区| www日韩tube| 久精品免费视频| 青青青免费在线视频| 国产精品白丝jk喷水视频一区| 日韩有码欧美| 国产欧美日韩一区二区三区| 欧美男gay| 九九久久九九久久| 欧美综合二区| 国产精品嫩草影视| caoporm超碰国产精品| www.99热| 亚洲国产va精品久久久不卡综合| 国产污视频网站| 欧美tickling网站挠脚心| 日韩精品系列| 久久九九热免费视频| 久草在线资源站手机版| 国产精品专区h在线观看| 亚洲一二三区视频| 深夜福利成人| 在线观看视频免费一区二区三区| 国产精彩免费视频| 国产成人精品网址| wwwww黄色| 亚欧色一区w666天堂| 伊人免费在线观看| 亚洲精品美女久久久久| 麻豆最新免费在线视频| 热99精品只有里视频精品| 激情视频亚洲| 日韩精品一区二区三区四区五区 | 成年人视频观看| 久久福利视频一区二区| 国产黄色网址在线观看| 亚洲精品高清在线| 亚洲熟女乱色一区二区三区久久久| 日韩高清有码在线| 欧洲性视频在线播放| 国产有码一区二区| 国产欧美日韩在线观看视频| 人人妻人人澡人人爽欧美一区双 | 在线中文字幕一区二区| 成人午夜福利视频| 操日韩av在线电影| 国产69精品久久| 欧美亚洲国产免费| 99国产一区| 四虎永久免费观看| 亚洲精品亚洲人成人网在线播放| 成人免费一级片| 亚洲欧美日韩国产中文专区| 麻豆蜜桃在线| 成人看片在线| 国产精品观看| 欧美xxxx黑人| 亚洲欧美乱综合| 国产精品久久久久久无人区| 国产亚洲欧美另类中文| 吉吉日韩欧美| 茄子视频成人在线观看| 国产亚洲一级| 短视频在线观看| 欧美性xxxxxxx| 青草久久伊人| 欧美做受高潮电影o| 午夜先锋成人动漫在线| 激情深爱综合网| 91小视频免费看| 欧美日韩乱国产| 亚洲男人的天堂网站| 韩国主播福利视频一区二区三区| 狠狠色伊人亚洲综合网站色| 亚洲国内欧美| jizz日本免费| 欧美性猛交视频| 国产黄在线观看| 国产精品香蕉国产| 色呦哟—国产精品| 婷婷中文字幕在线观看| 亚洲激情综合网| 免费国产黄色片| …久久精品99久久香蕉国产| 亚洲精品一级二级三级| 国产视频一区二区三区在线播放 | 亚洲精品国产精品国自产观看| 麻豆一区二区三| 国产精品白嫩白嫩大学美女| 精品国产免费一区二区三区四区| h片在线观看视频免费免费| 久久久久久久免费| 奇米精品一区二区三区在线观看一| 91视频免费看片| 欧美一区二区女人| bl视频在线免费观看| 蜜桃传媒视频麻豆一区| 青青草成人在线观看| 黄色a级片在线观看| 亚洲精品在线一区二区| 日韩福利一区| 国产对白在线播放| 99久久精品国产精品久久| 天天干,天天干| 久久久精品国产| 国产一区二区在线视频你懂的| 国产三区在线视频| 国产精品成人免费在线| 三级视频在线看| 国产欧美最新羞羞视频在线观看| 欧美日韩国产一区精品一区| 巨胸大乳www视频免费观看| 欧美猛男gaygay网站| sm在线播放| 一级一片免费播放| 91在线国产观看| 国产精品热久久| 欧美亚洲视频一区二区| 亚洲不卡av不卡一区二区| 成人免费毛片日本片视频| 在线不卡一区二区| 筱崎爱全乳无删减在线观看 | 激情伦成人综合小说| 青青草国产成人99久久| 日韩精品视频免费看| 在线中文字幕日韩| 女人抽搐喷水高潮国产精品| 欧美女同在线观看|