RocketMQ源碼-同步和異步刷盤


1 概述
2 相關(guān)類介紹
3 同步刷盤原理
4 異步刷盤

1 概述

RocketMQ和其他存儲系統(tǒng)類似,如Redis等,提供了同步和異步兩種刷盤方式,同步刷盤方式能夠保證數(shù)據(jù)被寫入硬盤,做到真正的持久化,但是也會讓系統(tǒng)的寫入速度受制于磁盤的IO速度;而異步刷盤方式在將數(shù)據(jù)寫入緩沖之后就返回,提供了系統(tǒng)的IO速度,卻存在系統(tǒng)發(fā)生故障時未來得及寫入硬盤的數(shù)據(jù)丟失的風(fēng)險。

RocketMQ提供了SYNC_FLUSHASYNC_FLUSH兩種方式,也即同步和異步刷盤方式,同步刷盤在寫入消息后會等待刷盤進(jìn)度大于等于當(dāng)前寫入經(jīng)度之后返回,而異步刷盤則在寫入消息之后直接返回,不再等待刷盤進(jìn)度。

在閱讀本文前可先看文章RocketMQ源碼-MappedFile介紹,了解其中介紹的暫存池相關(guān)原理以及具體刷盤操作時commitflush動作的區(qū)別,本文在介紹刷盤時則不再贅述。

其實(shí)同步刷盤、異步刷盤和我們在文章RocketMQ源碼-主從同步復(fù)制和異步復(fù)制介紹的同步復(fù)制、異步復(fù)制原理基本相同,同步刷盤也是阻塞等待當(dāng)前刷盤進(jìn)度大于等于此次寫入進(jìn)度然后返回,而異步刷盤寫入之后直接返回,由后臺線程定時進(jìn)行刷盤動作。

2 相關(guān)類介紹

  • GroupCommitService

如果配置的刷盤方式為同步方式,即SYNC_FLUSH,那么根據(jù)我們在文章RocketMQ源碼-MappedFile介紹第8節(jié)介紹的注意事項(xiàng)可知,該配置肯定不會啟用MappedFile的暫存池TransientStorePool功能。而GroupCommitService就是用于同步刷盤時進(jìn)行實(shí)際的刷盤動作。

  • FlushRealTimeService

用于沒有啟用暫存池的異步刷盤動作,主要是定時觸發(fā)flush動作。

  • CommitRealTimeService

用于啟用了暫存池的異步刷盤動作,和FlushRealTimeService不同的是,CommitRealTimeService在刷盤時會先將從暫存池借用的ByteBuffer中的數(shù)據(jù)commitfileChannel中,然后調(diào)用flushfileChannel進(jìn)行刷盤操作。

3 同步刷盤原理

CommitLog.putMessage在寫入消息之后,會調(diào)用handleDiskFlush進(jìn)行刷盤相關(guān)處理,該方法實(shí)現(xiàn)如下:

//CommitLog
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    //如果是同步刷盤,則進(jìn)入阻塞等待刷盤進(jìn)度大于當(dāng)前寫入進(jìn)度
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        //獲取負(fù)責(zé)刷盤的服務(wù),根據(jù)上面相關(guān)類介紹,如果是同步刷盤
        //方式此處獲取的服務(wù)為GroupCommitService類實(shí)例
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            //這個和主從Broker同步復(fù)制使用同樣的類,該類主要記錄
            //了當(dāng)前寫入后需要等待刷盤的進(jìn)度,只有達(dá)到該進(jìn)度才
            //從阻塞中返回
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            //等待同步刷盤任務(wù)完成或發(fā)生失敗
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        //如果是異步刷盤,則喚醒相關(guān)的服務(wù),這里根據(jù)是否啟用了暫存
        //池調(diào)用不同的服務(wù)進(jìn)行后臺刷盤動作
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

相關(guān)的服務(wù)源碼比較簡單,和主從同步復(fù)制及其類似,這里不再介紹,建議閱讀文章RocketMQ源碼-主從同步復(fù)制和異步復(fù)制做對比理解。

4 異步刷盤

異步刷盤則寫入消息之后直接返回,由ServiceThread實(shí)現(xiàn)類FlushRealTimeService以及CommitRealTimeService在后臺根據(jù)配置的刷盤頻率進(jìn)行異步刷盤,FlushRealTimeService對未啟用暫存池的MappedFile進(jìn)行刷盤,而CommitRealTimeService則對啟用了暫存池的MappedFile進(jìn)行刷盤。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容