精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

老弟問我,RocketMQ 中的 ProcessQueue 怎么理解?

開發(fā) 前端
ProcessQueue 是 MessageQueue 的消費快照,可以協(xié)助消費者進(jìn)行消息拉取、消息消費、更新偏移量、限流。

大家好,我是君哥。

今天來分享 RocketMQ 中一個非常重要又不太好理解的知識點-ProcessQueue。

一句話概括,ProcessQueue 就是 MessageQueue 的消費快照。看下面這張圖:

圖片

1 ProcessQueue 構(gòu)建

RocketMQ 客戶端啟動時,會開啟一個 rebalance 線程,代碼如下:

//MQClientInstance.java
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
//...
// Start rebalance service
this.rebalanceService.start();
//...
}
}
}

這個線程會不停的做重平衡操作,對 ProcessQueue 進(jìn)行維護(hù)。在重平衡線程類 RebalanceImpl 定義了一個變量 processQueueTable,數(shù)據(jù)結(jié)構(gòu)如下:

圖片

可以看到,在 processQueueTable 這個數(shù)據(jù)結(jié)構(gòu)上維護(hù)了 MessageQueue 和 ProcessQueue 的映射。

下面看一下維護(hù) processQueueTable 的代碼:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();

if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
//從processQueueTable上移除
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY://拉模式
break;
case CONSUME_PASSIVELY://推模式
//從processQueueTable上移除
break;
default:
break;
}
}
}
}
//創(chuàng)建ProcessQueue并放到processQueueTable
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//...
ProcessQueue pq = new ProcessQueue();

long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}

if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
//封裝好processQueueTable后再創(chuàng)建一個PullRequest進(jìn)行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

this.dispatchPullRequest(pullRequestList);

return changed;
}

2 拉取消息

上一節(jié)中構(gòu)建 ProcessQueue 后,會再創(chuàng)建一個 PullRequest,這個 PullRequest 封裝了 MessageQueue 和 ProcessQueue,創(chuàng)建成功后被放到了 PullMessageService 中的 pullRequestQueue 變量:

//PullMessageService.java
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

這里以 RocketMQ 的推模式為例,Consumer 拉取到消息后,會進(jìn)行如下處理:

  1. 對拉取到的消息根據(jù) TAG 再次
    進(jìn)行過濾;
  2. 更新 PullRequest 下次拉取的偏移量 nextOffset;
  3. 把拉取的消息封裝到 ProcessQueue 的 msgTreeMap(
    ?
    放到 msgTreeMap 之前首先要獲取到寫鎖 treeMapLock
    ?
    );
  4. 封裝 ConsumeRequest 進(jìn)行消息消費;
  5. 封裝消息拉取請求再次進(jìn)行拉取。

代碼如下:

//DefaultMQPushConsumerImpl.java
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//1. 對拉取到的消息根據(jù) TAG 再次進(jìn)行過濾
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
//2. 更新 PullRequest 下次拉取的偏移量 nextOffset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//3. 把拉取的消息封裝到 ProcessQueue 的 msgTreeMap
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//4. 封裝 ConsumeRequest 進(jìn)行消息消費
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//5. 封裝消息拉取請求
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
break;
//...
}
}
}

3 消費消息

在上一節(jié)提到過,拉取到消息后,會把消息封裝成一個 ConsumeRequest,這個線程類會調(diào)用消費者定義的 MessageListener 進(jìn)行消費處理。看一下源代碼:

//ConsumeMessageConcurrentlyService.ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

try {
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}//...

if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}

消息消費成功后,會調(diào)用 processConsumeResult 方法進(jìn)行結(jié)果處理。對于廣播模式,發(fā)送失敗后不會做重試,相當(dāng)于把消息丟棄,而對于集群模式,消費失敗的消息會發(fā)送到 Broker 端等待消費者重新拉取進(jìn)行重試。

消費結(jié)果處理完后,消費成功的消息會從 ProcessQueue 的 msgTreeMap 中移除(需要獲取到寫鎖 treeMapLock),同時從 msgTreeMap 中獲取最小的 Offset 來更新對應(yīng) MessageQueue 的偏移量。這個邏輯可以參考下面代碼:

public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
break;
//...
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//...
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//消費失敗的,發(fā)送回Broker
boolean result = this.sendMessageBack(msg, context);
//...
}

break;
default:
break;
}
//從msgTreeMap中移除并返回msgTreeMap第一條消息的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

4 消費者限流

4.1 緩存消息數(shù)量

如果消費者緩存的消息數(shù)量大于 RocketMQ 配置的閾值(默認(rèn) 1000),就會觸發(fā)延遲拉取,而消費者緩存的消息數(shù)量就來自 ProcessQueue,看下面代碼:

long cachedMessageCount = processQueue.getMsgCount().get();
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}

4.2 緩存的消息大小

如果消費者緩存的消息大小大于 RocketMQ 配置的閾值(默認(rèn) 100M),就會觸發(fā)延遲拉取,而消費者緩存的消息大小就來自 ProcessQueue,看下面代碼:

long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}

4.3 消息間隔

對于普通消息,如果消費偏移量間隔大于配置的閾值(默認(rèn) 2000),就會觸發(fā)延遲拉取,而消息間隔就來自 ProcessQueue,看下面代碼:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
}

4.4 獲取鎖失敗

對于順序消息,如果獲取鎖失敗,也會觸發(fā)延遲拉取,而判斷獲取鎖是否成功,也是在 ProcessQueue,看下面代碼:

if (processQueue.isLocked()) {
//...
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}

5 總結(jié)

ProcessQueue 是 MessageQueue 的消費快照,可以協(xié)助消費者進(jìn)行消息拉取、消息消費、更新偏移量、限流。最后,看一下 ProcessQueue 的數(shù)據(jù)結(jié)構(gòu):

圖片

責(zé)任編輯:武曉燕 來源: 君哥聊技術(shù)
相關(guān)推薦

2023-09-26 08:01:46

消費者TopicRocketMQ

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)

2019-11-15 13:52:06

機(jī)器學(xué)習(xí)Shapley計算

2021-01-13 10:28:16

Maven插件Mojo

2022-08-26 00:02:03

RocketMQ單體架構(gòu)MQ

2023-04-11 08:35:22

RocketMQ云原生

2022-06-13 11:05:35

RocketMQ消費者線程

2022-07-11 11:06:11

RocketMQ函數(shù).消費端

2020-09-24 14:40:55

Python 開發(fā)編程語言

2021-10-08 07:53:01

Go 尋址元素

2022-04-19 07:31:28

事務(wù)隔離機(jī)制數(shù)據(jù)庫

2020-04-24 09:26:15

RocketMQ分布式MetaQ

2024-04-29 07:42:20

數(shù)據(jù)庫Mybatis事務(wù)

2023-12-25 19:28:59

RocketMQ大數(shù)據(jù)

2022-07-04 11:06:02

RocketMQ事務(wù)消息實現(xiàn)

2023-01-16 08:49:20

RocketMQ定時任務(wù)源代

2022-07-18 21:53:46

RocketMQ廣播消息

2022-06-27 11:04:24

RocketMQ順序消息

2014-11-11 15:25:30

PHPWeb

2020-07-31 08:06:39

MySQL遞歸查詢
點贊
收藏

51CTO技術(shù)棧公眾號

免费观看日韩毛片| 国产日韩一区欧美| 男女做暖暖视频| 日韩在线观看中文字幕| 午夜精品一区在线观看| 日本日本精品二区免费| 国产视频第一页| 国产欧美精品| 久久激情视频免费观看| 亚洲一级av无码毛片精品| 影音成人av| 亚洲国产精品久久人人爱蜜臀| 免费看成人片| www久久久com| 人妖欧美一区二区| 韩国美女主播一区| 久久成人小视频| 偷拍自拍一区| 日韩欧美你懂的| 亚州精品一二三区| 日本蜜桃在线观看视频| 伊人色综合久久天天人手人婷| 欧美日韩国产免费一区二区三区| 国产片在线播放| 日本亚洲一区二区| 91国产中文字幕| 国产大片免费看| 成人国产精品一级毛片视频| 日韩av影视综合网| 又黄又色的网站| 亚洲日本中文| 欧美午夜一区二区| 免费观看精品视频| 成人福利影视| 亚洲精品国产精华液| 图片区小说区区亚洲五月| 老司机午夜福利视频| 精品制服美女丁香| 国产欧美日韩最新| 黄色网址中文字幕| 久久久久在线| 8050国产精品久久久久久| 免费在线一区二区三区| 91精品在线观看国产| 在线一区二区日韩| 一色道久久88加勒比一| 日韩aaa久久蜜桃av| 亚洲国产精品免费| 毛茸茸free性熟hd| 国产精品色在线网站| 欧美一区二区三区在| 三上悠亚在线一区| 亚洲欧洲二区| 日韩一区二区在线观看| 午夜视频在线网站| 亚洲国产精选| 91精品国产色综合久久不卡蜜臀 | 一本久久知道综合久久| 久久久久国产精品www| 国产小视频在线观看免费| 欧美精品大片| 欧美精品videos性欧美| 精品午夜福利在线观看| 最新国产乱人伦偷精品免费网站| 久久久久久久久综合| 久久精品视频9| 99精品国产在热久久婷婷| 久久久在线视频| 午夜精品三级久久久有码| 亚洲伦伦在线| 国语自产精品视频在线看抢先版图片| 黄色激情视频在线观看| 午夜亚洲影视| 国产精品视频播放| 国产欧美一级片| 高清不卡在线观看av| 国产伦精品一区二区三毛| 色偷偷在线观看| 久久综合成人精品亚洲另类欧美| 欧美一级日本a级v片| 午夜免费福利在线观看| 伊人婷婷欧美激情| 日本精品免费在线观看| 日韩一区二区三区免费视频| 538prom精品视频线放| 久草免费资源站| 四虎影视精品| 色噜噜狠狠狠综合曰曰曰88av | 午夜精品在线看| 精品免费国产一区二区| 久久99国产精品二区高清软件| 在线成人免费视频| 国产激情视频网站| 欧美亚洲国产精品久久| 欧美噜噜久久久xxx| 国产精品午夜影院| 蜜桃免费网站一区二区三区| 超碰97在线资源| 精品视频三区| 一区二区三区中文字幕电影| 欧美丰满熟妇bbbbbb百度| av一级久久| 亚洲男人天堂网站| 亚洲av无码一区二区三区在线| 91久久中文| 91久久国产综合久久91精品网站| 午夜成人免费影院| 最新成人av在线| 免费无码av片在线观看| 色妞ww精品视频7777| 亚洲人成网在线播放| 青青操国产视频| 琪琪一区二区三区| 久久久久久久久久码影片| 黄色网页在线免费看| 91福利在线看| 国产高清成人久久| 婷婷激情图片久久| 国产www精品| 日本黄色免费视频| 亚洲精品中文在线| 污污的视频免费| 九九久久婷婷| 国外色69视频在线观看| 成人av无码一区二区三区| 欧美激情在线一区二区| 无码aⅴ精品一区二区三区浪潮 | 伊人久久青草| 日韩和的一区二在线| 亚洲精品在线观看网站| 欧美国产精品一二三| 麻豆91在线观看| 欧美亚州在线观看| 日韩深夜视频| 亚洲国产高清高潮精品美女| 欧美成人黄色网| 韩国成人在线视频| 在线综合视频网站| 国产精品xxx| 夜夜嗨av一区二区三区免费区| 国产一区二区三区影院| 成人av片在线观看| 人人妻人人做人人爽| 少妇精品在线| 久久久久久久久久久成人| www.国产黄色| 一区二区三区日韩| 最新国产精品自拍| 亚洲午夜av| 国产一区二区不卡视频在线观看| 人人超在线公开视频| 日韩一区二区免费高清| 国产乱国产乱老熟300| 国产精品一二三区在线| 17c丨国产丨精品视频| 亚洲乱码一区| 久久久久久久影院| 天堂av在线免费观看| 黑人狂躁日本妞一区二区三区| 久久久久久久久免费看无码| 国产精品综合色区在线观看| 久久精品日产第一区二区三区| 性欧美xxx69hd高清| 亚洲欧美国内爽妇网| 久久午夜鲁丝片| 亚洲欧洲成人自拍| 三上悠亚 电影| 99精品免费| 欧美一区二区视频在线| 巨大黑人极品videos精品| 久久精品亚洲一区| 成人午夜福利视频| 日韩欧美aⅴ综合网站发布| www..com.cn蕾丝视频在线观看免费版| 三级一区在线视频先锋| 中文字幕中文字幕在线中一区高清| 国产午夜亚洲精品一级在线| 欧美黄色www| 欧美男男同志| 7777精品伊人久久久大香线蕉完整版 | 老司机av网站| 中文久久精品| 一区不卡字幕| 久久夜色电影| 国产精品亚洲美女av网站| 污影院在线观看| 亚洲美女av网站| 国产精品久久久久久久免费| 亚洲一区在线观看免费观看电影高清 | 99精品久久99久久久久| 久久99999| 精品动漫一区| 亚洲欧洲久久| 卡一精品卡二卡三网站乱码 | 白嫩白嫩国产精品| 国产精品白嫩美女在线观看 | 黄色av网站在线看| 91精品国产91久久综合桃花| 在线观看精品国产| 成人欧美一区二区三区黑人麻豆| 国产国语老龄妇女a片| 视频一区中文字幕| 男人日女人视频网站| 久久一区二区三区电影| 国产精选一区二区| 亚洲视频自拍| 国产99久久精品一区二区 夜夜躁日日躁 | 亚洲丝袜啪啪| 产国精品偷在线| 久久久加勒比| 国产激情999| 97蜜桃久久| 美女黄色丝袜一区| av片在线免费观看| 日韩理论片久久| 亚洲精品一区二区三区蜜桃 | 青青草国产精品一区二区| 羞羞的网站在线观看| 中文字幕9999| 飘雪影院手机免费高清版在线观看| 日韩欧美在线一区二区三区| 中文字幕在线观看国产| 欧美性高潮在线| 久久久久亚洲天堂| 亚洲精品成人在线| 大胸美女被爆操| 久久免费午夜影院| 亚洲精品女人久久久| 国产精品18久久久久久vr| 天堂在线中文在线| 秋霞av亚洲一区二区三| 免费无码av片在线观看| 国产日韩欧美一区在线| 大伊香蕉精品视频在线| 在线精品国产| 国产大尺度在线观看| 色综合久久一区二区三区| 日本视频一区二区不卡| 伊甸园亚洲一区| 久久涩涩网站| 在线日韩一区| 欧美日韩免费高清| 日韩伦理一区二区三区| 精品国产中文字幕| 林ゆな中文字幕一区二区| 国产日韩一区欧美| 欧美久久精品| 久久综合婷婷综合| 亚洲人成亚洲精品| 欧美日韩亚洲一区二区三区四区| 亚洲精品亚洲人成在线| 欧美综合激情| 精品免费视频| 一区二区不卡在线视频 午夜欧美不卡'| 欧美色图国产精品| 性刺激综合网| 91精品蜜臀一区二区三区在线| 国产精品久久成人免费观看| 91一区二区| 丰满人妻一区二区三区53号| 国产中文一区| 337p粉嫩大胆噜噜噜鲁| 麻豆精品网站| 无需播放器的av| 狠狠色伊人亚洲综合成人| 亚洲国产欧美日韩在线| 国产超碰在线一区| 亚洲第一香蕉网| 中文字幕va一区二区三区| 国产精品久久久免费看| 樱花草国产18久久久久| 亚洲永久精品在线观看| 欧美最新大片在线看| 国产又爽又黄又嫩又猛又粗| 日韩欧美一区电影| 亚洲av激情无码专区在线播放| 亚洲新中文字幕| 国产原创在线观看| 91精品国产高清自在线| 国产成人免费9x9x人网站视频| 亚洲va电影大全| 开心激情综合| 亚洲成人蜜桃| 国产一区观看| 国产小视频精品| 国产**成人网毛片九色 | 欧美伊久线香蕉线新在线| 国产综合色在线观看| 97伦理在线四区| 国产精品中文字幕亚洲欧美| 国产成人精品免费看在线播放| 在线成人欧美| 伊人影院综合在线| 成人国产在线观看| 中文字幕在线观看二区| 亚洲va韩国va欧美va精品| 中日韩av在线| 亚洲国产三级网| 欧美私人网站| 欧洲成人免费视频| 玖玖玖视频精品| 日韩偷拍一区二区| 伊人久久婷婷| 天天久久综合网| 久久夜色精品国产欧美乱极品| 国精产品一区一区二区三区mba| 亚洲va欧美va国产va天堂影院| 136福利视频导航| 日韩成人在线电影网| 成人影院在线看| 国产精品久久久久久久久| 国产suv精品一区二区四区视频| 神马影院午夜我不卡| 中文亚洲欧美| 中文字幕一区二区三区四| 久久久99精品免费观看不卡| 久久午夜无码鲁丝片午夜精品| 欧美日韩中文一区| 欧洲伦理片一区 二区 三区| 欧美精品久久久久久久| 成人黄色理论片| 日韩高清在线播放| 免费在线观看成人av| 蜜臀视频在线观看| 夜夜嗨av一区二区三区网页| 亚洲性在线观看| 一个色综合导航| 日韩av福利| 久久综合狠狠综合久久综青草| 黄色欧美成人| 少妇丰满尤物大尺度写真| 一区精品在线播放| 中文无码av一区二区三区| 亚洲欧美999| 在线免费日韩片| 久久99精品久久久久久秒播放器| 黄色工厂这里只有精品| 午夜福利三级理论电影 | 久久久久久人妻一区二区三区| 国产精品一区在线观看乱码| 午夜激情福利网| 91精品啪在线观看国产60岁| 日本高清视频在线播放| 国产精品久久久久久久久久久久久久| 美女久久久久| 日韩免费毛片视频| 国产亚洲制服色| 国产伦精品一区二区三区视频网站| 日韩精品欧美激情| 午夜欧美激情| 日本不卡二区| 男人操女人的视频在线观看欧美| 黄色片在线观看免费| 欧美伊人久久久久久午夜久久久久| yiren22亚洲综合伊人22| 国产精品久久久久久亚洲调教| re久久精品视频| 亚洲va在线va天堂va偷拍| 亚洲日本中文字幕区| 国产麻豆一精品一男同| 久久99青青精品免费观看| 91精品国产乱码久久久竹菊| 国产九色porny| 91免费小视频| 日批视频免费观看| 久久夜色精品国产亚洲aⅴ| 玖玖玖视频精品| 97在线国产视频| 久久久久久久久久美女| 中文字幕 视频一区| 萌白酱国产一区二区| 国产福利一区二区精品秒拍| 日日鲁鲁鲁夜夜爽爽狠狠视频97 | 浪潮色综合久久天堂| 亚欧精品在线| 国产福利一区在线| 天天操天天干视频| 播播国产欧美激情| 伊人久久亚洲| 日本999视频| 亚洲精品乱码久久久久久| 香蕉视频成人在线| 国产精品久久久久久亚洲调教| 亚洲精品888| 国产精品久久久免费观看| 欧美三级韩国三级日本三斤| 99在线播放| 日本一区二区三区四区高清视频 | 亚洲小说欧美另类社区| a级大片在线观看| 91精品国产综合久久久蜜臀图片| 波多野结衣在线播放| 性高潮久久久久久久久| 成人精品视频一区二区三区 | 精品国产乱码久久久久久牛牛| 巨茎人妖videos另类| 亚洲五码在线观看视频| 国产亚洲综合在线| 欧性猛交ⅹxxx乱大交| 国产精品一区二区久久国产|