精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Flink 漫談系列(04) - State

開發 開發工具
本篇簡單介紹了Apache Flink中State的概念,并重點介紹了OperatorState和KeyedState在擴容時候的處理方式。

實際問題

在流計算場景中,數據會源源不斷的流入Apache Flink系統,每條數據進入Apache Flink系統都會觸發計算。如果我們想進行一個Count聚合計算,那么每次觸發計算是將歷史上所有流入的數據重新新計算一次,還是每次計算都是在上一次計算結果之上進行增量計算呢?答案是肯定的,Apache Flink是基于上一次的計算結果進行增量計算的。那么問題來了: "上一次的計算結果保存在哪里,保存在內存可以嗎?",答案是否定的,如果保存在內存,在由于網絡,硬件等原因造成某個計算節點失敗的情況下,上一次計算結果會丟失,在節點恢復的時候,就需要將歷史上所有數據(可能十幾天,上百天的數據)重新計算一次,所以為了避免這種災難性的問題發生,Apache Flink 會利用State存儲計算結果。本篇將會為大家介紹Apache Flink State的相關內容。

什么是State

這個問題似乎有些"***"?不管問題的答案是否顯而易見,但我還是想簡單說一下在Apache Flink里面什么是State?State是指流計算過程中計算節點的中間計算結果或元數據屬性,比如 在aggregation過程中要在state中記錄中間聚合結果,比如 Apache Kafka 作為數據源時候,我們也要記錄已經讀取記錄的offset,這些State數據在計算過程中會進行持久化(插入或更新)。所以Apache Flink中的State就是與時間相關的,Apache Flink任務的內部數據(計算數據和元數據屬性)的快照。

為什么需要State

與批計算相比,State是流計算特有的,批計算沒有failover機制,要么成功,要么重新計算。流計算在 大多數場景 下是增量計算,數據逐條處理(大多數場景),每次計算是在上一次計算結果之上進行處理的,這樣的機制勢必要將上一次的計算結果進行存儲(生產模式要持久化),另外由于 機器,網絡,臟數據等原因導致的程序錯誤,在重啟job時候需要從成功的檢查點(checkpoint,后面篇章會專門介紹)進行state的恢復。增量計算,Failover這些機制都需要state的支撐。

State 實現

Apache Flink內部有四種state的存儲實現,具體如下:

  • 基于內存的HeapStateBackend - 在debug模式使用,不 建議在生產模式下應用;
  • 基于HDFS的FsStateBackend - 分布式文件持久化,每次讀寫都產生網絡IO,整體性能不佳;
  • 基于RocksDB的RocksDBStateBackend - 本地文件+異步HDFS持久化;
  • 還有一個是基于Niagara(Alibaba對 Apache Flink的增強)NiagaraStateBackend - 分布式持久化- 在Alibaba生產環境應用;

State 持久化邏輯

Apache Flink版本選擇用RocksDB+HDFS的方式進行State的存儲,State存儲分兩個階段,首先本地存儲到RocksDB,然后異步的同步到遠程的HDFS。 這樣而設計既消除了HeapStateBackend的局限(內存大小,機器壞掉丟失等),也減少了純分布式存儲的網絡IO開銷。

State 持久化邏輯

State 分類

Apache Flink 內部按照算子和數據分組角度將State劃分為如下兩類:

  • KeyedState - 這里面的key是我們在SQL語句中對應的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段組成的Row的字節數組,每一個key都有一個屬于自己的State,key與key之間的State是不可見的;
  • OperatorState - Apache Flink內部的Source Connector的實現中就會用OperatorState來記錄source數據讀取的offset。

State 擴容重新分配

Apache Flink是一個大規模并行分布式系統,允許大規模的有狀態流處理。 為了可伸縮性,Apache Flink作業在邏輯上被分解成operator graph,并且每個operator的執行被物理地分解成多個并行運算符實例。 從概念上講,Apache Flink中的每個并行運算符實例都是一個獨立的任務,可以在自己的機器上調度到網絡連接的其他機器運行。

Apache Flink的DAG圖中只有邊相連的節點🈶網絡通信,也就是整個DAG在垂直方向有網絡IO,在水平方向如下圖的stateful節點之間沒有網絡通信,這種模型也保證了每個operator實例維護一份自己的state,并且保存在本地磁盤(遠程異步同步)。通過這種設計,任務的所有狀態數據都是本地的,并且狀態訪問不需要任務之間的網絡通信。 避免這種流量對于像Apache Flink這樣的大規模并行分布式系統的可擴展性至關重要。

如上我們知道Apache Flink中State有OperatorState和KeyedState,那么在進行擴容時候(增加并發)State如何分配呢?比如:外部Source有5個partition,在Apache Flink上面由Srouce的1個并發擴容到2個并發,中間Stateful Operation 節點由2個并發并擴容的3個并發,如下圖所示:

State 擴容重新分配

在Apache Flink中對不同類型的State有不同的擴容方法,接下來我們分別介紹。

OperatorState對擴容的處理

我們選取Apache Flink中某個具體Connector實現實例進行介紹,以MetaQ為例,MetaQ以topic方式訂閱數據,每個topic會有N>0個分區,以上圖為例,加上我們訂閱的MetaQ的topic有5個分區,那么當我們source由1個并發調整為2個并發時候,State是怎么恢復的呢?

state 恢復的方式與Source中OperatorState的存儲結構有必然關系,我們先看MetaQSource的實現是如何存儲State的。首先MetaQSource 實現了ListCheckpointed,其中的T是Tuple2

  1. public interface ListCheckpointed<T extends Serializable> { 
  2.     List<T> snapshotState(long var1, long var3) throws Exception; 
  3.  
  4.     void restoreState(List<T> var1) throws Exception;} 

我們發現 snapshotState方法的返回值是一個List,T是Tuple2

  1. public interface InputSplit extends Serializable { 
  2.     int getSplitNumber(); 

也就是說,InputSplit我們可以理解為是一個Partition索引,有了這個數據結構我們在看看上面圖所示的case是如何工作的?當Source的并行度是1的時候,所有打partition數據都在同一個線程中讀取,所有partition的state也在同一個state中維護,State存儲信息格式如下:

如果我們現在將并發調整為2,那么我們5個分區的State將會在2個獨立的任務(線程)中進行維護,在內部實現中我們有如下算法進行分配每個Task所處理和維護partition的State信息,如下:

  1. List<Integer> assignedPartitions = new LinkedList<>(); 
  2. for (int i = 0; i < partitions; i++) { 
  3.         if (i % consumerCount == consumerIndex) { 
  4.                 assignedPartitions.add(i); 
  5.         } 

這個求mod的算法,決定了每個并發所處理和維護partition的State信息,針對我們當前的case具體的存儲情況如下:

那么到現在我們發現上面擴容后State得以很好的分配得益于OperatorState采用了List的數據結構的設計。另外大家注意一個問題,相信大家已經發現上面分配partition的算法有一個限制,那就是Source的擴容(并發數)是否可以超過Source物理存儲的partition數量呢?答案是否定的,不能。目前Apache Flink的做法是提前報錯,即使不報錯也是資源的浪費,因為超過partition數量的并發永遠分配不到待管理的partition。

KeyedState對擴容的處理

對于KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState一樣,這種分配方式大多數情況是恢復的state不是本地已有的state,需要一次網絡拷貝,這種效率比較低,OperatorState采用這種簡單的方式進行處理是因為OperatorState的state一般都比較小,網絡拉取的成本很小,對于KeyedState往往很大,我們會有更好的選擇,在Apache Flink中采用的是Key-Groups方式進行分配。

什么是Key-Groups

Key-Groups 是Apache Flink中對keyed state按照key進行分組的方式,每個key-group中會包含N>0個key,一個key-group是State分配的原子單位。在Apache Flink中關于Key-Group的對象是 KeyGroupRange, 如下:

  1. public class KeyGroupRange implements KeyGroupsList, Serializable { 
  2.         ... 
  3.         ... 
  4.         private final int startKeyGroup; 
  5.         private final int endKeyGroup; 
  6.         ... 
  7.         ...} 

KeyGroupRange兩個重要的屬性就是 startKeyGroup和endKeyGroup,定義了startKeyGroup和endKeyGroup屬性后Operator上面的Key-Group的個數也就確定了。

什么決定Key-Groups的個數

key-group的數量在job啟動前必須是確定的且運行中不能改變。由于key-group是state分配的原子單位,而每個operator并行實例至少包含一個key-group,因此operator的***并行度不能超過設定的key-group的個數,那么在Apache Flink的內部實現上key-group的數量就是***并行度的值。

GroupRange.of(0, maxParallelism)如何決定key屬于哪個Key-Group

確定好GroupRange之后,如何決定每個Key屬于哪個Key-Group呢?我們采取的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法會將key劃分到指定的key-group中,如下:

  1. public static int assignToKeyGroup(Object key, int maxParallelism) { 
  2.       return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); 
  3.  
  4. public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { 
  5.       return HashPartitioner.INSTANCE.partition(keyHash, maxParallelism); 
  6.  
  7. @Override 
  8. public int partition(T key, int numPartitions) { 
  9.       return MathUtils.murmurHash(Objects.hashCode(key)) % numPartitions; 

如上實現我們了解到分配Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism進行取余操作來分配的。如下圖當parallelism=2,maxParallelism=10的情況下流上key與key-group的對應關系如下圖所示:

如上圖key(a)的hashCode是97,與***并發10取余后是7,被分配到了KG-7中,流上每個event都會分配到KG-0至KG-9其中一個Key-Group中。

每個Operator實例如何獲取Key-Groups

了解了Key-Groups概念和如何分配每個Key到指定的Key-Groups之后,我們看看如何計算每個Operator實例所處理的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:

  1. public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( 
  2.       int maxParallelism, 
  3.       int parallelism, 
  4.       int operatorIndex) { 
  5.         GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex); 
  6.         int startGroup = splitRange.getStartGroup(); 
  7.         int endGroup = splitRange.getEndGroup(); 
  8.    return new KeyGroupRange(startGroup, endGroup - 1); 
  9.  
  10. public GroupRange getSplitRange(int numSplits, int splitIndex) { 
  11.         ... 
  12.         final int numGroupsPerSplit = getNumGroups() / numSplits; 
  13.         final int numFatSplits = getNumGroups() % numSplits; 
  14.  
  15.         int startGroupForThisSplit; 
  16.         int endGroupForThisSplit; 
  17.         if (splitIndex < numFatSplits) { 
  18.             startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1); 
  19.             endGroupForThisSplit =   startGroupForThisSplit + numGroupsPerSplit + 1; 
  20.         } else { 
  21.             startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits; 
  22.             endGroupForThisSplit =  startGroupForThisSplit + numGroupsPerSplit; 
  23.         } 
  24.         if (startGroupForThisSplit >= endGroupForThisSplit) { 
  25.                 return GroupRange.emptyGroupRange(); 
  26.         } else { 
  27.                 return new GroupRange(startGroupForThisSplit, endGroupForThisSplit); 
  28.         }} 

上面代碼的核心邏輯是先計算每個Operator實例至少分配的Key-Group個數,將不能整除的部分N個,平均分給前N個實例。最終每個Operator實例管理的Key-Groups會在GroupRange中表示,本質是一個區間值;下面我們就上圖的case,說明一下如何進行分配以及擴容后如何重新分配。

假設上面的Stateful Operation節點的***并行度maxParallelism的值是10,也就是我們一共有10個Key-Group,當我們并發是2的時候和并發是3的時候分配的情況如下圖:

如上算法我們發現在進行擴容時候,大部分state還是落到本地的,如Task0只有KG-4被分出去,其他的還是保持在本地。同時我們也發現,一個job如果修改了maxParallelism的值那么會直接影響到Key-Groups的數量和key的分配,也會打亂所有的Key-Group的分配,目前在Apache Flink系統中統一將maxParallelism的默認值調整到4096,***程度的避免無法擴容的情況發生。

小結

本篇簡單介紹了Apache Flink中State的概念,并重點介紹了OperatorState和KeyedState在擴容時候的處理方式。Apache Flink State是支撐Apache Flink中failover,增量計算,Window等重要機制和功能的核心設施。后續介紹failover,增量計算,Window等相關篇章中也會涉及State的利用,當涉及到本篇沒有覆蓋的內容時候再補充介紹。同時本篇沒有介紹Alibaba對Apache Flink的增強的Niagara版本的State。Niagara是Alibaba精心打造的新一代適用于流計算場景的StateBackend存儲實現,相關內容后續在合適時間再向大家介紹。

關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-06-10 17:26:07

數據集計算

2018-10-09 10:55:52

Apache FlinWatermark流計算

2022-07-13 12:53:59

數據存儲

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-07-13 13:03:29

流計算亂序

2022-07-12 10:38:25

分布式框架

2018-11-07 08:48:31

Apache Flin持續查詢流計算

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2018-12-29 08:16:32

Apache FlinJOIN代碼

2022-06-06 11:55:12

Flink字節跳動State

2020-04-09 11:08:30

PyFlinkJAR依賴

2024-03-28 08:50:58

Flink分配方式后端
點贊
收藏

51CTO技術棧公眾號

免费毛片一区二区三区久久久| 一区二区成人av| 国产av人人夜夜澡人人爽麻豆| 后入内射欧美99二区视频| 欧美一级视频| 久久久精品免费视频| 亚洲av无码一区二区三区观看| 91亚洲精品| 亚洲va中文字幕| 色综合久久av| 手机看片一区二区| 精品一区二区三区影院在线午夜| 欧美—级高清免费播放| 免费一级黄色录像| 日韩伦理一区二区三区| 欧美一区二区三区免费| 国产熟人av一二三区| 91黄页在线观看| 国产精品久久二区二区| 久久亚洲高清| 免费a视频在线观看| 精品一二三四区| 日韩美女写真福利在线观看| 久久久精品视频在线| 成人综合专区| 亚洲日本欧美中文幕| 无码国产精品一区二区免费式直播| 韩国精品主播一区二区在线观看| 亚洲超碰97人人做人人爱| 国产大尺度在线观看| av在线之家电影网站| 99精品视频一区| http;//www.99re视频| 97人妻精品一区二区三区视频| 性色一区二区三区| 性色av一区二区三区| 黄色一级片在线免费观看| 欧美疯狂party性派对| 在线日韩中文字幕| 国产精品高清无码在线观看| 日韩福利视频一区| 日韩精品欧美激情| 少妇户外露出[11p]| 久久99国产精品久久99大师| 精品国产乱码久久久久久免费| aaaaaaaa毛片| 玖玖精品一区| 日韩一区二区在线看片| 中文字幕第三区| 成人豆花视频| 欧美一区二区二区| 美女被艹视频网站| 日韩中文一区二区| 精品免费日韩av| 日本性生活一级片| 日本天堂一区| 国产婷婷97碰碰久久人人蜜臀| 国产一级二级在线观看| 青青视频一区二区| 亚洲欧美国产另类| 在线国产视频一区| 久久国产电影| 超碰91人人草人人干| 国产盗摄一区二区三区在线| 中文字幕日韩一区二区不卡| 欧美老少配视频| 久久高清免费视频| 亚洲精品三级| 国产99在线|中文| 久久久久精彩视频| 韩国成人精品a∨在线观看| 999在线免费观看视频| 人妻视频一区二区三区| 国产亚洲一区二区在线观看| 色就是色欧美| 成人在线免费看片| 午夜精品在线视频一区| 成年人免费在线播放| 成人四虎影院| 欧美日本高清视频在线观看| 三级网站免费看| 国产香蕉精品| 永久免费看mv网站入口亚洲| 国产免费久久久久| 一本一本久久| 国产精品欧美亚洲777777| 国产精品久久久久毛片| 丰满白嫩尤物一区二区| 日本精品视频一区| 九义人在线观看完整免费版电视剧| 一区二区三区四区高清精品免费观看 | 精品中文在线| 日韩高清人体午夜| 欧美人与禽zoz0善交| 国产精品黄色| 国产精品久久在线观看| 国产女人高潮的av毛片| 91女厕偷拍女厕偷拍高清| 中国成人亚色综合网站| av午夜在线观看| 欧美日韩成人综合天天影院| 午夜不卡久久精品无码免费| 日韩欧美高清在线播放| 91精品成人久久| 亚洲熟妇无码久久精品| 99久久er热在这里只有精品15| 午夜视频久久久| 黄色aa久久| 欧美一区二区视频在线观看2022| 亚洲a v网站| 影音先锋在线一区| 国产中文字幕日韩| 国产黄在线观看| 亚洲成人资源在线| 波多野结衣在线免费观看| 成人亚洲一区| 国产成人精品视频在线观看| 亚洲欧美激情另类| 亚洲欧洲av另类| av免费网站观看| 国内精品久久久久久久果冻传媒| 蜜臀久久99精品久久久| 国产精品传媒视频| 狠狠操精品视频| 婷婷综合福利| 国模私拍一区二区三区| av免费观看在线| 国产精品护士白丝一区av| 女人另类性混交zo| 清纯唯美亚洲经典中文字幕| 国产69精品99久久久久久宅男| 国产乱色精品成人免费视频| 欧美国产日韩a欧美在线观看 | 亚洲黄色网址大全| 亚洲一区二区三区高清不卡| 国产v亚洲v天堂无码| 中文字幕中文字幕在线十八区 | 欧美精品二区| 亚洲精品欧美一区二区三区| 欧美成人视屏| 欧美日韩夫妻久久| 99热6这里只有精品| 日韩精品久久理论片| 欧美日韩一区在线视频| 亚洲女色av| 亚洲欧美日韩精品久久亚洲区 | 欧美视频小说| 婷婷激情一区| 亚洲一二三在线| 国产免费www| 欧美激情一二三区| 国产又黄又猛又粗| 久久国产亚洲精品| 国产精品人成电影在线观看| se在线电影| 欧美群妇大交群的观看方式| 顶级黑人搡bbw搡bbbb搡| 蜜桃精品视频在线| 中文字幕中文字幕一区三区| 国产精品视频一区视频二区 | 日韩亚洲国产免费| 久久天堂av综合合色| 国产chinasex对白videos麻豆| 亚洲免费观看高清完整版在线观看| 樱花草www在线| 激情丁香综合| 久久亚洲国产精品日日av夜夜| 香蕉视频亚洲一级| www.久久色.com| 亚洲精品911| 欧美日韩一区二区三区在线免费观看| 麻豆精品免费视频| 激情五月激情综合网| 国产尤物av一区二区三区| 国产人妖ts一区二区| 国产成人精品一区二区| 免费高清在线观看| 亚洲精品一区二区三区影院 | 国产自产v一区二区三区c| 免费看黄色a级片| 欧美一区自拍| 国产精品免费网站| 任你弄在线视频免费观看| 亚洲国产精品中文| 97超视频在线观看| 亚洲高清三级视频| 久久日免费视频| 粉嫩一区二区三区性色av| 欧美牲交a欧美牲交aⅴ免费真| 久久视频国产| 精品亚洲第一| 国产精品久久久久久久久久久久久久久| 久久99精品久久久久久噜噜| 毛片在线免费| 日韩美女主播在线视频一区二区三区 | 国产精品高潮呻吟久久av野狼| 黄色网在线看| 亚洲精品一二区| 国产成人av免费看| 91黄色免费版| 日本一区二区欧美| 亚洲日本青草视频在线怡红院| 亚洲国产欧美视频| 国产米奇在线777精品观看| 日本免费黄视频| 欧美一区二区三区久久精品| 欧洲亚洲一区二区三区四区五区| 欧美.com| 国产精品一区二区三区免费视频| 3344国产永久在线观看视频| 久久精品亚洲一区| 国产在线超碰| 日韩av在线免费| 亚洲国产av一区二区| 777精品伊人久久久久大香线蕉| 800av免费在线观看| 亚洲自拍另类综合| 91视频青青草| 国产精品―色哟哟| 97人妻精品一区二区免费| 成人av午夜影院| 亚洲成人av免费观看| 美女尤物国产一区| 激情婷婷综合网| 午夜亚洲视频| 欧美不卡在线播放| 亚洲视频精品| 日韩成人手机在线| 在线成人激情| 日本一区二区免费高清视频| 成人羞羞在线观看网站| 日韩精品电影网站| 国产毛片一区二区三区| 久久99欧美| 婷婷五月色综合香五月| 精品伦精品一区二区三区视频| 一区二区视频| 亚洲一区二区在线播放| 国产95亚洲| 91久久大香伊蕉在人线| 国产精品2区| 91香蕉亚洲精品| av在线亚洲一区| 96国产粉嫩美女| 亚洲3区在线| 国产精品日韩一区二区| 精品久久ai电影| 国外成人免费视频| 性欧美xxxx免费岛国不卡电影| 激情五月综合色婷婷一区二区| 动漫3d精品一区二区三区乱码| 97操在线视频| 国产福利资源一区| 精品国产乱码久久久久久郑州公司 | 欧美日韩综合在线| 一级全黄少妇性色生活片| 欧美乱熟臀69xxxxxx| 国产黄色一级大片| 亚洲精品一区二区三区蜜桃下载 | 最近2019免费中文字幕视频三| lutube成人福利在线观看| 国产亚洲精品美女久久久久| 99reav在线| 免费97视频在线精品国自产拍| 亚洲综合影视| 69影院欧美专区视频| 精品成人av| 91色在线视频| 白白在线精品| 欧美在线一区二区三区四区| 欧美wwwww| 日本人妻伦在线中文字幕| 一区二区毛片| 亚洲国产成人va在线观看麻豆| 国产精品88888| 国产亚洲无码精品| 国产精品久久久久久久久免费相片 | 久操视频在线| 97香蕉久久超级碰碰高清版| 素人啪啪色综合| 成人黄色在线免费观看| 亚洲涩涩av| 五月天色婷婷综合| 销魂美女一区二区三区视频在线| 欧美精品性生活| 成人一区二区在线观看| 337人体粉嫩噜噜噜| 一区二区国产盗摄色噜噜| 好吊色在线视频| 欧美一区二区三区影视| 桃花色综合影院| 久久国内精品一国内精品| 依依综合在线| 91精品国产高清久久久久久91裸体| 日韩av系列| 福利网在线观看| 免费在线日韩av| 手机在线播放av| 欧美韩日一区二区三区| 日本天堂在线视频| 欧美乱妇23p| 国内精品一区视频| 欧美激情亚洲另类| 成人精品国产| 欧美第一黄网| 精品999日本| 五月天开心婷婷| 久久久久久久久久美女| 日本a在线观看| 日韩一区二区三区四区五区六区 | 国产日韩欧美电影| 免费一级特黄特色大片| 欧美日韩国产精选| 全部免费毛片在线播放网站| 久久91精品国产91久久跳| 日韩黄色三级在线观看| 欧美高清性xxxxhd| 亚洲黄色av| 波多野结衣三级视频| 国产精品久线在线观看| 无码人妻av一区二区三区波多野| 欧美成人a视频| 国产精品刘玥久久一区| 国产精品美女网站| 国产成人精品三级高清久久91| 日韩黄色片在线| 国产一区不卡精品| 91香蕉视频在线播放| 欧美日韩在线观看一区二区| 黄色av网站在线免费观看| 97久久伊人激情网| 国产精品白丝av嫩草影院| 超碰人人爱人人| 国产麻豆精品一区二区| 手机av在线看| 欧美一级夜夜爽| caopo在线| 亚洲综合第一页| 欧美激情亚洲| 亚洲美女在线播放| 亚洲国产精品一区二区www在线| www.久久久久久| 欧美大片网站在线观看| 中文字幕一区二区三区中文字幕| 国产乱子伦精品视频| 国产成人午夜精品影院观看视频 | 日一区二区三区| 久久美女免费视频| 欧美午夜免费电影| 欧美jizz18性欧美| 91欧美激情另类亚洲| 亚洲午夜精品一区二区国产| 免费网站在线观看黄| 亚洲精品美国一| 日本高清视频在线| 555www成人网| 国产一区网站| 激情在线观看视频| 亚洲最大的成人av| 五月天久久久久久| 日本免费在线精品| 日韩电影二区| 在线成人免费av| 午夜私人影院久久久久| 日韩av成人| 国产精品视频yy9099| 91精品一区二区三区综合| 老司机av网站| 激情亚洲一区二区三区四区 | 色爱av综合网| 欧美性猛交xxx乱久交| 国产精品灌醉下药二区| 亚洲AV无码一区二区三区少妇| 国内偷自视频区视频综合| 久久99国内| aaa一级黄色片| 亚洲成人第一页| 在线免费观看黄色网址| 99re6热在线精品视频播放速度| 99综合在线| 999精品视频在线观看播放| 日韩女优av电影| 欧美韩国亚洲| 日本a级片在线播放| 99国产一区二区三精品乱码| 免费看av在线| 亚州国产精品久久久| 久久国产精品成人免费观看的软件| 国产调教打屁股xxxx网站| 色老头久久综合| 影音先锋在线视频| 欧美日韩中文国产一区发布| 国产精品正在播放| 7799精品视频天天看| 久久97精品久久久久久久不卡| 精品久久电影| 欧美深性狂猛ⅹxxx深喉| 91麻豆精品91久久久久同性| 伊人色综合一区二区三区影院视频| 国产精品一区在线免费观看|