面試官:RocketMQ 消息積壓了,增 加消費(fèi)者有用嗎?
我:這個(gè)要看具體的場景,不同的場景下情況是不一樣的。
面試官:可以詳細(xì)說一下嗎?
我:如果消費(fèi)者的數(shù)量小于 MessageQueue 的數(shù)量,增加消費(fèi)者可以加快消 息消費(fèi)速度,減少消 息積壓。比如一個(gè) Topic 有 4 個(gè) MessageQueue,2 個(gè)消費(fèi)者進(jìn)行消費(fèi),如果增加一個(gè)消費(fèi)者,明細(xì)可以加快拉取消息的頻率。如下圖:

如果消費(fèi)者的數(shù)量大于等于 MessageQueue 的數(shù)量,增加消費(fèi)者是沒有用的。比如一個(gè) Topic 有 4 個(gè) MessageQueue,并且有 4 個(gè)消費(fèi)者進(jìn)行消費(fèi)。如下圖

面試官:你說的第一種情況,增加消費(fèi)者一定能加快消 息 消 費(fèi)的速度嗎?
我:這...,一般情況下是可以的。
面試官:有特殊的情況嗎?
我:當(dāng)然有。消費(fèi)者消息拉取的速度也取決于本地消息的消費(fèi)速度,如果本地消息消費(fèi)的慢,就會延遲一段時(shí)間后再去拉取。
面試官:在什么情況下消費(fèi)者會延遲一段時(shí)間后再去拉取呢?
我:消費(fèi)者拉取的消息存在 ProcessQueue,消費(fèi)者是有流量控制的,如果出現(xiàn)下面三種情況,就不會主動去拉?。?/p>
- ProcessQueue 保存的消息數(shù)量超過閾值(默認(rèn) 1000,可以配置);
- ProcessQueue 保存的消息大小超過閾值(默認(rèn) 100M,可以配置);
- 對于非順序消費(fèi)的場景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認(rèn) 2000,可以配置)。
這部分源碼請參考類:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。
面試官:還有其他情況嗎?
我:對于順序消費(fèi)的場景,ProcessQueue 加鎖失敗,也會延遲拉取,這個(gè)延遲時(shí)間是 3s。
面試官:消費(fèi)者延遲拉取消息,一般可能是什么原因?qū)е碌哪兀?/p>
我:其實(shí)延遲拉取的本質(zhì)就是消費(fèi)者消費(fèi)慢,導(dǎo)致下次去拉取的時(shí)候 ProcessQueue 中積壓的消息超過閾值。以下面這張架構(gòu)圖為例:

消費(fèi)者消費(fèi)慢,可 是能下面的原因:
- 消費(fèi)者處理的業(yè)務(wù)邏輯復(fù)雜,耗時(shí)很長;
- 消費(fèi)者有慢查詢,或者數(shù)據(jù)庫負(fù)載高導(dǎo)致響應(yīng)慢;
- 緩存等中間件響應(yīng)慢,比如 Redis 響應(yīng)慢;
- 調(diào)用外部服務(wù)接口響應(yīng)慢。
面試官:對于外部接口響應(yīng)慢的情況,有什么應(yīng)對措施嗎?
我:這個(gè)要分情況討論。
如果調(diào)用外部系統(tǒng)只是一個(gè)通知,或者調(diào)用外部接口的結(jié)果并 不處理,可以采用異步的方式,異步邏輯里采用重試的方式 保 證 接口調(diào)成 功。很多在問怎么進(jìn)階Java架構(gòu)師或者是提升自己的Java技能,這邊我推薦一個(gè)老師的 指導(dǎo)危號給你,你可以搜索找塔下,首先是:125 接著是:343 最后是是:1195 ,你連著數(shù)字就可以練習(xí)到了!有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。還能領(lǐng)取免費(fèi)的學(xué)習(xí)資源,目前受益良多
如果外部接口返回結(jié)果必須要處理,可以考慮接口返回的結(jié)果是否可以緩存默認(rèn)值(要考慮業(yè)務(wù)可行),在調(diào)用失敗后采用快速降級的方式,使用默認(rèn)值替代返回接口返回值。
如果這個(gè)接口返回結(jié)果必須要處理,并且不能緩存,可以把拉取到的消息存入本地然后給 Broker 直接返回 CONSUME_SUCCESS。等外部系統(tǒng)恢復(fù)正常后再從本地取出來進(jìn)行處理。
面試官:如果消 費(fèi) 者數(shù)小于 MessageQueue 數(shù)量,并且外部系統(tǒng)響應(yīng)正常,為了快速消費(fèi)積壓消息而增加消費(fèi)者,有什么需要考慮的嗎?
我:外部系統(tǒng)雖然響應(yīng)正常,但是增加多個(gè)消費(fèi)者后,外部系統(tǒng)的接口調(diào)用量會突增,如果達(dá)到吞吐量上限,外部系統(tǒng)會響應(yīng)變慢,甚至被打掛。
同時(shí)也要考慮本地?cái)?shù)據(jù)庫、緩存的壓力,如果數(shù)據(jù)庫響應(yīng)變慢,處理消息的速度就會變慢,起不到緩解消息積壓的作用。
面試官:新增加了消費(fèi)者后,怎么給它分配 MessageQueue 呢?
我:Consumer 在拉取消息之前,需要對 MessageQueue 進(jìn)行負(fù)載操作。RocketMQ 使用一個(gè)定時(shí)器來完成負(fù)載操作,默認(rèn)每間隔 20s 重新負(fù)載一次。
面試官:能詳細(xì)說一下都有哪些負(fù)載策略嗎?
我:RocketMQ 提供了 6 種負(fù)載策略,依次來看一下。
平均負(fù)載策略:
- 把消費(fèi)者進(jìn)行排序;
- 計(jì)算每個(gè)消費(fèi)者可以平均分配的 MessageQueue 數(shù)量;
- 如果消費(fèi)者數(shù)量大于 MessageQueue 數(shù)量,多出的消費(fèi)者就分不到;
- 如果不可以平分,就使用 MessageQueue 總 數(shù)量對消費(fèi)者數(shù)量求余數(shù) mod;
- 對前 mod 數(shù)量消費(fèi)者,每個(gè)消費(fèi)者加一個(gè),這樣就獲取到了每個(gè)消費(fèi)者分配的 MessageQueue 數(shù)量。
比如 4 個(gè) MessageQueue 和 3 個(gè)消費(fèi)者的情況:

源代碼的邏輯非常簡單,如下:
// AllocateMessageQueueAveragely 這個(gè)類// 4 個(gè) MessageQueue 和 3 個(gè)消費(fèi)者的情況,假如第一個(gè),index = 0int index = cidAll.indexOf(currentCID);// mod = 1int mod = mqAll.size() % cidAll.size();// averageSize = 2int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());// startIndex = 0int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// range = 2,所以第一個(gè)消費(fèi)者分配到了2個(gè)int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size()));}
循環(huán)分配策略:
這個(gè)很容易理解,遍 歷 消費(fèi)者,把 MessageQueue 分一個(gè)給遍歷到的消費(fèi)者,如果 MessageQueue 數(shù)量比消費(fèi)者多,需要進(jìn)行多次遍歷,遍歷次數(shù)等于 (MessageQueue 數(shù)量/消費(fèi)者數(shù)量),還是以 4 個(gè) MessageQueue 和 3 個(gè)消費(fèi)者的情況,如下圖:

源代碼如下:
//AllocateMessageQueueAveragelyByCircle 這個(gè)類//4 個(gè) MessageQueue 和 3 個(gè)消費(fèi)者的情況,假如第一個(gè),index = 0int index = cidAll.indexOf(currentCID);for (int i = index; i < mqAll.size(); i++) { if (i % cidAll.size() == index) { //i == 0 或者 i == 3 都會走到這里 result.add(mqAll.get(i)); }}
自定義分配策略:
這種策略在消費(fèi)者啟動的時(shí)候可以指定消費(fèi)哪些 MessageQueue??梢詤⒖枷旅娲a:
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();//綁定消費(fèi) messageQueue1allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1","broker1",0)));consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);consumer.start();
按照機(jī)房分配策略:
這種方式 Consumer 只消費(fèi)指定機(jī)房的 MessageQueue,如下圖:Consumer0、Consumer1、Consumer2 綁定 room1 和 room2 這兩個(gè)機(jī)房,而 room3 這個(gè)機(jī)房沒有消費(fèi)者。

Consumer 啟動的時(shí)候需要綁定機(jī)房名稱??梢詤⒖枷旅娲a:
AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();//綁定消費(fèi) room1 和 room2 這兩個(gè)機(jī)房allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1","room2")));consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);consumer.start();
這種策略 broker 的命名必須按照格式:機(jī)房名@brokerName,因?yàn)橄M(fèi)者分配隊(duì)列的時(shí)候,首先按照機(jī)房名稱過濾出所有的 MessageQueue,然后再按照平均分配策略進(jìn)行分配。
//AllocateMessageQueueByMachineRoom 這個(gè)類List<MessageQueue> premqAll = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) { String[] temp = mq.getBrokerName().split("@"); if (temp.length == 2 && consumeridcs.contains(temp[0])) { premqAll.add(mq); }}//上面按照機(jī)房名稱過濾出所有的 MessageQueue 放入premqAll,后面就是平均分配策略
按照機(jī)房就近分配:
跟按照機(jī)房分配原則相比,就近分配的好處是可以對沒有消費(fèi)者的機(jī)房進(jìn)行分配。如下圖,機(jī)房 3 的 MessageQueue 也分配到了消費(fèi)者:

如果一個(gè)機(jī)房沒有消費(fèi)者,則會把這個(gè)機(jī)房的 MessageQueue 分配給集群中所有的消費(fèi)者。
源碼所在類:AllocateMachineRoomNearby。
一致性 Hash 算法策略:
把所有的消費(fèi)者經(jīng)過 Hash 計(jì)算分布到 Hash 環(huán)上,對所有的 MessageQueue 進(jìn)行 Hash 計(jì)算,找到順時(shí)針方向最近的消費(fèi)者節(jié)點(diǎn)進(jìn)行綁定。如下圖:

源代碼如下:
//所在類 AllocateMessageQueueConsistentHashCollection<ClientNode> cidNodes = new ArrayList<ClientNode>();for (String cid : cidAll) { cidNodes.add(new ClientNode(cid));}//使用消費(fèi)者構(gòu)建 Hash 環(huán),把消費(fèi)者分布在 Hash 環(huán)節(jié)點(diǎn)上final ConsistentHashRouter<ClientNode> router; //for building hash ringif (customHashFunction != null) { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);} else { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);}//對 MessageQueue 做 Hash 運(yùn)算,找到環(huán)上距離最近的消費(fèi)者List<MessageQueue> results = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) { ClientNode clientNode = router.routeNode(mq.toString()); if (clientNode != null && currentCID.equals(clientNode.getKey())) { results.add(mq); }}
面試官:恭喜你,通過了。