RabbitMQ確認(rèn)消息Confirm詳解

理解Confirm消息確認(rèn)機(jī)制
  • 消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個(gè)應(yīng)答。
  • 生產(chǎn)者進(jìn)行接收應(yīng)答,用來確定這條消息是否正常的發(fā)送到Broker,這種方式也是消息的可靠性投遞的核心保障!
確認(rèn)機(jī)制流程圖

生產(chǎn)端發(fā)送消息到Broker,然后Broker接收到了消息后,進(jìn)行回送響應(yīng),生產(chǎn)端有一個(gè)Confirm Listener,去監(jiān)聽?wèi)?yīng)答,當(dāng)然這個(gè)操作是異步進(jìn)行的,生產(chǎn)端將消息發(fā)送出去就可以不用管了,讓內(nèi)部監(jiān)聽器去監(jiān)聽Broker給我們的響應(yīng)。

如何實(shí)現(xiàn)Confirm確認(rèn)消息?
  • 第一步,在channel上開啟確認(rèn)模式:channel.confirmSelect()
  • 第二步,在channel上添加監(jiān)聽:addConfirmListener,監(jiān)聽成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!
生產(chǎn)端實(shí)現(xiàn)
public class ConfirmProducer {
    
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創(chuàng)建一個(gè)新的Channel
        Channel channel = connection.createChannel();
        
        //4 指定我們的消息投遞模式: 消息的確認(rèn)模式 
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        
        //5 發(fā)送一條消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        
        //6 添加一個(gè)確認(rèn)監(jiān)聽
        channel.addConfirmListener(new ConfirmListener() {
            //消息失敗處理
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //deliveryTag;唯一消息標(biāo)簽
                //multiple:是否批量
                System.err.println("-------no ack!-----------");
            }
            //消息成功處理
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });
    }
}
消費(fèi)端實(shí)現(xiàn)
public class ConfirmConsumer {

    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創(chuàng)建一個(gè)新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";
        
        //4 聲明交換機(jī)和隊(duì)列,然后進(jìn)行綁定設(shè)置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 創(chuàng)建消費(fèi)者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        //6 接收消息
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消費(fèi)端: " + msg);
        }
    }
}
運(yùn)行說明

先啟動消費(fèi)端,訪問管控臺:http://192.168.43.157:15672,檢查Exchange和Queue是否設(shè)置OK,然后啟動生產(chǎn)端,消息被消費(fèi)端消費(fèi),生產(chǎn)端也成功監(jiān)聽到了ACK響應(yīng)。

# 消費(fèi)端打印
消費(fèi)端: Hello RabbitMQ Send confirm message!
# 生產(chǎn)端打印
-------ack!-----------1

什么時(shí)候會走 handleNack 方法呢,比如磁盤寫滿了,MQ出現(xiàn)了一些異常,或者Queue容量到達(dá)上限了等等

也有可能兩個(gè)方法都不走,比如生產(chǎn)端發(fā)送消息就失敗了,或者Broker端收到消息在返回ack時(shí)中途出現(xiàn)了網(wǎng)絡(luò)閃斷。
這種情況就需要定時(shí)任務(wù)去抓取中間狀態(tài)的消息進(jìn)行最大努力嘗試次數(shù)的補(bǔ)償重發(fā),從而保障消息投遞的可靠性。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容