rocketMq - 并發(fā)消費(fèi)過程

系列

rocketMq概念介紹

rocketMq-namesrv介紹

rocketMq-Topic創(chuàng)建過程

rocketMq-producer介紹

rocketMq-consumer介紹

rocketMq - rebalance介紹

rocketMq - 并發(fā)消費(fèi)過程

rocketMq - 串行消費(fèi)過程

rocketMq-broker介紹

rocketMq-broker消息存儲(chǔ)介紹

rocketMq - commitLog

rocketMq - index介紹

rocketMq-延遲消息介紹

rocketMq-事務(wù)消息介紹

rocketMq消息查詢

rocketMq和kafka的架構(gòu)區(qū)別

rocketMq - master/slave同步


rocketMq消費(fèi)過程包括兩種,分別是并發(fā)消費(fèi)和有序消費(fèi),每個(gè)消費(fèi)方式都可以單獨(dú)拿出來進(jìn)行分享,這篇文章單獨(dú)用來分析并發(fā)消費(fèi)問題。

并發(fā)消費(fèi)需要理解的幾個(gè)核心點(diǎn):并發(fā)消費(fèi)的消息拉取,并發(fā)消費(fèi)的消息重試,并發(fā)消息的ack機(jī)制,消費(fèi)進(jìn)度的持久化,這篇分享會(huì)就這幾個(gè)問題分解展開。

其他邏輯

? ? 1、consumer會(huì)定期向broker同步ack消息偏移量,也就是已經(jīng)消費(fèi)的位置。

? ? 2、極端情況下consumer會(huì)因?yàn)橐粋€(gè)消息一直失敗導(dǎo)致ack消息偏移量無法前進(jìn),但是因?yàn)闀?huì)有定時(shí)任務(wù)去清楚過期消息,所以ack進(jìn)度正常便宜。

并發(fā)消費(fèi)整體流程


并發(fā)消費(fèi)過程

說明:

? ? 1、Rebalance負(fù)責(zé)生成pullRequest放置到pullRequestQueue當(dāng)中。

? ? 2、PullMessageService負(fù)責(zé)消費(fèi)pullRequest來完成數(shù)據(jù)的拉取。

? ? 3、數(shù)據(jù)拉取后生成ConsumeRequest對象投遞到consumeExecutor的線程池當(dāng)中

? ? 4、ConsumeRequest是一個(gè)線程實(shí)例,負(fù)責(zé)消費(fèi)拉取的消息。

? ? 5、消費(fèi)消息成功就從ConsumeRequest的ProcessQueue中刪除,消費(fèi)失敗就投遞到broker的重試隊(duì)列中,重試次數(shù)和延遲粒度在broker端處理。

? ? 6、consumeRequest內(nèi)部維持的processQueue作為一個(gè)TreeMap對象可以維持消息的有序性,用于判斷消費(fèi)進(jìn)度。

? ? 7、pullRequest在消費(fèi)完以后還是再次投遞到pullRequestQueue當(dāng)中。


pullRequest執(zhí)行過程

consumer消費(fèi)入口

說明:參見PullMessageService類

? ? 1、單線程循環(huán)消費(fèi)pullRequest。


消費(fèi)流速控制

說明:參見PullMessageService類

? ? 1、消費(fèi)過程中進(jìn)行一些狀態(tài)判斷以及流速控制


有序消費(fèi)和無須消費(fèi)處理邏輯

說明:參見DefaultMQPushConsumerImpl類

? ? 1、區(qū)分有序消費(fèi)和無須消費(fèi)

? ? 2、無序消費(fèi)會(huì)判斷消費(fèi)偏移量是否差別過大


拉取消息的回調(diào)函數(shù)

說明:參見DefaultMQPushConsumerImpl類

? ? 1、處理拉取消息的后續(xù)操作

? ? 2、處理完以后再次投遞pullRequest請求


消息拉取執(zhí)行部分

說明:參見PullAPIWrapper類


真正執(zhí)行拉取的地方

說明:參見PullAPIWrapper類


處理拉取的消息結(jié)果

說明:參見ConsumeMessageConcurrentlyService類。

? ? 1、拉取消息成功后設(shè)置下一次拉取的偏移量。

? ? 2、更新拉取的消息到processQueue當(dāng)中。

? ? 3、再次投遞pullRequest發(fā)起下一次拉取。


處理拉取消息的分配處理

說明:參見ConsumeMessageConcurrentlyService類

? ? 1、分一次能夠處理完成和分多次能夠處理完成。


consumer消費(fèi)對象的核心

說明:

? ? 1、processQueue是待處理消息保存位置,里面核心數(shù)據(jù)結(jié)構(gòu)之一為TreeMap

? ? 2、messageQueue就是這個(gè)ConsumeRequest負(fù)責(zé)處理的messageQueue


回調(diào)函數(shù)消費(fèi)并進(jìn)行結(jié)果處理

說明:參見ConsumeMessageConcurrentlyService類

? ? 1、consumer消費(fèi)拉取消息的邏輯及后續(xù)處理


持久化消費(fèi)位移

說明:參見ConsumeMessageConcurrentlyService類

? ? 1、消費(fèi)成功就刪除所有拉取的消息


broker端存儲(chǔ)重試消息

說明:參見SendMessageProcessor類

? ? 1、處理邏輯在consumerSendMsgBack方法中

? ? 2、里面涉及到延遲粒度和重試次數(shù)的設(shè)置

? ? 3、消息是被投遞到延遲隊(duì)列當(dāng)中的


定期持久化消費(fèi)位移

說明:參見MQClientInstance類

? ? 1、在persistAllConsumerOffset定期持久化消費(fèi)偏移量

? ? 2、消費(fèi)偏移量由ConsumerRequest請求在處理的過程中變更的


重新發(fā)送拉取請求

說明:參見DefaultMQPushConsumerImpl類

? ? 1、處理沒有從broker拉取消息的過程

? ? 2、再次投遞pullRequest請求

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,888評(píng)論 13 425
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,525評(píng)論 19 139
  • 消息隊(duì)列設(shè)計(jì)精要 消息隊(duì)列已經(jīng)逐漸成為企業(yè)IT系統(tǒng)內(nèi)部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終...
    meng_philip123閱讀 1,580評(píng)論 1 25
  • 東進(jìn)中南三千天,寒暑過往序連綿。 春來春去春又回,夏生夏長夏亦殘。 花開著露心有淚,月落云山逐影寒。 晨飛潮起風(fēng)激...
    陽春閱讀 381評(píng)論 3 3
  • 今天看了下幣價(jià),阿朵的第一條子鏈火鏈(ignis)大跌,已經(jīng)跌到1元以下了。 小民是看著他從17元一路狂瀉下來的,...
    王小民的吐槽閱讀 451評(píng)論 0 0

友情鏈接更多精彩內(nèi)容