RabbitMQ消息冪等性問題

  1. 什么是冪等性?
    在編程中一個(gè)冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

HTTP方法的冪等性是指一次和多次請求某一個(gè)資源應(yīng)該具有同樣的副作用。冪等性屬于語義范疇,正如編譯器只能幫助檢查語法錯(cuò)誤一樣,HTTP規(guī)范也沒有辦法通過消息格式等語法手段來定義它。

簡之:一個(gè)請求,不管重復(fù)來多少次,結(jié)果是不會(huì)改變的。

1.1 消息隊(duì)列的冪等性
如同HTTP方法的冪等性,消息隊(duì)列同樣會(huì)出現(xiàn)冪等性問題。

消費(fèi)者在消費(fèi) MQ 中的消息時(shí),MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷,故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息;注意,RabbitMQ 這種消息重試(補(bǔ)償)機(jī)制是默認(rèn)的。

所以,MQ 消費(fèi)者的冪等性問題,主要在于 MQ 的重試機(jī)制,因?yàn)榫W(wǎng)絡(luò)原因或客戶端延遲消費(fèi)導(dǎo)致重復(fù)消費(fèi)。

那么,如何合適選擇重試機(jī)制?我們來看兩種情況。

情況1: 消費(fèi)者獲取到消息后,調(diào)用第三方接口,但接口暫時(shí)無法訪問,是否需要重試?

需要重試

情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試?

不需要重試

總結(jié):對(duì)于情況2,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問題,那么不需要重試,重試也無濟(jì)于事。應(yīng)該采用日志記錄+定時(shí)任務(wù) job 健康檢查+人工進(jìn)行補(bǔ)償

1.2 模擬重試機(jī)制
我們采用一種短信消費(fèi)者客戶端異常的情況來模擬 RabbitMQ 的重試機(jī)制。

@RabbitListener(queues = "fanout_sms_queue")
public void process(String msg) {
System.out.println("短信消費(fèi)者獲取生產(chǎn)者消息msg:" + msg);
int i = 1/0;
}
如上代碼,很顯然會(huì)報(bào)錯(cuò),一擔(dān)報(bào)錯(cuò)生產(chǎn)者的消息時(shí)不會(huì)被消費(fèi)的?

@RabbitListener 底層使用 AOP 進(jìn)行異常通知攔截,如果程序沒有拋出異常信息,那么就會(huì)自動(dòng)提交事務(wù);如果 AOP 異常通知攔截有捕獲到異常信息的話,就會(huì)自動(dòng)實(shí)現(xiàn)重試(補(bǔ)償)機(jī)制,同時(shí),這個(gè)補(bǔ)償機(jī)制的消息會(huì)緩存到 RabbitMQ 服務(wù)器端進(jìn)行存放,一直重試到不拋出異常為止。

1.2.1 生產(chǎn)者代碼
@Component
public class FanoutProducer {

@Autowired
private AmqpTemplate amqpTemplate;

/**
 * 發(fā)送消息
 *
 * @param queueName 隊(duì)列名稱
 */
public void send(String queueName) {
    String msg = "my_fanout_msg:" + System.currentTimeMillis();
    Message message = MessageBuilder
                    .withBody(msg.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "")
                    .build();
    System.out.println(msg + ":" + msg);
    amqpTemplate.convertAndSend(queueName, message);
}

}
1.2.2 消費(fèi)者代碼
@Component
public class FanoutEamilConsumer {

@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
    String revMessage = Thread.currentThread().getName() 
            + ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:" 
            + new String(message.getBody(), "UTF-8")
            + ",messageId:" + message.getMessageProperties().getMessageId();
    System.out.println(revMessage);
}

}
1.2.3 消費(fèi)者 application.yml 配置
spring:
rabbitmq:

連接地址

host: 127.0.0.1

端口號(hào)

port: 5672

賬號(hào)

username: guest

密碼

password: guest

地址

virtual-host: /admin_host
listener:
  simple:
    retry:
      ####開啟消費(fèi)者重試
      enabled: true
      ####最大重試次數(shù)
      max-attempts: 5
      ####重試間隔次數(shù)
      initial-interval: 3000

server:
port: 8081
我們通過 RabbitMQ 配置,增加了 RabbitMQ 重試時(shí)間以及重試次數(shù)限制,在一定程度上解決了重復(fù)消費(fèi)的問題,接下來看一道常問的面試題。

  1. 如何保證消息冪等性,不被重復(fù)消費(fèi)?
    其實(shí),這個(gè)問題也算是 MQ 面試當(dāng)中經(jīng)??疾斓囊稽c(diǎn),因?yàn)闊o論是什么 MQ 都會(huì)有這個(gè)問題。

首先通過上邊我們了解了什么是“冪等性”,以及 MQ 冪等性問題的產(chǎn)生,所以我們要清楚為什么會(huì)出現(xiàn)重復(fù)性消費(fèi)?在什么場景會(huì)出現(xiàn)重復(fù)消費(fèi)?

解決方法
使用全局 MessageID 判斷消費(fèi)方使用同一個(gè),解決冪等性問題。
或者使用業(yè)務(wù)邏輯保證唯一(比如訂單號(hào)碼)

生產(chǎn)者關(guān)鍵代碼:

@Autowired
private AmqpTemplate amqpTemplate;

/**

  • 發(fā)送消息
  • @param queueName 隊(duì)列名稱
    */
    public void send(String queueName) {
    String msg = "my_fanout_msg:" + System.currentTimeMillis();
    Message message = MessageBuilder
    .withBody(msg.getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8")
    .setMessageId(UUID.randomUUID() + "")
    .build();
    System.out.println(msg + ":" + msg);
    amqpTemplate.convertAndSend(queueName, message);
    }
    如上,生產(chǎn)者在發(fā)送消息時(shí)(convertAndSend),給消息對(duì)象設(shè)置了唯一的 MessageID,只有該 MessageID 沒有被消費(fèi)者標(biāo)記方能在重試機(jī)制中再次被消費(fèi)。

消費(fèi)者關(guān)鍵代碼:

@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
String revMessage = Thread.currentThread().getName()
+ ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:"
+ new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId();
System.out.println(revMessage);
發(fā)送郵件的邏輯XXX
}
如上,通過 message.getMessageProperties().getMessageId() 獲取 MessageID,獲取的 MessageID 可以用來判斷是否已經(jīng)被消費(fèi)者消費(fèi)過了,如果已經(jīng)消費(fèi)則取消再次消費(fèi)。

通常怎么判斷呢?

比如上方是一個(gè)郵件發(fā)送的消費(fèi)者,在做補(bǔ)償時(shí),假如上一步郵件發(fā)送成功了,我們會(huì)把該 ID 存至 redis中,下次再調(diào)用時(shí),先去 redis 判斷是否存在該 ID 了,如果存在表明已經(jīng)消費(fèi)過了則直接返回,不再消費(fèi),否則消費(fèi),然后將記錄存至 redis。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容