探索RocketMQ的重復(fù)消費和亂序問題

前言

在之前的MQ專題中,我們已經(jīng)解決了消息中間件的一大難題,消息丟失問題。

但MQ在實際應(yīng)用中不是說保證消息不丟失就萬無一失了,它還有兩個令人頭疼的問題:重復(fù)消費和亂序。

今天我們就來聊一聊這兩個常見的問題,看看RocketMQ是如何解決這兩個問題的。

為什么會重復(fù)消費

首先我們來聊一聊重復(fù)消費的問題,要解決一個問題最開始的一步當(dāng)然是去查找問題發(fā)生的原因了。

那出現(xiàn)重復(fù)消費的原因到底是什么呢?

我們先來思考一下生產(chǎn)者發(fā)送消息這一過程中是不是有可能重復(fù)發(fā)送消息到MQ呢?

答案是肯定的,比如生產(chǎn)者發(fā)送消息的時候使用了重試機(jī)制,發(fā)送消息后由于網(wǎng)絡(luò)原因沒有收到MQ的響應(yīng)信息,報了個超時異常,然后又去重新發(fā)送了一次消息。

但其實MQ已經(jīng)接到了消息,并返回了響應(yīng),只是因為網(wǎng)絡(luò)原因超時了。

這種情況下,一條消息就會被發(fā)送兩次。

當(dāng)然,這只是列舉了一種情況,實際有很多情況會造成消息的重新發(fā)送。

那么假如生產(chǎn)者沒有重復(fù)發(fā)送消息,消費者就能保證不重復(fù)消費了嗎?

當(dāng)然不能保證,我們知道,在消費者處理了一條消息后會返回一個offset給MQ,證明這條消息被處理過了。

但是,假如這條消息已經(jīng)處理過了,在返回offset給MQ的時候服務(wù)宕機(jī)了,MQ就沒有接收到這條offset,那么服務(wù)重啟后會再次消費這條消息。


如何解決重復(fù)消費

解決重復(fù)消費的關(guān)鍵就是引入冪等性機(jī)制,什么是冪等性機(jī)制呢?我們可以把它理解成,假如一個接口被重復(fù)調(diào)用,依然可以保證數(shù)據(jù)的準(zhǔn)確性。

對于生產(chǎn)者重復(fù)發(fā)送消息到MQ這一過程,其實我們沒有必要去保證冪等性,只要在消費者處理消息時保證冪等性就可以了。

這塊其實就比較簡單了,只要處理消息之前先根據(jù)業(yè)務(wù)判斷一下本次操作是否已經(jīng)執(zhí)行過了,如果已經(jīng)執(zhí)行過了,那就不再執(zhí)行了,這樣就可以保證消費者的冪等性。

舉個例子,比如每條消息都會有一條唯一的消息ID,消費者接收到消息會存儲消息日志,如果日志中存在相同ID的消息,就證明這條消息已經(jīng)被處理過了。

消息重試、延時消息、死信隊列

解決完重復(fù)消費問題,我們來思考一種極端情況,比如某一時刻,消費者操作的數(shù)據(jù)庫宕機(jī)了,這個時候消費者會發(fā)生異常,當(dāng)然不能返回給MQ一個CONSUME_SUCCESS了,我們可以返回RECONSUME_LATER,他的意思是我現(xiàn)在沒法處理這些消息,一會再來試試能不能處理。

簡單來說,RocketMQ會有一個針對當(dāng)前Consumer Group的重試隊列,如果你返回了RECONSUME_LATER,MQ會把你的這批消費放到當(dāng)前消費組的重試隊列中,然后過一段時間重試隊列中的消息會再次發(fā)送給消費者,默認(rèn)可以重試16次,每次重試的間隔是不同的,這個時間間隔是可以配置的,默認(rèn)配置如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

細(xì)心的小伙伴會發(fā)現(xiàn),這個配置一共有18個時間,為什么最多重試16次,配置中卻有18個時間呢,這里就要說到延時消息了。

上邊的配置其實不是針對重試隊列的,而是針對延時消息的,18個時間分別代表延遲level1-level18,延時消息大概流程如下:

1 所有的延遲消息到達(dá)broker后,會存放到SCHEDULE_TOPIC_XXX的topic下(這個topic比較特殊,對客戶端是不可見的,包括使用rocketmq-console,也查不到這個topic)

2 SCHEDULE_TOPIC_XXX這個topic下存在18個隊列,每個隊列中存放的消息都是同一個延遲級別消息

3 broker端啟動了一個timer和timerTask的任務(wù),定時從此topic下拉取數(shù)據(jù),如果延遲時間到了,就會把此消息發(fā)送到指定的topic下,完成延遲消息的發(fā)送

剛才我們說如果你返回了RECONSUME_LATER,消息就會進(jìn)入重試隊列,其實不完全準(zhǔn)確。

當(dāng)MQ接收到RECONSUME_LATER后,首先會完成消息的轉(zhuǎn)換,把消息存到延時隊列中,然后再根據(jù)消息的延時時間保存到重試隊列中。

如果重試了16次之后依然無法處理,就會把這些消費放入死信隊列。死信隊列中的消息RocketMQ不會再做處理,這部分?jǐn)?shù)據(jù)要怎么處理就要看我們的業(yè)務(wù)場景了,我們可以做一個后臺線程去訂閱這個死信隊列,完成后續(xù)消息的處理。

消息亂序

接下來我們聊一聊消息亂序問題,為什么會出現(xiàn)這個問題呢,這個其實不難理解。

我們都學(xué)過,每個Topic可以有多個MessageQueue,寫入消息的時候?qū)嶋H上會平均分配給不同的MessageQueue。

然后假如我們有一個Consume Group,這個消費組中的每臺機(jī)器都會負(fù)責(zé)一部分MessageQueue,那么就會導(dǎo)致消息的順序亂序問題。

舉個例子,生產(chǎn)者發(fā)送了兩條順序消息,先是insert,后是update,分別分配到兩個MessageQueue中,消費者組中的兩臺機(jī)器分別處理兩個隊列的消息,這個時候是無法保證順序性的,有可能會先執(zhí)行update,后執(zhí)行insert,導(dǎo)致數(shù)據(jù)發(fā)生錯誤。

那么如何解決消息亂序問題呢?

其實道理也很簡單,把需要保持順序的消息都放入到同一個MessageQueue中,讓同一臺機(jī)器處理不就可以了嗎。

我們完全可以根據(jù)唯一ID與隊列的數(shù)量進(jìn)行hash運算,保證這些消息進(jìn)入到同一個隊列中,最簡單的算法就是取余運算了。

現(xiàn)在我們能保證這批消息進(jìn)入到同一個隊列中了,似乎這樣就能保證消息不會亂序了,但真的是這樣嗎?

上文我們說到如果消費者數(shù)據(jù)庫出現(xiàn)問題,使用重試隊列重試消息,那么對于需要保證順序的消息也可以使用這套方案嗎?

肯定是不能的,如果使用重試機(jī)制是無法保證順序性的。

RocketMQ提供了另一個狀態(tài),SUSPEND_CURRENT_QUEUE_A_MOMENT,意思是先等一會,再接著處理這批消息,而不是把這批消息放入重試隊列里去處理其他消息。

所以我們只要返回這個狀態(tài)就可以了。

總結(jié)

好了,到這里關(guān)于RocketMQ重復(fù)消費和亂序問題的產(chǎn)生原因和解決方案我們就介紹完了,同時也介紹了RocketMQ的重試機(jī)制、延時消息和死信隊列。

有些地方可能比較復(fù)雜,可能需要小伙伴們重復(fù)閱讀幾次才能理解,如果哪里有想不清楚的,或者有疑問的可以聯(lián)系王子共同探討。

往期文章推薦:

深入研究Broker是如何持久化的

Dledger是如何實現(xiàn)主從自動切換的

深入研究RocketMQ消費者是如何獲取消息的

RocketMQ的消息是怎么丟失的

RocketMQ消息丟失解決方案:事務(wù)消息

RocketMQ消息丟失解決方案:同步刷盤+手動提交

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容