[ RocketMQ源碼閱讀 6 ] Broker磁盤文件格式與作用

Broker的功能點很多,安裝程序啟動的順序去看源碼,發(fā)現(xiàn)代碼量比之前的組件要大很多。閱讀過程中發(fā)現(xiàn)Broker會去持久化一些配置,并且會將消息數(shù)據(jù)存儲在磁盤上。
整理和檢索了網(wǎng)上的一些資料,列出了這些文件和相應的作用,如下。

  • store
    • commitlog
      • 000000000
      • xxxxxxxxxx
    • compaction
      • compactionLog
        • {topic}
          • 0
          • 1
          • ...
          • {queueId}
      • compactionCq
        • {topic}
          • 0
          • 1
          • ...
          • {queueId}
    • config
      • delayOffset.json
      • broker.properties
      • topics.json
      • topicQueueMapping.json
      • consumerOffset.json
      • lmqConsumerOffset.json
      • consumerOrderInfo.json
      • subscriptionGroup.json
      • timercheck
      • timermetrics
      • consumerFilter.json
      • messageRequestMode.json
      • tieredStoreMetadata.json
    • consumequeue
      • {topic}
        • 0
          • 00000000000000000000
        • 1
        • ...
        • {queueId}
    • index
      • 20240305101010000
    • abort
    • checkpoint
    • lock
    • timerwheel
    • timerlog
      • 00000000000000000000

CommitLog

該目錄下存儲了消息內(nèi)容文件,默認每個文件1GB大小。目錄下會包含多個文件,相鄰的兩個文件名稱的差值正好為1GB。
文件格式:

序號 名稱 長度 描述
1 totalSize int 消息總字節(jié)數(shù)
2 magicCode int 用于標記消息協(xié)議的版本
3 bodyCRC int crc校驗碼,用于校驗消息內(nèi)容是否正確
4 queueId int 所屬queueId
5 flag int 消息的類型標記
6 queueOffset long
7 physicOffset long
8 sysFlag int 標記消息類型
9 bornTimeStamp long 消息產(chǎn)生的日期
10 bornHost 8字節(jié)或者20字節(jié) ipv4或者ipv6的差別
11 storeTimestamp long 存儲消息的時間戳,在Broker收到消息的時候賦值
12 storeHostAddress 8字節(jié)或者20字節(jié) 存儲消息的Broker進程地址
13 reconsumeTimes int 被重復消費的次數(shù)
14 preparedTransactionOffset long
15 bodyLen int 消息內(nèi)容長度
16 body bodyLen 消息內(nèi)容
17 topic 長度取決于從magicCode中獲取的topic長度
18 propertiesLength short properties內(nèi)容的長度

ConsumeQueue

每個Topic存在n(1..n)個數(shù)量的Queue,每個Queue對應一個ConsumQueue文件。該文件用于記錄屬于某個Topic的消息在CommitLog中的偏移量。

/**
 * ConsumeQueue's store unit. Format:
 * <pre>
 * ┌───────────────────────────────┬───────────────────┬───────────────────────────────┐
 * │    CommitLog Physical Offset  │      Body Size    │            Tag HashCode       │
 * │          (8 Bytes)            │      (4 Bytes)    │             (8 Bytes)         │
 * ├───────────────────────────────┴───────────────────┴───────────────────────────────┤
 * │                                     Store Unit                                    │
 * │                                                                                   │
 * </pre>
 * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
 */

CommitLog Physical Offset記錄了消息在CommitLog中全局的offset,Body Size為消息的總長度。

ConsumeQueue文件是由DefaultMessageStore內(nèi)部類ReputMessageService來完成的,該服務是一個定時任務,每隔1ms執(zhí)行一次。主要看doReput函數(shù)做了哪些事情:

  1. 讀取CommitLog文件內(nèi)容,然后構(gòu)建ConsumeQueue文件、Index、compact文件
DefaultMessageStore.this.doDispatch(dispatchRequest)

doDisPatch函數(shù)會調(diào)用如下三個dispatcher,完成構(gòu)建文件的任務

    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // 1
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // 2
    if (messageStoreConfig.isEnableCompaction()) {
          this.compactionStore = new CompactionStore(this);
          this.compactionService = new CompactionService(commitLog, this, compactionStore);
          this.dispatcherList.addLast(new CommitLogDispatcherCompaction(compactionService));//3
    }

對于非事務消息和事務消息的提交消息,寫入到ConsumeQueue文件,查看CommitLogDispatcherBuildConsumeQueue代碼

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

與ComsumeQueue文件相關的還有consumequeue_ext文件,這個文件存放了一些擴展信息,查看CosumeQueue#putMessagePositionInfoWrapper(DispatchRequest request)函數(shù)的相關代碼。如下:

  ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
  cqExtUnit.setFilterBitMap(request.getBitMap());
  cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
  cqExtUnit.setTagsCode(request.getTagsCode());
  long extAddr = this.consumeQueueExt.put(cqExtUnit); //寫入文件
  1. 通知客戶端有新消息
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
          dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
          dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
          dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
  notifyMessageArrive4MultiQueue(dispatchRequest);

Index

攜帶key的消息會構(gòu)建索引文件,用于按照key來進行消息檢索。查看CommitLogDispatcherBuildIndex類,最大單個文件2kw條索引數(shù)據(jù)。
每個Index文件的格式如下:
IndexHeader(40bytes) + Slots(固定500w個 * 4bytes) + IndexItems(最多2000w個 * 20bytes)

IndexHeader格式:
Begin TimeStamp(8bytes) + End TimeStamp(8bytes) + Begin Physical Offset(8bytes) + End Physical Offset(8bytes) + Hash Slot Count(4bytes) + Index Count(4bytes)

每個Slot存儲當前分配到此Slot下最新的Index count,通過此數(shù)據(jù)可以計算出此slot下最新一條IndexItem的位置。

IndexItem的格式:
Key HashCode(4bytes) + Physical Offset(8bytes) + Time Diff(4bytes) + Next Index Pos(4bytes)

Time Diff用于和Header中的開始和結(jié)束時間戳來比較,結(jié)合Key HashCode來判斷索引是否命中數(shù)據(jù)。
Next Index Pos用于指向之前命中該Slot的IndexItem。

RocketMQ Index結(jié)構(gòu)

Compaction

該目錄下的文件,有特殊的用途。更像是核心功能開發(fā)完成之后,以打補丁的方式加入的特殊功能。在不依賴于其他存儲的情況喜愛,用來存儲kv數(shù)據(jù)。

Compact的過程是將Commitlog的數(shù)據(jù)按照某個topic下的某個queue(也稱為partition),對于數(shù)據(jù)按key的維度只保留最新的數(shù)據(jù)。當然ConsumeQueue文件也需要做相應的Compact。

可以閱讀如下文章了解更多細節(jié)。
RocketMQ Compaction Topic的設計與實現(xiàn)

Config

文件名稱 處理類 描述
delayOffset.json ScheduleMessageService 延遲消息消費進度
broker.properties BrokerController Broker配置
topics.json TopicConfigManager 存儲每個topic的讀寫隊列數(shù)、權限、是否順序等信息
topicQueueMapping.json TopicQueueMappingManager 當前Broker存儲了哪些queue
consumerOffset.json ConsumerOffsetManager 消費進度數(shù)據(jù)
lmqConsumerOffset.json LmqConsumerOffsetManager lmq模式下的消費進度數(shù)據(jù)
consumerOrderInfo.json ConsumerOrderInfoManager 維護了topic+group+queueId維度的順序消費情況OrderInfo
subscriptionGroup.json SubscriptionGroupManager SubscriptionGroup配置,訂閱組配置內(nèi)容,還有包含了一些內(nèi)置的配置
timercheck TimerCheckpoint 存儲時間輪消息當前的瞬時狀態(tài)
timermetrics TimerMetrics 用于統(tǒng)計各個時間的時間輪消息的條數(shù)
consumerFilter.json ConsumerFilterManager 每個Topic的ConsumerGroup消費過濾規(guī)則
messageRequestMode.json MessageRequestModeManager 配置ConsumerGroup使用pop或pull模式消費
tieredStoreMetadata.json TieredMetadataManager 分級存儲topic、queue以及消息存儲文件的meta data

abort

DefaultMessageStore在啟動Broker,加載磁盤日志文件之前,用來判斷上次是否是結(jié)束進程的。內(nèi)容存儲了上次啟動Broker的pid。

checkpoint

存儲了當前消息刷盤的offset,用于重啟或者故障后的恢復。

數(shù)據(jù)名稱 描述
lastReadTimeMs 上次消費的時間節(jié)點
lastTimerLogFlushPos 最后刷新log的pos
lastTimerQueueOffset 最后一次消費的隊列節(jié)點
masterTimerQueueOffset 主 Broker 的隊列消費節(jié)點

lock

防止本地啟動多個以此目錄為存儲目錄的Broker進程。

timerwheel

該文件存儲的內(nèi)容如下

序號 名稱 長度 描述
1 delayTime long 延遲時間
2 firstPos long 開始的位置
3 lastPos long 結(jié)束的位置
4 num int 消息的數(shù)量
5 magic int 暫時未用

timerlog

主要邏輯都在TimeLog類中,下面為存儲的日志格式

序號 名稱 長度 描述
1 size int 日志單元固定長度
2 pre pos long 上一條日志的pos
3 magic value int 標記消息處理邏輯delete(刪除消息的消息)或者roll處理(消息延遲時間超過限定時間,需要做roll處理來進行展期)
4 curr write time long for trace寫入時間輪的時間戳
5 delayed time int for check
6 offsetPy long 在commitLog中的位置
7 sizePy int 在commitLog中存儲的字節(jié)數(shù)
8 hash code of real topic int 消息后續(xù)投放的真正topic名稱的hash值
9 reserved value long 預留值,暫時沒有作用

timerwheel, timerlog, checkpoint, config/timercheck, config/timermetrics都是時間輪調(diào)度消息用到的文件

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容