本章節(jié),我們主要從rmq的broker啟動(dòng)后,會(huì)如何初始化【業(yè)務(wù)消息】、【邏輯位移索引】以及【key查詢索引】這三種消息來(lái)分析底層實(shí)現(xiàn)。
直接上源碼,存儲(chǔ)相關(guān)的初始化的入口:
public boolean load() {
boolean result = true;
try {
//step1
//通過(guò)abort文件判斷上次rmq上次退出是否正常,如果正常退出,就可以把a(bǔ)bort文件刪除,即不存在。
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
...
//step2
// load Commit Log,加載消息存儲(chǔ)映射文件
result = result && this.commitLog.load();
// load Consume Queue,加載邏輯消費(fèi)隊(duì)列
result = result && this.loadConsumeQueue();
if (result) {
//step3
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//step4
//加載索引文件
this.indexService.load(lastExitOK);
//step5
this.recover(lastExitOK);
}
}
...
return result;
}
我們先來(lái)梳理一下上述的加載流程:
step1,通過(guò)abort文件判斷上次rmq上次退出是否正常,如果正常退出,就可以把a(bǔ)bort文件刪除,即不存在。否則,我們需要通過(guò)快照方式恢復(fù)這些存儲(chǔ)文件。
step2,我們依次初始化業(yè)務(wù)消息存儲(chǔ)文件(commitLog.load())以及邏輯位移存儲(chǔ)文件(this.loadConsumeQueue())對(duì)應(yīng)的連續(xù)映射文件抽象。
step3,加載存儲(chǔ)快照,里面存儲(chǔ)上述三類存儲(chǔ)文件的最后一次完整刷盤(pán)的時(shí)間戳。里面有三個(gè)long類型的字段屬性,分別是physicMsgTimestamp業(yè)務(wù)消息存儲(chǔ)文件最后一次完整刷盤(pán)時(shí)間戳,logicsMsgTimestamp邏輯位移存儲(chǔ)文件最后一次完整刷盤(pán)時(shí)間戳 以及 indexMsgTimestampkey查詢索引存儲(chǔ)文件最后一次完整刷盤(pán)時(shí)間戳。
step4,加載key查詢索引存儲(chǔ)文件。這里有可能使用快照方式。結(jié)合步驟三,我們先大致說(shuō)一下使用快照方式的流程。這里以【key查詢索引】存儲(chǔ)文件為例?!緆ey查詢索引】d刷盤(pán)形式為,直到上一個(gè)indexFile 滿了以后,才開(kāi)線程異步刷盤(pán)。每次【key查詢索引】存儲(chǔ)文件刷盤(pán)后,同時(shí)會(huì)更新 indexMsgTimestamp屬性。然后,在某個(gè)【key查詢索引】存儲(chǔ)文件刷盤(pán)的時(shí)間點(diǎn)上,broker突然宕機(jī)了,那么在重啟恢復(fù)時(shí),就會(huì)根據(jù)存儲(chǔ)文件的文件頭屬性endTimestamp(該屬性是每構(gòu)建一條索引時(shí),均會(huì)更新)與存儲(chǔ)快照屬性indexMsgTimestamp作比較,把文件頭屬性endTimestamp大于indexMsgTimestamp的文件全都刪除。
step5,加載部分業(yè)務(wù)消息以及邏輯位移索引值pagecache中。
上述的步驟1-4都比較簡(jiǎn)單,讀者可以自行根據(jù)描述自行分析,我們重點(diǎn)分析一下step5:
private void recover(final boolean lastExitOK) {
//step1.加載 邏輯物理位至pagecache
this.recoverConsumeQueue();
//step2.加載業(yè)務(wù)消息至pagecache
if (lastExitOK) {
this.commitLog.recoverNormally();
} else {
this.commitLog.recoverAbnormally();
}
//step3.加載TopicQueueTable到內(nèi)存中
this.recoverTopicQueueTable();
}
接下來(lái),我們將從三個(gè)步驟分析recover(...):
1、加載 邏輯物理位至pagecache
2、加載業(yè)務(wù)消息至pagecache
3、加載TopicQueueTable到內(nèi)存中
1、加載 邏輯物理位至pagecache
private void recoverConsumeQueue() {
//遍歷所有topic
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
//遍歷所有queueId
for (ConsumeQueue logic : maps.values()) {
logic.recover();
}
}
}
這里主要是遍歷所有的topic-queueId 為維度下,加載對(duì)應(yīng)的業(yè)務(wù)視圖消費(fèi)隊(duì)列ConsumeQueue,我們接著進(jìn)入logic.recover():
public void recover() {
//h獲取邏輯位移連續(xù)映射文件抽象
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
//這里說(shuō)明初始化時(shí),最多只加載最新三個(gè)文件內(nèi)容至pagecache中
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
//讀取一條完整的 邏輯位移索引
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
//這里只需要保證讀完整一條
if (offset >= 0 && size > 0) {
//更新單個(gè)文件的處理位置
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
//基于消息時(shí)遞增的,因此每次讀取的offset均可以認(rèn)為是最大的物理位移
this.maxPhysicOffset = offset;
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
if (mappedFileOffset == mappedFileSizeLogics) {
//代碼走到這里,說(shuō)明這個(gè)索引文件已處理完,需滾到到下一個(gè)文件。
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last maped file "
+ mappedFile.getFileName());
break;
} else {
//繼續(xù)獲取下一個(gè)邏輯位移索引映射文件
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
//更新方法內(nèi)全局處理位置
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
}
總結(jié)一下上述代碼段邏輯:
總的來(lái)說(shuō),就是最多只加載最新三個(gè)文件的邏輯位移內(nèi)容至pagecache中,遍歷這三個(gè)文件,然后逐條讀取 【邏輯位移索引】字節(jié)內(nèi)容。
其中,while(true)循環(huán)就是逐條讀取 【邏輯位移索引】。
這里,僅僅通過(guò)if (offset >= 0 && size > 0)這個(gè)判斷,即可認(rèn)為這條【邏輯位移索引】是完整的?我認(rèn)為該判斷是有誤的。首先,我們知道直接內(nèi)存映射的刷盤(pán)操作并不是原子性,如果在刷盤(pán)過(guò)程中,機(jī)子突然宕機(jī)了,就無(wú)法將pagecache中的字節(jié)內(nèi)容完整的保存在磁盤(pán)中。另外,在broker啟動(dòng)過(guò)程中,是對(duì)整個(gè)【邏輯位移索引】的存儲(chǔ)文件做直接內(nèi)存映射,換言之,對(duì)于沒(méi)寫(xiě)入內(nèi)容的空白存儲(chǔ)部分,例如執(zhí)行byteBuffer.getLong(),讀取pagecache中下一個(gè)offset(8字節(jié),64位),但出于宕機(jī)的原因,只將offset前面63位存放到磁盤(pán)中,因此,會(huì)以‘0’作為補(bǔ)充位 來(lái)補(bǔ)充缺失的字段。這樣一來(lái),byteBuffer.getLong()操作讀取到的offset即使是大于0,也可能是不準(zhǔn)確的。但是,作為offset的相鄰存儲(chǔ)字段size ,如果size > 0,雖然不能保證讀取到的size是完整的字節(jié)內(nèi)容,卻可以保證offset字段是完整的。因此,如果通過(guò)該判斷僅僅能保證offset是正確的,去無(wú)法保證size 與tagsCode字段正確。
假設(shè),讀取到一條正確的【邏輯位移索引】后,由于業(yè)務(wù)消息的物理存儲(chǔ)位移是遞增的,因此每次從【邏輯位移索引】讀取的offset字段均可以認(rèn)為是最大的物理位移,可以直接更新至ConsumeQueue的業(yè)務(wù)消息最大物理位移實(shí)例屬性this.maxPhysicOffset。
而循環(huán)內(nèi)的mappedFileOffset表明一個(gè)存儲(chǔ)文件的當(dāng)前處理位置,每讀取一條【邏輯位移索引】,均加上20,對(duì)應(yīng)代碼mappedFileOffset = i + CQ_STORE_UNIT_SIZE 。當(dāng)mappedFileOffset到達(dá)存儲(chǔ)文件的尾部是,即 if (mappedFileOffset == mappedFileSizeLogics),則說(shuō)明這個(gè)索引文件已處理完,需滾到到下一個(gè)文件。
而processOffset代表連續(xù)存儲(chǔ)索引文件的全局處理位置,因此,每次處理完一個(gè)完整的存儲(chǔ)文件時(shí),均需要 processOffset = mappedFile.getFileFromOffset()更新全局處理位置。
當(dāng)循環(huán)結(jié)束后,更新最后一個(gè)文件的處理位置processOffset += mappedFileOffset。
當(dāng)然,為了容錯(cuò)處理,還需要清除一些臟數(shù)據(jù),我們直接分析this.mappedFileQueue.truncateDirtyFiles(processOffset):
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
//獲取文件尾部的物理位移
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
//校正文件的緩存讀寫(xiě)位置
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
//代碼走到這里,說(shuō)明本文件是需要?jiǎng)h除的臟文件。
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
先說(shuō)一下入?yún)?code>offset,其含義是目前最大的【邏輯位移索引】?jī)?chǔ)物理位移。清除臟文件的邏輯如下,遍歷目前所有的【邏輯位移索引】存儲(chǔ)文件,然后比較offset與文件尾部的存儲(chǔ)物理位移,如果offset大,則認(rèn)為該文件正常;如果小于,則繼續(xù)與文件開(kāi)始的存儲(chǔ)物理位移(file.getFileFromOffset()),如果offset還比文件開(kāi)始的存儲(chǔ)物理位移還要小,則說(shuō)明該文件可以刪除了;否則,校正文件的緩存讀寫(xiě)位置。
到這里就分析完【邏輯位移索引】載入pagecache流程
2、加載業(yè)務(wù)消息至pagecache
這一步,我們?cè)诜謨煞N情況去分析,一種是正常的加載業(yè)務(wù)消息的方式,另一種是通過(guò)快照方式加載。
首先,如果標(biāo)志位lastExitOK位true,說(shuō)明該broker可以正常退出,則按正常方式,即 this.commitLog.recoverNormally():
public void recoverNormally() {
//default :true
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
}
}
上述的恢復(fù)流程與處理方式基本與【邏輯位移索引】載入pagecache的流程一致,都是連續(xù)加載最多3個(gè)最新的存儲(chǔ)文件內(nèi)容,只不過(guò),加載的內(nèi)容則是具體的業(yè)務(wù)消息字節(jié)內(nèi)容而已,即DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)。之前在構(gòu)建【邏輯位移索引】有分析過(guò)this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)這個(gè)方法,這里,即把業(yè)務(wù)消息字節(jié)按照存儲(chǔ)格式加載至pagecache中,只不過(guò)這里需要對(duì)內(nèi)容進(jìn)行crc檢測(cè)。
我們?cè)趤?lái)看看通過(guò)快照形式加載,即進(jìn)入this.commitLog.recoverAbnormally():
public void recoverAbnormally() {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
//step1、判斷從哪個(gè)映射文件開(kāi)始恢復(fù),即從指定的開(kāi)始文件到結(jié)束都需要通過(guò)快照恢復(fù)
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
//step2、讀取一條完整的消息
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (size > 0) { //說(shuō)明該條消息是一條完整的消息
mappedFileOffset += size;
//
//嘗試去構(gòu)建索引,因?yàn)橛锌赡芩饕汛嬖?,如果存在,則跳過(guò)。
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else if (size == -1) {
//代碼走到這里,說(shuō)明讀取到了一條不完整的消息
log.info("recover physics file end, " + mappedFile.getFileName());
break;
} else if (size == 0) {
//滾動(dòng)到下一個(gè)待處理的文件
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last maped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
}
//step3、
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
//Clear ConsumeQueue redundant data(清除多余的邏輯位移索引數(shù)據(jù))
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
} else {
//代碼走到這里,說(shuō)明所有的業(yè)務(wù)文件都刪除了,因此,對(duì)應(yīng)的位移索引文件也需要
//全部清空
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
分析一下上述代碼流程??梢钥闯?,不管是業(yè)務(wù)視圖消費(fèi)隊(duì)列ConsumeQueue的恢復(fù),或是【邏輯位移索引】恢復(fù),還是業(yè)務(wù)消息的正常加載,他們的處理流程基本上都一致,而業(yè)務(wù)消息的非正常加載,也是采取了相同的處理流程。
對(duì)于業(yè)務(wù)消息的非正常加載,step1中先找出需要進(jìn)行快照恢復(fù)的業(yè)務(wù)消息存儲(chǔ)映射文件開(kāi)始索引位置(List 中的index),然后遍歷所有大于或等于該索引的業(yè)務(wù)消息存儲(chǔ)映射文件(只有最后一個(gè)內(nèi)存映射文件所存儲(chǔ)的消息才有可能不完整)。
step2滾動(dòng)讀取映射文件中的每條消息,在判斷返回的讀取字節(jié)大小size,如果size大于0,說(shuō)明該條消息是一條完整的消息,然后嘗試去構(gòu)建索引,因?yàn)橛锌赡芩饕汛嬖?,如果存在,則跳過(guò)。如果size等于-1,說(shuō)明讀取到了一條不完整的消息,進(jìn)一步說(shuō)明,從上一條完整的消息的存儲(chǔ)物理位移的尾部開(kāi)始,往后的存儲(chǔ)字節(jié)內(nèi)容我們均可認(rèn)為是無(wú)效的,因此可以重寫(xiě)這部分的存儲(chǔ)。如果size等于0,這完成遍歷,跳出循環(huán),否則,滾動(dòng)到下一個(gè)映射文件。
step3中,processOffset記錄最大的完整業(yè)務(wù)消息存儲(chǔ)物理位移,再更新全局的pagecache讀寫(xiě)位移(mappedFileQueue.setFlushedWhere(processOffset)以及mappedFileQueue.setCommittedWhere(processOffset))。
而mappedFileQueue.truncateDirtyFiles(processOffset)則是清除臟數(shù)據(jù)文件,清除邏輯這里在總結(jié)一下,如果映射文件的存儲(chǔ)起始位置大于processOffset,則直接刪除該文件。如果processOffset位于映射文件的起始位置以及結(jié)束位置之間,則直接更新本文件的讀寫(xiě)位置,值同樣為processOffset。最后的this.defaultMessageStore.truncateDirtyLogicFiles(processOffset)則是根據(jù)processOffset,清除那些與之對(duì)應(yīng)的業(yè)務(wù)消息存儲(chǔ)物理位移大于processOffset的邏輯位移索引。由于邏輯比較簡(jiǎn)單,這里就不在詳細(xì)展開(kāi),有興趣的讀者可以自行分析。
這里,我們?cè)龠M(jìn)一步分析分第一步中,是如何斷從哪個(gè)映射文件開(kāi)始恢復(fù),跟進(jìn)this.isMappedFileMatchedRecover(mappedFile):
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//讀取映射文件中,第一條業(yè)務(wù)消息的的存儲(chǔ)時(shí)間
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
if (0 == storeTimestamp) {
return false;
}
//根據(jù)條件,消息存儲(chǔ)時(shí)間與快照時(shí)間比較,決定改文件是否正常恢復(fù)
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}", //
storeTimestamp, //
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}", //
storeTimestamp, //
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
return false;
}
根據(jù)上述代碼邏輯,首先,獲取本映射文件的第一條存儲(chǔ)消息的存儲(chǔ)時(shí)間戳(即寫(xiě)入pagecache的時(shí)間刻度)storeTimestamp。這里,我們假設(shè)允許【key查詢索引】開(kāi)啟,即this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()為true,并且this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()也為true(要求【key查詢索引】可以安全查詢),則storeTimestamp與this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()比較,在來(lái)看看getMinTimestampIndex():
public long getMinTimestampIndex() {
return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
}
public long getMinTimestamp() {
long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
// fixed https://github.org/apache/rocketmqissues/467
min -= 1000 * 3;
if (min < 0)
min = 0;
return min;
很明顯,getMinTimestampIndex()返回的是indexMsgTimestamp、physicMsgTimestamp以及logicsMsgTimestamp三者中的最小值。
其中,
physicMsgTimestamp代表最后一條刷盤(pán)成功的【業(yè)務(wù)消息】的storeTimestamp(寫(xiě)入pagecache中的時(shí)間戳);
logicsMsgTimestamp代表最后一條刷盤(pán)成功的【邏輯位移索引】所對(duì)應(yīng)【業(yè)務(wù)消息】的storeTimestamp;
indexMsgTimestamp代表最后一條刷盤(pán)成功的【key查詢索引】所對(duì)應(yīng)【業(yè)務(wù)消息】的storeTimestamp。
換言之,如果storeTimestamp小于getMinTimestampIndex(),方法isMappedFileMatchedRecover(...)則返回true否則返回false。我們?cè)趤?lái)結(jié)合外層的非正?;謴?fù)方法recoverAbnormally()
public void recoverAbnormally() {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
//step1、判斷從哪個(gè)映射文件開(kāi)始恢復(fù),即從指定的開(kāi)始文件到結(jié)束都需要通過(guò)快照恢復(fù)
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
break;
}
}
...
}
從for (; index >= 0; index--)循環(huán),我們可以看出是從this.mappedFileQueue.getMappedFiles()(連續(xù)存儲(chǔ)文件集合)的尾部開(kāi)始遍歷,如果this.isMappedFileMatchedRecover(mappedFile)為true,則跳出循環(huán),換言之,index記錄的是開(kāi)始恢復(fù)的索引位置。如何理解結(jié)束循環(huán)的條件是storeTimestamp與indexMsgTimestamp、physicMsgTimestamp以及logicsMsgTimestamp三者中的最小值比較,而不是storeTimestamp與physicMsgTimestamp比較??
原因如下:說(shuō)原因之前,我們先來(lái)分析一個(gè)業(yè)務(wù)場(chǎng)景。假設(shè)broker第一次啟動(dòng)并始化,之前沒(méi)有接受過(guò)任何producer的消息發(fā)送請(qǐng)求,并且消息時(shí)同步刷盤(pán)。突然間,并發(fā)上來(lái)了,一下子寫(xiě)滿了兩個(gè)文件,現(xiàn)在在寫(xiě)第三個(gè)文件,并且是該文件的第三個(gè)消息,還沒(méi)刷盤(pán),由于消息時(shí)同步刷盤(pán)的,因此我們可以知道前兩個(gè)文件以及第三個(gè)文件的前兩個(gè)消息均能完整落地。另一方面,根據(jù)之前的章節(jié)中,我們知道【邏輯位移索引】以及【key查詢索引】的構(gòu)建以及刷盤(pán)均有單獨(dú)的線程異步進(jìn)行的。在我們假設(shè)的場(chǎng)景中,【邏輯位移索引】以及【key查詢索引】都沒(méi)有對(duì)應(yīng)的線程都沒(méi)有進(jìn)行刷盤(pán)。就在這時(shí),機(jī)器突然宕機(jī)了,最后一個(gè)消息至刷盤(pán)了一半。因此,我們?cè)诎凑辗钦5幕謴?fù)流程中,則需要根據(jù)storeTimestamp與indexMsgTimestamp、physicMsgTimestamp以及logicsMsgTimestamp三者中的最小值比較,找出最早那條已經(jīng)構(gòu)建索引失敗的業(yè)務(wù)消息。并根據(jù)這條業(yè)務(wù)消息所在的映射文件,滾動(dòng)構(gòu)建這條消息以及之后存儲(chǔ)的消息的索引。對(duì)于最后一個(gè)文件的第三條消息,由于無(wú)法完整刷盤(pán),因此,需要截掉這部分不完整的消息。
到這里,我們已近分析完業(yè)務(wù)消息正常或非正常的恢復(fù)流程。
3、加載TopicQueueTable到內(nèi)存中
我們先來(lái)看看CommitLog的實(shí)例屬性topicQueueTable:
private HashMap<String/* topic-queueid /, Long/ offset */> topicQueueTable = new HashMap<String, Long>(1024);
從數(shù)據(jù)結(jié)構(gòu)中我們可以看出,它是一個(gè)HashMap, key為topic-queueId,value為消息位移。我們可以通過(guò)topic-queueId ,確定一條隊(duì)列目前最大的可消費(fèi)位移。這里簡(jiǎn)單在說(shuō)一下,通過(guò)offset * CQ_STORE_UNIT_SIZE(【邏輯位移索引大小】),可以算出具體的【邏輯位移索引】的物理存儲(chǔ)位移,然后在通過(guò)【邏輯位移索引】,查詢出具體的業(yè)務(wù)消息內(nèi)容。
我們?cè)趤?lái)分析是如何加載TopicQueueTable到內(nèi)存中recoverTopicQueueTable():
private void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
//獲取未刪除,有效并且最舊的消息的開(kāi)始位置
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
//logic.getMaxOffsetInQueue():這里獲取當(dāng)前隊(duì)列最大的邏輯位移(即已經(jīng)持久化的最大的位置索引消息)
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}
this.commitLog.setTopicQueueTable(table);
}
分析一下上述流程:在步驟一 recoverConsumeQueue()中,我們已近分析了consumeQueueTable的加載流程。而TopicQueueTable主要還是依據(jù)consumeQueueTable來(lái)加載。
從代碼中,我們可以看出,遍歷所有的ConsumeQueue,即可得到 對(duì)應(yīng)的最大消費(fèi)位移logic.getMaxOffsetInQueue():
public long getMaxOffsetInQueue() {
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
}
public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
}
return 0;
}
其中, getMaxOffset()的語(yǔ)義為返回最后一個(gè)映射文件的最新可讀位置,換言之,即最后一條存儲(chǔ)內(nèi)容的物理位移,最后在通過(guò)`this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE即可算出最大消費(fèi)位移。
接著是logic.correctMinOffset(minPhyOffset),該方法主要是校正ConsumeQueue的實(shí)例屬性minLogicOffset(目前該隊(duì)列可消費(fèi)最小【邏輯位移索引】的 物理位移),之前已近分析過(guò)該方法,這里就不在詳細(xì)展開(kāi)。
到這里,TopicQueueTable的加載流程全部分析完
以上就是存儲(chǔ)恢復(fù)的底層實(shí)現(xiàn)細(xì)節(jié)。