RabbitMQ筆記二十:Dead Letter Exchange

Dead Letter Exchange

在隊(duì)列上指定一個(gè)Exchange,則在該隊(duì)列上發(fā)生如下情況,
1.消息被拒絕(basic.reject or basic.nack),且requeue=false
2.消息過(guò)期而被刪除(TTL)
3.消息數(shù)量超過(guò)隊(duì)列最大限制而被刪除
4.消息總大小超過(guò)隊(duì)列最大限制而被刪除

就會(huì)把該消息轉(zhuǎn)發(fā)到指定的這個(gè)exchange
同時(shí)也可以指定一個(gè)可選的x-dead-letter-routing-key,表示默認(rèn)的routing-key,如果沒(méi)有指定,則使用消息的routeing-key(也跟指定的exchange有關(guān),
如果是Fanout類型的exchange,則會(huì)轉(zhuǎn)發(fā)到所有綁定到該exchange的所有隊(duì)列)。

拒絕消息或者nack

示列

定義一個(gè)隊(duì)列zhihao.miao.order,其有屬性x-dead-letter-exchangezhihao.miao.exchange.pay,往Exchange名為zhihao.miao.exchange.order中發(fā)送消息。

zhihao.miao.order隊(duì)列中發(fā)送消息,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        byte[] body = "hello,zhihao.miao".getBytes();

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        rabbitTemplate.send("zhihao.miao.exchange.order","zhihao.miao.order",message);

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

配置類:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

}

此時(shí)消息端拒絕消費(fèi)這個(gè)消息

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        System.out.println(rabbitTemplate);

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

配置類

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }


    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");
        container.setDefaultRequeueRejected(false);
        //手動(dòng)確認(rèn)
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println("=====rece msg======");
                System.out.println(new String(message.getBody()));
                //執(zhí)行拒絕消息
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }
        });
        return container;
    }
}

因?yàn)槠渲付?code>x-dead-letter-exchange是zhihao.miao.exchange.pay,所以會(huì)將消息轉(zhuǎn)發(fā)到zhihao.miao.exchange.pay,而因?yàn)闆](méi)有指定x-dead-letter-routing-key,所以會(huì)使用默認(rèn)的發(fā)送的消息的route key(zhihao.miao.order)進(jìn)行路由,而我們zhihao.miao.exchange.pay的路由信息如下,所以會(huì)將消息轉(zhuǎn)發(fā)到zhihao.miao.auto隊(duì)列中去。

zhihao.miao.exchange.pay的路由信息
發(fā)送二條消息,執(zhí)行了拒絕策略,所以消息轉(zhuǎn)發(fā)到了zhihao.miao.auto隊(duì)列中

示列2

定義了隊(duì)列zhihao.miao.order,不僅定義了x-dead-letter-exchange屬性,也指定了x-dead-letter-routing-key屬性

隊(duì)列zhihao.miao.order定義
zhihao.miao.exchange.pay的路由信息

顯而易見(jiàn)當(dāng)拒絕了該消息的時(shí)候就會(huì)轉(zhuǎn)發(fā)到了zhihao.miao.exchange.pay,而應(yīng)該該隊(duì)列指定了route key為zhihao.miao.pay,所以轉(zhuǎn)發(fā)到了zhihao.miao.pay隊(duì)列中去了。

代碼很上面的一樣。

總結(jié)

上面的示列展示了當(dāng)定義隊(duì)列時(shí)指定了x-dead-letter-exchangex-dead-letter-routing-key視情況而定),并且消費(fèi)端執(zhí)行拒絕策略的時(shí)候?qū)⑾⒙酚傻街付ǖ腅xchange中去。我們知道還有二種情況會(huì)造成消息轉(zhuǎn)發(fā)到死信隊(duì)列。
一種是消息過(guò)期而被刪除,可以使用這個(gè)方式使的rabbitmq實(shí)現(xiàn)延遲隊(duì)列的作用。還有一種就是消息數(shù)量超過(guò)隊(duì)列最大限制而被刪除或者消息總大小超過(guò)隊(duì)列最大限制而被刪除

參考資料

Dead Letter Exchanges
使用TTL(消息過(guò)期)來(lái)實(shí)現(xiàn)消息延遲

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容