rocketMq-consumer介紹

系列

rocketMq概念介紹

rocketMq-namesrv介紹

rocketMq-Topic創(chuàng)建過程

rocketMq-producer介紹

rocketMq-consumer介紹

rocketMq - rebalance介紹

rocketMq - 并發(fā)消費過程

rocketMq - 串行消費過程

rocketMq-broker介紹

rocketMq-broker消息存儲介紹

rocketMq - commitLog

rocketMq - index介紹

rocketMq-延遲消息介紹

rocketMq-事務(wù)消息介紹

rocketMq消息查詢

rocketMq和kafka的架構(gòu)區(qū)別

rocketMq - master/slave同步


說到rocketMq的consumer,該篇文章特指pushConsumer,pullConsumer在后續(xù)文章中在分享。

提到consumer,需要搞清楚幾個核心問題,分別是consumer的初始化過程做了哪些事情,消息是如何消費,consumer如何動態(tài)平衡的,整個邏輯還是比較繞的,其中這章節(jié)主要會講清楚兩個事情,1、初始化過程中client做了哪些事情;2、consumer如何動態(tài)平衡拉取任務(wù),具體的任務(wù)消費會由額外的一章進(jìn)行講解。


consumer的初始化過程

consumer初始化過程

說明:整體執(zhí)行過程如下,著重介紹subscribe和start兩個過程

? ? 1、創(chuàng)建consumer并設(shè)置消費分組

? ? 2、設(shè)置消費位移

? ? 3、設(shè)置訂閱topic

? ? 4、設(shè)置消費執(zhí)行的回調(diào)函數(shù)

? ? 5、啟動consumer


consumer的初始化流程圖


初始化流程圖


consumer內(nèi)部初始化過程

說明:整個初始化比較復(fù)雜,為了大家能夠理解,先用簡單的語句概述一遍

? ? 1、構(gòu)建consumer的訂閱信息,包括consumer本身的訂閱和消費分組的重試隊列。

? ? 2、創(chuàng)建Rebalance服務(wù),該服務(wù)每隔20s進(jìn)行消費端負(fù)責(zé)的messageQueue的消費。

? ? 3、啟動消費偏移量獲取服務(wù),獲取上一次消費位移。

? ? 4、啟動定時任務(wù),其中核心任務(wù)之一是定時去namesrv拉取broker信息。

? ? 5、啟動pullMessageService,負(fù)責(zé)從broker拉取待消費消息

? ? 6、啟動rebalanceService,負(fù)責(zé)定期調(diào)整consumer端負(fù)載均衡包括第一次觸發(fā)拉取任務(wù)

? ? 7、其中rebalanceService和pullMessageService相互配合使用,前者負(fù)責(zé)將新加入messageQueue拉取任務(wù)加入到pullMessageservice當(dāng)中,將舊的messageQueue的拉取任務(wù)從pullMessageService中停止,兩者之間通過消息隊列的形式進(jìn)行通信。


構(gòu)建subscription過程

構(gòu)建subscribe信息

說明:參見DefaultMQPushConsumerImpl類

? ? 1、訂閱消息最后保存至RebalanceImpl當(dāng)中,因為這個是后面動態(tài)負(fù)載均衡的核心。


client端啟動過程

consumer啟動過程-注冊訂閱消息過程

說明:參見DefaultMQPushConsumerImpl類


consumer啟動過程-注冊回調(diào)函數(shù)并啟動一系列服務(wù)

說明:參見DefaultMQPushConsumerImpl類

? ? 1、啟動了獲取消費進(jìn)度的服務(wù)


consumer核心邏輯-啟動核心消費邏輯

說明:參見MQClientInstance類

? ? 1、啟動定時任務(wù),主要是從namsrv中拉取broker的信息

? ? 2、啟動client從broker拉取消息的服務(wù)

? ? 3、啟動Rebalance服務(wù),負(fù)責(zé)觸發(fā)消息拉取的任務(wù)

? ? 4、步驟3和步驟4之間的兩個服務(wù)通過消息隊列通信


定時拉取broker信息

說明:參見MQClientInstance類

? ? 1、負(fù)責(zé)從namesrv拉取broker的信息


拉取任務(wù)的執(zhí)行過程

定時獲取拉取任務(wù)執(zhí)行消息拉取

說明:參見PullMessageService類

? ? 1、負(fù)責(zé)從pullRequestQueue中獲取拉取任務(wù)并執(zhí)行,該任務(wù)由Rebalance服務(wù)投遞


拉取任務(wù)的生成過程

定期動態(tài)消費負(fù)載均衡

說明:參見RebalanceService類

? ? 1、consumer端負(fù)載均衡的入口


針對每個consumer動態(tài)調(diào)整負(fù)載均衡

說明:參見MQClientInstance類

? ? 1、每個consumer客戶端只會有一個對象,所以這里for循環(huán)只有一次。


針對每個topic進(jìn)行負(fù)載均衡

說明:參見MQClientInstance類

1、針對每個訂閱信息都進(jìn)行動態(tài)負(fù)責(zé)均衡,包括consumer本身的訂閱分組和consumerGroup的重試分組。


針對topic下的messageQueue和consumer進(jìn)行動態(tài)負(fù)載均衡

說明:參見RebalanceImpl類

? ? 1、動態(tài)負(fù)載均衡就是一個topic下所有的messageQueue和消費分組里面的消費者按照一定的動態(tài)調(diào)整策略進(jìn)行分配,同一個消費分組里面的消費者每人負(fù)責(zé)一部分的messageQueue。


動態(tài)調(diào)整拉取任務(wù)

說明:參見RebalanceImpl類

? ? 1、consumer新負(fù)責(zé)的messageQueue加入到拉取任務(wù)當(dāng)中來

? ? ? ? 2、consumer不負(fù)責(zé)的messageQueue從拉取任務(wù)中剔除。



新增消息拉取任務(wù)

說明:參見PullMessageService類

投遞待拉取消息任務(wù)

說明:參見PullMessageService類


訂閱重試隊列邏輯

注冊%retry%@consumerGroup消費分組

說明:

? ? 核心代碼邏輯,這個表明了consumer訂閱了重試隊列并對重試隊列進(jìn)行消費。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容