https://blog.csdn.net/qq_36236890/article/details/81174504
如何保證消息消費時的冪等性
其實消息重復(fù)消費的主要原因在于回饋機制(RabbitMQ是ack,Kafka是offset),在某些場景中我們采用的回饋機制不同,原因也不同,例如消費者消費完消息后回復(fù)ack, 但是剛消費完還沒來得及提交系統(tǒng)就重啟了,這時候上來就pull消息的時候由于沒有提交ack或者offset,消費的還是上條消息。
那么如何怎么來保證消息消費的冪等性呢?實際上我們只要保證多條相同的數(shù)據(jù)過來的時候只處理一條或者說多條處理和處理一條造成的結(jié)果相同即可,但是具體怎么做要根據(jù)業(yè)務(wù)需求來定,例如入庫消息,先查一下消息是否已經(jīng)入庫啊或者說搞個唯一約束啊什么的,還有一些是天生保證冪等性就根本不用去管,例如redis就是天然冪等性。
還有一個問題,消費者消費消息的時候在某些場景下要放過消費不了的消息,遇到消費不了的消息通過日志記錄一下或者搞個什么措施以后再來處理,但是一定要放過消息,因為在某些場景下例如spring-rabbitmq的默認(rèn)回饋策略是出現(xiàn)異常就沒有提交ack,導(dǎo)致了一直在重發(fā)那條消費異常的消息,而且一直還消費不了,這就尷尬了,后果你會懂的。
六、如何保證消息的可靠性傳輸?
由于筆者只使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ了解的不深,所以分析一下RabbitMQ和Kafka的消息可靠性傳輸?shù)膯栴}。、
(一)RabbitMQ
(1)生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到RabbitMQ的時候,可能數(shù)據(jù)就在半路給搞丟了,因為網(wǎng)絡(luò)啥的問題,都有可能。此時可以選擇用RabbitMQ提供的事務(wù)功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟RabbitMQ事務(wù)(channel.txSelect),然后發(fā)送消息,如果消息沒有成功被RabbitMQ接收到,那么生產(chǎn)者會收到異常報錯,此時就可以回滾事務(wù)(channel.txRollback),然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)(channel.txCommit)。但是問題是,RabbitMQ事務(wù)機制一搞,基本上吞吐量會下來,因為太耗性能。
所以一般來說,如果你要確保說寫RabbitMQ的消息別丟,可以開啟confirm模式,在生產(chǎn)者那里設(shè)置開啟confirm模式之后,你每次寫的消息都會分配一個唯一的id,然后如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。如果RabbitMQ沒能處理這個消息,會回調(diào)你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結(jié)合這個機制自己在內(nèi)存里維護每個消息id的狀態(tài),如果超過一定時間還沒接收到這個消息的回調(diào),那么你可以重發(fā)。
事務(wù)機制和cnofirm機制最大的不同在于,事務(wù)機制是同步的,你提交一個事務(wù)之后會阻塞在那兒,但是confirm機制是異步的,你發(fā)送個消息之后就可以發(fā)送下一個消息,然后那個消息RabbitMQ接收了之后會異步回調(diào)你一個接口通知你這個消息接收到了。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用confirm機制的。
(2)RabbitMQ弄丟了數(shù)據(jù)
就是RabbitMQ自己弄丟了數(shù)據(jù),這個你必須開啟RabbitMQ的持久化,就是消息寫入之后會持久化到磁盤,哪怕是RabbitMQ自己掛了,恢復(fù)之后會自動讀取之前存儲的數(shù)據(jù),一般數(shù)據(jù)不會丟。除非極其罕見的是,RabbitMQ還沒持久化,自己就掛了,可能導(dǎo)致少量數(shù)據(jù)會丟失的,但是這個概率較小。
設(shè)置持久化有兩個步驟,第一個是創(chuàng)建queue的時候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數(shù)據(jù),但是不會持久化queue里的數(shù)據(jù);第二個是發(fā)送消息的時候?qū)⑾⒌膁eliveryMode設(shè)置為2,就是將消息設(shè)置為持久化的,此時RabbitMQ就會將消息持久化到磁盤上去。必須要同時設(shè)置這兩個持久化才行,RabbitMQ哪怕是掛了,再次重啟,也會從磁盤上重啟恢復(fù)queue,恢復(fù)這個queue里的數(shù)據(jù)。
而且持久化可以跟生產(chǎn)者那邊的confirm機制配合起來,只有消息被持久化到磁盤之后,才會通知生產(chǎn)者ack了,所以哪怕是在持久化到磁盤之前,RabbitMQ掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack,你也是可以自己重發(fā)的。
哪怕是你給RabbitMQ開啟了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,但是還沒來得及持久化到磁盤上,結(jié)果不巧,此時RabbitMQ掛了,就會導(dǎo)致內(nèi)存里的一點點數(shù)據(jù)會丟失。
(3)消費端弄丟了數(shù)據(jù)
RabbitMQ如果丟失了數(shù)據(jù),主要是因為你消費的時候,剛消費到,還沒處理,結(jié)果進(jìn)程掛了,比如重啟了,那么就尷尬了,RabbitMQ認(rèn)為你都消費了,這數(shù)據(jù)就丟了。
這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關(guān)閉RabbitMQ自動ack,可以通過一個api來調(diào)用就行,然后每次你自己代碼里確保處理完的時候,再程序里ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認(rèn)為你還沒處理完,這個時候RabbitMQ會把這個消費分配給別的consumer去處理,消息是不會丟的。
(二)Kafka
(1)消費端弄丟了數(shù)據(jù)
唯一可能導(dǎo)致消費者弄丟數(shù)據(jù)的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經(jīng)消費好了這個消息,其實你剛準(zhǔn)備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
大家都知道kafka會自動提交offset,那么只要關(guān)閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數(shù)據(jù)不會丟。但是此時確實還是會重復(fù)消費,比如你剛處理完,還沒提交offset,結(jié)果自己掛了,此時肯定會重復(fù)消費一次,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個問題,就是說我們的kafka消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的queue里先緩沖一下,結(jié)果有的時候,你剛把消息寫入內(nèi)存queue,然后消費者會自動提交offset。
然后此時我們重啟了系統(tǒng),就會導(dǎo)致內(nèi)存queue里還沒來得及處理的數(shù)據(jù)就丟失了
(2)kafka弄丟了數(shù)據(jù)
這塊比較常見的一個場景,就是kafka某個broker宕機,然后重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些數(shù)據(jù)沒有同步,結(jié)果此時leader掛了,然后選舉某個follower成leader之后,他不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊。
生產(chǎn)環(huán)境也遇到過,我們也是,之前kafka的leader機器宕機了,將follower切換為leader之后,就會發(fā)現(xiàn)說這個數(shù)據(jù)就丟了。
所以此時一般是要求起碼設(shè)置如下4個參數(shù):
給這個topic設(shè)置replication.factor參數(shù):這個值必須大于1,要求每個partition必須有至少2個副本。
在kafka服務(wù)端設(shè)置min.insync.replicas參數(shù):這個值必須大于1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯(lián)系,沒掉隊,這樣才能確保leader掛了還有一個follower吧。
在producer端設(shè)置acks=all:這個是要求每條數(shù)據(jù),必須是寫入所有replica之后,才能認(rèn)為是寫成功了。
在producer端設(shè)置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
(3)生產(chǎn)者會不會弄丟數(shù)據(jù)
如果按照上述的思路設(shè)置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認(rèn)為本次寫成功了。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試,重試無限次。
六、如何保證消息的順序性
因為在某些情況下我們?nèi)舆M(jìn)MQ中的消息是要嚴(yán)格保證順序的,尤其涉及到訂單什么的業(yè)務(wù)需求,消費的時候也是要嚴(yán)格保證順序,不然會出大問題的。
先看看順序會錯亂的倆場景
rabbitmq:一個queue,多個consumer,這不明顯亂了
kafka:一個topic,一個partition,一個consumer,內(nèi)部多線程,這不也明顯亂了
如何來保證消息的順序性呢?
rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue但是對應(yīng)一個consumer,然后這個consumer內(nèi)部用內(nèi)存隊列做排隊,然后分發(fā)給底層不同的worker來處理。
kafka:一個topic,一個partition,一個consumer,內(nèi)部單線程消費,寫N個內(nèi)存queue,然后N個線程分別消費一個內(nèi)存queue即可。
七、如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以后該怎么處理?有幾百萬消息持續(xù)積壓幾小時怎么解決?
(一)、大量消息在mq里積壓了幾個小時了還沒解決
幾千萬條數(shù)據(jù)在MQ里積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多
這個是我們真實遇到過的一個場景,確實是線上故障了,這個時候要不然就是修復(fù)consumer的問題,讓他恢復(fù)消費速度,然后傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。
一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條,1000多萬條,所以如果你積壓了幾百萬到上千萬的數(shù)據(jù),即使消費者恢復(fù)了,也需要大概1小時的時間才能恢復(fù)過來。
一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:
先修復(fù)consumer的問題,確保其恢復(fù)消費速度,然后將現(xiàn)有cnosumer都停掉。
新建一個topic,partition是原來的10倍,臨時建立好原先10倍或者20倍的queue數(shù)量。
然后寫一個臨時的分發(fā)數(shù)據(jù)的consumer程序,這個程序部署上去消費積壓的數(shù)據(jù),消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數(shù)量的queue。
接著臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數(shù)據(jù)。
這種做法相當(dāng)于是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數(shù)據(jù)。
等快速消費完積壓數(shù)據(jù)之后,得恢復(fù)原先部署架構(gòu),重新用原先的consumer機器來消費消息。
(二)、消息隊列過期失效問題
假設(shè)你用的是rabbitmq,rabbitmq是可以設(shè)置過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個數(shù)據(jù)就沒了。那這就是第二個坑了。這就不是說數(shù)據(jù)會大量積壓在mq里,而是大量的數(shù)據(jù)會直接搞丟。
這個情況下,就不是說要增加consumer消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以采取一個方案,就是批量重導(dǎo),這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當(dāng)時就直接丟棄數(shù)據(jù)了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。
這個時候我們就開始寫程序,將丟失的那批數(shù)據(jù),寫個臨時程序,一點一點的查出來,然后重新灌入mq里面去,把白天丟的數(shù)據(jù)給他補回來。也只能是這樣了。
假設(shè)1萬個訂單積壓在mq里面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發(fā)到mq里去再補一次。
(三)、消息隊列滿了怎么搞?
如果走的方式是消息積壓在mq里,那么如果你很長時間都沒處理掉,此時導(dǎo)致mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執(zhí)行的太慢了,你臨時寫程序,接入數(shù)據(jù)來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然后走第二個方案,到了晚上再補數(shù)據(jù)吧。
來源:CSDN
原文:https://blog.csdn.net/qq_36236890/article/details/81174504
版權(quán)聲明:本文為博主原創(chuàng)文章,轉(zhuǎn)載請附上博文鏈接!