RabbitMQ之五工作模式詳解

個(gè)人專題目錄

1. RabbitMQ工作模式

1.1 Work queues工作隊(duì)列模式

模式說(shuō)明

1556009144848.png

Work Queues與入門程序的簡(jiǎn)單模式相比,多了一個(gè)或一些消費(fèi)端,多個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。

應(yīng)用場(chǎng)景:對(duì)于 任務(wù)過(guò)重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。

代碼

Work Queues與入門程序的簡(jiǎn)單模式的代碼是幾乎一樣的;可以完全復(fù)制,并復(fù)制多一個(gè)消費(fèi)者進(jìn)行多個(gè)消費(fèi)者同時(shí)消費(fèi)消息的測(cè)試。

1)生產(chǎn)者

public class Producer {

    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        //創(chuàng)建連接
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 1; i <= 30; i++) {
            // 發(fā)送信息
            String message = "你好;小兔子!work模式--" + i;
            /**
             * 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
             * 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
             * 參數(shù)3:消息其它屬性
             * 參數(shù)4:消息內(nèi)容
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("已發(fā)送消息:" + message);
        }

        // 關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2)消費(fèi)者1

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        //一次只能接收并處理一個(gè)消息
        channel.basicQos(1);

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //路由key
                    System.out.println("路由key為:" + envelope.getRoutingKey());
                    //交換機(jī)
                    System.out.println("交換機(jī)為:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id為:" + envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
                    Thread.sleep(1000);

                    //確認(rèn)消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
    }
}

3)消費(fèi)者2

public class Consumer2 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        //一次只能接收并處理一個(gè)消息
        channel.basicQos(1);

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //路由key
                    System.out.println("路由key為:" + envelope.getRoutingKey());
                    //交換機(jī)
                    System.out.println("交換機(jī)為:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id為:" + envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
                    Thread.sleep(1000);

                    //確認(rèn)消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
    }
}

測(cè)試

啟動(dòng)兩個(gè)消費(fèi)者,然后再啟動(dòng)生產(chǎn)者發(fā)送消息;到IDEA的兩個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)查看是否競(jìng)爭(zhēng)性的接收到消息。

小結(jié)

在一個(gè)隊(duì)列中如果有多個(gè)消費(fèi)者,那么消費(fèi)者之間對(duì)于同一個(gè)消息的關(guān)系是競(jìng)爭(zhēng)的關(guān)系。

1.2. 訂閱模式類型

訂閱模式示例圖:

1556014499573.png

前面2個(gè)案例中,只有3個(gè)角色:

  • P:生產(chǎn)者,也就是要發(fā)送消息的程序
  • C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來(lái)。
  • queue:消息隊(duì)列,圖中紅色部分

而在訂閱模型中,多了一個(gè)exchange角色,而且過(guò)程略有變化:

  • P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
  • C:消費(fèi)者,消息的接受者,會(huì)一直等待消息到來(lái)。
  • Queue:消息隊(duì)列,接收消息、緩存消息。
  • Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見(jiàn)以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
    • Direct:定向,把消息交給符合指定routing key 的隊(duì)列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

1.3. Publish/Subscribe發(fā)布與訂閱模式

模式說(shuō)明

1556010329032.png

發(fā)布訂閱模式:
1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列。
2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收
到消息

代碼

1)生產(chǎn)者

/**
 * 發(fā)布與訂閱使用的交換機(jī)類型為:fanout
 */
public class Producer {

    //交換機(jī)名稱
    static final String FANOUT_EXCHAGE = "fanout_exchange";
    //隊(duì)列名稱
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    //隊(duì)列名稱
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) throws Exception {

        //創(chuàng)建連接
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        /**
         * 聲明交換機(jī)
         * 參數(shù)1:交換機(jī)名稱
         * 參數(shù)2:交換機(jī)類型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");

        for (int i = 1; i <= 10; i++) {
            // 發(fā)送信息
            String message = "你好;小兔子!發(fā)布訂閱模式--" + i;
            /**
             * 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
             * 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
             * 參數(shù)3:消息其它屬性
             * 參數(shù)4:消息內(nèi)容
             */
            channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
            System.out.println("已發(fā)送消息:" + message);
        }

        // 關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2)消費(fèi)者1

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機(jī)
                System.out.println("交換機(jī)為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
    }
}

3)消費(fèi)者2

public class Consumer2 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機(jī)
                System.out.println("交換機(jī)為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
    }
}

測(cè)試

啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在每個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送的所有消息;到達(dá)廣播的效果。

在執(zhí)行完測(cè)試代碼后,其實(shí)到RabbitMQ的管理后臺(tái)找到Exchanges選項(xiàng)卡,點(diǎn)擊 fanout_exchange 的交換機(jī),可以查看到綁定關(guān)系。

小結(jié)

交換機(jī)需要與隊(duì)列進(jìn)行綁定,綁定之后;一個(gè)消息可以被多個(gè)消費(fèi)者都收到。

發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別

1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。

2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。

3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。

1.4. Routing路由模式

模式說(shuō)明

路由模式特點(diǎn):

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息
1556029284397.png

圖解:

  • P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
  • X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
  • C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
  • C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息

代碼

在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key。

1)生產(chǎn)者

/**
 * 路由模式的交換機(jī)類型為:direct
 */
public class Producer {

    //交換機(jī)名稱
    static final String DIRECT_EXCHAGE = "direct_exchange";
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";

    public static void main(String[] args) throws Exception {

        //創(chuàng)建連接
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        /**
         * 聲明交換機(jī)
         * 參數(shù)1:交換機(jī)名稱
         * 參數(shù)2:交換機(jī)類型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");

        // 發(fā)送信息
        String message = "新增了商品。路由模式;routing key 為 insert " ;
        /**
         * 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
         * 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
         * 參數(shù)3:消息其它屬性
         * 參數(shù)4:消息內(nèi)容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);

        // 發(fā)送信息
        message = "修改了商品。路由模式;routing key 為 update" ;
        /**
         * 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
         * 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
         * 參數(shù)3:消息其它屬性
         * 參數(shù)4:消息內(nèi)容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);

        // 關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2)消費(fèi)者1

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機(jī)
                System.out.println("交換機(jī)為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
    }
}

3)消費(fèi)者2

public class Consumer2 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機(jī)
                System.out.println("交換機(jī)為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
    }
}

測(cè)試

啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果。

在執(zhí)行完測(cè)試代碼后,其實(shí)到RabbitMQ的管理后臺(tái)找到Exchanges選項(xiàng)卡,點(diǎn)擊 direct_exchange 的交換機(jī),可以查看到綁定關(guān)系。

小結(jié)

Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定routing key,消息會(huì)轉(zhuǎn)發(fā)到符合routing key的隊(duì)列。

1.5. Topics通配符模式

模式說(shuō)明

Topic類型與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!

Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

#:匹配一個(gè)或多個(gè)詞

*:匹配不多不少恰好1個(gè)詞

舉例:

item.#:能夠匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

1556031362048.png
1556031519931.png

圖解:

  • 紅色Queue:綁定的是usa.# ,因此凡是以 usa.開頭的routing key 都會(huì)被匹配到
  • 黃色Queue:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配

代碼

1)生產(chǎn)者

使用topic類型的Exchange,發(fā)送消息的routing key有3種: item.insert、item.updateitem.delete

/**
 * 通配符Topic的交換機(jī)類型為:topic
 */
public class Producer {

    //交換機(jī)名稱
    static final String TOPIC_EXCHAGE = "topic_exchange";
    //隊(duì)列名稱
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    //隊(duì)列名稱
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws Exception {

        //創(chuàng)建連接
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        /**
         * 聲明交換機(jī)
         * 參數(shù)1:交換機(jī)名稱
         * 參數(shù)2:交換機(jī)類型,fanout、topic、topic、headers
         */
        channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);


        // 發(fā)送信息
        String message = "新增了商品。Topic模式;routing key 為 item.insert " ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);

        // 發(fā)送信息
        message = "修改了商品。Topic模式;routing key 為 item.update" ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);

        // 發(fā)送信息
        message = "刪除了商品。Topic模式;routing key 為 item.delete" ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);

        // 關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2)消費(fèi)者1

接收兩種類型的消息:更新商品和刪除商品

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機(jī)
                System.out.println("交換機(jī)為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
    }
}

3)消費(fèi)者2

接收所有類型的消息:新增商品,更新商品和刪除商品。

public class Consumer2 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 創(chuàng)建頻道
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // 聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);

        //隊(duì)列綁定交換機(jī)
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");

        //創(chuàng)建消費(fèi)者;并設(shè)置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
             * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
             * properties 屬性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機(jī)
                System.out.println("交換機(jī)為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //監(jiān)聽(tīng)消息
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
         * 參數(shù)3:消息接收到后回調(diào)
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}

測(cè)試

啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果;并且這些routing key可以使用通配符。

在執(zhí)行完測(cè)試代碼后,其實(shí)到RabbitMQ的管理后臺(tái)找到Exchanges選項(xiàng)卡,點(diǎn)擊 topic_exchange 的交換機(jī),可以查看到綁定關(guān)系。

小結(jié)

Topic主題模式可以實(shí)現(xiàn) Publish/Subscribe發(fā)布與訂閱模式Routing路由模式 的功能;只是Topic在配置routing key 的時(shí)候可以使用通配符,顯得更加靈活。

1.6. 模式總結(jié)

RabbitMQ工作模式:
1、簡(jiǎn)單模式 HelloWorld
一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))

2、工作隊(duì)列模式 Work Queue
一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者(競(jìng)爭(zhēng)關(guān)系),不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))

3、發(fā)布訂閱模式 Publish/subscribe
需要設(shè)置類型為fanout的交換機(jī),并且交換機(jī)和隊(duì)列進(jìn)行綁定,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)將消息發(fā)送到綁定的隊(duì)列

4、路由模式 Routing
需要設(shè)置類型為direct的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列

5、通配符模式 Topic
需要設(shè)置類型為topic的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定通配符方式的routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容