RocketMQ源碼解析——存儲(chǔ)部分(4)ConsumeQueue邏輯日志文件相關(guān)的`ConsumeQueue`類(lèi)

ConsumeQueue文件講解

概述

?RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的。多個(gè)Topic文件是共用一個(gè)CommitLog文件的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。ConsumeQueue文件的引入的目的主要是提高消息消費(fèi)的性能。

文件結(jié)構(gòu)

?消息消費(fèi)者Consumer可根據(jù)ConsumeQueue來(lái)查找待消費(fèi)的消息。其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog(物理消費(fèi)隊(duì)列)中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
?ConsumeQueue文件可以看成是基于topic的CommitLog索引文件,故ConsumeQueue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
?同樣consumequeue文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié),分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長(zhǎng)度、8字節(jié)tag hashcode,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目,每個(gè)ConsumeQueue文件大小約5.72M。單條記錄結(jié)構(gòu)如下:

在這里插入圖片描述

?消息的起始物理偏移量physical offset(long 8字節(jié))+消息大小size(int 4字節(jié))+tagsCode(long 8字節(jié)),每條數(shù)據(jù)的大小為20個(gè)字節(jié)(這個(gè)很重要,源碼中有用到這個(gè)固定值),從而每個(gè)文件的默認(rèn)大小為600萬(wàn)個(gè)字節(jié)。

ConsumeQueue類(lèi)講解

字段屬性

    private final DefaultMessageStore defaultMessageStore;
    //映射文件隊(duì)列
    private final MappedFileQueue mappedFileQueue;
    //消息的Topic
    private final String topic;
    //消息的queueId
    private final int queueId;
    //指定大小的緩沖,因?yàn)橐粋€(gè)記錄的大小是20byte的固定大小
    private final ByteBuffer byteBufferIndex;
    //保存的路徑
    private final String storePath;
    //映射文件的大小
    private final int mappedFileSize;
    //最后一個(gè)消息對(duì)應(yīng)的物理偏移量  也就是在CommitLog中的偏移量
    private long maxPhysicOffset = -1;
    //最小的邏輯偏移量 在ConsumeQueue中的最小偏移量
    private volatile long minLogicOffset = 0;
    //ConsumeQueue的擴(kuò)展文件,保存一些不重要的信息,比如消息存儲(chǔ)時(shí)間等
    private ConsumeQueueExt consumeQueueExt = null;

?這里比較重要的屬性,topicqueueId,maxPhysicOffset,minLogicOffset。這里對(duì)這幾個(gè)屬性進(jìn)行說(shuō)明一下

屬性 說(shuō)明
topic 文件所屬的topic
queueId 文件所屬的topic下的隊(duì)列id
maxPhysicOffset 最大的物理偏移量,這里指的是CommitLog中的偏移量
minLogicOffset 最小的邏輯偏移量,這里指的是ConsumeQueue中的最小偏移量

?需要分清楚的是ConsumeQueue是消息的邏輯地址文件,CommitLog是消息的物理地址文件。

內(nèi)部方法解析

構(gòu)造方法

?ConsumeQueue只有一個(gè)構(gòu)造方法。

    public ConsumeQueue(
        final String topic,
        final int queueId,
        final String storePath,
        final int mappedFileSize,
        final DefaultMessageStore defaultMessageStore) {
        //指定文件的存儲(chǔ)位置
        this.storePath = storePath;
        //指定文件大小
        this.mappedFileSize = mappedFileSize;
        //指定DefaultMessageStore對(duì)象
        this.defaultMessageStore = defaultMessageStore;
        //存儲(chǔ)指定topic消息
        this.topic = topic;
        //指定指定queueId消息
        this.queueId = queueId;
        //設(shè)置對(duì)應(yīng)的文件路徑,$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
        String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;
        //創(chuàng)建文件映射隊(duì)列
        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
        //創(chuàng)建20個(gè)字節(jié)大小的緩沖
        this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        //是否啟用消息隊(duì)列的擴(kuò)展存儲(chǔ)
        if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
            //創(chuàng)建一個(gè)擴(kuò)展存儲(chǔ)對(duì)象
            this.consumeQueueExt = new ConsumeQueueExt(
                topic,
                queueId,
                //consumeQueueExt的存儲(chǔ)地址
                StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
                //todo 設(shè)置消費(fèi)隊(duì)列文件擴(kuò)展大小  默認(rèn)48M
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
                //todo 位圖過(guò)濾的位圖長(zhǎng)度
                defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
            );
        }
    }

?構(gòu)造方法中沒(méi)有除了設(shè)置字段值之外的額外的邏輯。都是比較簡(jiǎn)單的邏輯,不多進(jìn)行分析。

文件加載load

?load方法調(diào)用也是在RocketMQ的Broker啟動(dòng)的時(shí)候,會(huì)調(diào)用到,用來(lái)加載機(jī)器內(nèi)存中的ConsumeQueue文件

    public boolean load() {
        //從映射文件隊(duì)列加載
        boolean result = this.mappedFileQueue.load();
        log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
        //存在擴(kuò)展存儲(chǔ)則加載
        if (isExtReadEnable()) {
            //消息隊(duì)列擴(kuò)展加載=》
            result &= this.consumeQueueExt.load();
        }
        return result;
    }
服務(wù)重啟時(shí)修復(fù)文件的recover

?RocketMQ在啟動(dòng)時(shí)候,會(huì)去嘗試恢復(fù)服務(wù)器中的ConsumeQueue文件。文件恢復(fù)的邏輯就是通過(guò)檢查每個(gè)消息記錄單元中記錄信息來(lái)判斷這個(gè)記錄是否完整,進(jìn)而分析整個(gè)文件是不是完整,最后對(duì)文件中損壞的記錄進(jìn)行截?cái)?。整體的恢復(fù)邏輯有點(diǎn)長(zhǎng)。這里對(duì)每個(gè)消息單元的分析是基于單個(gè)消息單元的長(zhǎng)度是20個(gè)字節(jié)長(zhǎng)度的原理來(lái)進(jìn)行分析。

    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            //如果文件列表大于3就從倒數(shù)第3個(gè)開(kāi)始,否則從第一個(gè)開(kāi)始
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;
            //獲取consumeQueue單個(gè)文件的大小
            int mappedFileSizeLogics = this.mappedFileSize;
            //獲取最后一個(gè)映射文件
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            //映射文件處理的起始偏移量
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                //遍歷文件列表
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
                    //順序解析,每個(gè)數(shù)據(jù)單元隔20個(gè)字節(jié),如果offset跟size大于0則表示有效
                    if (offset >= 0 && size > 0) {
                        //正常數(shù)據(jù)的大小
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        //設(shè)置最大的物理偏移量
                        this.maxPhysicOffset = offset;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }
                //如果已經(jīng)   加載正常數(shù)據(jù)的大小 = 隊(duì)列文件的大小,則表示這個(gè)文件加載完畢
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {

                        log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }
            // 完整的偏移量 = 最后一個(gè)文件的起始偏移量(getFileFromOffset) +  正常數(shù)據(jù)的長(zhǎng)度(mappedFileOffset)
            processOffset += mappedFileOffset;
            //設(shè)置刷新到的 offset位置
            this.mappedFileQueue.setFlushedWhere(processOffset);
            //設(shè)置提交到的 offset位置
            this.mappedFileQueue.setCommittedWhere(processOffset);
            //刪除有效的 offset 之后的文件
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            //如果有擴(kuò)展文件,則恢復(fù)擴(kuò)展文件
            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                //映射文件隊(duì)列刪除最大offset的臟數(shù)據(jù)文件=》
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
根據(jù)時(shí)間獲取消息在隊(duì)列中的邏輯位置getOffsetInQueueByTime
    public long getOffsetInQueueByTime(final long timestamp) {
        //根據(jù)時(shí)間找到映射的文件,文件可以知道最后一次修改的時(shí)間
        MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
        if (mappedFile != null) {
            long offset = 0;
            //如果文件的最小偏移量 大于 查找的時(shí)間戳所在的文件的起始偏移量 說(shuō)明對(duì)應(yīng)的消息在這個(gè)文件中。
            int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
            int high = 0;
            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
            long leftIndexValue = -1L, rightIndexValue = -1L;
            //獲取最小的物理偏移量  也就是CommitLog的最小偏移量
            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
            //獲取文件的內(nèi)容buffer
            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
            if (null != sbr) {
                ByteBuffer byteBuffer = sbr.getByteBuffer();
                //計(jì)算文件的最大的數(shù)據(jù)單元的偏移量
                high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                try {
                    //用二分法來(lái)獲取更新的時(shí)間戳
                    while (high >= low) {
                        //獲取中間單元
                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                        byteBuffer.position(midOffset);
                        //獲取消息的物理偏移量,也就是在commitLog上的偏移量
                        long phyOffset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        //如果小于最小的物理偏移量,則取下一條消息的位置
                        if (phyOffset < minPhysicOffset) {
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            continue;
                        }

                        //按物理offset從commitLog中獲取存儲(chǔ)時(shí)間=》
                        long storeTime =
                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                        if (storeTime < 0) {
                            return 0;
                        } else if (storeTime == timestamp) {//如果存儲(chǔ)時(shí)間相等就是要找的
                            targetOffset = midOffset;
                            break;
                        } else if (storeTime > timestamp) {//如果存儲(chǔ)時(shí)間大于目標(biāo)時(shí)間,則消息需要往前找
                            high = midOffset - CQ_STORE_UNIT_SIZE;
                            rightOffset = midOffset;
                            rightIndexValue = storeTime;
                        } else {//如果存儲(chǔ)時(shí)間小于目標(biāo)時(shí)間,則消息需要往后
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            leftIndexValue = storeTime;
                        }
                    }

                    //找到了符合條件的消息的邏輯地址
                    if (targetOffset != -1) {
                        offset = targetOffset;
                    } else {
                        if (leftIndexValue == -1) {

                            offset = rightOffset;
                        } else if (rightIndexValue == -1) {

                            offset = leftOffset;
                        } else {
                            offset =
                                Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                    - rightIndexValue) ? rightOffset : leftOffset;
                        }
                    }
                    //返回對(duì)應(yīng)的消息
                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                } finally {
                   // 映射文件釋放
                    sbr.release();
                }
            }
        }
        return 0;
    }

?通過(guò)消息的存儲(chǔ)時(shí)間,來(lái)獲取對(duì)應(yīng)的消息在隊(duì)列中的邏輯偏移量,大概的步驟如下:

  1. 根據(jù)傳入的timestamp獲取對(duì)應(yīng)的MappedFile文件,這個(gè)獲取的文件,在前面的MappedFileQueue類(lèi)分析講到過(guò)
  2. 根據(jù)minLogicOffset最小邏輯偏移量和選擇的MappedFile文件的起始偏移量來(lái)確定起始的消息單元
  3. 根據(jù)最小消息單元和最大消息單元(文件的最后一個(gè)消息單元)區(qū)間來(lái)進(jìn)行二分查找對(duì)應(yīng)的消息落盤(pán)時(shí)間和timestamp進(jìn)行對(duì)比,找到合適的消息單元。并返回。
截?cái)噙壿嬑募?code>truncateDirtyLogicFiles
   public void truncateDirtyLogicFiles(long phyOffet) {

        int logicFileSize = this.mappedFileSize;

        this.maxPhysicOffset = phyOffet - 1;
        long maxExtAddr = 1;
        while (true) {
//            獲取映射隊(duì)列中最后的映射文件=》
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            if (mappedFile != null) {
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

                mappedFile.setWrotePosition(0);
                mappedFile.setCommittedPosition(0);
                mappedFile.setFlushedPosition(0);
                //遍歷所有的MappedFile,每次讀取的間隔是20個(gè)字節(jié)(ConsumeQueue的單個(gè)數(shù)據(jù)單元大小為20字節(jié))
                for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();

                    if (0 == i) {
                        //如果第一個(gè)單元的物理偏移量CommitLog Offset大于phyOffet,則直接刪除最后一個(gè)文件。因?yàn)閜hyOffet表示的是最后一個(gè)有效的commitLog文件的起始偏移量。
                        if (offset >= phyOffet) {
                            this.mappedFileQueue.deleteLastMappedFile();
                            break;
                        } else {
                            //設(shè)置wrotePostion和CommittedPosition兩個(gè)變量為解析到的數(shù)據(jù)塊位置
                            int pos = i + CQ_STORE_UNIT_SIZE;
                            mappedFile.setWrotePosition(pos);
                            mappedFile.setCommittedPosition(pos);
                            mappedFile.setFlushedPosition(pos);
                            this.maxPhysicOffset = offset;
                            // This maybe not take effect, when not every consume queue has extend file.
                            if (isExtAddr(tagsCode)) {
                                maxExtAddr = tagsCode;
                            }
                        }
                    } else {
                        //解析到數(shù)據(jù)塊的大小為空或者物理偏移值大于了processOffset為止。
                        if (offset >= 0 && size > 0) {
                            if (offset >= phyOffet) {
                                return;
                            }

                            int pos = i + CQ_STORE_UNIT_SIZE;
                            mappedFile.setWrotePosition(pos);
                            mappedFile.setCommittedPosition(pos);
                            mappedFile.setFlushedPosition(pos);
                            this.maxPhysicOffset = offset;
                            if (isExtAddr(tagsCode)) {
                                maxExtAddr = tagsCode;
                            }

                            if (pos == logicFileSize) {
                                return;
                            }
                        } else {
                            return;
                        }
                    }
                }
            } else {
                break;
            }
        }

        if (isExtReadEnable()) {
//            刪除最大位置的消息隊(duì)列=》
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }

?截?cái)辔募?,一般在服?wù)重啟后會(huì)調(diào)用,用來(lái)刪除損壞或者有問(wèn)題的消息。主要的邏輯步驟如下:

  1. 遍歷MappedFile,遍歷文件中的消息單元,如果是第一個(gè)消息單元,則比較消息單元中記錄的物理偏移量是不是大于傳入的phyOffet,如果是的則刪除當(dāng)前文件。
  2. 依次比較每個(gè)單元記錄的偏移量和phyOffet大小,直到大于phyOffet值,然后重置文件的提交,寫(xiě)入和刷新的文職
保存消息邏輯日志putMessagePositionInfoWrapper

?邏輯消息的保存邏輯比較長(zhǎng),主要的邏輯步驟如下:

  1. 檢查對(duì)應(yīng)的文件是不是可寫(xiě)的狀態(tài),以及寫(xiě)入的重試次數(shù)是否達(dá)到上限30次
  2. 如果消息擴(kuò)展服務(wù)開(kāi)啟了,則保存對(duì)應(yīng)的擴(kuò)展信息到擴(kuò)展文件隊(duì)列中
  3. 組裝消息進(jìn)行寫(xiě)入
  4. 如果寫(xiě)入成功,則更新CheckPoint文件中的邏輯日志落盤(pán)時(shí)間

?其中組裝消息寫(xiě)入被抽出到另外一個(gè)方法putMessagePositionInfo中。主要邏輯如下:

  1. 申請(qǐng)20個(gè)字節(jié)長(zhǎng)度的buffer,然后依次拼接消息在CommitLog中的偏移量,消息長(zhǎng)度和消息的tagCode
  2. 然后獲取 MappedFile,并把消息保存進(jìn)去,同時(shí)更新maxPhysicOffset字段。

?方法中很多用到MappedFileMappedFileQueue類(lèi)中的方法,可以看看前面的文章

   public void putMessagePositionInfoWrapper(DispatchRequest request) {
        //最大的重試次數(shù)
        final int maxRetries = 30;
        //檢查對(duì)應(yīng)的ConsumeQueue文件是不是可寫(xiě)
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        //可寫(xiě) 并且 重試次數(shù)還沒(méi)達(dá)到30次,則進(jìn)行寫(xiě)入
        for (int i = 0; i < maxRetries && canWrite; i++) {
            //獲取消息的  Tag
            long tagsCode = request.getTagsCode();
            //消息擴(kuò)展服務(wù)是否開(kāi)啟
            if (isExtWriteEnable()) {
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
                }
            }
            //組裝消息存儲(chǔ)位置信息=》CommitLog中的偏移量,消息的大小,和小的Tag的hash值
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            //設(shè)置CheckPoint文件中的邏輯日志落盤(pán)時(shí)間
            if (result) {
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

    private boolean putMessagePositionInfo(final long offset,/*CommitLog文件的偏移量*/
                                           final int size, /*消息的大小*/
                                           final long tagsCode, /*消息的tag*/
                                           final long cqOffset/*ConsumeQueue的偏移,這個(gè)偏移在添加消息到CommitLog的時(shí)候確定了*/
        ) {
        //如果CommitLog 的偏移量比consumequeue的最大偏移量還小,說(shuō)明已經(jīng)追加過(guò)了
        if (offset <= this.maxPhysicOffset) {
            return true;
        }
        //把buffer重置
        this.byteBufferIndex.flip();
        //申請(qǐng)20個(gè)字節(jié)大小
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        //設(shè)置在CommitLog中的偏移量
        this.byteBufferIndex.putLong(offset);
        //設(shè)置消息的大小
        this.byteBufferIndex.putInt(size);
        //設(shè)置消息的tag信息
        this.byteBufferIndex.putLong(tagsCode);
        //希望拼接到的偏移量=commitLog中的QUEUEOFFSET*20
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        //從映射文件隊(duì)列中獲取最后一個(gè)映射文件=》
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {
            //映射文是第一個(gè)創(chuàng)建、consumerOffset不是0,映射文件寫(xiě)位置是0
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                //設(shè)置最小的邏輯偏移量 為 對(duì)應(yīng)消息的起始偏移量
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                //填充文件=》如果不是第一次拼接,則指定位置進(jìn)行拼接
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            //consumerOffset不是0,則表示不是文件中的第一個(gè)記錄
            if (cqOffset != 0) {
                //計(jì)算當(dāng)前的文件提交到的位置
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                //如果文件寫(xiě)入的偏移量 已經(jīng) 大于這個(gè)消息寫(xiě)入的初始偏移量, 表示這個(gè)消息重復(fù)了
                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }
                //如果兩個(gè)值不相等, 說(shuō)明隊(duì)列的順序可能有問(wèn)題
                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            //這只最大的物理偏移量為CommitLog 的偏移量
            this.maxPhysicOffset = offset;
            //消息寫(xiě)入映射文件=》
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }
根據(jù)消息的index獲取消息單元getIndexBuffer

?邏輯比較簡(jiǎn)單,就是依據(jù)單個(gè)消息單元的大小為20字節(jié),來(lái)計(jì)算在文件中的位置,然后去出來(lái)

    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        //計(jì)算消息的在文件中的便宜offset
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        //偏移量小于最小的邏輯偏移量,則說(shuō)明消息在文件中
        if (offset >= this.getMinLogicOffset()) {
            //根據(jù)offset查詢映射文件 =》
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }
補(bǔ)充:ConsumeQueue何時(shí)填充消息單元

?ConsumeQueue類(lèi)中還包含一些別的方法,這里不一一進(jìn)行講解。這些方法主要是對(duì)文件的提交,刷盤(pán)和獲取偏移量方面的方法。跟前面的CommitLog文件的步驟差不多,都是利用MappedFileMappedFileQueue類(lèi)來(lái)完成的。這里主要說(shuō)一下什么時(shí)候填充ConsumeQueue文件。
?在DefaultMessageStore中有個(gè)任務(wù)線程ReputMessageService。這個(gè)線程會(huì)不斷檢查CommitLog文件是否有新的消息填充,如果有會(huì)調(diào)用doDispatch方法進(jìn)行消息分發(fā),最后會(huì)調(diào)用ConsumeQueue類(lèi)的putMessagePositionInfoWrapper。這里給一個(gè)簡(jiǎn)單的調(diào)用鏈,后續(xù)的文章會(huì)分析。

ReputMessageService#run
    ReputMessageService#doReput
        DefaultMessageStore#doDispatch
            CommitLogDispatcherBuildConsumeQueue#dispatch
                DefaultMessageStore#putMessagePositionInfo
                    ConsumeQueue#putMessagePositionInfoWrapper

下一篇存儲(chǔ)部分(4)IndexFile消息索引日志文件相關(guān)的IndexService類(lèi)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容