RabbitMQ Spring-AMQP官方教程(四)--路由

Routing(路由)

In the previous tutorial we built a simple fanout exchange. We were able to broadcast messages to many receivers.

在上一個(gè)教程里,我們構(gòu)建了一個(gè)簡單的廣播交換器。通過它我們能將消息廣播到多個(gè)接收者。

In this tutorial we're going to add a feature to it - we're going to make it possible to subscribe only to a subset of the messages. For example, we will be able to direct only messages to the certain colors of interest ("orange", "black", "green"), while still being able to print all of the messages on the console.

在本教程里,我們將往里添加一個(gè)功能——我們準(zhǔn)備讓接收者可以只訂閱部分消息。例如,我們將只要某些我們感興趣的顏色(“橙色”,“黑色”,“綠色”)的消息,但仍能在控制臺(tái)里打印出所有的信息。

Bindings(綁定)

In previous examples we were already creating bindings. You may recall code like this in our Tut3Config file:

在之前的例子當(dāng)中,我們創(chuàng)建了綁定器。通過下面的代碼片段回顧下我們的Tut3Config配置文件:

@Bean
public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

交換器和隊(duì)列是通過綁定器連結(jié)在一起的。這種關(guān)系可以讀作:這個(gè)隊(duì)列對(duì)這個(gè)交換器里的消息感興趣。

Bindings can take an extra routingKey parameter. Spring-amqp uses a fluent API to make this relationship very clear. We pass in the exchange and queue into the BindingBuilder and simply bind the queue "to" the exchange "with a routing key" as follows:

可以再傳一個(gè)路由鍵(routingKey)參數(shù)給綁定。在這點(diǎn)上,Spring-amqp使用了流式API,使得隊(duì)列,交換器和路由鍵之間的關(guān)系變得很清晰。我們將某個(gè)交換器和某個(gè)隊(duì)列傳給BindingBuilder,將傳入的隊(duì)列用某個(gè)路由鍵綁定到傳入的交換器上,如下所示:

@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange");
}

The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.

綁定鍵的含義取決與交換器的類型。我們之前使用的廣播交換器就忽略了這個(gè)值。

Direct exchange(直接交換器)

Our messaging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their color type. For example, we may want a program which writes log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.

在上一個(gè)教程里,我們的消息隊(duì)列系統(tǒng)將所有消息廣播給所有的消費(fèi)者?,F(xiàn)在我們需要讓消息隊(duì)列系統(tǒng)可以基于消息的顏色類型進(jìn)行消息過濾。例如,對(duì)于一個(gè)將日志消息寫入磁盤的程序,我們可能只想讓它接收嚴(yán)重錯(cuò)誤類型的日志消息,而不是警告或者信息級(jí)別的日志消息,從而不致于浪費(fèi)磁盤空間。

We were using a fanout exchange, which doesn't give us much flexibility - it's only capable of mindless broadcasting.

在上一個(gè)教程里,我們用到的廣播交換器不能給我們這個(gè)靈活性,因?yàn)樗粫?huì)做機(jī)械廣播。

We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.

我們將使用直連交換器來替換它。直連交換器背后的路由算法很簡單——當(dāng)消息被推入到某個(gè)隊(duì)列時(shí),這個(gè)隊(duì)列綁定的鍵要與消息的路由鍵完全匹配。

To illustrate that, consider the following setup:

為了說明這一點(diǎn),考慮下面的情況:

image

In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.

從圖中我們可以看到,有兩個(gè)隊(duì)列綁定了直連交換器X。第一個(gè)隊(duì)列綁定時(shí)用了orange鍵,第二個(gè)隊(duì)列用了兩個(gè),一個(gè)是black鍵,另一個(gè)是green鍵。

In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.

在這種情況下,帶有orange路由鍵的消息被發(fā)布到交換器時(shí),會(huì)被路由到隊(duì)列Q1。帶有black鍵或green鍵的消息將被推入到隊(duì)列Q2。所有其它的消息將被丟棄。

Multiple bindings(多綁定)

image

It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.

多個(gè)隊(duì)列用同個(gè)鍵綁定到同個(gè)交換器是完全合法的。在我們的例子當(dāng)中,我們可以用鍵black在交換器X和隊(duì)列Q1之間添加綁定。在這種情況下,直接交換器的行為將和廣播交換器一樣,將消息廣播給所有匹配的隊(duì)列。帶有路由鍵black的消息將被發(fā)送給隊(duì)列Q1和隊(duì)列Q2。

Publishing messages(發(fā)布消息)

We'll use this model for our routing system. Instead of fanout we'll send messages to a direct exchange. We will supply the color as a routing key. That way the receiving program will be able to select the color it wants to receive (or subscribe to). Let's focus on sending messages first.

我們將在我們的路由系統(tǒng)中使用這種模型。我們將發(fā)送消息給直連交換器,而不是廣播交換器。我們將使用顏色作為路由鍵。這么做的話接收者程序就可以選擇它想接收(或者說訂閱)的顏色。讓我們先看看如何發(fā)送消息。

As always, we do some spring boot configuration in Tut4Config:

照例,我們在Tut4Config配置文件里做些spring boot配置:

@Bean
public FanoutExchange fanout() {
    return new FanoutExchange("tut.fanout");
}

And we're ready to send a message. Colors, as in the diagram, can be one of 'orange', 'black', or 'green'.

現(xiàn)在我們準(zhǔn)備發(fā)送一條消息。如圖所示,顏色可以是"orange","black",或者“green”

Subscribing(訂閱)

Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each color we're interested in. This also goes into the Tut4Config.

接收消息將會(huì)像上一個(gè)教程那樣,除了有一點(diǎn)不同——我們將為每一個(gè)我們感興趣的顏色創(chuàng)建一個(gè)新的綁定器。這一點(diǎn)也將在Tut4Config里體現(xiàn)。

@Bean
public DirectExchange direct() {
    return new DirectExchange("tut.direct");
}
...
@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange");
}

Putting it all together(整合代碼)

image

As in the previous tutorials, create a new package for this tutorial called "tut4" and create the Tut4Config class. The code for Tut4Config.java class:

像之前的教程那樣,為本教程創(chuàng)建一個(gè)新的包目錄“tut4”,并創(chuàng)建Tut4Config類。以下為TutConfig.java的代碼:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({"tut4","routing"})
@Configuration
public class Tut4Config {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("tut.direct");
    }
    
    @Profile("receiver")
    private static class ReceiverConfig {
    
        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }
        
        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }
        
        @Bean
        public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange");
        }
        
        @Bean
        public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("black");
        }
        
        @Bean
        public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) {
        return BindingBuilder.bind(autoDeleteQueue2).to(direct).with("green");
        }
        
        @Bean
        public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2).to(direct).with("black");
        }
        
        @Bean
        public Tut4Receiver receiver() {
            return new Tut4Receiver();
        }
    }
    
    @Profile("sender")
    @Bean
    public Tut4Sender sender() {
        return new Tut4Sender();
    }
}

The code for our sender class is:

我們的發(fā)送者類是這樣的:

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut4Sender {

    @Autowired
    private RabbitTemplate template;
    
    @Autowired
    private DirectExchange direct;
    
    private int index;
    
    private int count;
    
    private final String[] keys = {"orange", "black", "green"};
    
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (++this.index == 3) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String message = builder.toString();
        template.convertAndSend(direct.getName(), key, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

The code for Tut4Receiver.java is:

然后下面為Tut4Receiver.java的代碼:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut4Receiver {

    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }
    
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }
    
    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }
        
    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }

}

Compile as usual (see tutorial one for maven compilation and executing the options from the jar).

像之前那樣去編譯(對(duì)于如何用maven進(jìn)行編譯以及如何通過參數(shù)來運(yùn)行jar包,請見教程1)。

mvn clean package

In one terminal window you can run:

打開一個(gè)終端窗口,運(yùn)行以下命令:

java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar
--spring.profiles.active=routing,receiver
--tutorial.client.duration=60000

and in the other temrinal window run the sender:

打開另一個(gè)終端窗口,輸入以下命令來運(yùn)行發(fā)送者:

java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar
--spring.profiles.active=routing,sender
--tutorial.client.duration=60000

Full source code for Tut4Receiver.java source and Tut4Sender.java source. The configuration is inTut4Config.java source.

完整的源代碼可以參考Tut4Receiver.java源碼Tut4Sender.java源碼。配置類請參考Tut4Config.java源碼

Move on to tutorial 5 to find out how to listen for messages based on a pattern.

下面開始教程5,看看如何基于主題來監(jiān)聽消息。

轉(zhuǎn)自我的博客:https://jiapengcai.github.io/posts/48556/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,847評(píng)論 0 10
  • 文/夏北君 風(fēng)寒,街闊,人群熙攘總之,龐貝冊為我的封地時(shí)龐貝已成廢墟 01 絢麗唯美的畫面,恰到好處的背景音樂,倒...
    dhrbdjkal閱讀 1,228評(píng)論 3 2
  • 要守護(hù)著我
    010ed0d5a362閱讀 130評(píng)論 0 0
  • 1.盡可能多的出參: 大海參不耐高溫,小海參不耐低溫!度夏損失多的一般是成參,所以成參盡可能出干凈! 一方面——減...
    尋夢2019ll閱讀 879評(píng)論 0 0
  • Embulk 開源的批量數(shù)據(jù)加載器,用來在不同數(shù)據(jù)庫、存儲(chǔ)設(shè)備、文件格式以及云服務(wù)間轉(zhuǎn)移數(shù)據(jù),支持:自動(dòng)識(shí)別輸入的...
    Alex90閱讀 1,531評(píng)論 0 2

友情鏈接更多精彩內(nèi)容