基于hadoop-3.1.0
分享一波大數(shù)據(jù)&Java的學習視頻和書籍:
### Java與大數(shù)據(jù)資源分享
BlockManager類是Namenode端管理數(shù)據(jù)塊的非常重要的類。本文就來對它進行剖析。
首先看BlockManager的構(gòu)造方法被誰調(diào)用:

進到FSNamesystem的構(gòu)造方法中。然后在FSNS類中找調(diào)用成員變量blockManager的地方。發(fā)現(xiàn)只有activate有激活啟動的意思,于是去activate方法:

點進去到BlockManager#activate方法:

果不其然,activate方法里啟動了各種線程,以及注冊metric指標等工作。
從其中調(diào)用的方法來看,啟動了pendingReconstruction,redundancyThread,storageInfoDefragmenterThread,blockReportThread,注冊metric指標,設置blockManager的safemode狀態(tài)等。
那我們接下來的分析任務就是這幾個線程。本篇文章首先分析第一個,pendingReconstruction。其他的挖坑后面學習。
pendingReconstruction是PendingReconstructionBlocks類的對象。

看一下PendingReconstructionBlocks的javadoc:

這個類主要是對數(shù)據(jù)塊進行一些記賬工作。類似于Block可能存放在那個Datanode上這種。
進到PendingReconstructionBlocks類的start方法:

其實最終調(diào)用的是PendingReconstructionMonitor的run方法,點進去看:

PendingReconstructionMonitor是一個周期性的線程,用來掃描沒完成重建數(shù)據(jù)塊請求的數(shù)據(jù)塊。
run方法中調(diào)用了pendingReconstructionCheck()方法:

pendingReconstructionCheck方法的邏輯比較簡單,就是去檢測pendingReconstructions這個數(shù)據(jù)結(jié)構(gòu)里保存的所有數(shù)據(jù)復制請求是不是超時(
默認5分鐘),如果超時了就放到timedOutItems這個ArrayList里。最后更新相關(guān)metric指標。
這里我們遇到了兩個數(shù)據(jù)結(jié)構(gòu):pendingReconstructions,timedOutItems??匆幌逻@兩個數(shù)據(jù)結(jié)構(gòu)是什么樣的:

pendingReconstructions是Map結(jié)構(gòu),key是BlockInfo,value是PendingBlockInfo。
timedOutItems是一個ArrayList,里面存放BlockInfo類型的數(shù)據(jù)。
所以要搞懂這兩個數(shù)據(jù)結(jié)構(gòu)存儲的內(nèi)容是什么,就得去了解BlockInfo和PendingBlockInfo這兩個類。這里簡單概括:BlockInfo保存了一個block屬于哪個文件INode(通過BlockCollection對象)和存儲在那些Datanode里。PendingBlockInfo(數(shù)據(jù)塊復制信息)是對數(shù)據(jù)塊開始復制時間timeStamp、待復制的目標數(shù)據(jù)節(jié)點列表List<DatanodeDescriptor>實例targets的一個封裝。

了解完這兩個數(shù)據(jù)結(jié)構(gòu)的存儲內(nèi)容,我們就來看看什么情況下把數(shù)據(jù)放入這兩個數(shù)據(jù)結(jié)構(gòu)。
1.pendingReconstructions
首先看pendingReconstructions,因為是個Map結(jié)構(gòu),所以我們看看誰調(diào)用了put方法。

點進去是increment方法:

這個方法首先嘗試從pendingReconstructions根據(jù)key獲取PendingBlockInfo,如果沒有,就調(diào)用put。如果發(fā)現(xiàn)了原來有這個復制請求,則更新targets和timestamp。
再來看誰調(diào)用了increment方法:

這里面第一個方法是增量塊匯報相關(guān)的,我們以后再分析,所以我們看第二個方法validateReconstructionWork。講這個方法之前說句題外話,我耐不住性子,一層一層往上點,最后發(fā)現(xiàn)調(diào)用邏輯是在BlockManage的內(nèi)部類RedundancyMonitor的run方法里。所以說,塊待復制請求和冗余監(jiān)測線程有關(guān)系,比如冗余監(jiān)測線程檢測到某些數(shù)據(jù)塊的副本數(shù)不滿足最小副本條件,就會去調(diào)用相關(guān)方法添加塊復制請求。
好的,回過頭來看validateReconstructionWork方法,這個方法主要的功能是:驗證傳入的復制任務參數(shù)中的數(shù)據(jù)塊的副本數(shù)的狀態(tài),然后更新neededReconstruction結(jié)構(gòu)和pendingReconstruction結(jié)構(gòu),也即把復制任務從neededReconstruction中刪除或者添加到pendingReconstruction中。

接著看誰調(diào)用了這個方法(validateReconstructionWork),點進來就一處地方:

發(fā)現(xiàn)是computeReconstructionWorkForBlocks方法,這個方法注釋寫到:把一些數(shù)據(jù)塊重現(xiàn)到full strength(滿員)的狀態(tài),通過復制或者EC的方法。這個方法的參數(shù)是List<List<BlockInfo>>類型,最外層的List代表優(yōu)先級的意思,因為數(shù)據(jù)塊復制任務也是有優(yōu)先級的,比如當前就1個副本的塊很可能會丟失,所以針對這種塊的復制任務它的優(yōu)先級就要比當前有2個副本的塊的復制任務優(yōu)先級高。這個也留個坑,后續(xù)系列會講解復制任務的優(yōu)先級。
邏輯也是非常簡單:
step1:根據(jù)優(yōu)先級遍歷需要生成復制任務的blocks,同時生成復制任務到reconWork這個List里。
step2:為reconWork里的每個復制任務根據(jù)副本放置策略選擇目標datanode。
step3:就是調(diào)用我們上面提到的validateReconstructionWork方法驗證復制任務。
最后面的代碼就是打印一些log信息。
代碼如下:
/**
* Reconstruct a set of blocks to full strength through replication or
* erasure coding
*
* @param blocksToReconstruct blocks to be reconstructed, for each priority
* @return the number of blocks scheduled for replication
*/
@VisibleForTesting
int computeReconstructionWorkForBlocks(
List<List<BlockInfo>> blocksToReconstruct) {
int scheduledWork = 0;
List<BlockReconstructionWork> reconWork = new LinkedList<>();
// Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock();
try {
synchronized (neededReconstruction) {
//根據(jù)優(yōu)先級遍歷需要生成復制任務的blocks
for (int priority = 0; priority < blocksToReconstruct
.size(); priority++) {
for (BlockInfo block : blocksToReconstruct.get(priority)) {
BlockReconstructionWork rw = scheduleReconstruction(block,
priority);
if (rw != null) {
reconWork.add(rw);
}
}
}
}
} finally {
namesystem.writeUnlock();
}
// Step 2: choose target nodes for each reconstruction task
final Set<Node> excludedNodes = new HashSet<>();
for(BlockReconstructionWork rw : reconWork){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
for (DatanodeDescriptor dn : rw.getContainingNodes()) {
excludedNodes.add(dn);
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
final BlockPlacementPolicy placementPolicy =
placementPolicies.getPolicy(rw.getBlock().getBlockType());
rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
}
// Step 3: add tasks to the DN
namesystem.writeLock();
try {
for(BlockReconstructionWork rw : reconWork){
final DatanodeStorageInfo[] targets = rw.getTargets();
if(targets == null || targets.length == 0){
rw.resetTargets();
continue;
}
synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) {
scheduledWork++;
}
}
}
} finally {
namesystem.writeUnlock();
}
if (blockLog.isDebugEnabled()) {
// log which blocks have been scheduled for reconstruction
for(BlockReconstructionWork rw : reconWork){
DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
targetList.append(target.getDatanodeDescriptor());
}
blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
rw.getBlock(), targetList);
}
}
blockLog.debug(
"BLOCK* neededReconstruction = {} pendingReconstruction = {}",
neededReconstruction.size(), pendingReconstruction.size());
}
return scheduledWork;
}
computeReconstructionWorkForBlocks這個方法又被computeBlockReconstructionWork調(diào)用。

代碼的邏輯是:根據(jù)傳入的processToChoose參數(shù)(這個參數(shù)可配置),選擇processToChoose個低冗余副本,傳入computeReconstructionWorkForBlocks生成復制任務。
這個方法又會被computeDatanodeWork調(diào)用。這個方法的功能是:計算數(shù)據(jù)塊復制和數(shù)據(jù)塊刪除的work,并且在下次heartbeat的時候通知到datanode。代碼如下:

紅線框起來的地方分別是計算復制塊任務、計算刪除塊任務。最后返回生成的總?cè)蝿諗?shù)。
這個方法最終最終最終最終被RedundancyMonitor線程類的run方法調(diào)用,終于來到最后了。

run方法的主要工作是:周期性的(redundancyRecheckIntervalMs)調(diào)用紅框里的二個方法。生成數(shù)據(jù)塊復制任務、處理數(shù)據(jù)塊復制任務、重新掃描之前被推遲處理的數(shù)據(jù)塊。
至此pendingReconstructions這個數(shù)據(jù)結(jié)構(gòu)的這條線我們就分析完了,為了理解深刻,可以再反向看一遍,即從run方法看到最初的方法。
2.timedOutItems
接下來是第二個數(shù)據(jù)結(jié)構(gòu)timedOutItems,主要用于存放數(shù)據(jù)塊復制請求超時數(shù)據(jù)塊,之所以設置這個結(jié)構(gòu),是因為肯定還有其他線程要把這里面的數(shù)據(jù)塊再生成數(shù)據(jù)塊復制的任務。(這里只是比喻,實際是把timedOutItems重新放入隊列里)
分析方法一樣,先看誰調(diào)用了add方法。

是pendingReconstructionCheck調(diào)用的。

這個方法在上面介紹pendingReconstructions時分析過,當時我們沒關(guān)注timeout,現(xiàn)在看一下,這里加了個timeout判斷,如果數(shù)據(jù)塊準備復制任務的時間戳+timeout < 當前時間。那么就說明超時了,就加入到timedOutItems中。
回顧一下本篇文章留下的坑:
- redundancyThread,storageInfoDefragmenterThread,blockReportThread
- 數(shù)據(jù)塊生成復制任務的優(yōu)先級
END!
毀滅吧,趕緊的,累了。