- 什么是冪等性?
在編程中一個(gè)冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
HTTP方法的冪等性是指一次和多次請求某一個(gè)資源應(yīng)該具有同樣的副作用。冪等性屬于語義范疇,正如編譯器只能幫助檢查語法錯(cuò)誤一樣,HTTP規(guī)范也沒有辦法通過消息格式等語法手段來定義它。
簡之:一個(gè)請求,不管重復(fù)來多少次,結(jié)果是不會(huì)改變的。
1.1 消息隊(duì)列的冪等性
如同HTTP方法的冪等性,消息隊(duì)列同樣會(huì)出現(xiàn)冪等性問題。
消費(fèi)者在消費(fèi) MQ 中的消息時(shí),MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷,故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息;注意,RabbitMQ 這種消息重試(補(bǔ)償)機(jī)制是默認(rèn)的。
所以,MQ 消費(fèi)者的冪等性問題,主要在于 MQ 的重試機(jī)制,因?yàn)榫W(wǎng)絡(luò)原因或客戶端延遲消費(fèi)導(dǎo)致重復(fù)消費(fèi)。
那么,如何合適選擇重試機(jī)制?我們來看兩種情況。
情況1: 消費(fèi)者獲取到消息后,調(diào)用第三方接口,但接口暫時(shí)無法訪問,是否需要重試?
需要重試
情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試?
不需要重試
總結(jié):對(duì)于情況2,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問題,那么不需要重試,重試也無濟(jì)于事。應(yīng)該采用日志記錄+定時(shí)任務(wù) job 健康檢查+人工進(jìn)行補(bǔ)償
1.2 模擬重試機(jī)制
我們采用一種短信消費(fèi)者客戶端異常的情況來模擬 RabbitMQ 的重試機(jī)制。
@RabbitListener(queues = "fanout_sms_queue")
public void process(String msg) {
System.out.println("短信消費(fèi)者獲取生產(chǎn)者消息msg:" + msg);
int i = 1/0;
}
如上代碼,很顯然會(huì)報(bào)錯(cuò),一擔(dān)報(bào)錯(cuò)生產(chǎn)者的消息時(shí)不會(huì)被消費(fèi)的?
@RabbitListener 底層使用 AOP 進(jìn)行異常通知攔截,如果程序沒有拋出異常信息,那么就會(huì)自動(dòng)提交事務(wù);如果 AOP 異常通知攔截有捕獲到異常信息的話,就會(huì)自動(dòng)實(shí)現(xiàn)重試(補(bǔ)償)機(jī)制,同時(shí),這個(gè)補(bǔ)償機(jī)制的消息會(huì)緩存到 RabbitMQ 服務(wù)器端進(jìn)行存放,一直重試到不拋出異常為止。
1.2.1 生產(chǎn)者代碼
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 發(fā)送消息
*
* @param queueName 隊(duì)列名稱
*/
public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
Message message = MessageBuilder
.withBody(msg.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "")
.build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
}
1.2.2 消費(fèi)者代碼
@Component
public class FanoutEamilConsumer {
@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
String revMessage = Thread.currentThread().getName()
+ ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:"
+ new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId();
System.out.println(revMessage);
}
}
1.2.3 消費(fèi)者 application.yml 配置
spring:
rabbitmq:
連接地址
host: 127.0.0.1
端口號(hào)
port: 5672
賬號(hào)
username: guest
密碼
password: guest
地址
virtual-host: /admin_host
listener:
simple:
retry:
####開啟消費(fèi)者重試
enabled: true
####最大重試次數(shù)
max-attempts: 5
####重試間隔次數(shù)
initial-interval: 3000
server:
port: 8081
我們通過 RabbitMQ 配置,增加了 RabbitMQ 重試時(shí)間以及重試次數(shù)限制,在一定程度上解決了重復(fù)消費(fèi)的問題,接下來看一道常問的面試題。
- 如何保證消息冪等性,不被重復(fù)消費(fèi)?
其實(shí),這個(gè)問題也算是 MQ 面試當(dāng)中經(jīng)??疾斓囊稽c(diǎn),因?yàn)闊o論是什么 MQ 都會(huì)有這個(gè)問題。
首先通過上邊我們了解了什么是“冪等性”,以及 MQ 冪等性問題的產(chǎn)生,所以我們要清楚為什么會(huì)出現(xiàn)重復(fù)性消費(fèi)?在什么場景會(huì)出現(xiàn)重復(fù)消費(fèi)?
解決方法
使用全局 MessageID 判斷消費(fèi)方使用同一個(gè),解決冪等性問題。
或者使用業(yè)務(wù)邏輯保證唯一(比如訂單號(hào)碼)
生產(chǎn)者關(guān)鍵代碼:
@Autowired
private AmqpTemplate amqpTemplate;
/**
- 發(fā)送消息
- @param queueName 隊(duì)列名稱
*/
public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
Message message = MessageBuilder
.withBody(msg.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "")
.build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
如上,生產(chǎn)者在發(fā)送消息時(shí)(convertAndSend),給消息對(duì)象設(shè)置了唯一的 MessageID,只有該 MessageID 沒有被消費(fèi)者標(biāo)記方能在重試機(jī)制中再次被消費(fèi)。
消費(fèi)者關(guān)鍵代碼:
@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
String revMessage = Thread.currentThread().getName()
+ ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:"
+ new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId();
System.out.println(revMessage);
發(fā)送郵件的邏輯XXX
}
如上,通過 message.getMessageProperties().getMessageId() 獲取 MessageID,獲取的 MessageID 可以用來判斷是否已經(jīng)被消費(fèi)者消費(fèi)過了,如果已經(jīng)消費(fèi)則取消再次消費(fèi)。
通常怎么判斷呢?
比如上方是一個(gè)郵件發(fā)送的消費(fèi)者,在做補(bǔ)償時(shí),假如上一步郵件發(fā)送成功了,我們會(huì)把該 ID 存至 redis中,下次再調(diào)用時(shí),先去 redis 判斷是否存在該 ID 了,如果存在表明已經(jīng)消費(fèi)過了則直接返回,不再消費(fèi),否則消費(fèi),然后將記錄存至 redis。