1 概述
2 相關(guān)類介紹
3 同步刷盤原理
4 異步刷盤
1 概述
RocketMQ和其他存儲系統(tǒng)類似,如Redis等,提供了同步和異步兩種刷盤方式,同步刷盤方式能夠保證數(shù)據(jù)被寫入硬盤,做到真正的持久化,但是也會讓系統(tǒng)的寫入速度受制于磁盤的IO速度;而異步刷盤方式在將數(shù)據(jù)寫入緩沖之后就返回,提供了系統(tǒng)的IO速度,卻存在系統(tǒng)發(fā)生故障時未來得及寫入硬盤的數(shù)據(jù)丟失的風(fēng)險。
RocketMQ提供了SYNC_FLUSH和ASYNC_FLUSH兩種方式,也即同步和異步刷盤方式,同步刷盤在寫入消息后會等待刷盤進(jìn)度大于等于當(dāng)前寫入經(jīng)度之后返回,而異步刷盤則在寫入消息之后直接返回,不再等待刷盤進(jìn)度。
在閱讀本文前可先看文章RocketMQ源碼-MappedFile介紹,了解其中介紹的暫存池相關(guān)原理以及具體刷盤操作時commit和flush動作的區(qū)別,本文在介紹刷盤時則不再贅述。
其實(shí)同步刷盤、異步刷盤和我們在文章RocketMQ源碼-主從同步復(fù)制和異步復(fù)制介紹的同步復(fù)制、異步復(fù)制原理基本相同,同步刷盤也是阻塞等待當(dāng)前刷盤進(jìn)度大于等于此次寫入進(jìn)度然后返回,而異步刷盤寫入之后直接返回,由后臺線程定時進(jìn)行刷盤動作。
2 相關(guān)類介紹
GroupCommitService
如果配置的刷盤方式為同步方式,即SYNC_FLUSH,那么根據(jù)我們在文章RocketMQ源碼-MappedFile介紹第8節(jié)介紹的注意事項(xiàng)可知,該配置肯定不會啟用MappedFile的暫存池TransientStorePool功能。而GroupCommitService就是用于同步刷盤時進(jìn)行實(shí)際的刷盤動作。
FlushRealTimeService
用于沒有啟用暫存池的異步刷盤動作,主要是定時觸發(fā)flush動作。
CommitRealTimeService
用于啟用了暫存池的異步刷盤動作,和FlushRealTimeService不同的是,CommitRealTimeService在刷盤時會先將從暫存池借用的ByteBuffer中的數(shù)據(jù)commit到fileChannel中,然后調(diào)用flush對fileChannel進(jìn)行刷盤操作。
3 同步刷盤原理
CommitLog.putMessage在寫入消息之后,會調(diào)用handleDiskFlush進(jìn)行刷盤相關(guān)處理,該方法實(shí)現(xiàn)如下:
//CommitLog
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
//如果是同步刷盤,則進(jìn)入阻塞等待刷盤進(jìn)度大于當(dāng)前寫入進(jìn)度
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//獲取負(fù)責(zé)刷盤的服務(wù),根據(jù)上面相關(guān)類介紹,如果是同步刷盤
//方式此處獲取的服務(wù)為GroupCommitService類實(shí)例
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
//這個和主從Broker同步復(fù)制使用同樣的類,該類主要記錄
//了當(dāng)前寫入后需要等待刷盤的進(jìn)度,只有達(dá)到該進(jìn)度才
//從阻塞中返回
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待同步刷盤任務(wù)完成或發(fā)生失敗
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
//如果是異步刷盤,則喚醒相關(guān)的服務(wù),這里根據(jù)是否啟用了暫存
//池調(diào)用不同的服務(wù)進(jìn)行后臺刷盤動作
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
相關(guān)的服務(wù)源碼比較簡單,和主從同步復(fù)制及其類似,這里不再介紹,建議閱讀文章RocketMQ源碼-主從同步復(fù)制和異步復(fù)制做對比理解。
4 異步刷盤
異步刷盤則寫入消息之后直接返回,由ServiceThread實(shí)現(xiàn)類FlushRealTimeService以及CommitRealTimeService在后臺根據(jù)配置的刷盤頻率進(jìn)行異步刷盤,FlushRealTimeService對未啟用暫存池的MappedFile進(jìn)行刷盤,而CommitRealTimeService則對啟用了暫存池的MappedFile進(jìn)行刷盤。