RocketMQ源碼解析——存儲(chǔ)部分(8)操作消息相關(guān)日志的中介DefaultMessageStore

@[toc]

作用

前面介紹了RocketMQ的一些主要的日志文件,CommitLog,ConsumeQueueIndexFile的結(jié)構(gòu)和存儲(chǔ)操作原理。這些文件的處理類都在不同的類中處理的。RocketMQ中提供了DefaultMessageStore來(lái)對(duì)這些類進(jìn)行一個(gè)封裝聚合和額外的擴(kuò)展。比如過期消息的清理,新消息的保存,消息的查詢,消息的刷盤等。除此之外也提供一些服務(wù)啟動(dòng)時(shí)候的一些邏輯,比如從磁盤加載元數(shù)據(jù),服務(wù)停止時(shí)候的消息保存邏輯等。

分析

構(gòu)造方法DefaultMessageStore

?DefaultMessageStore的構(gòu)造方法,主要是給該類的一些成員變量進(jìn)行賦值或者初始化。這些成員變量都是DefaultMessageStore間接操作CommitLog文件或者是ConsumeQueue的對(duì)象以及持久化等相關(guān)的操作類。

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        //消息到達(dá)監(jiān)聽器,這個(gè)跟PullRequestHoldService一起使用的,消息到達(dá)后調(diào)用PullRequestHoldService中的notifyMessageArriving方法
        this.messageArrivingListener = messageArrivingListener;
        //broker配置
        this.brokerConfig = brokerConfig;
        //消息存儲(chǔ)配置
        this.messageStoreConfig = messageStoreConfig;
        //broker狀態(tài)管理類,統(tǒng)計(jì)broker相關(guān)信息,比如消息數(shù)量,topic數(shù)量等
        this.brokerStatsManager = brokerStatsManager;
        //提前主動(dòng)申請(qǐng)內(nèi)存文件服務(wù)類,用于CommitLog
        this.allocateMappedFileService = new AllocateMappedFileService(this);
        //
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        //管理ConsumeQueue文件的集合
        this.consumeQueueTable = new ConcurrentHashMap<>(32);
        //將ConsumeQueue邏輯隊(duì)列刷盤的服務(wù)類,1秒刷一次,最多嘗試3次,
        this.flushConsumeQueueService = new FlushConsumeQueueService();
        //清理物理文件服務(wù),定期清理72小時(shí)之前的物理文件。
        this.cleanCommitLogService = new CleanCommitLogService();
        //清理邏輯文件服務(wù),定期清理在邏輯隊(duì)列中的物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù),同時(shí)也清理Index中物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù)
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        //存儲(chǔ)層內(nèi)部統(tǒng)計(jì)服務(wù)
        this.storeStatsService = new StoreStatsService();
        //index文件的存儲(chǔ)
        this.indexService = new IndexService(this);
        //用于commitlog數(shù)據(jù)的主備同步
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        //用于轉(zhuǎn)發(fā)CommitLog中消息到ConsumeQueue中
        this.reputMessageService = new ReputMessageService();
        //用于監(jiān)控延遲消息,并到期后執(zhí)行,吧延遲消息寫入CommitLog
        this.scheduleMessageService = new ScheduleMessageService(this);
        //臨時(shí)日志文件存儲(chǔ)池
        this.transientStorePool = new TransientStorePool(messageStoreConfig);

        if (messageStoreConfig.isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }
        //啟動(dòng)allocateMappedFileService
        this.allocateMappedFileService.start();
        //啟動(dòng)indexService
        this.indexService.start();
        //消息轉(zhuǎn)存任務(wù)的集合
        this.dispatcherList = new LinkedList<>();
        //負(fù)責(zé)把CommitLog消息轉(zhuǎn)存到ConsumeQueue文件
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        //負(fù)責(zé)吧CommitLog消息轉(zhuǎn)存到Index文件
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
        //在保存日志數(shù)據(jù)文件的根目錄 {user.home}/store 路徑下 創(chuàng)建一個(gè)一個(gè)名字為lock 的文件
        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
        MappedFile.ensureDirOK(file.getParent());
        //創(chuàng)建根目錄文件的隨機(jī)文件流,權(quán)限是讀寫
        lockFile = new RandomAccessFile(file, "rw");
    }

?這里對(duì)一些重要的變量進(jìn)行講解

變量對(duì)象 作用描述
messageArrivingListener 消息到達(dá)監(jiān)聽器,這個(gè)跟PullRequestHoldService一起使用的,消息到達(dá)后調(diào)用PullRequestHoldService中的notifyMessageArriving方法 ,在push模式下會(huì)調(diào)用
brokerConfig broker配置
messageStoreConfig 消息存儲(chǔ)相關(guān)配置
brokerStatsManager broker狀態(tài)管理類,統(tǒng)計(jì)broker相關(guān)信息,比如消息數(shù)量,topic數(shù)量等
allocateMappedFileService 提前主動(dòng)申請(qǐng)內(nèi)存文件服務(wù)類,用于CommitLog
commitLog CommitLog日志文件的生成和處理類,根據(jù)不同的情況可能是普通的CommitLog頁(yè)可能是基于Raft協(xié)議擴(kuò)展實(shí)現(xiàn)的CommitLog
consumeQueueTable 管理ConsumeQueue文件的集合
flushConsumeQueueService 將ConsumeQueue邏輯隊(duì)列刷盤的服務(wù)類,1秒刷一次,最多嘗試3次,
cleanCommitLogService 清理物理文件CommitLog服務(wù),定期清理72小時(shí)之前的物理文件?;蛘逤ommitLog日志文件的在磁盤占比達(dá)到了75以上就會(huì)進(jìn)行清理
cleanConsumeQueueService 清理邏輯文件服務(wù),定期清理在邏輯隊(duì)列中的物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù),同時(shí)也清理Index中物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù)
storeStatsService 存儲(chǔ)層內(nèi)部統(tǒng)計(jì)服務(wù)
indexService index文件的存儲(chǔ)和讀取類
haService 高可用的服務(wù)類,如果是基于Raft協(xié)議的高可用那么這個(gè)類就是null
reputMessageService 用于轉(zhuǎn)發(fā)CommitLog中消息到ConsumeQueue中
scheduleMessageService 用于監(jiān)控延遲消息,并到期后執(zhí)行,吧延遲消息真實(shí)的信息寫入CommitLog
transientStorePool 臨時(shí)日志文件存儲(chǔ)池,在CommitLog使用的臨時(shí)存儲(chǔ)池的時(shí)候會(huì)用到
dispatcherList 消息轉(zhuǎn)存任務(wù)的集合 , 用于把CommitLog的消息日志同時(shí)轉(zhuǎn)存一份到ConsumeQueue中和Index中,用于構(gòu)建索引和消費(fèi)隊(duì)列

內(nèi)部方法分析

?這里分析Broker按照啟動(dòng)的前后順序來(lái)進(jìn)行分析。

Broker啟動(dòng)后服務(wù)于日志相關(guān)的類啟動(dòng)的start方法

?Broker在啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)DefaultMessageStore,利用前面說(shuō)的構(gòu)造器進(jìn)行創(chuàng)建,然后會(huì)調(diào)用start方法對(duì)成員變量中的一些類進(jìn)行啟動(dòng)。

   public void start() throws Exception {
        //獲取lock文件的唯一FileChannel對(duì)象,然后獲取文件鎖
        lock = lockFile.getChannel().tryLock(0, 1, false);
        //如果鎖是null 或者 鎖是共享類型的 或者 鎖不是有效的,則拋出異常
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }
        //寫入lock 四個(gè)字符到lock文件
        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);
        {
            /**
             * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
             * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
             * 3. Calculate the reput offset according to the consume queue;
             * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
             */
            //獲取CommitLog的最小物理偏移量
            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
            //比較ConsumeQueue文件記錄的最大物理偏移量和實(shí)際的物理偏移量的大小,取ConsumeQueue文件記錄的
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    //如果ConsumeQueue記錄的最大物理偏移量 大于 CommitLog的最小偏移量,則選用ConsumeQueue記錄的偏移量,
                    // 因?yàn)镃onsumeQueue記錄下來(lái)的表示已經(jīng)換發(fā)到了ConsumeQueue了,不需要處理,只需要處理沒有轉(zhuǎn)發(fā)的
                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                    }
                }
            }
            //記錄的小于0,則置位0
            if (maxPhysicalPosInLogicQueue < 0) {
                maxPhysicalPosInLogicQueue = 0;
            }
            /**
             *  如果邏輯文件記錄的物理偏移量  小于 實(shí)際的最小物理偏移量,則置位實(shí)際的。
             *   1. 如果有人移除了ConsumeQueue文件 或者 保存文件的磁盤損壞了
             *   2. 從別的broker上面復(fù)制CommitLog到一個(gè)新啟動(dòng)的broker機(jī)器中
             *  這幾種情況都可能使得邏輯日志ConsumeQueue文件記錄的最小物理偏移量為0
             *
             */
            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
                /**
                 * This happens in following conditions:
                 * 1. If someone removes all the consumequeue files or the disk get damaged.
                 * 2. Launch a new broker, and copy the commitlog from other brokers.
                 *
                 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
                 */
                log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
            }
            log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
            //啟動(dòng)reputMessageService 來(lái)按照CommitLog 生成ConsumeQueue
            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
            this.reputMessageService.start();

            /**
             *  1. Finish dispatching the messages fall behind, then to start other services.
             *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
             */
            //等待同步完成
            while (true) {
                if (dispatchBehindBytes() <= 0) {
                    break;
                }
                Thread.sleep(1000);
                log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
            }
            this.recoverTopicQueueTable();
        }
        //如果沒有使用DLegerCommitLog,就使用HaService來(lái)做高可用
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService.start();
            //如果是master,則啟動(dòng)延遲消息的檢測(cè)任務(wù)
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }
        //刷新ConsumeQueue的服務(wù)啟動(dòng)
        this.flushConsumeQueueService.start();
        //CommitLog刷新的服務(wù)啟動(dòng)
        this.commitLog.start();
        //存儲(chǔ)狀態(tài)檢測(cè)的服務(wù)啟動(dòng)
        this.storeStatsService.start();
        //創(chuàng)建臨時(shí)文件,來(lái)表示是否正常關(guān)機(jī)
        this.createTempFile();
        //啟動(dòng)其他服務(wù)。比如清除過期日志的服務(wù)等
        this.addScheduleTask();
        this.shutdown = false;
    }

?這里發(fā)現(xiàn)有很大的一段邏輯實(shí)在判斷CommitLog文件的最小物理偏移量和ConsumeQueue記錄的最大物理移量的比較,這里比較的原因是。reputMessageService這個(gè)類,會(huì)對(duì)CommitLog中的消息進(jìn)行轉(zhuǎn)存到ConsumeQueue中。這里可以參考前面的ConsumeQueue的分析的文章CommitLog的分析文章。還有一些別的任務(wù)也會(huì)同時(shí)啟動(dòng),這里就不仔細(xì)分析。

Broker啟動(dòng)加載文件的 load方法

?Broker在啟動(dòng)后,會(huì)對(duì)機(jī)器中的可能存在的日志先關(guān)的文件比如CommitLog,ConsumeQueue,IndexFile等先進(jìn)行恢復(fù)處理。這個(gè)恢復(fù)的處理邏輯就在 load方法中。

    public boolean load() {
        boolean result = true;

        try {
            //是否存在abort文件,如果存在說(shuō)明上次服務(wù)關(guān)閉時(shí)異常關(guān)閉的
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
            //加載定時(shí)任務(wù)的配置文件,解析延遲級(jí)別配置信息
            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }

            // 加載 Commit Log文件
            result = result && this.commitLog.load();

            // 加載 Consume Queue文件
            result = result && this.loadConsumeQueue();
            //檢查前面3個(gè)文件是不是加載成功
            if (result) {
                //加載成功則繼續(xù)加載checkpoint文件
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                //加載indexFile
                this.indexService.load(lastExitOK);
                //進(jìn)行文件的恢復(fù)邏輯
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;
    }




    private void recover(final boolean lastExitOK) {
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
        //上次服務(wù)關(guān)閉是不是正常關(guān)閉
        if (lastExitOK) {
            //正常關(guān)閉情況關(guān)閉
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            //異常情況關(guān)閉
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
        //恢復(fù)topic消費(fèi)相關(guān)相關(guān)的緩存
        this.recoverTopicQueueTable();
    }

?這里可以看到文件的恢復(fù)存在一種情況,就是上次Broker的關(guān)閉時(shí)異常關(guān)閉的情況。這種情況下對(duì)應(yīng)的CommitLog的恢復(fù)與正常情況的恢復(fù)是不一樣的。整體的 load方法邏輯如下:

  1. 檢查abort文件是不是存在,如果存在表示上次是異常關(guān)閉,這個(gè)文件是一個(gè)空文件,在啟動(dòng)之后會(huì)創(chuàng)建,正常關(guān)閉的情況會(huì)刪除掉。
  2. 加載延遲消息相關(guān)的配置,加載 Commit Log文件,加載Consume Queue文件
  3. 如果步驟2成功加載,則加載checkpoint文件,加載indexFile然后進(jìn)行文件的恢復(fù)邏輯
  4. 對(duì)于文件的恢復(fù)邏輯在recover方法中,會(huì)調(diào)用CommitLog類中的方法

?這里有幾個(gè)點(diǎn)沒有進(jìn)行分析,可以看前面的文章。第一個(gè)就是CommitLog文件正?;謴?fù)和異?;謴?fù)相關(guān)邏輯在前面的CommitLog的文章中有分析。第二個(gè)是延遲消息相關(guān)的邏輯在延遲消息的原理ScheduleMessageService中。第三個(gè)IndexFile的邏輯在IndexFile消息索引日志文件相關(guān)的IndexService

存儲(chǔ)消息的putMessage方法

?DefaultMessageStore中的putMessage方法其實(shí)主要是檢查存儲(chǔ)狀態(tài),校驗(yàn)消息和記錄消息存儲(chǔ)的耗時(shí)和次數(shù),主要的邏輯還是在CommitLog類中。這里不對(duì)CommitLog類中的邏輯進(jìn)行分析

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        //檢查存儲(chǔ)狀態(tài),檢查操作系統(tǒng)是不是繁忙
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }
        //校驗(yàn)消息的topic長(zhǎng)度和屬性長(zhǎng)度
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
        //計(jì)算耗時(shí)
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        //記錄消息的存儲(chǔ)時(shí)間,分為13個(gè)耗時(shí)區(qū)間段來(lái)存儲(chǔ),每個(gè)區(qū)間段統(tǒng)計(jì)放入的消息個(gè)
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
        //如果消息存儲(chǔ)失敗,則統(tǒng)計(jì)失敗的次數(shù)
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

    private PutMessageStatus checkStoreStatus() {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        }
        //salve  機(jī)器不能存儲(chǔ)消息
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("broke role is slave, so putMessage is forbidden");
            }
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        }
        //檢查是不是可寫的狀態(tài)
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                    "the broker's disk is full, write to logic queue error, write to index file error, etc");
            }
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        } else {
            this.printTimes.set(0);
        }
        //如果寫入CommitLog時(shí)間超過1m,則認(rèn)為操作系統(tǒng)的pageCache繁忙
        if (this.isOSPageCacheBusy()) {
            return PutMessageStatus.OS_PAGECACHE_BUSY;
        }
        return PutMessageStatus.PUT_OK;
    }
獲取消息的getMessage方法

?獲取消息的邏輯比較長(zhǎng),主要復(fù)雜點(diǎn)在于計(jì)算消息的偏移量信息。主要邏輯如下

  1. 檢查服務(wù)的狀態(tài)
  2. 獲取CommitLog的最大物理偏移量,根據(jù)要獲取的消息的topic和queueId找到對(duì)應(yīng)的ConsumeQueue,并獲取最大邏輯偏移量和最小邏輯偏移量
  3. 根據(jù)獲取的邏輯偏移量進(jìn)行一系列的校驗(yàn),校驗(yàn)通過則根據(jù)ConsumeQueue的消息單元中記錄的消息物理偏移量和傳入的偏移量進(jìn)行對(duì)比尋找
  4. 找到對(duì)應(yīng)的消息的物理偏移量,然后進(jìn)行封裝最后返回
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }
        //檢查服務(wù)的狀態(tài)是否是可讀
        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }

        long beginTime = this.getSystemClock().now();

        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        long nextBeginOffset = offset;
        long minOffset = 0;
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();
        //獲取存儲(chǔ)的消息的最大的物理偏移量
        final long maxOffsetPy = this.commitLog.getMaxOffset();
        //根據(jù)topic和queueId 先找到對(duì)應(yīng)的邏輯日志ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        //如果邏輯日志不為空,獲取最小和最大的邏輯偏移量
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            //如果最大邏輯日志為0,說(shuō)明沒有存任何邏輯消息記錄
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                //消息偏移量小于最小的偏移量
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                //偏移等于最大偏移量,則表示消息溢出了
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                //大于最大邏輯偏移量,則嚴(yán)重溢出
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                //根據(jù)偏移量獲取對(duì)應(yīng)的邏輯消息
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;
                        //計(jì)算最大獲取的消息長(zhǎng)度  = 單個(gè)邏輯單元長(zhǎng)度 * 獲取的消息個(gè)數(shù) = 20*maxMsgNums
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        //是否開啟磁盤降幅
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        //依次獲取消息
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //獲取消息的物理偏移量 在CommitLog中的位置
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                            //獲取消息的大小
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //獲取消息的tagcode
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                            //當(dāng)前消息的CommitLog的偏移量
                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }
                            /**
                             * 檢查獲取的消息是不是在磁盤上
                             * 1. 計(jì)算系統(tǒng)的內(nèi)存大小,然后?? 對(duì)應(yīng)的訪問消息在內(nèi)存中的最大比率參數(shù)(accessMessageInMemoryMaxRatio)得到存儲(chǔ)消息的內(nèi)存大小memory
                             * 2. 比較CommitLog的最大偏移量和當(dāng)前消息的偏移量差,如果小于memory 則在內(nèi)存中獲取消息。否則從磁盤獲取消息
                             */
                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                            /**
                             * 消息是否獲取完畢。如果已經(jīng)獲取的消息的大小,或者消息的數(shù)量,達(dá)到了設(shè)置的上限也會(huì)直接返回
                             * - 從磁盤獲取時(shí)
                             *   maxTransferBytesOnMessageInDisk : 一批次從磁盤獲取消息的最大允許大小
                             *   maxTransferCountOnMessageInDisk :  一次從磁盤獲取消息的最大允許數(shù)量
                             *
                             * - 從內(nèi)存獲取
                             *   maxTransferBytesOnMessageInMemory:一批次從內(nèi)存獲取消息的最大允許大小
                             *   maxTransferCountOnMessageInMemory:一次從內(nèi)存獲取消息的最大允許數(shù)量
                             */
                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                isInDisk)) {
                                break;
                            }
                            //
                            boolean extRet = false, isTagsCodeLegal = true;
                            //檢查是否有ConsumeQueue擴(kuò)展文件,是的則從擴(kuò)展文件獲取
                            if (consumeQueue.isExtAddr(tagsCode)) {
                                extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                if (extRet) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    // can't find ext content.Client will filter messages by tag also.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                        tagsCode, offsetPy, sizePy, topic, group);
                                    isTagsCodeLegal = false;
                                }
                            }
                            //如果有ConsumeQueue對(duì)應(yīng)的過濾器,則進(jìn)行過濾
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }
                            //從CommitLog獲取消息
                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }
                            //如果有CommitLog先關(guān)的過濾器則進(jìn)行處理
                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }
                            //增加消息存儲(chǔ)服務(wù)的相關(guān)消息獲數(shù)量記錄
                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }
                        //開啟磁盤降幅,
                        if (diskFallRecorded) {
                            long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                            brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                        }
                        //計(jì)算下一個(gè)開始的位置
                        nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        //當(dāng)前消息的偏移量和最大偏移量的差距
                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        //如果剩余的偏移差  大于 設(shè)置的內(nèi)存百分比的大小 則建議從 salve節(jié)點(diǎn)拉取
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    //如果在ConsumeQueue中沒有對(duì)應(yīng)的消息,那么
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                    log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                        + maxOffset + ", but access logic queue failed.");
                }
            }
        } else {
            //如果ConsumeQueue沒有則返回
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        //無(wú)論是否找到,都增加對(duì)應(yīng)的記錄
        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long elapsedTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
        //設(shè)置返回的信息
        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

?在DefaultMessageStore中還有一些別的方法,基本都是跟獲取消息有關(guān)系的,但是最最后幾乎都是調(diào)用上面說(shuō)的getMessage方法。因此這里就不在進(jìn)行分析了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容