RocketMQ源碼解析(六)-Broker#MessageStore

Broker將消息存儲(chǔ)抽象成MessageStore接口,默認(rèn)實(shí)現(xiàn)類是DefaultMessageStore。主要提供如下方法:

  • 保存消息,包括單條和批量保存
  • 根據(jù)topic、queue和offset批量獲取消息,consumer使用該方法來拉取消息
  • 根據(jù)消息offset讀取消息詳情,根據(jù)messageId查詢消息時(shí)使用該方法
  • 根據(jù)messageKey查詢消息,可提供給終端用戶使用
    下面我們根據(jù)一個(gè)MessageStore的數(shù)據(jù)結(jié)構(gòu)圖來看下消息是如何存儲(chǔ)的

數(shù)據(jù)結(jié)構(gòu)圖

MessageStore數(shù)據(jù)結(jié)構(gòu)圖

【注】以上圖片轉(zhuǎn)載自博客RocketMQ消息存儲(chǔ)流程圖及數(shù)據(jù)結(jié)構(gòu)

數(shù)據(jù)結(jié)構(gòu)

通過上面的圖可以看到消息存儲(chǔ)涉及到一下幾個(gè)數(shù)據(jù)結(jié)構(gòu):
CommitLog,存儲(chǔ)消息的詳細(xì)內(nèi)容,按照消息收到的順序,所有消息都存儲(chǔ)在一起。每個(gè)消息存儲(chǔ)后都會(huì)有一個(gè)offset,代表在commitLog中的偏移量。舉個(gè)例子,當(dāng)前commitLog文件的大小是12413435字節(jié),那下一條消息到來后它的offset就是12413436。這個(gè)說法不是非常準(zhǔn)確,但是offset大概是這么計(jì)算來的。commitLog并不是一個(gè)文件,而是一系列文件(上圖中的MappedFile)。每個(gè)文件的大小都是固定的(默認(rèn)1G),寫滿一個(gè)會(huì)生成一個(gè)新的文件,新文件的文件名就是它存儲(chǔ)的第一條消息的offset。
ConsumeQueue,既然所有消息都是存儲(chǔ)在一個(gè)commitLog中,但是consumer是按照topic+queue的維度來消費(fèi)消息的,沒有辦法直接從commitLog中讀取,所以針對(duì)每個(gè)topic的每個(gè)queue都會(huì)生成consumeQueue文件。ConsumeQueue文件中存儲(chǔ)的是消息在commitLog中的offset,可以理解成一個(gè)按queue建的索引,每條消息占用20字節(jié)(上圖中的一個(gè)cq)。跟commitLog一樣,每個(gè)Queue文件也是一系列連續(xù)的文件組成,每個(gè)文件默認(rèn)放30w個(gè)offset。
IndexFile,CommitLog的另外一種形式的索引文件,只是索引的是messageKey,每個(gè)MsgKey經(jīng)過hash后計(jì)算存儲(chǔ)的slot,然后將offset存到IndexFile的相應(yīng)slot上。根據(jù)msgKey來查詢消息時(shí),可以先到IndexFile中查詢offset,然后根據(jù)offset去commitLog中查詢消息詳情。

線程服務(wù)

MessageStore除了上面的數(shù)據(jù)結(jié)構(gòu)以外,還需要相應(yīng)的服務(wù)來對(duì)數(shù)據(jù)做操作。
IndexService,負(fù)責(zé)讀寫IndexFile的服務(wù)
ReputMessageService,消息存儲(chǔ)到commitLog后,MessageStore的接口調(diào)用就直接返回了,后續(xù)由ReputMessageService負(fù)責(zé)將消息分發(fā)到ConsumeQueueIndexService
HAService,負(fù)責(zé)將master-slave之間的消息數(shù)據(jù)同步
以上就是MessageStore的整體結(jié)構(gòu)了,下面看下它的啟動(dòng)過程。

MessageStore啟動(dòng)

啟動(dòng)入口在DefaultMessageStore.start()方法:

public void start() throws Exception {
        //1、寫lock 文件,嘗試獲取lock文件鎖,保證磁盤上的文件只會(huì)被一個(gè)messageStore讀寫
        lock = lockFile.getChannel().tryLock(0, 1, false);
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }

        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);
        //2、啟動(dòng)FlushConsumeQueueService,是一個(gè)單線程的服務(wù),定時(shí)將consumeQueue文件的數(shù)據(jù)刷新到磁盤,周期由參數(shù)flushIntervalConsumeQueue設(shè)置,默認(rèn)1sec
        this.flushConsumeQueueService.start();
        //3、啟動(dòng)CommitLog
        this.commitLog.start();
        //4、消息存儲(chǔ)指標(biāo)統(tǒng)計(jì)服務(wù),RT,TPS...
        this.storeStatsService.start();
        //5、針對(duì)master,啟動(dòng)延時(shí)消息調(diào)度服務(wù)
        if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
            this.scheduleMessageService.start();
        }
        //6、啟動(dòng)reputMessageService,該服務(wù)負(fù)責(zé)將CommitLog中的消息offset記錄到cosumeQueue文件中
        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();
        //7、啟動(dòng)haService,數(shù)據(jù)主從同步的服務(wù)
        this.haService.start();
        //8、對(duì)于新的broker,初始化文件存儲(chǔ)的目錄
        this.createTempFile();
        //9、啟動(dòng)定時(shí)任務(wù)
        this.addScheduleTask();
        this.shutdown = false;
    }

以上就是整個(gè)MessageStore服務(wù)啟動(dòng)的過程,其中有幾項(xiàng)下面解釋一下:

  • 第2步,數(shù)據(jù)寫入文件后,因?yàn)槎嗉?jí)緩存的原因不會(huì)馬上寫到磁盤上,所以會(huì)有一個(gè)單獨(dú)的線程定時(shí)調(diào)用flush,這里是flush consumeQueue文件的。CommitLogIndexFile的也有類似的邏輯,只是不是在這里啟動(dòng)的
  • 第3步,啟動(dòng)CommitLog,CommitLog.start()代碼如下:
    public void start() {
        //加載刷盤服務(wù)
        this.flushCommitLogService.start();
        //storePool flush
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }

FlushCommitLogService,跟第2步類似的,該服務(wù)負(fù)責(zé)將CommitLog的數(shù)據(jù)flush到磁盤,針對(duì)同步刷盤和異步刷盤,有兩種實(shí)現(xiàn)方式
CommitLogService,這個(gè)service只有在采用內(nèi)存池緩存消息的時(shí)候才需要啟動(dòng)。在使用內(nèi)存池的時(shí)候,這個(gè)服務(wù)會(huì)定時(shí)將內(nèi)存池中的數(shù)據(jù)刷新到FileChannel中,這個(gè)我們后面講CommitLog的文章中再詳細(xì)講。

  • 第5步,在consumer的時(shí)候講過,如果消息失敗,broker會(huì)延時(shí)重發(fā)。對(duì)于延時(shí)重發(fā)消息(topic=SCHEDULE_TOPIC_XXXX),這個(gè)服務(wù)負(fù)責(zé)檢查是否有消息到了發(fā)送時(shí)間,到了時(shí)間則從延時(shí)隊(duì)列中取出后重新發(fā)送
  • 第7步,如果是Master,HAService默認(rèn)監(jiān)聽10912端口,接收slave的連接請(qǐng)求,然后將消息推送給slave;如果是Slave,則通過該服務(wù)連接Master接收數(shù)據(jù)
  • 第9步,這里的定時(shí)任務(wù)主要有以下幾個(gè):
  1. 定時(shí)清理過期的commitLog、cosumeQueue和Index數(shù)據(jù)文件, 默認(rèn)文件寫滿后會(huì)保存72小時(shí)
  2. 定時(shí)自檢commitLog和consumerQueue文件,校驗(yàn)文件是否完整。主要用于監(jiān)控,不會(huì)做修復(fù)文件的動(dòng)作。
  3. 定時(shí)檢查commitLog的Lock時(shí)長(因?yàn)樵趙rite或者flush時(shí)侯會(huì)lock),如果lock的時(shí)間過長,則打印jvm堆棧,用于監(jiān)控。

以上就是整個(gè)啟動(dòng)的過程了,后續(xù)的文章開始講解Broker是怎樣接收Producer消息,還有怎樣將消息交給Consumer的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容