UNIT4

1. 創(chuàng)建INodeFile流程分析 0:15:00 ~ 0:29.19
2步:
FileSystem.create
↓
DistributedFileSystem.create
——》DFSClient.create
——》DFSOutputStream.newStreamForCreate
——》dfsClient.namenode.create(遠(yuǎn)程代理類)
↓ 遠(yuǎn)程
NamenodeRpcserver.create
▼
namesystem.startFile
—》FSNamesystem:startFileInt
—》FSNamesystem: startFileInternal
▼
iip = dir.addFile //添加文件
leaseManager.addLease //添加租約
2. 添加契約流程分析 0:29.19 ~ 0:36.51
,因?yàn)镠DFS上的文件是不允許并發(fā)寫的,比如并發(fā)的追加一些數(shù)據(jù)什么。 同一時間只能有一個客戶端獲取NameNode上面一個文件的契約,然后才可以向獲取契約的文件寫 入數(shù)據(jù)。 此時如果其他客戶端嘗試獲取文件契約的時候,就獲取不到,只能干等著。 通過這個機(jī)制,可以保證同一時間只有一個客戶端在寫一個文件。 在獲取到了文件契約之后,在寫文件的過程期間,那個客戶端需要開啟一個線程,不停的發(fā)送請求給 NameNode進(jìn)行文件續(xù)約,告訴NameNode: 我還在寫文件啊,你給我一直保留那個契約好嗎? 而NameNode內(nèi)部有一個專門的后臺線程,負(fù)責(zé)監(jiān)控各個契約的續(xù)約時間。 如果某個契約很長時間沒續(xù)約了,此時就自動過期掉這個契約,讓別的客戶端來寫。
接上 leaseManager.addLease 租約
—》 leases.put sortedLeases.add 可排序

3. DataStreamer啟動流程分析 0:36:51 ~ 0:52:29
接上回 1 DFSOutputStream.newStreamForCreate 方法
—》DataStreamer#start() -- run方法
▼
//dataqueue里面沒有數(shù)據(jù),代碼阻塞在此,有數(shù)據(jù)則notify
dataQueue.wait(timeout);

4. 啟動文件續(xù)約流程分析 52:29 ~ 1:00:00
退回 DFSClient.create 分析 beginFileLease 開啟續(xù)約
/* 這個方法完成3件事 :
1、往文件目錄里面加了 文件2、添加了租約3、 啟動了 DataStreamer流程 , dataqueue 有數(shù)據(jù)則notify
*/
DFSClient#create
—》 beginFileLease() 開始續(xù)約
—》(LeaseManager)getLeaseRenewer().put
—》LeaseManager#run(final int id)
—》LeaseManager#renew
—》DFSClient#renewLease 57:17
↓ 遠(yuǎn)程RPC
—》 NamenodeRpcserver#renewLease
—》FSNamesystem#renewLease
—》LeaseManager#renewLease
邏輯如下
//先移除
sortedLeases.remove(lease);
//修改心跳
lease.renew();
//把改后的加入
sortedLeases.add(lease);
5. 契約掃描機(jī)制分析 1:00:00 ~ 1:15:11
LeaseManager#Monitor#run
—》LeaseManager#checkLeases
try {
//拿出最早的租約
leaseToCheck = sortedLeases.first();
} catch(NoSuchElementException e) {}
while(leaseToCheck != null) {
if (!leaseToCheck.expiredHardLimit()) {
break;
}
6. chunk封裝為packet 寫入DataQueue流程剖析 1:14:11~ 1:50:11
接上回 1
客戶端代碼為:
FileSystem.create()// 這個上面做過分析了
現(xiàn)在到
FSDataOutputStream.write
FSDataOutputStream.write()
↓
HdfsDataOutputStream的父類FSDataOutputStream.write()
▼
//TODO out 即為DFSOutputStream
out.write(b);
↓
DFSOutputStream的父類FSOutputSummer.write()
—》FSOutputSummer#flushBuffer
—》FSOutputSummer#writeChecksumChunks
▼
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
↓
——》子類DFSOutputStream#writeChunk
——》DFSOutputStream#writeChunkImpl
——》DFSOutputStream#waitAndQueueCurrentPacket
——》DFSOutputStream#queueCurrentPacket//把packet加入 list
——》DFSOutputStream# dataQueue.notifyAll();
——》DFSOutputStream#DataStreamer#run()
7. pipline數(shù)據(jù)管道流程之 namenode 申請Block 流程剖析 1:53:40 ~ 1:59:40
接上回 DFSOutputStream#DataStreamer#run 里面 這句setPipeline(nextBlockOutputStream());
這個方法 nextBlockOutputStream() 即為Block申請流程
DFSOutputStream#DataStreamer#nextBlockOutputStream
——》lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
return dfsClient.namenode.addBlock ;
↓ RPC 遠(yuǎn)程調(diào)用
——》 NameNodeRpcServer#addBlock
▼
namesystem.getAdditionalBlock
——》FSnamesystem#getAdditionalBlock 選擇存放datablock的主機(jī)-負(fù)載均衡,機(jī)架感知
1 saveAllocatedBlock 修改內(nèi)存內(nèi)的目錄樹,修改元數(shù)據(jù)
——》 dir.addBlock
▼
getBlockManager().addBlockCollection //BlockManager記錄block信息
fileINode.addBlock //新產(chǎn)生的block放到文件節(jié)點(diǎn)下
2 persistNewBlock(src, pendingFile); 元數(shù)據(jù)落盤
回到 setPipeline() 下面開始建立數(shù)據(jù)管道
8. pipline數(shù)據(jù)管道流程建立分析 2:00:00 ~
DFSOutputStream#DataStreamer#setPipeline() 方法
DFSOutputStream#DataStreamer#setPipeline
—》DFSOutputStream#DataStreamer#setPipeline#nextBlockOutputStream
—》DFSOutputStream#DataStreamer#createBlockOutputStream
▼
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE))
new Sender(out).writeBlock
—》Sender#writeBlock()
—》Sender#send()
out.flush();
↓ RPC 遠(yuǎn)程調(diào)用
DataXceiverServer#run()
▼
//TODO 接受socket請求
peer = peerServer.accept();
//每一個block都啟動一個DataXceiver
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
↓
DataXceiver.run()
—》 processOp()
▼
case WRITE_BLOCK:
opWriteBlock(in);
—》 DataXceiver.writeBlock
9. 管道建立容錯處理 (retry ,排除剛才問題機(jī)器) 2.25:00~ 2.30:00
接上 7 節(jié)
DFSOutputStream#DataStreamer#nextBlockOutputStream 方法
▼
//TODO 整個管道建立是這段代碼
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
.....
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
//管道不成功就 放棄block
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
..... 排除節(jié)點(diǎn)hadoop3
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);

.....
lb = locateFollowingBlock
10. ResponseProcessor組件初始化流程分析 2:31:00 ~ 2:41:00
接上回7: DFSOutputStream#DataStreamer#run 方法
DFSOutputStream#DataStreamer#setPipeline后面
//TODO ResponseProcessor組件初始化
initDataStreaming();
▼
response = new ResponseProcessor(nodes);
response.start();
↓
DFSOutputStream# ResponseProcessor.run()
▼
//TODO
ackQueue.removeFirst();
dataQueue.notifyAll();

11. BlockReceiver和PacketResponder初始化 2:40:00~
接上回7: DFSOutputStream#DataStreamer#run 方法
▼
// write out data to remote datanode
TraceScope writeScope = Trace.startSpan("writeTo", span);
try {
one.writeTo(blockStream);
blockStream.flush();
↓
DataXceiverServer.run
▼
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this)) .start();
↓
DataXceiver.run
——》 DataXceiver.processOp()
——》DataXceiver.opWriteBlock()
——》DataXceiver.writeBlock()
▼
blockReceiver = new BlockReceiver(block, storageType, in,
blockReceiver.receiveBlock
——》BlockReceiver.receiveBlock()
▼
responder = new Daemon(datanode.threadGroup, new PacketResponder(replyOut, mirrIn, downstreams));
↓
BlockReceiver.PacketResponder.run()
▼
removeAckHead()
▼
ackQueue.removeFirst();
ackQueue.notifyAll();