經(jīng)過前面四個章節(jié)的分析,我們已近知道rocket mq 主要有三類消息的持久化,分別是【業(yè)務(wù)消息】、【邏輯位移索引】以及【key查詢索引】,它們均會以文件形式落地到磁盤。但我們想一下,磁盤的容量是有限的,總不可能一直把這些消息存放于磁盤中。因此,接下來,我們來分析rmq是如何執(zhí)行持久化文件的清除策略。
為了讓讀者有一個全局認識,先從總體概括一下持久化文件的清除策略。
首先,對于【業(yè)務(wù)消息】持久化文件來說,如果【業(yè)務(wù)消息】存儲文件的存儲容量到達了所占的磁盤分區(qū)空間使用百分比或者存儲時間到期了,就會進行刪除。而【邏輯位移索引】以及【key查詢索引】會根據(jù)【業(yè)務(wù)消息】存儲文件刪除的物理位移,在進行刪除。而刪除邏輯由定時任務(wù)定時執(zhí)行。
定時任務(wù)在broker啟動時,進行注冊的:

其中,
cleanFilesPeriodically()就是清除邏輯的入口
private void cleanFilesPeriodically() {
//1、清除業(yè)務(wù)消息持久化文件
this.cleanCommitLogService.run();
//2、清除索引文件
this.cleanConsumeQueueService.run();
}
根據(jù)上述代碼片段,接下來會分兩大步去分析rmq清除流程
1、清除業(yè)務(wù)消息持久化文件
2、清除索引文件
1、清除業(yè)務(wù)消息持久化文件
this.cleanCommitLogService.run()該邏輯是委托CleanCommitLogService實例類完成的,跟進run()方法:
public void run() {
try {
//刪除過期文件,物理刪除
this.deleteExpiredFiles();
//刪除掛載文件,內(nèi)存刪除, 該步驟刪除因deleteExpiredFiles()還沒刪除成功的文件
this.redeleteHangedFile();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
業(yè)務(wù)消息的存儲文件通過兩步刪除,第一步通過deleteExpiredFiles()物理刪除滿足條件的文件,但在刪除過程中,某些文件有可能還在被引用,因此,通過redeleteHangedFile()進一步刪除第一步漏刪除的文件。而redeleteHangedFile()邏輯基本與deleteExpiredFiles()一致,因此,我們只分析deleteExpiredFiles()。
進入this.deleteExpiredFiles()
private void deleteExpiredFiles() {
int deleteCount = 0;
//獲取文件的存活時間,默認為72小時
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
///刪除物理文件的時間間隔 默認為100
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 強制銷毀 MapedFile 間隔時間 默認為 1000 * 120
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 回收硬盤存儲 ,default is at 4 am 可以設(shè)置 為 03:04:05 ,表示3點,4點,五點都可以回收
boolean timeup = this.isTimeToDelete();
//總的來說不管是commitlog(消息存儲文件) 或者是consumequeue(消費進度存儲文件) 各自所占的磁盤分區(qū)空間使用百分比,如果大于75%
//則返回isSpaceToDelete = true ,如果大于85%,就設(shè)置cleanImmediately狀態(tài)位為true
boolean spacefull = this.isSpaceToDelete();
//或者發(fā)起手動刪除也可以
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
//時間到了,或者存儲空間比率到了,又或者手動刪除次數(shù)大于零,都要刪除
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
//是否立刻刪除 cleanFileForciblyEnable == true && cleanImmediately == true;; getMessageStoreConfig().isCleanFileForciblyEnable()默認為true
//cleanImmediately 會在commitlog(消息存儲文件) 或者是consumequeue(消費進度存儲文件)各自所占的磁盤分區(qū)空間使用百分比大于85%設(shè)置為true
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
...
//文件保留時間 1小時
fileReservedTime *= 60 * 60 * 1000;
//刪除消息的持久化文件
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
...
}
}
從代碼上,我們可以通過 if (timeup || spacefull || manualDelete),只要timeup、spacefull以及manualDelete這三個條件中,有一個滿足,就可以進一步進入刪除邏輯,因此我們簡單分析一下這三個標志位為true時的條件。
首先,標志位timeup,它的語義就是更具用戶設(shè)置的小時數(shù)(默認為04),例如,我們設(shè)置03:04:05,則表示表示3點,4點,5點這整個點數(shù)的時間段都可以回收,則為true。
接著,標志位spacefull,表示所有業(yè)務(wù)消息存儲文件的總大小所占的磁盤分區(qū)空間使用百分比大于指定的配置比例(默認為75%),則為true。
最后,標志位manualDelete,表明如果我們手動設(shè)置了刪除,則為true。
進入 if (timeup || spacefull || manualDelete)判斷以后,在看看標志位cleanAtOnce,該標志位表明是否需要立即對部分業(yè)務(wù)消息存儲文件進行刪除,條件為存儲文件的總大小占的磁盤分區(qū)空間使用百分比大于85%,直到存儲文件的總大小小于85%為止。
最后,在說一個屬性,fileReservedTime,該字段是文件保留時間,默認為72小時。
條件達到后,我們接著進入
commitLog.deleteExpiredFile(...):
public int deleteExpiredFile(//
final long expiredTime, //
final int deleteFilesInterval, //
final long intervalForcibly, //
final boolean cleanImmediately//
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
CommitLog,即業(yè)務(wù)消息持久化抽象,其刪除邏輯委托MappedFileQueue,映射文件連續(xù)存儲抽象完成。繼續(xù)進入mappedFileQueue.deleteExpiredFileByTime(...):
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
//step1,獲取MappedFiles快照
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
//存活最大時間戳 = 文件最后一次修改的時間戳(創(chuàng)建時間) + 60 * 60 * 1000 * 72(72小時)
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
//step2,如果當前系統(tǒng)時間 > 存活最大時間戳 或者立刻清除標志位為true
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
//step3,嘗試銷毀映射文件
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
...
} else {
break;
}
}
}
}
//step4,從緩存中刪除所有過期,也即上述步驟中,物理刪除成功的MappedFile
deleteExpiredFile(files);
return deleteCount;
}
上述邏輯主要分4個步驟來執(zhí)行刪除流程,step1,第一步,導出一份映射文件集合的快照,然后遍歷快照。遍歷過程中,step2,如果存儲文件符合刪除條件,即文件保留時間到了,或者磁盤空間超過指定百分比,step3,則對映射文件嘗試進行銷毀,如果銷毀成功,則加入內(nèi)存刪除文件集合。step4,最后在從內(nèi)存上刪除銷毀映射文件成功的存儲文件。在step3中,如果嘗試銷毀映射文件成功后,會有一個files.size() >= DELETE_FILES_BATCH_MAX的判斷,如果符合,則結(jié)束銷毀映射文件的遍歷。我們可以思考一下為什么會需要改判斷?。。。
其實原因也很簡單,假如cleanImmediately標志位為true,該標志位就是存儲文件的總大小占的磁盤分區(qū)空間使用百分比大于85%滿足時,如果沒有該判斷,那豈不是會刪除該broker下所有的業(yè)務(wù)消息存儲文件。因此,加上該判斷,確保每次最多只能刪除DELETE_FILES_BATCH_MAX(其值為10) 個存儲文件。下次需要在通過時間,存儲空間等判斷,才會執(zhí)行刪除邏輯。
剛剛提到step3中,為什么是對映射文件進行嘗試銷毀,而不是強制銷毀?我們接著跟進mappedFile.destroy(intervalForcibly)方法:
public boolean destroy(final long intervalForcibly) {
//intervalForcibly設(shè)置,并符合,則釋放所有的引用,并clean(this.mappedByteBuffer)
this.shutdown(intervalForcibly);
//確保釋放了所有的引用即
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
//物理刪除文件
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeEclipseTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
我們繼續(xù)分析一下上述方法流程。首先,通過this.shutdown(intervalForcibly),該方法才是正在嘗試銷毀內(nèi)存映射文件,如果銷毀成功,即this.isCleanupOver()的判斷為true,就會this.fileChannel.close()關(guān)閉文件通道以及this.file.delete()物理刪除該存儲文件。
繼續(xù)進入
this.shutdown(intervalForcibly):
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
public void release() {
long value = this.refCount.decrementAndGet();
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}
在《rocket mq 底層存儲源碼分析(1)-存儲總概》章節(jié)中,我們曾經(jīng)分析過,rmq通過引用計數(shù)法來對內(nèi)存映射文件進行GC,其中this.available標志位代表該內(nèi)存映射文件是否有效,如果改標志位設(shè)為false,則表明該內(nèi)存映射文件已無效,無法再使用。this.getRefCount()代表當前的引用個數(shù)。什么情況下,這個引用個數(shù)會增加呢。例如,我們需要對業(yè)務(wù)消息進行構(gòu)建索引時,我們就需要對消息所在的內(nèi)存文件進行引用,即引用數(shù)加1。
我們不妨假設(shè),此時,該映射文件還在被引用,那么,代碼邏輯肯定會先運行第一個判斷if (this.available),時this.available為false,表明該映射文件無法再被新的操作引用,并記下第一次嘗試回收的時間this.firstShutdownTimestamp。換言之,第一次回收不成功。一直回收到指定的時間間隔后intervalForcibly(默認為兩分鐘),如果原來的引用因某些操作還完成二無法釋放該映射文件的情況下,即代碼中,既滿足else if (this.getRefCount() > 0)條件,又滿足(System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly條件,則會強行使this.refCount.set(-1000 - this.getRefCount())該映射文件的引用為負數(shù),即小于零,最后在根據(jù)release()方法的邏輯,即可執(zhí)行this.cleanup(value)銷毀方法。這里就是通過反射的方式調(diào)用Cleaner.clean,對堆外內(nèi)存的釋放的核心了。
到這里,我們已經(jīng)對業(yè)務(wù)消息存儲文件清除邏輯分析完成。
最后在總結(jié)一下上述流程邏輯,如果業(yè)務(wù)消息存儲文件的總大小占的磁盤分區(qū)空間使用百分比大于85%,則忽視時間條件,從最舊的文件開始,依次刪除該類型文件,直到存儲文件的總大小占的磁盤分區(qū)空間使用百分比大于85%為止。否則,就刪除保留時間超過3天的文件。當然磁盤空間比較充裕的情況下,只會在指定時間段刪除【有業(yè)務(wù)消息存儲文件的總大小所占的磁盤分區(qū)空間使用百分比小于指定的配置比例(默認為75%)】,否則, 會每隔10秒掃描一次。
2、清除索引文件
分析完 清除業(yè)務(wù)消息持久化文件 的流程后,我們直接分析索引文件的清除流程。
進入CleanConsumeQueueService.run():
public void run() {
try {
this.deleteExpiredFiles();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
這里先說一下,業(yè)務(wù)消息存儲文件與索引文件是同步刪除的,并且索引文件刪除是緊跟在業(yè)務(wù)消息存儲文件刪除之后,這樣一來,就可以確保索引文件所刪除的范圍不會超過業(yè)務(wù)消息存儲文件刪除范圍。
繼續(xù)進入
this.deleteExpiredFiles():
private void deleteExpiredFiles() {
//消費隊列物理文件刪除間隔 默認為100
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
//獲取 最小 offset = commitLog.mappedFileQueue.getFirstMappedFile().getFileFromOffset()
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
int deleteCount = logic.deleteExpiredFile(minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
}
}
}
}
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
這里,我們先從總體上說明方法流程:依據(jù)long minOffset = DefaultMessageStore.this.commitLog.getMinOffset()獲取業(yè)務(wù)消息目前最小的物理存儲位移,然后在遍歷所有的ConsumeQueue,把所有小于該minOffset的邏輯位移存儲內(nèi)容中所對應(yīng)的業(yè)務(wù)消息的物理位移,進行刪除或者內(nèi)存校正。怎么理解這句話呢,我們接著進入 logic.deleteExpiredFile(minOffset)分析:
public int deleteExpiredFile(long offset) {
//step1
int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
//step2
this.correctMinOffset(offset);
return cnt;
}
先看step1、
mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE):
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
//獲取映射文件最后一個位置的索引
//如果result == null,表明該映射文件還沒有填充完,即不存在下一個位置索引文件
//因此無需刪除當前的位置索引文件。
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
//獲取該位置索引所對應(yīng)的 業(yè)務(wù)消息 開始物理位移
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
//調(diào)用mappedFile.selectMappedBuffer方法時,持有計數(shù)器加1,
//因此,查詢完后,要釋放引用,持有計數(shù)器減1.
result.release();
//如果該位置索引文件的最大 業(yè)務(wù)消息物理位移 都比指定的offset小
//則說明該位置索引文件可以刪除
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}
// TODO: Externalize this hardcoded value
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
deleteExpiredFile(files);
return deleteCount;
}
整段代碼的刪除過程幾乎和業(yè)務(wù)消息存儲文件的刪除一致,都是存儲時間最早的文件開始遍歷,找出所有可以刪除的存儲文件,先銷毀映射文件,把文件從磁盤移除,最后在從內(nèi)存上移除。唯獨不同點在于復合刪除條件的判斷。我們就分析一下什么情況下,邏輯位移存儲文件需要被刪除。
我們先分析關(guān)鍵代碼SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize),之前在分析構(gòu)建索引的文章已近分析過selectMappedBuffer(long pos)的含義,在根據(jù)this.mappedFileSize - unitSize,可以得出result 的結(jié)果就是該邏輯索引文件最后一條邏輯索引的字節(jié)內(nèi)容。先明確一點,業(yè)務(wù)消息的物理存儲物理位移一定是按照插入順序單調(diào)遞增的,因此,邏輯位移的存儲物理位移也一定是單調(diào)遞增。如果result 為空,從業(yè)務(wù)上則表明該存儲文件還沒滿,因此需要進一步判斷該存儲文件對應(yīng)的映射文件是否有效(對應(yīng)代碼else if (!mappedFile.isAvailable())),如果如果無效則刪除,否則,結(jié)束該次清除流程。
我們在來分析result 不為空的情況下,先來回顧一下一條【邏輯位移索引】的存儲格式,大小20字節(jié),8字節(jié)的業(yè)務(wù)消息存儲物理位移、4字節(jié)業(yè)務(wù)消息總長度 以及 8字節(jié) 的producer端指定消息的tags屬性的hashcode。因此long maxOffsetInLogicQueue = result.getByteBuffer().getLong()獲取的是該【邏輯位移索引】所對應(yīng)的 業(yè)務(wù)消息 開始物理位移。通過destroy = maxOffsetInLogicQueue < offset,即當前【邏輯位移索引】存儲文件的最大業(yè)務(wù)消息物理位移 與 目前業(yè)務(wù)消息存儲文件最小物理位移的的比較,如果后者大于前者,則表明整個存儲文件都需要刪除;否則,則保留。換言之,經(jīng)過上述邏輯遍歷以后,留下來的第一個【邏輯位移索引】存儲文件,一定有一部分的【邏輯位移索引】所關(guān)聯(lián)的【業(yè)務(wù)消息的存儲物理位移】一定大于業(yè)務(wù)消息最小的存儲物理位移minOffset,但也可能導致部分小于minOffset的【邏輯位移索引】存在于索引文件中。
到這里,我們接著分析step2.this.correctMinOffset(offset),看它是如何校正這部分小于minOffset的【邏輯位移索引】:
public void correctMinOffset(long phyMinOffset) {
//step1
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
if (result != null) {
try {
//step2
for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = result.getByteBuffer().getLong();
result.getByteBuffer().getInt();
result.getByteBuffer().getLong();
//step3
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
log.info("compute logics min offset: " + this.getMinOffsetInQueue() + ", topic: "
+ this.topic + ", queueId: " + this.queueId);
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
result.release();
}
}
}
}
經(jīng)過一次【邏輯位移索引】存儲文件刪除以后,step1this.mappedFileQueue.getFirstMappedFile(),所獲取的第一個存儲文件,一定有一部分的【邏輯位移索引】所關(guān)聯(lián)的【業(yè)務(wù)消息的存儲物理位移】一定大于業(yè)務(wù)消息最小的存儲物理位移minOffset,但也可能導致部分小于minOffset的【邏輯位移索引】存在于文件中。因此step2中,我們從頭開始遍歷該存儲文件中所有的【邏輯位移索引】,直到找到大于或等于minOffset的【邏輯位移索引】,最后在以該【邏輯位移索引】的物理位移更新至ConsumerQueue實例中的minLogicOffset屬性即可。這樣一來,消費者端就不能消費小于minLogicOffset的業(yè)務(wù)消息了。從而達到校正的效果。
到這里我們已近分析完了【邏輯位移索引】的清除流程。
最后在分析一下【key查詢索引】索引文件的刪除,我們回過來看一下刪除索引文件的入口this.deleteExpiredFiles(),該方法最后一步DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset)就是【key查詢索引】索引文件的刪除清除流程:該流程主要是從最早的索引文件開始遍歷,在根據(jù)索引文件頭的endPhyOffset(即該索引文件所構(gòu)建的最大的業(yè)務(wù)消息物理位移)與minOffset(業(yè)務(wù)消息最小的存儲物理位移) 相比較,如果后者大,則該索引文件需要刪除。由于比較簡單,讀者感興趣的,可自行解讀。
索引文件清除流程分析完成。
以上就是rmq清除持久化文件策略的底層細節(jié)!