rocket mq 底層存儲(chǔ)源碼分析(6)-存儲(chǔ)恢復(fù)

本章節(jié),我們主要從rmq的broker啟動(dòng)后,會(huì)如何初始化【業(yè)務(wù)消息】、【邏輯位移索引】以及【key查詢索引】這三種消息來(lái)分析底層實(shí)現(xiàn)。


直接上源碼,存儲(chǔ)相關(guān)的初始化的入口:

    public boolean load() {
        boolean result = true;

        try {
            //step1
            //通過(guò)abort文件判斷上次rmq上次退出是否正常,如果正常退出,就可以把a(bǔ)bort文件刪除,即不存在。
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            ...

            //step2
            // load Commit Log,加載消息存儲(chǔ)映射文件
            result = result && this.commitLog.load();

            // load Consume Queue,加載邏輯消費(fèi)隊(duì)列
            result = result && this.loadConsumeQueue();

            if (result) {
                //step3
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                
                //step4
                //加載索引文件
                this.indexService.load(lastExitOK);

                //step5
                this.recover(lastExitOK);

            }
        }

        ...

        return result;
    }

我們先來(lái)梳理一下上述的加載流程:
step1,通過(guò)abort文件判斷上次rmq上次退出是否正常,如果正常退出,就可以把a(bǔ)bort文件刪除,即不存在。否則,我們需要通過(guò)快照方式恢復(fù)這些存儲(chǔ)文件。

step2,我們依次初始化業(yè)務(wù)消息存儲(chǔ)文件(commitLog.load())以及邏輯位移存儲(chǔ)文件(this.loadConsumeQueue())對(duì)應(yīng)的連續(xù)映射文件抽象。

step3,加載存儲(chǔ)快照,里面存儲(chǔ)上述三類存儲(chǔ)文件的最后一次完整刷盤(pán)的時(shí)間戳。里面有三個(gè)long類型的字段屬性,分別是physicMsgTimestamp業(yè)務(wù)消息存儲(chǔ)文件最后一次完整刷盤(pán)時(shí)間戳,logicsMsgTimestamp邏輯位移存儲(chǔ)文件最后一次完整刷盤(pán)時(shí)間戳 以及 indexMsgTimestampkey查詢索引存儲(chǔ)文件最后一次完整刷盤(pán)時(shí)間戳。

step4,加載key查詢索引存儲(chǔ)文件。這里有可能使用快照方式。結(jié)合步驟三,我們先大致說(shuō)一下使用快照方式的流程。這里以【key查詢索引】存儲(chǔ)文件為例?!緆ey查詢索引】d刷盤(pán)形式為,直到上一個(gè)indexFile 滿了以后,才開(kāi)線程異步刷盤(pán)。每次【key查詢索引】存儲(chǔ)文件刷盤(pán)后,同時(shí)會(huì)更新 indexMsgTimestamp屬性。然后,在某個(gè)【key查詢索引】存儲(chǔ)文件刷盤(pán)的時(shí)間點(diǎn)上,broker突然宕機(jī)了,那么在重啟恢復(fù)時(shí),就會(huì)根據(jù)存儲(chǔ)文件的文件頭屬性endTimestamp(該屬性是每構(gòu)建一條索引時(shí),均會(huì)更新)與存儲(chǔ)快照屬性indexMsgTimestamp作比較,把文件頭屬性endTimestamp大于indexMsgTimestamp的文件全都刪除。

step5,加載部分業(yè)務(wù)消息以及邏輯位移索引值pagecache中。

上述的步驟1-4都比較簡(jiǎn)單,讀者可以自行根據(jù)描述自行分析,我們重點(diǎn)分析一下step5:

    private void recover(final boolean lastExitOK) {
        //step1.加載 邏輯物理位至pagecache
        this.recoverConsumeQueue();

        //step2.加載業(yè)務(wù)消息至pagecache
        if (lastExitOK) {
            this.commitLog.recoverNormally();
        } else {
            this.commitLog.recoverAbnormally();
        }

        //step3.加載TopicQueueTable到內(nèi)存中
        this.recoverTopicQueueTable();
    }

接下來(lái),我們將從三個(gè)步驟分析recover(...)

1、加載 邏輯物理位至pagecache

2、加載業(yè)務(wù)消息至pagecache

3、加載TopicQueueTable到內(nèi)存中

1、加載 邏輯物理位至pagecache

    private void recoverConsumeQueue() {
        //遍歷所有topic
        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            //遍歷所有queueId
            for (ConsumeQueue logic : maps.values()) {
                logic.recover();
            }
        }
    }

這里主要是遍歷所有的topic-queueId 為維度下,加載對(duì)應(yīng)的業(yè)務(wù)視圖消費(fèi)隊(duì)列ConsumeQueue,我們接著進(jìn)入logic.recover()

    public void recover() {
        //h獲取邏輯位移連續(xù)映射文件抽象
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            
            //這里說(shuō)明初始化時(shí),最多只加載最新三個(gè)文件內(nèi)容至pagecache中
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    //讀取一條完整的 邏輯位移索引
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
                      
                    //這里只需要保證讀完整一條
                    if (offset >= 0 && size > 0) {
                        //更新單個(gè)文件的處理位置
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        //基于消息時(shí)遞增的,因此每次讀取的offset均可以認(rèn)為是最大的物理位移
                        this.maxPhysicOffset = offset;
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }


                if (mappedFileOffset == mappedFileSizeLogics) {
                    //代碼走到這里,說(shuō)明這個(gè)索引文件已處理完,需滾到到下一個(gè)文件。
                    index++;
                    if (index >= mappedFiles.size()) {

                        log.info("recover last consume queue file over, last maped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        //繼續(xù)獲取下一個(gè)邏輯位移索引映射文件
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        //更新方法內(nèi)全局處理位置
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    break;
                }
            }

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
        }

總結(jié)一下上述代碼段邏輯:
總的來(lái)說(shuō),就是最多只加載最新三個(gè)文件的邏輯位移內(nèi)容至pagecache中,遍歷這三個(gè)文件,然后逐條讀取 【邏輯位移索引】字節(jié)內(nèi)容。

其中,while(true)循環(huán)就是逐條讀取 【邏輯位移索引】。

這里,僅僅通過(guò)if (offset >= 0 && size > 0)這個(gè)判斷,即可認(rèn)為這條【邏輯位移索引】是完整的?我認(rèn)為該判斷是有誤的。首先,我們知道直接內(nèi)存映射的刷盤(pán)操作并不是原子性,如果在刷盤(pán)過(guò)程中,機(jī)子突然宕機(jī)了,就無(wú)法將pagecache中的字節(jié)內(nèi)容完整的保存在磁盤(pán)中。另外,在broker啟動(dòng)過(guò)程中,是對(duì)整個(gè)【邏輯位移索引】的存儲(chǔ)文件做直接內(nèi)存映射,換言之,對(duì)于沒(méi)寫(xiě)入內(nèi)容的空白存儲(chǔ)部分,例如執(zhí)行byteBuffer.getLong(),讀取pagecache中下一個(gè)offset(8字節(jié),64位),但出于宕機(jī)的原因,只將offset前面63位存放到磁盤(pán)中,因此,會(huì)以‘0’作為補(bǔ)充位 來(lái)補(bǔ)充缺失的字段。這樣一來(lái),byteBuffer.getLong()操作讀取到的offset即使是大于0,也可能是不準(zhǔn)確的。但是,作為offset的相鄰存儲(chǔ)字段size ,如果size > 0,雖然不能保證讀取到的size是完整的字節(jié)內(nèi)容,卻可以保證offset字段是完整的。因此,如果通過(guò)該判斷僅僅能保證offset是正確的,去無(wú)法保證sizetagsCode字段正確。

假設(shè),讀取到一條正確的【邏輯位移索引】后,由于業(yè)務(wù)消息的物理存儲(chǔ)位移是遞增的,因此每次從【邏輯位移索引】讀取的offset字段均可以認(rèn)為是最大的物理位移,可以直接更新至ConsumeQueue的業(yè)務(wù)消息最大物理位移實(shí)例屬性this.maxPhysicOffset。

而循環(huán)內(nèi)的mappedFileOffset表明一個(gè)存儲(chǔ)文件的當(dāng)前處理位置,每讀取一條【邏輯位移索引】,均加上20,對(duì)應(yīng)代碼mappedFileOffset = i + CQ_STORE_UNIT_SIZE 。當(dāng)mappedFileOffset到達(dá)存儲(chǔ)文件的尾部是,即 if (mappedFileOffset == mappedFileSizeLogics),則說(shuō)明這個(gè)索引文件已處理完,需滾到到下一個(gè)文件。

processOffset代表連續(xù)存儲(chǔ)索引文件的全局處理位置,因此,每次處理完一個(gè)完整的存儲(chǔ)文件時(shí),均需要 processOffset = mappedFile.getFileFromOffset()更新全局處理位置。

當(dāng)循環(huán)結(jié)束后,更新最后一個(gè)文件的處理位置processOffset += mappedFileOffset。

當(dāng)然,為了容錯(cuò)處理,還需要清除一些臟數(shù)據(jù),我們直接分析this.mappedFileQueue.truncateDirtyFiles(processOffset)

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
             //獲取文件尾部的物理位移
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    //校正文件的緩存讀寫(xiě)位置
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    //代碼走到這里,說(shuō)明本文件是需要?jiǎng)h除的臟文件。
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

先說(shuō)一下入?yún)?code>offset,其含義是目前最大的【邏輯位移索引】?jī)?chǔ)物理位移。清除臟文件的邏輯如下,遍歷目前所有的【邏輯位移索引】存儲(chǔ)文件,然后比較offset與文件尾部的存儲(chǔ)物理位移,如果offset大,則認(rèn)為該文件正常;如果小于,則繼續(xù)與文件開(kāi)始的存儲(chǔ)物理位移(file.getFileFromOffset()),如果offset還比文件開(kāi)始的存儲(chǔ)物理位移還要小,則說(shuō)明該文件可以刪除了;否則,校正文件的緩存讀寫(xiě)位置。
到這里就分析完【邏輯位移索引】載入pagecache流程


2、加載業(yè)務(wù)消息至pagecache

這一步,我們?cè)诜謨煞N情況去分析,一種是正常的加載業(yè)務(wù)消息的方式,另一種是通過(guò)快照方式加載。

首先,如果標(biāo)志位lastExitOK位true,說(shuō)明該broker可以正常退出,則按正常方式,即 this.commitLog.recoverNormally()

   public void recoverNormally() {
        //default :true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Began to recover from the last third file
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                // Normal data
                if (dispatchRequest.isSuccess() && size > 0) {
                    mappedFileOffset += size;
                }
                // Come the end of the file, switch to the next file Since the
                // return 0 representatives met last hole,
                // this can not be included in truncate offset
                else if (dispatchRequest.isSuccess() && size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                else if (!dispatchRequest.isSuccess()) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
        }
    }

上述的恢復(fù)流程與處理方式基本與【邏輯位移索引】載入pagecache的流程一致,都是連續(xù)加載最多3個(gè)最新的存儲(chǔ)文件內(nèi)容,只不過(guò),加載的內(nèi)容則是具體的業(yè)務(wù)消息字節(jié)內(nèi)容而已,即DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)。之前在構(gòu)建【邏輯位移索引】有分析過(guò)this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)這個(gè)方法,這里,即把業(yè)務(wù)消息字節(jié)按照存儲(chǔ)格式加載至pagecache中,只不過(guò)這里需要對(duì)內(nèi)容進(jìn)行crc檢測(cè)。

我們?cè)趤?lái)看看通過(guò)快照形式加載,即進(jìn)入this.commitLog.recoverAbnormally()

    public void recoverAbnormally() {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            //step1、判斷從哪個(gè)映射文件開(kāi)始恢復(fù),即從指定的開(kāi)始文件到結(jié)束都需要通過(guò)快照恢復(fù)
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this maped file " + mappedFile.getFileName());
                    break;
                }
            }

            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                //step2、讀取一條完整的消息
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();

                // Normal data
                if (size > 0) { //說(shuō)明該條消息是一條完整的消息
                    mappedFileOffset += size;

                     //
                    //嘗試去構(gòu)建索引,因?yàn)橛锌赡芩饕汛嬖?,如果存在,則跳過(guò)。
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                } else if (size == -1) {
                    //代碼走到這里,說(shuō)明讀取到了一條不完整的消息
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                } else if (size == 0) {
                    //滾動(dòng)到下一個(gè)待處理的文件
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last maped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            }

            //step3、
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            //Clear ConsumeQueue redundant data(清除多余的邏輯位移索引數(shù)據(jù))
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        } else {
            //代碼走到這里,說(shuō)明所有的業(yè)務(wù)文件都刪除了,因此,對(duì)應(yīng)的位移索引文件也需要
            //全部清空
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
   }

分析一下上述代碼流程??梢钥闯?,不管是業(yè)務(wù)視圖消費(fèi)隊(duì)列ConsumeQueue的恢復(fù),或是【邏輯位移索引】恢復(fù),還是業(yè)務(wù)消息的正常加載,他們的處理流程基本上都一致,而業(yè)務(wù)消息的非正常加載,也是采取了相同的處理流程。

對(duì)于業(yè)務(wù)消息的非正常加載,step1中先找出需要進(jìn)行快照恢復(fù)的業(yè)務(wù)消息存儲(chǔ)映射文件開(kāi)始索引位置(List 中的index),然后遍歷所有大于或等于該索引的業(yè)務(wù)消息存儲(chǔ)映射文件(只有最后一個(gè)內(nèi)存映射文件所存儲(chǔ)的消息才有可能不完整)。

step2滾動(dòng)讀取映射文件中的每條消息,在判斷返回的讀取字節(jié)大小size,如果size大于0,說(shuō)明該條消息是一條完整的消息,然后嘗試去構(gòu)建索引,因?yàn)橛锌赡芩饕汛嬖?,如果存在,則跳過(guò)。如果size等于-1,說(shuō)明讀取到了一條不完整的消息,進(jìn)一步說(shuō)明,從上一條完整的消息的存儲(chǔ)物理位移的尾部開(kāi)始,往后的存儲(chǔ)字節(jié)內(nèi)容我們均可認(rèn)為是無(wú)效的,因此可以重寫(xiě)這部分的存儲(chǔ)。如果size等于0,這完成遍歷,跳出循環(huán),否則,滾動(dòng)到下一個(gè)映射文件。

step3中,processOffset記錄最大的完整業(yè)務(wù)消息存儲(chǔ)物理位移,再更新全局的pagecache讀寫(xiě)位移(mappedFileQueue.setFlushedWhere(processOffset)以及mappedFileQueue.setCommittedWhere(processOffset))。
mappedFileQueue.truncateDirtyFiles(processOffset)則是清除臟數(shù)據(jù)文件,清除邏輯這里在總結(jié)一下,如果映射文件的存儲(chǔ)起始位置大于processOffset,則直接刪除該文件。如果processOffset位于映射文件的起始位置以及結(jié)束位置之間,則直接更新本文件的讀寫(xiě)位置,值同樣為processOffset。最后的this.defaultMessageStore.truncateDirtyLogicFiles(processOffset)則是根據(jù)processOffset,清除那些與之對(duì)應(yīng)的業(yè)務(wù)消息存儲(chǔ)物理位移大于processOffset的邏輯位移索引。由于邏輯比較簡(jiǎn)單,這里就不在詳細(xì)展開(kāi),有興趣的讀者可以自行分析。

這里,我們?cè)龠M(jìn)一步分析分第一步中,是如何斷從哪個(gè)映射文件開(kāi)始恢復(fù),跟進(jìn)this.isMappedFileMatchedRecover(mappedFile)

    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        
        //讀取映射文件中,第一條業(yè)務(wù)消息的的存儲(chǔ)時(shí)間
        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
        if (magicCode != MESSAGE_MAGIC_CODE) {
            return false;
        }
        long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
        if (0 == storeTimestamp) {
            return false;
        }

        //根據(jù)條件,消息存儲(chǔ)時(shí)間與快照時(shí)間比較,決定改文件是否正常恢復(fù)
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
                log.info("find check timestamp, {} {}", //
                    storeTimestamp, //
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        } else {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
                log.info("find check timestamp, {} {}", //
                    storeTimestamp, //
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        }

        return false;
    }

根據(jù)上述代碼邏輯,首先,獲取本映射文件的第一條存儲(chǔ)消息的存儲(chǔ)時(shí)間戳(即寫(xiě)入pagecache的時(shí)間刻度)storeTimestamp。這里,我們假設(shè)允許【key查詢索引】開(kāi)啟,即this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()為true,并且this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()也為true(要求【key查詢索引】可以安全查詢),則storeTimestampthis.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()比較,在來(lái)看看getMinTimestampIndex()

    public long getMinTimestampIndex() {
        return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
    }

    public long getMinTimestamp() {
        long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);

        // fixed https://github.org/apache/rocketmqissues/467
        min -= 1000 * 3;
        if (min < 0)
            min = 0;

        return min;

很明顯,getMinTimestampIndex()返回的是indexMsgTimestamp、physicMsgTimestamp以及logicsMsgTimestamp三者中的最小值。

其中,
physicMsgTimestamp代表最后一條刷盤(pán)成功的【業(yè)務(wù)消息】的storeTimestamp(寫(xiě)入pagecache中的時(shí)間戳);
logicsMsgTimestamp代表最后一條刷盤(pán)成功的【邏輯位移索引】所對(duì)應(yīng)【業(yè)務(wù)消息】的storeTimestamp;
indexMsgTimestamp代表最后一條刷盤(pán)成功的【key查詢索引】所對(duì)應(yīng)【業(yè)務(wù)消息】的storeTimestamp。

換言之,如果storeTimestamp小于getMinTimestampIndex(),方法isMappedFileMatchedRecover(...)則返回true否則返回false。我們?cè)趤?lái)結(jié)合外層的非正?;謴?fù)方法recoverAbnormally()

    public void recoverAbnormally() {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            //step1、判斷從哪個(gè)映射文件開(kāi)始恢復(fù),即從指定的開(kāi)始文件到結(jié)束都需要通過(guò)快照恢復(fù)
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this maped file " + mappedFile.getFileName());
                    break;
                }
            }

      ...
     }

for (; index >= 0; index--)循環(huán),我們可以看出是從this.mappedFileQueue.getMappedFiles()(連續(xù)存儲(chǔ)文件集合)的尾部開(kāi)始遍歷,如果this.isMappedFileMatchedRecover(mappedFile)為true,則跳出循環(huán),換言之,index記錄的是開(kāi)始恢復(fù)的索引位置。如何理解結(jié)束循環(huán)的條件是storeTimestampindexMsgTimestamp、physicMsgTimestamp以及logicsMsgTimestamp三者中的最小值比較,而不是storeTimestampphysicMsgTimestamp比較??

原因如下:說(shuō)原因之前,我們先來(lái)分析一個(gè)業(yè)務(wù)場(chǎng)景。假設(shè)broker第一次啟動(dòng)并始化,之前沒(méi)有接受過(guò)任何producer的消息發(fā)送請(qǐng)求,并且消息時(shí)同步刷盤(pán)。突然間,并發(fā)上來(lái)了,一下子寫(xiě)滿了兩個(gè)文件,現(xiàn)在在寫(xiě)第三個(gè)文件,并且是該文件的第三個(gè)消息,還沒(méi)刷盤(pán),由于消息時(shí)同步刷盤(pán)的,因此我們可以知道前兩個(gè)文件以及第三個(gè)文件的前兩個(gè)消息均能完整落地。另一方面,根據(jù)之前的章節(jié)中,我們知道【邏輯位移索引】以及【key查詢索引】的構(gòu)建以及刷盤(pán)均有單獨(dú)的線程異步進(jìn)行的。在我們假設(shè)的場(chǎng)景中,【邏輯位移索引】以及【key查詢索引】都沒(méi)有對(duì)應(yīng)的線程都沒(méi)有進(jìn)行刷盤(pán)。就在這時(shí),機(jī)器突然宕機(jī)了,最后一個(gè)消息至刷盤(pán)了一半。因此,我們?cè)诎凑辗钦5幕謴?fù)流程中,則需要根據(jù)storeTimestampindexMsgTimestamp、physicMsgTimestamp以及logicsMsgTimestamp三者中的最小值比較,找出最早那條已經(jīng)構(gòu)建索引失敗的業(yè)務(wù)消息。并根據(jù)這條業(yè)務(wù)消息所在的映射文件,滾動(dòng)構(gòu)建這條消息以及之后存儲(chǔ)的消息的索引。對(duì)于最后一個(gè)文件的第三條消息,由于無(wú)法完整刷盤(pán),因此,需要截掉這部分不完整的消息。

到這里,我們已近分析完業(yè)務(wù)消息正常或非正常的恢復(fù)流程。


3、加載TopicQueueTable到內(nèi)存中

我們先來(lái)看看CommitLog的實(shí)例屬性topicQueueTable

private HashMap<String/* topic-queueid /, Long/ offset */> topicQueueTable = new HashMap<String, Long>(1024);

從數(shù)據(jù)結(jié)構(gòu)中我們可以看出,它是一個(gè)HashMap, key為topic-queueId,value為消息位移。我們可以通過(guò)topic-queueId ,確定一條隊(duì)列目前最大的可消費(fèi)位移。這里簡(jiǎn)單在說(shuō)一下,通過(guò)offset * CQ_STORE_UNIT_SIZE(【邏輯位移索引大小】),可以算出具體的【邏輯位移索引】的物理存儲(chǔ)位移,然后在通過(guò)【邏輯位移索引】,查詢出具體的業(yè)務(wù)消息內(nèi)容。

我們?cè)趤?lái)分析是如何加載TopicQueueTable到內(nèi)存中recoverTopicQueueTable()

    private void recoverTopicQueueTable() {
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        //獲取未刪除,有效并且最舊的消息的開(kāi)始位置
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                //logic.getMaxOffsetInQueue():這里獲取當(dāng)前隊(duì)列最大的邏輯位移(即已經(jīng)持久化的最大的位置索引消息)
                table.put(key, logic.getMaxOffsetInQueue());
                logic.correctMinOffset(minPhyOffset);
            }
        }

        this.commitLog.setTopicQueueTable(table);
    }

分析一下上述流程:在步驟一 recoverConsumeQueue()中,我們已近分析了consumeQueueTable的加載流程。而TopicQueueTable主要還是依據(jù)consumeQueueTable來(lái)加載。

從代碼中,我們可以看出,遍歷所有的ConsumeQueue,即可得到 對(duì)應(yīng)的最大消費(fèi)位移logic.getMaxOffsetInQueue()

    public long getMaxOffsetInQueue() {
        return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
    }

    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }

其中, getMaxOffset()的語(yǔ)義為返回最后一個(gè)映射文件的最新可讀位置,換言之,即最后一條存儲(chǔ)內(nèi)容的物理位移,最后在通過(guò)`this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE即可算出最大消費(fèi)位移。

接著是logic.correctMinOffset(minPhyOffset),該方法主要是校正ConsumeQueue的實(shí)例屬性minLogicOffset(目前該隊(duì)列可消費(fèi)最小【邏輯位移索引】的 物理位移),之前已近分析過(guò)該方法,這里就不在詳細(xì)展開(kāi)。

到這里,TopicQueueTable的加載流程全部分析完


以上就是存儲(chǔ)恢復(fù)的底層實(shí)現(xiàn)細(xì)節(jié)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容