精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

圖解 Kafka 源碼之 Sender 線程架構設計

開發(fā) 架構
今天主要聊聊「發(fā)送網(wǎng)絡 I/O 的 Sender 線程的架構設計」,深度剖析下消息是如何被發(fā)送出去的。

大家好,我是 華仔, 又跟大家見面了。

原文完整版在星球里面,如果感興趣可以掃文末二維碼加入。

上篇主要帶大家深度剖析了「號稱承載 Kafka 客戶端消息快遞倉庫 RecordAccmulator 的架構設計」,消息被暫存到累加器中,今天主要聊聊「發(fā)送網(wǎng)絡 I/O 的 Sender 線程的架構設計」,深度剖析下消息是如何被發(fā)送出去的。

這篇文章干貨很多,希望你可以耐心讀完。

一、總的概述

通過「場景驅動」的方式,來看看消息是如何從客戶端發(fā)送出去的。

在上篇中,我們知道了消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創(chuàng)建」后,喚醒 Sender 子線程將消息批量發(fā)送給 Kafka Broker 端。

接下來我們就來看看,「Sender 線程的架構實現(xiàn)以及發(fā)送處理流程」,為了方便大家理解,所有的源碼只保留骨干。

二、Sender 線程架構設計

在《圖解Kafka生產(chǎn)者初始化核心流程》這篇中我們知道 KafkaProducer 會啟動一個后臺守護進程,其線程名稱:kafka-producer-network-thread + "|" + clientId。

在 KafkaProducer.java 類有常量定義:NETWORK_THREAD_PREFIX,并啟動 守護線程 KafkaThread 即 ioThread,如果不主動關閉 Sender 線程會一直執(zhí)行下去。

github 源碼地址如下:

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java?

public class KafkaProducer<K, V> implements Producer<K, V> {
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// visible for testing
@SuppressWarnings("unchecked")
KafkaProducer(Map<String, Object> configs,Serializer<K> keySerializer,
Serializer<V> valueSerializer,ProducerMetadata metadata,
KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {
try {
...
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
log.debug("Kafka producer started");
} catch (Throwable t) {
...
}
}
}

從上面得出 Sender 類是一個線程類, 我們來看看 Sender 線程的重要字段和方法,并講解其是如何發(fā)送消息和處理消息響應的。

1、關鍵字段

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {
/* the state of each nodes connection */
private final KafkaClient client; // 為 Sender 線程提供管理網(wǎng)絡連接進行網(wǎng)絡讀寫
/* the record accumulator that batches records */
private final RecordAccumulator accumulator; // 消息倉庫累加器
/* the metadata for the client */
private final ProducerMetadata metadata; // 生產(chǎn)者元數(shù)據(jù)
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
private final boolean guaranteeMessageOrder; // 是否保證消息在 broker 端的順序性
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize; //發(fā)送消息最大字節(jié)數(shù)。
/* the number of acknowledgements to request from the server */
private final short acks; // 生產(chǎn)者的消息發(fā)送確認機制
/* the number of times to retry a failed request before giving up */
private final int retries; // 發(fā)送失敗后的重試次數(shù),默認為0次

/* true while the sender thread is still running */
private volatile boolean running; // Sender 線程是否還在運行中
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose; // 是否強制關閉,此時會忽略正在發(fā)送中的消息。
/* the max time to wait for the server to respond to the request*/
private final int requestTimeoutMs; // 等待服務端響應的最大時間,默認30s
/* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs; // 失敗重試退避時間
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions; // 所有 node 支持的 api 版本
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager; // 事務管理,這里忽略 后續(xù)會有專門一篇講解事務相關的
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches; // 正在執(zhí)行發(fā)送相關的消息批次集合, key為分區(qū),value是 list<ProducerBatch> 。

從該類屬性字段來看比較多,這里說幾個關鍵字段:

  1. client:KafkaClient 類型,是一個接口類,Sender 線程主要用它來實現(xiàn)真正的網(wǎng)絡I/O,即 NetworkClient。該字段主要為 Sender 線程提供了網(wǎng)絡連接管理和網(wǎng)絡讀寫操作能力。
  2. accumulator:RecordAccumulator類型,上一篇的內容  圖解 Kafka 源碼之快遞倉庫 RecordAccumulator 架構設計,Sender 線程用它獲取待發(fā)送的 node 節(jié)點及批次消息等能力。
  3. metadata:ProducerMetadata類型,生產(chǎn)者元數(shù)據(jù)。因為發(fā)送消息時要知道分區(qū) Leader 在哪些節(jié)點,以及節(jié)點的地址、主題分區(qū)的情況等。
  4. guaranteeMessageOrder:是否保證消息在 broker 端的順序性,參數(shù):max.in.flight.requests.per.connection。
  5. maxRequestSize:單個請求發(fā)送消息最大字節(jié)數(shù),默認為1M,它限制了生產(chǎn)者在單個請求發(fā)送的記錄數(shù),以避免發(fā)送大量請求。
  6. acks:生產(chǎn)者的消息發(fā)送確認機制。有3個可選值:0,1,-1/all。
  7. retries:生產(chǎn)者發(fā)送失敗后的重試次數(shù)。默認是0次。
  8. running:Sender線程是否還在運行中。
  9. forceClose:是否強制關閉,此時會忽略正在發(fā)送中的消息。
  10. requestTimeoutMs:生產(chǎn)者發(fā)送請求后等待服務端響應的最大時間。如果超時了且配置了重試次數(shù),會再次發(fā)送請求,待重試次數(shù)用完后在這個時間范圍內返回響應則認為請求最終失敗,默認 30 秒。
  11. retryBackoffMs:生產(chǎn)者在發(fā)送請求失敗后可能會重新發(fā)送失敗的請求,其目的就是防止重發(fā)過快造成服務端壓力過大。默認100 ms。
  12. apiVersions:ApicVersions類對象,保存了每個node所支持的api版本。
  13. inFlightBatches:正在執(zhí)行發(fā)送相關的消息批次集合, key為分區(qū),value是 list<|ProducerBatch>。

2、關鍵方法

Sender 類的方法也不少,這里針對關鍵方法逐一講解下。

(1)run()

Sender 線程實現(xiàn)了 Runnable 接口,會不斷的調用 runOnce(),這是一個典型的循環(huán)事件機制。

/**
* The main run loop for the sender thread
*/
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// 這里使用 volatile boolean 類型的變量 running,判斷 Sender 線程是否在運行狀態(tài)中。
// 1. 如果 Sender 線程在運行狀態(tài)即 running=true,則一直循環(huán)調用 runOnce() 方法。
while (running) {
try {
// 將緩沖區(qū)的消息發(fā)送到 broker。
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 2. 如果(沒有強制關閉 && ((消息累加器中還有剩余消息待發(fā)送 || 還有等待未響應的消息 ) || 還有事務請求未完成)),則繼續(xù)發(fā)送剩下的消息。
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
// 繼續(xù)執(zhí)行將剩余的消息發(fā)送完畢
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 3. 對進行中的事務進行中斷,則繼續(xù)發(fā)送剩下的消息。
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 4. 如果強制關閉,則關閉事務管理器、終止消息的追加并清空未完成的批次
if (forceClose) {
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
// 關閉事務管理器
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
// 終止消息的追加并清空未完成的批次
this.accumulator.abortIncompleteBatches();
}
// 5. 關閉 NetworkClient
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

當 Sender 線程啟動后會直接運行 run() 方法,該方法在 4種情況下會一直循環(huán)調用去發(fā)送消息到 Broker。

(2)runOnce()

我們來看看這個執(zhí)行業(yè)務處理的方法,關于事務的部分后續(xù)專門文章講解。

/**
* Run a single iteration of sending
*/
void runOnce() {
if (transactionManager != null) {
... //事務處理方法 后續(xù)文章專門講解
}
// 1. 獲取當前時間的時間戳。
long currentTimeMs = time.milliseconds();
// 2. 調用 sendProducerData 發(fā)送消息,但并非真正的發(fā)送,而是把消息緩存在 把消息緩存在 KafkaChannel 的 Send 字段里。下一篇會講解 NetworkClient。
long pollTimeout = sendProducerData(currentTimeMs);
// 3. 讀取消息實現(xiàn)真正的網(wǎng)絡發(fā)送
client.poll(pollTimeout, currentTimeMs);
}

該方法比較簡單,主要做了3件事情:

  1. 獲取當前時間的時間戳。
  2. 調用 sendProducerData 發(fā)送消息,但并非真正的發(fā)送,而是把消息緩存在 NetworkClient 的 Send 字段里。下一篇會講解 NetworkClient。
  3. 讀取 NetworkClient 的 send 字段消息實現(xiàn)真正的網(wǎng)絡發(fā)送。

(3)sendProducerData()

該方法主要是獲取已經(jīng)準備好的節(jié)點上的批次數(shù)據(jù)并過濾過期的批次集合,最后暫存消息。

private long sendProducerData(long now) {
// 1. 獲取元數(shù)據(jù)
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// 2. 獲取已經(jīng)準備好的節(jié)點以及找不到 Leader 分區(qū)對應的節(jié)點的主題
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// 3. 如果主題 Leader 分區(qū)對應的節(jié)點不存在,則強制更新元數(shù)據(jù)
if (!result.unknownLeaderTopics.isEmpty()) {
// 添加 topic 到?jīng)]有拉取到元數(shù)據(jù)的 topic 集合中,并標識需要更新元數(shù)據(jù)
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
...
// 針對這個 topic 集合進行元數(shù)據(jù)更新
this.metadata.requestUpdate();
}
// 4. 循環(huán) readyNodes 并檢查客戶端與要發(fā)送節(jié)點的網(wǎng)絡是否已經(jīng)建立好了
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 檢查客戶端與要發(fā)送節(jié)點的網(wǎng)絡是否已經(jīng)建立好了
if (!this.client.ready(node, now)) {
// 移除對應節(jié)點
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
// 5. 獲取上面返回的已經(jīng)準備好的節(jié)點上要發(fā)送的 ProducerBatch 集合
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 6. 將從消息累加器中讀取的數(shù)據(jù)集,放入正在執(zhí)行發(fā)送相關的消息批次集合中
addToInflightBatches(batches);
// 7. 要保證消息的順序性,即參數(shù) max.in.flight.requests.per.cnotallow=1
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
// 對 tp 進行 mute,保證只有一個 batch 正在發(fā)送
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 重置下一批次的過期時間
accumulator.resetNextBatchExpiryTime();
// 8. 從正在執(zhí)行發(fā)送數(shù)據(jù)集合 inflightBatches 中獲取過期集合
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 9. 從 batches 中獲取過期集合
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 10. 從 inflightBatches 與 batches 中查找已過期的消息批次(ProducerBatch),判斷批次是否過 期是根據(jù)系統(tǒng)當前時間與 ProducerBatch 創(chuàng)建時間之差是否超過120s,過期時間可以通過參數(shù) delivery.timeout.ms 設置。
expiredBatches.addAll(expiredInflightBatches);

// 如果過期批次不為空 則輸出對應日志
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
// 11. 處理已過期的消息批次,通知該批消息發(fā)送失敗并返回給客戶端
for (ProducerBatch expiredBatch : expiredBatches) {
// 處理當前過期ProducerBatch的回調結果 ProduceRequestResult,并且設置超時異常 new TimeoutException(errorMessage)
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
// 通知該批消息發(fā)送失敗并返回給客戶端
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
// ... 事務管理器的處理忽略
}
// 收集統(tǒng)計指標,后續(xù)會專門對 Kafka 的 Metrics 進行分析
sensors.updateProduceRequestMetrics(batches);

// 設置下一次的發(fā)送延時
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
// 12. 發(fā)送消息暫存到 NetworkClient send 字段里。
sendProduceRequests(batches, now);
return pollTimeout;
}

該方法主要做了12件事情,逐一說明下:

  1. 首先獲取元數(shù)據(jù),這里主要是根據(jù)元數(shù)據(jù)的更新機制來保證數(shù)據(jù)的準確性。
  2. 獲取已經(jīng)準備好的節(jié)點。accumulator#reay() 方法會通過發(fā)送記錄對應的節(jié)點和元數(shù)據(jù)進行比較,返回結果中包括兩個重要的集合:「準備好發(fā)送的節(jié)點集合 readyNodes」、「找不到 Leader 分區(qū)對應節(jié)點的主題 unKnownLeaderTopic」。
  3. 如果主題 Leader 分區(qū)對應的節(jié)點不存在,則強制更新元數(shù)據(jù)。
  4. 循環(huán) readyNodes 并檢查客戶端與要發(fā)送節(jié)點的網(wǎng)絡是否已經(jīng)建立好了。在 NetworkClient 中維護了客戶端與所有節(jié)點的連接,這樣就可以通過連接的狀態(tài)判斷是否連接正常。
  5. 獲取上面返回的已經(jīng)準備好的節(jié)點上要發(fā)送的 ProducerBatch 集合。accumulator#drain() 方法就是將 「TopicPartition」-> 「ProducerBatch 集合」的映射關系轉換成 「Node 節(jié)點」->「ProducerBatch 集合」的映射關系,如下圖所示,這樣的話按照節(jié)點方式只需要2次就完成,大大減少網(wǎng)絡的開銷。

圖片

  1. 將從消息累加器中讀取的數(shù)據(jù)集,放入正在執(zhí)行發(fā)送相關的消息批次集合中。
  2. 要保證消息的順序性,即參數(shù) max.in.flight.requests.per.cnotallow=1,會添加到 muted 集合,保證只有一個 batch 在發(fā)送。
  3. 從正在執(zhí)行發(fā)送數(shù)據(jù)集合 inflightBatches 中獲取過期集合。
  4. 從 accumulator 累加器的 batches 中獲取過期集合。
  5. 從 inflightBatches 與 batches 中查找已過期的消息批次(ProducerBatch),判斷批次是否過期是根據(jù)系統(tǒng)當前時間與 ProducerBatch 創(chuàng)建時間之差是否超過120s,過期時間可以通過參數(shù) delivery.timeout.ms 設置。
  6. 處理已過期的消息批次,通知該批消息發(fā)送失敗并返回給客戶端。
  7. 發(fā)送消息暫存到 NetworkClient send 字段里。

從上面源碼可以看出,SendProducerData 方法中調用到了 Sender 線程類中多個方法,這里就不一一講解了。

三、Sender 發(fā)送流程分析

通過前兩部分的源碼解讀和剖析,我們可以得出 Sender 線程的處理流程可以分為兩大部分:「發(fā)送請求」、「接收響應結果」。

1、發(fā)送請求

從 runOnce 方法可以得出發(fā)送請求也分兩步:「消息預發(fā)送」、「真正的網(wǎng)絡發(fā)送」。

void runOnce() {
// 1. 把消息緩存在 KafkaChannel 的 Send 字段里。
long pollTimeout = sendProducerData(currentTimeMs);
// 2. 讀取消息實現(xiàn)真正的網(wǎng)絡發(fā)送
client.poll(pollTimeout, currentTimeMs);
}

2、接收響應結果

等 Sender 線程收到 Broker 端的響應結果后,會根據(jù)響應結果分情況進行處理。

3、時序圖

原文完整版在星球里面,如果感興趣可以掃文末二維碼加入

四、總結

這里,我們一起來總結一下這篇文章的重點。

1、開篇總述消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創(chuàng)建」后,喚醒 Sender 子線程將消息批量發(fā)送給 Kafka Broker 端,從而引出「Sender 線程」。

2、帶你深度剖析了「Sender 線程」 的發(fā)送消息以及響應處理的相關方法。

3、最后帶你串聯(lián)了整個消息發(fā)送的流程,讓你有個更好的整體認知。

責任編輯:姜華 來源: 華仔聊技術
相關推薦

2023-03-15 08:17:27

Kafka網(wǎng)絡通信組件

2023-12-26 08:16:56

Kafka緩存架構客戶端

2022-03-29 15:10:22

架構設計模型

2022-09-23 08:02:42

Kafka消息緩存

2019-09-20 08:54:38

KafkaBroker消息

2023-08-14 08:17:13

Kafka服務端

2021-11-01 17:17:13

Kafka 高并發(fā)場景

2024-03-14 08:33:13

kafka三高架構Zookeeper

2015-06-02 04:17:44

架構設計審架構設計說明書

2024-08-23 16:04:45

2021-06-10 07:49:27

Kafka 架構設計

2015-06-02 04:34:05

架構設計

2012-05-30 09:43:45

業(yè)務邏輯層

2009-06-22 14:48:21

DRY架構設計

2014-05-19 10:08:36

IM系統(tǒng)架構設計

2023-04-13 08:23:28

軟件架構設計

2024-09-18 09:04:33

架構模式查詢

2024-10-30 10:06:51

2021-12-07 07:32:09

kafka架構原理

2023-08-16 12:34:16

同步備份異步備份
點贊
收藏

51CTO技術棧公眾號

亚洲综合免费观看高清完整版在线| 免费亚洲电影在线| 亚洲国产另类久久精品| 一女被多男玩喷潮视频| yw在线观看| 国产一区二区不卡在线| 午夜精品一区二区三区在线| 欧洲一级黄色片| 青青国产精品| 午夜欧美2019年伦理| 日本亚洲欧洲精品| 超碰在线观看99| 久久午夜av| 欧美人交a欧美精品| 久久精品国产亚洲av久| 久久久久久久久成人| 欧美性xxxx在线播放| 欧美一级免费在线观看| 你懂得在线网址| 国产凹凸在线观看一区二区| 国产精品福利网站| 国产亚洲精品久久久久久打不开| 人人狠狠综合久久亚洲婷婷 | 国产黄色片免费观看| 亚洲影音先锋| 欧美极品少妇xxxxⅹ裸体艺术| 久久美女免费视频| 六月丁香久久丫| 日韩欧美一级在线播放| 亚洲不卡视频在线| 涩涩视频在线播放| 一区二区三区在线视频观看| 日本最新一区二区三区视频观看| 秋霞视频一区二区| 精品一区二区免费视频| 国产成人精品久久二区二区| 久久香蕉精品视频| 国产精品毛片久久| 伊人伊人伊人久久| 草草影院第一页| 麻豆一区二区麻豆免费观看| 欧美高清你懂得| 欧美日韩亚洲自拍| 成人亚洲综合| 欧美日韩一区二区不卡| 国产第一页视频| 小h片在线观看| 五月天久久比比资源色| 日韩伦理在线免费观看| www.色在线| 伊人婷婷欧美激情| 欧美日韩视频免费| 九色91在线| 天天综合天天综合色| 国产精品国产对白熟妇| 成人三级小说| 五月婷婷激情综合| 男人操女人逼免费视频| 看黄在线观看| 色综合 综合色| 国产成人综合一区| 国内精品伊人| 制服视频三区第一页精品| 看看黄色一级片| 国产亚洲高清在线观看| 欧美一三区三区四区免费在线看 | 国产一区二区三区视频免费| 第一次破处视频| 久久福利影院| 欧美成在线观看| 精品久久免费视频| 久久99伊人| 国产精品吴梦梦| 中文字幕一区二区人妻| 国产一区视频网站| 国产精品高清一区二区三区| 天堂在线视频免费观看| 国产丝袜美腿一区二区三区| 手机看片福利永久国产日韩| 日本激情视频在线观看| 亚洲综合色视频| 精品久久一二三| 99re久久| 精品福利在线导航| 51妺嘿嘿午夜福利| 亚洲国产精品久久久久蝴蝶传媒| 欧美国产极速在线| 狠狠人妻久久久久久| 久久99精品久久只有精品| 91观看网站| 久草在现在线| 亚洲精品国产一区二区精华液 | 深夜视频一区二区| 欧美一区二区在线免费播放| 亚洲婷婷在线观看| 欧美日韩国产一区二区三区不卡| 久久天天躁狠狠躁夜夜av| 亚洲国产精一区二区三区性色| 久久亚洲色图| 亚洲xxx视频| 免费在线高清av| 亚洲精品国产高清久久伦理二区| 波多野结衣家庭教师视频| 国产福利亚洲| 亚洲精品久久久久国产| 亚洲最大的黄色网址| 欧美一级播放| 99高清视频有精品视频| 国产福利第一视频在线播放| 亚洲午夜激情av| 在线观看国产一级片| 日韩丝袜视频| 欧美激情视频在线观看| 中文字幕av无码一区二区三区| 国产91精品一区二区麻豆亚洲| 亚洲精品不卡| 最新欧美色图| 精品国产第一区二区三区观看体验| av在线播放中文字幕| 日韩午夜黄色| 成人欧美一区二区三区黑人免费| 欧美成人三区| 欧美制服丝袜第一页| 精品无码在线视频| 亚洲午夜一级| 97伦理在线四区| 麻豆系列在线观看| 91福利在线免费观看| 国产三级国产精品| 亚洲精选国产| 国产欧美日韩一区| 色网在线观看| 欧美一区二区三级| 国产成人自拍网站| 久久国产麻豆精品| 日韩亚洲视频在线| 网友自拍亚洲| 亚洲人精品午夜在线观看| 日产精品久久久| 99久久综合精品| 国产69精品久久久久久久| 成人午夜三级| 性色av香蕉一区二区| 高清国产mv在线观看| 亚洲午夜精品久久久久久久久| 亚洲欧美日韩网站| 欧美一区二区三区另类| 成人高清视频观看www| 婷婷成人激情| 欧美一区二区在线免费播放| 极品颜值美女露脸啪啪| 国产一区二区导航在线播放| 三级在线免费观看| 亚洲精品一二三**| 欧美劲爆第一页| 黄色av一区二区三区| 亚洲国产精品久久久男人的天堂 | 欧美电影在线观看一区| 久久亚洲精品网站| 亚洲第一黄色片| 亚洲va韩国va欧美va| 屁屁影院国产第一页| 蜜桃av一区| 午夜精品福利一区二区| 日本午夜精品久久久久| 美日韩精品免费视频| www.久久成人| 疯狂欧美牲乱大交777| 自拍偷拍视频亚洲| 精品一区二区精品| 黑人巨茎大战欧美白妇 | 天堂一区二区三区| 欧美网站免费| 欧美日本高清视频| 色视频在线观看免费| 在线观看免费视频综合| 黄色片子在线观看| 99久久伊人网影院| 九九视频精品在线观看| 羞羞色午夜精品一区二区三区| 超碰97在线人人| 在线天堂资源| 久久高清视频免费| 国产成人无码www免费视频播放| 亚洲成国产人片在线观看| 色哟哟精品观看| 精品无码三级在线观看视频| 亚洲色欲久久久综合网东京热| 日韩有码av| 成人免费视频网址| mm视频在线视频| 中文字幕亚洲综合| 人妻少妇精品无码专区| 欧洲精品视频在线观看| 久久久久久久蜜桃| 国产精品视频免费| 中国一级特黄录像播放| 蜜桃视频在线观看一区二区| 免费人成自慰网站| 欧美激情黄色片| 精品在线一区| 久久久久久久久成人| 国产成人在线视频| 毛片在线导航| 久久激情视频免费观看| 日av在线播放| 精品国产一区二区三区久久影院| 欧美黄色一级大片| 亚洲国产精品久久久久婷婷884| 三区四区在线观看| 99久久免费视频.com| 污污的视频免费观看| 久久一区中文字幕| 欧美一级视频在线播放| 亚洲一区二区| 亚洲国产精品日韩| 免费欧美视频| 黑人中文字幕一区二区三区| 四虎影视国产精品| 国产盗摄xxxx视频xxx69| brazzers在线观看| 欧美成人免费小视频| 五月婷婷在线视频| 伊人一区二区三区久久精品| 美女毛片在线看| 亚洲精品不卡在线| 色香蕉在线视频| 精品国产伦一区二区三区免费 | 一级做a爰片久久| 国产一区不卡| 欧美高清性xxxxhd| 亚洲影院天堂中文av色| 精品久久久久久乱码天堂| 一区中文字幕| av日韩免费电影| 日韩精品一区二区三区免费视频| 国产自产女人91一区在线观看| 天天综合网天天| 欧美在线一级va免费观看| 国产伦子伦对白在线播放观看| 欧美极品少妇全裸体| 秋霞在线视频| 久久久久五月天| xxx.xxx欧美| 国内自拍欧美激情| 电影在线观看一区| 91精品国产亚洲| 无码小电影在线观看网站免费 | 亚洲综合伊人| 成人在线一区二区| 玖玖玖视频精品| 91视频免费进入| 99ri日韩精品视频| 国产亚洲欧美另类一区二区三区| 狠狠久久伊人| 久久伊人资源站| 欧美精品羞羞答答| 亚洲最大免费| 综合亚洲视频| 草草视频在线免费观看| 午夜亚洲激情| 天天操天天爽天天射| 麻豆国产精品777777在线| 加勒比av中文字幕| 国产高清视频一区| 加勒比精品视频| 久久精品人人做| 娇小11一12╳yⅹ╳毛片| 亚洲精品乱码久久久久久久久| 精品亚洲永久免费| 色诱视频网站一区| 中文字幕av无码一区二区三区| 欧美一区二区三区婷婷月色| 男人天堂手机在线观看| 亚洲美女自拍视频| 欧美黄色激情| 97热在线精品视频在线观看| 日韩精品影片| 3d精品h动漫啪啪一区二区| 久久99国产精品久久99大师| 欧美另类视频在线| 99精品国产一区二区三区| 日本一本中文字幕| 首页国产欧美久久| 成人三级做爰av| 久久精品视频在线看| 国产女人18水真多毛片18精品| 亚洲v日本v欧美v久久精品| 波多野结衣小视频| 日韩精品一区二区三区蜜臀 | 国产精品色眯眯| 激情小说中文字幕| 日本乱码高清不卡字幕| 精品久久人妻av中文字幕| 亚洲国产中文在线二区三区免| 欧美自拍大量在线观看| 国产麻豆精品| 欧美理论一区二区| 国产精品草草| www.天天射.com| 99久久久无码国产精品| 成人三级视频在线观看| 黄色一区二区在线观看| 国产精品欧美激情在线| 亚洲电影在线观看| 成人三级网址| 国产精品99免视看9| 韩国精品福利一区二区三区| 中文字幕成人一区| 久久久久久穴| 性高潮免费视频| 国产精品传媒入口麻豆| 日本免费在线观看视频| 精品三级在线看| 男人和女人做事情在线视频网站免费观看| 国外成人免费在线播放| 精品视频在线播放一区二区三区 | 九色porny自拍| 91一区一区三区| 国产真实的和子乱拍在线观看| 欧美美女喷水视频| 国产一二在线观看| 98精品国产自产在线观看| 一区二区三区高清在线观看| 综合久久国产| 奇米精品一区二区三区在线观看一 | 久久久久97国产| 91精品国产欧美一区二区| avtt在线播放| 国产精国产精品| 精品一区欧美| 国产一区亚洲二区三区| 91在线小视频| 国产精品久久久免费视频| 亚洲成色777777女色窝| 丁香花高清在线观看完整版| 亚洲影院色无极综合| 亚洲精品网址| 国内av免费观看| 亚洲欧美日韩人成在线播放| 一级黄色片免费| 北条麻妃久久精品| 久久精品 人人爱| 中文一区一区三区免费| 极品少妇一区二区| 亚洲天堂黄色片| 日韩一二三区视频| 欧美日韩在线视频免费观看| 99视频免费观看| 亚洲福利专区| 亚洲久久久久久| 日韩欧美中文字幕在线观看| 青青九九免费视频在线| 国产精品国语对白| 日韩1区在线| 手机av在线网| 洋洋成人永久网站入口| 手机看片1024日韩| 日本亚洲欧洲色α| 成人一区二区| 亚洲理论中文字幕| 夜夜嗨av一区二区三区网页| 五月天激情婷婷| 国产91色在线免费| 久久国产亚洲精品| 午夜免费视频网站| 亚洲成人资源网| 可以在线观看的av| 国产一区视频在线| 亚洲视频久久| 成人免费毛片糖心| 欧美电影在线免费观看| 欧美videossex| 欧美激情专区| 九九九久久久精品| 日本三级网站在线观看| 日韩精品免费在线| 激情中国色综合| 亚洲精品蜜桃久久久久久| xnxx国产精品| 97人人爽人人爽人人爽 | 欧美日韩视频免费观看| 在线观看福利一区| 成人h版在线观看| 在线观看亚洲黄色| 欧美大秀在线观看| 国产精品一在线观看| 一级片免费在线观看视频| 黑人精品xxx一区一二区| 免费黄色网址在线观看| 国产精品区一区| 麻豆精品一区二区av白丝在线| 久久丫精品久久丫| 中文字幕免费国产精品| 精品视频自拍| 亚洲黄色a v| 午夜欧美视频在线观看| mm1313亚洲国产精品美女| 欧美日韩在线精品一区二区三区| 国产成人在线色|