系列
說到rocketMq的consumer,該篇文章特指pushConsumer,pullConsumer在后續(xù)文章中在分享。
提到consumer,需要搞清楚幾個核心問題,分別是consumer的初始化過程做了哪些事情,消息是如何消費,consumer如何動態(tài)平衡的,整個邏輯還是比較繞的,其中這章節(jié)主要會講清楚兩個事情,1、初始化過程中client做了哪些事情;2、consumer如何動態(tài)平衡拉取任務(wù),具體的任務(wù)消費會由額外的一章進(jìn)行講解。
consumer的初始化過程

說明:整體執(zhí)行過程如下,著重介紹subscribe和start兩個過程
? ? 1、創(chuàng)建consumer并設(shè)置消費分組
? ? 2、設(shè)置消費位移
? ? 3、設(shè)置訂閱topic
? ? 4、設(shè)置消費執(zhí)行的回調(diào)函數(shù)
? ? 5、啟動consumer
consumer的初始化流程圖

consumer內(nèi)部初始化過程
說明:整個初始化比較復(fù)雜,為了大家能夠理解,先用簡單的語句概述一遍
? ? 1、構(gòu)建consumer的訂閱信息,包括consumer本身的訂閱和消費分組的重試隊列。
? ? 2、創(chuàng)建Rebalance服務(wù),該服務(wù)每隔20s進(jìn)行消費端負(fù)責(zé)的messageQueue的消費。
? ? 3、啟動消費偏移量獲取服務(wù),獲取上一次消費位移。
? ? 4、啟動定時任務(wù),其中核心任務(wù)之一是定時去namesrv拉取broker信息。
? ? 5、啟動pullMessageService,負(fù)責(zé)從broker拉取待消費消息
? ? 6、啟動rebalanceService,負(fù)責(zé)定期調(diào)整consumer端負(fù)載均衡包括第一次觸發(fā)拉取任務(wù)
? ? 7、其中rebalanceService和pullMessageService相互配合使用,前者負(fù)責(zé)將新加入messageQueue拉取任務(wù)加入到pullMessageservice當(dāng)中,將舊的messageQueue的拉取任務(wù)從pullMessageService中停止,兩者之間通過消息隊列的形式進(jìn)行通信。
構(gòu)建subscription過程

說明:參見DefaultMQPushConsumerImpl類
? ? 1、訂閱消息最后保存至RebalanceImpl當(dāng)中,因為這個是后面動態(tài)負(fù)載均衡的核心。
client端啟動過程

說明:參見DefaultMQPushConsumerImpl類

說明:參見DefaultMQPushConsumerImpl類
? ? 1、啟動了獲取消費進(jìn)度的服務(wù)

說明:參見MQClientInstance類
? ? 1、啟動定時任務(wù),主要是從namsrv中拉取broker的信息
? ? 2、啟動client從broker拉取消息的服務(wù)
? ? 3、啟動Rebalance服務(wù),負(fù)責(zé)觸發(fā)消息拉取的任務(wù)
? ? 4、步驟3和步驟4之間的兩個服務(wù)通過消息隊列通信

說明:參見MQClientInstance類
? ? 1、負(fù)責(zé)從namesrv拉取broker的信息
拉取任務(wù)的執(zhí)行過程

說明:參見PullMessageService類
? ? 1、負(fù)責(zé)從pullRequestQueue中獲取拉取任務(wù)并執(zhí)行,該任務(wù)由Rebalance服務(wù)投遞
拉取任務(wù)的生成過程

說明:參見RebalanceService類
? ? 1、consumer端負(fù)載均衡的入口

說明:參見MQClientInstance類
? ? 1、每個consumer客戶端只會有一個對象,所以這里for循環(huán)只有一次。

說明:參見MQClientInstance類
1、針對每個訂閱信息都進(jìn)行動態(tài)負(fù)責(zé)均衡,包括consumer本身的訂閱分組和consumerGroup的重試分組。

說明:參見RebalanceImpl類
? ? 1、動態(tài)負(fù)載均衡就是一個topic下所有的messageQueue和消費分組里面的消費者按照一定的動態(tài)調(diào)整策略進(jìn)行分配,同一個消費分組里面的消費者每人負(fù)責(zé)一部分的messageQueue。

說明:參見RebalanceImpl類
? ? 1、consumer新負(fù)責(zé)的messageQueue加入到拉取任務(wù)當(dāng)中來
? ? ? ? 2、consumer不負(fù)責(zé)的messageQueue從拉取任務(wù)中剔除。

說明:參見PullMessageService類

說明:參見PullMessageService類
訂閱重試隊列邏輯

說明:
? ? 核心代碼邏輯,這個表明了consumer訂閱了重試隊列并對重試隊列進(jìn)行消費。