精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

一文帶你理解 RocketMQ 廣播模式實現機制

開發 架構
本文主要講解了 RocketMQ 廣播消息的實現機制,理解廣播消息。

大家好,我是君哥。今天聊聊 RocketMQ 的廣播消息實現機制。

RocketMQ 有兩種消費模式,集群模式和廣播模式。

集群模式是指 RocketMQ 中的一條消息只能被同一個消費者組中的一個消費者消費。如下圖,Producer 向 TopicTest 這個 Topic 并發寫入 3 條新消息,分別被分配到了 MessageQueue1~MessageQueue3 這 3 個隊列,然后 Group 中的三個 Consumer 分別消費了一條消息:

圖片

廣播模式是  RocketMQ 中的消息會被消費組中的每個消費者都消費一次,如下圖:

圖片

使用 RocketMQ 的廣播模式時,需要在消費端進行定義,下面是一段官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}

從代碼中可以看到,在定義 Consumer 時,通過 messageModel 這個屬性指定消費模式,這里指定為 BROADCASTING,也就啟動了廣播模式的消費者。

1、消費者啟動

以 RocketMQ 推模式為例,看一下消費者調用關系類圖:

圖片

DefaultMQPushConsumer 作為啟動入口類,它的 start 方法調用了 DefaultMQPushConsumerImpl 類的 start 方法,下面重點看一下這個方法。

(1)拷貝訂閱關系

start 方法中調用了 copySubscription 方法,代碼如下:

private void copySubscription() throws MQClientException {
try {
//拷貝訂閱關系
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

這里的代碼有一點需要注意:集群模式會創建一個重試 Topic 的訂閱關系,而廣播模式是不會創建這個訂閱關系的。也就是說廣播模式不考慮重試。

(2)初始化偏移量

下面是初始化 offset 的代碼:

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}

從上面的代碼可以看到,廣播模式使用了 LocalFileOffsetStore,也就是說偏移量保存在客戶端本地,除了在內存中會保存,在本地文件中也會保存。

2、消息拉取

ConsumeMessageService 是真正拉取消息的地方,消費者初始化時會初始化 ConsumeMessageService,并且這里會區分并發消息還是順序消息。

(1)順序消息

在集群模式下,需要獲取到 processQueue 的鎖才會拉取消息,而在廣播模式下,不用獲取鎖,直接就可以拉取消息。判斷邏輯如下:

//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
}
}

這里有個疑問,對于順序消息,獲取鎖是必須的,這樣才能保證一個 processQueue 只能由一個線程進行處理,從而保證消費的順序性。那對于廣播模式,為什么不用獲取 processQueue 的鎖呢?難道廣播模式不支持順序消息?

(2)并發消息

對于并發消息,廣播模式不同的是,對消費結果的處理。集群模式消費失敗后需要把消息發送回 Broker 等待再次被拉取,而廣播模式則不需要重試。代碼如下:

//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

這再次說明,廣播模式是不支持消息重試的。

3、重平衡

在消費者啟動過程中,會調用 RebalanceService 的 start 方法,進行重平衡。從重平衡的代碼中可以看到,廣播模式消費者會消費所有 MessageQueue,而集群模式下會根據負載均衡策略選擇其中幾個 MessageQueue。代碼如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
//省略部分邏輯
} else {
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//省略部分邏輯
if (mqSet != null && cidAll != null) {
//省略部分邏輯
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//省略部分邏輯
}
break;
}
default:
break;
}
}

上面 updateProcessQueueTableInRebalance 這個方法調用前,要獲取到需要消費的 MessageQueue 集合。廣播模式下,直接取了訂閱的 Topic 下的所有集合元素,而集群模式下,則需要通過負責均衡獲取當前消費者自己要消費的 MessageQueue 集合。

4、總結

本文主要講解了 RocketMQ 廣播消息的實現機制,理解廣播消息,要把握下面幾點:

1.偏移量保存在消費者本地內存和文件中。

2.廣播消息不支持重試。

3.從源碼上看,廣播模式并不能支持順序消息。

4.廣播模式消費者訂閱了 Topic 下的所有 MessageQueue,不會重平衡。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2019-10-11 08:41:35

JVM虛擬機語言

2022-06-27 11:04:24

RocketMQ順序消息

2021-09-02 12:07:48

Swift 監聽系統Promise

2024-10-16 10:11:52

2023-07-17 10:45:03

向量數據庫NumPy

2021-09-08 17:42:45

JVM內存模型

2020-03-18 13:40:03

Spring事數據庫代碼

2022-03-18 13:58:00

RocketMQ消息隊列

2023-11-20 08:18:49

Netty服務器

2022-12-20 07:39:46

2023-12-21 17:11:21

Containerd管理工具命令行

2020-11-17 09:32:57

設計模式責任鏈

2023-12-26 08:08:02

Spring事務MySQL

2022-05-11 07:38:45

SpringWebFlux

2023-07-31 08:18:50

Docker參數容器

2023-11-06 08:16:19

APM系統運維

2021-05-29 10:11:00

Kafa數據業務

2022-11-11 19:09:13

架構

2022-06-13 11:05:35

RocketMQ消費者線程

2020-06-03 08:19:00

Kubernetes
點贊
收藏

51CTO技術棧公眾號

久久艹在线视频| 欧美日韩日日摸| 精品国产二区在线| 黄色污污网站在线观看| 欧美成人直播| 亚洲成成品网站| 嫩草影院国产精品| 色a资源在线| 97se狠狠狠综合亚洲狠狠| 国产精品久久久久久久久久久久久久| 萌白酱视频在线| 亚洲天堂av资源在线观看| 欧美日韩免费看| 亚洲日本理论电影| 日韩在线视频第一页| 免费人成精品欧美精品| 欧美精品videosex性欧美| 永久免费看mv网站入口78| www.久久久久爱免| 欧美小视频在线观看| 欧美另类videosbestsex日本| 欧美xxx.com| 丰满岳乱妇一区二区三区| 国产成一区二区| 国产精品成人网站| 国产精品久久久久久久免费观看 | 无码人妻丰满熟妇区毛片18| 超碰在线观看免费版| 国产日韩欧美电影| 久久精品成人一区二区三区蜜臀| 99久久精品国产色欲| 视频一区二区三区在线| 欧美国产日韩一区| 永久免费看mv网站入口| 欧美日韩一区二区三区视频播放| 亚洲二区中文字幕| 在线成人精品视频| 99热这里有精品| 欧美性极品少妇| 久久人妻精品白浆国产| 日本在线啊啊| 五月婷婷激情综合网| 超碰人人爱人人| 黄色的网站在线观看| 国产欧美日韩另类一区| 欧美日韩综合另类| 欧美日韩在线精品一区二区三区激情综| 国产成人av电影| 亚洲综合色激情五月| 国产精品毛片久久久久久久av| 日本美女一区二区三区| 日本欧美中文字幕| 国产91精品看黄网站在线观看| av不卡在线看| 91地址最新发布| 日韩精品一区二区在线播放| 亚洲国产免费看| 国内精品久久久久久中文字幕| 久久免费视频99| 国产在线欧美| 欧美高清在线播放| 国产精彩视频在线观看| 亚洲茄子视频| 日本伊人精品一区二区三区介绍| 日本中文在线播放| 久久综合图片| 国产精品精品视频| 国产精品羞羞答答在线| 国产一区二区三区免费观看| 99理论电影网| 天堂中文在线官网| 国产性色一区二区| 午夜久久资源| 91小视频xxxx网站在线| 亚洲午夜精品久久久久久久久| 国产深夜男女无套内射| 最新中文字幕在线播放| 欧美自拍偷拍一区| 午夜免费福利网站| 好吊妞视频这里有精品| 精品在线观看国产| 国产毛片欧美毛片久久久| 亚洲不卡av不卡一区二区| 欧美国产日韩xxxxx| 一级黄色大片视频| 蜜臀久久99精品久久久久久9| 成人xxxx视频| 色婷婷av一区二区三区之e本道| 久久在线免费观看| 品久久久久久久久久96高清| 91国内在线| 精品久久久久久久久国产字幕| 国产成人无码精品久久久性色| av成人在线观看| 日韩精品一区二区三区视频在线观看| 少妇饥渴放荡91麻豆| 成人激情免费视频| 欧美华人在线视频| 二区视频在线观看| 国产一区二区在线影院| 久久精品日韩| 在线免费av导航| 色综合久久99| 免费观看一区二区三区| 成人在线丰满少妇av| 欧美精品国产精品日韩精品| 在线观看免费观看在线| www.亚洲色图| 黄频视频在线观看| 偷拍视频一区二区三区| 欧美成人vps| 精品无码在线观看| 亚洲黄网站黄| 亚洲在线免费看| 久草在现在线| 亚洲国产精品久久人人爱蜜臀| 一区二区三区 日韩| 国产一区调教| 蜜臀久久99精品久久久久久宅男 | 日韩影院精彩在线| 国产精品露出视频| 久做在线视频免费观看| 91成人免费在线| 亚洲av无码一区二区三区网址| 亚洲精品网址| 国产欧美日韩视频| 毛片在线能看| 狠狠躁夜夜躁人人躁婷婷91| 亚洲v在线观看| 亚洲综合专区| 成人亚洲激情网| 在线免费av电影| 色狠狠一区二区三区香蕉| 亚洲啪av永久无码精品放毛片 | 亚洲欧美另类动漫| 亚洲精品推荐| 91黑丝在线观看| 丰满人妻一区二区三区四区53 | 成人av电影在线观看| 黄色一级视频播放| 亚洲最大的免费视频网站| 在线播放精品一区二区三区| 亚洲精品中文字幕乱码三区91| 99riav一区二区三区| 国产又粗又猛又爽又黄的网站| 国产一区二区三区免费观看在线 | 青青草精品视频| 欧日韩一区二区三区| 丝袜美腿诱惑一区二区三区| 精品亚洲一区二区三区在线观看 | 成人高潮免费视频| 国内外成人在线视频| 亚洲综合网中心| 成人久久精品| 超在线视频97| 亚洲va欧美va| 亚洲午夜激情网站| 亚洲自拍偷拍精品| 国产欧美一级| 欧美日韩精品免费看| 周于希免费高清在线观看| 国产视频综合在线| 99久久久久久久久| 国产精品天天摸av网| 在线不卡一区二区三区| 欧美一区精品| 国精产品一区二区| 春暖花开亚洲一区二区三区| 一本色道久久88综合亚洲精品ⅰ| 亚洲图片中文字幕| 国产精品66| 成人黄色一级视频| 日日摸日日碰夜夜爽无码| 欧美变态网站| 国产精品99久久久久久www| porn视频在线观看| 在线综合亚洲欧美在线视频| 国产亚洲第一页| 91美女蜜桃在线| 午夜宅男在线视频| 欧美区亚洲区| 久久一区二区精品| julia一区二区三区中文字幕| 久久中文字幕国产| 手机看片1024日韩| 欧美色手机在线观看| 日本一级二级视频| 91小视频免费看| 性猛交ⅹ×××乱大交| 亚洲激情女人| 亚洲人成影视在线观看| 国产精品99久久免费观看| 日韩美女在线看| 91麻豆免费在线视频| 亚洲国产中文字幕久久网| 中文字幕 日韩有码| 一区二区三区蜜桃网| 日本少妇xxxxx| 丁香啪啪综合成人亚洲小说 | 激情aⅴ欧美一区二区欲海潮| 亚洲一区二区精品| 亚洲av无码国产精品久久不卡| 日韩欧美aⅴ综合网站发布| 国产精品99久久久久久成人| 91亚洲午夜精品久久久久久| 天天av天天操| 天堂一区二区在线| 99久久久精品视频| 日韩理论电影院| 久草热久草热线频97精品| 欧美一区=区三区| 热久久这里只有| 国产网红女主播精品视频| 中文字幕欧美日韩精品| 少妇av一区二区| 日韩亚洲欧美在线观看| 午夜一区二区三区四区| 精品日韩中文字幕| 麻豆一区产品精品蜜桃的特点| 亚洲国产成人自拍| 麻豆av免费观看| zzijzzij亚洲日本少妇熟睡| 亚洲制服中文字幕| 日韩精品一级二级 | free性护士videos欧美| 久久精品亚洲一区| av影片在线看| 亚洲色图色老头| 亚洲区小说区图片区| 精品对白一区国产伦| 国产wwwxxx| 欧美一区二区三区男人的天堂| 亚洲天堂自拍偷拍| 欧美日免费三级在线| 正在播放木下凛凛xv99| 一本一道综合狠狠老| √资源天堂中文在线| 精品人伦一区二区三区蜜桃网站| 久久精品国产亚洲AV无码男同| 亚洲伦理在线精品| 农村黄色一级片| 在线日本成人| 日韩专区中文字幕| 台湾av在线二三区观看| 亚洲第一中文字幕在线观看| 性少妇videosexfreexxx片| 这里只有精品99re| 99热这里只有精| 日韩视频免费直播| 亚洲国产成人一区二区| 6080日韩午夜伦伦午夜伦| 国产一区二区三区黄片| 欧美日韩精品久久久| 中文字幕一区二区人妻| 欧美日本在线视频| 国产又粗又大又黄| 欧美久久久影院| 国产伦精品一区二区三区四区| 欧美群妇大交群的观看方式| 国产一区二区麻豆| 欧美成人女星排名| 亚洲日本国产精品| 亚洲人成毛片在线播放| wwwww在线观看免费视频| 国产一区二区三区在线观看网站| h视频在线播放| 久久手机免费视频| 波多野结衣在线高清| 国产成人av电影在线| 热久久精品免费视频| 老司机精品视频导航| 91香蕉视频在线观看视频| 国产精品一区二区久久不卡| 91人妻一区二区| 久久久久久毛片| 永久免费看片直接| 亚洲国产日韩一区二区| 亚洲不卡在线视频| 欧美人妖巨大在线| 亚洲第一页视频| 国产婷婷色综合av蜜臀av| 日本在线天堂| 欧美国产乱视频| 精品网站在线| 亚洲精品mp4| 欧美另类自拍| 久久精品视频va| 日本三级一区| 91久久精品久久国产性色也91| 国产亚洲精品美女久久| 神马影院午夜我不卡影院| 亚洲欧美一区在线| 黄色a级片免费| 国产成人日日夜夜| 波多野结衣一本| 一区二区三区免费看视频| 精产国品一区二区| 精品国产免费视频| 成a人v在线播放| 午夜精品久久久久久久99热浪潮| 国产精品诱惑| 久久精精品视频| 91精品观看| 欧美一级黄色影院| 国产91精品久久久久久久网曝门 | 国产精品久久久久久久久免费相片 | 中文字幕中文字幕中文字幕亚洲无线| 久久老司机精品视频| 性xx色xx综合久久久xx| 精品久久精品久久| 999久久久免费精品国产| 日本免费黄视频| 国产高清不卡一区二区| 国产又粗又长又硬| 欧美日在线观看| 国产综合在线播放| 久久在线免费观看视频| 国模私拍国内精品国内av| 精品一区二区三区国产| 欧美精品不卡| 热久久久久久久久| 国产精品女上位| 国产又大又粗又爽| 日韩av在线免费观看| 久草在线视频网站| 99re在线观看| 欧美fxxxxxx另类| 天天操狠狠操夜夜操| 国产午夜一区二区三区| 欧美激情黑白配| 亚洲国产精品va在线| 欧美寡妇性猛交xxx免费| 亚洲自拍偷拍视频| 亚洲网色网站| 伊人网在线综合| 国产精品免费久久| 中文在线字幕免费观| 综合国产在线观看| 韩日一区二区| 亚洲欧洲精品在线观看| 免费观看30秒视频久久| 亚洲色图欧美色| 精品视频1区2区3区| 91美女视频在线| 国产精品久久久久久久久免费看| 欧美日韩爱爱| 少妇高清精品毛片在线视频| 久久免费电影网| 国产天堂第一区| 最近的2019中文字幕免费一页| 欧美日韩亚洲国产| 天堂社区 天堂综合网 天堂资源最新版 | 成人噜噜噜噜| 天堂av在线中文| 国产99精品视频| 国产 日韩 欧美 在线| 日韩精品在线观看网站| 欧洲一区二区三区精品| 日韩成人在线资源| 久久国产日韩欧美精品| 成人免费黄色小视频| 欧美成人一级视频| 美女91在线看| 日本午夜精品一区二区| 亚洲福利影院| 另类视频在线观看| 一区二区三区自拍视频| 免费观看美女裸体网站| 久久综合狠狠综合久久激情| 亚洲精品国产欧美在线观看| 久久黄色av网站| 国产精品宾馆| 亚洲五月天综合| 中文字幕欧美一| 人妻中文字幕一区| 日韩av色在线| 亚洲破处大片| 中文人妻一区二区三区| 欧美日韩一级黄| 欧美xxxx做受欧美88bbw| 欧美成熟毛茸茸复古| 丝袜美腿亚洲一区二区图片| 色哟哟一一国产精品| 亚洲精品按摩视频| 日本黄色成人| 日本十八禁视频无遮挡| 国产精品视频在线看| www.综合色| 国产精品国产三级国产专播精品人 | 狠狠狠色丁香婷婷综合激情 | 欧美午夜精品免费| 女人黄色免费在线观看| 日韩亚洲视频| 成人午夜视频在线观看| 中文字幕免费观看视频| 国内精品美女av在线播放| 日韩在线视屏| 精品夜夜澡人妻无码av | 午夜黄色小视频| 亚洲一区二区三区sesese|