精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SpringBoot整合RocketMQ事務/廣播/順序消息

開發 前端
本篇帶給大家SpringBoot整合RocketMQ事務/廣播/順序消息相關知識,希望能夠幫助到你!

[[393265]]

環境:springboot2.3.9RELEASE + RocketMQ4.8.0

依賴

  1. <dependency> 
  2.   <groupId>org.springframework.boot</groupId> 
  3.     <artifactId>spring-boot-starter-web</artifactId> 
  4. </dependency> 
  5. <dependency> 
  6.     <groupId>org.apache.rocketmq</groupId> 
  7.     <artifactId>rocketmq-spring-boot-starter</artifactId> 
  8.     <version>2.2.0</version> 
  9. </dependency> 

配置文件

  1. server: 
  2.   port: 8080 
  3. --- 
  4. rocketmq: 
  5.   nameServer: localhost:9876 
  6.   producer: 
  7.     group: demo-mq 

普通消息

發送

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String message) { 
  5.   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); 

接受

  1. @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2"
  2. @Component 
  3. public class ConsumerListener implements RocketMQListener<String> { 
  4.  
  5.     @Override 
  6.     public void onMessage(String message) { 
  7.         System.out.println("接收到消息:" + message) ; 
  8.     } 
  9.  

順序消息

發送

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.  
  4. public void sendOrder(String topic, String message, String tags, int id) { 
  5.     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),  
  6.             "order-" + id, new SendCallback() { 
  7.                 @Override 
  8.                 public void onSuccess(SendResult sendResult) { 
  9.                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; 
  10.                 } 
  11.                 @Override 
  12.                 public void onException(Throwable e) { 
  13.                     e.printStackTrace() ; 
  14.                 } 
  15.             }); 

這里是根據hashkey將消息發送到不同的隊列中

  1. @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",  
  2.     selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) 
  3. @Component 
  4. public class ConsumerOrderListener implements RocketMQListener<String> { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; 
  9.     } 
  10.  

consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。

結果

 

當consumeMode = ConsumeMode.CONCURRENTLY執行結果如下:

集群/廣播消息模式

發送端

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String topic, String message, String tags) { 
  5.     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; 

集群消息模式

消費端

  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener<String> { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.CLUSTERING

測試

啟動兩個服務分別端口是8080,8081

8080服務

8081服務

集群消息模式下,每個服務分別接收一部分消息,實現了負載均衡

廣播消息模式

消費端

  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener<String> { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.BROADCASTING

測試

啟動兩個服務分別端口是8080,8081

8080服務

8081服務

集群消息模式下,每個服務分別都接受了同樣的消息。

事務消息

RocketMQ事務的3個狀態

TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息

TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。

TransactionStatus.Unknown :中間狀態,它代表需要檢查消息隊列來確定狀態。

RocketMQ實現事務消息主要分為兩個階段:正常事務的發送及提交、事務信息的補償流程 整體流程為:

正常事務發送與提交階段

1、生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)

2、服務端響應消息寫入結果,半消息發送成功

3、開始執行本地事務

4、根據本地事務的執行狀態執行Commit或者Rollback操作

事務信息的補償流程

1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求

2、生產者收到確認回查請求后,檢查本地事務的執行狀態

3、根據檢查后的結果執行Commit或者Rollback操作

補償階段主要是用于解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。

發送端

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void sendTx(String topic, Long id, String tags) { 
  5.     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( 
  6.             new Users(id, UUID.randomUUID().toString().replaceAll("-"""))). 
  7.             setHeader("BID", UUID.randomUUID().toString().replaceAll("-""")).build(),  
  8.             UUID.randomUUID().toString().replaceAll("-""")) ; 

生產者對應的監聽器

  1. @RocketMQTransactionListener 
  2. public class ProducerTxListener implements RocketMQLocalTransactionListener { 
  3.      
  4.     @Resource 
  5.     private BusinessService bs ; 
  6.  
  7.     @Override 
  8.     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
  9.         // 這里執行本地的事務操作,比如保存數據。 
  10.         try { 
  11.             // 創建一個日志記錄表,將這唯一的ID存入數據庫中,在下面的check方法中可以根據這個id查詢是否有數據 
  12.             String id = (String) msg.getHeaders().get("BID") ; 
  13.             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; 
  14.             System.out.println("消息內容:" + users + "\t參與數據:" + arg + "\t本次事務的唯一編號:" + id) ; 
  15.             bs.save(users, new UsersLog(users.getId(), id)) ; 
  16.         } catch (Exception e) { 
  17.             e.printStackTrace() ; 
  18.             return RocketMQLocalTransactionState.ROLLBACK ; 
  19.         } 
  20.         return RocketMQLocalTransactionState.COMMIT ; 
  21.     } 
  22.  
  23.     @Override 
  24.     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { 
  25.         // 這里檢查本地事務是否執行成功 
  26.         String id = (String) msg.getHeaders().get("BID") ; 
  27.         System.out.println("執行查詢ID為:" + id + " 的數據是否存在") ; 
  28.         UsersLog usersLog = bs.queryUsersLog(id) ; 
  29.         if (usersLog == null) { 
  30.             return RocketMQLocalTransactionState.ROLLBACK ; 
  31.         } 
  32.         return RocketMQLocalTransactionState.COMMIT ; 
  33.     } 
  34.  

消費端

  1. @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10"
  2. @Component 
  3. public class ConsumerTxListener implements RocketMQListener<Users> { 
  4.  
  5.     @Override 
  6.     public void onMessage(Users users) { 
  7.         System.out.println("TX接收到消息:" + users) ; 
  8.     } 
  9.  

Service

  1. @Transactional 
  2. public boolean save(Users users, UsersLog usersLog) { 
  3.     usersRepository.save(users) ; 
  4.     usersLogRepository.save(usersLog) ; 
  5.     if (users.getId() == 1) { 
  6.         throw new RuntimeException("數據錯誤") ; 
  7.     } 
  8.     return true ; 
  9.      
  10. public UsersLog queryUsersLog(String bid) { 
  11.     return usersLogRepository.findByBid(bid) ; 

Controller

  1. @GetMapping("/tx/{id}"
  2. public Object sendTx(@PathVariable("id")Long id) { 
  3.     ps.sendTx("tx-topic", id, "tag10") ; 
  4.     return "send transaction success" ; 

測試

調用接口后,控制臺輸出:

從打印日志看出來都保存完畢了后 消費端才接受到消息。

刪除數據,再測試ID為1會報錯的。

數據庫中沒有數據。。。

是不是也不是很復雜,2個階段來處理。

完畢!!!

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2023-09-04 08:00:53

提交事務消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-17 08:34:03

RocketMQ消息初體驗

2021-04-07 08:43:09

SpringBootRocketMQ開發技術

2021-10-03 21:41:13

RocketMQKafkaPulsar

2023-12-15 13:08:00

RocketMQ中間件消費順序

2024-02-04 09:02:29

RocketMQ項目處理器

2024-04-25 14:27:32

順序消息事務消息

2021-07-13 11:52:47

順序消息RocketMQkafka

2024-11-11 00:00:10

2022-06-27 11:04:24

RocketMQ順序消息

2022-07-04 11:06:02

RocketMQ事務消息實現

2024-06-13 09:25:14

2023-04-12 08:56:37

RocketMQSpring核心業務

2022-01-10 11:58:51

SpringBootPulsar分布式

2025-06-18 07:09:05

2024-10-22 08:01:15

2023-09-26 08:01:46

消費者TopicRocketMQ
點贊
收藏

51CTO技術棧公眾號

亚洲成a人在线观看| 麻豆久久久久久| 亚洲美女久久久| 国产精品igao| 视频一区二区三区不卡 | 九热视频在线观看| 好了av在线| 99久久精品免费看国产| 国产精品女视频| 久操视频免费在线观看| 精品国产乱码久久久| 欧美一级生活片| 久久久久久久激情| 黄色成人影院| 国产色产综合产在线视频| 91色在线观看| 91视频久久久| 在线观看的日韩av| 中文字幕在线视频日韩| 亚洲av永久无码精品| 欧美a一级片| 日韩欧美有码在线| 国产精品第157页| 992tv免费直播在线观看| 高清成人在线观看| 成人av色在线观看| 久久久久女人精品毛片九一| 欧美日韩成人| 日韩一区视频在线| 女人又爽又黄免费女仆| 97视频一区| 欧美一区二区视频网站| 午夜精品在线免费观看| 日韩伦理在线| 亚洲线精品一区二区三区八戒| 亚洲蜜桃av| 国产一二三区在线视频| 91啪亚洲精品| 狠狠色狠狠色综合人人| www.国产视频| 国产精品18久久久久久久久久久久| 国产精品视频区1| av一级在线观看| 99亚洲一区二区| 欧美精品久久一区二区| 手机在线免费看毛片| 91亚洲成人| 中文字幕国产日韩| 欧美性猛交xxxx乱| 九九久久婷婷| 国产午夜精品全部视频在线播放 | 美女一区二区三区视频| 日韩在线免费| 91福利精品视频| 久久久久久久久久福利| 中文在线免费视频| 欧美日韩中文在线观看| 69堂免费视频| 极品美女一区| 欧洲视频一区二区| 天天爽夜夜爽一区二区三区| 久久电影天堂| 3atv在线一区二区三区| av噜噜在线观看| 在线观看亚洲精品福利片| 欧美日本韩国一区二区三区视频| 在线看的黄色网址| 四虎精品永久免费| 欧美一区二区三区的| 女人扒开双腿让男人捅| 91精品啪在线观看国产爱臀 | 日本福利一区二区| 亚洲一区二区蜜桃| 亚洲一区av| 91精品国产高清一区二区三区 | 国产精品久久久久影院| 91香蕉视频网址| 欧美hdxxxx| 天天色天天爱天天射综合| 成年人视频在线免费| 免费在线成人激情电影| 91精品国产综合久久婷婷香蕉| 三上悠亚 电影| 免费一区二区| 日韩一区二区精品视频| 一级aaa毛片| 免费精品视频| 国产日韩欧美另类| 亚洲av永久纯肉无码精品动漫| gogogo免费视频观看亚洲一| 日韩电影天堂视频一区二区| 国产黄色小视频在线| 亚洲6080在线| 一道本在线免费视频| 亚洲一二av| 在线看日韩av| 亚洲精品午夜久久久久久久| 日韩电影免费一区| 都市激情久久久久久久久久久| 欧美日韩视频精品二区| 最新久久zyz资源站| 欧美极品欧美精品欧美| 日韩有码欧美| 亚洲国产一区自拍| 国产又色又爽又高潮免费| 亚洲网站在线| 国产精品久久二区| 成人乱码一区二区三区| 国产精品三级在线观看| 毛片在线视频播放| 国产精品欧美一区二区三区不卡 | 欧美色精品天天在线观看视频| 日本成人在线免费| 欧美亚洲国产一区| 羞羞色国产精品| 国产精品久久久久久免费播放| 99久久亚洲一区二区三区青草| 亚洲综合第一| 肉色欧美久久久久久久免费看| 日韩精品一区二区三区三区免费| 91导航在线观看| 亚洲女人av| 福利视频一区二区三区| 欧美一区二区三区| 91黄视频在线观看| 在线视频 日韩| 欧美99久久| 91精品国产综合久久香蕉| 日韩美女一级视频| 午夜激情综合网| 日韩av福利在线观看| 色综合天天综合网中文字幕| 日本亚洲欧洲色| 五月婷婷免费视频| 亚洲午夜在线电影| 九九九久久久久久久| 久久精品国产68国产精品亚洲| 日本精品一区二区三区在线| 婷婷开心激情网| 五月开心婷婷久久| 911亚洲精选| 欧美特黄a级高清免费大片a级| 91欧美精品午夜性色福利在线| 日本中文字幕视频在线| 欧美性高清videossexo| 人妻av无码一区二区三区 | jizz大全欧美jizzcom| 国产精品一区2区3区| 5566成人精品视频免费| 亚洲aⅴ在线观看| 精品日本高清在线播放| 免费在线观看成年人视频| 国产精品久久久久毛片大屁完整版| 国产精品sss| a级片在线免费| 亚洲精品国产品国语在线| 日韩少妇高潮抽搐| 91原创在线视频| 欧美视频第一区| 精品视频免费| 成人网页在线免费观看| 哥也色在线视频| 欧美成人一区二区三区在线观看| 欧美日韩精品在线观看视频 | 在线免费观看毛片| 91色乱码一区二区三区| www.欧美日本| 999国产精品| 91精品天堂| av日韩国产| 亚洲精品综合久久中文字幕| 黄色污污视频软件| 中文字幕一区二区三区视频| 交换做爰国语对白| 国产精品久久久久久久久久妞妞| 欧美午夜精品理论片a级大开眼界 欧美午夜精品久久久久免费视 | 久久理论片午夜琪琪电影网| 三级视频网站在线| 欧美色国产精品| 欧美精品99久久久| 91免费在线看| www.五月天色| 一本久久综合| 亚洲欧洲一区二区| 综合激情五月婷婷| 国产成人精品一区二区三区| 成人在线观看亚洲| 精品一区精品二区| 一本一道人人妻人人妻αv| 一区二区不卡在线视频 午夜欧美不卡在| 中文字幕在线永久| 久久精品国产精品青草| 成人午夜视频在线观看免费| 国产日产精品_国产精品毛片| 成人网页在线免费观看| 成人小电影网站| 欧美xxxx18国产| 国产在线高清| 精品处破学生在线二十三| 中文字幕+乱码+中文字幕明步| 亚洲精品乱码久久久久久黑人| 波多野结衣 在线| 国产精品一区免费视频| 日韩欧美精品在线观看视频| 亚洲中无吗在线| 欧美日韩亚洲一区二区三区四区| 亚洲精品aⅴ| 国产日韩欧美在线| 日韩精品极品| 欧美国产日韩视频| 日韩黄色影院| 亚洲天堂av女优| 天天综合天天综合| 精品区一区二区| 国产精品视频无码| 色先锋久久av资源部| 日韩精品无码一区二区| 亚洲理论在线观看| 蜜桃av免费观看| 国产亚洲成年网址在线观看| 一本色道久久hezyo无码| 韩国女主播成人在线观看| 欧美xxxxx在线视频| 伊人天天综合| 毛片av在线播放| 女主播福利一区| 中文字幕中文字幕99| 日韩精品一卡| 日韩欧美精品一区二区| 久久av免费| 美日韩免费视频| 青青草原在线亚洲| 精品卡一卡二| 国产成人一二| 国产九色精品| 国产精品任我爽爆在线播放| 99国产超薄丝袜足j在线观看| 国产不卡精品| 91午夜在线播放| 国产成人视屏| 91麻豆桃色免费看| 在线欧美激情| 亚洲精品欧美一区二区三区| 97久久中文字幕| 成人在线中文字幕| 激情久久免费视频| 亚洲综合在线做性| 日韩一区二区三区在线看| 91久久极品少妇xxxxⅹ软件| 精品视频在线观看免费观看| 91九色视频在线| 日韩欧美另类中文字幕| 成人在线观看网址| 精品国产乱子伦一区二区| 国产一区不卡在线观看| 日本福利一区| 欧美日韩精品免费看| 九九精品久久| 亚洲欧美一区二区原创| 亚洲欧美综合| av动漫在线看| 日本欧美久久久久免费播放网| 中文字幕在线不卡| 99热这里只有精品4| 国产精品盗摄一区二区三区| av激情在线观看| 亚洲国产乱码最新视频| 久久久久久91亚洲精品中文字幕| 色美美综合视频| 国产又黄又爽视频| 欧美成人vr18sexvr| 性xxxx视频播放免费| 伊人青青综合网站| 伊人在我在线看导航| 欧美极品美女视频网站在线观看免费 | 在这里有精品| 精品一卡二卡三卡四卡日本乱码| 久草在线成人| 手机成人av在线| 亚洲国产一区二区精品专区| 不卡av免费在线| 国产麻豆精品在线观看| 自拍视频一区二区| 国产精品视频一二三| 国产盗摄一区二区三区在线| 精品久久久久久中文字幕一区奶水| wwwwww在线观看| 日韩精品一区二区三区三区免费 | 一区二区三区美女视频| 蜜臀精品一区二区三区| 欧美精品久久久久久久久老牛影院| www.黄色片| 伊人久久久久久久久久久| 福利在线导航136| 国产精品丝袜白浆摸在线 | 韩国午夜理伦三级不卡影院| 黄色国产在线观看| 亚洲欧美另类久久久精品| 国产精品久久久久久久久久精爆| 欧美一卡二卡三卡| 激情小说 在线视频| 久久久欧美一区二区| 成人a在线观看高清电影| 国语精品中文字幕| 亚洲色图插插| 牛夜精品久久久久久久| 9久草视频在线视频精品| 蜜桃av.com| 在线观看免费亚洲| 性xxxx搡xxxxx搡欧美| 久久999免费视频| 欧美v亚洲v综合v国产v仙踪林| 欧美国产视频在线观看| 国自产拍偷拍福利精品免费一| 男人的天堂最新网址| 久久精品亚洲乱码伦伦中文| 日本亚洲欧美在线| 日韩一级黄色大片| 在线激情小视频| 国产成人精品一区二区| 亚洲日产av中文字幕| 免费毛片网站在线观看| 国产精品2024| 青草影院在线观看| 欧美男男青年gay1069videost | 欧美福利在线播放网址导航| 国产成人一二三区| 国产资源在线一区| 黄色香蕉视频在线观看| 精品婷婷伊人一区三区三| 国产在线中文字幕| 日本亚洲欧洲色| 曰本一区二区三区视频| 一本大道熟女人妻中文字幕在线 | 深夜av在线| 国精产品99永久一区一区| 黄色在线一区| 激情综合激情五月| 亚洲福利视频导航| 可以免费观看的毛片| 欧美人在线视频| 亚洲一区二区三区免费| 妞干网在线播放| 成人激情校园春色| 日韩三级视频在线播放| 日韩成人网免费视频| 涩涩网在线视频| 欧美日韩三区四区| 日本成人超碰在线观看| 国产一二三av| 在线成人午夜影院| 成人在线观看亚洲| 国产精品亚洲综合| 日韩亚洲精品在线| 日韩精品电影一区二区| 欧美日韩在线播放三区| 麻豆传媒在线免费看| 不卡的av一区| 中文一区二区| 男人的天堂官网| 7777精品伊人久久久大香线蕉完整版 | 成年人黄色片视频| 欧美激情一区二区三区蜜桃视频| 亚洲无码精品国产| 欧美精品中文字幕一区| 国产ts一区| 日本美女高潮视频| 日韩码欧中文字| 蜜臀久久精品久久久久| 日产精品久久久一区二区福利| 成人6969www免费视频| 在线视频日韩欧美| 精品久久中文字幕久久av| 国产黄在线观看| 亚洲影院色无极综合| 亚洲激情网址| 在线看片中文字幕| 日韩欧美三级在线| 韩国久久久久久| 奇米777四色影视在线看| 91社区在线播放| 国产精品永久久久久久久久久| 欧美精品激情blacked18| 国产一区二区三区四区大秀| 182午夜视频| 日韩欧美在线网址| av在线免费观看网址| 欧美精品一区在线| 国产v综合v亚洲欧| 少妇一级淫片日本| 国精产品一区一区三区有限在线| 精品欧美久久| 在线免费看黄色片| 欧美肥妇毛茸茸| 老司机2019福利精品视频导航| 99热一区二区三区| 久久综合久久综合久久| 国产高清视频免费| 国产成人鲁鲁免费视频a| 国产主播一区|