RabbitMQ非常實(shí)用技巧,動(dòng)態(tài)調(diào)整消息并發(fā)處理能力
環(huán)境:SpringBoot2.7.16 + RabbitMQ3.8.35
1. 簡介
RabbitMQ 是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用于通過輕量級(jí)和可靠的消息傳遞,在服務(wù)器之間進(jìn)行通信。在Spring Boot項(xiàng)目中我們一般都是通過@RabbitListener進(jìn)行消息監(jiān)聽。可以通過配置消息監(jiān)聽器并發(fā)數(shù)來提高系統(tǒng)的消息處理能力。
在實(shí)際應(yīng)用中,根據(jù)業(yè)務(wù)場景的不同,我們可能需要?jiǎng)討B(tài)調(diào)整 RabbitMQ 消息監(jiān)聽的并發(fā)數(shù)。例如,當(dāng)RabbitMQ消息積壓過多時(shí),這時(shí)候我們就可以考慮通過動(dòng)態(tài)調(diào)整并發(fā)數(shù),以提高消息處理速度;而在系統(tǒng)自身負(fù)載過高時(shí),這時(shí)候可以考慮通過減少并發(fā)數(shù)來減輕系統(tǒng)的整體壓力。本篇文章將通過具體的示例來展示如何調(diào)整運(yùn)行中消息監(jiān)聽處理器的并發(fā)數(shù)。
注意:動(dòng)態(tài)調(diào)整并發(fā)監(jiān)聽數(shù)還可以幫助我們更好地控制系統(tǒng)的穩(wěn)定性和可靠性。通過實(shí)時(shí)監(jiān)測系統(tǒng)的負(fù)載情況和消息處理速度,我們可以及時(shí)發(fā)現(xiàn)潛在的問題并進(jìn)行調(diào)整,從而確保系統(tǒng)的正常運(yùn)行。
2. 實(shí)戰(zhàn)案例
2.1依賴管理
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2.2 配置管理
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
# 手動(dòng)應(yīng)答
acknowledgeMode: manual
concurrency: 2
max-concurrency: 22.3 創(chuàng)建交換機(jī)及隊(duì)列
通過管理界面創(chuàng)建交換機(jī)及隊(duì)列。
- 交換機(jī)名:test.exchange類型為topic
- 隊(duì)列名: test
- 將交換機(jī)與隊(duì)列進(jìn)行綁定路由key:akf.#
2.4 消息隊(duì)列準(zhǔn)備消息
通過如下接口,先往隊(duì)列中插入100條消息
@Resource
private RabbitTemplate rabbitTemplate ;
@GetMapping("/send")
public String send() {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("test.exchange", "akf.a", "message - " + i) ;
}
}).start() ;
return "success" ;
}
圖片
2.5 消息監(jiān)聽器
@RabbitListener(queues = "test")
public void listener1(String message) {
System.out.printf("%s - 接收到消息:%s%n", Thread.currentThread().getName(), message) ;
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {}
}2.6 測試
測試上面的消息監(jiān)聽器是正常的
圖片
2.7 調(diào)整并發(fā)數(shù)
在上一步的測試中我們發(fā)現(xiàn)控制臺(tái)打印的始終是一個(gè)線程在執(zhí)行消息處理。但是在一開始的配置文件中我們將concurrency屬性設(shè)置的為2,起碼這里應(yīng)該是2個(gè)線程交替執(zhí)行才對(duì),這是為什么呢?
Spring監(jiān)聽RabbitMQ的消息時(shí)默認(rèn)并不是一條一條的從RabbitMQ中去,是一次預(yù)期一批數(shù)據(jù),這一批消費(fèi)完后才進(jìn)行下一批的獲取,默認(rèn)預(yù)期250條。而我們向隊(duì)列中存入的數(shù)據(jù)才100條,所以控制臺(tái)中你只能看到一個(gè)線程打印,因?yàn)槟銢]有足夠的消息供其它線程去獲取處理。我們可以通過如下配置進(jìn)行預(yù)期數(shù)的設(shè)置:
spring:
rabbitmq:
listener:
simple:
prefetch: 5重新啟動(dòng)服務(wù),測試如下
圖片
2個(gè)線程交替執(zhí)行;接下來該如何實(shí)現(xiàn)動(dòng)態(tài)調(diào)整并發(fā)數(shù)呢?
首先,修改消息監(jiān)聽器配置
@RabbitListener(id = "test-queue", queues = "test", ackMode = "AUTO")
public void listener1(String message) {
// ...
}id: 這里最好是設(shè)置唯一的id值,我們是要通過該id值來獲取當(dāng)前隊(duì)列的消息監(jiān)聽容器。ackMode: AUTO 這里設(shè)置的應(yīng)答模式,用來覆蓋配置文件中的設(shè)置。
其次,通過RabbitListenerEndpointRegistry操作
@Resource
private RabbitListenerEndpointRegistry registry ;
@GetMapping("/modify/{count}")
public Object modify(@PathVariable("count") Integer count) {
// 這里通過id獲取對(duì)應(yīng)的隊(duì)列監(jiān)聽器;所以上面一定要定義唯一的id值
MessageListenerContainer listenerContainer = registry.getListenerContainer("test-queue") ;
if (listenerContainer instanceof SimpleMessageListenerContainer container) {
container.setConcurrentConsumers(count) ;
}
return String.format("并發(fā)接收消息:%d%n", count) ;
}最后,測試。
首先將服務(wù)啟動(dòng),控制輸出如下(當(dāng)前只有2個(gè)線程處理)
圖片
目前只有2個(gè)線程。
調(diào)用上面的接口修改并發(fā)數(shù)為3個(gè)后,控制臺(tái)輸出。
圖片
成功增加了一個(gè)消費(fèi)者線程。
接下來再測試,如果修改的數(shù)量大于最大數(shù)(spring.rabbitmq.listener.simple.max-concurrency)
圖片
控制臺(tái)拋出如下異常。
圖片
不能超過最大數(shù);再看看調(diào)小是否可以。
圖片
可以動(dòng)態(tài)調(diào)小。
我們也可以對(duì)消息監(jiān)聽器進(jìn)行暫停消費(fèi)和重新啟動(dòng)消息監(jiān)聽,這里就不在演示了,非常簡單調(diào)用相應(yīng)start/stop即可。
總結(jié):在 Spring Boot 中動(dòng)態(tài)調(diào)整 RabbitMQ 消息監(jiān)聽的并發(fā)數(shù)是一個(gè)重要的優(yōu)化手段。通過合理設(shè)置并發(fā)數(shù)并根據(jù)系統(tǒng)負(fù)載情況進(jìn)行動(dòng)態(tài)調(diào)整,我們可以提高消息處理效率、節(jié)省系統(tǒng)資源、確保系統(tǒng)的穩(wěn)定性和可靠性。在實(shí)際應(yīng)用中,我們應(yīng)該根據(jù)具體的業(yè)務(wù)場景和需求來選擇合適的并發(fā)數(shù)調(diào)整策略,以達(dá)到最佳的性能和效果。

























