rabbitMQ是如何保障消息可靠性的(一)

一. 前言

消息隊(duì)列一般有三部分:生產(chǎn)者,隊(duì)列本身和消費(fèi)者。消息出現(xiàn)問題,一般也就圍繞這三部分。

二. 消息丟失處理

2.1 發(fā)送端確認(rèn)機(jī)制
發(fā)送端確認(rèn)(publisher confirm)機(jī)制。生產(chǎn)者將信道設(shè)置成confirm(確認(rèn))模式,一旦信道進(jìn)入confirm 模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后(如果消息和隊(duì)列是持久化的,那么確認(rèn)消息會(huì)在消息持久化后發(fā)出),RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達(dá)了。
如果rabbitMQ沒能處理該條消息,比如因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失等異常情況發(fā)生,就會(huì)發(fā)送一個(gè)Basic.Nack給生產(chǎn)者,此時(shí)程序可以進(jìn)行重試等操作。

2.1.1 程序案例

  • application.properties配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
  • controller代碼
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback((correlationData,
                                                flag, cause) -> {
            if (flag) {
                try {
                    System.out.println("消息確認(rèn):" +
                            correlationData.getId() + " "
                            + new String(correlationData.getReturnedMessage().getBody(), "utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println(cause);
            }
        });
    }
        核心業(yè)務(wù)代碼
        CorrelationData correlationData = new CorrelationData();//correlationData消息唯一id
        String cId = UUID.randomUUID().toString();
        correlationData.setId(cId);
        correlationData.setReturnedMessage(new Message("這是msg1的響應(yīng)".getBytes("utf-8"), null));
        log.info("消息UUID為:{}", cId);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUNTING_KEY, "your message",
                correlationData);

2.2.2 事務(wù)
一但事務(wù)提交后都沒有異常,確實(shí)就說明消息是投遞成功了

Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
        try {
            channel.txSelect();
            channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, null, "your message".getBytes());
            channel.txCommit();
        } catch (IOException e) {
            e.printStackTrace();
            try {
                channel.txRollback();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

注意開啟事務(wù)吞吐量會(huì)下降。

在下一篇文章中,我們會(huì)討論下隊(duì)列消息持久化機(jī)制。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容