系列
本章主要講解Rebalance在consumer端的作用,如果要理解consumer的邏輯,就必須要知道在consumer端有Rebalance這個(gè)服務(wù),沒有Rebalance也就沒有consumer的消息拉取。
Rebalance過程介紹
????1、從namesrv獲取messageQueue信息
????2、從broker獲取consumer信息
????3、選擇Rebalance策略
????4、三者結(jié)合實(shí)現(xiàn)Rebalance操作
Rebalance的平衡粒度
Rebalance是針對Topic+ConsumerGroup進(jìn)行Rebalance的,在我們創(chuàng)建的comsumer過程中會(huì)訂閱topic(包括%retry%consumerGroup),Rebalance就是要這些Topic下的所有messageQueue按照一定的規(guī)則分發(fā)給consumerGroup下的consumer進(jìn)行消費(fèi)。

說明:參見RebalanceImpl類
? ? 1、著重需要強(qiáng)調(diào)的概念,Rebalance是針對訂閱的topic進(jìn)行Rebalance,也就是假如consumer訂閱了10個(gè)Topic,那么我們就需要對10個(gè)Topic里的每一個(gè)Topic進(jìn)行Rebalance。
Rebalance的平衡過程

說明:參見RebalanceImpl類
????1、Rebalance的過程需要3個(gè)要素,分別是Topic下的所有MessageQueue、consumerGroup下的所有consumer、Rebalance策略。這里的MessageQueue是指Topic在每個(gè)broker上的隊(duì)列配置信息。
????2、獲取MessageQueue信息,獲取consumerGroup下的consumer信息,根據(jù)Rebalance策略進(jìn)行Rebalance。
????3、更新Rebalance的結(jié)果進(jìn)行消息的拉取。
? ? 4、Rebalance更新consumer負(fù)責(zé)的messageQueue

說明:參見RebalanceImpl類
Rebalance策略

說明:參見RebalanceImpl類

說明:參見AllocateMessageQueueAveragely類
? ? 1、舉其中一種策略說明,這個(gè)策略是考慮當(dāng)前consumerId的位置,consumer的數(shù)量,MessageQueue的數(shù)量,根據(jù)consumerId所處的位置決定分配多少消費(fèi)隊(duì)列。
? ? 2、該過程會(huì)動(dòng)態(tài)調(diào)整,也可能會(huì)不一致,因?yàn)橐蕾嚨臄?shù)據(jù)來自broker會(huì)有不一致,但是最終肯定會(huì)一致,周期性的Rebalance的作用。
MessageQueue獲取過程

說明:參見MQClientInstance類
????1、consumer端的MessageQueue是根據(jù)topic中的readQueueNums來計(jì)算的
? ? 2、計(jì)算MessageQueue的TopicRouteData是從namesrv中獲取的
Consumer獲取過程

說明:參見MQClientInstance類

參見:MQClientAPIImpl類

說明:參見ConsumerManageProcessor類
? ? 1、獲取consumer列表跟注冊過程是對稱的
Consumer注冊過程
要知道consumer的獲取必須知道consumer是怎么注冊的,其實(shí)consumer會(huì)把注冊信息發(fā)送給broker保存,當(dāng)然由于沒有強(qiáng)一致性的保證,會(huì)存在某些極端情況下broker上的配置不一致,但是由于這是一個(gè)周期性的任務(wù),所以最終肯定會(huì)達(dá)到一致的。

說明:參見DefaultMQPushConsumerImpl類

說明:參見MQClientInstance類
????1、心跳信息包含client的注冊信息
????2、同步給broker的信息是最終一致性的,非強(qiáng)一致性

說明:參見ClientManageProcessor
? ? 1、處理consumer心跳信息的入口

說明:參見ConsumerManager類
????1、broker保存consumer元信息