精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flume架構與源碼分析-核心組件分析-1

開發 開發工具
start方法在整個Flume啟動時或者初始化組件時都會調用start方法進行組件初始化,Flume組件出現異常停止時會調用stop,getLifecycleState返回組件的生命周期狀態,有IDLE, START, STOP, ERROR四個狀態。

[[177130]]

首先所有核心組件都會實現org.apache.flume.lifecycle.LifecycleAware接口:

Java代碼

  1. public interface LifecycleAware {   
  2.   public void start();   
  3.   public void stop();   
  4.   public LifecycleState getLifecycleState();   
  5. }   

start方法在整個Flume啟動時或者初始化組件時都會調用start方法進行組件初始化,Flume組件出現異常停止時會調用stop,getLifecycleState返回組件的生命周期狀態,有IDLE, START, STOP, ERROR四個狀態。

如果開發的組件需要配置,如設置一些屬性;可以實現org.apache.flume.conf.Configurable接口:

Java代碼

  1. public interface Configurable {   
  2.    public void configure(Context context);   
  3. }   

Flume在啟動組件之前會調用configure來初始化組件一些配置。

1、Source

Source用于采集日志數據,有兩種實現方式:輪訓拉取和事件驅動機制;Source接口如下:

Java代碼

  1. public interface Source extends LifecycleAware, NamedComponent {   
  2.   public void setChannelProcessor(ChannelProcessor channelProcessor);   
  3.   public ChannelProcessor getChannelProcessor();   
  4. }    

Source接口首先繼承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口,也就是說它的的所有邏輯的實現應該在LifecycleAware接口的start和stop中實現;ChannelProcessor之前介紹過用來進行日志流的過濾和Channel的選擇及調度。

而Source是通過SourceFactory工廠創建,默認提供了DefaultSourceFactory,其首先通過Enum類型org.apache.flume.conf.source.SourceType查找默認實現,如exec,則找到org.apache.flume.source.ExecSource實現,如果找不到直接Class.forName(className)創建。

Source提供了兩種機制: PollableSource(輪訓拉取)和EventDrivenSource(事件驅動):

PollableSource默認提供了如下實現:

比如JMSSource實現使用javax.jms.MessageConsumer.receive(pollTimeout)主動去拉取消息。

EventDrivenSource默認提供了如下實現:

比如NetcatSource、HttpSource就是事件驅動,即被動等待;比如HttpSource就是內部啟動了一個內嵌的Jetty啟動了一個Servlet容器,通過FlumeHTTPServlet去接收消息。

Flume提供了SourceRunner用來啟動Source的流轉:

Java代碼

  1. public class EventDrivenSourceRunner extends SourceRunner {   
  2.   private LifecycleState lifecycleState;   
  3.   public EventDrivenSourceRunner() {   
  4.       lifecycleState = LifecycleState.IDLE; //啟動之前是空閑狀態   
  5.   }   
  6.    
  7.   @Override   
  8.   public void start() {   
  9.     Source source = getSource(); //獲取Source   
  10.     ChannelProcessor cp = source.getChannelProcessor(); //Channel處理器   
  11.     cp.initialize(); //初始化Channel處理器   
  12.     source.start();  //啟動Source   
  13.     lifecycleState = LifecycleState.START; //本組件狀態改成啟動狀態   
  14.   }   
  15.   @Override   
  16.   public void stop() {   
  17.     Source source = getSource(); //先停Source   
  18.     source.stop();   
  19.     ChannelProcessor cp = source.getChannelProcessor();   
  20.     cp.close();//再停Channel處理器   
  21.     lifecycleState = LifecycleState.STOP; //本組件狀態改成停止狀態   
  22.   }   
  23. }    

從本組件也可以看出:1、首先要初始化ChannelProcessor,其實現時初始化過濾器鏈;2、接著啟動Source并更改本組件的狀態。

Java代碼

  1. public class PollableSourceRunner extends SourceRunner {   
  2.  @Override   
  3.  public void start() {   
  4.   PollableSource source = (PollableSource) getSource();   
  5.   ChannelProcessor cp = source.getChannelProcessor();   
  6.   cp.initialize();   
  7.   source.start();   
  8.    
  9.   runner = new PollingRunner();   
  10.   runner.source = source;   
  11.   runner.counterGroup = counterGroup;   
  12.   runner.shouldStop = shouldStop;   
  13.    
  14.   runnerThread = new Thread(runner);   
  15.   runnerThread.setName(getClass().getSimpleName() + "-" +    
  16.       source.getClass().getSimpleName() + "-" + source.getName());   
  17.   runnerThread.start();    
  18.    
  19.   lifecycleState = LifecycleState.START;   
  20.  }   
  21. }    

而PollingRunner首先初始化組件,但是又啟動了一個線程PollingRunner,其作用就是輪訓拉取數據:

Java代碼

  1. @Override   
  2.   public void run() {   
  3.     while (!shouldStop.get()) { //如果沒有停止,則一直在死循環運行   
  4.       counterGroup.incrementAndGet("runner.polls");   
  5.    
  6.       try {   
  7.         //調用PollableSource的process方法進行輪訓拉取,然后判斷是否遇到了失敗補償   
  8.         if (source.process().equals(PollableSource.Status.BACKOFF)) {/   
  9.           counterGroup.incrementAndGet("runner.backoffs");   
  10.    
  11.           //失敗補償時暫停線程處理,等待超時時間之后重試   
  12.           Thread.sleep(Math.min(   
  13.               counterGroup.incrementAndGet("runner.backoffs.consecutive")   
  14.               * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));   
  15.         } else {   
  16.           counterGroup.set("runner.backoffs.consecutive", 0L);   
  17.         }   
  18.       } catch (InterruptedException e) {   
  19.                 }   
  20.       }   
  21.     }   
  22.   }   
  23. }    

Flume在啟動時會判斷Source是PollableSource還是EventDrivenSource來選擇使用PollableSourceRunner還是EventDrivenSourceRunner。

比如HttpSource實現,其通過FlumeHTTPServlet接收消息然后:

Java代碼

  1. List<Event> events = Collections.emptyList(); //create empty list   
  2. //首先從請求中獲取Event   
  3. events = handler.getEvents(request);   
  4. //然后交給ChannelProcessor進行處理   
  5. getChannelProcessor().processEventBatch(events);    

到此基本的Source流程就介紹完了,其作用就是監聽日志,采集,然后交給ChannelProcessor進行處理。

2、Channel

Channel用于連接Source和Sink,Source生產日志發送到Channel,Sink從Channel消費日志;也就是說通過Channel實現了Source和Sink的解耦,可以實現多對多的關聯,和Source、Sink的異步化。

之前Source采集到日志后會交給ChannelProcessor處理,那么接下來我們先從ChannelProcessor入手,其依賴三個組件:

Java代碼

  1. private final ChannelSelector selector;  //Channel選擇器   
  2. private final InterceptorChain interceptorChain; //攔截器鏈   
  3. private ExecutorService execService; //用于實現可選Channel的ExecutorService,默認是單線程實現    

接下來看下其是如何處理Event的:

Java代碼

  1. public void processEvent(Event event) {   
  2.   event = interceptorChain.intercept(event); //首先進行攔截器鏈過濾   
  3.   if (event == null) {   
  4.     return;   
  5.   }   
  6.   List<Event> events = new ArrayList<Event>(1);   
  7.   events.add(event);   
  8.    
  9.   //通過Channel選擇器獲取必須成功處理的Channel,然后事務中執行   
  10.   List<Channel> requiredChannels = selector.getRequiredChannels(event);   
  11.   for (Channel reqChannel : requiredChannels) {    
  12.     executeChannelTransaction(reqChannel, events, false);   
  13.   }   
  14.    
  15.   //通過Channel選擇器獲取可選的Channel,這些Channel失敗是可以忽略,不影響其他Channel的處理   
  16.   List<Channel> optionalChannels = selector.getOptionalChannels(event);   
  17.   for (Channel optChannel : optionalChannels) {   
  18.     execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));   
  19.   }   
  20. }    

另外內部還提供了批處理實現方法processEventBatch;對于內部事務實現的話可以參考executeChannelTransaction方法,整體事務機制類似于JDBC:

Java代碼

  1. private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {   
  2.   //1、獲取Channel上的事務   
  3.   Transaction tx = channel.getTransaction();   
  4.   Preconditions.checkNotNull(tx, "Transaction object must not be null");   
  5.   try {   
  6.     //2、開啟事務   
  7.     tx.begin();   
  8.     //3、在Channel上執行批量put操作   
  9.     for (Event event : batch) {   
  10.       channel.put(event);   
  11.     }   
  12.     //4、成功后提交事務   
  13.     tx.commit();   
  14.   } catch (Throwable t) {   
  15.     //5、異常后回滾事務   
  16.     tx.rollback();   
  17.     if (t instanceof Error) {   
  18.        LOG.error("Error while writing to channel: " +   
  19.            channel, t);   
  20.        throw (Error) t;   
  21.     } else if(!isOptional) {//如果是可選的Channel,異常忽略   
  22.        throw new ChannelException("Unable to put batch on required " +   
  23.              "channel: " + channel, t);   
  24.     }   
  25.   } finally {   
  26.     //***關閉事務   
  27.     tx.close();   
  28.   }   
  29. }   

Interceptor用于過濾Event,即傳入一個Event然后進行過濾加工,然后返回一個新的Event,接口如下:

Java代碼

  1. public interface Interceptor {   
  2.     public void initialize();   
  3.     public Event intercept(Event event);   
  4.     public List<Event> intercept(List<Event> events);   
  5.     public void close();   
  6. }    

可以看到其提供了initialize和close方法用于啟動和關閉;intercept方法用于過濾或加工Event。比如HostInterceptor攔截器用于獲取本機IP然后默認添加到Event的字段為host的Header中。

接下來就是ChannelSelector選擇器了,其通過如下方式創建:

Java代碼

  1. //獲取ChannelSelector配置,比如agent.sources.s1.selector.type = replicating   
  2. ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();   
  3. //使用Source關聯的Channel創建,比如agent.sources.s1.channels = c1 c2   
  4. ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);    

ChannelSelector默認提供了兩種實現:復制和多路復用:

默認實現是復制選擇器ReplicatingChannelSelector,即把接收到的消息復制到每一個Channel;多路復用選擇器MultiplexingChannelSelector會根據Event Header中的參數進行選擇,以此來選擇使用哪個Channel。

而Channel是Event中轉的地方,Source發布Event到Channel,Sink消費Channel的Event;Channel接口提供了如下接口用來實現Event流轉:

Java代碼

  1. public interface Channel extends LifecycleAware, NamedComponent {   
  2.   public void put(Event event) throws ChannelException;   
  3.   public Event take() throws ChannelException;   
  4.   public Transaction getTransaction();   
  5. }    

put用于發布Event,take用于消費Event,getTransaction用于事務支持。默認提供了如下Channel的實現:

對于Channel的實現我們后續單獨章節介紹。

3、Sink

Sink從Channel消費Event,然后進行轉移到收集/聚合層或存儲層。Sink接口如下所示:

Java代碼

  1. public interface Sink extends LifecycleAware, NamedComponent {   
  2.   public void setChannel(Channel channel);   
  3.   public Channel getChannel();   
  4.   public Status process() throws EventDeliveryException;   
  5.   public static enum Status {   
  6.     READY, BACKOFF   
  7.   }   
  8. }    

類似于Source,其首先繼承了LifecycleAware,然后提供了Channel的getter/setter方法,并提供了process方法進行消費,此方法會返回消費的狀態,READY或BACKOFF。

Sink也是通過SinkFactory工廠來創建,其也提供了DefaultSinkFactory默認工廠,比如傳入hdfs,會先查找Enum org.apache.flume.conf.sink.SinkType,然后找到相應的默認處理類org.apache.flume.sink.hdfs.HDFSEventSink,如果沒找到默認處理類,直接通過Class.forName(className)進行反射創建。

我們知道Sink還提供了分組功能,用于把多個Sink聚合為一組進行使用,內部提供了SinkGroup用來完成這個事情。此時問題來了,如何去調度多個Sink,其內部使用了SinkProcessor來完成這個事情,默認提供了故障轉移和負載均衡兩個策略。

首先SinkGroup就是聚合多個Sink為一組,然后將多個Sink傳給SinkProcessorFactory進行創建SinkProcessor,而策略是根據配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance來選擇的。

SinkProcessor提供了如下實現:

DefaultSinkProcessor:默認實現,用于單個Sink的場景使用。

FailoverSinkProcessor:故障轉移實現:

Java代碼

  1. public Status process() throws EventDeliveryException {   
  2.   Long now = System.currentTimeMillis();   
  3.     //1、首先檢查失敗隊列的頭部的Sink是否已經過了失敗補償等待時間了   
  4.   while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {   
  5.     //2、如果可以使用了,則從失敗Sink隊列獲取隊列***個Sink   
  6.     FailedSink cur = failedSinks.poll();   
  7.     Status s;   
  8.     try {   
  9.       s = cur.getSink().process(); //3、使用此Sink進行處理   
  10.       if (s  == Status.READY) { //4、如果處理成功   
  11.         liveSinks.put(cur.getPriority(), cur.getSink()); //4.1、放回存活Sink隊列   
  12.         activeSink = liveSinks.get(liveSinks.lastKey());   
  13.       } else {   
  14.         failedSinks.add(cur); //4.2、如果此時不是READY,即BACKOFF期間,再次放回失敗隊列   
  15.       }   
  16.       return s;   
  17.     } catch (Exception e) {   
  18.       cur.incFails(); //5、如果遇到異常了,則增加失敗次數,并放回失敗隊列   
  19.       failedSinks.add(cur);   
  20.     }   
  21.   }   
  22.    
  23.   Status ret = null;   
  24.   while(activeSink != null) { //6、此時失敗隊列中沒有Sink能處理了,那么需要使用存活Sink隊列進行處理   
  25.     try {   
  26.       ret = activeSink.process();   
  27.       return ret;   
  28.     } catch (Exception e) { //7、處理失敗進行轉移到失敗隊列   
  29.       activeSink = moveActiveToDeadAndGetNext();   
  30.     }   
  31.   }   
  32.    
  33.   throw new EventDeliveryException("All sinks failed to process, " +   
  34.       "nothing left to failover to");   
  35. }   

失敗隊列是一個優先級隊列,使用refresh屬性排序,而refresh是通過如下機制計算的:

Java代碼

  1. refresh = System.currentTimeMillis() 
  2. + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); 

其中maxPenalty是***等待時間,默認30s,而(1 << sequentialFailures) * FAILURE_PENALTY)用于實現指數級等待時間遞增, FAILURE_PENALTY是1s。

LoadBalanceSinkProcessor:用于實現Sink的負載均衡,其通過SinkSelector進行實現,類似于ChannelSelector。LoadBalanceSinkProcessor在啟動時會根據配置,如agent.sinkgroups.g1.processor.selector = random進行選擇,默認提供了兩種選擇器:

LoadBalanceSinkProcessor使用如下機制進行負載均衡:

Java代碼

  1. public Status process() throws EventDeliveryException {   
  2.   Status status = null;   
  3.   //1、使用選擇器創建相應的迭代器,也就是用來選擇Sink的迭代器   
  4.   Iterator<Sink> sinkIterator = selector.createSinkIterator();   
  5.   while (sinkIterator.hasNext()) {   
  6.     Sink sink = sinkIterator.next();   
  7.     try {   
  8.       //2、選擇器迭代Sink進行處理,如果成功直接break掉這次處理,此次負載均衡就算完成了   
  9.       status = sink.process();   
  10.       break;   
  11.     } catch (Exception ex) {   
  12.       //3、失敗后會通知選擇器,采取相應的失敗退避補償算法進行處理   
  13.       selector.informSinkFailed(sink);   
  14.       LOGGER.warn("Sink failed to consume event. "   
  15.           + "Attempting next sink if available.", ex);   
  16.     }   
  17.   }   
  18.   if (status == null) {   
  19.     throw new EventDeliveryException("All configured sinks have failed");   
  20.   }   
  21.   return status;   
  22. }    

如上的核心就是怎么創建迭代器,如何進行失敗退避補償處理,首先我們看下RoundRobinSinkSelector實現,其內部是通過通用的RoundRobinOrderSelector選擇器實現:

Java代碼

  1. public Iterator<T> createIterator() {   
  2.   //1、獲取存活的Sink索引,   
  3.   List<Integer> activeIndices = getIndexList();   
  4.   int size = activeIndices.size();   
  5.   //2、如果上次記錄的下一個存活Sink的位置超過了size,那么從隊列頭重新開始計數   
  6.   if (nextHead >= size) {   
  7.     nextHead = 0;   
  8.   }   
  9.   //3、獲取本次使用的起始位置   
  10.   int begin = nextHead++;   
  11.   if (nextHead == activeIndices.size()) {   
  12.     nextHead = 0;   
  13.   }   
  14.   //4、從該位置開始迭代,其實現類似于環形隊列,比如整個隊列是5,起始位置是3,則按照 3、4、0、1、2的順序進行輪訓,實現了輪訓算法    
  15.   int[] indexOrder = new int[size];   
  16.   for (int i = 0; i < size; i++) {   
  17.     indexOrder[i] = activeIndices.get((begin + i) % size);   
  18.   }   
  19.   //indexOrder是迭代順序,getObjects返回相關的Sinks;   
  20.   return new SpecificOrderIterator<T>(indexOrder, getObjects());   
  21. }    

getIndexList實現如下:

Java代碼

  1. protected List<Integer> getIndexList() {   
  2.   long now = System.currentTimeMillis();   
  3.   List<Integer> indexList = new ArrayList<Integer>();   
  4.   int i = 0;   
  5.   for (T obj : stateMap.keySet()) {   
  6.     if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {   
  7.       indexList.add(i);   
  8.     }   
  9.     i++;   
  10.   }   
  11.   return indexList;   
  12. }   

isShouldBackOff()表示是否開啟退避算法支持,如果不開啟,則認為每個Sink都是存活的,每次都會重試,通過agent.sinkgroups.g1.processor.backoff = true配置開啟,默認false;restoreTime和之前介紹的refresh一樣,是退避補償等待時間,算法類似,就不多介紹了。

那么什么時候調用Sink進行消費呢?其類似于SourceRunner,Sink提供了SinkRunner進行輪訓拉取處理,SinkRunner會輪訓調度SinkProcessor消費Channel的消息,然后調用Sink進行轉移。SinkProcessor之前介紹過,其負責消息復制/路由。

SinkRunner實現如下:

Java代碼

  1. public void start() {   
  2.   SinkProcessor policy = getPolicy();   
  3.   policy.start();   
  4.   runner = new PollingRunner();   
  5.   runner.policy = policy;   
  6.   runner.counterGroup = counterGroup;   
  7.   runner.shouldStop = new AtomicBoolean();   
  8.   runnerThread = new Thread(runner);   
  9.   runnerThread.setName("SinkRunner-PollingRunner-" +   
  10.       policy.getClass().getSimpleName());   
  11.   runnerThread.start();   
  12.   lifecycleState = LifecycleState.START;   
  13. }    

即獲取SinkProcessor然后啟動它,接著啟動輪訓線程去處理。PollingRunner線程負責輪訓消息,核心實現如下:

Java代碼

  1. public void run() {   
  2.   while (!shouldStop.get()) { //如果沒有停止   
  3.     try {   
  4.       if (policy.process().equals(Sink.Status.BACKOFF)) {//如果處理失敗了,進行退避補償處理   
  5.         counterGroup.incrementAndGet("runner.backoffs");   
  6.         Thread.sleep(Math.min(   
  7.             counterGroup.incrementAndGet("runner.backoffs.consecutive")   
  8.             * backoffSleepIncrement, maxBackoffSleep)); //暫停退避補償設定的超時時間   
  9.       } else {   
  10.         counterGroup.set("runner.backoffs.consecutive", 0L);   
  11.       }   
  12.     } catch (Exception e) {   
  13.       try {   
  14.         Thread.sleep(maxBackoffSleep); //如果遇到異常則等待***退避時間   
  15.       } catch (InterruptedException ex) {   
  16.         Thread.currentThread().interrupt();   
  17.       }   
  18.     }   
  19.   }   
  20. }    

整體實現類似于PollableSourceRunner實現,整體處理都是交給SinkProcessor完成的。SinkProcessor會輪訓Sink的process方法進行處理;此處以LoggerSink為例:

Java代碼

  1. @Override   
  2. public Status process() throws EventDeliveryException {   
  3.   Status result = Status.READY;   
  4.   Channel channel = getChannel();   
  5.   //1、獲取事務   
  6.   Transaction transaction = channel.getTransaction();   
  7.   Event event = null;   
  8.    
  9.   try {   
  10.     //2、開啟事務   
  11.     transaction.begin();   
  12.     //3、從Channel獲取Event   
  13.     event = channel.take();   
  14.     if (event != null) {   
  15.       if (logger.isInfoEnabled()) {   
  16.         logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));   
  17.       }   
  18.     } else {//4、如果Channel中沒有Event,則默認進入故障補償機制,即防止死循環造成CPU負載高   
  19.       result = Status.BACKOFF;   
  20.     }   
  21.     //5、成功后提交事務   
  22.     transaction.commit();   
  23.   } catch (Exception ex) {   
  24.     //6、失敗后回滾事務   
  25.     transaction.rollback();   
  26.     throw new EventDeliveryException("Failed to log event: " + event, ex);   
  27.   } finally {   
  28.     //7、關閉事務   
  29.     transaction.close();   
  30.   }   
  31.   return result;   
  32. }    

Sink中一些實現是支持批處理的,比如RollingFileSink:

Java代碼

  1. //1、開啟事務   
  2. //2、批處理   
  3. for (int i = 0; i < batchSize; i++) {   
  4.   event = channel.take();   
  5.   if (event != null) {   
  6.     sinkCounter.incrementEventDrainAttemptCount();   
  7.     eventAttemptCounter++;   
  8.     serializer.write(event);   
  9.   }   
  10. }   
  11. //3、提交/回滾事務、關閉事務   

定義一個批處理大小然后在事務中執行批處理。

【本文是51CTO專欄作者張開濤的原創文章,作者微信公眾號:開濤的博客,id:kaitao-1234567】

責任編輯:武曉燕 來源: 開濤的博客
相關推薦

2016-11-29 09:38:06

Flume架構核心組件

2016-11-25 13:14:50

Flume架構源碼

2016-11-29 16:59:46

Flume架構源碼

2022-06-07 10:33:29

Camera組件鴻蒙

2015-04-24 09:33:11

Cloud Found組件分析PaaS

2021-09-05 07:35:58

lifecycleAndroid組件原理

2011-04-29 13:40:37

MongoDBCommand

2022-01-05 08:53:13

Spring原理分析MVC

2009-12-31 15:55:06

ADO.NET結構

2016-10-21 13:03:18

androidhandlerlooper

2021-09-08 10:47:33

Flink執行流程

2025-01-13 00:13:59

VSCode架構依賴注入

2015-08-11 15:52:52

大數據數據分析

2022-07-17 06:51:22

Vite 3.0前端

2019-10-08 10:01:22

Kafka應用場景架構

2017-05-04 22:30:17

Zuul過濾器微服務

2011-05-26 10:05:48

MongoDB

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2011-03-15 11:33:18

iptables

2019-10-16 16:33:41

Docker架構語言
點贊
收藏

51CTO技術棧公眾號

久久久久久久久久电影| 变态另类ts人妖一区二区| 国产精品第九页| 日本h片久久| 91在线视频播放地址| 俺去亚洲欧洲欧美日韩| 国产无套精品一区二区| 一本一本久久a久久| 欧美亚洲韩国| 91视频一区二区三区| 国内精品久久久| 日本成人在线免费| 激情视频在线观看| 99久久99视频只有精品| 一本色道**综合亚洲精品蜜桃冫| 国产精品v欧美精品v日韩精品| 强制高潮抽搐sm调教高h| 3d性欧美动漫精品xxxx软件| 久久一区二区三区国产精品| 孩xxxx性bbbb欧美| www亚洲色图| 欧美精品中文字幕亚洲专区| 亚洲v中文字幕| 精品日韩美女| 国产一区二区视频网站| 久久99高清| 欧美日韩亚洲丝袜制服| 日韩国产一区久久| 亚洲午夜无码久久久久| 日本a口亚洲| 欧美福利电影网| 日本老太婆做爰视频| 精品女同一区二区三区| 好吊一区二区三区| 日韩av在线免播放器| 亚洲熟妇av一区二区三区漫画| 十九岁完整版在线观看好看云免费| aa亚洲婷婷| 亚洲视频一区二区三区| 亚洲无吗一区二区三区| 黄色网址视频在线观看| 久久精品夜色噜噜亚洲a∨| 春色成人在线视频| 国产女人爽到高潮a毛片| 欧美国产另类| 日韩精品中文字| av视屏在线播放| 欧美激情午夜| 成+人+亚洲+综合天堂| 欧洲亚洲免费视频| 中文字幕无码日韩专区免费| 成人免费在线观看av| 亚洲欧美日韩在线高清直播| 九九九九九伊人| 成入视频在线观看| 国产欧美日韩另类一区| 91成人免费视频| 日本特级黄色片| 天天射成人网| 久久网福利资源网站| 精品无码国产一区二区三区51安| 日本综合久久| 欧日韩精品视频| 欧美这里只有精品| 浮生影视网在线观看免费| 国产精品18久久久久久久网站| 2020国产精品视频| 四虎精品免费视频| 亚洲小说图片| 日韩欧美www| 男女男精品视频站| zzzwww在线看片免费| 亚洲成人av中文| 中文字幕精品—区二区日日骚| 人成网站在线观看| 久久99精品久久久久久国产越南| 5278欧美一区二区三区| 五月天婷婷导航| 黄色成人av网站| 亚洲18私人小影院| 日本午夜在线观看| 国产综合欧美| 青青a在线精品免费观看| 久久影视中文字幕| 一本久道综合久久精品| 日本国产欧美一区二区三区| 最新中文字幕在线观看视频| 狠狠色丁香久久婷婷综合丁香| 日本精品性网站在线观看| 欧美丰满艳妇bbwbbw| 日韩高清欧美| 亚洲欧美日韩天堂一区二区| 国产在线综合视频| 欧美在线精品一区| 欧美中文在线观看| 国产精品久久久久久69| 日韩av中文字幕一区二区 | 在线免费看a| 成人福利视频在线| 三级三级久久三级久久18| 青青视频在线观| aa级大片欧美| 国产乱码精品一区二区三区卡| 国产免费黄色大片| 91一区一区三区| 超碰在线免费观看97| 成人在线免费看| 久久精品这里都是精品| 99久re热视频精品98| 黄色一级大片在线免费看产| 亚洲成a人片在线观看中文| 青青在线视频免费| 9l亚洲国产成人精品一区二三 | 亚洲欧洲日韩在线| 欧美视频小说| 精品美女视频在线观看免费软件 | gv天堂gv无码男同在线观看| 美女久久99| 欧美巨猛xxxx猛交黑人97人| 国产精品久久久久久久精| 国产一区二区三区久久久久久久久| 97人人爽人人喊人人模波多| 97人妻精品一区二区三区动漫| 91色porny在线视频| 国产制服91一区二区三区制服| 操你啦视频在线| 亚洲欧美电影院| 成人在线视频一区二区三区| 成人全视频在线观看在线播放高清| 欧美三级中文字幕| 中文字幕xxx| 日本久久黄色| 日本成人精品在线| 深爱五月激情五月| 久久久精品国产99久久精品芒果| 男人天堂新网址| 99综合99| 亚洲国产黄色片| 亚洲自拍偷拍一区二区| 1024精品一区二区三区| 日本午夜精品理论片a级appf发布| www香蕉视频| 中文字幕在线一区| 999久久欧美人妻一区二区| 99精品视频在线免费播放| 中文字幕久久久av一区| 久草免费在线视频观看| 国产精品日本| 好看的日韩精品| 精精国产xxxx视频在线播放| 亚洲白虎美女被爆操| 日本高清www| 九九综合九九| 国产成+人+综合+亚洲欧洲| 国产精品欧美亚洲| 亚洲人精品午夜| 美女av免费在线观看| 青娱乐极品盛宴一区二区| 精品福利一二区| 日本熟妇成熟毛茸茸| 日韩中文字幕亚洲一区二区va在线| 久久精品日产第一区二区三区乱码 | 亚洲电影在线观看| 日本午夜小视频| 久久亚洲一区二区三区四区| av片中文字幕| 麻豆国产精品| 亚洲欧美日韩中文在线制服| 成人免费毛片男人用品| jizz在线观看| 中文字幕免费在线观看视频一区| 亚洲欧洲精品在线| av日韩国产| 欧美日本一道本| aa片在线观看视频在线播放| 99精品美女| 亚洲综合中文字幕68页| 北岛玲日韩精品一区二区三区| 日本久久精品电影| 亚洲成人精品在线播放| 精品视频黄色| 96精品视频在线| 精品国自产在线观看| 午夜精品一区二区三区电影天堂| 国产精品无码午夜福利| 理论片日本一区| 欧美日韩一区二区三区免费| 欧美free嫩15| 欧美大肥婆大肥bbbbb| 一级特黄aa大片| 久久久久国产精品麻豆ai换脸 | 亚洲级视频在线观看免费1级| 91麻豆精品国产91久久综合| 精品一区二区国语对白| 天堂…中文在线最新版在线| 日本精品视频| 欧美大成色www永久网站婷| 欧美自拍第一页| 欧美性大战久久久久久久蜜臀| 右手影院亚洲欧美| 老色鬼精品视频在线观看播放| 欧美a级免费视频| 亚欧日韩另类中文欧美| 91国内精品久久| 69久久久久| 亚洲黄色片网站| 国产伦精品一区二区三区视频痴汉| 性做久久久久久久免费看| 中文天堂资源在线| 久久综合色播五月| 中文字幕一区二区三区人妻在线视频 | 动漫3d精品一区二区三区乱码| 三级精品视频久久久久| 亚洲高清在线看| 亚洲午夜电影在线| bl动漫在线观看| 99人久久精品视频最新地址| 神马欧美一区二区| 日韩三级av| 成人一区二区三区四区| 亚洲精品成a人ⅴ香蕉片| 日本精品在线视频 | 国产精品伦子伦免费视频| 欧美日韩在线精品一区二区三区激情综 | 日韩欧美一区二区免费| 波多野结衣网站| 精品久久久免费| 精品无码人妻一区| 成人一区在线看| 国产亚洲天堂网| 最新国产乱人伦偷精品免费网站| 影音先锋欧美资源| 成人在线亚洲| 色播亚洲视频在线观看| 国产精品最新| 91视频国产一区| h片在线观看| 欧美第一淫aaasss性| 少妇高潮一区二区三区69| 日韩美女在线视频| 99热这里只有精品99| 亚洲成人一二三| 青青草成人免费| 一区二区三区四区蜜桃| 黄色在线观看av| 99久久精品国产观看| wwwxx日本| 不卡视频一二三四| 亚洲av成人精品一区二区三区| 成人免费毛片aaaaa**| 佐佐木明希电影| 成人激情免费电影网址| 国产精品久久久久久久无码| 91在线播放网址| 在哪里可以看毛片| 国产女同互慰高潮91漫画| 久久视频精品在线观看| 国产精品激情偷乱一区二区∴| 女人裸体性做爰全过| 懂色av一区二区夜夜嗨| 国产原创popny丨九色| 亚洲欧洲一区二区天堂久久| 国产高清av在线播放| 日韩激情在线| 在线视频福利一区| 欧美成人日本| 国产亚洲精品网站| 男男成人高潮片免费网站| 分分操这里只有精品| 国产视频一区在线观看一区免费| 人妻精品无码一区二区三区 | 中文字幕无人区二| 国产精品一区二区果冻传媒| 精品免费国产一区二区| 欧美国产先锋| 超碰成人免费在线| 久久青草久久| 九一国产精品视频| 久久亚洲二区| 色18美女社区| 白白色亚洲国产精品| 国产精品扒开腿做爽爽| 中文字幕永久在线不卡| 五月天婷婷丁香| 在线观看91精品国产入口| 91精品国产乱码久久久久| 日韩精品在线网站| 欧美一区二区三区激情| 亚洲午夜激情免费视频| 亚洲人成色777777老人头| 欧美一区二区久久| 在线播放精品视频| 色婷婷久久久亚洲一区二区三区| 又骚又黄的视频| 亚洲黄色av女优在线观看 | 日韩精品免费一区二区三区竹菊| 欧美一区免费视频| 老司机在线精品视频| 99国产在线| 亚洲一二三区视频| eeuss一区二区三区| 亚洲第一论坛sis| 激情视频小说图片| 日韩va欧美va亚洲va久久| 性色av蜜臀av浪潮av老女人| 国产精品美女一区二区在线观看| 非洲一级黄色片| 久久久国产精华| 久久综合成人网| 亚洲va欧美va人人爽午夜| 国产99久久久久久免费看| 精品处破学生在线二十三| 在线观看免费网站黄| 欧美有码在线观看| 成人资源在线| 今天免费高清在线观看国语| 日韩黄色免费网站| 强迫凌虐淫辱の牝奴在线观看| 最好看的中文字幕久久| 亚洲 小说区 图片区| 亚洲精品在线不卡| 国产极品人妖在线观看| 国外成人性视频| 欧美视频二区欧美影视| 亚洲va韩国va欧美va精四季| 免费久久99精品国产自在现线| 国产精品免费成人| 成人福利视频在线看| 青青草国产在线观看| 91超碰这里只有精品国产| 国精产品一品二品国精品69xx| 亚洲国产精品久久精品怡红院| 在线a免费看| 精品中文字幕视频| 福利一区和二区| 水蜜桃亚洲一二三四在线| 久久久久网站| 日韩高清在线一区二区| 国产91精品一区二区麻豆网站| 任你操精品视频| 欧美三区在线观看| 成人免费在线电影| 国产精品久久不能| av日韩精品| 免费看欧美黑人毛片| 日韩精品视频网站| 中文字幕一区二区人妻在线不卡| 图片区日韩欧美亚洲| 午夜性色福利视频| 欧美综合在线第二页| 亚洲精品国产精品粉嫩| 亚洲熟妇av一区二区三区| 久久婷婷国产综合精品青草| 69xxxx国产| 中文字幕精品一区久久久久 | 国产精品日本欧美一区二区三区| 国产精品久久久久久久无码| 欧美性高潮床叫视频| 91在线精品入口| 不卡中文字幕av| 综合视频一区| 看av免费毛片手机播放| 久久久久国产精品麻豆| 亚洲综合网av| 欧美日韩高清区| 日韩欧美ww| 日韩高清第一页| 久久蜜桃av一区二区天堂| 久久久久久无码午夜精品直播| 中文字幕久热精品在线视频| 精品国产麻豆| 免费在线观看亚洲视频| 国产日韩av一区| www成人在线| 日韩免费在线观看| 8x8ⅹ拨牐拨牐拨牐在线观看| 久久久精品动漫| 另类中文字幕网| 精品视频久久久久| 一区二区在线免费视频| caoporn视频在线观看| 欧美日韩系列| 国产一区二区三区观看| 美国精品一区二区| 日韩欧美国产综合| 韩国主播福利视频一区二区三区| 国产成人亚洲欧美| 欧美亚洲一区二区三区| 国产在线一卡二卡| 制服丝袜日韩国产| www在线看| 亚洲国产精品毛片| 成人深夜福利app| 中文字幕乱码人妻无码久久| 亚洲天堂免费在线| 欧美日本三级| 日韩一级片播放| 五月婷婷综合在线| 精品视频在线一区二区| 欧美日韩国产一二|