精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

一篇文章帶你深入理解FlinkSQL中的窗口

大數據
時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看 Table API 和 SQL 中,怎么利用時間字段做窗口操作。

[[360693]]

前言

時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看 Table API 和 SQL 中,怎么利用時間字段做窗口操作。在 Table API 和 SQL 中,主要有兩種窗口:Group Windows 和 Over Windows

 

一、分組窗口(Group Windows) 分組窗口(Group Windows)會根據時間或行計數間隔,將行聚合到有限的組(Group)中,并對每個組的數據執行一次聚合函數。 Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定義的,并且必須由 as 子句指定一個別名。為了按窗口對表進行分組,窗口的別名必須在 group by 子句中,像常規的分組字段一樣引用。例子:

  1. val table = input 
  2. .window([w: GroupWindow] as 'w) 
  3. .groupBy('w, 'a) 
  4. .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count

Table API 提供了一組具有特定語義的預定義 Window 類,這些類會被轉換為底層DataStream 或 DataSet 的窗口操作。

Table API 支持的窗口定義,和我們熟悉的一樣,主要也是三種:滾動(Tumbling)、滑動(Sliding和 會話(Session)。

1.1 滾動窗口

滾動窗口(Tumbling windows)要用 Tumble 類來定義,另外還有三個方法:

  • over:定義窗口長度
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的 groupBy 中

實現案例

1.需求

設置滾動窗口為10秒鐘統計id出現的次數。

2.數據準備

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

3.代碼實現

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.scala._ 
  8. import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} 
  9. import org.apache.flink.types.Row 
  10.  
  11. /** 
  12.  * @Package Windows 
  13.  * @File :FlinkSQLTumBlingTie.java 
  14.  * @author 大數據老哥 
  15.  * @date 2020/12/25 21:58 
  16.  * @version V1.0 
  17.  *          設置滾動窗口 
  18.  */ 
  19. object FlinkSQLTumBlingTie { 
  20.   def main(args: Array[String]): Unit = { 
  21.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  22.     env.setParallelism(1) 
  23.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  24.  
  25.     val settings = EnvironmentSettings.newInstance() 
  26.       .useBlinkPlanner() 
  27.       .inStreamingMode() 
  28.       .build() 
  29.     val tableEnv = StreamTableEnvironment.create(env, settings) 
  30.  
  31.     // 讀取數據 
  32.     val inputPath = "./data/sensor.txt" 
  33.     val inputStream = env.readTextFile(inputPath) 
  34.     
  35.  
  36.     // 先轉換成樣例類類型(簡單轉換操作) 
  37.     val dataStream = inputStream 
  38.       .map(data => { 
  39.         val arr = data.split(","
  40.         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  41.       }) 
  42.       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  43.         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  44.       }) 
  45.  
  46.     val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
  47.     // 注冊表 
  48.     tableEnv.createTemporaryView("sensor", sensorTable) 
  49.     // table 實現 
  50.     val resultTable = sensorTable 
  51.       .window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒統計一次,滾動時間窗口 
  52.       .groupBy('id, 'tw) 
  53.       .select('id, 'id.count, 'tw.end
  54.     //sql 實現 
  55.     val sqlTable = tableEnv.sqlQuery( 
  56.       ""
  57.         |select 
  58.         |id, 
  59.         |count(id) , 
  60.         |tumble_end(ts,interval '10' second
  61.         |from sensor 
  62.         |group by 
  63.         |id, 
  64.         |tumble(ts,interval '10' second
  65.         |""".stripMargin) 
  66.  
  67.     /*** 
  68.      * .window(Tumble over 10.minutes on 'rowtime as 'w) (事件時間字段 rowtime) 
  69.      * .window(Tumble over 10.minutes on 'proctime as 'w)(處理時間字段 proctime) 
  70.      * .window(Tumble over 10.minutes on 'proctime as 'w) (類似于計數窗口,按處理時間排序,10 行一組) 
  71.      */ 
  72.     resultTable.toAppendStream[Row].print("talbe"
  73.     sqlTable.toRetractStream[Row].print("sqlTable"
  74.      
  75.     env.execute("FlinkSQLTumBlingTie"
  76.   } 
  77.  
  78.   case class SensorReading(id: String, timestamp: Long, temperature: Double
  79.  

運行結果

 

1.2 滑動窗口

滑動窗口(Sliding windows)要用 Slide 類來定義,另外還有四個方法:

  • over:定義窗口長度
  • every:定義滑動步長
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的 groupBy 中

實現案例

1.需求描述

設置窗口大小為10秒鐘設置滑動距離為5秒鐘,統計id的出現的次數。

2.數據準備

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

3.實現代碼

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table
  8. import org.apache.flink.table.api.scala._ 
  9. import org.apache.flink.types.Row 
  10. import windows.FlinkSQLTumBlingTie.SensorReading 
  11.  
  12. /** 
  13.  * @Package windows 
  14.  * @File :FlinkSQLSlideTime.java 
  15.  * @author 大數據老哥 
  16.  * @date 2020/12/27 22:19 
  17.  * @version V1.0 
  18.  *          滑動窗口 
  19.  */ 
  20. object FlinkSQLSlideTime { 
  21.   def main(args: Array[String]): Unit = { 
  22.     //構建運行環境 
  23.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  24.     env.setParallelism(1) // 設置分區為1 方便后面測試 
  25.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時間 
  26.  
  27.     val settings = EnvironmentSettings.newInstance() 
  28.       .useBlinkPlanner() 
  29.       .inStreamingMode() 
  30.       .build() 
  31.     // 創建表env 
  32.     val tableEnv = StreamTableEnvironment.create(env, settings) 
  33.  
  34.     // 讀取數據 
  35.     val inputPath = "./data/sensor.txt" 
  36.     val inputStream = env.readTextFile(inputPath) 
  37.  
  38.     // 先轉換成樣例類類型(簡單轉換操作) 
  39.     val dataStream = inputStream 
  40.       .map(data => { 
  41.         val arr = data.split(","
  42.         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  43.       }) 
  44.       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  45.         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  46.       }) 
  47.  
  48.     val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
  49.     // 注冊表 
  50.     tableEnv.createTemporaryView("sensor", sensorTable) 
  51.     // table API 實現 
  52.     val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w) 
  53.       .groupBy('w, 'id) 
  54.       .select('id, 'id.count, 'w.end
  55.     val tableSql = tableEnv.sqlQuery( 
  56.       ""
  57.         |select 
  58.         |id, 
  59.         |count(id), 
  60.         |HOP_END(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND )as w 
  61.         |from sensor 
  62.         |group by 
  63.         |HOP(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND),id 
  64.         |""".stripMargin) 
  65.  
  66.     tableApi.toAppendStream[Row].print("tableApi"
  67.     tableSql.toAppendStream[Row].print("tableSql"
  68.     /** 
  69. .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) (事件時間字段 rowtime) 
  70. .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (處理時間字段 proctime)  
  71. .window(Slide over 10.rows every 5.rows on 'proctime as 'w) (類似于計數窗口,按處理時間排序,10 行一組) 
  72.    **/ 
  73.     env.execute("FlinkSQLSlideTime"
  74.   } 

4.運行結果

 

1.3 會話窗口

會話窗口(Session windows)要用 Session 類來定義,另外還有三個方法:

  • withGap:會話時間間隔
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的 groupBy 中實現案例

1.需求描述

設置一個session 為10秒鐘 統計id的個數

2.準備數據

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

3.編寫代碼

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.{EnvironmentSettings, Session, Table
  8. import org.apache.flink.table.api.scala._ 
  9. import org.apache.flink.types.Row 
  10. import windows.FlinkSQLTumBlingTie.SensorReading 
  11.  
  12. /** 
  13.  * @Package windows 
  14.  * @File :FlinkSqlSessionTime.java 
  15.  * @author 大數據老哥 
  16.  * @date 2020/12/27 22:52 
  17.  * @version V1.0 
  18.  */ 
  19. object FlinkSqlSessionTime { 
  20.   def main(args: Array[String]): Unit = { 
  21.     //構建運行環境 
  22.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  23.     env.setParallelism(1) // 設置分區為1 方便后面測試 
  24.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時間 
  25.  
  26.     val settings = EnvironmentSettings.newInstance() 
  27.       .useBlinkPlanner() 
  28.       .inStreamingMode() 
  29.       .build() 
  30.     // 創建表env 
  31.     val tableEnv = StreamTableEnvironment.create(env, settings) 
  32.  
  33.     // 讀取數據 
  34.     val inputPath = "./data/sensor.txt" 
  35.     val inputStream = env.readTextFile(inputPath) 
  36.  
  37.     // 先轉換成樣例類類型(簡單轉換操作) 
  38.     val dataStream = inputStream 
  39.       .map(data => { 
  40.         val arr = data.split(","
  41.         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  42.       }) 
  43.       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  44.         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  45.       }) 
  46.  
  47.     val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
  48.     // 注冊表 
  49.     tableEnv.createTemporaryView("sensor", sensorTable) 
  50.  
  51.     val tableApi = sensorTable. 
  52.       window(Session withGap 10.seconds on 'ts as 'w) 
  53.       .groupBy('id, 'w) 
  54.       .select('id, 'id.count, 'w.end
  55.     val tableSQL = tableEnv.sqlQuery( 
  56.       ""
  57.         |SELECT 
  58.         |id, 
  59.         |COUNT(id), 
  60.         |SESSION_END(ts, INTERVAL '10' SECONDAS w 
  61.         |FROM sensor 
  62.         |GROUP BY 
  63.         |id, 
  64.         |SESSION(ts, INTERVAL '10' SECOND
  65.         |""".stripMargin) 
  66.     tableApi.toAppendStream[Row].print("tableApi"
  67.     tableSQL.toAppendStream[Row].print("tableSQL"
  68.  
  69.     /** 
  70.      * .window(Session withGap 10.minutes on 'rowtime as 'w) 事件時間字段 rowtime) 
  71.      * .window(Session withGap 10.minutes on 'proctime as 'w) 處理時間字段 proctime) 
  72.      */ 
  73.     env.execute("FlinkSqlSessionTime"
  74.   } 

4.運行結果

 

二、 Over Windows

Over window 聚合是標準 SQL 中已有的(Over 子句),可以在查詢的 SELECT 子句中定義。Over window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合。Over windows使用.window(w:overwindows*)子句定義,并在 select()方法中通過別名來引用。例子:

  1. val table = input 
  2. .window([w: OverWindow] as 'w) 
  3. .select('a, 'b.sum over 'w, 'c.min over 'w) 

Table API 提供了 Over 類,來配置 Over 窗口的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的范圍內,定義 Over windows。

無界的 over window 是使用常量指定的。也就是說,時間間隔要指定 UNBOUNDED_RANGE,或者行計數間隔要指定 UNBOUNDED_ROW。而有界的 over window 是用間隔的大小指定的。

2.1 無界的 over window

  1. // 無界的事件時間 over window (時間字段 "rowtime"
  2. .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) 
  3. //無界的處理時間 over window (時間字段"proctime"
  4. .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) 
  5. // 無界的事件時間 Row-count over window (時間字段 "rowtime"
  6. .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) 
  7. //無界的處理時間 Row-count over window (時間字段 "rowtime"
  8. .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w) 

2.2 有界的 over window

  1. // 有界的事件時間 over window (時間字段 "rowtime",之前 1 分鐘) 
  2. .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w) 
  3. // 有界的處理時間 over window (時間字段 "rowtime",之前 1 分鐘) 
  4. .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w) 
  5. // 有界的事件時間 Row-count over window (時間字段 "rowtime",之前 10 行) 
  6. .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w) 
  7. // 有界的處理時間 Row-count over window (時間字段 "rowtime",之前 10 行) 
  8. .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w) 

2.3 代碼練習

我們可以綜合學習過的內容,用一段完整的代碼實現一個具體的需求。例如,統計每個sensor每條數據,與之前兩行數據的平均溫度。

數據準備

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代碼分析:

  1. package windows 
  2.  
  3. import org.apache.flink.streaming.api.TimeCharacteristic 
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.api.windowing.time.Time 
  7. import org.apache.flink.table.api.{EnvironmentSettings, Over, Tumble} 
  8. import org.apache.flink.table.api.scala._ 
  9. import org.apache.flink.types.Row 
  10.  
  11. /** 
  12. * @Package windows 
  13. * @File :FlinkSqlTumBlingOverTime.java 
  14. * @author 大數據老哥 
  15. * @date 2020/12/28 21:45 
  16. * @version V1.0 
  17. */ 
  18. object FlinkSqlTumBlingOverTime { 
  19.  def main(args: Array[String]): Unit = { 
  20.    // 構建運行環境 
  21.    val env = StreamExecutionEnvironment.getExecutionEnvironment 
  22.    env.setParallelism(1) // 設置并行度為1方便后面進行測試 
  23.    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設置事件時間 
  24.  
  25.    val settings = EnvironmentSettings.newInstance() 
  26.      .useBlinkPlanner() 
  27.      .inStreamingMode() 
  28.      .build() 
  29.    //構建table Env 
  30.    val tableEnv = StreamTableEnvironment.create(env, settings) 
  31.  
  32.    // 讀取數據 
  33.    val inputPath = "./data/sensor.txt" 
  34.    val inputStream = env.readTextFile(inputPath) 
  35.    // 先轉換成樣例類類型(簡單轉換操作) 
  36.    // 解析數據 封裝成樣例類 
  37.    val dataStream = inputStream 
  38.      .map(data => { 
  39.        val arr = data.split(","
  40.        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
  41.      }) 
  42.      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
  43.        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
  44.      }) 
  45.    // 將數據注冊成一張臨時表 
  46.    val dataTable = tableEnv.fromDataStream(dataStream,'id, 'temperature, 'timestamp.rowtime as 'ts) 
  47.    tableEnv.createTemporaryView("sensor",dataTable) 
  48.    var tableRes= dataTable.window( Over partitionBy 'id orderBy  'ts preceding 2.rows as 'ow) 
  49.     .select('id,'ts,'id.count over 'ow, 'temperature.avg over 'ow) 
  50.  
  51.   var tableSql= tableEnv.sqlQuery( 
  52.      ""
  53.        |select 
  54.        |id, 
  55.        |ts, 
  56.        |count(id) over ow, 
  57.        |avg(temperature) over ow 
  58.        |from sensor 
  59.        |window ow as
  60.        | partition by id 
  61.        | order by ts 
  62.        | rows between 2 preceding and current row 
  63.        |) 
  64.        |""".stripMargin) 
  65.  
  66.    tableRes.toAppendStream[Row].print("tableRes"
  67.    tableSql.toAppendStream[Row].print("tableSql"
  68.    env.execute("FlinkSqlTumBlingOverTime"
  69.  } 
  70.  case class SensorReading(id: String, timestamp: Long, temperature: Double
  71.  

運行結果

 

 

總結

好了到這里FlinkSql中窗口使用到這里就結束啦,喜歡的可以給了三連。其中FlinkSql中的窗口的用法還是比較多得,所有還是要多加練習。老話說的好,師傅領進門,修行在個人。

本文轉載自微信公眾號「 大數據老哥」,可以通過以下二維碼關注。轉載本文請聯系 大數據老哥公眾號。

 

責任編輯:武曉燕 來源: 大數據老哥
相關推薦

2020-11-27 08:02:41

Promise

2025-01-09 11:26:47

2021-10-15 07:57:04

Docker 日志容器

2018-11-21 08:00:05

Dubbo分布式系統

2022-02-21 09:44:45

Git開源分布式

2023-05-12 08:19:12

Netty程序框架

2021-06-30 00:20:12

Hangfire.NET平臺

2021-08-12 14:19:14

Slice數組類型內存

2021-05-18 09:00:28

Pythonclass

2020-09-29 15:13:14

C++語言開發

2021-05-15 09:18:04

Python進程

2021-07-01 10:01:16

JavaLinkedList集合

2021-02-02 18:39:05

JavaScript

2021-01-29 18:41:16

JavaScript函數語法

2022-12-14 08:03:27

CSS變量前端

2021-06-04 09:56:01

JavaScript 前端switch

2020-02-28 11:29:00

ElasticSear概念類比

2020-11-10 10:48:10

JavaScript屬性對象

2023-05-08 08:21:15

JavaNIO編程

2020-12-08 08:09:49

SVG圖標Web
點贊
收藏

51CTO技術棧公眾號

欧美精品做受xxx性少妇| 欧美日韩国产另类不卡| 美女三级99| 中文在线资源天堂| 国产精品s色| 亚洲精品自拍偷拍| 亚洲欧美日本一区二区三区| 日韩另类在线| 久久精品在线免费观看| 91精品视频一区| 国产一级做a爱片久久毛片a| 色综合天天爱| 亚洲第一视频网| 日本黄色的视频| 中文字幕乱码在线播放| 亚洲美女偷拍久久| 欧美一区亚洲二区| 亚洲精品一区二区口爆| 蜜臀久久99精品久久久久久9| 久久久久久久久久久亚洲| 91动漫免费网站| 日韩三级毛片| 欧美大片在线观看| 日韩欧美理论片| 成人av集中营| 色综合久久中文综合久久97| 国产av熟女一区二区三区 | aaa欧美日韩| 成人福利网站在线观看11| 一区二区三区在线观看av| 韩国精品一区二区三区| 久久精品中文字幕电影| 69精品无码成人久久久久久| 日韩精品a在线观看91| 日韩一二三区视频| 最新免费av网址| 成人精品国产亚洲| 色久优优欧美色久优优| av免费观看大全| sis001亚洲原创区| 亚洲精品成人悠悠色影视| 在线国产99| 在线观看精品一区二区三区| 国产肉丝袜一区二区| 久久精品日韩精品| 性xxxxbbbb| 国产麻豆91精品| 91原创国产| 国产高中女学生第一次| 国产精品综合av一区二区国产馆| 成人精品在线视频| 国产精品无码专区av免费播放| 久久99精品久久久久| 成人激情视频在线播放| 亚洲影院一区二区三区| 极品美女销魂一区二区三区| 成人福利视频在线观看| www.蜜桃av.com| 国产精品亚洲一区二区三区在线| 亚洲影视中文字幕| 精品国自产拍在线观看| 从欧美一区二区三区| 国产精品久久久久久久久久久久午夜片| 国产情侣一区二区| 国产精品66部| 国产高清一区视频| 亚洲色欧美另类| 久久久久国产成人精品亚洲午夜| 日韩av电影免费在线| 高清日韩av电影| **网站欧美大片在线观看| 永久免费看av| 国产天堂在线播放视频| 岛国av一区二区| 国内自拍视频一区| 亚洲天堂网站| 精品欧美一区二区三区精品久久| 日韩aaaaa| 欧美色婷婷久久99精品红桃| 精品国内产的精品视频在线观看| avove在线播放| 99精品国产福利在线观看免费 | 综合久久成人| 亚洲国产毛片完整版| 男人操女人动态图| 天天精品视频| 午夜精品福利在线观看| 久久久国产免费| 国模无码大尺度一区二区三区| 99中文字幕| 国产高清在线| 亚洲一区二区不卡免费| mm1313亚洲国产精品无码试看| 国产精品1区| 亚洲欧美精品suv| 久久免费看少妇高潮v片特黄 | 亚洲国产高清av| 秋霞一区二区三区| 亚洲欧洲一区二区三区久久| 国产黄色的视频| 老司机精品导航| 91九色蝌蚪嫩草| 免费在线超碰| 亚洲综合视频在线观看| 一区二区三区国产免费| japanese色系久久精品| 日韩中文在线中文网三级| 97免费在线观看视频| 久久国产麻豆精品| 久久久久久九九| 视频在线这里都是精品| 欧美影院一区二区| 熟女人妻在线视频| 午夜视频精品| 国产日韩欧美在线| 嫩草在线播放| 黄色成人av网| 人妻巨大乳一二三区| 欧美大人香蕉在线| 国产91色在线|免| 无码精品人妻一区二区三区影院| 亚洲欧美电影院| 三级a三级三级三级a十八发禁止| 色婷婷精品视频| 欧美精品videosex性欧美| 国产精品视频无码| 中文一区二区在线观看| 日日碰狠狠添天天爽超碰97| 亚洲日本va中文字幕| 久久夜色精品国产欧美乱| 中日精品一色哟哟| 精品美女视频| 国产一区二区黑人欧美xxxx| 久久久久久天堂| 免费一级欧美片在线观看| 麻豆一区区三区四区产品精品蜜桃| 又污又黄的网站| 激情图区综合网| 亚洲国产一区在线| av漫画网站在线观看| 日韩欧美精品在线视频| 国产日韩欧美在线观看视频| 大伊香蕉精品在线品播放| 国产一区二区成人| 免费无码国产精品| 久久高清免费观看| 欧美精品久久久久| 亚洲精品无人区| 日本成人在线免费观看| 日本欧美韩国国产| 欧美日韩一区二区三区四区五区| 黄色国产精品视频| 成人做爰免费视频免费看| 午夜视频久久久久久| 欧美日韩高清在线一区| 亚洲性猛交富婆| 欧美韩国日本不卡| 中文字幕视频三区| 小小影院久久| 99精品国产高清一区二区| 草草影院在线| 日韩电影免费在线观看中文字幕| 五月天激情国产综合婷婷婷| 久久蜜桃av一区精品变态类天堂| 午夜欧美福利视频| 亚洲国产老妈| 激情欧美一区二区三区中文字幕| 久草在线中文最新视频| 尤物精品国产第一福利三区| 一本久道久久综合无码中文| 一级女性全黄久久生活片免费| 中文字幕乱码一区| 日韩精品电影在线| 亚洲五码在线观看视频| 欧美激情极品| 91精品免费看| 91制片在线观看| 国产亚洲精品美女久久久久 | 日韩欧美中文字幕一区| 一级aaa毛片| 国产欧美综合在线观看第十页| 亚洲三级在线观看视频| 亚洲毛片播放| 亚洲欧美日本国产有色| 91精品尤物| 国产精品久久久久久超碰| a级在线观看| 精品中文字幕久久久久久| 一区二区三区免费观看视频| 亚洲国产综合91精品麻豆| 91精品国自产在线| 国产a久久麻豆| 久久综合伊人77777麻豆最新章节| 91精品蜜臀一区二区三区在线| 精品视频一区二区| 成人97精品毛片免费看| 日本一区二区在线播放| 菠萝蜜视频国产在线播放| 亚洲久久久久久久久久| 国产视频一二三四区| 一本大道久久精品懂色aⅴ| 四虎影院中文字幕| 久久久久成人黄色影片| 特种兵之深入敌后| 日韩电影在线免费观看| 美女扒开大腿让男人桶| 91一区二区| 欧美国产二区| 大陆精大陆国产国语精品| 国产一区欧美二区三区| 欧美成人精品一区二区男人小说| 久久综合色影院| 波多野结衣在线影院| 亚洲精品乱码久久久久久按摩观| 国内精品久久久久久久久久| 欧美色大人视频| 无码人妻精品一区二区蜜桃色欲| 亚洲国产精品人人做人人爽| 欧美特级一级片| 国产精品久久久久久久裸模| 日韩在线免费观看av| 成人黄色在线看| japan高清日本乱xxxxx| 紧缚捆绑精品一区二区| 色播五月综合网| 日本在线不卡视频| 日韩中文字幕免费在线| 日韩午夜av在线| 精品久久久久久无码中文野结衣| 你懂的国产精品| 超碰成人在线免费观看| 第九色区aⅴ天堂久久香| 欧洲精品码一区二区三区免费看| 日韩美女精品| 精品久久久久久亚洲| av日韩精品| 99re视频在线播放| av不卡一区| 国产精品久久国产三级国电话系列| 精品久久国产一区| 亚洲自拍偷拍第一页| 国产一区2区在线观看| 国产在线精品成人一区二区三区| 国产成+人+综合+亚洲欧美| 国产精品久久久久久亚洲调教 | 日本成人网址| 中文字幕欧美专区| 18视频免费网址在线观看| 在线观看欧美日韩| 免费在线看黄色| 久久精品夜夜夜夜夜久久| 黄色网在线看| 欧美激情精品久久久久久蜜臀 | 亚洲激情久久| 永久免费网站视频在线观看| 欧美日韩亚洲一区| 国产真人做爰毛片视频直播| 亚洲人成人一区二区三区| 波多野结衣乳巨码无在线| 先锋影音久久久| 我看黄色一级片| 精品一区二区三区免费| wwwww在线观看| 99精品一区二区三区| 亚欧洲乱码视频| 国产精品初高中害羞小美女文| 免费在线观看h片| 亚洲国产一区在线观看| 久久夜色精品国产噜噜亚洲av| 欧美在线短视频| 国产精品无码在线播放| 亚洲成成品网站| 欧美偷拍视频| 国产一区二区动漫| a免费在线观看| 欧洲精品在线视频| 色8久久久久| 国产精品夜夜夜一区二区三区尤| 亚洲日本三级| 中文字幕中文字幕一区三区| 欧美国产精品| 十八禁视频网站在线观看| 久久99国产精品尤物| 中文字幕一区二区三区乱码不卡| 久久网站最新地址| 亚洲第一视频区| 亚洲黄色录像片| 亚洲 日本 欧美 中文幕| 欧美一区二区视频观看视频| 三级视频网站在线| 久久人人爽人人爽爽久久| 国产福利电影在线播放| 国产欧美日韩高清| 鲁大师精品99久久久| 中文字幕色一区二区| 国产欧美一级| 午夜影院免费版| 久久精品在这里| 日本熟妇乱子伦xxxx| 欧美天堂亚洲电影院在线播放| 欧美一级免费片| 久久久国产91| 先锋欧美三级| 国产伦精品一区二区三区照片91| 日韩精品第一区| 37pao成人国产永久免费视频| 国产精品一二三区在线| 亚洲毛片亚洲毛片亚洲毛片| 午夜精品久久久久久久| 国产精品欧美亚洲| 这里只有精品视频| 在线观看v片| 国产精品自拍首页| 午夜日韩电影| 精品国产乱码久久久久久1区二区| 26uuuu精品一区二区| 久久中文字幕无码| 91精品欧美一区二区三区综合在 | 久久综合九色欧美综合狠狠| 欧美成人免费看| 欧美军同video69gay| 激情在线视频| 欧美在线性视频| 日韩成人av在线资源| www.av毛片| 成人精品免费网站| 免费在线视频观看| 欧美成人性战久久| 50度灰在线| 亚洲一区二区三区久久| 国产精品久久久久无码av| 浓精h攵女乱爱av| 国产日产欧美一区| 成人免费视频国产免费| 亚洲理论在线a中文字幕| 亚洲国产欧美日本视频| 久久久精品国产一区二区三区| 亚洲精品免费观看| 午夜男人的天堂| 性做久久久久久久久| 神马午夜精品95| 亚洲**2019国产| 日韩伦理一区二区三区| 国产精品无码专区av在线播放| www国产亚洲精品久久麻豆| 亚洲黄色三级视频| 日韩久久精品成人| 偷拍视频一区二区三区| 色噜噜一区二区| 另类中文字幕网| 国产老头老太做爰视频| 日韩一区二区在线观看视频播放| av免费在线免费观看| 成人午夜电影在线播放| 亚洲高清二区| 中文字幕国产专区| 欧美午夜影院一区| 久久综合之合合综合久久| 91精品天堂| 国产精品日韩久久久| a毛片毛片av永久免费| 欧美性生活久久| gogo在线高清视频| 国产日本一区二区三区| 久久精品网址| 日本少妇aaa| 精品免费一区二区三区| 欧美aa一级| 亚洲国产激情一区二区三区| 国产一区二区成人久久免费影院| 免费中文字幕在线观看| 日韩精品中文字幕久久臀| 伦一区二区三区中文字幕v亚洲| 超薄肉色丝袜足j调教99| 91亚洲精品久久久蜜桃网站| 午夜视频网站在线观看| 精品激情国产视频| 国产精品对白久久久久粗| 美女一区二区三区视频| 一区二区三区日韩欧美| 九色国产在线观看| 国产在线视频欧美| 亚洲黄页一区| 日韩一卡二卡在线观看| 亚洲福利在线视频| 久久久久毛片| 国产女大学生av| 国产精品久久久久久福利一牛影视| 亚洲精品人妻无码| 国产精品视频永久免费播放| 国产一区亚洲| 99久久99久久精品免费看小说.| 亚洲第一精品久久忘忧草社区| 国产一区二区主播在线| 日本黄色片一级片| 国产精品久久影院| 五月天婷婷社区| 96久久精品| 久久精品国产99国产精品| 一级片中文字幕|