RocketMQ第四講

這一講分析的是RocketMQ的消費(fèi)者,下一講分析broker。從第三講開始,主要是對照源碼來分析,所以這次的分享不僅僅是講消費(fèi)者的邏輯和消費(fèi)過程,還整理了client端(生產(chǎn)者和消費(fèi)者都是client)的接口圖,啟動(dòng)之后消費(fèi)端的狀態(tài)。

讀官方文檔,讀RocketMQ相關(guān)的書,還有查閱網(wǎng)上相關(guān)的一些博客,別人分析的再系統(tǒng),再有條理,從再多的維度去分析,看再多變,我總是覺得不是很清晰,理解的不夠透徹,因?yàn)樯婕暗降倪壿嬏嗔?,過幾天就忘了。所以在學(xué)習(xí)的過程中,必須整理出自己理解,雖不能面面俱到,但因?yàn)槭亲约旱睦斫?,就不?huì)忘記。

client接口和類圖

client端是依賴remoting模塊的,所以把remoting模塊的一些接口圖也整合進(jìn)來了。

remoting接口

Remoting

client接口

clien

主要接口圖

cient_remoting

接口依賴關(guān)系圖

從下面這張圖中可以清晰的看出,client只用的了remoting接口的RemotingClient。RemotingServer接口在Namesrv和Broker中用到了。

client_remoting_d

接口總結(jié)

RocketMQ的設(shè)計(jì)中,分了三層接口:

用戶接口,實(shí)現(xiàn)接口,remoting接口。用戶接口是開箱即用的,有默認(rèn)的實(shí)現(xiàn)類,實(shí)現(xiàn)類是把用戶的配置,業(yè)務(wù)邏輯,配置文件信息,在啟動(dòng)的時(shí)候,初始化進(jìn)去。核心的業(yè)務(wù)邏輯在實(shí)現(xiàn)接口的實(shí)現(xiàn)類里。

實(shí)現(xiàn)接口的實(shí)現(xiàn)類持有MQClientInstance,MQClientInstance通過MQClientAPIImpl和remoting接口層打交道。MQClientInstance,MQClientAPIImpl和MQAdminImpl都沒有接口,MQAdminImpl持有MQClientInstance,訪問remoting接口也是通過MQClientAPIImpl類。

RocketMQ消息消費(fèi)概述

消息消費(fèi)以組的模式開展,一個(gè)消費(fèi)組內(nèi)可以包含多個(gè)消費(fèi)者,每一個(gè)消費(fèi)組可訂閱多個(gè)主題,消費(fèi)組之間有集群模式和廣播模式兩種消費(fèi)模式。

集群模式,主題下的同一條消息只允許被其中一個(gè)消費(fèi)者消費(fèi)。廣播模式,主題下的同一條消息將被集群內(nèi)的所有消費(fèi)者消費(fèi)一次。消息服務(wù)器與消費(fèi)者之間的消息傳送也有兩種方式:推模式、拉模式。所謂的拉模式,是消費(fèi)端主動(dòng)發(fā)起拉消息請求,而推模式是消息到達(dá)消息服務(wù)器后,推送給消息消費(fèi)者RocketMQ消息推模式的實(shí)現(xiàn)基于拉模式,在拉模式上包裝一層,一個(gè)拉取任務(wù)完成后開始下一個(gè)拉取任務(wù)。

集群模式下,多個(gè)消費(fèi)者如何對消息隊(duì)列進(jìn)行負(fù)載?消息隊(duì)列負(fù)載機(jī)制遵循一個(gè)通用的思想:一個(gè)消息隊(duì)列同一時(shí)間只允許被同一個(gè)消費(fèi)者消費(fèi),一個(gè)消費(fèi)者可以消費(fèi)多少個(gè)消息隊(duì)列。

消息消費(fèi)者初探

推模式的消費(fèi)者M(jìn)QPushConsume

image.png

實(shí)現(xiàn)類DefaultMQPushConsumer

image.png
image.png

DefaultMQPushConsumerImpl分析

  • DefaultMQPushConsumerImpl :消息消息者默認(rèn)實(shí)現(xiàn)類,應(yīng)用程序中直接用該類的實(shí)例完成消息的消費(fèi),并回調(diào)業(yè)務(wù)方法。
image.png
  • RebalanceImpl :字面上的意思(重新平衡)也就是消費(fèi)端消費(fèi)者與消息隊(duì)列的重新分布,與消息應(yīng)該分配給哪個(gè)消費(fèi)者消費(fèi)息息相關(guān)。

  • MQClientInstance: 消息客戶端實(shí)例,負(fù)載與MQ服務(wù)器(Broker,Nameserver)交互的網(wǎng)絡(luò)實(shí)現(xiàn)

  • PullAPIWrapper: pull與Push在RocketMQ中,其實(shí)就只有Pull模式,所以Push其實(shí)就是用pull封裝一下

  • MessageListenerInner: 消費(fèi)消費(fèi)回調(diào)類,當(dāng)消息分配給消費(fèi)者消費(fèi)時(shí),執(zhí)行的業(yè)務(wù)代碼入口

  • OffsetStore: 消息消費(fèi)進(jìn)度保存

  • ConsumeMessageService: 消息消費(fèi)邏輯

消費(fèi)者啟動(dòng)流程

  • 構(gòu)建主題訂閱信息SubscriptionData并加入到RebalanceImpl的訂閱消息中
  • 初始化MQClientInstance、RebalanceImple等
  • 初始化消息進(jìn)度。如果消費(fèi)時(shí)集群模式,那么消息進(jìn)度保存在Broker上;如果是廣播模式,那么消息進(jìn)度存儲(chǔ)在消費(fèi)端。
  • 根據(jù)是否是順序消費(fèi),創(chuàng)建消費(fèi)端線程服務(wù)
  • 向MQClientInstance注冊消費(fèi)者

啟動(dòng)流程圖

流程圖

啟動(dòng)分析

DefaultMQPushConsumer

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        consumer.setNamesrvAddr("localhost:9876");
        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");
        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

## start方法
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

defaultMQPushConsumerImpl

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

關(guān)鍵部分
ConsumeMessageService.png

this.consumeMessageService =
   new ConsumeMessageConcurrentlyService(this, 
         (MessageListenerConcurrently) this.getMessageListenerInner());
this.consumeMessageService.start();

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.mQClientFactory = MQClientManager.getInstance()
    .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

mQClientFactory.start();

MQClientInstance

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

沒有消息的時(shí)候

image.png

有消息的時(shí)候

image.png

啟動(dòng)后線程

  • PullMessageService
  • RebalanceService
  • 定時(shí)任務(wù)線程MQClientFactoryScheduledThread和PullMessageServiceScheduledThread
  • Netty相關(guān)線程,NettyClientPublicExecutor,NettyClientWorkerThread,NettyClientSelector
  • 消費(fèi)者線程ConsumeMessageThread

消息拉取

基于PUSH模式來詳細(xì)分析消息拉取機(jī)制。

從MQClientInstance的啟動(dòng)流程中可以看出,RocketMQ使用一個(gè)單獨(dú)的線程PullMessageService來負(fù)責(zé)消息的拉取。

PullMessageService實(shí)現(xiàn)機(jī)制

image.png

ProcessQueue實(shí)現(xiàn)機(jī)制

ProcessQueue是MessageQueue在消費(fèi)端的重現(xiàn)、快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按照消息的隊(duì)列偏移量順序存放在ProcessQueue中。PullMessageService然后將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后從ProcessQueue中移除。

ProcessQueue核心屬性

image.png

ProcessQueue核心方法

image.png

消息拉取基本流程

  1. 消息拉取客戶端消息拉取請求封裝
  2. 消息服務(wù)器查找并返回消息
  3. 消息拉取客戶端處理返回的消息
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容