RabbitMQ Notes 17: Alternate Exchange

Alternate Exchange

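Alternate exchanges are a RabbitMQ extension; they are not defined by the AMQP protocol.
An exchange can be declared with an alternate-exchange argument that names another exchange. When a message published to the first exchange cannot be routed to any queue by its routing key, the broker forwards the message to the exchange named by alternate-exchange instead.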
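
The fallback rule can be illustrated with a small in-memory model. This is only a sketch of the behaviour described above, not RabbitMQ code: ToyExchange, route, and the names pay, order, pay.queue and order.queue are all made up for illustration, and non-fanout bindings are matched by exact key for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of alternate-exchange fallback. Not RabbitMQ code.
class AlternateExchangeSketch {

    /** Hypothetical stand-in for an exchange: bindings plus an optional alternate. */
    static final class ToyExchange {
        final boolean fanout;
        final String alternateExchange; // null when no alternate-exchange argument is set
        final Map<String, List<String>> bindings = new HashMap<>(); // binding key -> queues

        ToyExchange(boolean fanout, String alternateExchange) {
            this.fanout = fanout;
            this.alternateExchange = alternateExchange;
        }

        ToyExchange bind(String bindingKey, String queue) {
            bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
            return this;
        }

        /** Queues this exchange alone would deliver to. */
        List<String> match(String routingKey) {
            List<String> queues = new ArrayList<>();
            if (fanout) {
                bindings.values().forEach(queues::addAll); // fanout ignores the routing key
            } else {
                queues.addAll(bindings.getOrDefault(routingKey, List.of()));
            }
            return queues;
        }
    }

    /** Routes a message, falling back to the alternate exchange when nothing matches. */
    static List<String> route(Map<String, ToyExchange> exchanges, String name, String routingKey) {
        Set<String> visited = new HashSet<>(); // guards against alternate-exchange cycles
        String current = name;
        while (current != null && visited.add(current)) {
            ToyExchange exchange = exchanges.get(current);
            if (exchange == null) {
                break; // unknown exchange: treated as unroutable in this toy model
            }
            List<String> queues = exchange.match(routingKey);
            if (!queues.isEmpty()) {
                return queues;
            }
            current = exchange.alternateExchange; // try the fallback, if any
        }
        return List.of(); // unroutable everywhere
    }

    public static void main(String[] args) {
        Map<String, ToyExchange> exchanges = new HashMap<>();
        exchanges.put("pay", new ToyExchange(false, "order").bind("pay.created", "pay.queue"));
        exchanges.put("order", new ToyExchange(true, null).bind("", "order.queue"));

        System.out.println(route(exchanges, "pay", "pay.created")); // [pay.queue]
        System.out.println(route(exchanges, "pay", "unknown.key")); // [order.queue]
    }
}
```

An empty result from route corresponds to the case where neither the exchange nor its alternate can deliver the message.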

Create a fanout exchange (zhihao.miao.exchange.order, used below as the alternate exchange)

Declare the exchange that carries the alternate-exchange argument automatically (through RabbitAdmin):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public Exchange exchange(){
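        // Messages this exchange cannot route are forwarded to the exchange named here
        Map<String,Object> argsMap = new HashMap<>();
        argsMap.put("alternate-exchange","zhihao.miao.exchange.order");
        // durable = true, autoDelete = false
        return new TopicExchange("zhihao.miao.exchange.pay",true,false,argsMap);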
    }


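    @Bean
    public Binding binding(){
        // Bind the queue zhihao.miao.pay (assumed to exist on the broker already) with the pattern zhihao.miao.pay.*
        return new Binding("zhihao.miao.pay",Binding.DestinationType.QUEUE,"zhihao.miao.exchange.pay","zhihao.miao.pay.*",new HashMap<>());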
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

Application startup class

import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
public class Application {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        // Trigger the client's first connection to RabbitMQ (declarations happen when the connection opens)
        context.getBean(RabbitAdmin.class).getQueueProperties("**");
        context.close();
    }
}

Running the startup class creates an exchange with the alternate-exchange argument.


zhihao.miao.exchange.pay is a topic exchange with the alternate-exchange argument; the queue zhihao.miao.pay is bound to it with the binding key zhihao.miao.pay.*. Its alternate-exchange argument names zhihao.miao.exchange.order, a fanout exchange to which the queue zhihao.miao.order is bound.

If the routing key matches zhihao.miao.pay.*, the message is delivered to the zhihao.miao.pay queue. If it does not, the message is forwarded to the exchange named by the alternate-exchange argument of zhihao.miao.exchange.pay.
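
The configuration above only declares the topic exchange and its binding; the fanout exchange and the two queues are assumed to exist on the broker already. As a sketch, they could be declared from Spring as well. The class name and bean method names here are hypothetical, and the durable and auto-delete flags are assumptions that would have to match whatever already exists on the broker.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical companion configuration; RabbitAdmin declares these beans on first connection.
@Configuration
public class AlternateTopologyConfig {

    @Bean
    public Queue payQueue() {
        // durable = true is an assumption
        return new Queue("zhihao.miao.pay", true);
    }

    @Bean
    public Queue orderQueue() {
        // durable = true is an assumption
        return new Queue("zhihao.miao.order", true);
    }

    @Bean
    public FanoutExchange orderExchange() {
        // The exchange named by the alternate-exchange argument of zhihao.miao.exchange.pay
        return new FanoutExchange("zhihao.miao.exchange.order", true, false);
    }

    @Bean
    public Binding orderBinding() {
        // A fanout binding needs no routing key
        return BindingBuilder.bind(orderQueue()).to(orderExchange());
    }
}
```

Declaring the fallback topology next to the main exchange keeps the two in step, so the alternate exchange is not forgotten when the application is deployed against a fresh broker.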

Test

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

Startup class:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        byte[] body = "hello,zhihao.miao".getBytes();

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        rabbitTemplate.send("zhihao.miao.exchange.pay","zhihao.miao.pay.aaa",message);

        TimeUnit.SECONDS.sleep(3);

        context.close();
    }
}

Here the message is published to the exchange zhihao.miao.exchange.pay with the routing key zhihao.miao.pay.aaa, so it is routed to the zhihao.miao.pay queue.

When the routing key cannot be routed, the message is forwarded to the exchange zhihao.miao.exchange.order. Because that exchange is of type fanout, the message reaches the zhihao.miao.order queue.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        byte[] body = "hello,zhihao.miao".getBytes();

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        // This key cannot be routed, so the message goes to the exchange named by alternate-exchange
        rabbitTemplate.send("zhihao.miao.exchange.pay","hehe.zhihao.miao",message);

        TimeUnit.SECONDS.sleep(3);

        context.close();
    }
}

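The alternate exchange should usually be a fanout exchange. With any other type, the routing key is matched again against the alternate exchange's bindings before the message is routed, and that match can fail too. A fanout exchange ignores the routing key, so no matching is needed.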
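
To see why a topic-type alternate exchange can still drop a message, it helps to look at how topic patterns match. The following is a minimal sketch of AMQP-style topic matching (* for exactly one word, # for zero or more words), not the broker's implementation. The class TopicPatternSketch and the pattern zhihao.miao.order.* are hypothetical; only zhihao.miao.pay.* and the routing keys come from the examples above.

```java
// Toy topic matcher: '*' matches exactly one word, '#' matches zero or more words.
class TopicPatternSketch {

    /** Returns true when routingKey matches the topic binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matchWords(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matchWords(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (matchWords(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matchWords(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("zhihao.miao.pay.*", "zhihao.miao.pay.aaa")); // true
        System.out.println(matches("zhihao.miao.pay.*", "hehe.zhihao.miao"));    // false
        // A hypothetical topic-type alternate exchange bound with this pattern would also miss:
        System.out.println(matches("zhihao.miao.order.*", "hehe.zhihao.miao"));  // false
        // A catch-all pattern behaves like fanout for routing purposes:
        System.out.println(matches("#", "hehe.zhihao.miao"));                    // true
    }
}
```

The third call shows the failure mode: the key misses both the primary pattern and the alternate's pattern, so nothing receives the message. A fanout alternate exchange avoids this because it never consults the key.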

When the alternate exchange cannot route the message either

Example

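Here a topic exchange is declared with the alternate-exchange argument, and its alternate exchange is also a topic exchange. If the routing key matches no binding on either exchange, the message is lost. A publisher confirm alone does not reveal this, because the broker still acknowledges an unroutable message; what reports it is the return mechanism (the mandatory flag plus a return callback), which hands the unrouted message back to the producer.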

Create the exchanges
Binding relationships

Configuration:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

Normal case: the message is routed to the queue bound to the exchange head.info.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        byte[] body = "hello,zhihao.miao".getBytes();

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        // Routed normally through the head.info exchange to its queue
        rabbitTemplate.send("head.info","head.info.a",message);

        TimeUnit.SECONDS.sleep(3);

        context.close();
    }
}

Fallback case: the message is routed to the queue bound to head.error, the exchange configured as the alternate-exchange of head.info.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        byte[] body = "hello,zhihao.miao".getBytes();

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        // Not matched by head.info, so routed through its alternate exchange head.error
        rabbitTemplate.send("head.info","head.error.a",message);

        TimeUnit.SECONDS.sleep(3);

        context.close();
    }
}

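If neither exchange matches, the message is lost. To detect this on the producer side, publish with the mandatory flag and register a return callback: because the message was not routed to any queue, the broker returns it and the return callback fires.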

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        byte[] body = "hello".getBytes();

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        // Matches neither head.info nor head.error, so the message is returned to the producer
        rabbitTemplate.send("head.info","header.debug.a",message);

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

Configuration:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        factory.setPublisherReturns(true);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("===========Message could not be routed=========");
            System.out.println("replyCode: "+replyCode);
            System.out.println("replyText: "+replyText);
            System.out.println("exchange: "+exchange);
            System.out.println("routingKey: "+routingKey);
        });
        return rabbitTemplate;
    }
}

Summary

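  • Prefer a fanout exchange as the alternate exchange, so that the fallback cannot fail on routing-key matching (a queue must still be bound to it).
    A fanout exchange itself generally does not need an alternate-exchange argument.
  • When an exchange has an alternate exchange, the return method is triggered only if both the exchange and its alternate exchange fail to route the message.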

