今天正式服務(wù)器上2臺(tái)服務(wù)器收到同一個(gè)消息,因?yàn)槭羌耗J讲豢赡軆膳_(tái)服務(wù)器都收到同一個(gè)消息,后來排查發(fā)現(xiàn)是由于網(wǎng)絡(luò)各種原因確認(rèn)消息沒及時(shí)到達(dá)到rocketmq,所以會(huì)重發(fā)。
有以下解決辦法
1、消費(fèi)冪等的必要性
處理建議
消息隊(duì)列 RocketMQ 消費(fèi)者在接收到消息以后,有必要根據(jù)業(yè)務(wù)上的唯一 Key 對(duì)消息做冪等處理的必要性。
2、消費(fèi)冪等的必要性
在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息隊(duì)列 RocketMQ 的消息有可能會(huì)出現(xiàn)重復(fù),這個(gè)重復(fù)簡單可以概括為以下兩種情況:
發(fā)送時(shí)消息重復(fù)
當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī),導(dǎo)致服務(wù)端對(duì)客戶端應(yīng)答失敗。 如果此時(shí)生產(chǎn)者意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
投遞時(shí)消息重復(fù)
消息消費(fèi)的場景下,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。 為了保證消息至少被消費(fèi)一次,消息隊(duì)列 RocketMQ 的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
處理建議
因?yàn)?Message ID 有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。 最好的方式是以業(yè)務(wù)唯一標(biāo)識(shí)作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標(biāo)識(shí)可以通過消息 Key 進(jìn)行設(shè)置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
訂閱方收到消息時(shí)可以根據(jù)消息的 Key 進(jìn)行冪等處理:
consumer.subscribe("ons_test", "*", new MessageListener() {
? ? public Action consume(Message message, ConsumeContext context) {
? ? ? ? String key = message.getKey()
? ? ? ? // 根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)的 key 做冪等處理
? ? }
});
3、業(yè)務(wù)上做好冪等消費(fèi)