RocketMQ源碼-ConsumeQueue的構(gòu)建


1 概述
2 入口方法
3 ConsumeQueue索引結(jié)構(gòu)
4 索引構(gòu)建

1 概述

RocketMQ一個Broker中可以建立多個Topic,每個Topic又可以有多個queue,Broker在接收生產(chǎn)者發(fā)來的消息時,是按照消息到來的順序追加到同一個文件中的,當(dāng)然文件默認大小為1G,如果超過文件最大大小,則會接著前一個文件寫入的數(shù)據(jù)繼續(xù)寫入。

所有Topic所有queue的數(shù)據(jù)放在一起就造成了查詢數(shù)據(jù)或者消費數(shù)據(jù)時面臨著大量的隨機讀,也造成查詢數(shù)據(jù)需要從頭到尾讀取所有的數(shù)據(jù)。為了避免每次查詢或者消費者拉去數(shù)據(jù)時從頭到尾遍歷,RocketMQ在消息數(shù)據(jù)上構(gòu)建了兩種索引,一個是筆者文章RocketMQ源碼-Index索引介紹介紹的全局索引Index索引,另一個就是本文介紹的根據(jù)queue劃分后的每個隊列的ConsumeQueue索引。

Index索引和ConsumeQueue的區(qū)別主要有三個,第一是Index基于消息的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX屬性構(gòu)建,而ConsumeQueue基于消息標(biāo)簽的hash碼構(gòu)建;第二是Index為全局索引,不區(qū)分主題隊列,所有消息索引在一個文件中,而ConsumeQueue對應(yīng)一個主題的一個隊列,每個主題的每個隊列都會有一個ConsumeQueue索引;第三是Index主要用于消息查詢,而ConsumeQueue主要用于消息消費時,消費者拉取消息使用,這里也能說明為什么Index設(shè)計為全局索引而ConsumeQueue為單個隊列的索引,因為消息查詢時一般為查詢所有消息中的滿足指定條件的消息,而消息消費時,消費者一般只會拉取自己訂閱(或者是訂閱之后負載均衡的被分配)的某個主題下某個隊列的消息。

2 入口方法

和筆者文章RocketMQ源碼-Index索引介紹一樣,ConsumeQueue構(gòu)建的入口也是在ReputMessageService服務(wù)的run方法中進行reput操作觸發(fā)的,用于構(gòu)建ConsumeQueue的類為CommitLogDispatcherBuildConsumeQueue,其也是DefaultMessageStore的內(nèi)部類,源碼如下:

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            //只會為普通的非事務(wù)消息和已提交的事務(wù)消息
            //做索引
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            //為提交的事務(wù)消息或者已經(jīng)回滾的事務(wù)消息
            //則不索引
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

在介紹具體如何構(gòu)建ConsumeQueue之前,我們先介紹下ConsumeQueue索引的結(jié)構(gòu)。

3 ConsumeQueue索引結(jié)構(gòu)

ConsumeQueue的結(jié)構(gòu)比較簡單,如下:

ConsumeQueue索引結(jié)構(gòu).jpg

如上圖所示,每個索引項在文件中占20個字節(jié),各字段分別為:

  • CommitLog Offset:該消息在CommitLog的起始物理偏移,long類型,8字節(jié);
  • Size:該消息的大小,int類型,4字節(jié);
  • Message Tags HashCode:消息標(biāo)簽對應(yīng)的hashCode,long類型,8字節(jié)。

這里要注意一下,每個ConsumeQueue還有一個用于記錄擴展索引信息的ConsumeQueueExt類實例,如果配置啟動了ConsumeQueue擴展類型,則ConsumeQueue中的Message Tags HashCode記錄的并不是消息標(biāo)簽對應(yīng)的hashCode,記錄的是該消息索引在擴展信息ConsumeQueueExt文件中的物理偏移,真正的Message Tags HashCode則記錄在ConsumeQueueExt文件中。那么在讀取ConsumeQueue如何區(qū)分Message Tags HashCode記錄的是消息標(biāo)簽的hashCode,還是擴展信息偏移呢?ConsumeQueue中有個方法isExtAddr(long tagsCode)則用于實現(xiàn)這個判斷:

//ConsumeQueue
/**
* Check {@code tagsCode} is address of extend file or tags code.
*/
public boolean isExtAddr(long tagsCode) {
    return ConsumeQueueExt.isExtAddr(tagsCode);
}

//ConsumeQueueExt
/**
* Check whether {@code address} point to extend file.
* <p>
* Just test {@code address} is less than 0.
* </p>
*/
public static boolean isExtAddr(final long address) {
    //MAX_ADDR = Integer.MIN_VALUE - 1L;
    //也即如果消息tagsCode小于Integer.Min_VALUE-1,
    //則為偏移地址,而不是tags的hashCode
    return address <= MAX_ADDR;
}

擴展索引ConsumeQueueExt除了記錄消息的標(biāo)簽code,還記錄了消息bitMap信息和存儲時間。消息bitMap主要用于消息過了,暫不介紹。ConsumeQueueExt的基本存儲結(jié)構(gòu)為ConsumeQueueExt.CqExtUnit。

4 索引構(gòu)建

我們現(xiàn)在接著第2節(jié)的入口方法介紹,入口方法是調(diào)用DefaultMessageStore.this.putMessagePositionInfo(request);進行索引構(gòu)建的,DefaultMessageStore.this.putMessagePositionInfo(request);實現(xiàn)如下:

//DefaultMessageStore
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    //先根據(jù)
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

ConsumeQueue中具體實現(xiàn)如下:

//ConsumeQueue
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    //寫入失敗則會連續(xù)嘗試30次
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        //如果啟用了擴展索引,則先構(gòu)造擴展索引保存單元
        //CqExtUnit,寫入bitMap、保存時間、實際的消息
        //標(biāo)簽hashCode
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());
            //寫入之后則返回擴展索引剛寫入的偏移地址
            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                //tagsCode重置為擴展索引偏移地址
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        //進行實際寫入
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}


private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    //寫入消息物理偏移、消息大小和tagsCode
    //tagsCode可能為擴展索引偏移或者實際標(biāo)簽code
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點: 能夠保證嚴(yán)格的消息順序 提供豐富的消息拉取模式...
    AI喬治閱讀 2,174評論 2 5
  • 架構(gòu)圖 基本概念 Producer 消息生產(chǎn)者,負責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負責(zé)產(chǎn)生消息 Consumer 消息消...
    桑榆非晚95閱讀 1,231評論 0 1
  • 楊孜 我善於把戀愛談成懷念 也善於把欲望化為信仰 如果妳立於近前 我的目光會躲躲閃閃 如果妳身處異鄉(xiāng) 我的牽掛會跟...
    楊孜閱讀 423評論 2 2
  • 似蘭斯馨
    王中海閱讀 143評論 0 1
  • 這個時候,是準(zhǔn)備吃飯的,人不多,吃得香。 人走得遠,遠得像是久遠的夢。我們看了彼此,她走遠了。走...
    Lan_9e0f閱讀 186評論 0 1

友情鏈接更多精彩內(nèi)容