Broker將消息存儲(chǔ)抽象成MessageStore接口,默認(rèn)實(shí)現(xiàn)類是DefaultMessageStore。主要提供如下方法:
- 保存消息,包括單條和批量保存
- 根據(jù)topic、queue和offset批量獲取消息,consumer使用該方法來拉取消息
- 根據(jù)消息offset讀取消息詳情,根據(jù)messageId查詢消息時(shí)使用該方法
- 根據(jù)messageKey查詢消息,可提供給終端用戶使用
下面我們根據(jù)一個(gè)MessageStore的數(shù)據(jù)結(jié)構(gòu)圖來看下消息是如何存儲(chǔ)的
數(shù)據(jù)結(jié)構(gòu)圖

【注】以上圖片轉(zhuǎn)載自博客RocketMQ消息存儲(chǔ)流程圖及數(shù)據(jù)結(jié)構(gòu)
數(shù)據(jù)結(jié)構(gòu)
通過上面的圖可以看到消息存儲(chǔ)涉及到一下幾個(gè)數(shù)據(jù)結(jié)構(gòu):
CommitLog,存儲(chǔ)消息的詳細(xì)內(nèi)容,按照消息收到的順序,所有消息都存儲(chǔ)在一起。每個(gè)消息存儲(chǔ)后都會(huì)有一個(gè)offset,代表在commitLog中的偏移量。舉個(gè)例子,當(dāng)前commitLog文件的大小是12413435字節(jié),那下一條消息到來后它的offset就是12413436。這個(gè)說法不是非常準(zhǔn)確,但是offset大概是這么計(jì)算來的。commitLog并不是一個(gè)文件,而是一系列文件(上圖中的MappedFile)。每個(gè)文件的大小都是固定的(默認(rèn)1G),寫滿一個(gè)會(huì)生成一個(gè)新的文件,新文件的文件名就是它存儲(chǔ)的第一條消息的offset。
ConsumeQueue,既然所有消息都是存儲(chǔ)在一個(gè)commitLog中,但是consumer是按照topic+queue的維度來消費(fèi)消息的,沒有辦法直接從commitLog中讀取,所以針對(duì)每個(gè)topic的每個(gè)queue都會(huì)生成consumeQueue文件。ConsumeQueue文件中存儲(chǔ)的是消息在commitLog中的offset,可以理解成一個(gè)按queue建的索引,每條消息占用20字節(jié)(上圖中的一個(gè)cq)。跟commitLog一樣,每個(gè)Queue文件也是一系列連續(xù)的文件組成,每個(gè)文件默認(rèn)放30w個(gè)offset。
IndexFile,CommitLog的另外一種形式的索引文件,只是索引的是messageKey,每個(gè)MsgKey經(jīng)過hash后計(jì)算存儲(chǔ)的slot,然后將offset存到IndexFile的相應(yīng)slot上。根據(jù)msgKey來查詢消息時(shí),可以先到IndexFile中查詢offset,然后根據(jù)offset去commitLog中查詢消息詳情。
線程服務(wù)
MessageStore除了上面的數(shù)據(jù)結(jié)構(gòu)以外,還需要相應(yīng)的服務(wù)來對(duì)數(shù)據(jù)做操作。
IndexService,負(fù)責(zé)讀寫IndexFile的服務(wù)
ReputMessageService,消息存儲(chǔ)到commitLog后,MessageStore的接口調(diào)用就直接返回了,后續(xù)由ReputMessageService負(fù)責(zé)將消息分發(fā)到ConsumeQueue和IndexService
HAService,負(fù)責(zé)將master-slave之間的消息數(shù)據(jù)同步
以上就是MessageStore的整體結(jié)構(gòu)了,下面看下它的啟動(dòng)過程。
MessageStore啟動(dòng)
啟動(dòng)入口在DefaultMessageStore.start()方法:
public void start() throws Exception {
//1、寫lock 文件,嘗試獲取lock文件鎖,保證磁盤上的文件只會(huì)被一個(gè)messageStore讀寫
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
//2、啟動(dòng)FlushConsumeQueueService,是一個(gè)單線程的服務(wù),定時(shí)將consumeQueue文件的數(shù)據(jù)刷新到磁盤,周期由參數(shù)flushIntervalConsumeQueue設(shè)置,默認(rèn)1sec
this.flushConsumeQueueService.start();
//3、啟動(dòng)CommitLog
this.commitLog.start();
//4、消息存儲(chǔ)指標(biāo)統(tǒng)計(jì)服務(wù),RT,TPS...
this.storeStatsService.start();
//5、針對(duì)master,啟動(dòng)延時(shí)消息調(diào)度服務(wù)
if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
this.scheduleMessageService.start();
}
//6、啟動(dòng)reputMessageService,該服務(wù)負(fù)責(zé)將CommitLog中的消息offset記錄到cosumeQueue文件中
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
//7、啟動(dòng)haService,數(shù)據(jù)主從同步的服務(wù)
this.haService.start();
//8、對(duì)于新的broker,初始化文件存儲(chǔ)的目錄
this.createTempFile();
//9、啟動(dòng)定時(shí)任務(wù)
this.addScheduleTask();
this.shutdown = false;
}
以上就是整個(gè)MessageStore服務(wù)啟動(dòng)的過程,其中有幾項(xiàng)下面解釋一下:
- 第2步,數(shù)據(jù)寫入文件后,因?yàn)槎嗉?jí)緩存的原因不會(huì)馬上寫到磁盤上,所以會(huì)有一個(gè)單獨(dú)的線程定時(shí)調(diào)用flush,這里是flush consumeQueue文件的。
CommitLog和IndexFile的也有類似的邏輯,只是不是在這里啟動(dòng)的 - 第3步,啟動(dòng)CommitLog,
CommitLog.start()代碼如下:
public void start() {
//加載刷盤服務(wù)
this.flushCommitLogService.start();
//storePool flush
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
FlushCommitLogService,跟第2步類似的,該服務(wù)負(fù)責(zé)將CommitLog的數(shù)據(jù)flush到磁盤,針對(duì)同步刷盤和異步刷盤,有兩種實(shí)現(xiàn)方式
CommitLogService,這個(gè)service只有在采用內(nèi)存池緩存消息的時(shí)候才需要啟動(dòng)。在使用內(nèi)存池的時(shí)候,這個(gè)服務(wù)會(huì)定時(shí)將內(nèi)存池中的數(shù)據(jù)刷新到FileChannel中,這個(gè)我們后面講CommitLog的文章中再詳細(xì)講。
- 第5步,在consumer的時(shí)候講過,如果消息失敗,broker會(huì)延時(shí)重發(fā)。對(duì)于延時(shí)重發(fā)消息(
topic=SCHEDULE_TOPIC_XXXX),這個(gè)服務(wù)負(fù)責(zé)檢查是否有消息到了發(fā)送時(shí)間,到了時(shí)間則從延時(shí)隊(duì)列中取出后重新發(fā)送 - 第7步,如果是Master,
HAService默認(rèn)監(jiān)聽10912端口,接收slave的連接請(qǐng)求,然后將消息推送給slave;如果是Slave,則通過該服務(wù)連接Master接收數(shù)據(jù) - 第9步,這里的定時(shí)任務(wù)主要有以下幾個(gè):
- 定時(shí)清理過期的commitLog、cosumeQueue和Index數(shù)據(jù)文件, 默認(rèn)文件寫滿后會(huì)保存72小時(shí)
- 定時(shí)自檢commitLog和consumerQueue文件,校驗(yàn)文件是否完整。主要用于監(jiān)控,不會(huì)做修復(fù)文件的動(dòng)作。
- 定時(shí)檢查commitLog的Lock時(shí)長(因?yàn)樵趙rite或者flush時(shí)侯會(huì)lock),如果lock的時(shí)間過長,則打印jvm堆棧,用于監(jiān)控。
以上就是整個(gè)啟動(dòng)的過程了,后續(xù)的文章開始講解Broker是怎樣接收Producer消息,還有怎樣將消息交給Consumer的。