精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

阿里二面:RocketMQ 消費者拉取一批消息,其中部分消費失敗了,偏移量怎樣更新?

人工智能 機器學習
如果一批消息按照順序消費,是不可能出現第 100 條消息消費成功了,但第 50 條消費失敗的情況,因為第 50 條消息失敗的時候,應該退出循環,不再繼續進行消費。

大家好,我是君哥。

最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批消息,比如 100 條,第 100 條消息消費成功了,但是第 50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊一下,如果一批消息有消費失敗的情況時,偏移量怎么保存。

1 拉取消息

1.1 封裝拉取請求

以 RocketMQ 推模式為例,RocketMQ 消費者啟動代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

上面的 DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是 start。消費者啟動后會觸發重平衡線程(RebalanceService),這個線程的任務是在死循環中不停地進行重平衡,最終封裝拉取消息的請求到 pullRequestQueue。這個過程涉及到的 UML 類圖如下:

圖片

1.2 處理拉取請求

封裝好拉取消息的請求 PullRequest 后,RocketMQ 就會不停地從 pullRequestQueue 獲取消息拉取請求進行處理。UML 類圖如下:

圖片

拉取消息的入口方法是一個死循環,代碼如下:

//PullMessageService
public void run(){
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

這里拉取到消息后,提交給 PullCallback 這個回調函數進行處理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封裝到 ConsumeRequest 這個線程類來處理。把代碼精簡后,ConsumeRequest 處理邏輯如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
try {
//1.執行消費邏輯,這里的邏輯是在文章開頭的代碼中定義的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
if (!processQueue.isDropped()) {
//2.處理消費結果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

2 處理消費結果

2.1 并發消息

并發消息處理消費結果的代碼做精簡后如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
}
break;
default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

從上面的代碼可以看出,如果處理消息的邏輯是串行的,比如文章開頭的代碼使用 for 循環來處理消息,那如果在某一條消息處理失敗了,直接退出循環,給 ConsumeConcurrentlyContext 的 ackIndex 變量賦值為消息列表中失敗消息的位置,這樣這條失敗消息后面的消息就不再處理了,發送給 Broker 等待重新拉取。代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
for (int i = 0; i < msgs.size(); i++) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
context.setAckIndex(i);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

消費成功的消息則從 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要發送消息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果處理消息的邏輯是并行的,處理消息失敗后給 ackIndex 賦值是沒有意義的,因為可能有多條消息失敗,給 ackIndex 變量賦值并不準確。最好的方法就是給 ackIndex 賦值 0,整批消息全部重新消費,這樣又可能帶來冥等問題。

2.2 順序消息

對于順序消息,從 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 總結

回到開頭的問題,如果一批消息按照順序消費,是不可能出現第 100 條消息消費成功了,但第 50 條消費失敗的情況,因為第 50 條消息失敗的時候,應該退出循環,不再繼續進行消費。

如果是并發消費,如果出現了這種情況,建議是整批消息全部重新消費,也就是給 ackIndex 賦值 0,這樣必須考慮冥等問題。

責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2022-03-14 11:05:01

RocketMQRedis緩存

2022-06-02 10:54:16

BrokerRocketMQ

2023-03-14 08:45:25

RocketMQ消息消費

2022-08-15 10:45:34

RocketMQ消息隊列

2021-12-17 08:17:00

RocketMQ數據結構消息中間件

2024-01-24 09:00:31

SSD訂閱關系內存

2024-04-22 00:00:00

RocketMQ優化位點

2022-07-07 09:00:49

RocketMQ消費者消息消費

2022-11-08 07:36:17

RocketMQ消費者消息堆積

2009-04-15 11:17:23

2021-07-12 10:25:03

RocketMQ數據結構kafka

2011-08-05 16:21:24

2011-07-22 16:25:38

CA TechnoloIT消費化

2023-06-01 08:08:38

kafka消費者分區策略

2021-04-20 08:32:51

消息MQ隊列

2024-03-14 11:58:43

2025-02-26 07:53:21

2015-08-26 09:39:30

java消費者

2022-05-09 11:15:05

RocketMQPULL 模式PUSH 模式

2021-03-01 07:31:53

消息支付高可用
點贊
收藏

51CTO技術棧公眾號

亚洲性在线观看| 免费成人深夜夜行网站| 日本不卡一二三| 亚洲国产精品av| 91蜜桃网站免费观看| 99精品在线播放| 99久久影视| 日韩精品999| 中文字幕55页| 欧美日韩精品免费观看视完整| ㊣最新国产の精品bt伙计久久| 国产精品区免费视频| 夜夜躁日日躁狠狠久久av| 欧美日韩一区二区高清| 亚洲视频综合网| 免费啪视频在线观看| 色综合天天色| 激情久久av一区av二区av三区| www.午夜色| 免费一级毛片在线观看| 国产成人午夜视频| 国产欧美日韩亚洲精品| 无码人妻av免费一区二区三区| 午夜日韩激情| 日韩一区av在线| 受虐m奴xxx在线观看| 盗摄牛牛av影视一区二区| 欧美群妇大交群的观看方式| 丝袜老师办公室里做好紧好爽| 最新黄网在线观看| 国产精品毛片久久久久久久| 老司机精品福利在线观看| 国产成人a人亚洲精品无码| 蜜桃精品视频在线| 日韩免费av片在线观看| 日韩欧美三级视频| 国内综合精品午夜久久资源| 久久精品色欧美aⅴ一区二区| 亚洲精品视频久久久| 红杏视频成人| 精品国产亚洲在线| 337p日本欧洲亚洲大胆张筱雨| 日韩国产一二三区| 欧美中文一区二区三区| 成人三级视频在线播放| 99爱在线视频| 天天射综合影视| 久久99中文字幕| 国产黄色大片在线观看| 亚洲第一在线综合网站| 被灌满精子的波多野结衣| 中文字幕在线观看播放| 亚洲乱码精品一二三四区日韩在线| 亚洲欧美日韩另类精品一区二区三区| 成人h小游戏| 国产午夜亚洲精品不卡| 日韩三级电影免费观看| 黄色国产在线| 欧美国产丝袜视频| 亚洲人成网站在线观看播放| 99中文字幕一区| 国产精品久久久一本精品| 一区二区三区四区视频在线观看| aaa日本高清在线播放免费观看| 欧美韩国一区二区| 国产精品jizz在线观看老狼| 中文字幕中文字幕在线十八区| 亚洲靠逼com| 成年人午夜免费视频| 绿色成人影院| 欧洲国内综合视频| 亚洲va在线va天堂va偷拍| 日韩激情精品| 日韩电影免费观看中文字幕 | 日本系列第一页| 99在线精品免费视频九九视| 国产91精品在线播放| 亚洲天堂视频网| 高清视频一区二区| 久久综合一区| 免费在线观看av| 亚洲曰韩产成在线| 国产免费黄色av| 欧美123区| 日韩欧美一区二区久久婷婷| 亚洲综合自拍网| 日韩1区2区| 久热爱精品视频线路一| 国产真实的和子乱拍在线观看| 蜜桃视频一区| 亚洲一区二区三区xxx视频| 女人18毛片水真多18精品| 国产喷白浆一区二区三区| 国产精品亚洲天堂| 麻豆视频在线观看免费网站黄| 91久久精品一区二区二区| 天天av天天操| 九一国产精品| 欧美大片在线看| 无码人妻一区二区三区线| 国产黄人亚洲片| 欧美一区二区三区在线免费观看| 麻豆av在线免费看| 欧美三级免费观看| 成人三级做爰av| 欧美日一区二区| 韩日精品中文字幕| 一区二区三区黄| 91在线观看免费视频| 国产日本欧美在线| 成人影院av| 日韩精品专区在线影院观看| 91麻豆精品国产91久久综合| 亚洲三级影院| 91人成网站www| 超碰在线国产| 日韩欧美在线字幕| 不许穿内裤随时挨c调教h苏绵| 一区二区小说| 性色av一区二区咪爱| 国产毛片毛片毛片毛片毛片| 久久精品亚洲麻豆av一区二区| 国产毛片久久久久久国产毛片| 国产第一精品| 国产亚洲欧美日韩美女| 久久久久久久久影院| 国产成人久久精品77777最新版本| 日韩免费中文专区| 不卡一二三区| 日韩精品免费综合视频在线播放| 激情五月婷婷小说| 精品无人区卡一卡二卡三乱码免费卡| 欧美日韩国产综合在线| 波多野结衣在线观看| 日韩一级高清毛片| 国产一区二区精彩视频| 久久精品国产亚洲高清剧情介绍| 日韩免费中文专区| 国产一区二区三区朝在线观看| 日韩精品亚洲精品| 91午夜视频在线观看| 国产成人av网站| 波多野结衣 作品| 亚洲国产欧美国产第一区| 久久精视频免费在线久久完整在线看| 日韩xxx视频| 国产精品久久午夜| 中文字幕一区二区三区四区在线视频| 国产精选一区| 国产精品国模在线| 国产精品99999| 欧美日韩亚洲综合在线 欧美亚洲特黄一级 | 国产精品123区| 女人床在线观看| 91免费精品国偷自产在线在线| 欧美多人乱p欧美4p久久| 国产av一区二区三区精品| 亚洲精品久久嫩草网站秘色| 91精品国产高清91久久久久久| 欧美日韩国产欧| 精品一区二区不卡| 日本韩国欧美| 日韩一区二区福利| 成人av手机在线| 午夜av一区二区| 亚洲黄色免费视频| 蜜桃精品视频在线| 成人在线免费高清视频| 欧美激情99| 国产精品国产亚洲伊人久久| 久操视频在线| 精品日韩成人av| 国产成人无码一区二区在线播放| 日本一区二区三级电影在线观看| 天天色天天综合网| 影音先锋在线一区| 欧美日本韩国国产| 91麻豆精品国产综合久久久| 久久久久久久一| 成在在线免费视频| 欧美一区二区三区视频在线观看| 男人的天堂一区| 国产欧美va欧美不卡在线| 超碰在线免费av| 免费日韩av片| 美女在线免费视频| 神马香蕉久久| 国产原创欧美精品| 小h片在线观看| 久久精品91久久香蕉加勒比| 欧美一级在线免费观看| 欧美日韩一区二区三区在线看 | 亚洲最大的网站| 欧美gv在线观看| 精品国产一区二区在线 | 夜夜躁日日躁狠狠久久88av| 国产激情无套内精对白视频| 欧美日韩亚洲精品内裤| 国产又色又爽又高潮免费| av中文字幕在线不卡| 色噜噜狠狠一区二区| 亚洲日本免费| 国产盗摄视频在线观看| 免费看成人吃奶视频在线| 91在线观看免费| 日韩天堂在线| 国语自产偷拍精品视频偷| 日韩成人影视| 亚洲人成电影网站色…| 亚洲av永久无码国产精品久久| 91久久精品一区二区三区| 日本少妇性生活| 亚洲日本成人在线观看| 国产又黄又粗视频| jvid福利写真一区二区三区| 天美一区二区三区| 美女性感视频久久| 乱子伦视频在线看| 夜夜嗨一区二区| www.激情网| 国产精品成人一区二区不卡| 日本精品免费| 亚洲自拍电影| 久久久久九九九| 电影一区二区在线观看| 91在线中文字幕| 国产美女久久| 国产精品jizz在线观看麻豆| 欧美在线极品| 38少妇精品导航| 18aaaa精品欧美大片h| 久久国产精品视频| 拍真实国产伦偷精品| 在线观看中文字幕亚洲| 久久这里精品| 亚洲色图第一页| 免费在线高清av| 亚洲图中文字幕| 欧美成人综合在线| 亚洲欧美日韩一区在线| 头脑特工队2在线播放| 亚洲国产精品资源| 日批视频在线播放| 精品国产99国产精品| 蜜臀久久99精品久久久| 欧美videos中文字幕| 精品女同一区二区三区| 日韩欧美一二三四区| 国产91视频在线| 欧美videofree性高清杂交| 国产国语亲子伦亲子| 日韩精品一区二区三区视频在线观看 | heyzo在线播放| 97视频在线观看免费| 午夜影院在线播放| 日韩美女福利视频| 手机看片久久| 国产精品永久在线| 亚洲午夜国产成人| 亚洲综合中文字幕在线观看| 99国产精品久久一区二区三区| 国产伦精品一区二区三区视频黑人| 成人免费直播在线| 久久天堂国产精品| 波多野结衣的一区二区三区| 自拍偷拍一区二区三区| 国内自拍一区| 亚洲乱码中文字幕久久孕妇黑人| 肉色丝袜一区二区| 日本男人操女人| 韩国精品在线观看| 在线免费看黄色片| 久久一区二区三区四区| 久久久久久成人网| 亚洲图片激情小说| 日韩 欧美 综合| 欧美日免费三级在线| 国产suv一区二区| 日韩久久午夜影院| 日本不卡三区| 久久噜噜噜精品国产亚洲综合 | 亚洲福利国产| 老头吃奶性行交视频| 国产激情偷乱视频一区二区三区| 国产精品无码专区| 国产精品不卡一区| 日本三级片在线观看| 欧美性受xxxx黑人xyx性爽| www.日日夜夜| 亚洲欧美在线x视频| 爆操欧美美女| 国产成人avxxxxx在线看| 日韩黄色av| 日韩电影大全在线观看| 一区二区中文字| 精品少妇无遮挡毛片| 国产成人免费av在线| 综合 欧美 亚洲日本| 偷拍与自拍一区| 国产视频手机在线观看| 亚洲欧美制服中文字幕| 国产美女福利在线观看| 成人亚洲激情网| 精品国产一区二区三区四区| 97免费视频观看| 捆绑调教美女网站视频一区| 亚洲色图14p| 一个色在线综合| 亚洲无码久久久久久久| 亚洲欧美日韩一区二区在线 | 日本精品视频一区二区三区| 亚洲第一页综合| 日韩亚洲精品视频| 欧洲一级精品| 九色91在线视频| 国内精品久久久久久久97牛牛 | 九一在线视频| 久久久在线视频| 日韩欧美中文在线观看| 综合一区中文字幕| 免费美女久久99| 一级黄色录像毛片| 色婷婷综合久久久中文字幕| 三级网站在线看| 欧美激情乱人伦一区| 久久精品一级| 成人性做爰片免费视频| 精品在线播放免费| 亚洲精品自拍视频在线观看| 欧美综合久久久| 巨骚激情综合| 国产成人一区二区| 国产精品午夜一区二区三区| avav在线看| 国产日韩欧美在线一区| 国产黄色免费观看| 日韩精品在线观看网站| 天堂8中文在线最新版在线| 国产在线资源一区| 日韩一级不卡| 国产黄色三级网站| 偷拍一区二区三区| 十九岁完整版在线观看好看云免费| 国外成人在线播放| 麻豆一区二区麻豆免费观看| 日本国产在线播放| av一区二区三区| 久久久免费高清视频| 亚洲精品视频免费| 电影一区二区三区| 亚洲精品久久区二区三区蜜桃臀 | 欧美欧美天天天天操| 四川一级毛毛片| 亚洲成a人v欧美综合天堂| 欧美 日韩 中文字幕| 91福利视频网| 要久久电视剧全集免费| 国产精品69页| 国产精品久久午夜夜伦鲁鲁| 国产乱淫片视频| 欧美日韩成人在线播放| 国产色噜噜噜91在线精品| av免费观看网| 国产片一区二区| 国产av一区二区三区精品| 久久久欧美一区二区| 亚洲福利天堂| 国产视频1区2区3区| 亚洲欧美激情一区二区| 欧美 日韩 中文字幕| 国产v综合ⅴ日韩v欧美大片| 日韩精品午夜| 无码国产69精品久久久久网站| 午夜精品免费在线| 东凛在线观看| 97久久天天综合色天天综合色hd | 99精品一区| 怡红院一区二区| 欧美亚洲国产怡红院影院| 91精选在线| 乱一区二区三区在线播放| 麻豆精品国产传媒mv男同| 久久久久久免费观看| 亚洲日本成人女熟在线观看| 久久精品一级| 人人爽人人av| 亚洲一区成人在线| 高清国产福利在线观看| 99热99热| 蜜桃精品在线观看| 日本视频www| 日韩最新免费不卡| 日韩av三区| 最新av免费在线观看| 精品国产乱码久久久久久婷婷| 日韩在线资源| 日本精品二区| 成人在线视频一区| 亚洲资源在线播放| 欧美一区二区三区四区在线|