RabbitMQ六種隊列模式-路由模式

其實只要看過上篇發(fā)布模式后,相信路由模式上手就非常 easy 了,唯一差距就是兩個參數(shù),exchange類型和 routingKey 。

1. 什么是路由模式

路由模式跟發(fā)布訂閱模式類似,然后在訂閱模式的基礎(chǔ)上加上了類型,訂閱模式是分發(fā)到所有綁定到交換機的隊列,路由模式只分發(fā)到綁定在交換機上面指定路由鍵的隊列,我們可以看一下下面這張圖:



P 表示為生產(chǎn)者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

上圖是一個結(jié)合日志消費級別的配圖,在路由模式它會把消息路由到那些 binding keyrouting key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的 direct 模式。

以上圖的配置為例,我們以 routingKey="error" 發(fā)送消息到 Exchange,則消息會路由到Queue1amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2amqp.gen-Agl…)。如果我們以 routingKey="info"routingKey="warning"來發(fā)送消息,則消息只會路由到 Queue2。如果我們以其他 routingKey 發(fā)送消息,則消息不會路由到這兩個 Queue 中。

相對于發(fā)布訂閱模式,我們可以看到不再是廣播似的接收全部消息,而是有選擇性的消費。

我們就以接收不同日志級別的隊列為例吧。

2. 代碼部分

日志生產(chǎn)者

cn.lovingliu.rabbitmq_direct.producer.Prodecer

package cn.lovingliu.rabbitmq_direct.producer;

import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author:LovingLiu
 * @Description: 日志消息的生產(chǎn)者
 * @Date:Created in 2020-01-16
 */
public class Prodecer {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        /** 1.創(chuàng)建新的連接 */
        Connection connection = ConnectionUtil.getConnection();
        /** 2.創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        /** 3.綁定的交換機 參數(shù)1交互機名稱 參數(shù)2 exchange類型 */
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        /** 4.發(fā)送消息 */
        String message = "",sendType="";
        for (int i = 0; i < 10; i++) {
            if(i%2==0){
                sendType = "info";
                message = "我是 info 級別的消息類型:" + i;
            }else{
                sendType = "error";
                message = "我是 error 級別的消息類型:" + i;
            }
            System.out.println("[send]:" + message + "  " +sendType);
            channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes("utf-8"));
            try {
                Thread.sleep(5 * i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /** 5.關(guān)閉通道、連接 */
        channel.close();
        connection.close();
        /** 注意:如果消費者沒有綁定交換機和隊列,則消息會丟失 */
    }
}

注意:exchangeDeclare() 方法 exchange 類型為 direct

消費者

info消費者 cn.lovingliu.rabbitmq_direct.consumer.ConsumerInfo

package cn.lovingliu.rabbitmq_direct.consumer;

import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author:LovingLiu
 * @Description: info日志消息的消費者
 * @Date:Created in 2020-01-16
 */
public class ConsumerInfo {
    private static final String QUEUE_NAME = "consumer_info";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("info消費者啟動");
        /** 1.創(chuàng)建新的連接 */
        Connection connection = ConnectionUtil.getConnection();
        /** 2.創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        /** 3.消費者關(guān)聯(lián)隊列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 4.消費者綁定交換機 參數(shù)1 隊列 參數(shù)2交換機 參數(shù)3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產(chǎn)者消息:" + msg);
            }
        };
        /** 5.消費者監(jiān)聽隊列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

error消費者

package cn.lovingliu.rabbitmq_direct.consumer;

import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author:LovingLiu
 * @Description: Error消息的消費者
 * @Date:Created in 2020-01-16
 */
public class ConsumerError {

    private static final String QUEUE_NAME = "consumer_error";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("error消費者啟動");
        /* 1.創(chuàng)建新的連接 */
        Connection connection = ConnectionUtil.getConnection();
        /* 2.創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        /* 3.消費者關(guān)聯(lián)隊列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消費者綁定交換機 參數(shù)1 隊列 參數(shù)2交換機 參數(shù)3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產(chǎn)者消息:" + msg);
            }
        };
        /* 5.消費者監(jiān)聽隊列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

3.運行截圖

  • 先運行生產(chǎn)者,生成對應(yīng)的交換機Exchange。
  • 先運行兩個消費者,再運行生產(chǎn)者。如果沒有提前將隊列綁定到交換機,那么直接運行生產(chǎn)者的話,消息是不會發(fā)到任何隊列里的。

生產(chǎn)者

info消費者

error消費者

4. 路由模式總結(jié)

  • 1、兩個隊列消費者設(shè)置的路由不一樣,接收到的消息就不一樣。路由模式下,決定消息向隊列推送的主要取決于路由,而不是交換機了。

  • 2、該模式必須設(shè)置交換機,且聲明路由模式channel.exchangeDeclare(EXCHANGE_NAME, "direct");

生產(chǎn)者發(fā)送消息到交換機,同時定義了一個路由 routingKey,多個消費者聲明多個隊列,與交換機進行綁定,同時定義路由 routingKey,只有路由 routingKey相同的消費者才能消費數(shù)據(jù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容