RocketMQ源碼分析----長輪詢

廢話

這篇文章主要講RocketMQ的長輪詢,為什么叫長輪詢我也不清楚....主要別人這樣叫我也這樣叫吧,大家明白意思就好。

正文

RcocketMQ消費者的模式是pull模式,也就是會定時向Broker請求消息進(jìn)行消費。在源碼中實現(xiàn)是開啟了后臺線程不停的去pull(當(dāng)然會先從隊列去PullRequest,隊列為空會阻塞),剛研究RocketMQ消費者pull的代碼之后不久,有個問題:

  • 如果長時間沒有消息,消費者不停的去請求那不就會導(dǎo)致broker負(fù)載很高嗎

當(dāng)時沒有想太多這個問題,后來才發(fā)現(xiàn)的,那么我們先回顧一下Broker在沒有消息的時候是怎么處理的,首先當(dāng)然是先獲取消息了(下面代碼在PullMessageProcessor.java中)

// ....省略其他代碼
final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
// ....省略其他代碼

然后看下對Result的判斷

switch (getMessageResult.getStatus()) {
                // ....省略其他代碼
                case NO_MATCHED_LOGIC_QUEUE:
                case NO_MESSAGE_IN_QUEUE:
                    if (0 != requestHeader.getQueueOffset()) {
                    // ....省略其他代碼
                    } else {
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                    }
                    break;
                case OFFSET_FOUND_NULL:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_OVERFLOW_ONE:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;

            }

上面是沒有找到消息的情況,response是要相應(yīng)給消費者的,這是為特定的code。
下面看下對response的code的判斷

            switch (response.getCode()) {
                case ResponseCode.PULL_NOT_FOUND:
                    if (brokerAllowSuspend && hasSuspendFlag) {
                        // 如果broker開啟了長輪詢,則將長輪詢時間設(shè)置為30s(消費者傳過來的,默認(rèn)30s),否則設(shè)置為1s
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }

                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        // 將這次請求的信息包括channel全部封裝到PullRequest,并保存到pullRequestTable,即把當(dāng)前的request hold住
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                this.brokerController.getMessageStore().now(), offset, subscriptionData);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }
            }

看到response = null了吧,如果是null,就不會給消費者響應(yīng),那么消費者就不能執(zhí)行相應(yīng)的回調(diào)方法了。
那么這時候又有兩個問題:

  • 給或者不給消費者響應(yīng)對消費者有什么影響?
  • 此時不給響應(yīng),那么什么時候會給消費者響應(yīng)?

先看下第一個問題,第一個問題的答案在MQClientAPIImpl的pullMessageAsync(由于源碼里寫的異步,所以調(diào)用的該異步方法,同步的情況就不一樣了)

        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    // ....
                }
            }
        });

response==null的在這次的討論范圍,先看下響應(yīng)了會怎樣,會調(diào)用回調(diào)方法pullcallback,傳入Response,這個定義在DefaultMQPushConsumerImpl的pullMessage方法中

// 省略其他代碼
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    break;
                case NO_NEW_MSG:
                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException);
    }
};

如果收到了響應(yīng),到最后會調(diào)用executePullRequestImmediately或者executePullRequestLater(底層也是調(diào)用executePullRequestImmediately,只不過是過一會采取執(zhí)行),其作用就是將pullRequest放回隊列,嘿嘿,那么第一個問題答案就出來了:

  • 如果是成功,那么當(dāng)然是沒問題,我消費完成,把pullRequest放回隊列,后臺線程會從隊列取出來繼續(xù)請求下一個消息;
  • 如果是失敗,由于Broker沒有返回,所以,自然執(zhí)行不到這里,那么后臺線程還是阻塞狀態(tài)知道返回失敗,把隊列放回去,才進(jìn)行下一次嘗試

那么接下來就是看下第二個問題的答案是什么了,這個問題嘛,需要先看下《ConsumeQueue介紹和其構(gòu)建過程》這篇文章,在ConsumeQueue定時構(gòu)建過程中,有幾句代碼

    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode());
    }

ConsumeQueue在CommitLog有消息寫入的時候(即有Producer發(fā)送了消息),會進(jìn)行構(gòu)建,就會調(diào)用上面那行代碼,那么這個arriving方法到底干了什么,我們進(jìn)去看看,核心是調(diào)用了PullRequestHoldService的notifyMessageArriving方法

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                    }

                    Long tmp = tagsCode;
                    if (newestOffset > request.getPullFromThisOffset()) {
                        if (tagsCode == null) {
                            // tmp = getLatestMessageTagsCode(topic, queueId,
                            // maxOffset);
                        }
                        if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
                            try {
                                this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
                                        request.getRequestCommand());
                            } catch (RemotingCommandException e) {
                                log.error("", e);
                            }
                            continue;
                        }
                    }
                    // 如果已經(jīng)超過限制了,那么再次重試,如果還是失敗,那么直接返回
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                        } catch (RemotingCommandException e) {
                            log.error("", e);
                        }
                        continue;
                    }
                    replayList.add(request);
                }
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }

邏輯很簡單,先從pullRequestTable中取出對于topic+qid下hold住的PullRequest,如果tag是相符的,那么調(diào)用PullMessageProcessor.this.processRequest模擬broker處理消費者的請求消息,并做回響應(yīng),這里進(jìn)行調(diào)用如果還是查不到消息,那么不會再hold住Request,直接返回

看到這,第二個問題也得到了答案,整個長輪詢的原理也清楚了,總結(jié)一下:

  • 消費者會不停的從PullRequest的隊列里取request然后想broker請求消息,得到broker的響應(yīng)后會做相應(yīng)處理并把PullRequest放回隊列以便下一次請求
  • broker在查不到消息的情況下會hold住請求,在ReputMessageService不停構(gòu)建ConsumeQueue的時候,會拿出hold住的請求進(jìn)行二次處理
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,537評論 19 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,897評論 13 425
  • 這是我在簡書上寫的第一篇文章,平日里構(gòu)思了很久,但卻從未有過落筆。今晚喝了一小點兒酒,來深度剖析一下自己。 我是9...
    張水貨閱讀 405評論 0 0
  • 且把柴扉掩,且偷半日閑。 笑將紙筆設(shè),飛白醉書澤。 小品非小品,也是也是集。 浮生人間夢,為歡天上音。 2013/...
    弗離閱讀 178評論 0 0
  • 文/孟小滿 2017.11.2 星期四 晴 首先,自我介紹一下:我是...
    孟小滿閱讀 2,502評論 195 167

友情鏈接更多精彩內(nèi)容