4.Routing#前山翻譯

注:這是RabbitMQ-java版Client的指導教程翻譯系列文章,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊列的使用
第三篇Publish/Subscribe介紹轉換器以及其中fanout類型
第四篇Routing介紹direct類型轉換器
第五篇Topics介紹topic類型轉換器
第六篇RPC介紹遠程調用

在上一篇指導教程中,我們創(chuàng)建了一個日志系統(tǒng),可以把日志消息廣播給很多接受者。

在這篇指導教程中,我們需要添加一個功能:可以訂閱消息的一部分。例如:我們會直接將嚴重的錯誤信息生成日志文件(保存在空余的磁盤上),但是依然會把所有的日志信息顯示在控制臺。

綁定(Bindings)

在上篇指導教程的例子中,我們已經創(chuàng)建過綁定的實例,你可能會覺得跟下面的代碼類似:

channel.queueBind(queueName, EXCHANGE_NAME, "");

綁定的含義是轉換器和隊列之間的一種關聯(lián),通俗來說就是一個隊列對這個轉換器中的消息感興趣。

綁定可以帶有一個參數:routingKey。為了避免和basic_publish中的參數產生困惑,我們將這個參數叫著binding key(綁定鑰匙),下面是我們創(chuàng)建一個帶有鑰匙的綁定。

 channel.queueBind(queueName, EXCHANGE_NAME, "black");

這個綁定鑰匙的意思取決于轉換器的類型,如果是我們之前使用的fanout類型轉換器,那么會忽略綁定鑰匙的意義。

直接轉換器(Direct exchange)

在上篇指導教程中,我們的日志系統(tǒng)會廣播消息給所有綁定轉換器的消費者?,F(xiàn)在我們擴展一下:根據消息的級別來過濾消息。舉例來說,我們想一個應用只接受嚴重級別的消息并且寫入到磁盤里,就不用浪費磁盤空間去保存警告或者信息日志的消息。

如果使用fanout轉換器,那樣就沒有什么靈活性,不停的愚蠢的廣播。

可以使用direct轉換器,它的路由選擇的算法是容易理解,一個消息之所以到這個隊列中去,是因為隊列的binding Key和發(fā)出消息的routingkey相匹配。

為了說明這個問題,看下下面的結構:

direct-exchange.png

這張結構圖中,可以看到有兩個隊列綁定著類型為direct的轉換器,第一個隊列綁定鑰匙為orange,第二個綁定鑰匙有兩個:一個是black另一個是green。

在上面的結構圖中,一個帶有routingkey為orange的消息發(fā)送給轉換器將會被發(fā)送到隊列Q1中,帶有routing Key為black和green將會被發(fā)送給到隊列Q2中,其他所有的消息將會被清除。

多重綁定(Multiple bindings)

direct-exchange-multiple.png

多個隊列擁有相同的binding key是完全合規(guī)的,上圖中我們可以在轉換器x和帶有bindingkey為black的隊列Q1建立綁定關系。在這種情況下,direct類型的轉換器具有fanout類型的一樣特性,可以廣播給所有匹配的隊列消息。一個routingkey為black的消息將會被發(fā)送到Q1和Q2兩個隊列中。

發(fā)送消息(Emitting logs)

我們使用這種模型應用到日志系統(tǒng)上,發(fā)送給消息給direct而不是fanout類型的轉換器。將以日志嚴重等級作為routing key。按照那種方式,消費者應用將會選擇接受日志的嚴重等級的消息。首先我們先發(fā)送消息。

總是一樣的,先聲明一個轉換器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

準備好發(fā)送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

為了方面,我們假設等級分為三種:info,warning,error。

訂閱(Subscribing)

只要像上篇指導教程中接受消息就可以,有一個不同的地方就是:我們可以去創(chuàng)建任何一個等級的綁定。

String queueName = channel.queueDeclare().getQueue();

        for(String severity : argv){

            channel.queueBind(queueName, EXCHANGE_NAME, severity);

    }

綜合

python-four.png

下面是EmitLogDirect.java類,這里下載

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);  //獲取日志等級

        String message = getMessage(argv);  //獲取消息

        channel.basicPublish(EXCHANGE_NAME, severity, null,    message.getBytes());

       System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();

        connection.close();

        }

    //..

}

下面是ReceiveLogsDirect.java類,這里下載

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){

        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");

        System.exit(1);

        }

        for(String severity : argv){

        channel.queueBind(queueName, EXCHANGE_NAME, severity);

       }

       System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {

        @Override

        public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

             }

        };

        channel.basicConsume(queueName, true, consumer);

      }

}

跟以前一樣編譯,運行的時候為了方面,我們使用環(huán)境便來個$CP作為路徑配置:

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

如果你想把warning和error(而不是info)類型的日消息保存到文件中,只需要打開一個控制臺和記錄:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在你的屏幕上看到所有的日志消息,新打開一個終端和查看就可以:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

舉個例子,發(fā)送一個error類型的日志消息:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

第四節(jié)的內容大致翻譯完了,這里是原文鏈接。接著進入下一節(jié):Topics

終篇是我對RabbitMQ使用理解的總結文章,歡迎討教。
--謝謝--

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,536評論 19 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器。支持消息的持久化、事務、擁塞控...
    jiangmo閱讀 10,507評論 2 34
  • 【譯】RabbitMQ教程一 主要通過Hello Word對RabbitMQ有初步認識 【譯】RabbitMQ教程...
    maxwellyue閱讀 23,041評論 1 30
  • RabbitMQ筆記 本文參考資料:http://blog.csdn.net/chwshuang/article/...
    wangxiaoda閱讀 2,942評論 0 11
  • 注:這是RabbitMQ-java版Client的指導教程翻譯系列文章,歡迎大家批評指正第一篇Hello Word...
    前山飯店閱讀 773評論 0 0

友情鏈接更多精彩內容