RocketMQ 5.1.0 消費(fèi)端 源碼分析

書接上文,當(dāng)消息存儲(chǔ)到文件后,并沒有看到他與消費(fèi)者有相關(guān)操作?那么消費(fèi)者改如何獲取呢?

image.png

在前文中講述了文件落盤,在log中還包含一個(gè)提交服務(wù)。CommitRealTimeService,用于將已經(jīng)落盤的數(shù)據(jù)進(jìn)行提交到分配索引中(pos)。也即當(dāng)我們落盤后需要經(jīng)過CommitRealTimeService才能繼續(xù)處理。


image.png

image.png

image.png

如果是主從。


image.png

提交如下:
image.png

至此提交完成,writeBuffer是否存在來判斷是否直接以WROTE_POSITION_UPDATER作為pos,否則以COMMITTED_POSITION_UPDATER為pos。此處大量篇幅講述pos主要在于后面派發(fā)時(shí)會(huì)使用到。
image.png

此服務(wù)非常重要,用于消息的派發(fā)。
image.png

image.png

image.png

image.png

image.png

image.png

image.png

存在三個(gè)派發(fā)器,此處以CommitLogDispatcherBuildConsumeQueue進(jìn)行講解,該派發(fā)器用于處理消息到消費(fèi)隊(duì)列的過程。


image.png

image.png

image.png

還是比較簡單的,就是從map中獲取。
image.png

當(dāng)然再啟動(dòng)是也會(huì)從文件中加載該map。
image.png

image.png

至此后續(xù)所有操作都將進(jìn)入消費(fèi)隊(duì)列consumeQueue對(duì)象。
image.png

image.png

image.png

至此將消息加入到消費(fèi)隊(duì)列中。
消息消費(fèi)在mq中存在兩種模式1、消費(fèi)端維護(hù)pos通過pull獲取消息 2.pop彈出最新消息兩種模式。

第一種 PullMessageProcessor

image.png

image.png

image.png

image.png

image.png

至此消息已經(jīng)獲取完成。還需要講解得是,當(dāng)獲取完消息后需要記錄消費(fèi)偏移,為Pop模式做記錄,pull模式由用戶維護(hù)偏移,而pop模式則由服務(wù)端做記錄。


image.png

image.png

image.png

至此更新到map中。目前看他是通過內(nèi)存進(jìn)行記錄的,實(shí)際它存在對(duì)應(yīng)的文件。
image.png

每十秒存儲(chǔ)一次。


image.png

寫入文件。
image.png

提供offset存儲(chǔ)文件位置。

第二種 PopMessageProcessor

image.png

image.png

image.png

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容