通過本文可以獲得如下知識(shí):
- StripeReader#readStripe源碼邏輯。
- readDataForDecoding、readParityChunks方法源碼詳細(xì)分析。
- prepareDecodeInputs、prepareParityChunk方法源碼詳細(xì)分析。
- readChunk、getNextCompletedStripedRead方法源碼詳細(xì)分析。
- decode方法簡要邏輯介紹
一、背景
讀EC文件的時(shí)候,假設(shè)我們使用RS(k,m)的算法,則最多允許m個(gè)data cell的讀取失敗,然后可以通過k-m個(gè)數(shù)據(jù)塊和m個(gè)校驗(yàn)塊對(duì)丟失的(missing)data cell進(jìn)行解碼還原。
本文就是介紹EC讀流程中,如果遇到數(shù)據(jù)塊missing的情況下的EC解碼(decode)過程。主要就是對(duì)StripeReader#readStripe方法及其內(nèi)部調(diào)用的關(guān)鍵方法進(jìn)行源碼解析。
二、StripeReader#readStripe整體流程
為什么要從這個(gè)readStripe方法開始呢? 因?yàn)樵?code>StripedInputStream的read的方法內(nèi)部,有兩個(gè)用來讀stripe的方法:
DFSStripedInputStream#fetchBlockByteRange和DFSStripedInputStream#readOneStripe,內(nèi)部就是使用readStripe方法來對(duì)條帶數(shù)據(jù)進(jìn)行讀取。
StripeReader#readStripe方法的主要功能:
/**
* read the whole stripe. do decoding if necessary。
* 翻譯:讀取整個(gè)條帶、如果有需要的話需要做解碼操作(data chunk miss掉的情況,需要讀parity校驗(yàn)塊,然后解碼出miss掉的數(shù)據(jù)塊)。
**/
readStripe方法代碼如下,這里我先把所有代碼粘貼出來并給出大概的注釋,后面我們會(huì)對(duì)readStripe方法按步驟進(jìn)行拆解并詳細(xì)注釋。
/**
* read the whole stripe. do decoding if necessary
*/
void readStripe() throws IOException {
// 第一部分:讀數(shù)據(jù)塊,如果讀失敗了則把missingChunksNum + 1,記錄一下missing的塊數(shù)
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
}
}
}
// There are missing block locations at this stage. Thus we need to read
// the full stripe and one more parity block.
// 第二部分:如果missingChunk的個(gè)數(shù)大于0,則我們就需要讀整個(gè)條帶和對(duì)應(yīng)個(gè)數(shù)的校驗(yàn)塊。
if (alignedStripe.missingChunksNum > 0) {
checkMissingBlocks();
// 構(gòu)造decodeInputs這個(gè)變量的data chunk部分。
// 從不同datanode上讀取條帶中的data chunk數(shù)據(jù)。
readDataForDecoding();
// read parity chunks
// 構(gòu)造decodeInputs這個(gè)變量的parity chunk部分。
// 從不同datanode上讀取條件中的parity chunk數(shù)據(jù)
readParityChunks(alignedStripe.missingChunksNum);
}
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
// Input buffers for potential decode operation, which remains null until
// first read failure
// 在上面readChunk時(shí)或者readParityChunks里,會(huì)創(chuàng)建一些異步的讀任務(wù)加到futures這個(gè)Map里,這里就是去異步的獲取每一個(gè)讀請(qǐng)求的狀態(tài)的。
while (!futures.isEmpty()) {
try {
// 獲取下一個(gè)已經(jīng)完成的Striped Read任務(wù)結(jié)果。
// 這個(gè)方法內(nèi)部會(huì)去調(diào)用Java的CompletionService框架的take和get方法,如果遇到讀任務(wù)異常,會(huì)catch住ExecutionException異常,然后返回的讀結(jié)果狀態(tài)是FAILED
StripingChunkReadResult r = StripedBlockUtil
.getNextCompletedStripedRead(service, futures, 0);
dfsStripedInputStream.updateReadStats(r.getReadStats());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+ alignedStripe);
}
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
Preconditions.checkNotNull(returnedChunk);
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
returnedChunk.state = StripingChunk.FETCHED;
alignedStripe.fetchedChunksNum++;
updateState4SuccessRead(r);
// fetch到的chunk數(shù)等于數(shù)據(jù)塊數(shù),證明讀取未發(fā)生異常。
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
clearFutures();
break;
}
} else {
// 如果讀請(qǐng)求發(fā)生了異常,返回的讀結(jié)果狀態(tài)就是FAILED,就會(huì)走到這個(gè)else里
returnedChunk.state = StripingChunk.MISSING;
// close the corresponding reader
dfsStripedInputStream.closeReader(readerInfos[r.index]);
final int missing = alignedStripe.missingChunksNum;
alignedStripe.missingChunksNum++;
checkMissingBlocks();
// 有異常的時(shí)候,需要讀數(shù)據(jù)塊進(jìn)行解碼。準(zhǔn)備解碼所需的decodeInput數(shù)組的數(shù)據(jù)塊部分。
readDataForDecoding();
// 有異常的時(shí)候,需要讀parity塊進(jìn)行解碼。準(zhǔn)備解碼所需的decodeInput數(shù)組的parity塊部分。
readParityChunks(alignedStripe.missingChunksNum - missing);
}
} catch (InterruptedException ie) {
String err = "Read request interrupted";
DFSClient.LOG.error(err);
clearFutures();
// Don't decode if read interrupted
throw new InterruptedIOException(err);
}
}
if (alignedStripe.missingChunksNum > 0) {
decode();
}
}
接下來拆解readStripe方法: