理解Confirm消息確認(rèn)機(jī)制
- 消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個(gè)應(yīng)答。
- 生產(chǎn)者進(jìn)行接收應(yīng)答,用來確定這條消息是否正常的發(fā)送到Broker,這種方式也是消息的可靠性投遞的核心保障!
確認(rèn)機(jī)制流程圖
生產(chǎn)端發(fā)送消息到Broker,然后Broker接收到了消息后,進(jìn)行回送響應(yīng),生產(chǎn)端有一個(gè)Confirm Listener,去監(jiān)聽?wèi)?yīng)答,當(dāng)然這個(gè)操作是異步進(jìn)行的,生產(chǎn)端將消息發(fā)送出去就可以不用管了,讓內(nèi)部監(jiān)聽器去監(jiān)聽Broker給我們的響應(yīng)。

如何實(shí)現(xiàn)Confirm確認(rèn)消息?
- 第一步,在channel上開啟確認(rèn)模式:
channel.confirmSelect() - 第二步,在channel上添加監(jiān)聽:
addConfirmListener,監(jiān)聽成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!
生產(chǎn)端實(shí)現(xiàn)
public class ConfirmProducer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個(gè)新的Channel
Channel channel = connection.createChannel();
//4 指定我們的消息投遞模式: 消息的確認(rèn)模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 發(fā)送一條消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一個(gè)確認(rèn)監(jiān)聽
channel.addConfirmListener(new ConfirmListener() {
//消息失敗處理
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//deliveryTag;唯一消息標(biāo)簽
//multiple:是否批量
System.err.println("-------no ack!-----------");
}
//消息成功處理
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
消費(fèi)端實(shí)現(xiàn)
public class ConfirmConsumer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個(gè)新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 聲明交換機(jī)和隊(duì)列,然后進(jìn)行綁定設(shè)置路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
//6 接收消息
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費(fèi)端: " + msg);
}
}
}
運(yùn)行說明
先啟動消費(fèi)端,訪問管控臺:http://192.168.43.157:15672,檢查Exchange和Queue是否設(shè)置OK,然后啟動生產(chǎn)端,消息被消費(fèi)端消費(fèi),生產(chǎn)端也成功監(jiān)聽到了ACK響應(yīng)。
# 消費(fèi)端打印
消費(fèi)端: Hello RabbitMQ Send confirm message!
# 生產(chǎn)端打印
-------ack!-----------1
什么時(shí)候會走 handleNack 方法呢,比如磁盤寫滿了,MQ出現(xiàn)了一些異常,或者Queue容量到達(dá)上限了等等
也有可能兩個(gè)方法都不走,比如生產(chǎn)端發(fā)送消息就失敗了,或者Broker端收到消息在返回ack時(shí)中途出現(xiàn)了網(wǎng)絡(luò)閃斷。
這種情況就需要定時(shí)任務(wù)去抓取中間狀態(tài)的消息進(jìn)行最大努力嘗試次數(shù)的補(bǔ)償重發(fā),從而保障消息投遞的可靠性。