rocketMq - 串行消費過程

系列

rocketMq概念介紹

rocketMq-namesrv介紹

rocketMq-Topic創(chuàng)建過程

rocketMq-producer介紹

rocketMq-consumer介紹

rocketMq - rebalance介紹

rocketMq - 并發(fā)消費過程

rocketMq - 串行消費過程

rocketMq-broker介紹

rocketMq-broker消息存儲介紹

rocketMq - commitLog

rocketMq - index介紹

rocketMq-延遲消息介紹

rocketMq-事務(wù)消息介紹

rocketMq消息查詢

rocketMq和kafka的架構(gòu)區(qū)別

rocketMq - master/slave同步


介紹

rocketMq消費過程包括兩種,分別是并發(fā)消費和有序消費,每個消費方式都可以單獨拿出來進(jìn)行分享,這篇文章單獨用來分析串行消費問題。

由于串行消費和并行消費的大體邏輯都是相同的,所以建議先看rocketMq - 并發(fā)消費過程文章,本章只會針對不同的地方進(jìn)行說明。

為什么有序消費能夠保證消息被順序消費?

為什么有序消費能夠保證消息被順序消費?

為什么有序消費能夠保證消息被順序消費?

答案是:

? ? 1、順序消費的順序是有序保存在ProcessQueue的TreeMap對象中,key為消息的偏移量,也就是一個messageQueue拉取的消息有序放置在ProcessQueue當(dāng)中

? ? 2、每次消費的時候都是按序從ProcessQueue中按順序拷貝待消費任務(wù)到臨時的TreeMap對象當(dāng)中

? ? 3、消費失敗后依舊會重新消費剛剛消費失敗的那部分任務(wù)

? ? 4、每次pullRequest執(zhí)行完成后都會觸發(fā)一次ConsumeRequest任務(wù),會在原來的TreeMap對象中加入新的待消費的消息


提交有序消費ConsumeRequest

說明:參見DefaultMQPushConsumerImpl類


消費中需要多次加鎖

說明:參見ConsumeMessageOrderlyService類

? ? 1、takeMessages操作會按照順序情況從processQueue的msgTreeMap中獲?。═reeMap是有序?qū)ο螅?/p>



說明:參見ConsumeMessageOrderlyService類

? ? 1、如果保證順序消費呢,獲取待消費數(shù)據(jù)的時候按照順序進(jìn)行獲取放到臨時TreeMap中并刪除全局TreeMap里面的message對象

? ? 2、消費成功后清除臨時TreeMap里面的消息

? ? 3、消費失敗后將消息對象的重試次數(shù)+1然后重新提交ConsumeRequest,如果已經(jīng)超出重試次數(shù)就丟棄。再次投遞是一個延遲多少時間后消費的任務(wù)

? ? 4、和無序消費的任務(wù)相比,沒有重新發(fā)送broker的重試隊列這一步



處理消費結(jié)果

說明:參見ConsumeMessageOrderlyService類

? ? 1、消費成功后會持久化成功消費的偏移量,本地保存消費偏移量

? ? 2、定時任務(wù)定時同步消費偏移量到broker進(jìn)行保存

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容