push consumer:consumer的一種,需要向consumer對象注冊監(jiān)聽。
pull consumer:consumer的一種,需要主動(dòng)請求Broker拉取消息。
1、消費(fèi)組到底是個(gè)什么概念?
在理解了Broker數(shù)據(jù)存儲(chǔ)機(jī)制以及Broker高可用主從同步機(jī)制之后,我們就可以來看一下消費(fèi)者是如何從Broker獲取消息,并且進(jìn)行處理以及維護(hù)消費(fèi)進(jìn)度的。
首先,我們需要了解的第一個(gè)概念,就是消費(fèi)者組
消費(fèi)者組的意思,就是讓你給一組消費(fèi)者起一個(gè)名字。比如我們有一個(gè)Topic叫“TopicOrderPaySuccess”,然后假設(shè)有庫存系統(tǒng)、積分系統(tǒng)、營銷系統(tǒng)、倉儲(chǔ)系統(tǒng)他們都要去消費(fèi)這個(gè)Topic中的數(shù)據(jù)。
此時(shí)我們應(yīng)該給那四個(gè)系統(tǒng)分別起一個(gè)消費(fèi)組的名字,比如說:stock_consumer_group,marketing_consumer_group,credie_consumer_group,wms_consumer_group。
設(shè)置消費(fèi)組的方式是在代碼里進(jìn)行的,類似下面這樣:

然后比如庫存系統(tǒng)部署了4臺(tái)機(jī)器,每臺(tái)機(jī)器上的消費(fèi)者組的名字都是“stock_consumer_group”,那么這4臺(tái)機(jī)器就同屬于一個(gè)消費(fèi)者組,以此類推,每個(gè)系統(tǒng)的幾臺(tái)機(jī)器都是屬于各自的消費(fèi)者組的。
我們看一下下面的圖,里面示意了兩個(gè)系統(tǒng),每個(gè)系統(tǒng)都有2臺(tái)機(jī)器,每個(gè)系統(tǒng)都有一個(gè)自己的消費(fèi)組。

然后給大家先解釋一下不同消費(fèi)者之間的關(guān)系,假設(shè)庫存系統(tǒng)和營銷系統(tǒng)作為兩個(gè)消費(fèi)者組,都訂閱了“TopicOrderPaySuccess”這個(gè)訂單支付成功消息的Topic,此時(shí)假設(shè)訂單系統(tǒng)作為生產(chǎn)者發(fā)送了一條消息到這個(gè)Topic,如下圖所示。

此時(shí)這條消息是怎么被消費(fèi)的呢?
正常情況下來說,這條消息進(jìn)入Broker之后,庫存系統(tǒng)和營銷系統(tǒng)作為兩個(gè)消費(fèi)組,每個(gè)組都會(huì)拉取到這條消息。
也就是說這個(gè)訂單支付成功的消息,庫存系統(tǒng)會(huì)獲取到一條,營銷系統(tǒng)也會(huì)獲取到一條,他們倆都會(huì)獲取到這條消息。
但是下一個(gè)問題來了,庫存系統(tǒng)這個(gè)消費(fèi)組里有兩臺(tái)機(jī)器,是兩臺(tái)機(jī)器都獲取到這條消息?還是說只有一臺(tái)機(jī)器會(huì)獲取到這條消息?
答案是,正常情況下來說,庫存系統(tǒng)的兩臺(tái)機(jī)器中只有一臺(tái)機(jī)器會(huì)獲取到這條消息,營銷系統(tǒng)也是同理。
我們看下面的圖,示意了對于一條訂單支付成功的消息,庫存系統(tǒng)的一臺(tái)機(jī)器獲取到了,營銷系統(tǒng)的一臺(tái)機(jī)器也獲取到了。
當(dāng)然為了畫圖方便,圖里是讓營銷系統(tǒng)從Master Broker拉取的,庫存系統(tǒng)從Slave Broker拉取的。

這就是在消費(fèi)的時(shí)候我們要給大家介紹的第一個(gè)知識(shí)點(diǎn),不同的系統(tǒng)應(yīng)該設(shè)置不同的消費(fèi)組,如果不同的消費(fèi)組訂閱了同一個(gè)Topic,對Topic里的一條消息,每個(gè)消費(fèi)組都會(huì)獲取到這條消息。
2、集群模式消費(fèi) vs 廣播模式消費(fèi)
接著我們給大家介紹下一個(gè)概念,就是對于一個(gè)消費(fèi)組而言,他獲取到一條消息之后,如果消費(fèi)組內(nèi)部有多臺(tái)機(jī)器,到底是只有一臺(tái)機(jī)器可以獲取到這個(gè)消息,還是每臺(tái)機(jī)器都可以獲取到這個(gè)消息?
這個(gè)就是集群模式和廣播模式的區(qū)別。
默認(rèn)情況下我們都是集群模式,也就是說,一個(gè)消費(fèi)組獲取到一條消息,只會(huì)交給組內(nèi)的一臺(tái)機(jī)器去處理,不是每臺(tái)機(jī)器都可以獲取到這條消息的。
但是我們可以通過如下設(shè)置來改變?yōu)閺V播模式:
consumer.setMessageModel(MessageModel.BROADCASTING);
如果修改為廣播模式,那么對于消費(fèi)組獲取到的一條消息,組內(nèi)每臺(tái)機(jī)器都可以獲取到這條消息。但是相對而言廣播模式其實(shí)用的很少,常見基本上都是使用集群模式來進(jìn)行消費(fèi)的。
3、重溫MessageQueue、CommitLog、ConsumeQueue之間的關(guān)系
接著我們來研究一下MessageQueue與消費(fèi)者的關(guān)系,通過之前的學(xué)習(xí)我們都已經(jīng)知道了,一個(gè)Topic在創(chuàng)建的時(shí)候我們是要設(shè)置他有多少個(gè)MessageQueue的,而且我們也知道了,在Broker上MessageQueue是如何跟ConsumeQueue對應(yīng)起來的。
我們先在圖中展示出這些概念,在Broker上我們會(huì)看到CommitLog文件,還有對應(yīng)的多個(gè)ConsumeQueue文件

根據(jù)之前學(xué)習(xí)到的知識(shí),我們大致可以如此理解,Topic中的多個(gè)MessageQueue會(huì)分散在多個(gè)Broker上,在每個(gè)Broker機(jī)器上,一個(gè)MessageQueue就對應(yīng)了一個(gè)ConsumeQueue,當(dāng)然在物理磁盤上其實(shí)是對應(yīng)了多個(gè)ConsumeQueue文件的,但是我們大致也理解為一 一對應(yīng)關(guān)系。
我們看下圖中,我圈出來了ConsumeQueue,就代表了一個(gè)Topic的多個(gè)MessageQueue在物理磁盤上分別對應(yīng)一個(gè)ConsumeQueue的意思。

但是對于一個(gè)Broker機(jī)器而言,存儲(chǔ)在他上面的所有Topic以及MessageQueue的消息數(shù)據(jù)都是寫入一個(gè)統(tǒng)一的CommitLog的
我們看下面的圖,我圈出來了CommitLog,代表的是Broker上所有消息都是往里面寫的。

然后對于Topic的各個(gè)MessageQueue而言,就是通過各個(gè)ConsumeQueue文件來存儲(chǔ)屬于MessageQueue的消息在CommitLog文件中的物理地址,就是一個(gè)offset偏移量,我在下面的圖中標(biāo)識(shí)出來了這個(gè)地址應(yīng)用的關(guān)系。

4、MessageQueue與消費(fèi)者的關(guān)系
接著我們來想一個(gè)問題,對于一個(gè)Topic上的多個(gè)MessageQueue,是如何由一個(gè)消費(fèi)組中的多臺(tái)機(jī)器來進(jìn)行消費(fèi)的呢?
其實(shí)這里的源碼實(shí)現(xiàn)細(xì)節(jié)是較為復(fù)雜的,但我們可以簡單的理解為,他會(huì)均勻的將MessageQueue分配給消費(fèi)組的多臺(tái)機(jī)器來消費(fèi)。
舉個(gè)例子,假設(shè)我們的“TopicOrderPaySuccess”有4個(gè)MessageQueue,這4個(gè)MessageQueue分布在兩個(gè)Master Broker上,每個(gè)Master Broker上有2個(gè)MessageQueue。
然后庫存系統(tǒng)作為一個(gè)消費(fèi)組里有兩臺(tái)機(jī)器,那么正常情況下,當(dāng)然最好的就是讓這兩臺(tái)機(jī)器每個(gè)都負(fù)責(zé)2個(gè)MessageQueue的消費(fèi)了
比如庫存系統(tǒng)的機(jī)器01從Master Broker01上消費(fèi)2個(gè)MessageQueue,然后庫存系統(tǒng)的機(jī)器02從Master Broker02上消費(fèi)2個(gè)MessageQueue,這樣不就把消費(fèi)的負(fù)載均攤到兩臺(tái)Master Broker上去了?
我們在下面的圖里畫出了這個(gè)示意。

所以你大致可以認(rèn)為一個(gè)Topic的多個(gè)MessageQueue會(huì)均勻分?jǐn)偨o消費(fèi)組內(nèi)的多個(gè)機(jī)器去消費(fèi),這里的一個(gè)原則就是,一個(gè)MessageQueue只能被一個(gè)消費(fèi)機(jī)器去處理,但是一臺(tái)消費(fèi)者機(jī)器可以負(fù)責(zé)多個(gè)MessageQueue的消息處理。
5、Push模式 vs Pull模式
現(xiàn)在我們已經(jīng)知道了一個(gè)消費(fèi)組內(nèi)的多臺(tái)機(jī)器是分別負(fù)責(zé)一部分MessageQueue的消費(fèi)的,那么既然如此,每臺(tái)機(jī)器都必須去連接到對應(yīng)的Broker,嘗試消費(fèi)里面的MessageQueue對應(yīng)的消息了。
此時(shí)就要涉及到兩種消費(fèi)模式了,之前我們也提到過,一個(gè)是Push,一個(gè)是Pull。
實(shí)際上,這兩個(gè)消費(fèi)模式本質(zhì)是一樣的,都是消費(fèi)者機(jī)器主動(dòng)發(fā)送請求到Broker機(jī)器去拉取一批消息下來。
Push消費(fèi)模式本質(zhì)底層也是基于這種消費(fèi)者主動(dòng)拉取的模式來實(shí)現(xiàn)的,只不過他的名字叫做Push而已,意思是Broker會(huì)盡可能實(shí)時(shí)的把新消息交給消費(fèi)者機(jī)器來進(jìn)行處理,他的消息時(shí)效性會(huì)更好。
一般我們使用RocketMQ的時(shí)候,消費(fèi)模式通常都是基于他的Push模式來做的,因?yàn)镻ull模式的代碼寫起來更加的復(fù)雜和繁瑣,而且Push模式底層本身就是基于消息拉取的方式來做的,只不過時(shí)效性更好而已。
Push模式的實(shí)現(xiàn)思路我這里簡單說一下:當(dāng)消費(fèi)者發(fā)送請求到Broker去拉取消息的時(shí)候,如果有新的消息可以消費(fèi)那么就會(huì)立馬返回一批消息到消費(fèi)機(jī)器去處理,處理完之后會(huì)接著立刻發(fā)送請求到Broker機(jī)器去拉取下一批消息。
所以消費(fèi)機(jī)器在Push模式下會(huì)處理完一批消息,立馬發(fā)起請求拉取下一批消息,消息處理的時(shí)效性非常好,看起來就跟Broker一直不停的推送消息到消費(fèi)機(jī)器一樣。
另外Push模式下有一個(gè)請求掛起和長輪詢的機(jī)制,也要給大家簡單介紹一下。
當(dāng)你的請求發(fā)送到Broker,結(jié)果他發(fā)現(xiàn)沒有新的消息給你處理的時(shí)候,就會(huì)讓請求線程掛起,默認(rèn)是掛起15秒,然后這個(gè)期間他會(huì)有后臺(tái)線程每隔一會(huì)兒就去檢查一下是否有的新的消息給你,另外如果在這個(gè)掛起過程中,如果有新的消息到達(dá)了會(huì)主動(dòng)喚醒掛起的線程,然后把消息返回給你。
當(dāng)然其實(shí)消費(fèi)者進(jìn)行消息拉取的底層源碼是非常復(fù)雜的,涉及到大量的細(xì)節(jié),但是他的核心思路大致就是如此,我們只要知道,哪怕是用常見的Push模式消費(fèi),本質(zhì)也是消費(fèi)者不停的發(fā)送請求到broker去拉取一批一批的消息就行了。
6、Broker是如何將消息讀取出來返回給消費(fèi)機(jī)器的?
接著我們思考一個(gè)小問題,Broker在收到消費(fèi)機(jī)器的拉取請求之后,是如何將消息讀取出來返回給消費(fèi)機(jī)器的?
其實(shí)這里要涉及到兩個(gè)概念,分別是ConsumeQueue和CommitLog。
假設(shè)一個(gè)消費(fèi)者機(jī)器發(fā)送了拉取請求到Broker了,他說我這次要拉取MessageQueue0中的消息,然后我之前都沒拉取過消息,所以就從這個(gè)MessageQueue0中的第一條消息開始拉取好了。
于是,Broker就會(huì)找到MessageQueue0對應(yīng)的ConsumeQueue0,從里面找到第一條消息的offset,如下圖所示。

接著Broker就需要根據(jù)ConsumeQueue0中找到的第一條消息的地址,去CommitLog中根據(jù)這個(gè)offset地址去讀取出來這條消息的數(shù)據(jù),然后把這條消息的數(shù)據(jù)返回給消費(fèi)者機(jī)器,如下圖所示。

所以其實(shí)消費(fèi)消息的時(shí)候,本質(zhì)就是根據(jù)你要消費(fèi)的MessageQueue以及開始消費(fèi)的位置,去找到對應(yīng)的ConsumeQueue讀取里面對應(yīng)位置的消息在CommitLog中的物理offset偏移量,然后到CommitLog中根據(jù)offset讀取消息數(shù)據(jù),返回給消費(fèi)者機(jī)器。
7、消費(fèi)者機(jī)器如何處理消息、進(jìn)行ACK以及提交消費(fèi)進(jìn)度?
接著消費(fèi)者機(jī)器拉取到一批消息之后,就會(huì)將這批消息回調(diào)我們注冊的一個(gè)函數(shù),如下面這樣子:

當(dāng)我們處理完這批消息之后,消費(fèi)者機(jī)器就會(huì)提交我們目前的一個(gè)消費(fèi)進(jìn)度到Broker上去,然后Broker就會(huì)存儲(chǔ)我們的消費(fèi)進(jìn)度
比如我們現(xiàn)在對ConsumeQueue0的消費(fèi)進(jìn)度假設(shè)就是在offset=1的位置,那么他會(huì)記錄下來一個(gè)ConsumeOffset的東西去標(biāo)記我們的消費(fèi)進(jìn)度,如下圖。

那么下次這個(gè)消費(fèi)組只要再次拉取這個(gè)ConsumeQueue的消息,就可以從Broker記錄的消費(fèi)位置開始繼續(xù)拉取,不用重頭開始拉取了。
8、如果消費(fèi)組中出現(xiàn)機(jī)器宕機(jī)或者擴(kuò)容加機(jī)器,會(huì)怎么處理?
最后我們來看一下,如果消費(fèi)組中出現(xiàn)機(jī)器宕機(jī)或者擴(kuò)容加機(jī)器的情況,他會(huì)怎么處理?
這個(gè)時(shí)候其實(shí)會(huì)進(jìn)入一個(gè)rabalance的環(huán)節(jié),也就是說重新給各個(gè)消費(fèi)機(jī)器分配他們要處理的MessageQueue。
給大家舉個(gè)例子,比如現(xiàn)在機(jī)器01負(fù)責(zé)MessageQueue0和Message1,機(jī)器02負(fù)責(zé)MessageQueue2和MessageQueue3,現(xiàn)在機(jī)器02宕機(jī)了,那么機(jī)器01就會(huì)接管機(jī)器02之前負(fù)責(zé)的MessageQueue2和MessageQueue3。
或者如果此時(shí)消費(fèi)組加入了一臺(tái)機(jī)器03,此時(shí)就可以把機(jī)器02之前負(fù)責(zé)的MessageQueue3轉(zhuǎn)移給機(jī)器03,然后機(jī)器01就僅僅負(fù)責(zé)一個(gè)MessageQueue2的消費(fèi)了,這就是負(fù)載重平衡的概念。
9、PushConsumer核心參數(shù)詳解
- consumerFormWhere
- allocateMessageQueueStrategy
- subscription
- offsetStore
- consumeThreadMin/consumeThreadMax
- consumeConcurrentlyMaxSpan/pullThresholdForQueue
- pullinterval/pullBatchSize
- consumeMessageBatchMaxSize