精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Flink 漫談系列(07) - 持續查詢(Continuous Queries)

開發 開發工具
我們知道在流計算場景中,數據是源源不斷的流入的,數據流永遠不會結束,那么計算就永遠不會結束,如果計算永遠不會結束的話,那么計算結果何時輸出呢?

一、實際問題

我們知道在流計算場景中,數據是源源不斷的流入的,數據流永遠不會結束,那么計算就永遠不會結束,如果計算永遠不會結束的話,那么計算結果何時輸出呢?本篇將介紹Apache Flink利用持續查詢來對流計算結果進行持續輸出的實現原理。

二、數據管理

在介紹持續查詢之前,我們先看看Apache Flink對數據的管理和傳統數據庫對數據管理的區別,以MySQL為例,如下圖:

MySQL

如上圖所示傳統數據庫是數據存儲和查詢計算于一體的架構管理方式,這個很明顯,oracle數據庫不可能管理MySQL數據庫數據,反之亦然,每種數據庫廠商都有自己的數據庫管理和存儲的方式,各自有特有的實現。在這點上Apache Flink海納百川(也有corner case),將data store 進行抽象,分為source(讀) 和 sink(寫)兩種類型接口,然后結合不同存儲的特點提供常用數據存儲的內置實現,當然也支持用戶自定義的實現。

那么在宏觀設計上Apache Flink與傳統數據庫一樣都可以對數據表進行SQL查詢,并將產出的結果寫入到數據存儲里面,那么Apache Flink上面的SQL查詢和傳統數據庫查詢的區別是什么呢?Apache Flink又是如何做到求同(語義相同)存異(實現機制不同),***支持ANSI-SQL的呢?

三、靜態查詢

傳統數據庫中對表(比如 flink_tab,有user和clicks兩列,user主鍵)的一個查詢SQL(select * from flink_tab)在數據量允許的情況下,會立刻返回表中的所有數據,在查詢結果顯示之后,對數據庫表flink_tab的DML操作將與執行的SQL無關了。也就是說傳統數據庫下面對表的查詢是靜態查詢,將計算的最終查詢的結果立即輸出,如下:

  1. select * from flink_tab; 
  2. +----+------+--------+ 
  3. | id | user | clicks | 
  4. +----+------+--------+ 
  5. | 1 | Mary | 1 | 
  6. +----+------+--------+ 
  7. 1 row in set (0.00 sec) 

當我執行完上面的查詢,查詢結果立即返回,上面情況告訴我們表 flink_tab里面只有一條記錄,id=1,user=Mary,clicks=1; 這樣傳統數據庫表的一條查詢語句就完全結束了。傳統數據庫表在查詢那一刻我們這里叫Static table,是指在查詢的那一刻數據庫表的內容不再變化了,查詢進行一次計算完成之后表的變化也與本次查詢無關了,我們將在Static Table 上面的查詢叫做靜態查詢。

四、持續查詢

什么是連續查詢呢?連續查詢發生在流計算上面,在 《Apache Flink 漫談系列 - 流表對偶(duality)性》 中我們提到過Dynamic Table,連續查詢是作用在Dynamic table上面的,永遠不會結束的,隨著表內容的變化計算在不斷的進行著...

五、靜態/持續查詢特點

靜態查詢和持續查詢的特點就是《Apache Flink 漫談系列 - 流表對偶(duality)性》中所提到的批與流的計算特點,批一次查詢返回一個計算結果就結束查詢,流一次查詢不斷修正計算結果,查詢永遠不結束,表格示意如下:

六、靜態/持續查詢關系

接下來我們以flink_tab表實際操作為例,體驗一下靜態查詢與持續查詢的關系。假如我們對flink_tab表再進行一條增加和一次更新操作,如下:

  1. MySQL> insert into flink_tab(user, clicks) values ('Bob', 1); 
  2. Query OK, 1 row affected (0.08 sec) 
  3.  
  4. MySQL> update flink_tab set clicks=2 where user='Mary'
  5. Query OK, 1 row affected (0.06 sec) 

這時候我們再進行查詢 select * from flink_tab ,結果如下:

  1. MySQL> select * from flink_tab; 
  2. +----+------+--------+ 
  3. | id | user | clicks | 
  4. +----+------+--------+ 
  5. | 1 | Mary | 2 | 
  6. | 2 | Bob | 1 | 
  7. +----+------+--------+ 
  8. 2 rows in set (0.00 sec) 

那么我們看見,相同的查詢SQL(select * from flink_tab),計算結果完全 不 一樣了。這說明相同的sql語句,在不同的時刻執行計算,得到的結果可能不一樣(有點像廢話),就如下圖一樣:

假設不斷的有人在對表flink_tab做操作,同時有一個人間歇性的發起對表數據的查詢,上圖我們只是在三個時間點進行了3次查詢。并且在這段時間內數據表的內容也在變化。引起上面變化的DML如下:

  1. MySQL> insert into flink_tab(user, clicks) values ('Llz', 1); 
  2. Query OK, 1 row affected (0.08 sec) 
  3.  
  4. MySQL> update flink_tab set clicks=2 where user='Bob'
  5. Query OK, 1 row affected (0.01 sec) 
  6. Rows matched: 1 Changed: 1 Warnings: 0 
  7.  
  8. MySQL> update flink_tab set clicks=3 where user='Mary'
  9. Query OK, 1 row affected (0.05 sec) 
  10. Rows matched: 1 Changed: 1 Warnings: 0 

到現在我們不難想象,上面圖內容的核心要點如下:

  • 時間
  • 表數據變化
  • 觸發計算
  • 計算結果更新

接下來我們利用傳統數據庫現有的機制模擬一下持續查詢...

1. 無PK的 Append only 場景

接下來我們把上面隱式存在的時間屬性timestamp作為表flink_tab_ts(timestamp,user,clicks三列,無主鍵)的一列,再寫一個 觸發器(Trigger) 示例觀察一下:

  1. // INSERT 的時候查詢一下數據flink_tab_ts,將結果寫到trigger.sql中 
  2. DELIMITER ;; 
  3. create trigger flink_tab_ts_trigger_insert after insert 
  4. on flink_tab_ts for each row 
  5. begin 
  6. select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql'; 
  7. end ;; 
  8. DELIMITER ; 

上面的trigger要將查詢結果寫入本地文件,默認MySQL是不允許寫入的,我們查看一下:

  1. MySQL> show variables like '%secure%'; 
  2. +--------------------------+-------+ 
  3. | Variable_name | Value | 
  4. +--------------------------+-------+ 
  5. | require_secure_transport | OFF | 
  6. | secure_file_priv | NULL | 
  7. +--------------------------+-------+ 
  8. 2 rows in set (0.00 sec) 

上面secure_file_priv屬性為NULL,說明MySQL不允許寫入file,我需要修改my.cnf在添加secure_file_priv=''打開寫文件限制;

  1. MySQL> show variables like '%secure%'; 
  2. +--------------------------+-------+ 
  3. | Variable_name | Value | 
  4. +--------------------------+-------+ 
  5. | require_secure_transport | OFF | 
  6. | secure_file_priv | | 
  7. +--------------------------+-------+ 
  8. 2 rows in set (0.00 sec) 

下面我們對flink_tab_ts進行INSERT操作:

對flink_tab_ts進行INSERT操作

我們再來看看6次trigger 查詢計算的結果:

大家到這里發現我寫了Trigger的存儲過程之后,每次在數據表flink_tab_ts進行DML操作的時候,Trigger就會觸發一次查詢計算,產出一份新的計算結果,觀察上面的查詢結果發現,結果表不停的增加(Append only)。

2. 有PK的Update場景

我們利用flink_tab_ts的6次DML操作和自定義的觸發器TriggerL來介紹了什么是持續查詢,做處理靜態查詢與持續查詢的關系。那么上面的演示目的是為了說明持續查詢,所有操作都是insert,沒有基于主鍵的更新,也就是說Trigger產生的結果都是append only的,那么大家想一想,如果我們操作flink_tab這張表,按主鍵user進行插入和更新操作,同樣利用Trigger機制來進行持續查詢,結果是怎樣的的呢? 初始化表,trigger:

  1. drop table flink_tab; 
  2. create table flink_tab( 
  3. user VARCHAR(100) NOT NULL, 
  4. clicks INT NOT NULL, 
  5. PRIMARY KEY (user) 
  6. ); 
  7.  
  8. DELIMITER ;; 
  9. create trigger flink_tab_trigger_insert after insert 
  10. on flink_tab for each row 
  11. begin 
  12. select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql'; 
  13. end ;; 
  14. DELIMITER ; 
  15.  
  16. DELIMITER ;; 
  17. create trigger flink_tab_trigger_ after update 
  18. on flink_tab for each row 
  19. begin 
  20. select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql'; 
  21. end ;; 
  22. DELIMITER ; 

同樣我做如下6次DML操作,Trigger 6次查詢計算:

在來看看這次的結果與append only 有什么不同?

我想大家早就知道這結果了,數據庫里面定義的PK所有變化會按PK更新,那么觸發的6次計算中也會得到更新后的結果,這應該不難理解,查詢結果也是不斷更新的(Update)!

3. 關系定義

上面Append Only 和 Update兩種場景在MySQL上面都可以利用Trigger機制模擬 持續查詢的概念,也就是說數據表中每次數據變化,我們都觸發一次相同的查詢計算(只是計算時候數據的集合發生了變化),因為數據表不斷的變化,這個表就可以看做是一個動態表Dynamic Table,而查詢SQL(select * from flink_tab_ts) 被觸發器Trigger在滿足某種條件后不停的觸發計算,進而也不斷地產生新的結果。這種作用在Dynamic Table,并且有某種機制(Trigger)不斷的觸發計算的查詢我們就稱之為 持續查詢。

那么到底靜態查詢和動態查詢的關系是什么呢?在語義上 持續查詢 中的每一次查詢計算的觸發都是一次靜態查詢(相對于當時查詢的時間點), 在實現上 Apache Flink會利用上一次查詢結果+當前記錄 以增量的方式完成查詢計算。

特別說明: 上面我們利用 數據變化+Trigger方式描述了持續查詢的概念,這里有必要特別強調一下的是數據庫中trigger機制觸發的查詢,每次都是一個全量查詢,這與Apache Flink上面流計算的持續查詢概念相同,但實現機制完全不同,Apache Flink上面的持續查詢內部實現是增量處理的,隨著時間的推移,每條數據的到來實時處理當前的那一條記錄,不會處理曾經來過的歷史記錄!

七、Apache Flink 如何做到持續查詢

1. 動態表上面持續查詢

在 《Apache Flink 漫談系列 - 流表對偶(duality)性》 中我們了解到流和表可以相互轉換,在Apache Flink流計算中攜帶流事件的Schema,經過算子計算之后再產生具有新的Schema的事件,流入下游節點,在產生新的Schema的Event和不斷流轉的過程就是持續查詢作用的結果,如下圖:

2. 增量計算

我們進行查詢大多數場景是進行數據聚合,比如查詢SQL中利用count,sum等aggregate function進行聚合統計,那么流上的數據源源不斷的流入,我們既不能等所有事件流入結束(永遠不會結束)再計算,也不會每次來一條事件就像傳統數據庫一樣將全部事件集合重新整體計算一次,在持續查詢的計算過程中,Apache Flink采用增量計算的方式,也就是每次計算都會將計算結果存儲到state中,下一條事件到來的時候利用上次計算的結果和當前的事件進行聚合計算,比如 有一個訂單表,如下:

一個簡單的計數和求和查詢SQL:

  1. // 求訂單總數和所有訂單的總金額 
  2. select count(id) as cnt,sum(amount)as sumAmount from order_tab; 

這樣一個簡單的持續查詢計算,Apache Flink內部是如何處理的呢?如下圖:

如上圖,Apache Flink中每來一條事件,就進行一次計算,并且每次計算后結果會存儲到state中,供下一條事件到來時候進行計算,即:

  1. result(n) = calculation(result(n-1), n)。 

3. 無PK的Append Only 場景

在實際的業務場景中,我們只需要進行簡單的數據統計,然后就將統計結果寫入到業務的數據存儲系統里面,比如上面統計訂單數量和總金額的場景,訂單表本身是一個append only的數據源(假設沒有更新,截止到2018.5.14日,Apache Flink內部支持的數據源都是append only的),在持續查詢過程中經過count(id),sum(amount)統計計算之后產生的動態表也是append only的,種場景Apache Flink內部只需要進行aggregate function的聚合統計計算就可以,如下:

4. 有PK的Update 場景

現在我們將上面的訂單場景稍微變化一下,在數據表上面我們將金額字段amount,變為地區字段region,數據如下:

查詢統計的變為,在計算具有相同訂單數量的地區數量;查詢SQL如下:

  1. CREATE TABLE order_tab( 
  2. id BIGINT, 
  3. region VARCHAR 
  4.  
  5. CREATE TABLE region_count_sink( 
  6. order_cnt BIGINT, 
  7. region_cnt BIGINT, 
  8. PRIMARY KEY(order_cnt) -- 主鍵 
  9.  
  10. -- 按地區分組計算每個地區的訂單數量 
  11. CREATE VIEW order_count_view AS 
  12. SELECT 
  13. region, count(id) AS order_cnt 
  14. FROM order_tab 
  15. GROUP BY region; 
  16.  
  17. -- 按訂單數量分組統計具有相同訂單數量的地區數量 
  18. INSERT INTO region_count_sink 
  19. SELECT 
  20. order_cnt, 
  21. count(region) as region_cnt 
  22. FROM order_count_view 
  23. GROUP BY order_cnt; 

上面查詢SQL的代碼結構如下(這個圖示在Alibaba 對 Apache Flink 的增強的集成IDE環境生成的,了解更多):

上面SQL中我們發現有兩層查詢計算邏輯,***個查詢計算邏輯是與SOURCE相連的按地區統計訂單數量的分組統計,第二個查詢計算邏輯是在***個查詢產出的動態表上面進行按訂單數量統計地區數量的分組統計,我們一層一層分析。

5. 錯誤處理

  • ***層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

按照***層分析的結果,再分析第二層產出的結果,我們分析的過程是對的,但是最終寫到sink表的計算結果是錯誤的,那我們錯在哪里了呢?

其實當 (SH,2)這條記錄來的時候,以前來過的(SH, 1)已經是臟數據了,當(BJ, 2)來的時候,已經參與過計算的(BJ, 1)也變成臟數據了,同樣當(BJ, 3)來的時候,(BJ, 2)也是臟數據了,上面的分析,沒有處理臟數據進而導致最終結果的錯誤。那么Apache Flink內部是如何正確處理的呢?

6. 正確處理

  • ***層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

上面我們將有更新的事件進行打標的方式來處理臟數據,這樣在Apache Flink內部計算的時候 算子會根據事件的打標來處理事件,在aggregate function中有兩個對應的方法(retract和accumulate)來處理不同標識的事件,如上面用到的count AGG,內部實現如下:

  1. def accumulate(acc: CountAccumulator): Unit = { 
  2. acc.f0 += 1L // acc.f0 存儲記數 
  3.  
  4. def retract(acc: CountAccumulator, value: Any): Unit = { 
  5. if (value != null) { 
  6. acc.f0 -1L //acc.f0 存儲記數 
  7. }} 

Apache Flink內部這種為事件進行打標的機制叫做 retraction。retraction機制保障了在流上已經流轉到下游的臟數據需要被撤回問題,進而保障了持續查詢的正確語義。

八、Apache Flink Connector 類型

本篇一開始就對比了MySQL的數據存儲和Apache Flink數據存儲的區別,Apache Flink目前是一個計算平臺,將數據的存儲以高度抽象的插件機制與各種已有的數據存儲無縫對接。目前Apache Flink中將數據插件稱之為鏈接器Connector,Connnector又按數據的讀和寫分成Soruce(讀)和Sink(寫)兩種類型。對于傳統數據庫表,PK是一個很重要的屬性,在頻繁的按某些字段(PK)進行更新的場景,在表上定義PK非常重要。那么作為完全支持ANSI-SQL的Apache Flink平臺在Connector上面是否也支持PK的定義呢?

1. Apache Flink Source

現在(2018.11.***pache Flink中用于數據流驅動的Source Connector上面無法定義PK,這樣在某些業務場景下會造成數據量較大,造成計算資源不必要的浪費,甚至有聚合結果不是用戶“期望”的情況。我們以雙流JOIN為例來說明:

  1. SQL: 
  2.  
  3. CREATE TABLE inventory_tab( 
  4. product_id VARCHAR, 
  5. product_count BIGINT 
  6. ); 
  7.  
  8. CREATE TABLE sales_tab( 
  9. product_id VARCHAR, 
  10. sales_count BIGINT 
  11. ) ; 
  12.  
  13. CREATE TABLE join_sink( 
  14. product_id VARCHAR, 
  15. product_count BIGINT, 
  16. sales_count BIGINT, 
  17. PRIMARY KEY(product_id) 
  18. ); 
  19.  
  20. CREATE VIEW join_view AS 
  21. SELECT 
  22. l.product_id, 
  23. l.product_count, 
  24. r.sales_count 
  25. FROM inventory_tab l 
  26. JOIN sales_tab r 
  27. ON l.product_id = r.product_id; 
  28.  
  29. INSERT INTO join_sink 
  30. SELECT 
  31. product_id, 
  32. product_count, 
  33. sales_count 
  34. FROM join_view ; 

代碼結構圖:

實現示意圖:

上圖描述了一個雙流JOIN的場景,雙流JOIN的底層實現會將左(L)右(R)兩面的數據都持久化到Apache Flink的State中,當L流入一條事件,首先會持久化到LState,然后在和RState中存儲的R中所有事件進行條件匹配,這樣的邏輯如果R流product_id為P001的產品銷售記錄已經流入4條,L流的(P001, 48) 流入的時候會匹配4條事件流入下游(join_sink)。

2. 問題

上面雙流JOIN的場景,我們發現其實inventory和sales表是有業務的PK的,也就是兩張表上面的product_id是唯一的,但是由于我們在Sorure上面無法定義PK字段,表上面所有的數據都會以append only的方式從source流入到下游計算節點JOIN,這樣就導致了JOIN內部所有product_id相同的記錄都會被匹配流入下游,上面的例子是 (P001, 48) 來到的時候,就向下游流入了4條記錄,不難想象每個product_id相同的記錄都會與歷史上所有事件進行匹配,進而操作下游數據壓力。

那么這樣的壓力是必要的嗎?從業務的角度看,不是必要的,因為對于product_id相同的記錄,我們只需要對左右兩邊***的記錄進行JOIN匹配就可以了。比如(P001, 48)到來了,業務上面只需要右流的(P001, 22)匹配就好,流入下游一條事件(P001, 48, 22)。 那么目前在Apache Flink上面如何做到這樣的優化呢?

3. 解決方案

上面的問題根本上我們要構建一張有PK的動態表,這樣按照業務PK進行更新處理,我們可以在Source后面添加group by 操作生產一張有PK的動態表。如下:

  1. SQL: 
  2.  
  3. CREATE TABLE inventory_tab( 
  4. product_id VARCHAR, 
  5. product_count BIGINT 
  6.  
  7. CREATE TABLE sales_tab( 
  8. product_id VARCHAR, 
  9. sales_count BIGINT 
  10. CREATE VIEW inventory_view AS 
  11. SELECT 
  12. product_id, 
  13. LAST_VALUE(product_count) AS product_count 
  14. FROM inventory_tab 
  15. GROUP BY product_id; 
  16.  
  17. CREATE VIEW sales_view AS 
  18. SELECT 
  19. product_id, 
  20. LAST_VALUE(sales_count) AS sales_count 
  21. FROM sales_tab 
  22. GROUP BY product_id; 
  23.  
  24. CREATE TABLE join_sink( 
  25. product_id VARCHAR, 
  26. product_count BIGINT, 
  27. sales_count BIGINT, 
  28. PRIMARY KEY(product_id) 
  29. )WITH ( 
  30. type = 'print' 
  31. ) ; 
  32.  
  33. CREATE VIEW join_view AS 
  34. SELECT 
  35. l.product_id, 
  36. l.product_count, 
  37. r.sales_count 
  38. FROM inventory_view l 
  39. JOIN sales_view r 
  40. ON l.product_id = r.product_id; 
  41.  
  42. INSERT INTO join_sink 
  43. SELECT 
  44. product_id, 
  45. product_count, 
  46. sales_count 
  47. FROM join_view ; 

代碼結構:

示意圖:

如上方式可以將無PK的source經過一次節點變成有PK的動態表,以Apache Flink的retract機制和業務要素解決數據瓶頸,減少計算資源的消耗。

說明1: 上面方案LAST_VALUE是Alibaba對 Apache Flink 的增強的功能,社區還沒有支持。

4. Apache Flink Sink

在Apache Flink上面可以根據實際外部存儲的特點(是否支持PK),以及整體job的執行plan來動態推導Sink的執行模式,具體有如下三種類型:

Append 模式 - 該模式用戶在定義Sink的DDL時候不定義PK,在Apache Flink內部生成的所有只有INSERT語句;

Upsert 模式 - 該模式用戶在定義Sink的DDL時候可以定義PK,在Apache Flink內部會根據事件打標(retract機制)生成INSERT/UPDATE和DELETE 語句,其中如果定義了PK, UPDATE語句按PK進行更新,如果沒有定義PK UPDATE會按整行更新;

Retract 模式 - 該模式下會產生INSERT和DELETE兩種信息,Sink Connector 根據這兩種信息構造對應的數據操作指令;

九、小結

本篇以MySQL為例介紹了傳統數據庫的靜態查詢和利用MySQL的Trigger+DML操作來模擬持續查詢,并介紹了Apache Flink上面利用增量模式完成持續查詢,并以雙流JOIN為例說明了持續查詢可能會遇到的問題,并且介紹Apache Flink以為事件打標產生delete事件的方式解決持續查詢的問題,進而保證語義的正確性,***的在流計算上支持續查詢。

# 關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-06-10 17:26:07

數據集計算

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-10-09 10:55:52

Apache FlinWatermark流計算

2022-07-13 12:53:59

數據存儲

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2019-01-03 10:17:53

Apache FlinTable API代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-07-13 13:03:29

流計算亂序

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2018-12-29 08:16:32

Apache FlinJOIN代碼

2020-04-09 11:08:30

PyFlinkJAR依賴

2019-03-29 10:05:44

Apache開源軟件

2018-10-30 11:10:05

Flink數據集計算
點贊
收藏

51CTO技術棧公眾號

国内欧美日韩| 国产精品一区在线看| 欧美精品自拍| 亚洲精品v欧美精品v日韩精品| 欧美男女爱爱视频| 美国成人毛片| 激情五月激情综合网| 久久6免费高清热精品| 国产a级黄色片| 欧美与亚洲与日本直播| 亚洲精品欧美激情| 久久久久成人精品免费播放动漫| 波多野结衣视频免费观看| 国产电影一区二区在线观看| 精品国产乱码久久| 一级黄色香蕉视频| 色婷婷av在线| 国产网站一区二区| 91在线免费网站| 久久午夜免费视频| 日韩在线综合| 亚洲福利在线观看| 182午夜在线观看| 欧美性猛片xxxxx免费中国| 久久综合九色综合97_久久久 | 亚洲欧美韩国| 中文字幕一区二区在线观看| 精品国产一区二区三区久久久久久| 国产一级片免费在线观看| 欧美一区综合| 在线午夜精品自拍| 中文字幕无码人妻少妇免费| 亚洲午夜剧场| 色婷婷激情综合| 青青草综合在线| 992tv免费直播在线观看| 成人av电影在线| 国产欧美一区二区| 黄色在线免费观看| 伊人成人在线| 欧美超级免费视 在线| 亚洲天堂岛国片| 希岛爱理av免费一区二区| 欧美一区二区在线不卡| 五月婷婷六月合| 625成人欧美午夜电影| 亚洲国产一区二区a毛片| 在线无限看免费粉色视频| 国产鲁鲁视频在线观看免费| 97久久超碰精品国产| 99久久精品免费看国产四区| 97人妻精品一区二区三区动漫 | 国产亚洲制服色| 精品免费二区三区三区高中清不卡| 国产成人麻豆精品午夜在线| 精品一二线国产| 国产精品久久婷婷六月丁香| 国产精品久久久久7777婷婷| 成人免费aaa| 激情影院在线| 伊人一区二区三区| 大片在线观看网站免费收看| 超碰在线观看免费版| 亚洲欧美在线aaa| 亚洲在线播放电影| 嫩草在线视频| 中文字幕欧美一区| 男人的天堂成人| 黄色片网站在线| 亚洲人成网站影音先锋播放| 99精品视频网站| 国产黄色小视频在线| 亚洲三级电影全部在线观看高清| gogogo免费高清日本写真| 国产一二区在线| 亚洲欧美另类图片小说| 日本三日本三级少妇三级66| 中文字幕中文字幕在线中高清免费版| 亚洲女爱视频在线| 蜜臀精品一区二区| 理论不卡电影大全神| 色久优优欧美色久优优| 久久99999| 国产精品18| 欧美本精品男人aⅴ天堂| 美女搡bbb又爽又猛又黄www| 人人爽人人爽人人片av| 精品色999| 精品国偷自产在线| 久久国产在线视频| 男女av一区三区二区色多| 国产精品久久激情| av中文字幕第一页| 91一区一区三区| 日韩欧美一区二区三区四区五区| 美女羞羞视频在线观看| 一区二区三区四区在线播放 | 国产精品av一区二区| 久久欧美在线电影| 波多野结衣视频在线观看| 久久精品国产**网站演员| 99国产高清| 美国成人毛片| 一个色在线综合| 久久久久久久久久久视频| 成人全视频免费观看在线看| 日韩视频永久免费| 国产jjizz一区二区三区视频| 日韩精品dvd| 久久久久久久久久久亚洲| 亚洲精品一区二区二区| 国产91露脸合集magnet| 日本亚洲自拍| 免费网站在线观看人| 在线日韩av片| 亚洲乱妇老熟女爽到高潮的片 | 日本亚洲视频在线| 国产精品一区在线观看| a视频网址在线观看| 五月天亚洲婷婷| 亚洲网中文字幕| 久久91精品| 97热精品视频官网| av在线亚洲天堂| 中文字幕免费不卡| 国产97在线 | 亚洲| 蜜桃在线一区| 中文字幕亚洲综合久久| 久久久久久久久久久久久av| 国产**成人网毛片九色| 亚洲最新免费视频| 国产私拍福利精品视频二区| 亚洲第一精品夜夜躁人人爽| 97成人资源站| 久久精品国产久精国产| 日韩高清国产精品| 午夜影视一区二区三区| 亚洲精品在线电影| 国产极品国产极品| 久久国产精品99久久人人澡| 日本不卡免费新一二三区| 亚洲性色av| 亚洲国产一区二区三区在线观看| 日韩一级片av| 国产综合久久久久影院| 无遮挡亚洲一区| 国产高清不卡| 亚洲另类激情图| 日本网站免费观看| 成人深夜在线观看| 欧美国产视频一区| 亚洲国产aⅴ精品一区二区| 久久69精品久久久久久久电影好| 国产手机视频在线| 亚洲免费高清视频在线| 99久久99精品| 重囗味另类老妇506070| 91免费视频国产| 黄色国产一级视频| 国产探花在线观看| 日韩欧美精品在线视频| 中文字幕电影av| 国产在线精品免费| 日韩精品手机在线观看| 奇米一区二区| 欧美片一区二区三区| 国产成人无码www免费视频播放| 亚洲精品菠萝久久久久久久| 国产又粗又猛又爽又黄| 欧美网站在线| 精品国产免费一区二区三区| 忘忧草在线影院两性视频| 亚洲欧洲偷拍精品| 最新黄色网址在线观看| 国产精品理论片在线观看| 在线观看日本www| 欧美三级网页| 精品一卡二卡三卡四卡日本乱码| 综合毛片免费视频| 日韩中文字幕在线视频| 国产婷婷一区二区三区久久| 洋洋av久久久久久久一区| 精品熟女一区二区三区| 久久久精品性| 9999在线观看| 卡通动漫精品一区二区三区| 日本成人激情视频| 麻豆传媒免费在线观看| 精品国精品自拍自在线| 欧美bbbbbbbbbbbb精品| 国产精品欧美综合在线| 亚洲精品成人无码毛片| 久久99伊人| 91嫩草国产丨精品入口麻豆| 伊人久久大香线蕉av不卡| 国产精品一二三在线| 久久久国产精华液999999| 涩涩视频在线播放| 色婷婷av一区二区三区在线观看| 精品久久无码中文字幕| 欧美日韩综合视频网址| 日本 欧美 国产| 91视频一区二区三区| 亚洲国产午夜精品| 爽爽淫人综合网网站| 欧美日韩激情四射| 极品美女一区二区三区| 国产69精品久久久久9999apgf| 欧美电影免费观看高清完整| 美女精品久久久| 免费在线超碰| 日韩一区二区三区视频| 人人妻人人爽人人澡人人精品| 亚洲免费观看在线观看| 夜夜春很很躁夜夜躁| 粉嫩蜜臀av国产精品网站| 超碰在线播放91| 亚洲综合不卡| 草民午夜欧美限制a级福利片| 欧洲女同同性吃奶| 国产精品18久久久| 中文字幕永久视频| 亚洲一区视频| 无码熟妇人妻av在线电影| 欧美疯狂party性派对| 免费观看成人在线| 91在线一区| 成人性生交大片免费看小说 | 4388成人网| 26uuu亚洲电影在线观看| 在线精品视频视频中文字幕| 天堂av资源网| 日韩一区二区在线看| 亚洲视频一区二区三区四区| 欧美日韩中文字幕日韩欧美| 麻豆91精品91久久久| 中文字幕一区二区三区四区 | 黄色三级中文字幕| 99精品网站| 五月天综合网| 欧美偷拍综合| 欧美日韩大片一区二区三区| 人体久久天天| 国内视频一区| 第四色在线一区二区| 91精品综合久久| 国产高清视频一区二区| 91老司机精品视频| 欧洲午夜精品| 国产精品吴梦梦| yy6080久久伦理一区二区| 国产精品吊钟奶在线| 高清电影一区| 国产精品电影网站| 99欧美精品| 国产精品一区二区久久国产| 91欧美精品| 国产精品久久视频| 日韩一级视频| 成人精品一区二区三区| 成人免费91| 91在线观看免费| 亚洲超碰在线观看| 国产精品久久九九| 精品人人人人| 久久久久久九九| 性人久久久久| 日本最新一区二区三区视频观看| 琪琪久久久久日韩精品| 欧美极品视频一区二区三区| 国产91久久精品一区二区| 欧美激情论坛| av亚洲在线观看| 中文字幕在线观看一区二区三区| 97精品视频在线看| 成人毛片100部免费看| 国产精品v一区二区三区| 久久久久免费看黄a片app| 亚洲欧美日韩综合国产aⅴ| 日韩视频第二页| 亚洲产国偷v产偷v自拍涩爱| 一区二区成人在线视频| 久久久久亚洲AV| 欧美日韩激情美女| 中文字幕一区二区在线视频| 欧美丰满一区二区免费视频| 国产99对白在线播放| 亚洲精品国产精品久久清纯直播| 国产午夜精品一区理论片| 不卡av在线播放| 极品视频在线| 国产精品亚洲аv天堂网| 日韩中文在线| 欧美日韩在线一区二区三区| 99久久九九| 免费高清一区二区三区| 久久精品盗摄| 三日本三级少妇三级99| www国产成人| 久久中文免费视频| 福利视频导航一区| 国产又黄又爽视频| 日韩精品中文字| 日本福利专区在线观看| 午夜伦理精品一区| 高清一区二区中文字幕| 久久精品国产美女| 亚洲h色精品| 18岁视频在线观看| 国产成人av一区二区三区在线| 国产人妻大战黑人20p| 一区二区三区在线视频免费| 国产成人无码专区| 日韩视频一区二区| 高清性色生活片在线观看| 久久久久久69| 祥仔av免费一区二区三区四区| 精品欧美一区二区三区久久久| 五月综合激情| 熟女少妇精品一区二区| av成人免费在线| 国产免费无码一区二区视频| 欧美午夜视频网站| 麻豆导航在线观看| 久久久久久尹人网香蕉| 日韩成人免费av| 日本在线高清视频一区| 亚洲国产激情| 性久久久久久久久久久久久久| 日本一区二区免费在线| 日产亚洲一区二区三区| 日韩一区二区不卡| 欧洲不卡av| 国产精品女主播视频| 欧美禁忌电影| 青青草原av在线播放| av在线不卡免费看| 久久久久久av无码免费网站| 欧美丰满少妇xxxxx高潮对白| 大片在线观看网站免费收看| 欧美在线91| 国产精欧美一区二区三区白种人| 中文字幕精品综合| 国产一级片av| 国产一区二区三区日韩欧美| 二区三区不卡| 欧美精品与人动性物交免费看| 亚洲精品美女| 欧产日产国产精品98| 亚洲一区二区三区影院| 99热这里精品| 欧美美最猛性xxxxxx| 国产在线一区不卡| 真人做人试看60分钟免费| 国产精品一区一区| tube国产麻豆| 日韩欧美一级二级三级| 日韩精品分区| 国产精品xxx在线观看www| 亚洲欧美在线专区| 麻豆传媒在线看| 一区二区三区国产| 亚洲欧美激情另类| 久久人91精品久久久久久不卡| 8848成人影院| 一卡二卡三卡视频| 97se亚洲国产综合自在线观| 久久久国产精品成人免费| 亚洲欧美国产精品va在线观看| 一根才成人网| 亚洲高清视频一区二区| 久久99国产精品成人| 国产极品国产极品| 亚洲激情中文字幕| 欧美大电影免费观看| 亚洲精品免费在线看| 国产专区综合网| 久久精品国产亚洲av香蕉| 日韩精品极品在线观看| 日韩av免费| 午夜探花在线观看| 成人黄色一级视频| 天天爽夜夜爽人人爽| 久久久精品国产| xxxx日韩| 久久精品香蕉视频| 亚洲欧美日韩国产成人精品影院| 开心激情综合网| 日本久久久久久久久久久| 婷婷综合亚洲| 国产肉体xxxx裸体784大胆| 欧美日韩视频第一区| 色呦呦在线看| 日本在线成人一区二区| 国产精品99久久久久久有的能看| 欧美一级特黄视频| 日韩在线观看视频免费| 欧美调教网站| 亚洲免费999| 午夜激情久久久|