rabbitmq消息可靠性之消費(fèi)者確認(rèn)機(jī)制(ack機(jī)制)

rabbitmq消息可靠性之消費(fèi)者確認(rèn)機(jī)制(ack機(jī)制)

之前講到rabbitmq提供了以下幾種機(jī)制來保證消息可靠性,上篇介紹了生產(chǎn)者確認(rèn)機(jī)制,本篇講解消費(fèi)者確認(rèn)機(jī)制,消息持久化存儲(chǔ)amqp默認(rèn)就實(shí)現(xiàn)了,不用過多關(guān)注

  • 生產(chǎn)者確認(rèn)機(jī)制

  • 消息持久化存儲(chǔ)

  • 消費(fèi)者確認(rèn)機(jī)制

  • 失敗重試機(jī)制

yml文件配置

spring:
  rabbitmq:
    host: 1.15.108.206
    port: 5672
    username: guest
    password: guest
    virtual-host: smallJHost
    # 消費(fèi)者確認(rèn)機(jī)制相關(guān)配置 
    # 開啟publisher-confirm,
    # 這里支持兩種類型:simple:同步等待confirm結(jié)果,直到超時(shí);# correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
    publisher-returns: true
    # 定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
    template:
      mandatory: true
    listener:
      simple:
        # ack機(jī)制類型
        acknowledge-mode: manual
        # 設(shè)置預(yù)取消息數(shù)量
        prefetch: 2

注意配置中的acknowledge-mode屬性,這是設(shè)置ack機(jī)制的類型,但是如果是用@RabbitListener注解實(shí)現(xiàn)的消費(fèi)者,那么這里的設(shè)置是不會(huì)生效的,因?yàn)樽⒔庾陨碛?strong>ackMode = "AUTO"的默認(rèn)值,所以在實(shí)現(xiàn)消費(fèi)者的時(shí)候應(yīng)該寫明這個(gè)屬性ackMode = "MANUAL"

消費(fèi)者示例:

package com.gitee.small.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class RabbitReceiver {

    private static Integer index = 0;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.dog"),
            exchange = @Exchange(value = "binding.dog", type = ExchangeTypes.TOPIC)),
            ackMode = "MANUAL"
    )
    public void process(String msg, Channel channel, Message message) {
        try {
//            TimeUnit.SECONDS.sleep(3);
//        // 多個(gè)消費(fèi)者可開啟競爭模式
//        channel.basicQos(1);
            if (index % 2 == 0) {
                log.info("dog-收到消息-成功:{}", msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                index++;
            } else {
                log.info("dog-收到消息-失敗:{}", msg + index);
                // 不批量處理,消費(fèi)失敗將消息重新投遞回隊(duì)列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                index++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息確認(rèn)

// 第一個(gè)參數(shù)是消息的唯一ID,第二個(gè)參數(shù)表示是否批量處理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

消息否認(rèn)

// 第一個(gè)參數(shù)是消息的唯一ID,第二個(gè)參數(shù)表示是否批量處理,第三個(gè)參數(shù)表示是否將消息重發(fā)回隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容