rocketmq客戶端消費(fèi)流程

rocketmq客戶端消費(fèi)流程

只關(guān)注于集群模式下并發(fā)消費(fèi)的push模式

組件概述

DefaultMQPushConsumerImpl
  • 負(fù)載均衡實(shí)現(xiàn) RebalanceImpl
  • 拉取消息. PullAPIWrapper
  • 消費(fèi)進(jìn)度存儲(chǔ) OffsetStore
  • 消費(fèi)服務(wù) ConsumeMessageService
  • MQClientInstance 客戶端核心實(shí)現(xiàn)
MQClientInstance
  • netty 客戶端 業(yè)務(wù)線程池和回調(diào)線程池隔離
  • 定時(shí)任務(wù)
  • 負(fù)載均衡調(diào)度 RebalanceService
  • 拉消息任務(wù)調(diào)度 pullMessageService
  • 內(nèi)部生產(chǎn)者 defaultMQProducer

MQClientInstance 和 消費(fèi)者為一對(duì)多關(guān)系。使用InstanceName相同的生產(chǎn)者消費(fèi)者都使用同一個(gè)MQClientInstance。

啟動(dòng) DefaultMQPushConsumerImpl.start()

  1. 生成InstanceName,如果用戶未設(shè)置則為pid。

  2. 創(chuàng)建 MQClientInstance,使用InstanceName相同的生產(chǎn)者消費(fèi)者都使用同一個(gè)MQClientInstance。MQClientInstance是客戶端的核心。

    就是說(shuō)一個(gè)MQClientInstance下會(huì)與多個(gè)消費(fèi)者。MQClientInstance統(tǒng)一調(diào)度他們。

this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);

//后面會(huì)將消費(fèi)者注冊(cè)到mQClientFactory,讓mQClientFactory有所有同一InstanceName消費(fèi)者的引用。
boolean registerOK =     mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
         
  1. 為負(fù)載均衡實(shí)現(xiàn)rebalanceImpl 賦值
  2. 創(chuàng)建PullAPIWrapper 負(fù)責(zé)拉取消息
  3. 根據(jù)消費(fèi)模式創(chuàng)建 OffsetStore
   switch (this.defaultMQPushConsumer.getMessageModel()) {
               case BROADCASTING://廣播存儲(chǔ)在本地
                   this.offsetStore =
                           new LocalFileOffsetStore(this.mQClientFactory,
                               this.defaultMQPushConsumer.getConsumerGroup());
                   break;
               case CLUSTERING://集群進(jìn)度存儲(chǔ)在遠(yuǎn)程
                   this.offsetStore =
                           new RemoteBrokerOffsetStore(this.mQClientFactory,
                               this.defaultMQPushConsumer.getConsumerGroup());
                   break;
               default:
                   break;
               }

? OffsetStore 負(fù)責(zé)讀取消費(fèi)進(jìn)度和同步消費(fèi)進(jìn)度

  1. 根據(jù)消費(fèi)模式創(chuàng)建ConsumeMessageService 并啟動(dòng)

    并發(fā)消費(fèi)不啟動(dòng)線程。

    順序消費(fèi)下啟動(dòng)定時(shí)任務(wù),會(huì)調(diào)用消費(fèi)者的RebalanceImpl的lockAll 方法。向broker發(fā)生請(qǐng)求鎖住分配給他的隊(duì)列。

                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,
                            (MessageListenerOrderly) this.getMessageListenerInner());
            }
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
            {
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,
                            (MessageListenerConcurrently) this.getMessageListenerInner());
            }
  1. 啟動(dòng)MQClientInstance,多消費(fèi)者引用同一個(gè)MQClientInstance時(shí)MQClientInstance只會(huì)啟動(dòng)一次

    mQClientFactory.start();
    
  2. 初始化

    //向nameser 拉取所關(guān)心的topic的路由信息  
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    //向所有路由信息里的所有broker發(fā)送心跳
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    //喚醒mQClientFactory的負(fù)責(zé)均衡服務(wù),
    this.mQClientFactory.rebalanceImmediately();
    

啟動(dòng) MQClientInstance.start()

一個(gè)MQClientInstance 只會(huì)啟動(dòng)一次。

1.啟動(dòng)netty 客戶端

this.mQClientAPIImpl.start();//內(nèi)置netty客戶端

2.啟動(dòng)定時(shí)任務(wù)

this.startScheduledTask();//會(huì)啟動(dòng)5個(gè)定時(shí)任務(wù)
  • 從遠(yuǎn)程獲取nameServer地址 發(fā)生變動(dòng)時(shí)可以更新本地nameServer

遠(yuǎn)程地址被寫(xiě)死,暫時(shí)沒(méi)有用。

MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  • 更新topic路由信息,topic路由發(fā)送變動(dòng)時(shí)可以感知

    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
    
  • 更新消費(fèi)進(jìn)度到broker 最終調(diào)用 DefaultMQPushConsumerImpl.offsetStore.persistAll

這里可以看出更新消費(fèi)進(jìn)度是異步的,這也是出現(xiàn)重復(fù)消息的原因之一

MQClientInstance.this.persistAllConsumerOffset();
  • 向broker發(fā)送心跳
MQClientInstance.this.cleanOfflineBroker();//清理下線的broker
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();//發(fā)送心跳
  • 動(dòng)態(tài)調(diào)整線程池 根據(jù)DefaultMQPushConsumer 的 adjustThreadPoolNumsThreshold 參數(shù)和消息在消費(fèi)者內(nèi)部的堆積調(diào)整
MQClientInstance.this.adjustThreadPool(); 
  1. 啟動(dòng)調(diào)度服務(wù)
//拉消息線程
this.pullMessageService.start();
//Start rebalance service
//重負(fù)載線程
this.rebalanceService.start();
//Start push service 內(nèi)部生產(chǎn)者用于消費(fèi)失敗時(shí),發(fā)送重試消息
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

拉消息流程

拉消息的流程是先從負(fù)載均衡開(kāi)始的。MQClientInstance的rebalanceService啟動(dòng)后會(huì)定時(shí)調(diào)用,所有消費(fèi)者的doRebalance 方法。間隔10s

        private static long WaitInterval = 1000 * 10;//間隔10s    
        @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            this.waitForRunning(WaitInterval);//等待
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    public void doRebalance() {
      //調(diào)用所有消費(fèi)者的doRebalance
        for (String group : this.consumerTable.keySet()) {//consumerTable 消費(fèi)者引用
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Exception e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
        //消費(fèi)者最終會(huì)調(diào)用自己的負(fù)載均衡實(shí)現(xiàn)的doRebalance方法
        @Override
    public void doRebalance() {
        if (this.rebalanceImpl != null) { //消費(fèi)者調(diào)用自己的rebalanceImpl
            this.rebalanceImpl.doRebalance();
        }
    }

負(fù)載均衡實(shí)現(xiàn)

先拿到topic路由信息,然后循環(huán)對(duì)topic做負(fù)載

public void doRebalance() {
    //取得關(guān)心等待topic
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //對(duì)topic做負(fù)載
                this.rebalanceByTopic(topic);
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    //當(dāng)topic變動(dòng)時(shí),移除多余topic對(duì)應(yīng)的ProcessQueue
    this.truncateMessageQueueNotMyTopic();
}

負(fù)載分集群和廣播模式,廣播模式不討論

在rocketmq中一個(gè)topic有多個(gè)隊(duì)列。負(fù)載均衡就是將隊(duì)列合理的分配給一個(gè)消費(fèi)組的所有消費(fèi)者。

有多種分配算法,繼承AllocateMessageQueueStrategy,默認(rèn)為AllocateMessageQueueAveragely

//先獲取負(fù)載所需要的參數(shù)
//topic對(duì)應(yīng)的所有隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//topic對(duì)應(yīng)的所有客戶端
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

然后調(diào)用,返回的list就是分配給當(dāng)前消費(fèi)者的隊(duì)列

   public List<MessageQueue> allocate(//
                                       final String consumerGroup,//
                                       final String currentCID,//
                                       final List<MessageQueue> mqAll,//
                                       final List<String> cidAll//
    );

而區(qū)分不同客戶端的cidAll 就是每個(gè)客戶端的ip@InstanceName ,使用同一ip下不能有相同的InstanceName。

比如AllocateMessageQueueAveragely有這一行

//取自己在客戶端集合的下標(biāo),如果兩個(gè)客戶端InstanceName相同,那么index都一樣,分配的隊(duì)列也相同
int index = cidAll.indexOf(currentCID);

而這個(gè)負(fù)載算法是沒(méi)有同步和校驗(yàn)等操作的,不同客戶端不會(huì)進(jìn)行通信??蛻舳瞬恢绖e人分配了哪些隊(duì)列。全靠“自覺(jué)”,同一組內(nèi)都使用同一策略那么分配是合理的,如果同一組內(nèi)使用不同策略,隊(duì)列的分配就會(huì)發(fā)生混亂。

拉取任務(wù)

rocketmq為每個(gè)分配給它的隊(duì)列生成一個(gè) 拉取任務(wù) ProcessQueue

將其存儲(chǔ)在PullMessageService 的pullRequestQueue中,這是一個(gè)LinkedBlockingQueue

PullMessageService 啟動(dòng)后會(huì)從堵塞隊(duì)列中取出拉取任務(wù),然后進(jìn)行消息的拉取。

分配隊(duì)列完成后

 //返回隊(duì)列是否發(fā)生了變化
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
                

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet) {//mqSet 分配給當(dāng)前消費(fèi)者的隊(duì)列
        boolean changed = false;
                //存儲(chǔ)上次分配的隊(duì)列和對(duì)應(yīng)的ProcessQueue拉取任務(wù) 
        //processQueueTable 是ConcurrentHashMap
        Iterator<Entry<MessageQueue, ProcessQueue>> it =
          this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {//topic 是否相等
                if (!mqSet.contains(mq)) { //上次分配隊(duì)列,這次沒(méi)分配給我
                    pq.setDropped(true);//禁用拉取任務(wù) 修改dropped屬性。是volatile變量
                    //移除OffsetStore中存儲(chǔ)的隊(duì)列進(jìn)度,移除前先提交進(jìn)度
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", 
                                 consumerGroup, mq);
                    }
                }
         
               //據(jù)上次拉取間隔 120000ms 也移除它
                else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error(
                                        "[BUG]doRebalance, {}, remove unnecessary mq, {},
                                  because pull is pause, so try to fixed it",
                                        consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
                //新隊(duì)列 處理
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                //生成拉取任務(wù)
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(new ProcessQueue());
                                //計(jì)算下次拉取的偏移
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    pullRequest.setNextOffset(nextOffset);
                    pullRequestList.add(pullRequest);
                    changed = true;
                    //記錄下 用于下次對(duì)比
                    this.processQueueTable.put(mq, pullRequest.getProcessQueue());
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
                //將拉取任務(wù)壓入堵塞隊(duì)列
        //最終調(diào)用 
        //PullMessageService.executePullRequestImmediately 
        //this.pullRequestQueue.put(pullRequest);
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }
拉取消息

現(xiàn)在知道一個(gè)隊(duì)列對(duì)應(yīng)一個(gè)拉取任務(wù)ProcessQueue,存放在堵塞隊(duì)列中,如果禁用了會(huì)將dropped屬性修改為true。

誰(shuí)來(lái)執(zhí)行拉取呢,MQClientInstance.PullMessageService。

PullMessageService 啟動(dòng)后從堵塞隊(duì)列取出拉取任務(wù),找到對(duì)應(yīng)的組調(diào)用pullMessage

PullMessageService 為單線程,所有拉取消息時(shí)為單線程拉取

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStoped()) {
          //從堵塞隊(duì)列中取出1
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest);
            }
    }

    log.info(this.getServiceName() + " service end");
}

  private void pullMessage(final PullRequest pullRequest) {
        //找到對(duì)應(yīng)的組調(diào)用pullMessage
        final MQConsumerInner consumer = 
          this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            //調(diào)用消費(fèi)者的pullMessage,最終調(diào)用pullAPIWrapper.pullKernelImpl
            impl.pullMessage(pullRequest);
        }
    }
DefaultMQPushConsumerImpl.pullMessage

先進(jìn)行限流等檢查,如果不能通過(guò)會(huì)調(diào)用executePullRequestLater() 將任務(wù)放回隊(duì)列,下次消費(fèi)。

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    //提交到定時(shí)任務(wù)中
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {//待會(huì)在放入隊(duì)列
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS);
}

也會(huì)檢查是否禁用。正常的任務(wù)拉取完成會(huì)放回隊(duì)列,等待下次拉取。

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {//檢查dropped屬性。volatile修飾
    log.info("the pull request[{}] is droped.", pullRequest.toString());
   //被禁用直接拋棄 沒(méi)被禁用的用完會(huì)放回隊(duì)列
    return;
}

都完成后創(chuàng)建一個(gè)回調(diào)函數(shù) PullCallback,然后異步拉取

因?yàn)榫W(wǎng)絡(luò)層是netty,所以其實(shí)所有請(qǐng)求都是異步。同步的操作只是做了異步轉(zhuǎn)同步而已。

this.pullAPIWrapper.pullKernelImpl(//
    pullRequest.getMessageQueue(), // 1
    subExpression, // 2
    subscriptionData.getSubVersion(), // 3
    pullRequest.getNextOffset(), // 4
    this.defaultMQPushConsumer.getPullBatchSize(), // 5
    sysFlag, // 6
    commitOffsetValue,// 7
    BrokerSuspendMaxTimeMillis, // 8
    ConsumerTimeoutMillisWhenSuspend, // 9
    CommunicationMode.ASYNC, // 10
    pullCallback// 11
    );

請(qǐng)求成功后觸發(fā)回調(diào)函數(shù)。主要看 case FOUND,就可以了。其他代表沒(méi)有新消息,偏移量不對(duì)等

//這里有一個(gè)mq自己實(shí)現(xiàn)的性能統(tǒng)計(jì)。我們?cè)谕獠恳部梢阅玫?consumer.getDefaultMQPushConsumerImpl().getConsumerStatsManager()
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult =
                    DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);

            switch (pullResult.getPullStatus()) {
            case FOUND:
                long prevRequestOffset = pullRequest.getNextOffset();
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                long pullRT = System.currentTimeMillis() - beginTimestamp;
                //性能統(tǒng)計(jì)
                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                    pullRequest.getConsumerGroup(),         
                  pullRequest.getMessageQueue().getTopic(), pullRT);
                long firstMsgOffset = Long.MAX_VALUE;
                if (pullResult.getMsgFoundList() == null || 
                    pullResult.getMsgFoundList().isEmpty()) {
                  //空消息,放回隊(duì)列
                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                }
                else {
                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                  //性能統(tǒng)計(jì)
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                        pullRequest.getConsumerGroup(), 
                        pullRequest.getMessageQueue().getTopic(),
                        pullResult.getMsgFoundList().size());
                    boolean dispathToConsume = 
                      processQueue.putMessage(pullResult.getMsgFoundList());
               //開(kāi)始消費(fèi)       
               DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                        pullResult.getMsgFoundList(), //
                        processQueue, //
                        pullRequest.getMessageQueue(), //
                        dispathToConsume);
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
            case OFFSET_ILLEGAL:
            default:
                break;
            }
        }
    }
開(kāi)始消費(fèi)

這里有一個(gè)分批消費(fèi)的邏輯,根據(jù)consumeMessageBatchMaxSize拆分

取決于這個(gè)參數(shù)private int consumeMessageBatchMaxSize = 1;

如果設(shè)置大于1那么這批消息消費(fèi)時(shí)只能全部成功或者全部失敗

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
    this.consumeExecutor.submit(consumeRequest);
}
else {
    for (int total = 0; total < msgs.size();) {
        List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
        for (int i = 0; i < consumeBatchSize; i++, total++) {
            if (total < msgs.size()) {
                msgThis.add(msgs.get(total));
            }
            else {
                break;
            }
        }
                //創(chuàng)建一個(gè)消費(fèi)job
        ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue,
                                                           messageQueue);
        //提交到線程池
        this.consumeExecutor.submit(consumeRequest);
    }
}
//ConsumeRequest 是Runnable的實(shí)現(xiàn)
ConsumeRequest implements Runnable 

ConsumeRequest的run方法

@Override
public void run() {
   //又進(jìn)行了隊(duì)列禁用的校驗(yàn)
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped {}",
            this.messageQueue);
        return;
    }
        //用戶的消費(fèi)listener 實(shí)現(xiàn)
    MessageListenerConcurrently listener =    
    ConsumeMessageConcurrentlyService.this.messageListener;
    //創(chuàng)建Context
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    //這個(gè)Context 用于hook,在4.5的消息追蹤中是借助此hook和Context實(shí)現(xiàn)的
    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext
            .setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer
                .getConsumerGroup());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        //調(diào)用hook:ConsumeMessageHook
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
            .executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();

    try {
        //將重試消息的topic替換為原來(lái)的topic
        ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
        //調(diào)用用戶方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    }
    catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
            RemotingHelper.exceptionSimpleDesc(e),//
            ConsumeMessageConcurrentlyService.this.consumerGroup,//
            msgs,//
            messageQueue);
    }

    long consumeRT = System.currentTimeMillis() - beginTimestamp;

    if (null == status) {//返回null 或者異常設(shè)置為失敗
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",//
            ConsumeMessageConcurrentlyService.this.consumerGroup,//
            msgs,//
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    
    // add by fuhaining@yolo24.com
    if (consumeMessageLog.isInfoEnabled()) {
        StringBuilder keys = new StringBuilder();
        for (MessageExt msg : msgs) {
            keys.append(msg.getMsgId()).append(",");
        }
        consumeMessageLog.info("concurrently - " + status.name() + " : " + 
                               keys.deleteCharAt(keys.length() - 1).toString());
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == 
                                         status);
       //調(diào)用hook
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
            .executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(
        ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(),
      consumeRT);
    //再次校驗(yàn)
    if (!processQueue.isDropped()) {
        //處理結(jié)果
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    }
    else {
        log.warn(
            "processQueue is dropped without process consume result. messageQueue={}, 
          msgTreeMap={}, msgs={}",
            new Object[] { messageQueue, processQueue.getMsgTreeMap(), msgs });
    }
}
重試消息
//ConsumeMessageConcurrentlyService.processConsumeResult方法   
//在前面會(huì)進(jìn)行性能統(tǒng)計(jì)

        switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING://廣播略過(guò)
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
          (consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            //發(fā)送重試消息
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
                        //發(fā)送失敗的進(jìn)定時(shí)任務(wù),重試
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
                consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0) { 
   // 將消費(fèi)進(jìn)度提交到OffsetStore
   // OffsetStore 只會(huì)將進(jìn)度記下,由前面說(shuō)的定時(shí)任務(wù)同步給broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

將要重試的消息發(fā)會(huì)broker。只是把原來(lái)的id發(fā)回去。broker在會(huì)根據(jù)id讀取原來(lái)消息的消息體

生成重試消息

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        //消息原來(lái)存哪,發(fā)會(huì)到哪
        String brokerAddr =(null != brokerName) ? 
          this.mQClientFactory.findBrokerAddressInPublish(brokerName)
          : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());

        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
          this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000);
    }
    catch (Exception e) {
      
     //如果發(fā)送失敗,使用內(nèi)部生產(chǎn)者發(fā)送
      this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
  
 //  consumerSendMessageBack  方法
  
        ConsumerSendMsgBackRequestHeader requestHeader = new
            ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
        requestHeader.setGroup(consumerGroupWithProjectGroup);
            //原來(lái)的topic
        requestHeader.setOriginTopic(msg.getTopic());
            //原消息的偏移
        requestHeader.setOffset(msg.getCommitLogOffset());
          //重試級(jí)別
        requestHeader.setDelayLevel(delayLevel);
            //記錄原來(lái)的id
        requestHeader.setOriginMsgId(msg.getMsgId());
            //通過(guò)netty發(fā)送
        RemotingCommand response = this.remotingClient.invokeSync(addr, 
        request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
        }

流程總結(jié)

  1. MQClientInstancerebalanceService 線程啟動(dòng)。定時(shí)調(diào)用消費(fèi)者的負(fù)載均衡實(shí)現(xiàn)RebalanceImpldoRebalance方法。

  2. RebalanceImpl根據(jù)負(fù)載策略AllocateMessageQueueStrategy計(jì)算屬于自己的隊(duì)列

  3. 根據(jù)隊(duì)列的變化,生成新的拉取任務(wù) ProcessQueue 或者將原來(lái)的ProcessQueue禁用

  4. 將新的 ProcessQueue放入MQClientInstancePullMessageServicepullRequestQueue這是一個(gè)LinkedBlockingQueue

  5. PullMessageService的線程會(huì)從隊(duì)列中取出,然后調(diào)用對(duì)應(yīng)消費(fèi)者的PullAPIWrapperpullKernelImpl方法發(fā)送請(qǐng)求拉取

  6. 拉取為異步,在回調(diào)中將消息封裝成ConsumeMessageConcurrentlyService.ConsumeRequest任務(wù)提交到ConsumeMessageConcurrentlyService的線程池ScheduledExecutorService

  7. 最終調(diào)用用戶的實(shí)現(xiàn)進(jìn)行消費(fèi)

  8. 將消費(fèi)失敗消息發(fā)回broker生成重試消息

  9. 消費(fèi)成功將進(jìn)度寫(xiě)入消費(fèi)者的OffsetStore 定時(shí)回寫(xiě)broker

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容