精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

RabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

開發 架構
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

訂閱模式類型

訂閱模式示例圖:

前面2個案例中,只有3個角色:

  • P:生產者,也就是要發送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
  • Fanout:廣播,將消息交給所有綁定到交換機的隊列
  • Direct:定向,把消息交給符合指定routing key 的隊列
  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

Publish/Subscribe發布與訂閱模式

1、模式說明

發布訂閱模式:

每個消費者監聽自己的隊列。

生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收 到消息

2、案例

(1)生產者

package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/3 8:16
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建交換機
        /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           參數:
            1. exchange:交換機名稱
            2. type:交換機類型
                DIRECT("direct"):定向
                FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。
                TOPIC("topic") 通配符的方式
                HEADERS("headers") 參數匹配
            3. durable:是否持久化
            4. autoDelete:自動刪除
            5. internal:內部使用。 一般false
            6. arguments:參數
        */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        //6. 創建隊列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7. 綁定隊列和交換機
        /*
            queueBind(String queue, String exchange, String routingKey)
            參數:
                1. queue:隊列名稱
                2. exchange:交換機名稱
                3. routingKey:路由鍵,綁定規則
                    如果交換機的類型為fanout ,routingKey設置為""
         */
        channel.queueBind(queue1Name, exchangeName, "");
        channel.queueBind(queue2Name, exchangeName, "");
        //8. 發送消息至交換機,由交換機分發消息
        String body = "日志信息: 肥仔白調用了findAll方法...日志級別: INFO....";
        channel.basicPublish(exchangeName, "", null, body.getBytes());
        //9. 釋放資源
        channel.close();
        connection.close();
        
    }
}

執行生產者,我們可以查看一下創建的 交換機 以及 隊列信息:

下面再來看看隊列,如下:

下面我們繼續來寫兩個消費者接收消息。

(2)消費者1:讀取隊列1的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_PubSub1 {

    //定義接收隊列的名稱
    final static String queueName = "test_fanout_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建隊列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數:
            1. queue:隊列名稱
            2. durable:是否持久化,當mq重啟之后,還在
            3. exclusive:
                * 是否獨占。只能有一個消費者監聽這隊列
                * 當Connection關閉時,是否刪除隊列
            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
            5. arguments:參數。

         */
        channel.queueDeclare(queueName, true, false, false, null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數:
            1. queue:隊列名稱
            2. autoAck:是否自動確認
            3. callback:回調對象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調方法,當收到消息后,會自動執行該方法
                1. consumerTag:標識
                2. envelope:獲取一些信息,交換機,路由key...
                3. properties:配置信息
                4. body:數據
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊列的數據 body: " + new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);

        //不需要關閉資源,因為消費者需要持續監聽隊列信息
    }
}

(3)消費者2:讀取隊列2的消息

package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_PubSub2 {
    //定義接收隊列的名稱
    final static String queueName = "test_fanout_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設置參數
        factory.setHost("127.0.0.1"); // ip  默認值 localhost
        factory.setPort(5672); //端口  默認值 5672
        factory.setVirtualHost("/test"); //虛擬機 默認值 /
        factory.setUsername("libai"); // 用戶名 默認 guest
        factory.setPassword("libai"); //密碼 默認值 guest
        //3. 創建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創建Channel
        Channel channel = connection.createChannel();
        //5. 創建隊列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數:
            1. queue:隊列名稱
            2. durable:是否持久化,當mq重啟之后,還在
            3. exclusive:
                * 是否獨占。只能有一個消費者監聽這隊列
                * 當Connection關閉時,是否刪除隊列
            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
            5. arguments:參數。
         */
        channel.queueDeclare(queueName, true, false, false, null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數:
            1. queue:隊列名稱
            2. autoAck:是否自動確認
            3. callback:回調對象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調方法,當收到消息后,會自動執行該方法
                1. consumerTag:標識
                2. envelope:獲取一些信息,交換機,路由key...
                3. properties:配置信息
                4. body:數據
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊列的數據 body: " + new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);
        //不需要關閉資源,因為消費者需要持續監聽隊列信息
    }
}

3、測試

啟動所有消費者,然后使用生產者發送消息;在每個消費者對應的控制臺可以查看到生產者發送的所有消息;到達廣播的效果。

  • 消費者1接收到的消息:

  • 消費者2接收到的消息:

從結果來看,生產者只需要發送一條消息,其余的消費者全部收到了消息,達到了廣播的效果。

4、小結

交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。

發布訂閱模式與工作隊列模式的區別:

  • 工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
  • 發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。
  • 發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。
責任編輯:姜華 來源: 今日頭條
相關推薦

2022-08-15 09:02:22

Redis模式訂閱消息

2023-11-20 08:54:38

2025-01-09 11:15:47

2022-06-27 13:56:10

設計模式緩存分布式系統

2022-12-02 07:28:58

Event訂閱模式Spring

2009-11-05 10:07:37

WCF設計模式

2024-03-28 08:07:42

RabbitMQ訂閱模式

2021-08-02 17:21:08

設計模式訂閱

2024-07-29 08:34:18

C++訂閱者模式線程

2013-10-31 14:30:44

CloudaAPI

2024-05-31 08:53:56

2023-12-04 08:24:23

2023-01-11 08:22:22

RabbitMQ通信模型

2021-04-18 21:07:32

門面模式設計

2023-11-07 12:09:44

TopicKafka

2025-03-11 09:30:00

2012-08-30 09:07:33

設計模式

2021-04-14 09:02:22

模式 設計建造者

2023-05-17 08:16:04

RabbitMQ消息傳遞

2012-10-08 11:18:38

企業應用架構工作單元模式
點贊
收藏

51CTO技術棧公眾號

女人十八岁毛片| 黄黄视频在线观看| 成人一级免费视频| 牛牛国产精品| 亚洲毛片在线看| 天天干天天av| 丝袜老师在线| 亚洲精品亚洲人成人网| 久久福利电影| 国产精品热久久| 午夜亚洲影视| 欧美成人免费小视频| 精品少妇人妻一区二区黑料社区| 日韩三级成人| 色综合天天做天天爱| 九一免费在线观看| 国产免费av高清在线| 国产乱淫av一区二区三区| 51精品在线观看| 午夜精品一区二区三区视频| 久久av电影| 欧美精品一区二区三区在线| 久久久精品高清| 桃花岛成人影院| 亚洲国产视频a| 国产精品12p| 99riav在线| 26uuu另类欧美| 国产伦精品一区二区三区免费视频| 91片黄在线观看喷潮| 久久一二三四| 97人人做人人爱| 精品肉丝脚一区二区三区| 四虎8848精品成人免费网站| 亚洲欧美制服另类日韩| 久久人妻一区二区| 成人偷拍自拍| 日韩午夜中文字幕| av噜噜在线观看| 日韩黄色三级在线观看| 在线亚洲+欧美+日本专区| 免费看国产一级片| 91福利在线免费| 亚洲午夜免费视频| 男人天堂网站在线| caopeng在线| 亚洲免费观看视频| 四虎4hu永久免费入口| 浪潮av一区| 日韩美女久久久| 欧美日韩亚洲国产成人| 九色porny丨首页在线| 中文字幕av一区二区三区| 欧美一二三区| av在线免费观看网| 中文字幕第一区综合| 日韩av电影免费在线观看| 国产最新视频在线| 欧美国产日韩在线观看| 天天综合色天天综合色hd| 国产福利片在线| 中文字幕中文字幕在线一区| 亚洲一卡二卡区| 老司机午夜在线视频| 亚洲精品911| 欧美综合国产| 国产国语videosex另类| 国产美女www| 久久av资源网| av在线不卡一区| 人妻中文字幕一区| 91麻豆国产自产在线观看| 欧美日韩一区二区视频在线| 成人在线观看黄色| 亚洲欧美日韩中文字幕一区二区三区| 成人黄色一级视频| 久久综合伊人77777尤物| 青娱乐91视频| 日韩午夜精品| 国产精品视频1区| 国产黄a三级三级三级| 成人av网站免费观看| 欧美激情第一页在线观看| 国产高清自拍视频在线观看| 国产精品美日韩| 丁香色欲久久久久久综合网| av在线不卡免费| 日韩人体视频一二区| 欧美一级特黄a| 日韩最新av| 亚洲欧美精品suv| 美国一级片在线观看| 欧美国产专区| 国产成人aa精品一区在线播放| 国产又粗又猛又色又| 不卡免费追剧大全电视剧网站| 日本一区二区三区免费看| 色女人在线视频| 欧美性黄网官网| 午夜福利123| 亚州国产精品| 美女精品视频一区| 狠狠狠狠狠狠狠| 国产精品一卡二卡| 日韩精品伦理第一区| 1stkiss在线漫画| 色婷婷av久久久久久久| 特黄特黄一级片| 国产成人av| 国内精品一区二区三区| 国产免费福利视频| 国产日韩欧美一区二区三区综合| 国产一级做a爰片久久毛片男| 美女网站视频一区| 亚洲成av人乱码色午夜| 91制片厂在线| 久久精品综合| 好吊色欧美一区二区三区| 麻豆视频免费在线观看| 91黄色免费看| 一本色道久久综合亚洲精品图片| 亚洲一级淫片| 国产一区在线播放| 国产二区在线播放| 色欧美88888久久久久久影院| 国产乱淫av麻豆国产免费| 欧美www视频在线观看| 国产91色在线免费| 欧美专区福利在线| 性一交一乱一乱一视频| 中文字幕亚洲区| 99热成人精品热久久66| 国产一级成人av| 欧美大学生性色视频| 岳乳丰满一区二区三区| 久久久777精品电影网影网| av动漫在线看| 欧美一区 二区| 午夜精品视频在线| 欧美一区二区三区激情| 一区二区三区成人在线视频| 五月天丁香花婷婷| 91久久高清国语自产拍| 国产精品影片在线观看| 在线视频自拍| 欧美日韩www| 免费黄色激情视频| 免费精品视频在线| 夜夜爽99久久国产综合精品女不卡 | 成人午夜在线| 在线视频一区二区| 亚洲一区二区激情| 日韩毛片一二三区| 波多野结衣在线免费观看| 五月久久久综合一区二区小说| 国产日韩欧美在线播放| 久久黄色美女电影| 日韩一区二区三区在线观看 | 天天做天天爱天天爽综合网| 91精品久久久久久综合乱菊| 乱人伦中文视频在线| 欧美电影一区二区三区| 欧美精品入口蜜桃| 99久久综合99久久综合网站| www黄色av| 日韩电影一区| 91超碰在线电影| 爱情岛亚洲播放路线| 精品视频中文字幕| 无码久久精品国产亚洲av影片| 中文字幕免费不卡在线| 五月六月丁香婷婷| 亚洲裸体俱乐部裸体舞表演av| 久久久久久欧美精品色一二三四| 丝袜美腿一区| 欧美成人午夜免费视在线看片| 日本久久一级片| 一本久道久久综合中文字幕| 国产jizz18女人高潮| 国产馆精品极品| 国产1区2区在线| 成人在线免费观看视频| 2022国产精品| 国产不卡网站| 免费99精品国产自在在线| 欧美在线 | 亚洲| 欧美最新大片在线看| 在线观看成人毛片| 久久久夜色精品亚洲| 亚洲综合在线一区二区| 国产午夜久久| 一级一片免费播放| 色天天色综合| 亚洲va国产va天堂va久久| 中文字幕乱码在线播放| 久久精品99国产精品酒店日本| 午夜小视频免费| 欧美精品自拍偷拍动漫精品| 精国产品一区二区三区a片| 久久精品欧美日韩精品 | 久久久久九九视频| 中文字幕在线播放一区二区| 久久婷婷一区| 精品国偷自产一区二区三区| 日韩欧美网站| 久久久久久久久久久一区| 精品国产亚洲一区二区三区大结局 | 日韩欧美亚洲一区二区三区| 中文字幕日韩一区| 性欧美丰满熟妇xxxx性仙踪林| 国产尤物一区二区| 韩国中文字幕av| 国产精品日本欧美一区二区三区| 色哺乳xxxxhd奶水米仓惠香| 国产亚洲精品美女久久久久久久久久| 国产精品久久久久久久久久直播 | 成人精品在线看| 亚洲精品乱码久久久久久久久 | 日韩一区二区在线免费| 久久精品人成| 综合视频一区| 91久久久久久久| 热久久久久久| 国产精品劲爆视频| av日韩电影| 午夜欧美大片免费观看| 欧美一卡二卡| 欧美成人精品xxx| 成人在线播放免费观看| 搡老女人一区二区三区视频tv| 暖暖视频在线免费观看| 亚洲精品一区二区久| 三级在线观看| 精品小视频在线| 日本免费一区二区三区最新| 亚洲成人激情图| 乱精品一区字幕二区| 欧美大胆一级视频| 亚洲国产成人一区二区 | 动漫美女无遮挡免费| 国产福利不卡视频| www.日本久久| 国产麻豆日韩欧美久久| 国产毛片久久久久久| 精品亚洲免费视频| 99中文字幕在线| 国产精品白丝jk黑袜喷水| 无套内谢丰满少妇中文字幕| 国精产品一区一区三区mba桃花 | 国产女同互慰高潮91漫画| xxxxx在线观看| 国产欧美日韩在线观看| 欧美激情视频二区| 国产精品福利一区| 破处女黄色一级片| 亚洲国产视频在线| 国产做受高潮漫动| 疯狂欧美牲乱大交777| 日韩免费av网站| 欧美日韩在线播放| 国产精品欧美激情在线| 精品国产乱子伦一区| 五月婷婷伊人网| 一区二区成人精品| 超碰免费公开在线| 亚洲97在线观看| 国精产品一区一区三区四川| 国产色视频一区| 2020最新国产精品| 欧美日韩电影一区二区三区| 精品视频97| 超碰在线免费观看97| 亚洲全部视频| 无限资源日本好片| 国产91精品欧美| 蜜桃精品一区二区| 亚洲视频一二区| 日本在线观看中文字幕| 色狠狠桃花综合| 国产欧美一级片| 亚洲精品理论电影| 欧美性天天影视| 午夜精品理论片| 九七影院97影院理论片久久| 超碰在线观看97| 国产麻豆一区二区三区精品视频| 伊甸园精品99久久久久久| 尤物网精品视频| 无码日韩人妻精品久久蜜桃| 国产老妇另类xxxxx| 久久人人爽人人爽人人片| 国产精品的网站| 欧美另类一区二区| 69堂成人精品免费视频| 日本天堂在线| 欧美高清无遮挡| 久久不卡日韩美女| 精品视频一区二区| 91tv精品福利国产在线观看| 无码人妻丰满熟妇区毛片| 国产精品资源在线| 欧美激情久久久久久久| 午夜精品一区二区三区电影天堂| 中文在线观看免费高清| 日韩av在线直播| 最近中文字幕免费mv2018在线| 国产精品∨欧美精品v日韩精品| 2021年精品国产福利在线| 色姑娘综合av| 亚洲一区二区伦理| 男人添女人荫蒂国产| 国产精品久久一卡二卡| 中文字幕国产在线观看| 亚洲成成品网站| 成人影欧美片| 国产日韩欧美一二三区| 欧美午夜精彩| 久草在在线视频| 99re成人精品视频| 国产一级做a爱免费视频| 91精品一区二区三区久久久久久 | 亚洲日韩欧美一区二区在线| 色屁屁影院www国产高清麻豆| 亚洲精品一区二区三区香蕉| 成人a在线视频免费观看| 国产主播欧美精品| 国产精品一区二区三区av麻| av之家在线观看| 波多野结衣中文一区| 久一视频在线观看| 日韩精品综合一本久道在线视频| 日本美女在线中文版| 国产成人免费91av在线| 伊人成综合网yiren22| 少妇人妻在线视频| av动漫一区二区| 黄色激情视频在线观看| 精品久久久久久久久久久久久久久久久 | 糖心vlog精品一区二区| 亚洲无亚洲人成网站77777| 亚洲一区二区三区四区| 欧美日韩一区二区视频在线观看| 久久久久中文| 亚洲黄色小说视频| 欧美手机在线视频| 欧美18一19xxx性| 91精品在线影院| 最新欧美人z0oozo0| 中文字幕亚洲日本| 亚洲一二三四久久| 色视频在线看| 国产成人小视频在线观看| 欧美日韩水蜜桃| www.色欧美| 亚洲一二三区不卡| 日韩porn| 国产精品久久久久久久久影视| 色呦哟—国产精品| 黑人巨大猛交丰满少妇| 亚洲妇熟xx妇色黄| 青青草在线视频免费观看| 国产精品极品尤物在线观看| 91一区二区三区四区| 人妻激情偷乱视频一区二区三区| 一区二区三区在线看| 熟妇人妻一区二区三区四区 | 久久人人爽人人爽爽久久| 精品国产18久久久久久二百| 日韩精品一区二区在线视频| 丰满亚洲少妇av| 男人天堂av在线播放| 精品国产一区二区三区久久狼5月| 久久久久久久久久久久电影| 日韩日韩日韩日韩日韩| 亚洲国产精品成人久久综合一区 | 亚洲欧美日韩爽爽影院| 国精品产品一区| 国产精品三级一区二区| 91理论电影在线观看| 97人妻精品一区二区三区动漫| 久久99国产精品自在自在app | av一区二区三区在线| 波多野结衣视频在线观看| 久久伊人91精品综合网站| 色婷婷av一区二区三区丝袜美腿| jizzzz日本| 激情久久av一区av二区av三区| av一本在线| 国产美女精品在线观看| 男人的天堂久久精品| 国产亚洲欧美久久久久| 在线视频亚洲欧美| 欧美日韩夜夜| 97超碰人人看| 日本高清不卡在线观看| 国内高清免费在线视频| 亚洲狠狠婷婷综合久久久| 99热在这里有精品免费| 国产色综合视频| 国产精品96久久久久久|