1. 為什么產(chǎn)生消息堆積?
大多是因?yàn)?Consumer 出問(wèn)題了,沒(méi)有及時(shí)發(fā)現(xiàn),或者故障恢復(fù)需要較長(zhǎng)的時(shí)間,導(dǎo)致大量消息積壓在 MQ 中。
2. 消息堆積會(huì)有什么后果呢?
2.1 消息被丟棄
例如 RabbitMQ 有一個(gè)消息過(guò)期時(shí)間 TTL,過(guò)期的消息會(huì)被扔掉,這樣消息就徹底沒(méi)有了。
2.2 磁盤(pán)滿了
如果堆積量太大,可能導(dǎo)致磁盤(pán)空間不足,那么新消息就進(jìn)不來(lái)了。
2.3 海量消息待處理
如果消息沒(méi)過(guò)期,并且磁盤(pán)空間也夠用,那么就是產(chǎn)生海量消息等待被消費(fèi),Consumer 的噩夢(mèng)。
3. 如何應(yīng)對(duì)呢?
3.1 消息被丟棄的情況
首先,要實(shí)現(xiàn)防止消息過(guò)期問(wèn)題,不應(yīng)該設(shè)置過(guò)期時(shí)間。
如果就是設(shè)置了過(guò)期時(shí)間,導(dǎo)致了消息丟失,怎么補(bǔ)救呢?
那就只能在夜深人靜,趁著訪問(wèn)量最低的時(shí)候,寫(xiě)一個(gè)臨時(shí)程序來(lái)補(bǔ)消息了。
例如有訂單消息丟了,那就需要找出哪些訂單消息丟了,然后重新發(fā)到隊(duì)列。
3.2 磁盤(pán)不足的情況
系統(tǒng)通常都是有監(jiān)控的,達(dá)到空間閾值時(shí)就會(huì)發(fā)警報(bào),這時(shí)就要馬上處理了。

例如,在其他機(jī)器上創(chuàng)建臨時(shí)的消息隊(duì)列,再寫(xiě)一個(gè)臨時(shí)的 Consumer,作為消息的中轉(zhuǎn),把消息積壓隊(duì)列中的消息取出來(lái),放到臨時(shí)隊(duì)列里面去。
快速疏散積壓的消息,讓磁盤(pán)空間恢復(fù)正常水平。
3.3 快速處理海量積壓消息
Consumer 恢復(fù)正常之后,面對(duì)堆積如山的消息,怎么處理呢?
如何按照之前正常情況處理的話,猴年馬月才能消費(fèi)完,此過(guò)程中還有新消息在不斷進(jìn)來(lái)。
例如,積壓了 100 萬(wàn)條,有 3 個(gè) Consumer,每一個(gè)每秒能處理 200 條,3 個(gè) Consumer 每秒一共能處理 600 條。
大概需要一個(gè)多小時(shí)才能處理完。
這一個(gè)多小時(shí)又會(huì)積壓多少新的消息呢?
所以正常處理肯定不行,需要提速。
例如 Kafka,這個(gè)消息積壓的 Topic 有 3 個(gè) Partition,那最多就能用 3 個(gè) Consumer,所以增加 Consumer 沒(méi)有用。
還是可以使用臨時(shí)隊(duì)列的方式。

新建一個(gè) Topic,設(shè)置為 20 個(gè) Partition
Consumer 不再處理業(yè)務(wù)邏輯了,只負(fù)責(zé)搬運(yùn),把消息放到臨時(shí) Topic 中
這 20 個(gè) Partition 可以有 20個(gè) Consumer 了,它們來(lái)處理原來(lái)的業(yè)務(wù)邏輯。
這 20 個(gè) Consumer 每秒一共能處理 4000 條了,這樣幾分鐘就可以處理完積壓的 100 萬(wàn)條。
這幾分鐘新來(lái)的消息也不會(huì)太多,所以很快就可以恢復(fù)正常水平,之后,再把整體結(jié)構(gòu)恢復(fù)為原來(lái)的形式。
小結(jié)一下,消息積壓還是比較麻煩的,最好是提前防范,做好硬件和消息系統(tǒng)的健康監(jiān)控。如果出現(xiàn)消息丟失,就要人工查找丟失的消息,然后補(bǔ)上。在消費(fèi)不過(guò)來(lái)的時(shí)候,可以考慮使用臨時(shí)隊(duì)列作為中轉(zhuǎn),提升處理能力。
推薦閱讀