前言:
這是增量塊匯報的第一篇文章,不講述增量塊匯報的NameNode側的處理邏輯。主要介紹Datanode側的邏輯。
我將帶著如下問題去閱讀源碼:
1)什么時候會觸發(fā)增量塊匯報?
2)發(fā)送增量塊匯報的處理邏輯是什么?
3)增量塊匯報的內容是什么?
官方文檔上有個參數(shù):
dfs.blockreport.incremental.intervalMsec,默認值是0。單位ms。
這個參數(shù)的描述信息如下:
If set to a positive integer, the value in ms to wait between sending incremental block reports from the Datanode to the Namenode.
如果這個參數(shù)的值設置為一個正整數(shù),那么就代表每次DN向NN發(fā)送增量塊匯報時要中間間隔一段時間。
我們從逆向出發(fā),在代碼中搜索這個配置項,發(fā)現(xiàn)這個配置項最后是用來構造
org.apache.hadoop.hdfs.server.datanode.BPServiceActor#ibrManager這個成員變量了。
ibrManager這個成員變量的類型是IncrementalBlockReportManager。顧名思義,是增量塊匯報管理者,用來統(tǒng)一管理增量塊匯報的相關操作。
進到IncrementalBlockReportManager這個類,看一下都有什么方法,看方法名跟發(fā)送增量塊匯報相關的方法我在圖中圈出來了:

sendIBRs代碼如下:
/** Send IBRs to namenode. */
void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
String bpid) throws IOException {
// Generate a list of the pending reports for each storage under the lock
final StorageReceivedDeletedBlocks[] reports = generateIBRs();
if (reports.length == 0) {
// Nothing new to report.
return;
}
// Send incremental block reports to the Namenode outside the lock
if (LOG.isDebugEnabled()) {
LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
}
boolean success = false;
final long startTime = monotonicNow();
try {
namenode.blockReceivedAndDeleted(registration, bpid, reports);
success = true;
} finally {
if (success) {
dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
lastIBR = startTime;
} else {
// If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime.
putMissing(reports);
}
}
}
關注一下putMissing方法:
If we didn't succeed in sending the report, put all of the blocks back onto our queue, but only in the case where we didn't put something newer in the meantime.
putMissing代碼如下:
//這個方法是個同步方法,會占用鎖
private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
for (StorageReceivedDeletedBlocks r : reports) {
pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
}
if (reports.length > 0) {
readyToSend = true;
}
}
putMissing主要就是把IBR根據(jù)Block放入PerStorageIBR#blocks這個Map類型的成員變量中。當然這里面有個判斷,如果blocks中已經有了相應的塊信息,那么就忽略掉。因為此時blocks中保存的對應塊的IBR信息才是最新的。也就對應了上面putMissing方法注釋中說明的情況。
sendImmediately方法的代碼如下,這個方法的作用是判斷是否需要立即發(fā)送IBR。用到了readyToSend變量、ibrInterval變量、lastIBR變量。這三個變量看下面的注釋也很好理解。
/**
* If this flag is set then an IBR will be sent immediately by the actor
* thread without waiting for the IBR timer to elapse.
*/
private volatile boolean readyToSend = false;
/** The time interval between two IBRs. */
private final long ibrInterval;
/** The timestamp of the last IBR. */
private volatile long lastIBR;
boolean sendImmediately() {
return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
}
triggerIBR代碼如下,這個方法的功能是用來觸發(fā)IBR。首先設置volatile變量readyToSend的值為true,然后根據(jù)傳入的force參數(shù)判斷是否強制觸發(fā),如果是強制觸發(fā),則更新lastIBR的值為monotonicNow() - ibrInterval,也就是說人為更改上一次IBR的時間用以超過配置的增量塊匯報的間隔。然后根據(jù)上面提到的sendImmediately函數(shù)的返回值決定是否notify在等待的線程,這里拋出個問題后面解答,什么線程在等待呢?
synchronized void triggerIBR(boolean force) {
readyToSend = true;
if (force) {
lastIBR = monotonicNow() - ibrInterval;
}
if (sendImmediately()) {
notifyAll();
}
}
接下來解答上面的問題,什么線程wait了?
剛才截圖類的structure時,除了上面這個三個看起來像IBR的函數(shù)外, 還有一個waitTillNextIBR方法,繼續(xù)顧名思義,wait直到下一次IBR。先來看一下這個方法的實現(xiàn),然后再看一下這個方法的調用棧,分析一下是什么位置調用了這個函數(shù)。
synchronized void waitTillNextIBR(long waitTime) {
if (waitTime > 0 && !sendImmediately()) {
try {
wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
} catch (InterruptedException ie) {
LOG.warn(getClass().getSimpleName() + " interrupted");
}
}
}
wait函數(shù)里用了條件判斷表達式,這里的ibrInterval我們按照默認值0來計算,所以直接取傳入的waitTime參數(shù)的值。如果說傳入waitTime大于0并且不需要立即發(fā)送IBR,那么線程就進行wait,時間為waitTime(ms)。
接著看waitTillNextIBR的調用點,來到了org.apache.hadoop.hdfs.server.datanode.BPServiceActor#offerService方法里。這個方法是BPServiceActor類的run方法中調用的方法,只要DN運行,這個offerService方法就不斷循環(huán)的執(zhí)行。offerService方法中按順序進行heartbeat、增量塊匯報、全量塊匯報操作。

到這里基本上可以回答文章開頭的兩個問題了:
DataNode的BPServiceActor線程在DN啟動后會一直執(zhí)行,不斷循環(huán)的發(fā)送heartbeat、IBR(增量塊匯報)、FBR(全量塊匯報)。然后根據(jù)增量塊匯報會更新各種記錄時間的變量用來輔助調用IBR。比如強制觸發(fā)IBR等。
還有一個問題是:IBR中的內容是什么?回答這個問題需要看generateIBRs方法:

可以看到,本質上發(fā)送的內容來自于pendingIBRs這個Map,此Map的key是DatanodeStorage代表了Datanode的一個Storage,可以理解為一個磁盤;此Map的value是PerStorageIBR,代表了每個Storage上的IBR。PerStorageIBR這個類我們前面遇到過,還得他的blocks變量么?就是用來保存新增的IBR和處理失敗的IBR的那個blocks呀!

好,本文完。下一篇將學習IBR在NN側的處理過程。