【譯】RabbitMQ教程四

內(nèi)容來自:RabbitMQ Tutorials Java版


Routing

在上一個(gè)教程中,我們創(chuàng)建了一個(gè)簡單的日志系統(tǒng)。我們可以將日志消息廣播給所有的接收者(消費(fèi)者)。

在這個(gè)教程中,我們將為我們的日志系統(tǒng)添加一個(gè)功能:僅僅訂閱一部分消息。比如,我們可以直接將關(guān)鍵的錯(cuò)誤類型日志消息保存到日志文件中,還可以同時(shí)將所有的日志消息打印到控制臺(tái)。


綁定(Bindings)

在之前的例子中,我們已經(jīng)創(chuàng)建了綁定:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個(gè)綁定是建立在一個(gè)隊(duì)列和一個(gè)路由器之間的關(guān)系,可以解讀為:該隊(duì)列對這個(gè)路由器中的消息感興趣。

綁定可以設(shè)置另外的參數(shù):路由鍵routingKey。為了避免和void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)中的routingKey混淆,我們將這里的key稱為綁定鍵binding key,下面的代碼展示了如何使用綁定鍵來創(chuàng)建一個(gè)綁定關(guān)系:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

綁定鍵的含義取決于路由器的類型,我們之前使用的fanout類型路由器會(huì)忽略該值。


直接路由器 (Direct Exchange)

我們之前的日志系統(tǒng)會(huì)將所有消息廣播給所有消費(fèi)者。現(xiàn)在我們想根據(jù)日志的嚴(yán)重程度來過濾日志。比如,我們想要一個(gè)程序來將error日志寫到磁盤文件中,而不要將warninginfo日志寫到磁盤中,以免浪費(fèi)磁盤空間。

我們之前使用的fanout路由器缺少靈活性,它只是沒頭腦地廣播消息。所以,我們用direct路由器來替換它。direct路由器背后的路由算法很簡單:只有當(dāng)消息的路由鍵routing key與隊(duì)列的綁定鍵binding key完全匹配時(shí),該消息才會(huì)進(jìn)入該隊(duì)列。

為了演示上面拗口的表述中的意思,考慮下面的設(shè)置:

direct exchange.png

上圖中,直接路由器x與兩個(gè)隊(duì)列綁定。第一個(gè)隊(duì)列以綁定鍵orange來綁定,第二個(gè)隊(duì)列以兩個(gè)綁定鍵blackgreen和路由器綁定。

按照這種設(shè)置,路由鍵為orange的消息以發(fā)布給路由器后,將會(huì)被路由到隊(duì)列Q1,路由鍵為black或者green的消息將會(huì)路由到隊(duì)列Q2。


多重綁定(Multiple bindings)

Multiple bindings

多個(gè)隊(duì)列以相同的綁定鍵binding key綁定到同一個(gè)Exchange上,是完全可以的。按照這種方式設(shè)置的話,直接路由器就會(huì)像fanout路由器一樣,將消息廣播給所有符合路由規(guī)則的隊(duì)列。一個(gè)路由鍵為black的消息將會(huì)發(fā)布到隊(duì)列Q1和Q2。


發(fā)布消息

在這個(gè)教程中,我們使用direct路由器來代替上個(gè)教程中的fanout路由器。同時(shí),我們?yōu)槿罩驹O(shè)置嚴(yán)重級(jí)別,并將此作為路由鍵。這樣,接收者(消費(fèi)者)就可以選擇性地接收日志消息。
首先,創(chuàng)建一個(gè)路由器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接著,發(fā)送一個(gè)消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

簡單起見,我們假設(shè)severity只能是infowarning 、error中的一種。


消息訂閱

接收消息將會(huì)和之前的教程類似,只是我們會(huì)為每一個(gè)級(jí)別的消息來創(chuàng)建不同的綁定:

String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

放在一塊

完整示意圖

生產(chǎn)者EmitLogDirect.java的完整代碼:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        //創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明路由器和路由器的類型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        String severity = "info";
        String message = ".........i am msg.........";

        //發(fā)布消息
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    
}

消費(fèi)者ReceiveLogsDirect.java的完整代碼如下:

import com.rabbitmq.client.*;
import java.io.IOException;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        //建立連接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明路由器和類型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //聲明隊(duì)列
        String queueName = channel.queueDeclare().getQueue();
        //定義要監(jiān)聽的級(jí)別
        String[] severities = {"info", "warning", "error"};
        //根據(jù)綁定鍵綁定
        for (String severity : severities) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

現(xiàn)在可以進(jìn)行測試了。首先,啟動(dòng)一個(gè)消費(fèi)者實(shí)例(ReceiveLogsDirect.java),然后將其中的要監(jiān)聽的級(jí)別改為String[] severities = {"error"};,再啟動(dòng)另一個(gè)消費(fèi)者實(shí)例。此時(shí),這兩個(gè)消費(fèi)者都開始監(jiān)聽了,一個(gè)監(jiān)聽所有級(jí)別的日志消息,另一個(gè)監(jiān)聽error日志消息。
然后,啟動(dòng)生產(chǎn)者(EmitLogDirect.java),之后將String severity = "info";中的info,分別改為warningerror后運(yùn)行。
這樣,就可以在控制臺(tái)看到如下輸出:

//生產(chǎn)者
[x] Sent 'warning':'.........i am msg.........'
[x] Sent 'info':'.........i am msg.........'
[x] Sent 'error':'.........i am msg.........'
//消費(fèi)者1
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'info':'.........i am msg.........'
 [x] Received 'error':'.........i am msg.........'
 [x] Received 'warning':'.........i am msg.........'
//消費(fèi)者2
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'error':'.........i am msg.........'

說明

①與原文略有出入,如有疑問,請參閱原文
②原文均是編譯后通過javacp命令直接運(yùn)行程序,我是在IDE中進(jìn)行的,相應(yīng)的操作做了修改。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • 【譯】RabbitMQ教程一 主要通過Hello Word對RabbitMQ有初步認(rèn)識(shí) 【譯】RabbitMQ教程...
    maxwellyue閱讀 23,048評論 1 30
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評論 2 34
  • 內(nèi)容來自:RabbitMQ Tutorials Java版 Topics 在上一個(gè)教程中我們改進(jìn)了我們的日志系統(tǒng):...
    maxwellyue閱讀 2,909評論 1 2
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,613評論 51 787

友情鏈接更多精彩內(nèi)容