【HDFS】--增量塊匯報[IBR](1)

前言:
這是增量塊匯報的第一篇文章,不講述增量塊匯報的NameNode側的處理邏輯。主要介紹Datanode側的邏輯。

我將帶著如下問題去閱讀源碼:
1)什么時候會觸發(fā)增量塊匯報?
2)發(fā)送增量塊匯報的處理邏輯是什么?
3)增量塊匯報的內容是什么?

官方文檔上有個參數(shù):
dfs.blockreport.incremental.intervalMsec,默認值是0。單位ms。
這個參數(shù)的描述信息如下:

If set to a positive integer, the value in ms to wait between sending incremental block reports from the Datanode to the Namenode.
如果這個參數(shù)的值設置為一個正整數(shù),那么就代表每次DN向NN發(fā)送增量塊匯報時要中間間隔一段時間。

我們從逆向出發(fā),在代碼中搜索這個配置項,發(fā)現(xiàn)這個配置項最后是用來構造
org.apache.hadoop.hdfs.server.datanode.BPServiceActor#ibrManager這個成員變量了。

ibrManager這個成員變量的類型是IncrementalBlockReportManager。顧名思義,是增量塊匯報管理者,用來統(tǒng)一管理增量塊匯報的相關操作。

進到IncrementalBlockReportManager這個類,看一下都有什么方法,看方法名跟發(fā)送增量塊匯報相關的方法我在圖中圈出來了:

sendIBRs代碼如下:

  /** Send IBRs to namenode. */
  void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
      String bpid) throws IOException {
    // Generate a list of the pending reports for each storage under the lock
    final StorageReceivedDeletedBlocks[] reports = generateIBRs();
    if (reports.length == 0) {
      // Nothing new to report.
      return;
    }

    // Send incremental block reports to the Namenode outside the lock
    if (LOG.isDebugEnabled()) {
      LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
    }
    boolean success = false;
    final long startTime = monotonicNow();
    try {
      namenode.blockReceivedAndDeleted(registration, bpid, reports);
      success = true;
    } finally {

      if (success) {
        dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
        lastIBR = startTime;
      } else {
        // If we didn't succeed in sending the report, put all of the
        // blocks back onto our queue, but only in the case where we
        // didn't put something newer in the meantime.
        putMissing(reports);
      }
    }
  }

關注一下putMissing方法:

If we didn't succeed in sending the report, put all of the blocks back onto our queue, but only in the case where we didn't put something newer in the meantime.

putMissing代碼如下:

  //這個方法是個同步方法,會占用鎖
  private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
    for (StorageReceivedDeletedBlocks r : reports) {
      pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
    }
    if (reports.length > 0) {
      readyToSend = true;
    }
  }

putMissing主要就是把IBR根據(jù)Block放入PerStorageIBR#blocks這個Map類型的成員變量中。當然這里面有個判斷,如果blocks中已經有了相應的塊信息,那么就忽略掉。因為此時blocks中保存的對應塊的IBR信息才是最新的。也就對應了上面putMissing方法注釋中說明的情況。

sendImmediately方法的代碼如下,這個方法的作用是判斷是否需要立即發(fā)送IBR。用到了readyToSend變量、ibrInterval變量、lastIBR變量。這三個變量看下面的注釋也很好理解。


  /**
   * If this flag is set then an IBR will be sent immediately by the actor
   * thread without waiting for the IBR timer to elapse.
   */
  private volatile boolean readyToSend = false;

  /** The time interval between two IBRs. */
  private final long ibrInterval;

  /** The timestamp of the last IBR. */
  private volatile long lastIBR;

  boolean sendImmediately() {
    return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
  }

triggerIBR代碼如下,這個方法的功能是用來觸發(fā)IBR。首先設置volatile變量readyToSend的值為true,然后根據(jù)傳入的force參數(shù)判斷是否強制觸發(fā),如果是強制觸發(fā),則更新lastIBR的值為monotonicNow() - ibrInterval,也就是說人為更改上一次IBR的時間用以超過配置的增量塊匯報的間隔。然后根據(jù)上面提到的sendImmediately函數(shù)的返回值決定是否notify在等待的線程,這里拋出個問題后面解答,什么線程在等待呢?

  synchronized void triggerIBR(boolean force) {
    readyToSend = true;
    if (force) {
      lastIBR = monotonicNow() - ibrInterval;
    }
    if (sendImmediately()) {
      notifyAll();
    }
  }

接下來解答上面的問題,什么線程wait了?
剛才截圖類的structure時,除了上面這個三個看起來像IBR的函數(shù)外, 還有一個waitTillNextIBR方法,繼續(xù)顧名思義,wait直到下一次IBR。先來看一下這個方法的實現(xiàn),然后再看一下這個方法的調用棧,分析一下是什么位置調用了這個函數(shù)。

  synchronized void waitTillNextIBR(long waitTime) {
    if (waitTime > 0 && !sendImmediately()) {
      try {
        wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
      } catch (InterruptedException ie) {
        LOG.warn(getClass().getSimpleName() + " interrupted");
      }
    }
  }

wait函數(shù)里用了條件判斷表達式,這里的ibrInterval我們按照默認值0來計算,所以直接取傳入的waitTime參數(shù)的值。如果說傳入waitTime大于0并且不需要立即發(fā)送IBR,那么線程就進行wait,時間為waitTime(ms)。

接著看waitTillNextIBR的調用點,來到了org.apache.hadoop.hdfs.server.datanode.BPServiceActor#offerService方法里。這個方法是BPServiceActor類的run方法中調用的方法,只要DN運行,這個offerService方法就不斷循環(huán)的執(zhí)行。offerService方法中按順序進行heartbeat、增量塊匯報、全量塊匯報操作。

到這里基本上可以回答文章開頭的兩個問題了:

DataNode的BPServiceActor線程在DN啟動后會一直執(zhí)行,不斷循環(huán)的發(fā)送heartbeat、IBR(增量塊匯報)、FBR(全量塊匯報)。然后根據(jù)增量塊匯報會更新各種記錄時間的變量用來輔助調用IBR。比如強制觸發(fā)IBR等。

還有一個問題是:IBR中的內容是什么?回答這個問題需要看generateIBRs方法:

可以看到,本質上發(fā)送的內容來自于pendingIBRs這個Map,此Map的key是DatanodeStorage代表了Datanode的一個Storage,可以理解為一個磁盤;此Map的value是PerStorageIBR,代表了每個Storage上的IBR。PerStorageIBR這個類我們前面遇到過,還得他的blocks變量么?就是用來保存新增的IBR和處理失敗的IBR的那個blocks呀!

好,本文完。下一篇將學習IBR在NN側的處理過程。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容