SpringBoot整合RabbitMQ系列(二)Exchange的四種類型

Exchange是什么

我們可以將 Exchange 當做一個消息的中轉(zhuǎn)站,所有的消息在發(fā)送到指定隊列前都要經(jīng)過這一層中轉(zhuǎn)站。中轉(zhuǎn)站再根據(jù)某些規(guī)則進行匹配來決定將消息分發(fā)到哪些隊列中。

Exchange的幾種類型

  1. Direct(默認)
    只有當 Routing Key 等于 Binding Key 時,消息才會被路由到隊列中。
  2. Fanout
    實現(xiàn)了一種廣播機制,將消息發(fā)送到所有與這個 Exchange 綁定的隊列中。
  3. Topic
    定義了一種匹配規(guī)則,同樣根據(jù) Routing Key 和 Binding Key 進行模糊匹配,更加靈活。
  4. Headers
    Headers Exchange 會忽略 RoutingKey 而根據(jù)消息中的 Headers 和創(chuàng)建綁定關(guān)系時指定的 Arguments 來匹配決定路由到哪些 Queue,其性能比較差而且 Direct Exchange 完全可以代替它,所以不建議使用。

Exchange 的創(chuàng)建與綁定

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "隊列名"),
            exchange = @Exchange(name = "交換機名稱"), key = "Binding Key"))
    public void listen(String msg) {
        log.info("成功消費隊列的消息: {}", msg);
    }

以上就是一個通過注解聲明隊列同時綁定Exchange的一個列子。
如果僅聲明一條隊列,那么當它被創(chuàng)建時,后臺會自動將這個隊列綁定到一個名稱為空的 Direct Exchange 上,綁定 Routing Key 與隊列名稱相同。

代碼示例

package com.jiangzhe.rabbitmq.chapter2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Slf4j
@EnableScheduling
@SpringBootApplication
public class Chapter2Application {
    private static final String FANOUT_EXCHANGE_NAME = "amqp.fanout";
    private static final String TOPIC_EXCHANGE_NAME = "amqp.topic";
    private static final String DIRECT_EXCHANGE_NAME = "amqp.direct";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(Chapter2Application.class, args);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct"),
            exchange = @Exchange(name = DIRECT_EXCHANGE_NAME), key = "direct.cn"))
    public void listenDirect(String msg, Message message) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        log.info("成功消費隊列 {} 的消息: {}", consumerQueue, msg);
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "fanoutA"),
                    exchange = @Exchange(type = ExchangeTypes.FANOUT, name = FANOUT_EXCHANGE_NAME)),
            @QueueBinding(value = @Queue(value = "fanoutB"),
                    exchange = @Exchange(type = ExchangeTypes.FANOUT, name = FANOUT_EXCHANGE_NAME)),
            @QueueBinding(value = @Queue(value = "fanoutC"),
                    exchange = @Exchange(type = ExchangeTypes.FANOUT, name = FANOUT_EXCHANGE_NAME))})
    public void listenFanout(String msg, Message message) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        log.info("成功消費隊列 {} 的消息: {}", consumerQueue, msg);
    }

    /**
     * 其中*代表匹配任意的一個詞,#代表匹配0或多個詞
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(name = "topicA"),
                    exchange = @Exchange(type = ExchangeTypes.TOPIC, name = TOPIC_EXCHANGE_NAME), key = "topic.cn.#"),
            @QueueBinding(value = @Queue(name = "topicB"),
                    exchange = @Exchange(type = ExchangeTypes.TOPIC, name = TOPIC_EXCHANGE_NAME), key = "topic.com.*"),
            @QueueBinding(value = @Queue(name = "topicC"),
                    exchange = @Exchange(type = ExchangeTypes.TOPIC, name = TOPIC_EXCHANGE_NAME), key = "topic.#"),
    })
    public void listenTopic(String msg, Message message) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        log.info("成功消費隊列 {} 的消息: {}", consumerQueue, msg);
    }

    @Scheduled(initialDelay = 2000, fixedRate = 1000000)
    public void send() throws InterruptedException {
        log.info("----------------------------");
        log.info("direct");
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, "direct.cn", "hello");
        Thread.sleep(1000);
        log.info("----------------------------");
        log.info("fanout");
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "這個值無效", "hello");
        Thread.sleep(1000);
        log.info("----------------------------");
        log.info("to topicC and topicC");
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "topic.com.a", "hello");
        Thread.sleep(1000);
        log.info("----------------------------");
        log.info("to topicA and topicC");
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "topic.cn", "hello");
    }

}

輸出結(jié)果

2019-11-02 12:54:09.775  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
2019-11-02 12:54:09.775  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : direct
2019-11-02 12:54:09.849  INFO 2756 --- [ntContainer#2-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 direct 的消息: hello
2019-11-02 12:54:10.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
2019-11-02 12:54:10.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : fanout
2019-11-02 12:54:10.839  INFO 2756 --- [ntContainer#1-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 fanoutA 的消息: hello
2019-11-02 12:54:10.839  INFO 2756 --- [ntContainer#1-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 fanoutB 的消息: hello
2019-11-02 12:54:10.839  INFO 2756 --- [ntContainer#1-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 fanoutC 的消息: hello
2019-11-02 12:54:11.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
2019-11-02 12:54:11.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : to topicC and topicC
2019-11-02 12:54:11.838  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 topicB 的消息: hello
2019-11-02 12:54:11.838  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 topicC 的消息: hello
2019-11-02 12:54:12.810  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
2019-11-02 12:54:12.810  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : to topicA and topicC
2019-11-02 12:54:12.840  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 topicC 的消息: hello
2019-11-02 12:54:12.840  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消費隊列 topicA 的消息: hello
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容