RocketMQ源碼解析——存儲(chǔ)部分(7)延遲消息的原理`ScheduleMessageService`

@[toc]

延遲消息描述介紹

RocketMQ的定時(shí)消息(延遲隊(duì)列)是指消息發(fā)送到broker后,不會(huì)立即被消費(fèi),等待特定時(shí)間投遞給真正的topic。broker有配置項(xiàng)messageDelayLevel,默認(rèn)值為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,18個(gè)level。可以配置自定義messageDelayLevel。注意,messageDelayLevelbroker的屬性,不屬于某個(gè)topic。發(fā)消息時(shí),設(shè)置delayLevel等級(jí)即可:msg.setDelayLevel(level)。level有以下三種情況:

  • level 為 0,消息為非延遲消息
  • 1<=level<=maxLevel,消息延遲特定時(shí)間,例如level==1,延遲1s
  • level > maxLevel,則level== maxLevel,例如level==20,延遲2h

定時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId =delayTimeLevel – 1,即一個(gè)queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)。broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫(xiě)入真實(shí)的topic。

需要注意的是,定時(shí)消息會(huì)在第一次寫(xiě)入和調(diào)度寫(xiě)入真實(shí)topic時(shí)都會(huì)計(jì)數(shù),因此發(fā)送數(shù)量、tps都會(huì)變高。

源碼分析

第一次存儲(chǔ)消息

?第一次存儲(chǔ)延遲消息是在CommitLog的putMessage方法中進(jìn)行的,關(guān)于這部分代碼分析可以看看前面的分析CommitLog文件的文章。這里不重復(fù)分析,只截取部分的代碼片段出來(lái)。

 public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    ......
     //如果不是事務(wù)消息 或者 是事務(wù)消息的提交階段
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 如果設(shè)置了延遲時(shí)間
            if (msg.getDelayTimeLevel() > 0) {
                //延遲級(jí)別不能超過(guò)最大的延遲級(jí)別,超過(guò)也設(shè)置為最大的延遲級(jí)別
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                //設(shè)置延遲消息的topic
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //延遲消息的queueId= 延遲級(jí)別-1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId 備份真正的topic和queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
    ......
 }

?后續(xù)的把消息保存到CommitLog文件的代碼沒(méi)有貼出來(lái),這里是第一次存儲(chǔ)的關(guān)鍵部分。

  1. 這里會(huì)先判斷消息的標(biāo)志位,如果標(biāo)識(shí)位不是事務(wù)消息或者事務(wù)消息的提交階段。
  2. 會(huì)進(jìn)一步判斷是不是設(shè)置了延遲時(shí)間。
  3. 如果設(shè)置的延遲時(shí)間大于最大的延遲時(shí)間則把延遲時(shí)間設(shè)置為最大延遲時(shí)間
  4. 把消息的queueId屬性修改為PROPERTY_REAL_QUEUE_ID,對(duì)應(yīng)的topic屬性設(shè)置為PROPERTY_REAL_TOPIC。同時(shí)把真正的queueIdtopic保存在property屬性中。然后保存到CommitLog。

?在這里可以看到RocketMQ對(duì)于延遲消息,第一次的消息存儲(chǔ),會(huì)把消息的topicqueueId先修改,然后存放到特定的topic中去進(jìn)行保存。

第二次消息存儲(chǔ)

?RocketMQ中有一個(gè)專(zhuān)門(mén)處理topicRMQ_SYS_SCHEDULE_TOPIC的服務(wù)類(lèi)ScheduleMessageService。這個(gè)類(lèi)的初始化是在DefaultMessageStore中會(huì)在RocketMQ的Broker啟動(dòng)的時(shí)候初始化。

初始化延遲文件和配置

?ScheduleMessageService在Broker啟動(dòng)的時(shí)候會(huì)先調(diào)用其load方法,加載delayOffset.json文件然后加載對(duì)應(yīng)的延遲級(jí)別配置。

    public boolean load() {
        //調(diào)用父類(lèi)的加載文件的方法,父類(lèi)會(huì)調(diào)用子類(lèi)實(shí)現(xiàn)的configFilePath方法確定文件
        boolean result = super.load();
        //加載成功則進(jìn)行解析延遲級(jí)別配置
        result = result && this.parseDelayLevel();
        return result;
    }

    @Override
    public String configFilePath() {
        //獲取`delayOffset.json`文件
        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
            .getStorePathRootDir());
    }

    public boolean parseDelayLevel() {
        //不同延遲級(jí)別的基礎(chǔ)時(shí)間長(zhǎng)度,單位為毫秒
        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);
        //獲取延遲配置
        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
        try {
            //進(jìn)行分割解析,分割符號(hào)為空格
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                //獲取對(duì)應(yīng)的延遲毫秒數(shù)
                Long tu = timeUnitTable.get(ch);
                int level = i + 1;
                //尋找最大的延遲等級(jí)
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                //獲取延遲的時(shí)長(zhǎng)
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                //計(jì)算真正的時(shí)長(zhǎng)
                long delayTimeMillis = tu * num;
                //保存到延遲級(jí)別緩存中
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }

        return true;
    }

?上面就是對(duì)應(yīng)的延遲級(jí)別的解析和配置,從上面可以看到我們可以通過(guò)修改配置文件的方式來(lái)修改RocketMQ的最大延遲時(shí)間和對(duì)應(yīng)的延遲級(jí)別。

處理延遲消息

?ScheduleMessageService是一個(gè)不停運(yùn)行的線程,在start方法中會(huì)不斷的針對(duì)不同延遲級(jí)別的消息進(jìn)行處理

   public void start() {
        //設(shè)置運(yùn)行狀態(tài)為開(kāi)始
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            //迭代延遲級(jí)別的緩存
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                //獲取等級(jí)和 延遲時(shí)間長(zhǎng)度
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                //獲取對(duì)應(yīng)延遲級(jí)別的偏移量緩存,這里緩存的是ConsumeQueue文件中的消息的偏移量
                Long offset = this.offsetTable.get(level);
                //如果偏移量為null,說(shuō)明沒(méi)有消息需要處理,這設(shè)置為0
                if (null == offset) {
                    offset = 0L;
                }
                //如果延遲級(jí)別不為null,則構(gòu)建DeliverDelayedMessageTimerTask任務(wù)進(jìn)行處理
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
        ......
    }

?這里主要邏輯就是循環(huán)迭代對(duì)應(yīng)的延遲級(jí)別緩存,然后根據(jù)不同的等級(jí)來(lái)獲取對(duì)應(yīng)的偏移量緩存。然后根據(jù)偏移量和延遲級(jí)創(chuàng)建一個(gè)DeliverDelayedMessageTimerTask進(jìn)一步的處理。這里要說(shuō)明的是offsetTable中存的是消息在ConsumeQueue中的偏移量。關(guān)于這部分的可以看看前面的ConsumeQueue相關(guān)的文章

?這里進(jìn)一步看看DeliverDelayedMessageTimerTask。

  @Override
        public void run() {
            try {
                if (isStarted()) {
                    //執(zhí)行檢查消息是否到時(shí)間的邏輯
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

?主要邏輯在executeOnTimeup中。這個(gè)方法的邏輯有點(diǎn)長(zhǎng),這里貼出主要的部分,然后進(jìn)行分析

       public void executeOnTimeup() {
            //根據(jù) RMQ_SYS_SCHEDULE_TOPIC 和 延遲級(jí)別 找到對(duì)應(yīng)的ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                //根據(jù)傳入的  offset 的從ConsumeQueue中獲取對(duì)應(yīng)的消息信息緩沖,這里獲取到的不是真實(shí)的消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        //獲取額外的信息單元
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        //從buffer中每次獲取20個(gè)byte長(zhǎng)度的信息,因?yàn)镃onsumeQueue的存儲(chǔ)單元大小為20byte
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //獲取消息在CommitLog中的真實(shí)位置
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            //獲取消息的大小
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            //消息對(duì)應(yīng)的tag的hashcode
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
                            //如果額外的信息不為空,則獲取
                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                            tagsCode, offsetPy, sizePy);
                                     //從CommitLog中獲取消息的存儲(chǔ)時(shí)間
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            //下一個(gè)消息單元的偏移量
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;
                            //消息的延遲時(shí)間到了
                            if (countdown <= 0) {
                                //鎖定消息
                                MessageExt msgExt =
                                        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                                offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //構(gòu)建真正的消息,把真實(shí)的Topic和QueueId恢復(fù)
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                                    msgInner.getTopic(), msgInner);
                                            continue;
                                        }
                                        //把消息保存到CommitLog,等待消費(fèi)
                                        PutMessageResult putMessageResult =
                                                ScheduleMessageService.this.writeMessageStore
                                                        .putMessage(msgInner);
                                        //保存成功,則繼續(xù)處理下一條消息
                                        if (putMessageResult != null
                                                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                    msgExt.getTopic(), msgExt.getMsgId());
                                            //如果保存失敗,則創(chuàng)建新的延遲任務(wù)。10秒鐘之后重試
                                            ScheduleMessageService.this.timer.schedule(
                                                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                            nextOffset), DELAY_FOR_A_PERIOD);
                                            //更新偏移量
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                    nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                      .......
                      //剩余的這些部分的邏輯就是重新構(gòu)建DeliverDelayedMessageTimerTask,等待下個(gè)時(shí)間段的重試
        }

分析一下主要的邏輯步驟:

  1. 根據(jù) topic為RMQ_SYS_SCHEDULE_TOPIC 和 延遲級(jí)別為queueId 找到對(duì)應(yīng)的ConsumeQueue
  2. 根據(jù)傳入的 offset 的從ConsumeQueue中獲取對(duì)應(yīng)的消息信息緩沖,這里獲取到的不是真實(shí)的消息,而是前面分析的重新設(shè)置后的消息
  3. 從buffer中每次獲取20個(gè)byte長(zhǎng)度的信息,因?yàn)镃onsumeQueue的存儲(chǔ)單元大小為20byte。然后根據(jù)Offset從CommitLog中獲取消息的落盤(pán)時(shí)間
  4. 計(jì)算當(dāng)前時(shí)間和落盤(pán)時(shí)間的時(shí)間差,檢查延遲時(shí)間是否到了
  5. 恢復(fù)消息真實(shí)的TopicQueueId然后保存到CommitLog中。等待消息的消費(fèi)
  6. 期間如果有存在失敗的,則重新創(chuàng)建DeliverDelayedMessageTimerTask任務(wù),等待下一次的處理

?這就是整個(gè)延遲消息的處理邏輯。其實(shí)就是先把消息真實(shí)的信息保存在消息屬性中,然后把消息的topic和queueId覆蓋然后保存到延遲消息專(zhuān)用的topic中,其中queueId為延遲級(jí)別。然后等待延遲消息處理的線程處理延遲消息的topic,時(shí)間到了就恢復(fù)消息真實(shí)的topic和queueId然后重新保存到CommitLog中,等待消費(fèi)。
?其中關(guān)于ConsumeQueueCommitLog的消息獲取和保存的邏輯這里沒(méi)有進(jìn)行分析,需要了解的可以看前面的文章
CommitLog文件的文章
ConsumeQueue相關(guān)的文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容