精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

如何做到“恰好一次”地傳遞數十億條消息

大數據
在分布式領域中存在著三種類型的消息投遞語義,分別是:最多一次(at-most-once)、至少一次(at-least-once)和恰好一次(exactly-once)。本文作者介紹了一個利用Kafka和RocksDB來構建的“恰好一次”消息去重系統的實現原理。

在分布式領域中存在著三種類型的消息投遞語義,分別是:最多一次(at-most-once)、至少一次(at-least-once)和恰好一次(exactly-once)。本文作者介紹了一個利用Kafka和RocksDB來構建的“恰好一次”消息去重系統的實現原理。

對任何一個數據流水線的唯一要求就是不能丟失數據。數據通常可以被延遲或重新排序,但不能丟失。

為了滿足這一要求,大多數的分布式系統都能夠保證“至少一次”的投遞消息技術。實現“至少一次”的投遞技術通常就是:“重試、重試、再重試”。在你收到消費者的確認消息之前,你永遠不要認為消息已經投遞過去。

但“至少一次”的投遞并不是用戶想要的。用戶希望消息被投遞一次,并且僅有一次。

然而,實現“恰好一次”的投遞需要***的設計。每種投遞失敗的情況都必須認真考慮,并設計到架構中去,因此它不能在事后“掛到”現有的實現上去。即使這樣,“只有一次”的投遞消息幾乎是不可能的。

在過去的三個月里,我們構建了一個全新的去重系統,以便在面對各種故障時能讓系統盡可能實現“恰好一次”的投遞。

新系統能夠跟蹤舊系統100倍的消息數量,并且可靠性也得到了提高,而付出的代價卻只有一點點。下面我們就開始介紹這個新系統。

問題所在

Segment內部的大部分系統都是通過重試、消息重新投遞、鎖定和兩階段提交來優雅地處理故障。但是,有一個特例,那就是將數據直接發送到公共API的客戶端程序。

客戶端(特別是移動客戶端)經常會發生網絡問題,有時候發送了數據,卻沒有收到API的響應。

想象一下,某天你乘坐公共汽車,在iPhone上使用HotelTonight軟件預訂房間。該應用程序將數據上傳到了Segment的服務器上,但汽車突然進入了隧道并失去了網絡連接。你發送的某些數據在服務器上已經被處理,但客戶端卻無法收到服務器的響應消息。

在這種情況下,即使服務器在技術上已經收到了這些確切的消息,但客戶端也會進行重試并將相同的消息重新發送給Segment的API。

從我們服務器的統計數據來看,在四個星期的窗口時間內,大約有0.6%的消息似乎是我們已經收到過的重復消息。

這個錯誤率聽起來可能并不是很高。但是,對于一個能創造數十億美元效益的電子商務應用程序來說,0.6%的出入可能意味著盈利和數百萬美元損失之間的差別。

對消息進行去重

現在,我們認識到問題的癥結了,我們必須刪除發送到API的重復消息。但是,該怎么做呢?

最簡單的思路就是使用針對任何類型的去重系統的高級API。在Python中,我們可以將其表示為:

  1. def dedupe(stream): 
  2.  
  3.   for message in stream: 
  4.  
  5.     if has_seen(message.id):  
  6.  
  7.       discard(message) 
  8.  
  9.     else
  10.  
  11.       publish_and_commit(message)  

對于數據流中的每個消息,首先要把他的id(假設是唯一的)作為主鍵,檢查是否曾經見過這個特定的消息。如果以前見過這個消息,則丟棄它。如果沒有,則是新的,我們應重新發布這個消息并以原子的方式提交消息。

為了避免存儲所有的消息,我們會設置“去重窗口”這個參數,這個參數定義了在消息過期之前key存儲的時長。只要消息落在窗口時間之外,我們就認為它已過期失效。我們要保證在窗口時間內某個給定ID的消息只發送一次。

這個行為很容易描述,但有兩個方面需要特別注意:讀/寫性能和正確性。

我們希望系統能夠低延遲和低成本的對通過流水線的數十億個事件進行去重。更重要的是,我們要確保所有的事件都能夠被持久化,以便可以從崩潰中恢復出來,并且不會輸出重復的消息。

架構

為了實現這一點,我們創建了一個“兩階段”架構,它讀入Kafka的數據,并且在四個星期的時間窗口內對接收到的所有事件進行去重。

 

去重系統的高級架構圖

Kafka的拓撲結構

要了解其工作原理,首先看一下Kafka的流拓撲結構。所有傳入消息的API調用都將作為單獨的消息進行分離,并讀入到Kafka輸入主題(input topic)中。

首先,每個傳入的消息都有一個由客戶端生成的具有唯一性的messageId標記。在大多數情況下,這是一個UUIDv4(我們考慮切換到ksuids)。 如果客戶端不提供messageId,我們會在API層自動分配一個。

我們不使用矢量時鐘或序列號,因為我們希望能降低客戶端的復雜性。使用UUID可以讓任何人輕松地將數據發送到我們的API上來,因為幾乎所有的主要語言都支持它。

  1.  
  2.   "messageId""ajs-65707fcf61352427e8f1666f0e7f6090"
  3.  
  4.   "anonymousId""e7bd0e18-57e9-4ef4-928a-4ccc0b189d18"
  5.  
  6.   "timestamp""2017-06-26T14:38:23.264Z"
  7.  
  8.   "type""page" 
  9.  
  10. }  

為了能夠將消息持久化,并能夠重新發送,一個個的消息被保存到Kafka中。消息以messageId進行分區,這樣就可以保證具有相同messageId的消息能夠始終由同一個消費者處理。

這對于數據處理來說是一件很重要的事情。我們可以通過路由到正確的分區來查找鍵值,而不是在整個中央數據庫的數百億條消息中查找,這種方法極大地縮小了查找范圍。

去重“worker”(worker:工人。譯者注,這里表示的是某個進程。為防止引起歧義,下文將直接使用worker)是一個Go程序,它的功能是從Kafka輸入分區中讀入數據,檢查消息是否有重復,如果是新的消息,則發送到Kafka輸出主題中。

根據我們的經驗,worker和Kafka拓撲結構都非常容易掌握。我們無需使用一組遇到故障時需要切換到副本的龐大的Memcached實例。相反,我們只需使用零協同的嵌入式RocksDB數據庫,并以非常低的成本來獲得持久化存儲。

RocksDB的worker進程

每一個worker都會在本地EBS硬盤上存放了一個RocksDB數據庫。RocksDB是由Facebook開發的嵌入式鍵值存儲系統,它的性能非常高。

每當從輸入主題中過來的消息被消費時,消費者通過查詢RocksDB來確定我們之前是否見過該事件的messageId。

如果RocksDB中不存在該消息,我們就將其添加到RocksDB中,然后將消息發布到Kafka輸出主題。

如果消息已存在于RocksDB,則worker不會將其發布到輸出主題,而是更新輸入分區的偏移,確認已處理過該消息。

性能

為了讓我們的數據庫獲得高性能,我們必須對過來的每個事件滿足三種查詢模式:

  1. 檢測隨機key的存在性,這可能不存在于我們的數據庫中,但會在key空間中的任何地方找到。
  2. 高速寫入新的key
  3. 老化那些超出了“去重窗口”的舊的key

實際上,我們必須不斷地檢索整個數據庫,追加新的key,老化舊的key。在理想情況下,這些發生在同一數據模型中。 

 [[196104]] 

 

我們的數據庫必須滿足三種獨立的查詢模式

一般來說,這些性能大部分取決于我們數據庫的性能,所以應該了解一下RocksDB的內部機制來提高它的性能。

RocksDB是一個日志結構合并樹(log-structured-merge-tree, 簡稱LSM)數據庫,這意味著它會不斷地將新的key附加到磁盤上的預寫日志(write-ahead-log)中,并把排序過的key存放在內存中作為memtable的一部分。

  

key存放在內存中作為memtable的一部分

寫入key是一個非常快速的過程。新的消息以追加的方式直接保存到磁盤上,并且數據條目在內存中進行排序,以提供快速的搜索和批量寫入。

每當寫入到memtable的條目達到一定數量時,這些條目就會被作為SSTable(排序的字符串表)持久化到磁盤上。由于字符串已經在內存中排過序了,所以可以將它們直接寫入磁盤。

 

當前的memtable零級寫入磁盤

以下是在我們的生產日志中寫入的示例:

  1. [JOB 40] Syncing log #655020 
  2.  
  3. [default] [JOB 40] Flushing memtable with next log file: 655022 
  4.  
  5. [default] [JOB 40] Level-0 flush table #655023: started 
  6.  
  7. [default] [JOB 40] Level-0 flush table #655023: 15153564 bytes OK 
  8.  
  9. [JOB 40] Try to delete WAL files size 12238598, prev total WAL file size 24346413, number of live WAL files 3.  

每個SSTable是不可變的,一旦創建,永遠不會改變。這是什么寫入新的鍵這么快的原因。無需更新文件,無需寫入擴展。相反,在帶外壓縮階段,同一級別的多個SSTable可以合并成一個新的文件。

 

 

當在同一級別的SSTables壓縮時,它們的key會合并在一起,然后將新的文件升級到下一個更高的級別。

看一下我們生產的日志,可以看到這些壓縮作業的示例。在這種情況下,作業41正在壓縮4個0級文件,并將它們合并為單個較大的1級文件。

  1. /data/dedupe.db$ head -1000 LOG | grep "JOB 41" 
  2.  
  3. [JOB 41] Compacting 4@0 + 4@1 files to L1, score 1.00 
  4.  
  5. [default] [JOB 41] Generated table #655024: 1550991 keys, 69310820 bytes 
  6.  
  7. [default] [JOB 41] Generated table #655025: 1556181 keys, 69315779 bytes 
  8.  
  9. [default] [JOB 41] Generated table #655026: 797409 keys, 35651472 bytes 
  10.  
  11. [default] [JOB 41] Generated table #655027: 1612608 keys, 69391908 bytes 
  12.  
  13. [default] [JOB 41] Generated table #655028: 462217 keys, 19957191 bytes 
  14.  
  15. [default] [JOB 41] Compacted 4@0 + 4@1 files to L1 => 263627170 bytes  

壓縮完成后,新合并的SSTables將成為最終的數據庫記錄集,舊的SSTables將被取消鏈接。

如果我們登錄到生產實例,我們可以看到正在更新的預寫日志以及正在寫入、讀取和合并的單個SSTable。

 

日志和最近占用I/O的SSTable

下面生產的SSTable統計數據中,可以看到一共有四個“級別”的文件,并且一個級別比一個級別的文件大。

  1. ** Compaction Stats [default] ** 
  2.  
  3. Level    Files   Size(MB} Score Read(GB}  Rn(GB} Rnp1(GB} Write(GB} Wnew(GB} Moved(GB} W-Amp Rd(MB/s} Wr(MB/s} Comp(sec} Comp(cnt} Avg(sec} KeyIn KeyDrop 
  4.  
  5. ---------------------------------------------------------------------------------------------------------------------------------------------------------- 
  6.  
  7.   L0      1/0      14.46   0.2      0.0     0.0      0.0       0.1      0.1       0.0   0.0      0.0     15.6         7         8    0.925       0      0 
  8.  
  9.   L1      4/0     194.95   0.8      0.5     0.1      0.4       0.5      0.1       0.0   4.7     20.9     20.8        26         2   12.764     12M     40 
  10.  
  11.   L2     48/0    2551.71   1.0      1.4     0.1      1.3       1.4      0.1       0.0  10.7     19.4     19.4        73         2   36.524     34M     14 
  12.  
  13.   L3    351/0   21735.77   0.8      2.0     0.1      1.9       1.9     -0.0       0.0  14.3     18.1     16.9       112         2   56.138     52M  3378K 
  14.  
  15.  Sum    404/0   24496.89   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2     18.2     18.1       218        14   15.589     98M  3378K 
  16.  
  17.  Int      0/0       0.00   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2     18.2     18.1       218        14   15.589     98M  3378K  

RocksDB保存了索引和存儲在SSTable的特定SSTables的布隆過濾器,并將這些加載到內存中。通過查詢這些過濾器和索引可以找到特定的key,然后將完整的SSTable作為LRU基礎的一部分加載到內存中。

在絕大多數情況下,我們就可以看到新的消息了,這使得我們的去重系統成為教科書中的布隆過濾器案例。

布隆過濾器會告訴我們某個鍵“可能在集合中”,或者“絕對在集合中”。要做到這一點,布隆過濾器保存了已經見過的任何元素的多種哈希函數的設置位。如果設置了散列函數的所有位,則過濾器將返回消息“可能在集合中”。

 

我們的集合包含{x,y,z},在布隆過濾器中查詢w,則布隆過濾器會返回“不在集合中”,因為其中有一位沒有設置。

如果返回“可能在集合中”,則RocksDB可以從SSTables中查詢到原始數據,以確定該項是否在該集合中實際存在。但在大多數情況下,我們不需查詢任何SSTables,因為過濾器將返回“絕對不在集合”的響應。

在我們查詢RocksDB時,我們會為所有要查詢的相關的messageId發出一個MultiGet。基于性能考慮,我們會批量地發布出去,以避免太多的并發鎖定操作。它還允許我們批量處理來自Kafka的數據,這是為了實現順序寫入,而不是隨機寫入。

以上回答了為什么讀/寫工作負載性能這么好的問題,但仍然存在如何老化數據這個問題。

刪除:按大小來限制,而不是按時間來限制

在我們的去重過程中,我們必須要確定是否要將我們的系統限制在嚴格的“去重窗口”內,或者是通過磁盤上的總數據庫大小來限制。

為了避免系統突然崩潰導致去重系統接收到所有客戶端的消息,我們決定按照大小來限制接收到消息數量,而不是按照設定的時間窗口來限制。這允許我們為每個RocksDB實例設置***的大小,以能夠處理突然的負載增加。但是其副作用是可能會將去重窗口降低到24小時以下。

我們會定期在RocksDB中老化舊的key,使其不會增長到***大小。為此,我們根據序列號保留key的第二個索引,以便我們可以先刪除最早接收到的key。

我們使用每個插入的key的序列號來刪除對象,而不是使用RocksDB TTL(這需要在打開數據庫的時候設置一個固定的TTL值)來刪除。

因為序列號是第二索引,所以我們可以快速地查詢,并將其標記為已刪除。下面是根據序列號進行刪除的示例代碼:

  1. func (d *DB) delete(n int) error { 
  2.  
  3.         // open a connection to RocksDB 
  4.  
  5.         ro := rocksdb.NewDefaultReadOptions() 
  6.  
  7.         defer ro.Destroy() 
  8.  
  9.  
  10.         // find our offset to seek through for writing deletes 
  11.  
  12.         hint, err := d.GetBytes(ro, []byte("seek_hint")) 
  13.  
  14.         if err != nil { 
  15.  
  16.                 return err 
  17.  
  18.         } 
  19.  
  20.  
  21.         it := d.NewIteratorCF(ro, d.seq) 
  22.  
  23.         defer it.Close() 
  24.  
  25.  
  26.         // seek to the first key, this is a small 
  27.  
  28.         // optimization to ensure we don't use `.SeekToFirst()` 
  29.  
  30.         // since it has to skip through a lot of tombstones. 
  31.  
  32.         if len(hint) > 0 { 
  33.  
  34.                 it.Seek(hint) 
  35.  
  36.         } else { 
  37.  
  38.                 it.SeekToFirst() 
  39.  
  40.         } 
  41.  
  42.  
  43.         seqs := make([][]byte, 0, n) 
  44.  
  45.         keys := make([][]byte, 0, n) 
  46.  
  47.  
  48.         // look through our sequence numbers, counting up 
  49.  
  50.         // append any data keys that we find to our set to be 
  51.  
  52.         // deleted 
  53.  
  54.         for it.Valid() && len(seqs) < n { 
  55.  
  56.                 k, v := it.Key(), it.Value() 
  57.  
  58.                 key := make([]byte, len(k.Data())) 
  59.  
  60.                 val := make([]byte, len(v.Data())) 
  61.  
  62.  
  63.                 copy(key, k.Data()) 
  64.  
  65.                 copy(val, v.Data()) 
  66.  
  67.                 seqs = append(seqs, key
  68.  
  69.                 keys = append(keys, val) 
  70.  
  71.  
  72.                 it.Next() 
  73.  
  74.                 k.Free() 
  75.  
  76.                 v.Free() 
  77.  
  78.         } 
  79.  
  80.  
  81.         wb := rocksdb.NewWriteBatch() 
  82.  
  83.         wo := rocksdb.NewDefaultWriteOptions() 
  84.  
  85.         defer wb.Destroy() 
  86.  
  87.         defer wo.Destroy() 
  88.  
  89.  
  90.         // preserve next sequence to be deleted. 
  91.  
  92.         // this is an optimization so we can use `.Seek()` 
  93.  
  94.         // instead of letting `.SeekToFirst()` skip through lots of tombstones. 
  95.  
  96.         if len(seqs) > 0 { 
  97.  
  98.                 hint, err := strconv.ParseUint(string(seqs[len(seqs)-1]), 10, 64) 
  99.  
  100.                 if err != nil { 
  101.  
  102.                         return err 
  103.  
  104.                 } 
  105.  
  106.  
  107.                 buf := []byte(strconv.FormatUint(hint+1, 10)) 
  108.  
  109.                 wb.Put([]byte("seek_hint"), buf) 
  110.  
  111.         } 
  112.  
  113.  
  114.         // we not only purge the keys, but the sequence numbers as well 
  115.  
  116.         for i := range seqs { 
  117.  
  118.                 wb.DeleteCF(d.seq, seqs[i]) 
  119.  
  120.                 wb.Delete(keys[i]) 
  121.  
  122.         } 
  123.  
  124.  
  125.         // finally, we persist the deletions to our database 
  126.  
  127.         err = d.Write(wo, wb) 
  128.  
  129.         if err != nil { 
  130.  
  131.                 return err 
  132.  
  133.         } 
  134.  
  135.  
  136.         return it.Err() 
  137.  
  138. }  

為了保證寫入速度,RocksDB不會立即返回并刪除一個鍵(記住,這些SSTable是不可變的!)。相反,RocksDB將添加一個“墓碑”,等到壓縮時再進行刪除。因此,我們可以通過順序寫入來快速地老化,避免因為刪除舊項而破壞內存數據。

確保正確性

我們已經討論了如何確保數十億條消息投遞的速度、規模和低成本的搜索。***一個部分將講述各種故障情況下我們如何確保數據的正確性。

EBS快照和附件

為了確保RocksDB實例不會因為錯誤的代碼推送或潛在的EBS停機而損壞,我們會定期保存每個硬盤驅動器的快照。雖然EBS已經在底層進行了復制,但是這一步可以防止數據庫受到某些底層機制的破壞。

如果我們想要啟用一個新實例,則可以先暫停消費者,將相關聯的EBS驅動器分開,然后重新附加到新的實例上去。只要我們保證分區ID相同,重新分配磁盤是一個輕松的過程,而且也能保證數據的正確性。

如果worker發生崩潰,我們依靠RocksDB內置的預寫日志來確保不會丟失消息。消息不會從輸入主題提交,除非RocksDB已經將消息持久化在日志中。

讀取輸出主題

你可能會注意到,本文直到這里都沒有提到“原子”步驟,以使我們能夠確保只投遞一次消息。我們的worker有可能在任何時候崩潰,不如:寫入RocksDB時、發布到輸出主題時,或確認輸入消息時。

我們需要一個原子的“提交”點,并覆蓋所有這些獨立系統的事務。對于輸入的數據,需要某個“事實來源”:輸出主題。

如果去重worker因為某些原因發生崩潰,或者遇到Kafka的某個錯誤,則系統在重新啟動時,會首先查閱這個“事實來源”,輸出主題,來判斷事件是否已經發布出去。

如果在輸出主題中找到消息,而不是RocksDB(反之亦然),則去重worker將進行必要的修復工作以保持數據庫和RocksDB之間的同步。實際上,我們使用輸出主題作為我們的預寫入日志和最終的事實來源,讓RocksDB進行檢查和校驗。

在生產環境中

我們的去重系統已經在生產運行了3個月,對其運行的結果我們感到非常滿意。我們有以下這些數據:

  • 在RocksDB中,有1.5TB的key存儲在磁盤上
  • 在老化舊的key之前,有一個四個星期的去重窗口
  • RocksDB實例中存儲了大約600億個key
  • 通過去重系統的消息達到2000億條

該系統快速、高效、容錯性強,也非常容易理解。

特別是我們的v2版本系統相比舊的去重系統有很多優點。

以前我們將所有的key存儲在Memcached中,并使用Memcached的原子CAS(check-and-set)操作來設置key。 Memcached起到了提交點和“原子”地發布key的作用。

雖然這個功能很好,但它需要有大量的內存來支撐所有的key。此外,我們必須能夠接受偶爾的Memcached故障,或者將用于高速內存故障切換的支出加倍。

Kafka/RocksDB的組合相比舊系統有如下幾個優勢:

  • 數據存儲在磁盤上:在內存中保存所有的key或完整的索引,其代價是非常昂貴的。通過將更多的數據轉移到磁盤,并利用多種不同級別的文件和索引,能夠大幅削減成本。對于故障切換,我們能夠使用冷備(EBS),而不用運行其他的熱備實例。
  • 分區:為了縮小key的搜索范圍,避免在內存中加載太多的索引,我們需要保證某個消息能夠路由到正確的worker。在Kafka中對上游進行分區可以對這些消息進行路由,從而更有效地緩存和查詢。
  • 顯式地進行老化處理:在使用Memcached的時候,我們在每個key上設置一個TTL來標記是否超時,然后依靠Memcached進程來對超時的key進行處理。這使得我們在面對大量數據時,可能會耗盡內存,并且在丟棄大量超時消息時,Memcached的CPU使用率會飆升。而通過讓客戶端來處理key的刪除,使得我們可以通過縮短去重窗口來優雅地處理。
  • 將Kafka作為事實來源:為了真正地避免對多個提交點進行消息去重,我們必須使用所有下游消費者都常見的事實來源。使用Kafka作為“事實來源”是最合適的。在大多數失敗的情況下(除了Kafka失敗之外),消息要么會被寫入Kafka,要么不會。使用Kafka可以確保按順序投遞消息,并在多臺計算機之間進行磁盤復制,而不需要在內存中保留大量的數據。
  • 批量讀寫:通過Kafka和RocksDB的批量I/O調用,我們可以通過利用順序讀寫來獲得更好的性能。與之前在Memcached中使用的隨機訪問不同,我們能夠依靠磁盤的性能來達到更高的吞吐量,并只在內存中保留索引。

總的來說,我們對自己構建的去重系統非常滿意。使用Kafka和RocksDB作為流媒體應用的原語開始變得越來越普遍。我們很高興能繼續在這些原語之上構建新的分布式應用程序。 

責任編輯:龐桂玉 來源: CSDN大數據
相關推薦

2019-11-27 18:33:32

Docker架構數據

2025-03-10 00:35:00

AndroidIPC管道

2019-04-18 10:55:00

故障演練流量

2018-10-11 09:33:51

Kafka消息處理

2013-01-22 17:33:30

2020-06-22 10:06:15

數據網絡泄露

2011-11-09 15:49:52

API

2017-12-12 16:17:55

微服務系統運維

2020-09-17 11:02:40

BLESA藍牙攻擊漏洞

2017-03-29 14:38:05

高可用視頻調度秒拍

2009-11-20 11:37:11

Oracle完全卸載

2022-11-23 14:08:49

2018-09-13 09:39:03

騰訊運維IT

2019-08-08 10:18:15

運維架構技術

2016-01-08 10:03:07

硅谷通吃互聯網

2020-05-20 12:52:03

漏洞攻擊藍牙

2021-12-17 11:29:03

WiFi漏洞芯片

2022-09-09 08:41:43

Netty服務端驅動

2010-03-30 10:44:05

Nginx啟動

2021-05-24 10:55:05

Netty單機并發
點贊
收藏

51CTO技術棧公眾號

日韩美女视频在线| 亚洲美女偷拍久久| 国产精品久久久久av免费| 国产精品69久久久久孕妇欧美| 日韩一区二区三免费高清在线观看| 成人免费一区二区三区视频 | 99视频这里有精品| 亚洲第一狼人社区| 亚洲欧美丝袜| 色wwwwww| 久久爱另类一区二区小说| 欧美激情xxxxx| 国产精品毛片一区二区| 欧美第一在线视频| 色综合久久综合网欧美综合网 | 欧美日韩一区二区电影| 2022中文字幕| avtt在线播放| 成人永久免费视频| 国产美女91呻吟求| 国产三级av片| 午夜日韩视频| 色阁综合伊人av| 蜜臀av粉嫩av懂色av| 久久国产三级| 91久久国产最好的精华液| 日韩精品免费一区| av中文字幕在线| 95精品视频在线| 亚洲jizzjizz日本少妇| 日本不卡高字幕在线2019| 人体私拍套图hdxxxx| 欧美日韩免费电影| 色婷婷久久一区二区三区麻豆| 免费的av在线| 男人的天堂在线视频免费观看| 成人网在线免费视频| 91精品在线观| 亚洲熟妇av乱码在线观看| 性色av一区二区怡红| 欧美二区在线播放| 在线免费观看亚洲视频| 成人免费a**址| 亚洲精品在线观看www| www.555国产精品免费| 国产精品毛片aⅴ一区二区三区| 欧美性猛片aaaaaaa做受| 欧美 日韩 亚洲 一区| 肉肉视频在线观看| 亚洲午夜日本在线观看| 久久国产精品免费观看| 自由的xxxx在线视频| 亚洲欧美日韩中文播放| 中文字幕av日韩精品| 97视频在线观看网站| 亚洲国产精品传媒在线观看| 日韩欧美视频第二区| 免费一级毛片在线观看| 久久午夜羞羞影院免费观看| 久久综合九色欧美狠狠| 手机看片福利在线观看| 久久尤物电影视频在线观看| 久久波多野结衣| 日本韩国一区| 国产三区在线成人av| 日韩在线电影一区| eeuss影院在线观看| 国产精品天干天干在观线| 亚洲一区二区三区乱码| 日本免费在线观看| 亚洲精品成人精品456| 国产激情片在线观看| 毛片网站在线看| 婷婷开心激情综合| 亚洲成人你懂的| 污视频在线免费观看一区二区三区 | 日韩不卡av在线| 99热国内精品永久免费观看| 久久亚洲精品成人| av资源吧首页| 日日夜夜精品免费视频| 国产美女精品免费电影| www黄色网址| 99精品国产视频| 日韩av在线一区二区三区| 亚洲s色大片| 亚洲一区自拍偷拍| 免费观看精品视频| 亚洲资源在线| 日韩av一卡二卡| 阿v天堂2014| 欧美日本一区| 日本老师69xxx| 一级成人免费视频| 成人动漫在线一区| 亚洲视频在线二区| 99热99re6国产在线播放| 色先锋aa成人| 四虎国产精品免费| 国产videos久久| 久久久国产一区| 日韩一区二区视频在线| 久久99精品国产麻豆婷婷洗澡| 成人av片网址| 成年人在线观看网站| 亚洲图片欧美一区| 亚洲欧洲日本精品| 免费萌白酱国产一区二区三区| 原创国产精品91| 欧美黄色免费观看| 青青草97国产精品免费观看无弹窗版 | 99re热视频精品| 男人的天堂成人| 成人性生交大片免费观看网站| 91精品国产麻豆| 丰满少妇高潮一区二区| 亚洲国产电影| 成人午夜在线视频一区| 四虎国产精品永远| 亚洲一区在线播放| 亚洲欧美日本一区二区| 蜜臀91精品国产高清在线观看| 欧美成人午夜免费视在线看片| 波多野结衣人妻| 91啪九色porn原创视频在线观看| 美女av免费观看| 97色婷婷成人综合在线观看| 亚洲人成亚洲人成在线观看| 日韩欧美亚洲国产| 国产精品乡下勾搭老头1| 五码日韩精品一区二区三区视频| 国产色播av在线| 日韩欧美黄色影院| 午夜激情福利网| 久久精品国产99国产精品| 欧美日韩一区综合| 亚洲欧美一区二区三区| 亚洲国产成人在线播放| 九九热精品免费视频| 激情成人综合网| 亚洲欧洲精品一区| 国产超碰精品| 亚洲美女福利视频网站| 91在线看视频| 99久久精品国产观看| 日本熟妇人妻xxxx| 最新国产精品精品视频| 欧美成年人视频网站欧美| 国产精品无码在线播放| 亚洲欧洲成人自拍| 四季av一区二区三区| 三上亚洲一区二区| 成人福利免费观看| www.欧美日本韩国| 日韩精品影音先锋| 国产精品变态另类虐交| 不卡的av电影在线观看| 你真棒插曲来救救我在线观看| 久久99偷拍| 91高清免费在线观看| 黄色在线播放| 欧美日韩中文字幕一区二区| 欧美一级特黄高清视频| 国产一区二区网址| 国产精品久久久久久久久电影网| 国产精品乱战久久久| 2024亚洲男人天堂| av在线电影免费观看| 欧美高清视频不卡网| 精品处破女学生| 91美女在线观看| 免费激情视频在线观看| 99久久亚洲精品| 成人h视频在线观看| 蜜臀久久精品| 综合久久五月天| h片在线免费看| 天天影视色香欲综合网老头| 无码 人妻 在线 视频| 久久99久久99| 免费成人午夜视频| 日韩久久电影| 成人午夜电影免费在线观看| 91探花在线观看| 亚洲人午夜色婷婷| 在线播放国产一区| 中文字幕日韩一区| 最新版天堂资源在线| 一本一道久久综合狠狠老精东影业| 久久综合色一本| 四虎国产精品免费久久5151| 欧美成人精品xxx| 人妻一区二区三区| 欧美色网站导航| 久久久久久蜜桃| 久久亚洲私人国产精品va媚药| 亚洲性生活网站| 亚洲国产精品日韩专区av有中文| yellow视频在线观看一区二区| 黄色软件视频在线观看| 最近中文字幕日韩精品| 国产又粗又猛又爽又黄视频 | 中文字幕日韩高清| 超碰在线观看av| 色综合久久九月婷婷色综合| 国产精品夜夜夜爽阿娇| 99久久精品国产一区二区三区 | 日本不卡在线视频| 日韩视频一二三| 色97色成人| 国产一区免费| 亚洲欧美久久精品| 91tv亚洲精品香蕉国产一区7ujn| 免费在线午夜视频| 亚洲国产成人精品一区二区| 国产视频手机在线观看| 欧美午夜精品久久久久久浪潮| 99热这里只有精品4| 久久亚洲精品小早川怜子| 日本一区二区三区在线免费观看| 亚洲专区一区| 99re6这里有精品热视频| 成人免费av| 久久久精彩视频| 久久精品免视看国产成人| 日本精品性网站在线观看| 日本在线视频中文有码| 在线亚洲欧美视频| 粉嫩绯色av一区二区在线观看| 久久91亚洲人成电影网站| 少妇一区二区三区四区| 欧美性感一类影片在线播放| 国产在线视频你懂的| 国产精品久久久久影院| 我和岳m愉情xxxⅹ视频| 国内不卡的二区三区中文字幕 | 色婷婷精品久久二区二区蜜臂av | 国产精品久久久久久久99| 国产精品黄色片| 亚洲女人的天堂| 日本精品一二三| 久久电影网电视剧免费观看| 亚洲天堂网一区| 老妇喷水一区二区三区| 国产综合av在线| 一区二区亚洲| 成人在线播放网址| 欧美在线三级| 日韩人妻精品一区二区三区| 希岛爱理一区二区三区| 一本久久a久久精品vr综合| 亚洲欧美成人vr| 国产经典一区二区三区| 99久久香蕉| 成人久久18免费网站漫画| 99久久99九九99九九九| 国产精品久久不能| 全球中文成人在线| 国产欧美精品在线播放| 深夜福利亚洲| 亚洲综合第一页| 视频成人永久免费视频| 97se亚洲综合在线| 日本在线成人| 久久久亚洲综合网站| 日韩有码一区| 免费在线成人av电影| 国产精品自在| 欧美一级二级三级| 激情综合网站| 亚洲国产精品一区二区第一页| 99久久精品网站| 今天免费高清在线观看国语| 欧美日韩国产欧| 男人的天堂狠狠干| 日本欧美一区二区| 午夜一区二区视频| 国产一区二区三区黄视频| 日本va欧美va瓶| 日韩精品免费播放| 久久久水蜜桃av免费网站| 国产福利视频在线播放| 蜜桃91丨九色丨蝌蚪91桃色| 日本中文字幕影院| 国产精品一区二区x88av| 性一交一黄一片| 久久综合av免费| 娇小11一12╳yⅹ╳毛片| 亚洲女同女同女同女同女同69| 欧美丰满艳妇bbwbbw| 性感美女极品91精品| 久久久久久在线观看| 欧美性大战久久久久久久| 亚洲国产精品18久久久久久| 在线一级成人| 日本一区免费看| 亚洲精品成人影院| 久草青青在线观看| 久久99国产精品久久99果冻传媒| 中国特级黄色片| 国产精品精品国产色婷婷| 极品颜值美女露脸啪啪| 午夜精品在线视频一区| 国产情侣小视频| 亚洲国产又黄又爽女人高潮的| 春暖花开成人亚洲区| 欧美成年人视频网站| 日韩国产网站| 99在线看视频| 不卡av一区二区| av一区二区三区免费观看| 丝袜美腿高跟呻吟高潮一区| 免费观看黄网站| 91免费版在线| 日韩欧美亚洲国产| 欧美日本免费一区二区三区| 五月婷婷六月丁香| 欧美精品videos| 亚洲欧洲日韩精品在线| 久久综合一区| 亚洲三级电影在线观看| 中文字幕 欧美日韩| 欧美zozo| 亚洲天堂成人在线视频| 99自拍视频在线观看| 国产精品欧美日韩久久| 一二三级黄色片| 狠狠色综合播放一区二区| 大黑人交xxx极品hd| 亚洲高清视频中文字幕| 一区二区三区播放| 亚洲片在线观看| 高清不卡av| 精品一区久久久| 国产精品v日韩精品v欧美精品网站| 欧美精品性生活| 久久久久国产精品麻豆| 久久久久亚洲av片无码下载蜜桃| 欧美日韩成人综合天天影院| 黄色毛片在线观看| 日本欧美中文字幕| 一区二区三区韩国免费中文网站| 又大又硬又爽免费视频| 国产一区二区在线免费观看| jizz18女人高潮| 色综合天天综合色综合av| 亚洲免费一级片| 久久久久久久久久国产| 日韩成人在线看| 超碰10000| 国产成人在线免费观看| 极品蜜桃臀肥臀-x88av| 欧美三级日韩三级国产三级| 欧美香蕉爽爽人人爽| 51视频国产精品一区二区| 亚洲第一福利社区| 精品中文字幕av| 91丨porny丨蝌蚪视频| 天干夜夜爽爽日日日日| 亚洲色图第三页| free欧美| 老汉色影院首页| 国产一区二区调教| 国产乱国产乱老熟300| 精品国产免费视频| 欧美男男激情videos| 国精产品一区二区| 日韩精品色哟哟| 日本视频在线免费| 偷拍亚洲色图| 国产 日韩 欧美在线| 99国产精品久久久久久久久久 | 国产白丝精品91爽爽久久| 欧美精品一区二区蜜桃| 精品日韩欧美在线| 国内小视频在线看| 国产精品 日韩| 亚洲欧美日韩国产一区二区| 亚洲天堂视频一区| 欧美一区二区在线看| 亚洲妇熟xxxx妇色黄| 国产区一区二区三区| 日韩电影在线免费| 国产美女久久久久久| 精品国产一区二区三区不卡 | 日韩极品在线观看| 久久无码av三级| 日韩中文字幕在线观看视频| 一夜七次郎国产精品亚洲| 久久91视频| 国产一线二线三线女| 国产三级精品视频| 一区二区的视频| 欧美xxxx14xxxxx性爽| 亚洲天堂最新地址| 日韩视频免费观看高清完整版| av中文字幕在线看| 日本精品免费| 成人网页在线观看| 久草视频在线免费|