Broker的功能點很多,安裝程序啟動的順序去看源碼,發(fā)現(xiàn)代碼量比之前的組件要大很多。閱讀過程中發(fā)現(xiàn)Broker會去持久化一些配置,并且會將消息數(shù)據(jù)存儲在磁盤上。
整理和檢索了網(wǎng)上的一些資料,列出了這些文件和相應的作用,如下。
- store
- commitlog
- 000000000
- xxxxxxxxxx
- compaction
- compactionLog
- {topic}
- 0
- 1
- ...
- {queueId}
- {topic}
- compactionCq
- {topic}
- 0
- 1
- ...
- {queueId}
- {topic}
- compactionLog
- config
- delayOffset.json
- broker.properties
- topics.json
- topicQueueMapping.json
- consumerOffset.json
- lmqConsumerOffset.json
- consumerOrderInfo.json
- subscriptionGroup.json
- timercheck
- timermetrics
- consumerFilter.json
- messageRequestMode.json
- tieredStoreMetadata.json
- consumequeue
- {topic}
- 0
- 00000000000000000000
- 1
- ...
- {queueId}
- 0
- {topic}
- index
- 20240305101010000
- abort
- checkpoint
- lock
- timerwheel
- timerlog
- 00000000000000000000
- commitlog
CommitLog
該目錄下存儲了消息內(nèi)容文件,默認每個文件1GB大小。目錄下會包含多個文件,相鄰的兩個文件名稱的差值正好為1GB。
文件格式:
| 序號 | 名稱 | 長度 | 描述 |
|---|---|---|---|
| 1 | totalSize | int | 消息總字節(jié)數(shù) |
| 2 | magicCode | int | 用于標記消息協(xié)議的版本 |
| 3 | bodyCRC | int | crc校驗碼,用于校驗消息內(nèi)容是否正確 |
| 4 | queueId | int | 所屬queueId |
| 5 | flag | int | 消息的類型標記 |
| 6 | queueOffset | long | |
| 7 | physicOffset | long | |
| 8 | sysFlag | int | 標記消息類型 |
| 9 | bornTimeStamp | long | 消息產(chǎn)生的日期 |
| 10 | bornHost | 8字節(jié)或者20字節(jié) | ipv4或者ipv6的差別 |
| 11 | storeTimestamp | long | 存儲消息的時間戳,在Broker收到消息的時候賦值 |
| 12 | storeHostAddress | 8字節(jié)或者20字節(jié) | 存儲消息的Broker進程地址 |
| 13 | reconsumeTimes | int | 被重復消費的次數(shù) |
| 14 | preparedTransactionOffset | long | |
| 15 | bodyLen | int | 消息內(nèi)容長度 |
| 16 | body | bodyLen | 消息內(nèi)容 |
| 17 | topic | 長度取決于從magicCode中獲取的topic長度 | |
| 18 | propertiesLength | short | properties內(nèi)容的長度 |
ConsumeQueue
每個Topic存在n(1..n)個數(shù)量的Queue,每個Queue對應一個ConsumQueue文件。該文件用于記錄屬于某個Topic的消息在CommitLog中的偏移量。
/**
* ConsumeQueue's store unit. Format:
* <pre>
* ┌───────────────────────────────┬───────────────────┬───────────────────────────────┐
* │ CommitLog Physical Offset │ Body Size │ Tag HashCode │
* │ (8 Bytes) │ (4 Bytes) │ (8 Bytes) │
* ├───────────────────────────────┴───────────────────┴───────────────────────────────┤
* │ Store Unit │
* │ │
* </pre>
* ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
*/
CommitLog Physical Offset記錄了消息在CommitLog中全局的offset,Body Size為消息的總長度。
ConsumeQueue文件是由DefaultMessageStore內(nèi)部類ReputMessageService來完成的,該服務是一個定時任務,每隔1ms執(zhí)行一次。主要看doReput函數(shù)做了哪些事情:
- 讀取CommitLog文件內(nèi)容,然后構(gòu)建ConsumeQueue文件、Index、compact文件
DefaultMessageStore.this.doDispatch(dispatchRequest)
doDisPatch函數(shù)會調(diào)用如下三個dispatcher,完成構(gòu)建文件的任務
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // 1
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // 2
if (messageStoreConfig.isEnableCompaction()) {
this.compactionStore = new CompactionStore(this);
this.compactionService = new CompactionService(commitLog, this, compactionStore);
this.dispatcherList.addLast(new CommitLogDispatcherCompaction(compactionService));//3
}
對于非事務消息和事務消息的提交消息,寫入到ConsumeQueue文件,查看CommitLogDispatcherBuildConsumeQueue代碼
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
與ComsumeQueue文件相關的還有consumequeue_ext文件,這個文件存放了一些擴展信息,查看CosumeQueue#putMessagePositionInfoWrapper(DispatchRequest request)函數(shù)的相關代碼。如下:
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit); //寫入文件
- 通知客戶端有新消息
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
Index
攜帶key的消息會構(gòu)建索引文件,用于按照key來進行消息檢索。查看CommitLogDispatcherBuildIndex類,最大單個文件2kw條索引數(shù)據(jù)。
每個Index文件的格式如下:
IndexHeader(40bytes) + Slots(固定500w個 * 4bytes) + IndexItems(最多2000w個 * 20bytes)
IndexHeader格式:
Begin TimeStamp(8bytes) + End TimeStamp(8bytes) + Begin Physical Offset(8bytes) + End Physical Offset(8bytes) + Hash Slot Count(4bytes) + Index Count(4bytes)
每個Slot存儲當前分配到此Slot下最新的Index count,通過此數(shù)據(jù)可以計算出此slot下最新一條IndexItem的位置。
IndexItem的格式:
Key HashCode(4bytes) + Physical Offset(8bytes) + Time Diff(4bytes) + Next Index Pos(4bytes)
Time Diff用于和Header中的開始和結(jié)束時間戳來比較,結(jié)合Key HashCode來判斷索引是否命中數(shù)據(jù)。
Next Index Pos用于指向之前命中該Slot的IndexItem。

Compaction
該目錄下的文件,有特殊的用途。更像是核心功能開發(fā)完成之后,以打補丁的方式加入的特殊功能。在不依賴于其他存儲的情況喜愛,用來存儲kv數(shù)據(jù)。
Compact的過程是將Commitlog的數(shù)據(jù)按照某個topic下的某個queue(也稱為partition),對于數(shù)據(jù)按key的維度只保留最新的數(shù)據(jù)。當然ConsumeQueue文件也需要做相應的Compact。
可以閱讀如下文章了解更多細節(jié)。
RocketMQ Compaction Topic的設計與實現(xiàn)
Config
| 文件名稱 | 處理類 | 描述 |
|---|---|---|
| delayOffset.json | ScheduleMessageService | 延遲消息消費進度 |
| broker.properties | BrokerController | Broker配置 |
| topics.json | TopicConfigManager | 存儲每個topic的讀寫隊列數(shù)、權限、是否順序等信息 |
| topicQueueMapping.json | TopicQueueMappingManager | 當前Broker存儲了哪些queue |
| consumerOffset.json | ConsumerOffsetManager | 消費進度數(shù)據(jù) |
| lmqConsumerOffset.json | LmqConsumerOffsetManager | lmq模式下的消費進度數(shù)據(jù) |
| consumerOrderInfo.json | ConsumerOrderInfoManager | 維護了topic+group+queueId維度的順序消費情況OrderInfo |
| subscriptionGroup.json | SubscriptionGroupManager | SubscriptionGroup配置,訂閱組配置內(nèi)容,還有包含了一些內(nèi)置的配置 |
| timercheck | TimerCheckpoint | 存儲時間輪消息當前的瞬時狀態(tài) |
| timermetrics | TimerMetrics | 用于統(tǒng)計各個時間的時間輪消息的條數(shù) |
| consumerFilter.json | ConsumerFilterManager | 每個Topic的ConsumerGroup消費過濾規(guī)則 |
| messageRequestMode.json | MessageRequestModeManager | 配置ConsumerGroup使用pop或pull模式消費 |
| tieredStoreMetadata.json | TieredMetadataManager | 分級存儲topic、queue以及消息存儲文件的meta data |
abort
DefaultMessageStore在啟動Broker,加載磁盤日志文件之前,用來判斷上次是否是結(jié)束進程的。內(nèi)容存儲了上次啟動Broker的pid。
checkpoint
存儲了當前消息刷盤的offset,用于重啟或者故障后的恢復。
| 數(shù)據(jù)名稱 | 描述 |
|---|---|
| lastReadTimeMs | 上次消費的時間節(jié)點 |
| lastTimerLogFlushPos | 最后刷新log的pos |
| lastTimerQueueOffset | 最后一次消費的隊列節(jié)點 |
| masterTimerQueueOffset | 主 Broker 的隊列消費節(jié)點 |
lock
防止本地啟動多個以此目錄為存儲目錄的Broker進程。
timerwheel
該文件存儲的內(nèi)容如下
| 序號 | 名稱 | 長度 | 描述 |
|---|---|---|---|
| 1 | delayTime | long | 延遲時間 |
| 2 | firstPos | long | 開始的位置 |
| 3 | lastPos | long | 結(jié)束的位置 |
| 4 | num | int | 消息的數(shù)量 |
| 5 | magic | int | 暫時未用 |
timerlog
主要邏輯都在TimeLog類中,下面為存儲的日志格式
| 序號 | 名稱 | 長度 | 描述 |
|---|---|---|---|
| 1 | size | int | 日志單元固定長度 |
| 2 | pre pos | long | 上一條日志的pos |
| 3 | magic value | int | 標記消息處理邏輯delete(刪除消息的消息)或者roll處理(消息延遲時間超過限定時間,需要做roll處理來進行展期) |
| 4 | curr write time | long | for trace寫入時間輪的時間戳 |
| 5 | delayed time | int | for check |
| 6 | offsetPy | long | 在commitLog中的位置 |
| 7 | sizePy | int | 在commitLog中存儲的字節(jié)數(shù) |
| 8 | hash code of real topic | int | 消息后續(xù)投放的真正topic名稱的hash值 |
| 9 | reserved value | long | 預留值,暫時沒有作用 |
timerwheel, timerlog, checkpoint, config/timercheck, config/timermetrics都是時間輪調(diào)度消息用到的文件