【HDFS】EC decode過程源碼閱讀

通過本文可以獲得如下知識(shí):

  • StripeReader#readStripe源碼邏輯。
  • readDataForDecoding、readParityChunks方法源碼詳細(xì)分析。
  • prepareDecodeInputs、prepareParityChunk方法源碼詳細(xì)分析。
  • readChunk、getNextCompletedStripedRead方法源碼詳細(xì)分析。
  • decode方法簡要邏輯介紹

一、背景

讀EC文件的時(shí)候,假設(shè)我們使用RS(k,m)的算法,則最多允許m個(gè)data cell的讀取失敗,然后可以通過k-m個(gè)數(shù)據(jù)塊和m個(gè)校驗(yàn)塊對(duì)丟失的(missing)data cell進(jìn)行解碼還原。

本文就是介紹EC讀流程中,如果遇到數(shù)據(jù)塊missing的情況下的EC解碼(decode)過程。主要就是對(duì)StripeReader#readStripe方法及其內(nèi)部調(diào)用的關(guān)鍵方法進(jìn)行源碼解析。

二、StripeReader#readStripe整體流程

為什么要從這個(gè)readStripe方法開始呢? 因?yàn)樵?code>StripedInputStream的read的方法內(nèi)部,有兩個(gè)用來讀stripe的方法:
DFSStripedInputStream#fetchBlockByteRangeDFSStripedInputStream#readOneStripe,內(nèi)部就是使用readStripe方法來對(duì)條帶數(shù)據(jù)進(jìn)行讀取。

StripeReader#readStripe方法的主要功能:

/**
* read the whole stripe. do decoding if necessary。
* 翻譯:讀取整個(gè)條帶、如果有需要的話需要做解碼操作(data chunk miss掉的情況,需要讀parity校驗(yàn)塊,然后解碼出miss掉的數(shù)據(jù)塊)。
**/

readStripe方法代碼如下,這里我先把所有代碼粘貼出來并給出大概的注釋,后面我們會(huì)對(duì)readStripe方法按步驟進(jìn)行拆解并詳細(xì)注釋。

  /**
   * read the whole stripe. do decoding if necessary
   */
  void readStripe() throws IOException {
    // 第一部分:讀數(shù)據(jù)塊,如果讀失敗了則把missingChunksNum + 1,記錄一下missing的塊數(shù)
    for (int i = 0; i < dataBlkNum; i++) {
      if (alignedStripe.chunks[i] != null &&
          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
        if (!readChunk(targetBlocks[i], i)) {
          alignedStripe.missingChunksNum++;
        }
      }
    }
    // There are missing block locations at this stage. Thus we need to read
    // the full stripe and one more parity block.
    // 第二部分:如果missingChunk的個(gè)數(shù)大于0,則我們就需要讀整個(gè)條帶和對(duì)應(yīng)個(gè)數(shù)的校驗(yàn)塊。
    if (alignedStripe.missingChunksNum > 0) {
      checkMissingBlocks();
      // 構(gòu)造decodeInputs這個(gè)變量的data chunk部分。
      // 從不同datanode上讀取條帶中的data chunk數(shù)據(jù)。
      readDataForDecoding();
      // read parity chunks
      // 構(gòu)造decodeInputs這個(gè)變量的parity chunk部分。
      // 從不同datanode上讀取條件中的parity chunk數(shù)據(jù)
      readParityChunks(alignedStripe.missingChunksNum);
    }
    // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks

    // Input buffers for potential decode operation, which remains null until
    // first read failure
    // 在上面readChunk時(shí)或者readParityChunks里,會(huì)創(chuàng)建一些異步的讀任務(wù)加到futures這個(gè)Map里,這里就是去異步的獲取每一個(gè)讀請(qǐng)求的狀態(tài)的。
    while (!futures.isEmpty()) {
      try {
        //  獲取下一個(gè)已經(jīng)完成的Striped Read任務(wù)結(jié)果。
        // 這個(gè)方法內(nèi)部會(huì)去調(diào)用Java的CompletionService框架的take和get方法,如果遇到讀任務(wù)異常,會(huì)catch住ExecutionException異常,然后返回的讀結(jié)果狀態(tài)是FAILED
        StripingChunkReadResult r = StripedBlockUtil
            .getNextCompletedStripedRead(service, futures, 0);
        dfsStripedInputStream.updateReadStats(r.getReadStats());
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
              + alignedStripe);
        }
        StripingChunk returnedChunk = alignedStripe.chunks[r.index];
        Preconditions.checkNotNull(returnedChunk);
        Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);

        if (r.state == StripingChunkReadResult.SUCCESSFUL) {
          returnedChunk.state = StripingChunk.FETCHED;
          alignedStripe.fetchedChunksNum++;
          updateState4SuccessRead(r);
          // fetch到的chunk數(shù)等于數(shù)據(jù)塊數(shù),證明讀取未發(fā)生異常。
          if (alignedStripe.fetchedChunksNum == dataBlkNum) {
            clearFutures();
            break;
          }
        } else {
          // 如果讀請(qǐng)求發(fā)生了異常,返回的讀結(jié)果狀態(tài)就是FAILED,就會(huì)走到這個(gè)else里
          returnedChunk.state = StripingChunk.MISSING;
          // close the corresponding reader
          dfsStripedInputStream.closeReader(readerInfos[r.index]);

          final int missing = alignedStripe.missingChunksNum;
          alignedStripe.missingChunksNum++;
          checkMissingBlocks();
          // 有異常的時(shí)候,需要讀數(shù)據(jù)塊進(jìn)行解碼。準(zhǔn)備解碼所需的decodeInput數(shù)組的數(shù)據(jù)塊部分。  
          readDataForDecoding();
          // 有異常的時(shí)候,需要讀parity塊進(jìn)行解碼。準(zhǔn)備解碼所需的decodeInput數(shù)組的parity塊部分。  
          readParityChunks(alignedStripe.missingChunksNum - missing);
        }
      } catch (InterruptedException ie) {
        String err = "Read request interrupted";
        DFSClient.LOG.error(err);
        clearFutures();
        // Don't decode if read interrupted
        throw new InterruptedIOException(err);
      }
    }

    if (alignedStripe.missingChunksNum > 0) {
      decode();
    }
  }

接下來拆解readStripe方法:

還有 71% 的精彩內(nèi)容
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
支付 ¥1.50 繼續(xù)閱讀

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容