RocketMQ源碼解讀之Consumer

立志欲堅(jiān)不欲銳,成功在久不在速。

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?——張孝祥

大綱

圖示

Rebalance(針對(duì)集群消費(fèi)模式)

????(1)消費(fèi)Group下的所有消費(fèi)者

????(2)Topic的所有Queue隊(duì)列

????(3)Queue分配策略

1.觸發(fā)時(shí)機(jī)

????(1)消費(fèi)者啟動(dòng)

????(2)消費(fèi)者加入或者退出消費(fèi)組

????(3)定時(shí)觸發(fā)Rebalance(10s)

2.舉例

? ??假設(shè),一個(gè)topic中有4個(gè)隊(duì)列,有一個(gè)Producer往4個(gè)隊(duì)列中發(fā)數(shù)據(jù),在集群消費(fèi)中,在一個(gè)消費(fèi)者分組中如果只有一個(gè)消費(fèi)者。那么這個(gè)消費(fèi)者肯定會(huì)消費(fèi)4個(gè)隊(duì)列,不然就會(huì)漏數(shù)據(jù)。

????如果加入了一個(gè)Consumer2,這個(gè)時(shí)候就會(huì)觸發(fā)一個(gè)Rebalance

(Consumer增加了觸發(fā)),這2個(gè)消費(fèi)者平均消費(fèi)4個(gè)隊(duì)列。

圖示

????如果再加入了一個(gè)Consumer3,這個(gè)是否平均分不了,一般的處理,默認(rèn)情況下,Consumer1消費(fèi)兩個(gè),其他的消費(fèi)一個(gè)。

????如果再加入了一個(gè)Consumer4,剛好一對(duì)一,所以每個(gè) Consumer消費(fèi)一個(gè)隊(duì)列。

圖示

????如果再加入了一個(gè)Consumer5,消費(fèi)者數(shù)據(jù)大于隊(duì)列,那么Consumer5就消費(fèi)不了數(shù)據(jù),除非隊(duì)列增加了,或者是說Consumer減少了才行。

????所以當(dāng)你啟動(dòng)多個(gè)消費(fèi)者,如果消費(fèi)者數(shù)量大于queue的數(shù)量,也只能有queue數(shù)量的消費(fèi)者消費(fèi)(就跟在軟件公司內(nèi)部找女朋友一樣,狼多肉少)蛋糕都被吃完了,你沒得吃了。這個(gè)其實(shí)就是消費(fèi)并發(fā)度。消費(fèi)并發(fā)度決定因素是queue的數(shù)量。

圖示

3.源碼解讀

????這里講到的是基于推模式的消費(fèi),也就是我們常用的消費(fèi)模式。

源碼1

DefaultMQPushConsumerImpl.start()方法

源碼2
源碼3

還是要進(jìn)入MQClientInstance.start()方法

源碼4

,在MQClientInstance.start()方法,有一個(gè)線程RebalanceService就是鎖Rebalance。具體實(shí)現(xiàn)RebalanceService來做的,下面我們來看下。

RebalanceService

run()
doRebalance()
doRebalance()
doRebalance()
rebalanceByTopic()

????這里有一個(gè)針對(duì)MessageQueue的排序。

sort()

????為什么這么設(shè)計(jì)。如果同一個(gè)分組的多個(gè)客戶端,分布在不同的機(jī)器上(消費(fèi)者的機(jī)器上),每臺(tái)客戶端都單獨(dú)算,并且算出來的效果是一致的。

? ??總體消費(fèi)就是讓每一個(gè)Consumer有同樣的一個(gè)MessageQueue的視圖,因?yàn)槊總€(gè)消費(fèi)者的視圖是一致的,那么在每個(gè)客戶端算負(fù)載,算出來的結(jié)果當(dāng)然就是一致的。這樣就能保障之前的負(fù)載均衡的算出之前的效果。

圖示

????對(duì)于Consumer1和Consumer2,經(jīng)過統(tǒng)一的排序,在Consumer1客戶端也好,還是Consumer2的客戶端也好,算出來的結(jié)果是一致的。

????Consumer1消費(fèi) queue1和queue2。Consumer2消費(fèi)queue3和queue4。

????對(duì)比Kafka,在消費(fèi)的時(shí)候依賴Zookeeper,broker變動(dòng)還要走選舉之類,如果選不出或者比較卡,這個(gè)是否會(huì)導(dǎo)致負(fù)載不正常,負(fù)載不成功就不能正常的工作。

????而RocketMQ的這種方式簡(jiǎn)單,并且高可用。

? ??強(qiáng)一致性必定要犧牲高可用性,RocketMQ設(shè)計(jì)上更多偏向高可用。

消費(fèi)者源碼解讀

????我們知道,在消費(fèi)的時(shí)候有兩種模式,一個(gè)是并發(fā)消費(fèi),另外一種是順序消費(fèi)。

代碼示例1
代碼示例2

? ??因?yàn)橄M(fèi)者的代碼非常復(fù)雜,并且我認(rèn)為沒有必要全部讀懂。所以我采取了一種偏向于大家都能聽懂的高可用方式(犧牲讀源碼的全面性)讀兩個(gè)流程。

1.并發(fā)消費(fèi)

(1)功能描述

?>獲取topic配置信息[GET_ROUTEINFO_BY_TOPIC]

>獲取GroupConsumerList[GET_CONSUMER_LIST_BY_GROUP]

>獲取Queue的消費(fèi)Offset[QUERY_CONSUMER_OFFSET]

>拉取Queue的消息[PULL_MESSAGE]

?>更新Queue的消費(fèi)Offset[UPDATE_CONSUMER_OFFSET]

>注銷Consumer[UNREGISTER_CLIENT]

圖示

(2)部分源碼解讀

三個(gè)角色:消費(fèi)者Consumer、 Borker、NameServer

????NameServer主要記錄了Borker上有哪些Topic。

????>在消費(fèi)者啟動(dòng)之后,第一步都要從NameServer中獲取Topic相關(guān)信息。

????這一步設(shè)計(jì)到組件之間的交互,RocketMQ使用功能號(hào)來設(shè)計(jì)的。

????GET_ROUTEINFO_BY_TOPIC

????我在idea上使用ctrl+H 查找功能。

????很快就定位這段代碼:

getTopicRouteInfoFromNameServer()

????消費(fèi)者拿到topic相關(guān)信息之后,第2步需要知道Topic中有哪些queue,并且消費(fèi)的時(shí)候還跟消費(fèi)者分組相關(guān)。所以這里就需要根據(jù)group獲取相關(guān)信息。(這里有定時(shí)觸發(fā)<默認(rèn)10s一次>,同時(shí)在消費(fèi)者啟動(dòng)的時(shí)候也會(huì)主動(dòng)觸發(fā)一次)

功能號(hào):GET_CONSUMER_LIST_BY_GROUP

getConsumerIdListByGroup()

? ??當(dāng)我們拿到了消費(fèi)者Group下的所有信息之后,這個(gè)就可以做分配,可以分配到比如自己這臺(tái)消費(fèi)者的應(yīng)該要消費(fèi)哪些主機(jī)上的哪些隊(duì)列。

這個(gè)地方叫DoRebalance,同時(shí)這個(gè)DoRebalacne之前已經(jīng)細(xì)講(具體這里不細(xì)講)。

RebalanceService
doRebalance()

? ??確定了消費(fèi)者的group、topic、還有queue之后,還需要知道從哪個(gè)位置開始消費(fèi)。于是還需要獲取Queue的Offset。

功能號(hào):QUERY_CONSUMER_OFFSET

queryConsumerOffset()

調(diào)用的地方RemoteBrokerOffsetStore類中fetchConsumeOffsetFromBroker

fetchConsumeOffsetFromBroker()

????確定了消費(fèi)者的group、topic、還有queue和需要獲取Queue的Offset,就要正式開始拉取消息了。

????送入的信息:topic、queueid、offset,

????還有maxnum(每次拉取多少條消息),suspendtimeout ?長(zhǎng)輪詢,Consumer拉消息請(qǐng)求在Broker掛起最長(zhǎng)時(shí)間,單位毫秒默認(rèn)值20000。

功能號(hào):PULL_MESSAGE

? ??拉到消息后,消費(fèi)者就要進(jìn)行消息的消費(fèi)了。消費(fèi)完了之后,要更新offset,這個(gè)時(shí)候也要發(fā)起調(diào)用。

功能號(hào):UPDATE_CONSUMER_OFFSET

? ??這個(gè)地方要注意有兩種方式:

????1、定時(shí),默認(rèn)5s提交。

????2、前面步驟的拉取消息時(shí)會(huì)帶入?yún)?shù):commitoffset,這個(gè)時(shí)候也會(huì)更新。

圖示

? ??最后的話,消費(fèi)者關(guān)閉的話,也會(huì)調(diào)用

功能號(hào):UNREGISTER_CLIENT

當(dāng)然,生產(chǎn)者和和Broker之間還有心跳機(jī)制,這里就不多說了。

2.順序消費(fèi)

(1)功能描述

? ? >獲取topic配置信息[GET_ROUTEINFO_BY_TOPIC]

????>獲取GroupConsumerList[GET_CONSUMER_LIST_BY_GROUP]

????>加鎖Queue[LOCK_BATCH_MQ]

????>獲取Queue的消費(fèi)Offset[QUERY_CONSUMER_OFFSET]

????>拉取Queue的消息[PULL_MESSAGE]

????>更新Queue的消費(fèi)Offset[UPDATE_CONSUMER_OFFSET]

????>解鎖Queue[UNLOCK_BATCH_MQ]

????>注銷Consumer[UNREGISTER_CLIENT]

圖示

(2)部分源碼解讀

順序消費(fèi)的主體步驟和并發(fā)消費(fèi)差不多,主要的差別就是有一個(gè)加鎖和解鎖的過程。

? ? >只要確定了是拉哪個(gè)queue。這個(gè)地方要加鎖,加鎖的目的就可以達(dá)到順序性。在一個(gè)queue中消息是順序的,當(dāng)一個(gè)消費(fèi)者確定了一個(gè)queue進(jìn)行消費(fèi)時(shí),使用一個(gè)分布式鎖機(jī)制,是不是就可以確定這個(gè)消費(fèi)者的順序性。

????加鎖Queue

????LOCK_BATCH_MQ

? ??同時(shí)發(fā)現(xiàn),這個(gè)地方也有一個(gè)定時(shí)執(zhí)行,20s,這個(gè)是周期性的去續(xù)鎖。因?yàn)樵赽roker端,這把的鎖的時(shí)間也有一定的失效的,(默認(rèn)60s),如果超過這個(gè)時(shí)間,這把鎖就釋放了。

? ??Broker端針對(duì)這個(gè)的實(shí)現(xiàn)就是一個(gè)ReentrantLock而已。

lockBatchMQ()

? ??解鎖Queue

????UNLOCK_BATCH_MQ

unlockBatchMQ()

消費(fèi)中常見問題

1.重復(fù)消息

? ??RocketMQ生產(chǎn)也好,消費(fèi)也好,有重試機(jī)制、重發(fā)隊(duì)列等等,所以在網(wǎng)絡(luò)情況不太好的情況下, RocketMQ避免不了消息的重復(fù)。

2.消費(fèi)卡死

? ??之前我講到了消費(fèi)的流程中,尤其是針對(duì)順序消息,我們感覺上會(huì)有卡死的現(xiàn)象,由于順序消息中需要到Broker中加鎖,如果消費(fèi)者某一個(gè)掛了,那么在Broker層是維護(hù)了60s的時(shí)間才能釋放鎖,所以在這段時(shí)間只能(消費(fèi)者是消費(fèi)不了的)在等待鎖。

????另外如果還有Broker層面也掛了,如果是主從機(jī)構(gòu),獲取鎖都是走的Master節(jié)點(diǎn),如果Master節(jié)點(diǎn)掛了,走Slave消費(fèi),但是slave節(jié)點(diǎn)上沒有鎖,所以順序消息如果發(fā)生了這樣的情況,也是會(huì)有卡死的現(xiàn)象。

3.啟動(dòng)之后較長(zhǎng)時(shí)間才消費(fèi)

? ??在并發(fā)消費(fèi)的時(shí)候,當(dāng)我們啟動(dòng)了非常多的消費(fèi)者,維護(hù)了非常多的topic的時(shí)候、或者queue比較多的時(shí)候,你可以看到消費(fèi)的流程的交互是比較多的(5~6步),要啟動(dòng)多線程,也要做相當(dāng)多的事情,所以你會(huì)感覺要啟動(dòng)較長(zhǎng)的時(shí)間才能消費(fèi)。

????還有順序消費(fèi)的時(shí)候,如果是之前的消費(fèi)者掛了,這個(gè)鎖要60秒才會(huì)釋放,也會(huì)導(dǎo)致下一個(gè)消費(fèi)者啟動(dòng)的時(shí)候需要等60s才能消費(fèi)。


我是嬈疆_蚩夢(mèng),讓堅(jiān)持成為一種習(xí)慣,感謝各位大佬的:點(diǎn)贊、收藏評(píng)論,我們下期見!


上一篇:RocketMQ源碼解讀之Store

下一篇:RocketMQ常見問題分析以及性能優(yōu)化

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容