BlockManager源碼分析(1)--PendingReconstructionMonitor

基于hadoop-3.1.0
分享一波大數(shù)據(jù)&Java的學習視頻和書籍:
### Java與大數(shù)據(jù)資源分享

BlockManager類是Namenode端管理數(shù)據(jù)塊的非常重要的類。本文就來對它進行剖析。

首先看BlockManager的構(gòu)造方法被誰調(diào)用:

進到FSNamesystem的構(gòu)造方法中。然后在FSNS類中找調(diào)用成員變量blockManager的地方。發(fā)現(xiàn)只有activate有激活啟動的意思,于是去activate方法:

點進去到BlockManager#activate方法:

果不其然,activate方法里啟動了各種線程,以及注冊metric指標等工作。
從其中調(diào)用的方法來看,啟動了pendingReconstruction,redundancyThread,storageInfoDefragmenterThread,blockReportThread,注冊metric指標,設置blockManager的safemode狀態(tài)等。

那我們接下來的分析任務就是這幾個線程。本篇文章首先分析第一個,pendingReconstruction。其他的挖坑后面學習。
pendingReconstruction是PendingReconstructionBlocks類的對象。

看一下PendingReconstructionBlocks的javadoc:

這個類主要是對數(shù)據(jù)塊進行一些記賬工作。類似于Block可能存放在那個Datanode上這種。

進到PendingReconstructionBlocks類的start方法:

其實最終調(diào)用的是PendingReconstructionMonitor的run方法,點進去看:

PendingReconstructionMonitor是一個周期性的線程,用來掃描沒完成重建數(shù)據(jù)塊請求的數(shù)據(jù)塊。

run方法中調(diào)用了pendingReconstructionCheck()方法:

pendingReconstructionCheck方法的邏輯比較簡單,就是去檢測pendingReconstructions這個數(shù)據(jù)結(jié)構(gòu)里保存的所有數(shù)據(jù)復制請求是不是超時(
默認5分鐘),如果超時了就放到timedOutItems這個ArrayList里。最后更新相關(guān)metric指標。

這里我們遇到了兩個數(shù)據(jù)結(jié)構(gòu):pendingReconstructions,timedOutItems??匆幌逻@兩個數(shù)據(jù)結(jié)構(gòu)是什么樣的:

pendingReconstructions是Map結(jié)構(gòu),key是BlockInfo,value是PendingBlockInfo。

timedOutItems是一個ArrayList,里面存放BlockInfo類型的數(shù)據(jù)。

所以要搞懂這兩個數(shù)據(jù)結(jié)構(gòu)存儲的內(nèi)容是什么,就得去了解BlockInfo和PendingBlockInfo這兩個類。這里簡單概括:BlockInfo保存了一個block屬于哪個文件INode(通過BlockCollection對象)和存儲在那些Datanode里。PendingBlockInfo(數(shù)據(jù)塊復制信息)是對數(shù)據(jù)塊開始復制時間timeStamp、待復制的目標數(shù)據(jù)節(jié)點列表List<DatanodeDescriptor>實例targets的一個封裝。

了解完這兩個數(shù)據(jù)結(jié)構(gòu)的存儲內(nèi)容,我們就來看看什么情況下把數(shù)據(jù)放入這兩個數(shù)據(jù)結(jié)構(gòu)。

1.pendingReconstructions

首先看pendingReconstructions,因為是個Map結(jié)構(gòu),所以我們看看誰調(diào)用了put方法。

點進去是increment方法:

這個方法首先嘗試從pendingReconstructions根據(jù)key獲取PendingBlockInfo,如果沒有,就調(diào)用put。如果發(fā)現(xiàn)了原來有這個復制請求,則更新targets和timestamp。

再來看誰調(diào)用了increment方法:


這里面第一個方法是增量塊匯報相關(guān)的,我們以后再分析,所以我們看第二個方法validateReconstructionWork。講這個方法之前說句題外話,我耐不住性子,一層一層往上點,最后發(fā)現(xiàn)調(diào)用邏輯是在BlockManage的內(nèi)部類RedundancyMonitor的run方法里。所以說,塊待復制請求和冗余監(jiān)測線程有關(guān)系,比如冗余監(jiān)測線程檢測到某些數(shù)據(jù)塊的副本數(shù)不滿足最小副本條件,就會去調(diào)用相關(guān)方法添加塊復制請求。

好的,回過頭來看validateReconstructionWork方法,這個方法主要的功能是:驗證傳入的復制任務參數(shù)中的數(shù)據(jù)塊的副本數(shù)的狀態(tài),然后更新neededReconstruction結(jié)構(gòu)和pendingReconstruction結(jié)構(gòu),也即把復制任務從neededReconstruction中刪除或者添加到pendingReconstruction中。

接著看誰調(diào)用了這個方法(validateReconstructionWork),點進來就一處地方:

發(fā)現(xiàn)是computeReconstructionWorkForBlocks方法,這個方法注釋寫到:把一些數(shù)據(jù)塊重現(xiàn)到full strength(滿員)的狀態(tài),通過復制或者EC的方法。這個方法的參數(shù)是List<List<BlockInfo>>類型,最外層的List代表優(yōu)先級的意思,因為數(shù)據(jù)塊復制任務也是有優(yōu)先級的,比如當前就1個副本的塊很可能會丟失,所以針對這種塊的復制任務它的優(yōu)先級就要比當前有2個副本的塊的復制任務優(yōu)先級高。這個也留個坑,后續(xù)系列會講解復制任務的優(yōu)先級。

邏輯也是非常簡單:
step1:根據(jù)優(yōu)先級遍歷需要生成復制任務的blocks,同時生成復制任務到reconWork這個List里。

step2:為reconWork里的每個復制任務根據(jù)副本放置策略選擇目標datanode。

step3:就是調(diào)用我們上面提到的validateReconstructionWork方法驗證復制任務。

最后面的代碼就是打印一些log信息。

代碼如下:

 /**
   * Reconstruct a set of blocks to full strength through replication or
   * erasure coding
   *
   * @param blocksToReconstruct blocks to be reconstructed, for each priority
   * @return the number of blocks scheduled for replication
   */
  @VisibleForTesting
  int computeReconstructionWorkForBlocks(
      List<List<BlockInfo>> blocksToReconstruct) {
    int scheduledWork = 0;
    List<BlockReconstructionWork> reconWork = new LinkedList<>();

    // Step 1: categorize at-risk blocks into replication and EC tasks
    namesystem.writeLock();
    try {
      synchronized (neededReconstruction) {
         //根據(jù)優(yōu)先級遍歷需要生成復制任務的blocks
        for (int priority = 0; priority < blocksToReconstruct
            .size(); priority++) {
          for (BlockInfo block : blocksToReconstruct.get(priority)) {
            BlockReconstructionWork rw = scheduleReconstruction(block,
                priority);
            if (rw != null) {
              reconWork.add(rw);
            }
          }
        }
      }
    } finally {
      namesystem.writeUnlock();
    }

    // Step 2: choose target nodes for each reconstruction task
    final Set<Node> excludedNodes = new HashSet<>();
    for(BlockReconstructionWork rw : reconWork){
      // Exclude all of the containing nodes from being targets.
      // This list includes decommissioning or corrupt nodes.
      excludedNodes.clear();
      for (DatanodeDescriptor dn : rw.getContainingNodes()) {
        excludedNodes.add(dn);
      }

      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
      final BlockPlacementPolicy placementPolicy =
          placementPolicies.getPolicy(rw.getBlock().getBlockType());
      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
    }

    // Step 3: add tasks to the DN
    namesystem.writeLock();
    try {
      for(BlockReconstructionWork rw : reconWork){
        final DatanodeStorageInfo[] targets = rw.getTargets();
        if(targets == null || targets.length == 0){
          rw.resetTargets();
          continue;
        }

        synchronized (neededReconstruction) {
          if (validateReconstructionWork(rw)) {
            scheduledWork++;
          }
        }
      }
    } finally {
      namesystem.writeUnlock();
    }

    if (blockLog.isDebugEnabled()) {
      // log which blocks have been scheduled for reconstruction
      for(BlockReconstructionWork rw : reconWork){
        DatanodeStorageInfo[] targets = rw.getTargets();
        if (targets != null && targets.length != 0) {
          StringBuilder targetList = new StringBuilder("datanode(s)");
          for (DatanodeStorageInfo target : targets) {
            targetList.append(' ');
            targetList.append(target.getDatanodeDescriptor());
          }
          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
              rw.getBlock(), targetList);
        }
      }

      blockLog.debug(
          "BLOCK* neededReconstruction = {} pendingReconstruction = {}",
          neededReconstruction.size(), pendingReconstruction.size());
    }

    return scheduledWork;
  }

computeReconstructionWorkForBlocks這個方法又被computeBlockReconstructionWork調(diào)用。

代碼的邏輯是:根據(jù)傳入的processToChoose參數(shù)(這個參數(shù)可配置),選擇processToChoose個低冗余副本,傳入computeReconstructionWorkForBlocks生成復制任務。

這個方法又會被computeDatanodeWork調(diào)用。這個方法的功能是:計算數(shù)據(jù)塊復制和數(shù)據(jù)塊刪除的work,并且在下次heartbeat的時候通知到datanode。代碼如下:

紅線框起來的地方分別是計算復制塊任務、計算刪除塊任務。最后返回生成的總?cè)蝿諗?shù)。

這個方法最終最終最終最終被RedundancyMonitor線程類的run方法調(diào)用,終于來到最后了。

run方法的主要工作是:周期性的(redundancyRecheckIntervalMs)調(diào)用紅框里的二個方法。生成數(shù)據(jù)塊復制任務、處理數(shù)據(jù)塊復制任務、重新掃描之前被推遲處理的數(shù)據(jù)塊。

至此pendingReconstructions這個數(shù)據(jù)結(jié)構(gòu)的這條線我們就分析完了,為了理解深刻,可以再反向看一遍,即從run方法看到最初的方法。

2.timedOutItems

接下來是第二個數(shù)據(jù)結(jié)構(gòu)timedOutItems,主要用于存放數(shù)據(jù)塊復制請求超時數(shù)據(jù)塊,之所以設置這個結(jié)構(gòu),是因為肯定還有其他線程要把這里面的數(shù)據(jù)塊再生成數(shù)據(jù)塊復制的任務。(這里只是比喻,實際是把timedOutItems重新放入隊列里)

分析方法一樣,先看誰調(diào)用了add方法。

是pendingReconstructionCheck調(diào)用的。

這個方法在上面介紹pendingReconstructions時分析過,當時我們沒關(guān)注timeout,現(xiàn)在看一下,這里加了個timeout判斷,如果數(shù)據(jù)塊準備復制任務的時間戳+timeout < 當前時間。那么就說明超時了,就加入到timedOutItems中。

回顧一下本篇文章留下的坑:

  • redundancyThread,storageInfoDefragmenterThread,blockReportThread
  • 數(shù)據(jù)塊生成復制任務的優(yōu)先級

END!
毀滅吧,趕緊的,累了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容