rabbitmq消息可靠性之消費(fèi)者確認(rèn)機(jī)制(ack機(jī)制)
之前講到rabbitmq提供了以下幾種機(jī)制來保證消息可靠性,上篇介紹了生產(chǎn)者確認(rèn)機(jī)制,本篇講解消費(fèi)者確認(rèn)機(jī)制,消息持久化存儲(chǔ)amqp默認(rèn)就實(shí)現(xiàn)了,不用過多關(guān)注
生產(chǎn)者確認(rèn)機(jī)制
消息持久化存儲(chǔ)
消費(fèi)者確認(rèn)機(jī)制
失敗重試機(jī)制
yml文件配置
spring:
rabbitmq:
host: 1.15.108.206
port: 5672
username: guest
password: guest
virtual-host: smallJHost
# 消費(fèi)者確認(rèn)機(jī)制相關(guān)配置
# 開啟publisher-confirm,
# 這里支持兩種類型:simple:同步等待confirm結(jié)果,直到超時(shí);# correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
publisher-returns: true
# 定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
template:
mandatory: true
listener:
simple:
# ack機(jī)制類型
acknowledge-mode: manual
# 設(shè)置預(yù)取消息數(shù)量
prefetch: 2
注意配置中的acknowledge-mode屬性,這是設(shè)置ack機(jī)制的類型,但是如果是用@RabbitListener注解實(shí)現(xiàn)的消費(fèi)者,那么這里的設(shè)置是不會(huì)生效的,因?yàn)樽⒔庾陨碛?strong>ackMode = "AUTO"的默認(rèn)值,所以在實(shí)現(xiàn)消費(fèi)者的時(shí)候應(yīng)該寫明這個(gè)屬性ackMode = "MANUAL"
消費(fèi)者示例:
package com.gitee.small.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RabbitReceiver {
private static Integer index = 0;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.dog"),
exchange = @Exchange(value = "binding.dog", type = ExchangeTypes.TOPIC)),
ackMode = "MANUAL"
)
public void process(String msg, Channel channel, Message message) {
try {
// TimeUnit.SECONDS.sleep(3);
// // 多個(gè)消費(fèi)者可開啟競爭模式
// channel.basicQos(1);
if (index % 2 == 0) {
log.info("dog-收到消息-成功:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
index++;
} else {
log.info("dog-收到消息-失敗:{}", msg + index);
// 不批量處理,消費(fèi)失敗將消息重新投遞回隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
index++;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息確認(rèn)
// 第一個(gè)參數(shù)是消息的唯一ID,第二個(gè)參數(shù)表示是否批量處理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
消息否認(rèn)
// 第一個(gè)參數(shù)是消息的唯一ID,第二個(gè)參數(shù)表示是否批量處理,第三個(gè)參數(shù)表示是否將消息重發(fā)回隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);