消費(fèi)者從Broker中獲取消息的方式有兩種:pull拉取方式和push推動(dòng)方式。
消費(fèi)者組對(duì)于消息消費(fèi)的模式又分為兩種:集群消費(fèi)Clustering和廣播消費(fèi)Broadcasting。
1.獲取消費(fèi)類型
拉取式消費(fèi)
Consumer主動(dòng)從Broker中拉取 消息,主動(dòng)權(quán)由Consumer控制。一旦獲取了批量消息,就會(huì)啟動(dòng)消費(fèi)過程。不過,該方式的實(shí)時(shí)性比較若,即Broker中有了新的消息時(shí)消費(fèi)者并不能及時(shí)發(fā)現(xiàn)并消費(fèi)。
由于拉取時(shí)間間隔是由用戶指定的,所以在設(shè)置該間隔時(shí)需要注意平穩(wěn):間隔太短,空請(qǐng)求比例會(huì)增加;間隔太長,消息的實(shí)時(shí)性太差。
推送式消費(fèi)
該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給Consumer。該獲取方式一般實(shí)時(shí)性較高。
該方式是典型的發(fā)布訂閱模式,即Consumer向其關(guān)聯(lián)的Queue注冊(cè)了監(jiān)聽器,一旦發(fā)現(xiàn)有新的消息到來就會(huì)觸發(fā)回調(diào)的執(zhí)行,回調(diào)方法是Consumer去Queue中拉取消息。而這些都是基于Consumer與Broker間的長連接的。長連接的維護(hù)是需要消耗系統(tǒng)資源的。
對(duì)比
- pull:需要應(yīng)用去實(shí)現(xiàn)對(duì)關(guān)聯(lián)Queue的遍歷,實(shí)時(shí)性差;但便于應(yīng)用控制消息的拉取。
- push:封裝了對(duì)關(guān)聯(lián)Queue的遍歷,實(shí)時(shí)性強(qiáng),但會(huì)占用較多的系統(tǒng)資源
2.消費(fèi)模式
廣播消費(fèi)

廣播消費(fèi)模式下,相同ConsumerGroup的每個(gè)Consumer實(shí)例都會(huì)接收同一個(gè)Topic的全量消息。即每條消息都會(huì)被發(fā)送到Consumer Group中的每個(gè)Consumer。
集群消費(fèi)

集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)?/code>同一個(gè)Topic的消息。即每條消息只會(huì)被發(fā)送到ConsumerGroup中的某個(gè)Consumer。
消息進(jìn)度保存
- 廣播模式:消費(fèi)進(jìn)度保存在consumer端。因?yàn)閺V播模式下consumer group中每個(gè)consumer都會(huì)消費(fèi)所有消息,但他們的消費(fèi)進(jìn)度是不同的。所以consumer各自保存各自的消費(fèi)進(jìn)度。
- 集群模式:消費(fèi)進(jìn)度保存在broker中。consumer group中的所有consumer共同消費(fèi)同一個(gè)Topic中的消息,同一條消息只會(huì)被消費(fèi)一次。消費(fèi)進(jìn)度會(huì)參與到了消費(fèi)的負(fù)載均衡中,故消費(fèi)進(jìn)度是需要共享的。
3.Rebalance機(jī)制
Rebalance機(jī)制討論的前提是:集群消費(fèi)。
什么是Rebalance
Rebalance即再均衡,指的是,將一個(gè)Topic下的多個(gè)Queue在同一個(gè)Consumer Group中的多個(gè)Consumer間進(jìn)行重新分配的過程。

Rebalance機(jī)制的本意是為了提升消息的并行消費(fèi)能力。例如,一個(gè)Topic下5個(gè)隊(duì)列,在只有一個(gè)消費(fèi)者的情況下,這個(gè)消費(fèi)者將負(fù)責(zé)消費(fèi)者5個(gè)隊(duì)列的消息。如果我們此時(shí)增加一個(gè)消費(fèi)者,那么九可以給其中一個(gè)消費(fèi)者分配2個(gè)隊(duì)列,給另一個(gè)分配3個(gè)隊(duì)列,從而提升消息的并行消費(fèi)能力。
Rebalance限制
由于一個(gè)隊(duì)列最多分配給一個(gè)消費(fèi)者,因此當(dāng)某個(gè)消費(fèi)者組下的消費(fèi)者實(shí)例數(shù)量大于隊(duì)列的數(shù)量時(shí),多余的消費(fèi)者實(shí)例將分配不到任何隊(duì)列。
Rebalance危害
Rebalance在提升消費(fèi)能力的同時(shí),也帶來一些問題:
消費(fèi)暫停:在只有一個(gè)Consumer時(shí),其負(fù)責(zé)消費(fèi)所有隊(duì)列;在新增了一個(gè)Consumer后會(huì)觸發(fā)Rebalance的發(fā)生。此時(shí)原Consumer就需要暫停部分隊(duì)列的消費(fèi),等到這些隊(duì)列分配給新的Consumer后,這些暫停消費(fèi)的隊(duì)列才能繼續(xù)被消費(fèi)。
消費(fèi)重復(fù):Consumer在消費(fèi)新分配給自己的隊(duì)列時(shí),必須接著之前Consumer提交的消費(fèi)進(jìn)度的offset繼續(xù)消費(fèi)。然而默認(rèn)情況下,offset是異步提交的,這個(gè)異步性導(dǎo)致提交到Broker的offset與Consumer實(shí)際消費(fèi)的消息并不一致。這個(gè)不一致的差值就是可能會(huì)重復(fù)消費(fèi)的消息。
同步提交:consumer提交了其消費(fèi)完畢的一批消息的offset給broker后,需要等待Broker的成功ACK。當(dāng)收到ACK后,consumer才會(huì)繼續(xù)獲取并消費(fèi)下一批消息。在等待ACK期間,consumer是阻塞的。
異步提交:consumer提交了其消費(fèi)完畢的一批消息的offset給broker后,不需要等待broker的成功ACK。consumer可以直接獲取并消費(fèi)下一批消息。
對(duì)于一次性讀取消息的數(shù)量,需要根據(jù)具體業(yè)務(wù)場(chǎng)景選擇一個(gè)相對(duì)均衡的策略是很有必要的。因?yàn)閿?shù)量過大,系統(tǒng)性能提升了,但產(chǎn)生重復(fù)消費(fèi)的消息數(shù)量可能會(huì)增加;數(shù)量國小,系統(tǒng)性能下降,但被重復(fù)消費(fèi)的消息數(shù)量可能會(huì)減少
消費(fèi)突刺:由于Rebalance可能導(dǎo)致重復(fù)消費(fèi),如果需要重復(fù)消費(fèi)的消息過多,或者因?yàn)镽ebalance暫停時(shí)間過長而導(dǎo)致積壓了部分消息。那么又可能會(huì)導(dǎo)致在Rebalance結(jié)束之后瞬間需要消費(fèi)很多消息。
Rebalance產(chǎn)生的原因
導(dǎo)致Rebalance產(chǎn)生的原因,無非就兩個(gè):消費(fèi)者所訂閱Topic的Queue數(shù)量發(fā)生變化,或消費(fèi)者組中消費(fèi)者的數(shù)量發(fā)生變化。
1)Queue數(shù)量發(fā)生變化的場(chǎng)景:
Broker擴(kuò)容或縮容
Broker升級(jí)運(yùn)維
Broker與NameServer間的網(wǎng)絡(luò)異常
Queue擴(kuò)容或縮容
2)消費(fèi)者數(shù)量發(fā)生變化的場(chǎng)景:
Consumer Group擴(kuò)容或縮容
Consumer升級(jí)運(yùn)維
Consumer與NameServer間網(wǎng)絡(luò)異常
Rebalance過程
在Broker中維護(hù)著多個(gè)Map集合,這些集合中動(dòng)態(tài)存放著當(dāng)前Topic中Queue的信息、ConsumerGroup中Consumer實(shí)例的信息。一旦發(fā)現(xiàn)消費(fèi)者所訂閱的Queue數(shù)量發(fā)生變化,或消費(fèi)者的數(shù)量發(fā)生變化,立即向ConsumerGroup中的每個(gè)實(shí)例發(fā)出Rebalance通知。
與Kafka的對(duì)比
在Kafka中,一旦發(fā)現(xiàn)出現(xiàn)了Rebalance條件,Broker會(huì)調(diào)用Group Coordinator來完成ReBalance。
Coordinator是Broker中的一個(gè)進(jìn)程。Coordinator會(huì)在Consumer Group中選出一個(gè)Group Leader。由于這個(gè)Leader根據(jù)自己本身組情況完成Parition分區(qū)的再分配。這個(gè)再分配結(jié)果會(huì)上報(bào)給Coordinator,并由Coordinator同步給Group中的所有Consumer實(shí)例。
Kafka中的Rebalance是由Consumer Leader完成的。而RocketMQ中的Rebalance是由每個(gè)Consumer自身完成的,Group中不存在Leader。
4.Queue分配算法
一個(gè)Topic中的Queue只能由Consumer Group中的一個(gè)Consumer進(jìn)行消費(fèi),而一個(gè)Consumer可以同時(shí)消費(fèi)多個(gè)Queue中的消息。那么Queue與Consumer間的配對(duì)關(guān)系是如何確定的,即Queue要分配給哪個(gè)Consumer進(jìn)行消費(fèi),也是有算法策略的。常見的有四種策略,這些策略是通過在創(chuàng)建Consumer時(shí)的構(gòu)造器傳進(jìn)去的。
4.1平均分配策略

該算法主要時(shí)根據(jù)avg = QueueCount / ConsumerCount的計(jì)算結(jié)果進(jìn)行分配的。如果能夠整除,則按順序?qū)vg個(gè)Queue逐個(gè)分配Consumer;如果不能整除,則將多余處的Queue按照Consumer順序逐個(gè)分配。
4.2環(huán)境平均策略

環(huán)形平均算法是指,根據(jù)消費(fèi)者的順序,一次在由queue隊(duì)列組成的環(huán)形圖中逐個(gè)分配。
該算法不用事先計(jì)算每個(gè)Consumer需要分配幾個(gè)Queue,直接一個(gè)一個(gè)分即可。
4.3一致性hash策略

該算法會(huì)將consumer的hash值作為Node節(jié)點(diǎn)存放到hash環(huán)上,然后將queue的hash值也放到hash環(huán)上,通過順時(shí)針方向,距離queue最近的那個(gè)consumer就是該queue要分配的consumer。
4.4同機(jī)房策略

該算法會(huì)根據(jù)queue的部署機(jī)房位置和consumer的位置,過濾出當(dāng)前consumer相同機(jī)房的queue。然后按照平均分配策略或環(huán)形平均策略對(duì)同機(jī)房queue進(jìn)行分配。如果沒有同機(jī)房queue,則按照平均分配策略或環(huán)形平均策略對(duì)所有queue進(jìn)行分配。
4.5對(duì)比
一致性hash存在的問題:
兩種平均分配策略的分配效率高,一致性hash策略的較低。因?yàn)橐恢滦詇ash算法較復(fù)雜。另外,一致性hash策略分配的結(jié)果也很大可能上存在不平均的情況。
一致性hash算法存在的意義:
其可以有效減少由于消費(fèi)者組擴(kuò)容或縮容所帶來的大量Rebalance
一致性hash算法的應(yīng)用場(chǎng)景:
Consumer數(shù)量變化較頻繁的場(chǎng)景。
5.至少一次原則
RocketMQ由一個(gè)原則:每條消息必須要被成功消費(fèi)一次。
那什么是成功消費(fèi)呢?Consumer在消費(fèi)完消息后會(huì)向其消費(fèi)進(jìn)度記錄器提交消費(fèi)消息的offser,offset被成功記錄到記錄器中,那么這條消費(fèi)就被成功消費(fèi)了。
什么是消費(fèi)進(jìn)度記錄器?
對(duì)于廣播消費(fèi)模式來說,Consumer本身就是消費(fèi)進(jìn)度記錄器。
對(duì)于集群消費(fèi)模式來說,Broker是消費(fèi)進(jìn)度記錄器。