這一講分析的是RocketMQ的消費(fèi)者,下一講分析broker。從第三講開始,主要是對照源碼來分析,所以這次的分享不僅僅是講消費(fèi)者的邏輯和消費(fèi)過程,還整理了client端(生產(chǎn)者和消費(fèi)者都是client)的接口圖,啟動(dòng)之后消費(fèi)端的狀態(tài)。
讀官方文檔,讀RocketMQ相關(guān)的書,還有查閱網(wǎng)上相關(guān)的一些博客,別人分析的再系統(tǒng),再有條理,從再多的維度去分析,看再多變,我總是覺得不是很清晰,理解的不夠透徹,因?yàn)樯婕暗降倪壿嬏嗔?,過幾天就忘了。所以在學(xué)習(xí)的過程中,必須整理出自己理解,雖不能面面俱到,但因?yàn)槭亲约旱睦斫?,就不?huì)忘記。
client接口和類圖
client端是依賴remoting模塊的,所以把remoting模塊的一些接口圖也整合進(jìn)來了。
remoting接口

client接口

主要接口圖

接口依賴關(guān)系圖
從下面這張圖中可以清晰的看出,client只用的了remoting接口的RemotingClient。RemotingServer接口在Namesrv和Broker中用到了。
接口總結(jié)
RocketMQ的設(shè)計(jì)中,分了三層接口:
用戶接口,實(shí)現(xiàn)接口,remoting接口。用戶接口是開箱即用的,有默認(rèn)的實(shí)現(xiàn)類,實(shí)現(xiàn)類是把用戶的配置,業(yè)務(wù)邏輯,配置文件信息,在啟動(dòng)的時(shí)候,初始化進(jìn)去。核心的業(yè)務(wù)邏輯在實(shí)現(xiàn)接口的實(shí)現(xiàn)類里。
實(shí)現(xiàn)接口的實(shí)現(xiàn)類持有MQClientInstance,MQClientInstance通過MQClientAPIImpl和remoting接口層打交道。MQClientInstance,MQClientAPIImpl和MQAdminImpl都沒有接口,MQAdminImpl持有MQClientInstance,訪問remoting接口也是通過MQClientAPIImpl類。
RocketMQ消息消費(fèi)概述
消息消費(fèi)以組的模式開展,一個(gè)消費(fèi)組內(nèi)可以包含多個(gè)消費(fèi)者,每一個(gè)消費(fèi)組可訂閱多個(gè)主題,消費(fèi)組之間有集群模式和廣播模式兩種消費(fèi)模式。
集群模式,主題下的同一條消息只允許被其中一個(gè)消費(fèi)者消費(fèi)。廣播模式,主題下的同一條消息將被集群內(nèi)的所有消費(fèi)者消費(fèi)一次。消息服務(wù)器與消費(fèi)者之間的消息傳送也有兩種方式:推模式、拉模式。所謂的拉模式,是消費(fèi)端主動(dòng)發(fā)起拉消息請求,而推模式是消息到達(dá)消息服務(wù)器后,推送給消息消費(fèi)者RocketMQ消息推模式的實(shí)現(xiàn)基于拉模式,在拉模式上包裝一層,一個(gè)拉取任務(wù)完成后開始下一個(gè)拉取任務(wù)。
集群模式下,多個(gè)消費(fèi)者如何對消息隊(duì)列進(jìn)行負(fù)載?消息隊(duì)列負(fù)載機(jī)制遵循一個(gè)通用的思想:一個(gè)消息隊(duì)列同一時(shí)間只允許被同一個(gè)消費(fèi)者消費(fèi),一個(gè)消費(fèi)者可以消費(fèi)多少個(gè)消息隊(duì)列。
消息消費(fèi)者初探
推模式的消費(fèi)者M(jìn)QPushConsume

實(shí)現(xiàn)類DefaultMQPushConsumer


DefaultMQPushConsumerImpl分析
- DefaultMQPushConsumerImpl :消息消息者默認(rèn)實(shí)現(xiàn)類,應(yīng)用程序中直接用該類的實(shí)例完成消息的消費(fèi),并回調(diào)業(yè)務(wù)方法。

RebalanceImpl :字面上的意思(重新平衡)也就是消費(fèi)端消費(fèi)者與消息隊(duì)列的重新分布,與消息應(yīng)該分配給哪個(gè)消費(fèi)者消費(fèi)息息相關(guān)。
MQClientInstance: 消息客戶端實(shí)例,負(fù)載與MQ服務(wù)器(Broker,Nameserver)交互的網(wǎng)絡(luò)實(shí)現(xiàn)
PullAPIWrapper: pull與Push在RocketMQ中,其實(shí)就只有Pull模式,所以Push其實(shí)就是用pull封裝一下
MessageListenerInner: 消費(fèi)消費(fèi)回調(diào)類,當(dāng)消息分配給消費(fèi)者消費(fèi)時(shí),執(zhí)行的業(yè)務(wù)代碼入口
OffsetStore: 消息消費(fèi)進(jìn)度保存
ConsumeMessageService: 消息消費(fèi)邏輯
消費(fèi)者啟動(dòng)流程
- 構(gòu)建主題訂閱信息SubscriptionData并加入到RebalanceImpl的訂閱消息中
- 初始化MQClientInstance、RebalanceImple等
- 初始化消息進(jìn)度。如果消費(fèi)時(shí)集群模式,那么消息進(jìn)度保存在Broker上;如果是廣播模式,那么消息進(jìn)度存儲(chǔ)在消費(fèi)端。
- 根據(jù)是否是順序消費(fèi),創(chuàng)建消費(fèi)端線程服務(wù)
- 向MQClientInstance注冊消費(fèi)者
啟動(dòng)流程圖

啟動(dòng)分析
DefaultMQPushConsumer
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
consumer.setNamesrvAddr("localhost:9876");
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
## start方法
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
defaultMQPushConsumerImpl
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
關(guān)鍵部分
ConsumeMessageService.png
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
this.consumeMessageService.start();
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
mQClientFactory.start();
MQClientInstance
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
沒有消息的時(shí)候

有消息的時(shí)候
image.png

啟動(dòng)后線程
- PullMessageService
- RebalanceService
- 定時(shí)任務(wù)線程MQClientFactoryScheduledThread和PullMessageServiceScheduledThread
- Netty相關(guān)線程,NettyClientPublicExecutor,NettyClientWorkerThread,NettyClientSelector
- 消費(fèi)者線程ConsumeMessageThread
消息拉取
基于PUSH模式來詳細(xì)分析消息拉取機(jī)制。
從MQClientInstance的啟動(dòng)流程中可以看出,RocketMQ使用一個(gè)單獨(dú)的線程PullMessageService來負(fù)責(zé)消息的拉取。
PullMessageService實(shí)現(xiàn)機(jī)制

ProcessQueue實(shí)現(xiàn)機(jī)制
ProcessQueue是MessageQueue在消費(fèi)端的重現(xiàn)、快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按照消息的隊(duì)列偏移量順序存放在ProcessQueue中。PullMessageService然后將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后從ProcessQueue中移除。
ProcessQueue核心屬性

ProcessQueue核心方法

消息拉取基本流程
- 消息拉取客戶端消息拉取請求封裝
- 消息服務(wù)器查找并返回消息
- 消息拉取客戶端處理返回的消息