精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flume架構與源碼分析-MemoryChannel事務實現

開發 開發工具
Flume提供了可靠地日志采集功能,其高可靠是通過事務機制實現的。而對于Channel的事務我們本部分會介紹MemoryChannel和FileChannel的實現。

[[177429]]

Flume提供了可靠地日志采集功能,其高可靠是通過事務機制實現的。而對于Channel的事務我們本部分會介紹MemoryChannel和FileChannel的實現。

首先我們看下BasicChannelSemantics實現:

Java代碼

  1. public abstract class BasicChannelSemantics extends AbstractChannel {   
  2.   //1、事務使用ThreadLocal存儲,保證事務線程安全   
  3.   private ThreadLocal<BasicTransactionSemantics> currentTransaction   
  4.       = new ThreadLocal<BasicTransactionSemantics>();   
  5.    
  6.   private boolean initialized = false;   
  7.   //2、進行一些初始化工作   
  8.   protected void initialize() {}   
  9.   //3、提供給實現類的創建事務的回調   
  10.   protected abstract BasicTransactionSemantics createTransaction();   
  11.   //4、往Channel放Event,其直接委托給事務的put方法實現   
  12.   @Override   
  13.   public void put(Event event) throws ChannelException {   
  14.     BasicTransactionSemantics transaction = currentTransaction.get();   
  15.     Preconditions.checkState(transaction != null,   
  16.         "No transaction exists for this thread");   
  17.     transaction.put(event);   
  18.   }   
  19.   //5、從Channel獲取Event,也是直接委托給事務的take方法實現   
  20.   @Override   
  21.   public Event take() throws ChannelException {   
  22.     BasicTransactionSemantics transaction = currentTransaction.get();   
  23.     Preconditions.checkState(transaction != null,   
  24.         "No transaction exists for this thread");   
  25.     return transaction.take();   
  26.   }   
  27.    
  28.   //6、獲取事務,如果本實例沒有初始化則先初始化;否則先從ThreadLocal獲取事務,如果沒有或者關閉了則創建一個并綁定到ThreadLocal。   
  29.   @Override   
  30.   public Transaction getTransaction() {   
  31.    
  32.     if (!initialized) {   
  33.       synchronized (this) {   
  34.         if (!initialized) {   
  35.           initialize();   
  36.           initialized = true;   
  37.         }   
  38.       }   
  39.     }   
  40.    
  41.     BasicTransactionSemantics transaction = currentTransaction.get();   
  42.     if (transaction == null || transaction.getState().equals(   
  43.             BasicTransactionSemantics.State.CLOSED)) {   
  44.       transaction = createTransaction();   
  45.       currentTransaction.set(transaction);   
  46.     }   
  47.     return transaction;   
  48.   }   
  49. }   

MemoryChannel事務實現

首先我們來看下MemoryChannel的實現,其是一個純內存的Channel實現,整個事務操作都是在內存中完成。首先看下其內存結構:

1、首先由一個Channel Queue用于存儲整個Channel的Event數據;

2、每個事務都有一個Take Queue和Put Queue分別用于存儲事務相關的取數據和放數據,等事務提交時才完全同步到Channel Queue,或者失敗把取數據回滾到Channel Queue。

MemoryChannel時設計時考慮了兩個容量:Channel Queue容量和事務容量,而這兩個容量涉及到了數量容量和字節數容量。

另外因為多個事務要操作Channel Queue,還要考慮Channel Queue的動態擴容問題,因此MemoryChannel使用了鎖來實現;而容量問題則使用了信號量來實現。

在configure方法中進行了一些參數的初始化,如容量、Channel Queue等。首先看下Channel Queue的容量是如何計算的:

Java代碼

  1. try {   
  2.   capacity = context.getInteger("capacity", defaultCapacity);   
  3. } catch(NumberFormatException e) {   
  4.   capacity = defaultCapacity;   
  5. }   
  6.    
  7. if (capacity <= 0) {   
  8.   capacity = defaultCapacity;   
  9. }    

即首先從配置文件讀取數量容量,如果沒有配置則是默認容量(默認100),而配置的容量小于等于0,則也是默認容量。

接下來是初始化事務數量容量:

Java代碼

  1. try {   
  2.   transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);   
  3. } catch(NumberFormatException e) {   
  4.   transCapacity = defaultTransCapacity;   
  5. }   
  6. if (transCapacity <= 0) {   
  7.   transCapacity = defaultTransCapacity;   
  8. }   
  9. Preconditions.checkState(transCapacity <= capacity,   
  10. "Transaction Capacity of Memory Channel cannot be higher than " +   
  11.         "the capacity.");   

整個過程和Channel Queue數量容量初始化類似,但是***做了前置條件判斷,事務容量必須小于等于Channel Queue容量。

接下來是字節容量限制:

Java代碼

  1. try {   
  2.   byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);   
  3. } catch(NumberFormatException e) {   
  4.   byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;   
  5. }   
  6. try {   
  7.   byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);   
  8.   if (byteCapacity < 1) {   
  9.     byteCapacity = Integer.MAX_VALUE;   
  10.   }   
  11. } catch(NumberFormatException e) {   
  12.   byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);   
  13. }    

byteCapacityBufferPercentage:用來確定byteCapacity的一個百分比參數,即我們定義的字節容量和實際事件容量的百分比,因為我們定義的字節容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的內存占用,可以認為該參數定義了Event header占了實際字節容量的百分比,默認20%;

byteCapacity:首先讀取配置文件定義的byteCapacity,如果沒有定義,則使用默認defaultByteCapacity,而defaultByteCapacity默認是JVM物理內存的80%(Runtime.getRuntime().maxMemory() * .80);那么實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認100,即計算百分比的一個系數。

接下來定義keepAlive參數:

Java代碼

  1. try {   
  2.   keepAlive = context.getInteger("keep-alive", defaultKeepAlive);   
  3. } catch(NumberFormatException e) {   
  4.   keepAlive = defaultKeepAlive;   
  5. }    

keepAlive定義了操作Channel Queue的等待超時事件,默認3s。

接著初始化Channel Queue:

Java代碼

  1. if(queue != null) {   
  2.   try {   
  3.     resizeQueue(capacity);   
  4.   } catch (InterruptedException e) {   
  5.     Thread.currentThread().interrupt();   
  6.   }   
  7. else {   
  8.   synchronized(queueLock) {   
  9.     queue = new LinkedBlockingDeque<Event>(capacity);   
  10.     queueRemaining = new Semaphore(capacity);   
  11.     queueStored = new Semaphore(0);   
  12.   }   
  13. }    

首先如果Channel Queue不為null,表示動態擴容;否則進行Channel Queue的創建。

首先看下***創建Channel Queue,首先使用queueLock鎖定,即在操作Channel Queue時都需要鎖定,因為之前說過Channel Queue可能動態擴容,然后初始化信號量:Channel Queue剩余容量和向Channel Queue申請存儲的容量,用于事務操作中預占Channel Queue容量。

接著是調用resizeQueue動態擴容:

Java代碼

  1. private void resizeQueue(int capacity) throws InterruptedException {   
  2.   int oldCapacity;   
  3.   synchronized(queueLock) { //首先計算擴容前的Channel Queue的容量   
  4.     oldCapacity = queue.size() + queue.remainingCapacity();   
  5.   }   
  6.    
  7.   if(oldCapacity == capacity) {//如果新容量和老容量相等,不需要擴容   
  8.     return;   
  9.   } else if (oldCapacity > capacity) {//如果老容量大于新容量,縮容   
  10.     //首先要預占老容量-新容量的大小,以便縮容容量   
  11. if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {   
  12.    //如果獲取失敗,默認是記錄日志然后忽略   
  13. else {   
  14.   //否則,直接縮容,然后復制老Queue的數據,縮容時需要鎖定queueLock,因為這一系列操作要線程安全   
  15.       synchronized(queueLock) {   
  16.         LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);   
  17.         newQueue.addAll(queue);   
  18.         queue = newQueue;   
  19.       }   
  20.     }   
  21.   } else {   
  22.     //如果不是縮容,則直接擴容即可   
  23.     synchronized(queueLock) {   
  24.       LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);   
  25.       newQueue.addAll(queue);   
  26.       queue = newQueue;   
  27. }   
  28. //增加/減少Channel Queue的新的容量   
  29.     queueRemaining.release(capacity - oldCapacity);   
  30.   }   
  31. }   
  32.    
  33. 到此,整個Channel Queue相關的數據初始化完畢,接著會調用start方法進行初始化:   
  34. public synchronized void start() {   
  35.   channelCounter.start();   
  36.   channelCounter.setChannelSize(queue.size());   
  37.   channelCounter.setChannelCapacity(Long.valueOf(   
  38.           queue.size() + queue.remainingCapacity()));   
  39.   super.start();   
  40. }    

此處初始化了一個ChannelCounter,是一個計數器,記錄如當前隊列放入Event數、取出Event數、成功數等。

之前已經分析了大部分Channel會把put和take直接委托給事務去完成,因此接下來看下MemoryTransaction的實現。

首先看下MemoryTransaction的初始化:

Java代碼

  1. private class MemoryTransaction extends BasicTransactionSemantics {   
  2.   private LinkedBlockingDeque<Event> takeList;   
  3.   private LinkedBlockingDeque<Event> putList;   
  4.   private final ChannelCounter channelCounter;   
  5.   private int putByteCounter = 0;   
  6.   private int takeByteCounter = 0;   
  7.   public MemoryTransaction(int transCapacity, ChannelCounter counter) {   
  8.     putList = new LinkedBlockingDeque<Event>(transCapacity);   
  9.     takeList = new LinkedBlockingDeque<Event>(transCapacity);   
  10.     channelCounter = counter;   
  11.   }    

可以看出MemoryTransaction涉及到兩個事務容量大小定義的隊列(鏈表阻塞隊列)、隊列字節計數器、另外一個是Channel操作的計數器。

事務中的放入操作如下:

Java代碼

  1. protected void doPut(Event event) throws InterruptedException {   
  2.   //1、增加放入事件計數器   
  3.   channelCounter.incrementEventPutAttemptCount();   
  4.   //2、estimateEventSize計算當前Event body大小   
  5.   int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);   
  6.   //3、往事務隊列的putList中放入Event,如果滿了,則拋異常回滾事務   
  7.   if (!putList.offer(event)) {   
  8.       throw new ChannelException(   
  9.       "Put queue for MemoryTransaction of capacity " +   
  10.         putList.size() + " full, consider committing more frequently, " +   
  11.         "increasing capacity or increasing thread count");   
  12.   }   
  13.   //4、增加放入隊列字節數計數器   
  14.   putByteCounter += eventByteSize;   
  15. }    

整個doPut操作相對來說比較簡單,就是往事務putList隊列放入Event,如果滿了則直接拋異常回滾事務;否則放入putList暫存,等事務提交時轉移到Channel Queue。另外需要增加放入隊列的字節數計數器,以便之后做字節容量限制。

接下來是事務中的取出操作:

Java代碼

  1. protected Event doTake() throws InterruptedException {   
  2.   //1、增加取出事件計數器   
  3.   channelCounter.incrementEventTakeAttemptCount();   
  4.   //2、如果takeList隊列沒有剩余容量,即當前事務已經消費了***容量的Event   
  5.   if(takeList.remainingCapacity() == 0) {   
  6.     throw new ChannelException("Take list for MemoryTransaction, capacity " +   
  7.         takeList.size() + " full, consider committing more frequently, " +   
  8.         "increasing capacity, or increasing thread count");   
  9.   }   
  10.   //3、queueStored試圖獲取一個信號量,超時直接返回null   
  11.   if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {   
  12.     return null;   
  13.   }   
  14.   //4、從Channel Queue獲取一個Event   
  15.   Event event;   
  16.   synchronized(queueLock) {//對Channel Queue的操作必須加queueLock,因為之前說的動態擴容問題   
  17.     event = queue.poll();   
  18.   }   
  19.   //5、因為信號量的保證,Channel Queue不應該返回null,出現了就不正常了   
  20.   Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +   
  21.       "signalling existence of entry");   
  22.   //6、暫存到事務的takeList隊列   
  23.   takeList.put(event);   
  24.   //7、計算當前Event body大小并增加取出隊列字節數計數器   
  25.   int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);   
  26.   takeByteCounter += eventByteSize;   
  27.   return event;   
  28. }   

接下來是提交事務:

Java代碼

  1. protected void doCommit() throws InterruptedException {   
  2.   //1、計算改變的Event數量,即取出數量-放入數量;如果放入的多,那么改變的Event數量將是負數   
  3.   int remainingChange = takeList.size() - putList.size();   
  4.   //2、  如果remainingChange小于0,則需要獲取Channel Queue剩余容量的信號量   
  5.   if(remainingChange < 0) {   
  6.     //2.1、首先獲取putByteCounter個字節容量信號量,如果失敗說明超過字節容量限制了,回滾事務   
  7.     if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {   
  8.       throw new ChannelException("Cannot commit transaction. Byte capacity " +   
  9.         "allocated to store event body " + byteCapacity * byteCapacitySlotSize +   
  10.         "reached. Please increase heap space/byte capacity allocated to " +   
  11.         "the channel as the sinks may not be keeping up with the sources");   
  12.     }   
  13.     //2.2、獲取Channel Queue的-remainingChange個信號量用于放入-remainingChange個Event,如果獲取不到,則釋放putByteCounter個字節容量信號量,并拋出異常回滾事務   
  14.     if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {   
  15.       bytesRemaining.release(putByteCounter);   
  16.       throw new ChannelFullException("Space for commit to queue couldn't be acquired." +   
  17.           " Sinks are likely not keeping up with sources, or the buffer size is too tight");   
  18.     }   
  19.   }   
  20.   int puts = putList.size();   
  21.   int takes = takeList.size();   
  22.   synchronized(queueLock) {//操作Channel Queue時一定要鎖定queueLock   
  23.     if(puts > 0 ) {   
  24.       while(!putList.isEmpty()) { //3.1、如果有Event,則循環放入Channel Queue   
  25.         if(!queue.offer(putList.removeFirst())) {    
  26.           //3.2、如果放入Channel Queue失敗了,說明信號量控制出問題了,這種情況不應該發生   
  27.           throw new RuntimeException("Queue add failed, this shouldn't be able to happen");   
  28.         }   
  29.       }   
  30.     }   
  31.     //4、操作成功后,清空putList和takeList隊列   
  32.     putList.clear();   
  33.     takeList.clear();   
  34.   }   
  35.   //5.1、釋放takeByteCounter個字節容量信號量   
  36.   bytesRemaining.release(takeByteCounter);   
  37.   //5.2、重置字節計數器   
  38.   takeByteCounter = 0;   
  39.   putByteCounter = 0;   
  40.   //5.3、釋放puts個queueStored信號量,這樣doTake方法就可以獲取數據了   
  41.   queueStored.release(puts);   
  42.   //5.4、釋放remainingChange個queueRemaining信號量   
  43.   if(remainingChange > 0) {   
  44.     queueRemaining.release(remainingChange);   
  45.   }   
  46.   //6、ChannelCounter一些數據計數   
  47.   if (puts > 0) {   
  48.     channelCounter.addToEventPutSuccessCount(puts);   
  49.   }   
  50.   if (takes > 0) {   
  51.     channelCounter.addToEventTakeSuccessCount(takes);   
  52.   }   
  53.    
  54.   channelCounter.setChannelSize(queue.size());   
  55. }    

此處涉及到兩個信號量:

queueStored表示Channel Queue已存儲事件容量(已存儲的事件數量),隊列取出事件時-1,放入事件成功時+N,取出失敗時-N,即Channel Queue存儲了多少事件。queueStored信號量默認為0。當doTake取出Event時減少一個queueStored信號量,當doCommit提交事務時需要增加putList 隊列大小的queueStored信號量,當doRollback回滾事務時需要減少takeList隊列大小的queueStored信號量。

queueRemaining表示Channel Queue可存儲事件容量(可存儲的事件數量),取出事件成功時+N,放入事件成功時-N。queueRemaining信號量默認為Channel Queue容量。其在提交事務時首先通過remainingChange = takeList.size() - putList.size()計算獲得需要增加多少變更事件;如果小于0表示放入的事件比取出的多,表示有- remainingChange個事件放入,此時應該減少-queueRemaining信號量;而如果大于0,則表示取出的事件比放入的多,表示有queueRemaining個事件取出,此時應該增加queueRemaining信號量;即消費事件時減少信號量,生產事件時增加信號量。

而bytesRemaining是字節容量信號量,超出容量則回滾事務。

***看下回滾事務:

Java代碼

  1. protected void doRollback() {   
  2.     int takes = takeList.size();   
  3.     synchronized(queueLock) { //操作Channel Queue時一定鎖住queueLock   
  4.       //1、前置條件判斷,檢查是否有足夠容量回滾事務   
  5.       Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +   
  6.           "queue to rollback takes. This should never happen, please report");   
  7.       //2、回滾事務的takeList隊列到Channel Queue   
  8.       while(!takeList.isEmpty()) {   
  9.         queue.addFirst(takeList.removeLast());   
  10.       }   
  11.       putList.clear();   
  12.     }   
  13.     //3、釋放putByteCounter個bytesRemaining信號量   
  14.     bytesRemaining.release(putByteCounter);   
  15.    
  16.     //4、計數器重置   
  17.     putByteCounter = 0;   
  18.     takeByteCounter = 0;   
  19.     //5、釋放takeList隊列大小個已存儲事件容量   
  20.     queueStored.release(takes);   
  21.     channelCounter.setChannelSize(queue.size());   
  22.   }   
  23. }    

也就是說在回滾時,需要把takeList中暫存的事件回滾到Channel Queue,并回滾queueStored信號量。

【本文是51CTO專欄作者張開濤的原創文章,作者微信公眾號:開濤的博客,id:kaitao-1234567】

責任編輯:武曉燕 來源: 開濤的博客
相關推薦

2016-11-25 13:14:50

Flume架構源碼

2016-11-29 09:38:06

Flume架構核心組件

2016-11-25 13:26:50

Flume架構源碼

2020-08-19 09:45:29

Spring數據庫代碼

2023-12-29 18:53:58

微服務Saga模式

2024-06-13 09:25:14

2010-09-24 19:12:11

SQL隱性事務

2024-04-17 08:11:01

數據庫事務流程

2021-08-06 08:33:27

Springboot分布式Seata

2024-05-28 00:00:30

Golang數據庫

2013-03-19 10:35:24

Oracle

2025-01-26 15:38:11

Spring事務編程式

2010-01-22 18:01:55

2016-11-15 14:18:09

神策分析大數據數據分析

2011-04-29 13:40:37

MongoDBCommand

2025-06-26 08:28:18

2020-10-09 14:13:04

Zookeeper Z

2016-10-21 13:03:18

androidhandlerlooper

2021-09-08 10:47:33

Flink執行流程

2024-06-11 13:50:43

點贊
收藏

51CTO技術棧公眾號

国产精品久久久久久久久粉嫩av| 亚洲色图五月天| 国产一区二区片| 视频污在线观看| 日韩专区一卡二卡| 深夜福利亚洲导航| 国产女主播在线播放| 成人美女黄网站| 综合婷婷亚洲小说| 国产欧美日韩一区二区三区| 中文字幕精品无| 自拍偷拍欧美专区| 日韩精品一区二区视频| 国产精品成人一区二区三区吃奶| 人妻互换一区二区激情偷拍| 一区二区三区在线资源| 一本色道久久综合亚洲91| 亚洲最大的网站| 91porny在线| 911亚洲精品| 亚洲国产一区视频| 91国产丝袜在线放| 在线观看亚洲黄色| 在线精品一区二区| 久久久国产精品一区| 日韩av在线看免费观看| 51精品国产| 91精品国产综合久久久久久久久久 | 国产精品乱码妇女bbbb| 激情视频在线观看一区二区三区| 欧美偷拍第一页| 国产成人精品999在线观看| 欧美一区二区成人| xxxx一级片| 国产黄色片在线观看| 国产激情一区二区三区四区 | 99国产精品国产精品毛片| 91精品国产综合久久男男| 中文字幕视频网| 激情久久一区| 欧美激情视频在线免费观看 欧美视频免费一| 日本二区在线观看| 欧美顶级毛片在线播放| 精品日韩一区二区三区| www.久久com| 四虎影视国产精品| 国产精品国产馆在线真实露脸 | 国产精品igao网网址不卡| 一二区成人影院电影网| 日韩欧美国产网站| 草草久久久无码国产专区| h片精品在线观看| 91在线视频在线| 国产日韩精品一区观看| 亚洲欧美激情在线观看| 国产成人在线看| 97欧洲一区二区精品免费| 国产乱叫456在线| 久久99国产精品免费| 国产欧美精品一区二区三区-老狼 国产欧美精品一区二区三区介绍 国产欧美精品一区二区 | 国产精品高清一区二区| 欧美日韩精品欧美日韩精品| 久热精品在线播放| 五月天色综合| 91精品欧美久久久久久动漫| 少妇丰满尤物大尺度写真| 午夜久久av| 精品播放一区二区| 一区二区三区免费在线观看视频 | 色噜噜一区二区| 国产区高清在线| 国产精品色眯眯| 手机在线视频你懂的| 中中文字幕av在线| av色综合久久天堂av综合| 国产精成人品localhost| 日本激情视频网站| 久久久综合视频| 亚洲国产精品一区二区第一页| 视频一区二区三区不卡| 一区二区三区中文字幕| 一卡二卡三卡视频| xx欧美视频| 欧美日韩视频在线观看一区二区三区 | 久久国产免费看| 999热视频| 亚洲av片在线观看| 国产精品女主播av| 日韩精品久久一区二区| 亚洲精品一区| 欧美日韩dvd在线观看| 熟妇无码乱子成人精品| 露出调教综合另类| 日韩在线视频网站| 久久久一二三区| 爽好多水快深点欧美视频| 欧美大尺度在线观看| 免费毛片一区二区三区| 久久综合网络一区二区| 91在线|亚洲| 手机看片福利在线观看| 国产精品1区二区.| 久久综合久久久| 麻豆传媒在线免费看| 五月综合激情婷婷六月色窝| 天天干天天玩天天操| 国产精品自在线拍| www国产精品视频| 国产精品黄色大片| 久久精品久久99精品久久| 国产精品视频免费一区二区三区| sese一区| 精品美女久久久久久免费| 婷婷久久五月天| 欧美14一18处毛片| 欧美日韩一二三| 一本加勒比波多野结衣| 亚洲无中文字幕| 国产精品av在线| 亚洲精品久久久久久久久久 | 国产精品色午夜在线观看| 国产肥老妇视频| 国产精品国模大尺度视频| 久久久999视频| 99精品国产高清一区二区麻豆| 中文字幕在线看视频国产欧美在线看完整 | 性囗交免费视频观看| 91嫩草亚洲精品| 国产成人精品电影| 无码精品在线观看| 亚洲制服欧美中文字幕中文字幕| 亚洲精品www.| 精品福利久久久| 欧美一级片久久久久久久| 午夜久久久久久久久久| 中文字幕中文字幕在线一区| 毛片一区二区三区四区| 国产精品主播在线观看| 久久理论片午夜琪琪电影网| 国产视频一区二区三| 国产精品久久久久婷婷| 中文字幕第21页| 国产欧美一区二区三区精品观看| 91精品国产91久久久久久久久| 精品人妻少妇AV无码专区| 亚洲欧美综合色| 亚洲午夜激情影院| 最新国产精品久久久| 成人精品久久av网站| 色欧美激情视频在线| 欧美亚洲国产一区二区三区| av电影网站在线观看| 日日摸夜夜添夜夜添精品视频| 欧美日本亚洲| 综合在线影院| 一区国产精品视频| 一道本在线视频| 亚洲欧洲在线观看av| 福利片一区二区三区| 久久久久久高清| 成人全视频高清免费观看| 色综合婷婷久久| 国产传媒国产传媒| 久久精品国产精品亚洲精品| 99re99热| 999国产精品一区| 91精品国产91久久久久久最新 | 欧美理论电影在线播放| 日韩精品一区二区在线播放| 丁香啪啪综合成人亚洲小说 | 精品视频久久久久久久| 久久精品国产成人av| 国产性做久久久久久| 日本一区二区免费高清视频| 久久wwww| 在线激情影院一区| 国产又黄又粗又长| 一区二区三区四区不卡在线 | 国产又粗又猛又爽又黄91| 国产精品久久久一本精品| 亚洲国产日韩在线一区| 亚洲精选国产| 日韩欧美一区二区三区四区五区| 欧洲美女精品免费观看视频| 久久久久国产精品www| 男人天堂网在线| 欧美精品三级日韩久久| 动漫精品一区一码二码三码四码| 91麻豆精品在线观看| 日韩av片专区| 国产精品一页| 在线视频一区观看| 精品视频在线你懂得| 国产精品国产三级国产专播精品人| 日本不卡视频| 日韩成人小视频| 国产一区二区三区视频免费观看 | 久久精品亚洲精品| 日本美女一级片| 欧美日韩国产大片| 日本一本高清视频| 国产精品第13页| 国产精品伦子伦| 精品一区二区影视| www国产黄色| 欧美jizzhd精品欧美巨大免费| 国偷自产av一区二区三区小尤奈| 久久久免费人体| 2019精品视频| 呦呦在线视频| 中文字幕在线观看日韩| 人成免费电影一二三区在线观看| 7777精品伊人久久久大香线蕉| 午夜影院免费在线观看| 一二三区精品福利视频| 欧美aaa级片| 久久亚洲一区二区三区四区| 欧美日韩一区二区区| 美女一区二区视频| 免费高清在线观看免费| 欧美一区高清| 精品91一区二区三区| 精品黄色一级片| 久久亚洲国产精品日日av夜夜| 日韩一区二区三区色| 91精品久久久久久久久久久久久久| 原纱央莉成人av片| 午夜精品久久久99热福利| 中文在线手机av| 久久这里只有精品视频首页| freemovies性欧美| 亚洲性视频网址| 免费在线视频一级不卡| 亚洲精品黄网在线观看| 亚洲欧美另类综合| 精品国产sm最大网站免费看| 99热这里只有精品66| 在线成人免费观看| 中文字幕人妻一区二区在线视频 | 国产精品主播一区二区| 91成人在线观看喷潮| 日本一级免费视频| 2014亚洲片线观看视频免费| 国产高潮视频在线观看| 国产不卡视频在线观看| 亚洲成人激情小说| 国产福利精品一区二区| 手机在线观看日韩av| 国产一区二区日韩精品| 97超碰人人看| 国产福利一区二区| 国产成人精品一区二区三区在线观看 | 一区二区免费不卡在线| 在线视频不卡一区二区| 亚洲精品电影| 欧美黑人在线观看| 好看不卡的中文字幕| 免费超爽大片黄| 国产乱码精品| 青青草精品视频在线观看| 日本va欧美va欧美va精品| 欧美精品久久久久久久久25p| 老司机一区二区| 色黄视频免费看| 波多野结衣在线aⅴ中文字幕不卡| 漂亮人妻被黑人久久精品| 久久综合九色综合97婷婷女人| 免费观看av网站| 国产精品天天看| 精品99在线观看| 欧美日韩国产页| 丰满熟女人妻一区二区三| 在线电影欧美成精品| 亚洲av无码一区二区三区dv| 亚洲激情在线观看| 高h视频在线| 久热精品视频在线| 阿v视频在线| 国产精品成人av在线| 国产午夜久久av| 久久精品ww人人做人人爽| 欧美日韩有码| 菠萝蜜视频在线观看入口| 先锋影音国产一区| 国产精品自在自线| 成人av在线资源网| 国产精品久久久久久久av| 亚洲精品亚洲人成人网| 色婷婷在线观看视频| 欧美日韩综合不卡| 亚洲精品一区二区三区四区| 亚洲色图35p| 久久亚洲资源| 国产精品香蕉国产| 第一区第二区在线| 亚洲一区二区在线免费观看| 黄色成人91| 亚洲 激情 在线| 99这里只有久久精品视频| 国产无遮挡在线观看| 亚洲成人免费观看| 亚洲一区二区视频在线播放| 亚洲国产毛片完整版| 日韩在线观看www| 7m精品福利视频导航| 秋霞影院一区| 欧洲久久久久久| 伊人久久大香线蕉综合网站 | 欧美日韩国产欧美日美国产精品| 亚洲精品成人区在线观看| 中文字幕日韩av综合精品| 91九色在线看| 亚洲综合在线做性| 日本一二区不卡| 亚洲熟妇av日韩熟妇在线| 国产在线不卡一区| 性欧美一区二区| 精品久久久久久久中文字幕| www.黄色片| 中文字幕久久精品| 女生影院久久| 久久波多野结衣| 亚洲高清av| 欧美日韩一区二区区| 亚洲人亚洲人成电影网站色| 亚洲av无码乱码国产精品fc2| 亚洲激情中文字幕| 黄页网站大全在线免费观看| 亚洲精品日韩av| 国产精品久久久久无码av| 亚洲精品视频导航| 国产亚洲精品aa| 国产精品第5页| 日韩精品在线观看网站| 成年人国产在线观看| 97人摸人人澡人人人超一碰| 亚洲人metart人体| 中文字幕免费高清在线| 国产精品女主播av| 中文字幕在线观看精品| 一区二区成人精品| 日韩成人亚洲| 手机在线观看国产精品| 日韩电影一二三区| 51妺嘿嘿午夜福利| 91久久精品日日躁夜夜躁欧美| 日本午夜在线| 国产成人福利网站| 欧美久久精品一级c片| 丰满少妇在线观看| 国产精品国产三级国产| 国产精品久久免费| 欧美成人高清视频| 凹凸av导航大全精品| 18禁裸男晨勃露j毛免费观看| 成人高清视频在线观看| 在线能看的av| 亚洲日本成人网| 黄色欧美视频| 亚洲成人动漫在线| 岛国一区二区在线观看| 日韩经典在线观看| 亚洲精品一区中文| 欧美xxxx做受欧美护士| 在线不卡视频一区二区| 国产精品一区二区久激情瑜伽| 久久久久97国产| 日韩精品中文在线观看| 色尼玛亚洲综合影院| 亚洲欧洲一区二区| 国产一区二区在线电影| 精品亚洲永久免费| 亚洲欧美另类国产| 在线看女人毛片| 国产 高清 精品 在线 a | 潘金莲激情呻吟欲求不满视频| 日韩理论片网站| 神马久久久久久久久久| 国产精品6699| 欧美一区二区三区另类| 亚洲观看黄色网| 欧美伊人久久久久久久久影院| dj大片免费在线观看| 久久99九九| 麻豆视频一区二区| 免看一级a毛片一片成人不卡| 日韩电影中文字幕| 日韩毛片免费看| 老太脱裤让老头玩ⅹxxxx| 久久精品亚洲精品国产欧美kt∨| 亚洲天堂狠狠干| 91精品国产高清| 自由日本语亚洲人高潮| 国产成人av一区二区三区不卡| 在线不卡中文字幕播放| 男人的天堂免费在线视频| 在线成人性视频| 91免费版在线看| 99视频在线观看免费| 青青久久av北条麻妃黑人|