精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

如何在 SpringBoot 項目中控制 RocketMQ消費線程數量

開發
如何設置單個 topic 消費線程的最小數量和最大數量,用來區分不同 topic 吞吐量不同。

1 背景

最近在新項目開發中遇到一個有趣的問題,如何在 SpringBoot 項目中控制 RocketMQ 消費線程數量。如何設置單個 topic 消費線程的最小數量和最大數量,用來區分不同 topic 吞吐量不同。

我們先介紹一下 RocketMQ 消息監聽再來說明 RocketMQ 消費線程。

2 RocketMQ 消息監聽

設置消費者組為 my_consumer_group,監聽 TopicTest 隊列,并使用并發消息監聽器MessageListenerConcurrently

1public class Consumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6 consumer.subscribe("TopicTest", "*");
7 consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
8 consumer.registerMessageListener(new MessageListenerConcurrently() {
9 @Override
10 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
11 ConsumeConcurrentlyContext context) {
12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
14 }
15 });
16 consumer.start();
17 System.out.printf("Consumer Started.%n");
18 }
19}

3 RocketMQ 中連接結構圖

圖片

4 消費監聽器

接口:org.apache.rocketmq.client.consumer.listener.MessageListener

圖片

有兩個子接口:

- 順序消費:MessageListenerOrderly
- 并發消費: MessageListenerConcurrently

圖片

4.1 MessageListenerConcurrently

作用:consumer并發消費消息的監聽器

圖片

比如,在 quick start 中,就是使用的并發消費消息監聽器:?

1 consumer.registerMessageListener(new MessageListenerConcurrently() {
2 @Override
3 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
4 ConsumeConcurrentlyContext context) {
5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
7 }
8 });

方法返回值,是個枚舉:

1 package org.apache.rocketmq.client.consumer.listener;
2
3/**
4 * 并發消費mq消息結果
5 */
6public enum ConsumeConcurrentlyStatus {
7
8 /**
9 * Success consumption
10 * 成功消費
11 */
12 CONSUME_SUCCESS,
13
14 /**
15 * Failure consumption,later try to consume
16 * 失敗消費,稍后嘗試消費
17 *
18 *
19 * 如果 {@link MessageListener}返回的消費結果為 RECONSUME_LATER,則需要將這些消息發送給Broker延遲消息。
20 * 如果給broker發送消息失敗,將延遲5s后提交線程池進行消費。
21 *
22 * RECONSUME_LATER的消息發送入口: MQClientAPIImpl#consumerSendMessageBack,
23 * 命令編碼: {@link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}
24 */
25 RECONSUME_LATER;
26}

畫外音:

當前,我們在具體開發中,肯定不會直接使用這種方式來寫consumer。

常用的Consumer實現是:基于 推 的consumer:DefaultMQPushConsumer

4.2 MessageListenerOrderly

作用:consumer順序消費消息的監聽器

5 消費線程池

5.1 DefaultMQPushConsumer

作用:基于 推 的consumer消費者

5.2 注冊并發消息監聽器

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener

圖片

當使用這個方法注冊消息監聽器時,實際上會把這個病發消息監聽器設置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner屬性中。

5.3 設置 consumer 消費 service

可選有兩種:?

并發消費的service

順序消費的service

當consumer在啟動的時,會使用MessageListener具體實現類型進行判斷:

圖片

MessageListener 就有并發和順序兩種,所以service也有兩種。

1public synchronized void start() throws MQClientException {
2 switch (this.serviceState) {
3 case CREATE_JUST:
4
5 // 省略一部分代碼...........
6
7 // 根據注冊的監聽器類型[并發消息監聽器/順序執行消息監聽器],來確定使用哪種消費服務.
8 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
9 this.consumeOrderly = true;
10 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
11 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
12 this.consumeOrderly = false;
13 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
14 }
15 this.consumeMessageService.start();
16
17 // 省略一部分代碼..........
18 this.serviceState = ServiceState.RUNNING;
19 break;
20 case RUNNING:
21 case START_FAILED:
22 case SHUTDOWN_ALREADY:
23 throw new MQClientException("The PushConsumer service state not OK, maybe started once");
24 default:
25 break;
26 }
27
28 // 省略一部分代碼..........
29 }

如果使用的是并發消費的話,使用 ConsumeMessageConcurrentlyService :

在實例化的時候,會創建一個線程池:

圖片

1// 無界隊列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫無意義了.
2this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
3this.consumeExecutor = new ThreadPoolExecutor(
4 this.defaultMQPushConsumer.getConsumeThreadMin(), // 默認20
5 this.defaultMQPushConsumer.getConsumeThreadMax(), // 默認64
6 1000 * 60,
7 TimeUnit.MILLISECONDS,
8 this.consumeRequestQueue,
9 new ThreadFactoryImpl("ConsumeMessageThread_"));

consumer消費線程池參數:

  • 默認最小消費線程數 20
  • 默認最大消費線程數 64
  • keepAliveTime = 60*1000      單位:秒
  • 隊列:new LinkedBlockingQueue<>()? 無界隊列
  • 線程名稱:前綴是:ConsumeMessageThread_

注意:因為線程池使用的是無界隊列,那么設置的最大線程數,其實沒有什么意義。

5.4 修改線程池線程數

上面我們已經知道了,設置線程池的最大線程數是沒什么用的。

那我們其實可以設置線程池的最小線程數,來修改consumer消費消息時的線程池大小。

1public static void main(String[] args) throws InterruptedException, MQClientException {
2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
3
4 consumer.setConsumeThreadMin(30);
5 consumer.setConsumeThreadMax(64);
6
7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
8 consumer.subscribe("TopicTest", "*");
9 consumer.registerMessageListener(new MessageListenerConcurrently() {
10
11 @Override
12 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
13 ConsumeConcurrentlyContext context) {
14 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
15 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
16 }
17 });
18 consumer.start();
19 System.out.printf("Consumer Started.%n");
20 }

注意:consumeThreadMin? 如果大于64,則也需要設置 consumeThreadMax 參數,因為有個校驗:

圖片

-修改線程池線程數-SpringBoot版

如果consumer是使用spring boot進行集成的,則可以這樣設置消費者線程數:

圖片

責任編輯:張燕妮 來源: 中生代技術
相關推薦

2022-08-02 10:01:42

架構

2022-11-23 15:44:49

2022-12-04 23:54:39

2017-07-04 19:02:17

ReacRedux 項目

2020-10-27 14:15:42

SpringBoot

2021-08-23 10:40:30

人工智能KubernetesAI

2009-04-07 09:12:35

敏捷新手入門大型開發

2021-09-14 07:06:13

React項目TypeScript

2021-09-15 07:56:32

TypeScriptVue項目

2020-03-17 08:04:11

物聯網隱私安全

2022-07-04 10:39:24

TienChin項目自定義

2021-03-30 10:46:42

SpringBoot計數器漏桶算法

2021-03-23 08:39:27

SpringBootRedis管道技術

2022-07-07 09:00:49

RocketMQ消費者消息消費

2025-01-03 16:32:13

SpringBoot虛擬線程Java

2023-09-26 08:01:46

消費者TopicRocketMQ

2023-03-28 07:08:09

RocketMQ消費者堆棧

2022-06-09 13:52:35

Vue協作開發項目

2025-05-12 02:00:00

2023-08-23 13:24:00

異步編程方法
點贊
收藏

51CTO技術棧公眾號

久久五月精品中文字幕| 亚洲高清视频免费观看| 精品淫伦v久久水蜜桃| 精品人伦一区二区三区蜜桃网站 | 亚洲网一区二区三区| 五月婷婷激情综合网| 国产精品chinese在线观看| 久久久www成人免费无遮挡大片| 国产精品视频久久| 国产精品成人aaaa在线| 日韩av片子| 亚洲第一色中文字幕| 一区二区三区视频在线观看免费| 26uuu亚洲电影在线观看| 99久久免费国产| 亚洲永久在线观看| 久久久久久无码精品大片| 欧美日韩四区| 日韩中文字幕国产| 国产 欧美 在线| 99这里只有精品视频| 欧美日韩高清在线| 亚洲欧洲日产国码无码久久99| 麻豆tv免费在线观看| 久久久不卡网国产精品一区| 国产精品久久久久久久天堂第1集 国产精品久久久久久久免费大片 国产精品久久久久久久久婷婷 | 日韩高清不卡一区二区| 久久久久久亚洲精品中文字幕| 亚洲成人黄色av| 红杏aⅴ成人免费视频| 5858s免费视频成人| 8x8x最新地址| 午夜日韩成人影院| 欧美视频不卡中文| 少妇av一区二区三区无码| 手机在线免费观看av| 亚洲欧美二区三区| 中日韩在线视频| 成人精品一区二区| 国产日产精品1区| 久久国产日韩欧美| 亚洲人在线观看视频| 成人福利电影精品一区二区在线观看| 亚洲影视九九影院在线观看| 国产又大又黄的视频| 久久99精品久久久久久| 国产日本欧美在线观看| 欧美日韩 一区二区三区| 亚洲永久字幕| 日韩av片永久免费网站| 国产寡妇亲子伦一区二区三区四区| 一区二区日本视频| **欧美日韩vr在线| 午夜婷婷在线观看| 久久久久国产精品一区三寸| 欧洲中文字幕国产精品| 日韩在线 中文字幕| 日韩精品福利网| 国产精品视频自拍| 国产理论视频在线观看| 国产不卡在线一区| 精品国产乱码久久久久久郑州公司| 欧美一级性视频| 99精品国产一区二区三区不卡| 久久久久久精| 国产毛片在线| 国产精品久久久久久久久快鸭 | 夜夜春很很躁夜夜躁| 久久激情电影| 久久国产精品久久久久久| 青青草原免费观看| 国产亚洲精品久久久久婷婷瑜伽| 日本国产一区二区三区| 在线免费观看中文字幕| 国产成人免费高清| 久久久福利视频| 国产精品免费观看| 怡红院av一区二区三区| 欧美黄色免费影院| 日韩美香港a一级毛片| 精品精品欲导航| 亚洲av无码一区二区三区人| 97视频热人人精品免费| 国内精品久久影院| 在线观看国产区| 国产精品一区专区| 欧美一区观看| 免费在线播放电影| 欧美综合一区二区| 中文在线字幕观看| 青青草国产成人a∨下载安卓| 欧美精品做受xxx性少妇| 色婷婷av国产精品| 国产一区视频导航| 久久久久久久久四区三区| 麻豆电影在线播放| 色视频一区二区| 能看毛片的网站| 国产欧美日韩在线一区二区 | 色就是色欧美| 黄页在线观看免费| 欧美伊人久久久久久午夜久久久久| 日批视频在线看| 狠狠操综合网| 91国语精品自产拍在线观看性色| 91久久久久久久久久久久| av欧美精品.com| 亚洲av综合色区| 欧美亚洲大片| 亚洲精品久久久久久下一站| 久草福利资源在线| 久久九九免费| 国产精品乱子乱xxxx| 欧美a在线看| 色噜噜夜夜夜综合网| 成人做爰www看视频软件| 99久久婷婷| 国产成人精品av| 天天综合网在线观看| 亚洲黄一区二区三区| 91极品尤物在线播放国产| 日韩精选在线| 欧美激情精品久久久久久| 亚洲中文字幕在线观看| 国产欧美一区二区在线观看| 动漫av网站免费观看| 都市激情亚洲欧美| 欧美国产视频一区二区| 国产精品一二三四五区| 中文字幕在线观看一区二区| 欧美成人黄色网址| 精品久久久亚洲| 热99在线视频| 国产小视频在线| 欧美色视频日本版| av网站免费在线播放| 日韩视频二区| 精品高清视频| 鲁鲁在线中文| 亚洲男人天堂2023| 无码人妻丰满熟妇奶水区码| 久久久精品欧美丰满| 国产淫片av片久久久久久| 欧美黑白配在线| 欧美怡红院视频一区二区三区| 天堂网av在线播放| 福利视频一区二区| 99久久人妻无码精品系列| 亚洲精品日本| 久久99精品久久久久久秒播放器| 国产精品蜜芽在线观看| 日韩精品日韩在线观看| 91精品国产高清一区二区三密臀| 91尤物视频在线观看| 黄www在线观看| 国产一区二区三区日韩精品 | 国产精品久久99| 亚洲自拍第三页| 欧美日韩国产欧| 国产亚洲欧美一区二区| 在线观看欧美日韩电影| 亚洲人在线观看| 一道本在线视频| 亚洲人成网站色在线观看| 蜜桃色一区二区三区| 在线精品一区二区| 开心色怡人综合网站| 神马久久资源| 久久好看免费视频| 老熟妇高潮一区二区高清视频| 五月婷婷综合网| 免费成人深夜天涯网站| 国产精品一区在线| 日韩少妇内射免费播放18禁裸乳| 狠狠综合久久av一区二区蜜桃| 国产精品直播网红| 国精产品一区一区三区mba下载| 精品一区二区三区四区在线| 亚洲视频在线免费播放| 一区二区三区四区在线播放 | 天天射—综合中文网| av一本久道久久波多野结衣| 中文字幕在线高清| 日韩有码在线播放| 日韩一级片免费看| 欧美另类高清zo欧美| 日本三级理论片| 国产精品久线观看视频| 国产十八熟妇av成人一区| 人人精品人人爱| 国产一区二区三区小说| 国产欧美日韩精品一区二区免费| 99热国产免费| 日韩欧美一区二区三区免费观看 | 精品一区二区电影| 99产精品成人啪免费网站| 一本到一区二区三区| 亚洲av无码一区二区三区在线| 久久这里只有精品视频网| 亚洲制服在线观看| 日本色综合中文字幕| 日韩伦理在线免费观看| 亚洲欧美色图| 日本精品一区二区三区高清 久久 日本精品一区二区三区不卡无字幕 | 狠狠干 狠狠操| 亚洲一区二区| 日韩精品久久一区| 欧美五码在线| caoporn国产精品免费公开| 99久久亚洲国产日韩美女| 午夜精品理论片| 中日韩高清电影网| 在线观看欧美成人| 国产在线视频网址| 亚洲精品国产综合区久久久久久久| 国产亲伦免费视频播放| 91成人在线免费观看| 国产特黄大片aaaa毛片| 夜夜操天天操亚洲| 欧美激情图片小说| 亚洲色图在线看| 成人一级片免费看| 久久精品视频免费| 亚洲黄色在线网站| 成人黄色a**站在线观看| 在线观看中文av| 狠狠色2019综合网| 最新天堂中文在线| 日韩1区2区日韩1区2区| 国产一区视频免费观看| 免费一区视频| 欧美二区在线视频| 99riav1国产精品视频| av高清在线免费观看| 欧美午夜久久| 欧美一区二区激情| 红桃视频国产精品| 91黄色在线看| 91久久亚洲| 久久视频这里有精品| 一本一本久久| 动漫av网站免费观看| 香蕉久久夜色精品| 男女视频一区二区三区| 久久综合网络一区二区| 不卡av免费在线| 青草av.久久免费一区| 无限资源日本好片| 久久精品国产亚洲aⅴ| 思思久久精品视频| 国产又粗又猛又爽又黄91精品| 中文字幕欧美视频| 国产白丝精品91爽爽久久| 污污免费在线观看| caoporen国产精品视频| 亚洲综合色一区| 欧美激情综合五月色丁香| 99热在线观看精品| 一区二区三区精品久久久| 日韩少妇裸体做爰视频| 狠狠做深爱婷婷久久综合一区| 国产精品va无码一区二区三区| 色菇凉天天综合网| 91精东传媒理伦片在线观看| 欧美一级黄色片| 无码国产色欲xxxx视频| 亚洲天堂视频在线观看| 免费黄色网页在线观看| 久久免费精品视频| 吞精囗交69激情欧美| 成人国产在线视频| 国产劲爆久久| 天堂社区 天堂综合网 天堂资源最新版| 欧美成人自拍| 久草视频国产在线| 老牛影视一区二区三区| 色婷婷激情视频| 91在线你懂得| 欧美色图17p| 亚洲高清免费观看| 无码人妻熟妇av又粗又大| 欧美精品一卡两卡| 日本毛片在线观看| 久久精品99久久久香蕉| 黄频免费在线观看| 成人av在线网址| 欧美aaaaaaaa牛牛影院| 中国成人亚色综合网站| 在线视频免费在线观看一区二区| 91香蕉视频污版| 粉嫩绯色av一区二区在线观看| 国产成人福利在线| 夜夜嗨av一区二区三区四季av| 97人妻一区二区精品视频| 欧美一区二区视频在线观看2020 | 综合网在线视频| 97久久久久久久| 日韩一级二级三级精品视频| 国产三级视频在线| 97精品视频在线| 精品午夜视频| 亚洲国产一区二区三区在线| 亚洲国产专区校园欧美| 中文字幕第38页| 91麻豆精品在线观看| 中文字幕亚洲欧美日韩| 欧洲一区在线电影| 日本护士...精品国| 国内偷自视频区视频综合 | 国产伦理久久久| 久久久9色精品国产一区二区三区| 日韩少妇内射免费播放18禁裸乳| 国产成人午夜精品5599 | 亚洲6080在线| 精品国产九九九| 日韩网站免费观看高清| 九九热线视频只有这里最精品| 成人一区二区在线| 亚洲激情久久| 黄色一级片免费的| 日本一区二区三区四区 | 交100部在线观看| dy888夜精品国产专区| 亚洲国产日韩欧美在线| 亚洲天堂av一区二区| 欧美极品少妇xxxxⅹ高跟鞋 | 视频这里只有精品| 在线成人高清不卡| av资源网站在线观看| 国产精品劲爆视频| 欧美日韩精品一区二区视频| 可以在线看的黄色网址| 91天堂素人约啪| 日本三级小视频| 日韩av网站电影| 亚洲啊v在线| 久久综合给合久久狠狠色| 亚洲久久一区| 中文文字幕文字幕高清| 精品久久久久久久久久国产| 欧美一级性视频| 97久久精品国产| 亚洲瘦老头同性70tv| 99re在线视频免费观看| 久久久国产午夜精品| 日韩黄色一级视频| 中文字幕在线日韩| 色综合视频一区二区三区44| 中文网丁香综合网| 狠狠色丁香婷综合久久| 久久国产精品波多野结衣av| 精品国产乱码久久久久久图片 | 激情小说综合网| 亚洲综合日本| 日韩毛片无码永久免费看| 欧美在线观看一二区| 免费网站黄在线观看| 999国产视频| 国产精品美女久久久浪潮软件| 在线观看国产网站| 日本韩国精品在线| 蜜桃视频网站在线| www 成人av com| 性色一区二区三区| 成年人视频软件| 日韩精品专区在线影院观看| 7777kkk亚洲综合欧美网站| 久热这里只精品99re8久| 久久精品国产亚洲aⅴ| 国产性猛交普通话对白| 亚洲男人天堂网站| 99精品国产九九国产精品| 国产精品国产对白熟妇| 久久久久久久久久久电影| 亚洲中文字幕在线一区| 高清一区二区三区日本久| 精品国产网站| 日本泡妞xxxx免费视频软件| 欧美性jizz18性欧美| 精品视频在线一区二区| 精品蜜桃一区二区三区| 麻豆国产91在线播放| 日韩在线观看第一页| 最新日韩中文字幕| 精品女人视频| 中文字幕中文在线| 精品久久久久久国产| 激情影院在线观看| 精品国产_亚洲人成在线| 精品一二三四在线| 日本一区二区三区精品| 久久综合色影院| 黑丝美女一区二区| fc2成人免费视频| 欧美精品在线一区二区| 日韩精品三区| 麻豆tv在线播放| 中文字幕一区免费在线观看 | 欧美韩国日本一区| 手机看片一区二区|