HDFS架構(gòu)師 4- DataNode寫數(shù)據(jù)流程詳細(xì)分析

UNIT4

image.png

1. 創(chuàng)建INodeFile流程分析 0:15:00 ~ 0:29.19

2步:
FileSystem.create

DistributedFileSystem.create
——》DFSClient.create
——》DFSOutputStream.newStreamForCreate
——》dfsClient.namenode.create(遠(yuǎn)程代理類)
↓ 遠(yuǎn)程
NamenodeRpcserver.create

namesystem.startFile
—》FSNamesystem:startFileInt
—》FSNamesystem: startFileInternal

iip = dir.addFile //添加文件
leaseManager.addLease //添加租約

2. 添加契約流程分析 0:29.19 ~ 0:36.51

,因?yàn)镠DFS上的文件是不允許并發(fā)寫的,比如并發(fā)的追加一些數(shù)據(jù)什么。 同一時間只能有一個客戶端獲取NameNode上面一個文件的契約,然后才可以向獲取契約的文件寫 入數(shù)據(jù)。 此時如果其他客戶端嘗試獲取文件契約的時候,就獲取不到,只能干等著。 通過這個機(jī)制,可以保證同一時間只有一個客戶端在寫一個文件。 在獲取到了文件契約之后,在寫文件的過程期間,那個客戶端需要開啟一個線程,不停的發(fā)送請求給 NameNode進(jìn)行文件續(xù)約,告訴NameNode: 我還在寫文件啊,你給我一直保留那個契約好嗎? 而NameNode內(nèi)部有一個專門的后臺線程,負(fù)責(zé)監(jiān)控各個契約的續(xù)約時間。 如果某個契約很長時間沒續(xù)約了,此時就自動過期掉這個契約,讓別的客戶端來寫。

接上 leaseManager.addLease 租約
—》 leases.put sortedLeases.add 可排序

image.png

3. DataStreamer啟動流程分析 0:36:51 ~ 0:52:29

接上回 1 DFSOutputStream.newStreamForCreate 方法
—》DataStreamer#start() -- run方法

//dataqueue里面沒有數(shù)據(jù),代碼阻塞在此,有數(shù)據(jù)則notify
dataQueue.wait(timeout);

image.png

4. 啟動文件續(xù)約流程分析 52:29 ~ 1:00:00

退回 DFSClient.create 分析 beginFileLease 開啟續(xù)約
/* 這個方法完成3件事 :

  • 1、往文件目錄里面加了 文件 
    
  •  2、添加了租約 
    
  •   3、 啟動了   DataStreamer流程 , dataqueue 有數(shù)據(jù)則notify 
    

*/
DFSClient#create
—》 beginFileLease() 開始續(xù)約
—》(LeaseManager)getLeaseRenewer().put
—》LeaseManager#run(final int id)
—》LeaseManager#renew
—》DFSClient#renewLease 57:17
↓ 遠(yuǎn)程RPC
—》 NamenodeRpcserver#renewLease
—》FSNamesystem#renewLease
—》LeaseManager#renewLease

邏輯如下

//先移除
sortedLeases.remove(lease);
//修改心跳     
 lease.renew();
//把改后的加入
      sortedLeases.add(lease);

5. 契約掃描機(jī)制分析 1:00:00 ~ 1:15:11

LeaseManager#Monitor#run
—》LeaseManager#checkLeases

try {
    //拿出最早的租約
     leaseToCheck = sortedLeases.first();
   } catch(NoSuchElementException e) {}

   while(leaseToCheck != null) {
     if (!leaseToCheck.expiredHardLimit()) {
       break;
     }

6. chunk封裝為packet 寫入DataQueue流程剖析 1:14:11~ 1:50:11

接上回 1
客戶端代碼為:
FileSystem.create()// 這個上面做過分析了
現(xiàn)在到
FSDataOutputStream.write

FSDataOutputStream.write()

HdfsDataOutputStream的父類FSDataOutputStream.write()

//TODO out 即為DFSOutputStream
out.write(b);

DFSOutputStream的父類FSOutputSummer.write()
—》FSOutputSummer#flushBuffer
—》FSOutputSummer#writeChecksumChunks

writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());

——》子類DFSOutputStream#writeChunk
——》DFSOutputStream#writeChunkImpl
——》DFSOutputStream#waitAndQueueCurrentPacket
——》DFSOutputStream#queueCurrentPacket//把packet加入 list
——》DFSOutputStream# dataQueue.notifyAll();
——》DFSOutputStream#DataStreamer#run()

7. pipline數(shù)據(jù)管道流程之 namenode 申請Block 流程剖析 1:53:40 ~ 1:59:40

接上回 DFSOutputStream#DataStreamer#run 里面 這句setPipeline(nextBlockOutputStream());

這個方法 nextBlockOutputStream() 即為Block申請流程

DFSOutputStream#DataStreamer#nextBlockOutputStream
——》lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
return dfsClient.namenode.addBlock ;
↓ RPC 遠(yuǎn)程調(diào)用
——》 NameNodeRpcServer#addBlock

namesystem.getAdditionalBlock
——》FSnamesystem#getAdditionalBlock 選擇存放datablock的主機(jī)-負(fù)載均衡,機(jī)架感知
1 saveAllocatedBlock 修改內(nèi)存內(nèi)的目錄樹,修改元數(shù)據(jù)
——》 dir.addBlock

getBlockManager().addBlockCollection //BlockManager記錄block信息
fileINode.addBlock //新產(chǎn)生的block放到文件節(jié)點(diǎn)下
2 persistNewBlock(src, pendingFile); 元數(shù)據(jù)落盤

回到 setPipeline() 下面開始建立數(shù)據(jù)管道

8. pipline數(shù)據(jù)管道流程建立分析 2:00:00 ~

DFSOutputStream#DataStreamer#setPipeline() 方法
DFSOutputStream#DataStreamer#setPipeline
—》DFSOutputStream#DataStreamer#setPipeline#nextBlockOutputStream
—》DFSOutputStream#DataStreamer#createBlockOutputStream

out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE))
new Sender(out).writeBlock
—》Sender#writeBlock()
—》Sender#send()
out.flush();
↓ RPC 遠(yuǎn)程調(diào)用
DataXceiverServer#run()

//TODO 接受socket請求
peer = peerServer.accept();
//每一個block都啟動一個DataXceiver
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();

DataXceiver.run()
—》 processOp()

case WRITE_BLOCK:
opWriteBlock(in);
—》 DataXceiver.writeBlock

9. 管道建立容錯處理 (retry ,排除剛才問題機(jī)器) 2.25:00~ 2.30:00

接上 7 節(jié)
DFSOutputStream#DataStreamer#nextBlockOutputStream 方法

//TODO 整個管道建立是這段代碼
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
.....
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
//管道不成功就 放棄block
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
..... 排除節(jié)點(diǎn)hadoop3
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);

image.png

.....
lb = locateFollowingBlock

10. ResponseProcessor組件初始化流程分析 2:31:00 ~ 2:41:00

接上回7: DFSOutputStream#DataStreamer#run 方法

DFSOutputStream#DataStreamer#setPipeline后面

//TODO ResponseProcessor組件初始化
initDataStreaming();

response = new ResponseProcessor(nodes);
response.start();

DFSOutputStream# ResponseProcessor.run()

//TODO
ackQueue.removeFirst();
dataQueue.notifyAll();

image.png

11. BlockReceiver和PacketResponder初始化 2:40:00~

接上回7: DFSOutputStream#DataStreamer#run 方法

// write out data to remote datanode
TraceScope writeScope = Trace.startSpan("writeTo", span);
try {
one.writeTo(blockStream);
blockStream.flush();

DataXceiverServer.run

new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this)) .start();

DataXceiver.run
——》 DataXceiver.processOp()
——》DataXceiver.opWriteBlock()
——》DataXceiver.writeBlock()

blockReceiver = new BlockReceiver(block, storageType, in,
blockReceiver.receiveBlock
——》BlockReceiver.receiveBlock()

responder = new Daemon(datanode.threadGroup, new PacketResponder(replyOut, mirrIn, downstreams));

BlockReceiver.PacketResponder.run()

removeAckHead()

ackQueue.removeFirst();
ackQueue.notifyAll();

12. 寫數(shù)據(jù)層層上報處理結(jié)果 3.13

13. 寫數(shù)據(jù)容錯分析 3.20

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容