SpringBoot整合RabbitMQ延遲隊(duì)列&優(yōu)先級(jí)隊(duì)列詳解
延遲隊(duì)列
延遲隊(duì)列:簡(jiǎn)單說就是發(fā)送出去的消息經(jīng)過給定的時(shí)間后,消費(fèi)者才能看見消息(消費(fèi)消息)。
這里簡(jiǎn)單說下步驟:
- 創(chuàng)建一個(gè)隊(duì)列,如:bs-queue, 設(shè)置死信交換機(jī)(死信交換機(jī)路由key(這是可選的))及隊(duì)列,如:dead-exchange; 消息的消費(fèi)端監(jiān)聽該dead-queue隊(duì)列。設(shè)置消息有效期參數(shù)x-message-ttl參數(shù)(值為自己需要延遲的時(shí)間,單位:毫秒)。
- 發(fā)送消息發(fā)送到bs-queue上。由于消息消費(fèi)端監(jiān)聽的是死信隊(duì)列,所以只需要等待指定的時(shí)間后消息會(huì)自動(dòng)被轉(zhuǎn)發(fā)到死信隊(duì)列上(dead-queue)。
- 消息的消費(fèi)端監(jiān)聽dead-queu隊(duì)列即可。
優(yōu)先級(jí)隊(duì)列
優(yōu)先級(jí)隊(duì)列是在RabbitMQ3.5.0之后的版本才支持的。
具有高優(yōu)先級(jí)的隊(duì)列具有高的優(yōu)先權(quán),優(yōu)先級(jí)高的消息具備優(yōu)先被消費(fèi)的特權(quán)。
隊(duì)列的優(yōu)先級(jí)通過x-max-priority參數(shù)設(shè)置。
建立一個(gè)priority-exchange交換機(jī),類型:direct。
圖片
建立一個(gè)priority-queue隊(duì)列,并與priority-exchange綁定。
圖片
設(shè)置x-max-priority參數(shù)的值為100,表示最大優(yōu)先級(jí)為100。
注意:x-max-priority參數(shù)的值應(yīng)該介于1到255。建議使用1到10之間的隊(duì)列。如果設(shè)置的優(yōu)先級(jí)更大將使用更多的Erlang進(jìn)程消耗更多的CPU資源。運(yùn)行時(shí)調(diào)度也會(huì)受到影響。
接下來演示優(yōu)先級(jí)隊(duì)列
我們先只發(fā)送消息,然后再把消息的消費(fèi)功能打開。
發(fā)送消息接口:
@GetMapping("/sendPriority")
public Object sendPriority(String msg, Integer priority) {
ms.sendPriorityQueue(msg, priority) ;
return "success" ;
}
public void sendPriorityQueue(String msg, Integer priority) {
logger.info("準(zhǔn)備發(fā)送消息:{}", msg);
Message message = MessageBuilder.withBody(msg.getBytes()).setPriority(priority).build() ;
rabbitTemplate.convertAndSend("priority-exchange", "pe.msg", message) ;
}發(fā)送4條消息:
// 第一條消息
msg=第一條消息&priority=2
// 第二條消息
msg=第二條消息&priority=10
// 第三條消息
msg=第三條消息&priority=1
// 第四條消息
msg=第四條消息&priority=7查看消息隊(duì)列:
圖片
消息消費(fèi)端:
@RabbitListener(queues = { "priority-queue" })
@RabbitHandler
public void listenerPriority(Message message, Channel channel) {
System.out.println("接受到消息.....income");
byte[] body = message.getBody();
MessageProperties mps = message.getMessageProperties();
String content = new String(body, Charset.forName("UTF-8"));
try {
System.out.println("接受到消息來自交換機(jī): 【" + mps.getReceivedExchange() + "】, 隊(duì)列:【" + mps.getConsumerQueue()+ "】:\n內(nèi)容: " + content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
try {
channel.basicReject(mps.getDeliveryTag(), false);
} catch (IOException e1) {
e1.printStackTrace() ;
}
}
}啟動(dòng)服務(wù)
圖片
根據(jù)打印出的結(jié)果,正好是我們?cè)O(shè)置優(yōu)先級(jí)的順序輸出。
上面設(shè)置的消息優(yōu)先級(jí)都是在指定的范圍<100,如果消息的優(yōu)先級(jí)超過這個(gè)值會(huì)怎么樣呢?
發(fā)送8條消息:
// 第一條消息
msg=第一條消息&priority=2
// 第二條消息
msg=第二條消息&priority=10
// 第三條消息
msg=第三條消息&priority=1
// 第四條消息
msg=第四條消息&priority=7
// 第五條消息
msg=第五條消息&priority=101消費(fèi)消息:
圖片
同樣是按照順序輸出的。































