rabbitmq消息可靠性之消息回調(diào)機制

rabbitmq消息可靠性之消息回調(diào)機制
rabbitmq消息回調(diào)機制圖解.png

rabbitmq在消息的發(fā)送與接收中,會經(jīng)過上面的流程,這些流程中每一步都有可能導致消息丟失,或者消費失敗甚至直接是服務(wù)器宕機等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機制

  • 生產(chǎn)者確認機制

  • 消息持久化存儲

  • 消費者確認機制

  • 失敗重試機制

本文主要講解生產(chǎn)者確認機制,也是rabbitmq提供的消息回調(diào)機制,這個機制可以解決生產(chǎn)者發(fā)送消息到交換機和交換機路由到隊列過程中的消息丟失問題

這種機制必須給每個消息指定一個唯一ID,消息發(fā)送到rabbitmq之后會返回結(jié)果給生產(chǎn)者,表示消息是否發(fā)送成功,返回結(jié)果有以下兩種

  • publisher-confirm:發(fā)送者確認:消息成功投遞到交換機,返回 ack;消息未投遞到交換機,返回 nack

  • publisher-return:發(fā)送者回執(zhí):消息成功投遞到交換機,但是沒有路由到隊列。返回 ack,及路由失敗原因

spring:
  rabbitmq:
    # rabbitMQ的ip地址
    host: 127.0.0.1
    # 端口
    port: 5672
    # 集群模式配置
    # addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
    username: admin
    password: 123456
    virtual-host: /
    # 消費者確認機制相關(guān)配置 
    # 開啟publisher-confirm,
    # 這里支持兩種類型:simple:同步等待confirm結(jié)果,直到超時;# correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback
    publisher-returns: true
    # 定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
    template:
      mandatory: true

然后定義 ReturnCallback 回調(diào),每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置

package com.gitee.small.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {

    //綁定鍵
    public final static String DOG = "topic.dog";
    public final static String CAT = "topic.cat";


    /**
     * Queue構(gòu)造函數(shù)參數(shù)說明
     * new Queue(SMS_QUEUE, true);
     * 1. 隊列名
     * 2. 是否持久化 true:持久化 false:不持久化
     */


    @Bean
    public Queue firstQueue() {
        return new Queue(DOG);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(CAT);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    /**
     * 將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.dog
     * 這樣只要是消息攜帶的路由鍵是topic.dog,才會分發(fā)到該隊列
     */
    @Bean(name = "binding.dog")
    public Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG);
    }

    /**
     * 將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#
     * 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊列
     */
    @Bean(name = "binding.cat")
    public Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發(fā)消息
        });
    }
}

接著定義 ConfirmCallback,ConfirmCallback 可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理 confirm 成功或失敗的邏輯不一定相同,上面已經(jīng)定義好exchange 和 queue,新建RabbitMqTest測試類

package smallJ;

import com.gitee.small.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class RabbitMqTest {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws InterruptedException {
        // 1.準備CorrelationData
        // 1.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 1.2.準備ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判斷結(jié)果
            if (result.isAck()) {
                // ACK
                log.info("消息成功投遞到交換機!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投遞到交換機失??!消息ID:{},原因:{}", correlationData.getId(), result.getReason());
                // 重發(fā)消息
            }
        }, ex -> {
            // 記錄日志
            log.error("消息發(fā)送異常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
            // 可以重發(fā)消息
        });
        rabbitTemplate.convertAndSend("topicExchange", "topic.dog", "路由模式測試-dog", correlationData);
        // 程序休眠兩秒等待回調(diào)
        Thread.sleep(2000);
    }
}

加兩個監(jiān)聽器進行測試

package com.gitee.small.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TopicRabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.dog"),
            exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC)
    ))
    public void process(String msg) {
        log.info("dog-收到消息:{}", msg);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.cat"),
            exchange = @Exchange(value = "bindingExchangeMessage2", type = ExchangeTypes.TOPIC)
    ))
    public void  process2(String msg){
        log.info("cat-收到消息:{}", msg);
    }
}

測試結(jié)果如下

smallJ.RabbitMqTest   : 消息成功投遞到交換機!消息ID: 83f057fa-042d-4f56-872d-9d31a0444b82
c.g.small.rabbitmq.TopicRabbitReceiver   : dog-收到消息:路由模式測試-dog
c.g.small.rabbitmq.TopicRabbitReceiver   : cat-收到消息:路由模式測試-dog
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容