RocketMQ源碼閱讀(四)-消息存儲二

RocketMQ的消息存儲過程非常復(fù)雜, 本文先介紹存儲模塊中幾個重要對象.

1. MappedFile

MappedByteBuffer的封裝, 具有創(chuàng)建文件(使用非堆區(qū)內(nèi)存), 寫入, 提交, 讀取, 釋放, 關(guān)閉等功能, RocketMQ使用該類實現(xiàn)數(shù)據(jù)從內(nèi)存到磁盤的持久化.

關(guān)鍵字段

  • fileChannel: 該類對應(yīng)的文件通道.
  • mappedByteBuffer: 文件在內(nèi)存中的映射. 如前文所述RocketMQ使用內(nèi)存映射的方式來操作文件, 這種方式要比流的方式快很多.
  • fileSize: 文件尺寸
  • wrotePosition: 當(dāng)前寫到哪一個位置.
  • committedPosition: 已經(jīng)提交(已經(jīng)持久化到磁盤)的位置.
  • flushedPosition: 已經(jīng)提交(已經(jīng)持久化到磁盤)的位置.
  • writeBuffer: 內(nèi)存字節(jié)緩沖區(qū), RocketMQ提供兩種數(shù)據(jù)落盤的方式: 一種是直接將數(shù)據(jù)寫到映射文件字節(jié)緩沖區(qū)(mappedByteBuffer), 映射文件字節(jié)緩沖區(qū)(mappedByteBuffer)flush; 另一種是先寫到writeBuffer, 再從內(nèi)存字節(jié)緩沖區(qū)(write buffer)提交(commit)到文件通道(fileChannel), 然后文件通道(fileChannel)flush.
  • fileFromOffset: fileFromOffset: 映射的起始偏移量, 拿commitlog文件來舉例, 下面有很多個文件夾(假設(shè)為1KB, 默認(rèn)是1G大小), 第一個文件名為00000000000000000000, 第二個文件名為00000000000000001024, 那么第一個文件的fileFromOffset就是0, 第二個文件的fileFromOffset就是1024

關(guān)鍵方法

  • appendMessage: 插入消息到MappedFile, 并返回插入結(jié)果.
  • selectMappedBuffer: 返回指定位置的內(nèi)存映射, 用于讀取數(shù)據(jù).
    (1) appendMessage
    源代碼如下:
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    assert msg != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();  //獲取當(dāng)前寫的位置

    if (currentPos < this.fileSize) {   //currentPos小于文件尺寸才能寫入
        //獲取獲取需要寫入的字節(jié)緩沖區(qū), 之所以會有writeBuffer != null的判斷與使用的刷盤服務(wù)有關(guān).
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);    //設(shè)置寫入的postion
        AppendMessageResult result =
            cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);  //執(zhí)行寫入
        this.wrotePosition.addAndGet(result.getWroteBytes()); //更新wrotePosition
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    //返回錯誤信息
    log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
        + this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

可以看到MappedFile調(diào)用AppendMessageCallback來執(zhí)行msg到字節(jié)緩沖區(qū)的寫入.事實上整個RocketMQ只有一個類實現(xiàn)了AppendMessageCallback接口, 就是DefaultAppendMessageCallback. doAppend方法的具體實現(xiàn)與消息格式有關(guān), 并且不屬于MappedFile的范疇, 后文再分析.
(2) selectMappedBuffer
源代碼如下:

//返回從pos到 pos + size的內(nèi)存映射
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
    int readPosition = getReadPosition();   //獲取當(dāng)前有效數(shù)據(jù)的最大位置
    if ((pos + size) <= readPosition) {    //內(nèi)存映射的最大位置必須小于readPosition

        if (this.hold()) {    //引用計數(shù)
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();  // 復(fù)制一個byteBuffer(與原byteBuffer共享數(shù)據(jù), 只是指針位置獨立)
            byteBuffer.position(pos);    //設(shè)置position
            //獲取目標(biāo)數(shù)據(jù)
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        } else {
            log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                + this.fileFromOffset);
        }
    } else {
        log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
            + ", fileFromOffset: " + this.fileFromOffset);
    }

    return null;
}

2. MappedFileQueue

顧名思義, 該類代表了MappedFile組成的隊列(由大小相同的多個文件構(gòu)成). 無論是CommitLog(消息主體以及元數(shù)據(jù)), 還是ConsumeQueue(邏輯隊列), 底層使用的組件都是MappedFileQueue.

關(guān)鍵字段

  • storePath: 文件隊列的存儲路徑
  • mappedFiles: 存儲MappedFile的map
  • mappedFileSize: MappedFile的尺寸
  • flushedWhere: 已經(jīng)刷到磁盤的位置
  • committedWhere: 已經(jīng)提交的位置

關(guān)鍵方法

  • getLastMappedFile: 獲取隊列中最后一個MappedFile對象
  • findMappedFileByOffset: 根據(jù)offset/filesize計算該offset所在那個文件中

(1) getLastMappedFile
源代碼如下:

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    //獲取當(dāng)前Queue中最后一個MappedFile
    MappedFile mappedFileLast = getLastMappedFile();

    //一個文件都不存在時, 計算起始文件的offset
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    //計算需要新創(chuàng)建的文件的offset
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    //創(chuàng)建新的MappedFile
    if (createOffset != -1 && needCreate) {
        //計算文件名
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            //使用AllocateMappedFileService創(chuàng)建文件主要是更加安全一些, 會將一些并行的操作穿行化
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        //將新創(chuàng)建的文件添加到隊列中
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}

從源碼中可見, 只有當(dāng)文件寫滿或者找不到文件時, 才會創(chuàng)建新的文件.
(2) findMappedFileByOffset
主要是根據(jù)offset尋找對應(yīng)的MappedFile, 具體源代碼不再貼出.
為了理解findMapedFileByOffset, 我們假設(shè)每個文件的大小是1024K, 參考以下圖示:



如果現(xiàn)在想查找3021在那個文件中, 可以按如下計算:
(3021 - 0)/1024=2 即可知其在隊列下標(biāo)為2的MappedFile中
釋義如下: (offset-第一個文件的fileFromeOffset)/mappedFileSize

3. CommitLog

用于存儲消息的抽象封裝, 內(nèi)部采用MapedFileQueue實現(xiàn)了消息文件隊列功能.

關(guān)鍵字段

  • HashMap topicQueueTable: 用于記錄某個topic在某個queueId共寫入了多少個消息, put一個消息加1.

關(guān)鍵方法

  • putMessage: 存儲消息.
  • getMessage: 讀取消息

(1) putMessage
存儲消息主要分3步: 查找文件(getLastMapedFile), 寫入數(shù)據(jù)(DefaultAppendMessageCallback), 刷盤(FlushRealTimeService). 最終產(chǎn)生實際存儲消息的隊列文件如下:
${storePathRootDir}/commitlog/消息隊列文件. (消息隊列文件名規(guī)則如MappedFileQueue).

(2)getMessage(final long offset, final int size)
offset: 絕對偏移量, 可以用其調(diào)用findMappedFileByOffset查詢MappedFile.
size: 欲查詢的數(shù)據(jù)大小.

4. ConsumeQueue

消費隊列的實現(xiàn), 該消費隊列主要存儲了消息在CommitLog的位置, 與CommitLog類似, 內(nèi)部采用MappedFileQueue實現(xiàn)了消息位置文件隊列功能.
一個topic和一個queueId對應(yīng)一個ConsumeQueue.
默認(rèn)queue存儲30W條消息, 每個消息大小為20個字節(jié), 詳細(xì)如下:
offset(long 8字節(jié)) + size(int 4字節(jié)) + tagsCode(long 8字節(jié))

關(guān)鍵方法

  • putMessagePositionInfo: 消息位置的存儲
  • getIndexBuffer: 該方法返回從offset之后的字節(jié)映射
    (1)putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset)
    offset: 消息在commitLog中的起始位置
    size: 消息長度
    tagsCode: 消息tag的hash code
    cqOffset: 該消息在topic對應(yīng)的queue中的下標(biāo)
    該方法主要實現(xiàn)了消息位置的存儲, 并產(chǎn)生消息文件:
    storePathRootDir/consumequeue/{topic}/${queueId}/消息位置隊列文件
    消息數(shù)(30W)*消息位置固定大小(20字節(jié))=6000000字節(jié)
    故每6000000字節(jié)一個文件, 文件名依次遞增, 前綴不夠20位補0, 類似如下:
    00000000000000000000
    00000000000006000000
    00000000000012000000

(2)getIndexBuffer(final long startIndex)
該方法源代碼如下:

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    int mappedFileSize = this.mappedFileSize;
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

startIndex代表了起始偏移量索引.
該方法先根據(jù)startIndex找到對應(yīng)的MappedFile, 再在該MappedFile中找到對應(yīng)的字節(jié)映射.

5. 總結(jié)

RocketMQ的消息存儲非常復(fù)雜, 本文介紹了消息存儲中使用到的基礎(chǔ)組件類以及一些重要的API. 后文會進(jìn)一步介紹消息存儲的詳細(xì)流程.

參考資料:
1.http://www.tuicool.com/articles/6FFR7v
2.http://blog.csdn.net/a417930422/article/details/50606732

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評論 19 139
  • 分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐 來源:http://www.itdecent.cn/p/453...
    meng_philip123閱讀 13,217評論 6 104
  • 前言 接下來會介紹RocketMQ的消息存儲, 本文先對RocketMQ的整體設(shè)計和組件進(jìn)行簡單介紹,后續(xù)會針對細(xì)...
    _呆瓜_閱讀 4,014評論 1 8
  • 不經(jīng)意的瞬間 你沐浴了 一棵樹的葉子 你在 微麈中的城市 演奏 鴿子自由之歌 陽光被風(fēng)帶走 你在 似遠(yuǎn)又近路上 濕...
    秋鳶子閱讀 368評論 0 2
  • 《上海的雨》 文/徐荷驪 (一) 是心中有一個夢 是心中有一個結(jié) 總以為雨季不再來 不過是一句詩人的臺詞罷 可 天...
    到馬路對面去閱讀 310評論 0 6

友情鏈接更多精彩內(nèi)容