rocketmq實(shí)現(xiàn)延時(shí)隊(duì)列

說明:rocketmq實(shí)現(xiàn)的延時(shí)隊(duì)列只支持特定的延時(shí)時(shí)間段,1s,5s,10s,...2h,不能支持任意時(shí)間段的延時(shí)

具體實(shí)現(xiàn):rocketmq發(fā)送延時(shí)消息時(shí)先把消息按照延遲時(shí)間段發(fā)送到指定的隊(duì)列中(rocketmq把每種延遲時(shí)間段的消息都存放到同一個(gè)隊(duì)列中)然后通過一個(gè)定時(shí)器進(jìn)行輪訓(xùn)這些隊(duì)列,查看消息是否到期,如果到期就把這個(gè)消息發(fā)送到指定topic的隊(duì)列中,這樣的好處是同一隊(duì)列中的消息延時(shí)時(shí)間是一致的,還有一個(gè)好處是這個(gè)隊(duì)列中的消息時(shí)按照消息到期時(shí)間進(jìn)行遞增排序的,說的簡單直白就是隊(duì)列中消息越靠前的到期時(shí)間越早

流程圖


源碼分析:

如果想要深入了解的可以看一下ScheduleMessageService這個(gè)類


delayLevelTable定義了延遲級(jí)別和延遲時(shí)間的對(duì)應(yīng)關(guān)系,offsetTable存放延延遲級(jí)別對(duì)應(yīng)的隊(duì)列消費(fèi)的offset


使用timer定時(shí)器啟動(dòng)了一個(gè)定時(shí)任務(wù),把每個(gè)掃描隊(duì)列封裝成一個(gè)任務(wù),然后加入到timer中


i

每個(gè)掃描任務(wù)主要是把隊(duì)列中所有到期的消息都拿出來,并發(fā)送到指定的topic下,并把延遲隊(duì)列中的消息刪除

總結(jié)

優(yōu)點(diǎn):設(shè)計(jì)簡單,把所有相同延遲時(shí)間的消息都先放到一個(gè)隊(duì)列中,定時(shí)掃描,可以保證消息消費(fèi)的有序性

缺點(diǎn):定時(shí)器采用了timer,timer是單線程運(yùn)行,如果延遲消息數(shù)量很大的情況下,可能單線程處理不過來,造成消息到期后也沒有發(fā)送出去的情況

改進(jìn)點(diǎn):可以在每個(gè)延遲隊(duì)列上各采用一個(gè)timer,或者使用timer進(jìn)行掃描,加一個(gè)線程池對(duì)消息進(jìn)行處理,這樣可以提供效率

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容