精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

手把手教你 springBoot 整合 rabbitMQ,利用 MQ 實現事務補償

開發 前端
rabbitMQ 在互聯網公司有著大規模應用,本篇將實戰介紹 springboot 整合 rabbitMQ,同時也將在具體的業務場景中介紹利用 MQ 實現事務補償操作。

[[341134]]

本文轉載自微信公眾號「Java極客技術」,作者鴨血粉絲。轉載本文請聯系Java極客技術公眾號。  

rabbitMQ 在互聯網公司有著大規模應用,本篇將實戰介紹 springboot 整合 rabbitMQ,同時也將在具體的業務場景中介紹利用 MQ 實現事務補償操作。

一、介紹

本篇我們一起來實操一下SpringBoot整合rabbitMQ,為后續業務處理做鋪墊。

廢話不多說,直奔主題!

二、整合實戰

2.1、創建一個 maven 工程,引入 amqp 包

  1. <!--amqp 支持--> 
  2. <dependency> 
  3.     <groupId>org.springframework.boot</groupId> 
  4.     <artifactId>spring-boot-starter-amqp</artifactId> 
  5. </dependency> 

2.2、在全局文件中配置 rabbitMQ 服務信息

  1. spring.rabbitmq.addresses=197.168.24.206:5672 
  2. spring.rabbitmq.username=guest 
  3. spring.rabbitmq.password=guest 
  4. spring.rabbitmq.virtual-host=/ 

其中,spring.rabbitmq.addresses參數值為 rabbitmq 服務器地址

2.3、編寫 rabbitmq 配置類

  1. @Slf4j 
  2. @Configuration 
  3. public class RabbitConfig { 
  4.  
  5.     /** 
  6.      * 初始化連接工廠 
  7.      * @param addresses 
  8.      * @param userName 
  9.      * @param password 
  10.      * @param vhost 
  11.      * @return 
  12.      */ 
  13.     @Bean 
  14.     ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses, 
  15.                                         @Value("${spring.rabbitmq.username}") String userName, 
  16.                                         @Value("${spring.rabbitmq.password}") String password
  17.                                         @Value("${spring.rabbitmq.virtual-host}") String vhost) { 
  18.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 
  19.         connectionFactory.setAddresses(addresses); 
  20.         connectionFactory.setUsername(userName); 
  21.         connectionFactory.setPassword(password); 
  22.         connectionFactory.setVirtualHost(vhost); 
  23.         return connectionFactory; 
  24.     } 
  25.  
  26.     /** 
  27.      * 重新實例化 RabbitAdmin 操作類 
  28.      * @param connectionFactory 
  29.      * @return 
  30.      */ 
  31.     @Bean 
  32.     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ 
  33.         return new RabbitAdmin(connectionFactory); 
  34.     } 
  35.  
  36.     /** 
  37.      * 重新實例化 RabbitTemplate 操作類 
  38.      * @param connectionFactory 
  39.      * @return 
  40.      */ 
  41.     @Bean 
  42.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ 
  43.         RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); 
  44.         //數據轉換為json存入消息隊列 
  45.         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); 
  46.         return rabbitTemplate; 
  47.     } 
  48.  
  49.     /** 
  50.      * 將 RabbitUtil 操作工具類加入IOC容器 
  51.      * @return 
  52.      */ 
  53.     @Bean 
  54.     public RabbitUtil rabbitUtil(){ 
  55.         return new RabbitUtil(); 
  56.     } 
  57.  

2.4、編寫 RabbitUtil 工具類

  1. public class RabbitUtil { 
  2.  
  3.     private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitAdmin rabbitAdmin; 
  7.  
  8.     @Autowired 
  9.     private RabbitTemplate rabbitTemplate; 
  10.  
  11.     /** 
  12.      * 創建Exchange 
  13.      * @param exchangeName 
  14.      */ 
  15.     public void addExchange(String exchangeType, String exchangeName){ 
  16.         Exchange exchange = createExchange(exchangeType, exchangeName); 
  17.         rabbitAdmin.declareExchange(exchange); 
  18.     } 
  19.  
  20.     /** 
  21.      * 刪除一個Exchange 
  22.      * @param exchangeName 
  23.      */ 
  24.     public boolean deleteExchange(String exchangeName){ 
  25.         return rabbitAdmin.deleteExchange(exchangeName); 
  26.     } 
  27.  
  28.     /** 
  29.      * 創建一個指定的Queue 
  30.      * @param queueName 
  31.      * @return queueName 
  32.      */ 
  33.     public void addQueue(String queueName){ 
  34.         Queue queue = createQueue(queueName); 
  35.         rabbitAdmin.declareQueue(queue); 
  36.     } 
  37.  
  38.     /** 
  39.      * 刪除一個queue 
  40.      * @return queueName 
  41.      * @param queueName 
  42.      */ 
  43.     public boolean deleteQueue(String queueName){ 
  44.         return rabbitAdmin.deleteQueue(queueName); 
  45.     } 
  46.  
  47.     /** 
  48.      * 按照篩選條件,刪除隊列 
  49.      * @param queueName 
  50.      * @param unused 是否被使用 
  51.      * @param empty 內容是否為空 
  52.      */ 
  53.     public void deleteQueue(String queueName, boolean unused, boolean empty){ 
  54.         rabbitAdmin.deleteQueue(queueName,unused,empty); 
  55.     } 
  56.  
  57.     /** 
  58.      * 清空某個隊列中的消息,注意,清空的消息并沒有被消費 
  59.      * @return queueName 
  60.      * @param queueName 
  61.      */ 
  62.     public void purgeQueue(String queueName){ 
  63.         rabbitAdmin.purgeQueue(queueName, false); 
  64.     } 
  65.  
  66.     /** 
  67.      * 判斷指定的隊列是否存在 
  68.      * @param queueName 
  69.      * @return 
  70.      */ 
  71.     public boolean existQueue(String queueName){ 
  72.         return rabbitAdmin.getQueueProperties(queueName) == null ? false : true
  73.     } 
  74.  
  75.     /** 
  76.      * 綁定一個隊列到一個匹配型交換器使用一個routingKey 
  77.      * @param exchangeType 
  78.      * @param exchangeName 
  79.      * @param queueName 
  80.      * @param routingKey 
  81.      * @param isWhereAll 
  82.      * @param headers EADERS模式類型設置,其他模式類型傳空 
  83.      */ 
  84.     public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  85.         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  86.         rabbitAdmin.declareBinding(binding); 
  87.     } 
  88.  
  89.     /** 
  90.      * 聲明綁定 
  91.      * @param binding 
  92.      */ 
  93.     public void addBinding(Binding binding){ 
  94.         rabbitAdmin.declareBinding(binding); 
  95.     } 
  96.  
  97.     /** 
  98.      * 解除交換器與隊列的綁定 
  99.      * @param exchangeType 
  100.      * @param exchangeName 
  101.      * @param queueName 
  102.      * @param routingKey 
  103.      * @param isWhereAll 
  104.      * @param headers 
  105.      */ 
  106.     public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  107.         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  108.         removeBinding(binding); 
  109.     } 
  110.  
  111.     /** 
  112.      * 解除交換器與隊列的綁定 
  113.      * @param binding 
  114.      */ 
  115.     public void removeBinding(Binding binding){ 
  116.         rabbitAdmin.removeBinding(binding); 
  117.     } 
  118.  
  119.     /** 
  120.      * 創建一個交換器、隊列,并綁定隊列 
  121.      * @param exchangeType 
  122.      * @param exchangeName 
  123.      * @param queueName 
  124.      * @param routingKey 
  125.      * @param isWhereAll 
  126.      * @param headers 
  127.      */ 
  128.     public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  129.         //聲明交換器 
  130.         addExchange(exchangeType, exchangeName); 
  131.         //聲明隊列 
  132.         addQueue(queueName); 
  133.         //聲明綁定關系 
  134.         addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  135.     } 
  136.  
  137.     /** 
  138.      * 發送消息 
  139.      * @param exchange 
  140.      * @param routingKey 
  141.      * @param object 
  142.      */ 
  143.     public void convertAndSend(String exchange, String routingKey, final Object object){ 
  144.         rabbitTemplate.convertAndSend(exchange, routingKey, object); 
  145.     } 
  146.  
  147.     /** 
  148.      * 轉換Message對象 
  149.      * @param messageType 
  150.      * @param msg 
  151.      * @return 
  152.      */ 
  153.     public Message getMessage(String messageType, Object msg){ 
  154.         MessageProperties messageProperties = new MessageProperties(); 
  155.         messageProperties.setContentType(messageType); 
  156.         Message message = new Message(msg.toString().getBytes(),messageProperties); 
  157.         return message; 
  158.     } 
  159.  
  160.     /** 
  161.      * 聲明交換機 
  162.      * @param exchangeType 
  163.      * @param exchangeName 
  164.      * @return 
  165.      */ 
  166.     private Exchange createExchange(String exchangeType, String exchangeName){ 
  167.         if(ExchangeType.DIRECT.equals(exchangeType)){ 
  168.             return new DirectExchange(exchangeName); 
  169.         } 
  170.         if(ExchangeType.TOPIC.equals(exchangeType)){ 
  171.             return new TopicExchange(exchangeName); 
  172.         } 
  173.         if(ExchangeType.HEADERS.equals(exchangeType)){ 
  174.             return new HeadersExchange(exchangeName); 
  175.         } 
  176.         if(ExchangeType.FANOUT.equals(exchangeType)){ 
  177.             return new FanoutExchange(exchangeName); 
  178.         } 
  179.         return null
  180.     } 
  181.  
  182.     /** 
  183.      * 聲明綁定關系 
  184.      * @param exchangeType 
  185.      * @param exchangeName 
  186.      * @param queueName 
  187.      * @param routingKey 
  188.      * @param isWhereAll 
  189.      * @param headers 
  190.      * @return 
  191.      */ 
  192.     private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  193.         if(ExchangeType.DIRECT.equals(exchangeType)){ 
  194.             return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey); 
  195.         } 
  196.         if(ExchangeType.TOPIC.equals(exchangeType)){ 
  197.             return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey); 
  198.         } 
  199.         if(ExchangeType.HEADERS.equals(exchangeType)){ 
  200.             if(isWhereAll){ 
  201.                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match(); 
  202.             }else
  203.                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match(); 
  204.             } 
  205.         } 
  206.         if(ExchangeType.FANOUT.equals(exchangeType)){ 
  207.             return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName)); 
  208.         } 
  209.         return null
  210.     } 
  211.  
  212.     /** 
  213.      * 聲明隊列 
  214.      * @param queueName 
  215.      * @return 
  216.      */ 
  217.     private Queue createQueue(String queueName){ 
  218.         return new Queue(queueName); 
  219.     } 
  220.  
  221.  
  222.     /** 
  223.      * 交換器類型 
  224.      */ 
  225.     public final static class ExchangeType { 
  226.  
  227.         /** 
  228.          * 直連交換機(全文匹配) 
  229.          */ 
  230.         public final static String DIRECT = "DIRECT"
  231.  
  232.         /** 
  233.          * 通配符交換機(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個) 
  234.          */ 
  235.         public final static String TOPIC = "TOPIC"
  236.  
  237.         /** 
  238.          * 頭交換機(自定義鍵值對匹配,根據發送消息內容中的headers屬性進行匹配) 
  239.          */ 
  240.         public final static String HEADERS = "HEADERS"
  241.  
  242.         /** 
  243.          * 扇形(廣播)交換機 (將消息轉發到所有與該交互機綁定的隊列上) 
  244.          */ 
  245.         public final static String FANOUT = "FANOUT"
  246.     } 

此致, rabbitMQ 核心操作功能操作已經開發完畢!

2.5、編寫隊列監聽類(靜態)

  1. @Slf4j 
  2. @Configuration 
  3. public class DirectConsumeListener { 
  4.  
  5.     /** 
  6.      * 監聽指定隊列,名稱:mq.direct.1 
  7.      * @param message 
  8.      * @param channel 
  9.      * @throws IOException 
  10.      */ 
  11.     @RabbitListener(queues = "mq.direct.1"
  12.     public void consume(Message message, Channel channel) throws IOException { 
  13.         log.info("DirectConsumeListener,收到消息: {}", message.toString()); 
  14.     } 

如果你需要監聽指定的隊列,只需要方法上加上@RabbitListener(queues = "")即可,同時填寫對應的隊列名稱。

但是,如果你想動態監聽隊列,而不是通過寫死在方法上呢?

請看下面介紹!

2.6、編寫隊列監聽類(動態)

重新實例化一個SimpleMessageListenerContainer對象,這個對象就是監聽容器。

  1. @Slf4j 
  2. @Configuration 
  3. public class DynamicConsumeListener { 
  4.  
  5.     /** 
  6.      * 使用SimpleMessageListenerContainer實現動態監聽 
  7.      * @param connectionFactory 
  8.      * @return 
  9.      */ 
  10.     @Bean 
  11.     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ 
  12.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
  13.         container.setMessageListener((MessageListener) message -> { 
  14.             log.info("ConsumerMessageListen,收到消息: {}", message.toString()); 
  15.         }); 
  16.         return container; 
  17.     } 

如果想向SimpleMessageListenerContainer添加監聽隊列或者移除隊列,只需通過如下方式即可操作。

  1. @Slf4j 
  2. @RestController 
  3. @RequestMapping("/consumer"
  4. public class ConsumerController { 
  5.  
  6.     @Autowired 
  7.     private SimpleMessageListenerContainer container; 
  8.  
  9.     @Autowired 
  10.     private RabbitUtil rabbitUtil; 
  11.  
  12.     /** 
  13.      * 添加隊列到監聽器 
  14.      * @param consumerInfo 
  15.      */ 
  16.     @PostMapping("addQueue"
  17.     public void addQueue(@RequestBody ConsumerInfo consumerInfo) { 
  18.         boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName()); 
  19.         if(!existQueue){ 
  20.             throw new CommonExecption("當前隊列不存在"); 
  21.         } 
  22.         //消費mq消息的類 
  23.         container.addQueueNames(consumerInfo.getQueueName()); 
  24.         //打印監聽容器中正在監聽到隊列 
  25.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  26.     } 
  27.  
  28.     /** 
  29.      * 移除正在監聽的隊列 
  30.      * @param consumerInfo 
  31.      */ 
  32.     @PostMapping("removeQueue"
  33.     public void removeQueue(@RequestBody ConsumerInfo consumerInfo) { 
  34.         //消費mq消息的類 
  35.         container.removeQueueNames(consumerInfo.getQueueName()); 
  36.         //打印監聽容器中正在監聽到隊列 
  37.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  38.     } 
  39.  
  40.     /** 
  41.      * 查詢監聽容器中正在監聽到隊列 
  42.      */ 
  43.     @PostMapping("queryListenerQueue"
  44.     public void queryListenerQueue() { 
  45.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  46.     } 

2.7、發送消息到交換器

發送消息到交換器,非常簡單,只需要通過如下方式即可!

  • 先編寫一個請求參數實體類
  1. @Data 
  2. public class ProduceInfo implements Serializable { 
  3.  
  4.     private static final long serialVersionUID = 1l; 
  5.  
  6.     /** 
  7.      * 交換器名稱 
  8.      */ 
  9.     private String exchangeName; 
  10.  
  11.     /** 
  12.      * 路由鍵key 
  13.      */ 
  14.     private String routingKey; 
  15.  
  16.     /** 
  17.      * 消息內容 
  18.      */ 
  19.     public String msg; 
  • 編寫接口api
  1. @RestController 
  2. @RequestMapping("/produce"
  3. public class ProduceController { 
  4.  
  5.     @Autowired 
  6.     private RabbitUtil rabbitUtil; 
  7.  
  8.     /** 
  9.      * 發送消息到交換器 
  10.      * @param produceInfo 
  11.      */ 
  12.     @PostMapping("sendMessage"
  13.     public void sendMessage(@RequestBody ProduceInfo produceInfo) { 
  14.         rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo); 
  15.     } 
  16.  

當然,你也可以直接使用rabbitTemplate操作類,來實現發送消息。

  1. rabbitTemplate.convertAndSend(exchange, routingKey, message); 

參數內容解釋:

  • exchange:表示交換器名稱
  • routingKey:表示路由鍵key
  • message:表示消息

2.8、交換器、隊列維護操作

如果想通過接口對 rabbitMQ 中的交換器、隊列以及綁定關系進行維護,通過如下方式接口操作,即可實現!

先編寫一個請求參數實體類

  1. @Data 
  2. public class QueueConfig implements Serializable
  3.  
  4.     private static final long serialVersionUID = 1l; 
  5.  
  6.     /** 
  7.      * 交換器類型 
  8.      */ 
  9.     private String exchangeType; 
  10.  
  11.     /** 
  12.      * 交換器名稱 
  13.      */ 
  14.     private String exchangeName; 
  15.  
  16.     /** 
  17.      * 隊列名稱 
  18.      */ 
  19.     private String queueName; 
  20.  
  21.     /** 
  22.      * 路由鍵key 
  23.      */ 
  24.     private String routingKey; 

編寫接口api

  1. /** 
  2.  * rabbitMQ管理操作控制層 
  3.  */ 
  4. @RestController 
  5. @RequestMapping("/config"
  6. public class RabbitController { 
  7.  
  8.  
  9.     @Autowired 
  10.     private RabbitUtil rabbitUtil; 
  11.  
  12.     /** 
  13.      * 創建交換器 
  14.      * @param config 
  15.      */ 
  16.     @PostMapping("addExchange"
  17.     public void addExchange(@RequestBody QueueConfig config) { 
  18.         rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName()); 
  19.     } 
  20.  
  21.     /** 
  22.      * 刪除交換器 
  23.      * @param config 
  24.      */ 
  25.     @PostMapping("deleteExchange"
  26.     public void deleteExchange(@RequestBody QueueConfig config) { 
  27.         rabbitUtil.deleteExchange(config.getExchangeName()); 
  28.     } 
  29.  
  30.     /** 
  31.      * 添加隊列 
  32.      * @param config 
  33.      */ 
  34.     @PostMapping("addQueue"
  35.     public void addQueue(@RequestBody QueueConfig config) { 
  36.         rabbitUtil.addQueue(config.getQueueName()); 
  37.     } 
  38.  
  39.     /** 
  40.      * 刪除隊列 
  41.      * @param config 
  42.      */ 
  43.     @PostMapping("deleteQueue"
  44.     public void deleteQueue(@RequestBody QueueConfig config) { 
  45.         rabbitUtil.deleteQueue(config.getQueueName()); 
  46.     } 
  47.  
  48.     /** 
  49.      * 清空隊列數據 
  50.      * @param config 
  51.      */ 
  52.     @PostMapping("purgeQueue"
  53.     public void purgeQueue(@RequestBody QueueConfig config) { 
  54.         rabbitUtil.purgeQueue(config.getQueueName()); 
  55.     } 
  56.  
  57.     /** 
  58.      * 添加綁定 
  59.      * @param config 
  60.      */ 
  61.     @PostMapping("addBinding"
  62.     public void addBinding(@RequestBody QueueConfig config) { 
  63.         rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), falsenull); 
  64.     } 
  65.  
  66.     /** 
  67.      * 解除綁定 
  68.      * @param config 
  69.      */ 
  70.     @PostMapping("removeBinding"
  71.     public void removeBinding(@RequestBody QueueConfig config) { 
  72.         rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), falsenull); 
  73.     } 
  74.  
  75.     /** 
  76.      * 創建頭部類型的交換器 
  77.      * 判斷條件是所有的鍵值對都匹配成功才發送到隊列 
  78.      * @param config 
  79.      */ 
  80.     @PostMapping("andExchangeBindingQueueOfHeaderAll"
  81.     public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) { 
  82.         HashMap<String, Object> header = new HashMap<>(); 
  83.         header.put("queue""queue"); 
  84.         header.put("bindType""whereAll"); 
  85.         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), nulltrue, header); 
  86.     } 
  87.  
  88.     /** 
  89.      * 創建頭部類型的交換器 
  90.      * 判斷條件是只要有一個鍵值對匹配成功就發送到隊列 
  91.      * @param config 
  92.      */ 
  93.     @PostMapping("andExchangeBindingQueueOfHeaderAny"
  94.     public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) { 
  95.         HashMap<String, Object> header = new HashMap<>(); 
  96.         header.put("queue""queue"); 
  97.         header.put("bindType""whereAny"); 
  98.         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), nullfalse, header); 
  99.     } 

至此,rabbitMQ 管理器基本的 crud 全部開發完成!

三、利用 MQ 實現事務補償

當然,我們花了這么大的力氣,絕不僅僅是為了將 rabbitMQ 通過 web 項目將其管理起來,最重要的是能投入業務使用中去!

上面的操作只是告訴我們怎么使用 rabbitMQ!

  • 當你仔細回想整個過程的時候,其實還是回到最初那個問題,什么時候使用 MQ ?

以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:支付訂單、扣減庫存、生成相應單據、發紅包、發短信通知等等。

在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。

這種是利用 MQ 實現業務解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。

利用 MQ 實現業務解耦的過程其實也很簡單。

  • 當主流程結束之后,將消息推送到發紅包、發短信交換器中即可
  1. @Service 
  2. public class OrderService { 
  3.  
  4.     @Autowired 
  5.     private RabbitUtil rabbitUtil; 
  6.  
  7.     /** 
  8.      * 創建訂單 
  9.      * @param order 
  10.      */ 
  11.     @Transactional 
  12.     public void createOrder(Order order){ 
  13.         //1、創建訂單 
  14.         //2、調用庫存接口,減庫存 
  15.         //3、向客戶發放紅包 
  16.         rabbitUtil.convertAndSend("exchange.send.bonus"nullorder); 
  17.         //4、發短信通知 
  18.         rabbitUtil.convertAndSend("exchange.sms.message"nullorder); 
  19.     } 
  20.  
  • 監聽發紅包操作
  1. /** 
  2.  * 監聽發紅包 
  3.  * @param message 
  4.  * @param channel 
  5.  * @throws IOException 
  6.  */ 
  7. @RabbitListener(queues = "exchange.send.bonus"
  8. public void consume(Message message, Channel channel) throws IOException { 
  9.     String msgJson = new String(message.getBody(),"UTF-8"); 
  10.     log.info("收到消息: {}", message.toString()); 
  11.  
  12.     //調用發紅包接口 

監聽發短信操作

  1. /** 
  2.  * 監聽發短信 
  3.  * @param message 
  4.  * @param channel 
  5.  * @throws IOException 
  6.  */ 
  7. @RabbitListener(queues = "exchange.sms.message"
  8. public void consume(Message message, Channel channel) throws IOException { 
  9.     String msgJson = new String(message.getBody(),"UTF-8"); 
  10.     log.info("收到消息: {}", message.toString()); 
  11.  
  12.     //調用發短信接口 

既然 MQ 這么好用,那是不是完全可以將以前的業務也按照整個模型進行拆分呢?

答案顯然不是!

當引入 MQ 之后業務的確是解耦了,但是當 MQ 一旦掛了,所有的服務基本都掛了,是不是很可怕!

但是沒關系,俗話說,兵來將擋、水來土掩,這句話同樣適用于 IT 開發者,有坑填坑!

在下篇文章中,我們會詳細介紹 rabbitMQ 的集群搭建和部署,保證消息幾乎 100% 的投遞和消費。

四、總結

本篇主要圍繞SpringBoot整合rabbitMQ做內容介紹,可能也有理解不到位的地方,歡迎網友批評指出!

 

責任編輯:武曉燕 來源: Java極客技術
相關推薦

2023-04-26 12:46:43

DockerSpringKubernetes

2009-11-09 14:57:37

WCF上傳文件

2011-01-06 10:39:25

.NET程序打包

2021-07-14 09:00:00

JavaFX開發應用

2011-05-03 15:59:00

黑盒打印機

2011-01-10 14:41:26

2025-05-07 00:31:30

2011-04-21 10:32:44

MySQL雙機同步

2021-03-12 10:01:24

JavaScript 前端表單驗證

2020-05-15 08:07:33

JWT登錄單點

2025-08-26 01:32:00

2022-08-04 10:39:23

Jenkins集成CD

2022-12-07 08:42:35

2022-03-14 14:47:21

HarmonyOS操作系統鴻蒙

2022-07-27 08:16:22

搜索引擎Lucene

2022-01-08 20:04:20

攔截系統調用

2011-02-22 13:46:27

微軟SQL.NET

2021-12-28 08:38:26

Linux 中斷喚醒系統Linux 系統

2021-02-26 11:54:38

MyBatis 插件接口

2011-02-22 14:36:40

ASP.NETmsdnC#
點贊
收藏

51CTO技術棧公眾號

成人国产精品久久| 免费在线看a| 日韩激情中文字幕| 久久精品国产亚洲| 95视频在线观看| 免费观看欧美大片| √…a在线天堂一区| 国产尤物91| 国产精品伦一区二区三区| 亚洲欧洲一级| 久久夜色精品国产| b站大片免费直播| 97se亚洲| 91麻豆精品国产| 欧美一级片中文字幕| 中文字幕中文字幕在线十八区 | 亚洲东热激情| 精品国内亚洲在观看18黄| 精品无码在线视频| 中文一区二区三区四区| 欧美亚洲日本国产| 一女被多男玩喷潮视频| 中文字幕伦理免费在线视频 | 亚洲欧美99| 亚洲av电影一区| 国产sm精品调教视频网站| 国产精品成av人在线视午夜片| 精品无码黑人又粗又大又长| 水蜜桃久久夜色精品一区| 亚洲人成77777在线观看网| 91传媒理伦片在线观看| 欧美一级片网址| 欧美欧美午夜aⅴ在线观看| 茄子视频成人免费观看| 爱啪视频在线观看视频免费| 伊人婷婷欧美激情| 一本二本三本亚洲码| 风间由美一区| 久久精品欧美日韩精品| 久久一区二区三区av| 国模人体一区二区| 成人永久aaa| 99视频免费观看| 国产精品一品二区三区的使用体验| 久久婷婷亚洲| 国产91在线播放| 亚洲黄色激情视频| 欧美资源在线| 日av在线播放中文不卡| 波多野结衣视频网站| 一区二区三区四区五区精品视频| 性金发美女69hd大尺寸| 亚洲视频免费播放| 一区二区国产在线观看| 66m—66摸成人免费视频| 国产情侣在线视频| 亚洲女同同性videoxma| 青青草原一区二区| 久久久国产免费| 日韩成人精品在线观看| 国产精品爽黄69| 97国产精品久久久| 国产成人亚洲精品青草天美| 成人欧美视频在线| 一区二区三区福利| 欧美精选在线播放| 在线观看免费av网址| 久久久久久一区二区三区四区别墅| 欧美图片一区二区三区| www亚洲成人| 国产麻豆一区二区三区| 欧美第一区第二区| 精品国产av色一区二区深夜久久| 网红女主播少妇精品视频| 国产视频精品一区二区三区| japanese中文字幕| 91精品国产91久久久久久黑人| 久久香蕉国产线看观看av| 免费人成视频在线| 米奇777在线欧美播放| 国产精品激情av在线播放| 亚洲天堂2021av| 国产精品亚洲一区二区三区在线| 国产精品一区视频网站| 黄色在线视频观看网站| 专区另类欧美日韩| 成人免费播放器| 亚洲成人短视频| 欧美一级黄色录像| 一级做a爰片毛片| 999久久久国产精品| 久久久视频精品| 99成人精品视频| 国产sm精品调教视频网站| 欧美一区二区在线| 91麻豆免费在线视频| 日韩人体视频一二区| 日本高清免费在线视频| 欧美爱爱网站| 欧美大胆在线视频| 凹凸精品一区二区三区| 国产黑丝在线一区二区三区| 欧美日韩电影一区二区| а√天堂资源地址在线下载| 精品国产999| 国产探花在线观看视频| 午夜先锋成人动漫在线| 欧美不卡视频一区发布| a片在线免费观看| 成人精品小蝌蚪| 在线播放 亚洲| 久久毛片亚洲| 欧美精品一区二区三区很污很色的 | av在线成人| 国产午夜精品视频免费不卡69堂| 久久国产精品二区| 国产一区二区在线影院| 婷婷精品国产一区二区三区日韩 | 国产精品久久久久久久9999 | 一区二区三区回区在观看免费视频 | 久久精品99久久无色码中文字幕| 91精品国产高清自在线| 亚洲黄色在线免费观看| 亚洲天天做日日做天天谢日日欢 | 91精品国产乱码在线观看| 国内精品伊人久久久久av影院| 日本中文不卡| 日韩精品99| 亚洲精品有码在线| 精品国产乱码一区二区| 成人免费精品视频| 日韩精品手机在线观看| 国产午夜久久av| 精品国偷自产在线| 中文无码精品一区二区三区| 久久婷婷国产综合国色天香 | 精品中文av资源站在线观看| 日本一区二区三区视频在线播放| 91精品论坛| 日韩精品视频在线播放| 天天综合天天干| 91丨九色丨尤物| 日日橹狠狠爱欧美超碰| 久久久伦理片| 97精品一区二区三区| 全国男人的天堂网| 午夜精品久久久久久久久| 婷婷五月精品中文字幕| 亚洲国产一区二区三区a毛片| 国产精品免费一区二区三区观看| 少女频道在线观看免费播放电视剧| 91精品国产综合久久久久久| 人妻久久一区二区| 国产99久久久久久免费看农村| 黄网站色视频免费观看| 视频一区日韩| 午夜精品一区二区三区在线视| 欧美一区二区三区成人片在线| 亚洲成人av一区| 久久精品老司机| 天堂成人免费av电影一区| 欧美亚洲另类久久综合| a成人v在线| 欧美成人精品不卡视频在线观看| 亚洲第一页在线观看| 亚洲高清三级视频| www.久久av| 久久激情综合网| 日韩视频 中文字幕| 林ゆな中文字幕一区二区| 国产成人精品免费久久久久| 一本一道波多野毛片中文在线| 欧美一级国产精品| 日韩熟女精品一区二区三区| 久久奇米777| 日本77777| 最新成人av网站| 午夜午夜精品一区二区三区文| 高清国产一区二区三区四区五区| 欧美精品九九久久| 九九在线视频| 日韩一区二区三区在线| www.伊人久久| 亚洲免费观看高清完整版在线观看熊 | 久久亚洲电影天堂| 少妇无码一区二区三区| 欧美性色黄大片手机版| 免费麻豆国产一区二区三区四区| 久久亚洲精品小早川怜子| 亚洲涩涩在线观看| 免费在线亚洲欧美| 免费看黄色a级片| 国内精品久久久久久99蜜桃| 亚洲在线第一页| 成人性生活视频| 九九热最新视频//这里只有精品| 色视频精品视频在线观看| 欧美日韩精品一区二区在线播放| 在线免费观看毛片| 亚洲欧美中日韩| 毛片网站免费观看| 成人污污视频在线观看| 鲁一鲁一鲁一鲁一av| 99热这里只有精品8| 久久久久亚洲av无码专区喷水| 全国精品免费看| 91黄色精品| 久久亚洲人体| 国产精品video| av漫画网站在线观看| 久久精品人人做人人爽| 精品影院一区| 日韩黄色高清视频| 高清一区二区三区四区| 777奇米四色成人影色区| 久久亚洲精品石原莉奈| 亚洲成精国产精品女| xxxx日本少妇| 日韩一区在线播放| 亚洲高潮女人毛茸茸| 91欧美一区二区| 国产高潮视频在线观看| 国产乱妇无码大片在线观看| 色播五月激情五月| 七七婷婷婷婷精品国产| 茄子视频成人免费观看| 亚洲欧美日本日韩| 欧美日韩二三区| 99精品视频免费| 妺妺窝人体色777777| 午夜精品999| 日日噜噜夜夜狠狠久久丁香五月 | 久久久久国色av免费看影院| 亚洲天堂美女视频| av电影在线观看不卡| 稀缺小u女呦精品呦| 国产999精品久久久久久绿帽| 91热视频在线观看| 国产一区二区0| 91在线第一页| 国产成人av自拍| jjzzjjzz欧美69巨大| 国产精品91xxx| 少妇献身老头系列| 丰满亚洲少妇av| 国产伦精品一区二区三区精品| 成人午夜av在线| 性色av蜜臀av色欲av| 91免费视频网址| 亚洲第一页av| 国产欧美一区二区精品忘忧草 | 欧美日韩免费观看一区=区三区| av电影一区二区三区| 欧美日韩午夜| 成熟丰满熟妇高潮xxxxx视频| 日韩午夜高潮| 白嫩少妇丰满一区二区| 蜜桃视频免费观看一区| 午夜视频在线网站| 国产精品一二三在| 你懂的在线观看网站| 久久久亚洲精品石原莉奈| 91精品久久久久久久久久久久| 国产精品久久久久久久久久免费看| 天堂а√在线中文在线鲁大师| 亚洲欧洲无码一区二区三区| 欧美成人国产精品高潮| 午夜视频在线观看一区二区三区| 国产69精品久久久久久久久久| 色天天综合久久久久综合片| 国产一区二区网站| 精品国产3级a| 高清福利在线观看| 久久99精品视频一区97| 成人美女黄网站| 91精品视频网站| 欧美亚视频在线中文字幕免费| 日韩免费中文专区| 欧美日一区二区三区在线观看国产免| 久久久999视频| 九色综合国产一区二区三区| 美女露出粉嫩尿囗让男人桶| 久久蜜臀中文字幕| a级片在线观看免费| 欧美日韩在线影院| 国产裸体永久免费无遮挡| 亚洲国产私拍精品国模在线观看| 国产一级免费在线观看| 欧美另类高清videos| 日韩不卡免费高清视频| 91日韩久久| japanese国产精品| 很污的网站在线观看| 轻轻草成人在线| 亚洲久久久久久| 亚洲色图欧美激情| 无码人妻一区二区三区线| 日韩一级黄色片| 91在线免费看| 性色av一区二区三区| 国产精品欧美一区二区三区不卡 | 精品成人在线| 久久久精品高清| 久久久九九九九| 日本五十熟hd丰满| 3d动漫精品啪啪一区二区竹菊| 国产在线一二三| 高清欧美电影在线| 国产激情一区| 午夜午夜精品一区二区三区文| 亚洲一区观看| www.17c.com喷水少妇| 亚洲少妇30p| 中文字幕永久在线| 亚洲奶大毛多的老太婆| 国产精品探花在线| 7777奇米亚洲综合久久| 99久久精品网站| 久久久精品三级| 久久久久综合网| 久久夜靖品2区| 亚洲成色777777女色窝| 亚洲性图自拍| 亚洲综合在线小说| 91精品精品| gai在线观看免费高清| 国产精品网站在线| 丰满人妻一区二区三区四区| 亚洲情综合五月天| 欧美一区 二区 三区| 欧美日韩视频在线一区二区观看视频| 亚洲激情网址| 日韩www视频| 欧美日韩中文字幕| 欧美日韩国产综合视频| 欧美在线中文字幕| 亚洲欧美日本伦理| 男人天堂网视频| 久久久美女毛片| 最近中文在线观看| 这里只有精品在线播放| 影音成人av| 亚洲综合五月天| 国内精品写真在线观看| 国产一区二区精彩视频| 日韩欧美一区二区视频| 色呦呦在线播放| 国产日韩精品久久| 国产日韩视频| 国产传媒国产传媒| 欧美视频一区二区三区四区| 欧美日韩xx| 91九色极品视频| 亚洲三级观看| 美女洗澡无遮挡| 欧美裸体bbwbbwbbw| 手机电影在线观看| 久久亚洲一区二区| 蜜臀精品一区二区三区在线观看| 国产大屁股喷水视频在线观看| 欧美丰满少妇xxxbbb| 尤物yw193can在线观看| 国产麻豆一区二区三区在线观看| 国产欧美日韩综合一区在线播放| caopeng视频| 7777精品伊人久久久大香线蕉经典版下载 | 成人久久精品人妻一区二区三区| 韩国福利视频一区| 久久不见久久见中文字幕免费| 污网站免费在线| 亚洲乱码国产乱码精品精98午夜| 秋霞av鲁丝片一区二区| 国产成人av在线播放| 91精品国产91久久久久久黑人| 性囗交免费视频观看| 在线观看国产一区二区| 在线你懂的视频| 欧美另类一区| 黑人巨大精品欧美黑白配亚洲| 国产无码精品久久久| 中文字幕精品在线| swag国产精品一区二区| 国产无套粉嫩白浆内谢的出处| 亚洲激情网站免费观看| 日韩av高清在线| 亚洲a∨日韩av高清在线观看| 99亚洲视频| 国产成人综合在线视频| 亚洲男女性事视频| 91成人午夜| 天堂网在线免费观看| 香蕉av福利精品导航| 午夜免费视频在线国产| 国产精品有限公司| 极品少妇xxxx精品少妇偷拍| 国产99久久久| 欧美激情乱人伦一区| 国产精品7m凸凹视频分类| 亚洲av无码一区二区三区网址| 欧美一区二区三区在线观看视频|