RabbitMQ(五) - 路由(Routing)

路由(routing)

上一個(gè)教程中,我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的日志系統(tǒng)。我們將日志消息廣播到很多個(gè)消費(fèi)者。

在這個(gè)教程我們將給它加一個(gè)特性 —— 我們將使它可以只訂閱消息的一個(gè)子集。例如:我們直接將致命的錯(cuò)誤信息打印到日志(保存在磁盤中),同時(shí)能將所有的日志信息打印在控制臺(tái)上。

綁定(Bindings)

在前面的教程中我們已經(jīng)用到了綁定,可以重新調(diào)用那段代碼:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個(gè)exchange和queue之間的綁定關(guān)系。換句話說:這個(gè)queue對(duì)exchange中的消息感興趣。

綁定需要增加一個(gè)額外的參數(shù)routingKey。為了避免和推送消息中basic_publish的參數(shù)名造成混亂,消費(fèi)者中我們叫它binding key。下面展示創(chuàng)建一個(gè)binding key:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的關(guān)鍵在于依賴的exchange類型。類型為fanout的exchange,上個(gè)教程用到的,忽略了它的值。

直接交換(direct exchange)

上個(gè)教程我們的日志系統(tǒng)直接將所有的信息廣播給所有的消費(fèi)者。我們想在這個(gè)的基礎(chǔ)上根據(jù)消息的嚴(yán)重程度進(jìn)行過濾。例如:我們想有一個(gè)程序用來將重要的錯(cuò)誤信息存儲(chǔ)到磁盤當(dāng)中,而不像浪費(fèi)空間去存儲(chǔ)warning或者是info級(jí)別的日志消息。

我們使用的fanout類型的exchange,并不能給我們提供這樣的靈活性 —— 它只能盲目的進(jìn)行廣播。

我們將用direct類型的exchange來代替它。direct exchange背后的路由算法很簡(jiǎn)單 —— 生產(chǎn)者的routing key 完全匹配消費(fèi)者中的binding key

為了說明這些,請(qǐng)看下圖中的配置:

rabbit-direct

在上圖的配置中,我們看到有兩個(gè)隊(duì)列綁定了類型為direct的exchange X。第一個(gè)隊(duì)列綁定了的key為orange,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)綁定的key為black,另一個(gè)是green。

在這樣的一個(gè)配置中,推送到exchange的消息,routing key為orange的消息將路由到隊(duì)列1(Q1)中,routing key為black或者為green的消息將路由到隊(duì)列2(Q2)中。其他的消息將被丟棄。

多個(gè)綁定(multiple bindings)

多個(gè)隊(duì)列綁定相同的key是完全允許的。在我們的例子當(dāng)中,我們可以在XQ1之間添加一個(gè)綁定key為black的關(guān)系。這樣,類型為direct的exchange就類似于fanout了,能將消息廣播到所有匹配的隊(duì)列中。當(dāng)消息的routing key為black時(shí),將分發(fā)到Q1Q2中。

發(fā)送日志(emitting logs)

我們將在我們的日志系統(tǒng)中使用這個(gè)模式。將消息發(fā)送到類型為direct的exchange中,而不是fanout類型的exchange。這樣接收程序就能選擇重要的消息接收了。
像以前一樣,我們需要?jiǎng)?chuàng)建一個(gè)exchange先:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后發(fā)送一個(gè)消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

簡(jiǎn)單起見,我們將嚴(yán)重的級(jí)別的定義為:info、warning、error。

訂閱(subscribing)

接收消息將和前面的教程差不多,但有一點(diǎn)除外 —— 我們將給我們所有感興趣的每種嚴(yán)重的消息創(chuàng)建綁定關(guān)系。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

信息匯總

rabbitmq-routing

官網(wǎng)的使用命令行執(zhí)行的。這里我們將一次性的向exchange發(fā)送6條消息,info、warn、error三個(gè)級(jí)別各兩條。如上圖,我們創(chuàng)建兩個(gè)消費(fèi)者,這里我們創(chuàng)建兩個(gè)類,一個(gè)接收routing key為error的消息,并將其打印到文件中;另一個(gè)接收所有消息并打印到控制臺(tái)。

EmitLogDirect.java

package com.roachfu.tutorial.rabbitmq.website.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct.log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        System.out.println(" [*] begin sent message to exchange");
        /* 分別發(fā)送兩條 info,warn,error基本的消息 */
        channel.basicPublish(EXCHANGE_NAME, "error", null, "[error] - this is first error message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "warn", null, "[warn ] - this is first warn message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "info", null, "[info ] - this is first info message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "error", null, "[error] - this is second error message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "warn", null, "[warn ] - this is second warn message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "info", null, "[info ] - this is second info message.".getBytes("UTF-8"));
        System.out.println( " [x] done. . . ");

        channel.close();
        connection.close();
    }
}

ReceiveLogDirectToConsole

package com.roachfu.tutorial.rabbitmq.website.direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogDirectToConsole {

    private static final String EXCHANGE_NAME = "direct.log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warn");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] Waiting for message and handle it to console . . . ");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] receive " + envelope.getRoutingKey() + " : '" + message + "'");
            }
        };

        channel.basicConsume(queueName,consumer);
    }
}

ReceiveLogDirectToFile

package com.roachfu.tutorial.rabbitmq.website.direct;

import com.rabbitmq.client.*;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogDirectToFile {

    private static final String EXCHANGE_NAME = "direct.log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] Waiting for message and handle it to file . . . ");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                File file = new File("/temp/direct.log");

                FileOutputStream out = new FileOutputStream(file, true);
                out.write(body);
                out.write(("\r\n").getBytes());
                out.flush();
                out.close();
            }
        };

        channel.basicConsume(queueName,consumer);
    }
}

我們先運(yùn)行兩個(gè)消費(fèi)者,然后運(yùn)行生產(chǎn)者??摧敵鼋Y(jié)果:

ReceiveLogDirectToConsole 消費(fèi)者

rabbitmq-direct-console

ReceiveLogDierctToFile 消費(fèi)者

rabbitmq-direct-file
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評(píng)論 19 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評(píng)論 2 34
  • 1 RabbitMQ安裝部署 這里是ErLang環(huán)境的下載地址http://www.erlang.org/down...
    Bobby0322閱讀 2,370評(píng)論 0 11
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,236評(píng)論 3 51
  • RabbitMQ 即一個(gè)消息隊(duì)列,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖,消息分發(fā)的作用。 消息...
    彩虹之夢(mèng)閱讀 1,155評(píng)論 2 1

友情鏈接更多精彩內(nèi)容