As a distributed file system, HDFS is good at reading and writing large files. This comes from the idea of "separating file metadata from file data, and storing file data in blocks": the namenode manages file metadata, while datanodes manage the file data, split into blocks.
HDFS 2.x further abstracts the block storage service into block pools, but the block-write process is much the same as in 1.x. This article analyzes how a datanode writes a block, assuming a replication factor of 1 (i.e., the write involves only 1 client + 1 datanode) and that no errors occur.
Source version: Apache Hadoop 2.6.0
You can follow the quick notes I took while reading the source to set breakpoints and debug through the process yourself.
With a replication factor of 1, a single datanode forms the smallest possible pipeline; compared with the more common pipelined write, this can be regarded as writing with "no pipeline". Two follow-up articles will analyze the pipelined write without errors and with errors, respectively.
Before we begin
Overview
From 源碼|HDFS之DataNode:啟動過程, we already have a general picture of the important worker threads on a datanode. Among them, the ones most closely tied to the block-write process are DataXceiverServer and BPServiceActor.
As covered in the article on the HDFS 1.x/2.x RPC interfaces, clients and datanodes read and write blocks mainly through the streaming interface DataTransferProtocol. DataTransferProtocol handles the streaming communication between the client and the datanodes throughout the pipeline; in particular, DataTransferProtocol#writeBlock() is responsible for writing a block:
public void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException;
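For context, on the client side (not analyzed in this article) the request is issued through the Sender implementation of this interface. A rough sketch, assuming the 2.6.0 client classes, with all variables (socket, block, accessToken, targets, etc.) as placeholders:
// Sketch only: how a client-side caller invokes writeBlock through Sender.
// The surrounding setup is simplified; variable names are illustrative.
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
    NetUtils.getOutputStream(socket, writeTimeout)));
new Sender(out).writeBlock(block, StorageType.DEFAULT, accessToken,
    clientName, targets, targetStorageTypes,
    null,                                      // source: only set for block replacement
    BlockConstructionStage.PIPELINE_SETUP_CREATE,
    targets.length,                            // pipelineSize
    0, 0,                                      // minBytesRcvd, maxBytesRcvd: 0 for a fresh create
    latestGenerationStamp, requestedChecksum,
    cachingStrategy, false);                   // allowLazyPersist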
How this article is organized
- If the analysis involves only a single branch, it stays within one section.
- If it involves multiple branches, they are split into subsections at the next level, one branch per subsection.
- Multiple threads are handled the same way as multiple branches.
- Within each branch or thread, the structure follows rules 1-3 recursively.
The DataXceiverServer thread
Note that DataTransferProtocol is not an RPC protocol, so the common trick of finding the implementation class of the DataTransferProtocol interface to identify "the remote method the client calls" does not hold up here. We can still trace backwards along the same lines, though: check how the implementation class is created and whom it communicates with, to verify we have found the right one.
Using the debugger, I traced backwards from the DataXceiver class to the DataXceiverServer class. Here we start from DataXceiverServer and explain things forward.
The DataXceiverServer thread is started in DataNode#runDatanodeDaemon().
DataXceiverServer#run():
public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
peer = peerServer.accept();
...// check the number of DataXceiver threads; throw an IOE if over the limit
// start a new DataXceiver thread
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
// another thread closed our listener socket - that's expected during shutdown,
// but not in other circumstances
if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
...// cleanup
} catch (OutOfMemoryError ie) {
...// cleanup and sleep 30s
} catch (Throwable te) {
// on any other exception, shut down the datanode
LOG.error(datanode.getDisplayName()
+ ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
...// close peerServer and clean up all peers
}
The DataXceiverServer thread is a typical TCP socket server. Whenever a client opens a TCP connection, as long as the number of DataXceiver threads on the datanode has not exceeded the limit, a new DataXceiver thread is started to serve it.
The maximum number of DataXceiver threads defaults to 4096 and is set via dfs.datanode.max.transfer.threads.
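As an illustration (a sketch, not taken from the codebase analyzed here): the key name and the DFSConfigKeys constant below are real in 2.6.0 to the best of my knowledge, while the value 8192 is arbitrary:
// Hedged example: raising the DataXceiver thread limit programmatically.
// "dfs.datanode.max.transfer.threads" is the 2.6.0 key
// (DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, default 4096);
// 8192 is just an example value.
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, 8192);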
Main flow: the DataXceiver thread
DataXceiver#run():
public void run() {
int opsProcessed = 0;
Op op = null;
try {
...// some initialization
// loop so the TCP connection can be reused when the client sends further operation requests
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
...// timeout setup
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
...// an optimization here guarantees that after an operation completes normally, an EOFException or ClosedChannelException is thrown, so we can exit the loop
...// any other exception indicates a real error; rethrow it to exit the loop
}
...// timeout setup
opStartTime = now();
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
...// exception handling
} finally {
...// resource cleanup, including open files, sockets, etc.
}
}
We won't dwell on that optimization here.
DataXceiver#readOp() is inherited from the Receiver class: it reads the op code from the client's socket to determine which operation the client wants to perform. A block write uses op code 80, and the returned enum value is op = Op.WRITE_BLOCK.
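For reference, here is a minimal sketch of the framing readOp() deals with, assuming the 2.6.0 wire format (a 2-byte data-transfer version followed by a 1-byte op code); this is a simplification, not the verbatim source:
// Sketch of Receiver#readOp()-style logic in 2.6.0 (simplified).
static Op readOp(DataInputStream in) throws IOException {
  // first a 2-byte protocol version...
  final short version = in.readShort();
  if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
    throw new IOException("Version Mismatch (Expected: "
        + DataTransferProtocol.DATA_TRANSFER_VERSION
        + ", Received: " + version + " )");
  }
  // ...then a 1-byte op code, mapped onto the Op enum (80 -> Op.WRITE_BLOCK)
  return Op.read(in);
}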
DataXceiver#processOp() is also inherited from Receiver:
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
...// other cases
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
...
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
} finally {
if (traceScope != null) traceScope.close();
}
}
Another improvement of HDFS 2.x over 1.x: the streaming interface has also largely switched to protobuf, instead of parsing the raw TCP byte stream by hand.
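The sending side mirrors this. As a sketch (patterned after the 2.6.0 Sender class; simplified, not verbatim), every request is framed as version + op code + a varint-length-prefixed protobuf message, which is exactly what opWriteBlock() above unwraps with vintPrefixed():
// Sketch of Sender-style framing in 2.6.0 (simplified).
private static void send(final DataOutputStream out, final Op opcode,
    final Message proto) throws IOException {
  out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // 2-byte version
  out.writeByte(opcode.code);                                 // 1-byte op code
  proto.writeDelimitedTo(out);  // varint length prefix + OpWriteBlockProto body
  out.flush();
}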
The Receiver class implements the DataTransferProtocol interface but does not implement DataTransferProtocol#writeBlock(). Polymorphism tells us the call here dispatches to DataXceiver#writeBlock().
At last we arrive at DataXceiver#writeBlock():
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
...// checks, parameter setup, etc.
...// build the output stream for replying to the upstream node or client (here, the client)
...// omitted
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// create the BlockReceiver, preparing to receive the block
blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist);
storageUuid = blockReceiver.getStorageUuid();
} else {
...// pipeline error recovery (omitted)
}
...// handling of downstream nodes; a single datanode has no downstream node
// the first packet sent is empty and only establishes the pipeline; an ack is returned immediately to indicate whether setup succeeded
// since this datanode has no downstream node, reaching this point means the pipeline has been established successfully
if (isClient && !isTransfer) {
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// receive the block (this also forwards it downstream, though there is no downstream node here)
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets, false);
...// block replication (omitted)
}
...// block recovery (omitted)
...// block replication (omitted)
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
...// resource cleanup
}
...// update metrics
}
A few parameters deserve special mention:
- stage: the stage of block construction. Here it is BlockConstructionStage.PIPELINE_SETUP_CREATE.
- isDatanode: whether the write request was initiated by a datanode. If clientname in the request is empty, the request came from a datanode (block replication, for example, is datanode-initiated). Here it is false.
- isClient: whether the write request was initiated by a client; always the opposite of isDatanode. Here it is true.
- isTransfer: whether the write request is a transfer for block replication. If stage is BlockConstructionStage.TRANSFER_RBW or BlockConstructionStage.TRANSFER_FINALIZED, it is such a transfer. Here it is false.
Below we discuss two phases: "preparing to receive the block" and "receiving the block".
Preparing to receive the block: BlockReceiver.<init>()
BlockReceiver.<init>():
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
try{
...// checks, parameter setup, etc.
// open files, preparing to receive the block
if (isDatanode) { // block replication and block moves are initiated by datanodes; the block file is created under the tmp directory
replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
// for client-initiated write requests (considering only create, not append), create the block under the rbw directory (block file and meta file; the replica is in the RBW state)
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
...// other cases
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
}
...// omitted
// block replication, block moves, and client-initiated creates all essentially create a new block file; in these cases isCreate is true
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
...// compute the meta file header
// if a new block file is created, a new meta file must be created too, with its header written out
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
}
} catch (ReplicaAlreadyExistsException bae) {
throw bae;
} catch (ReplicaNotFoundException bne) {
throw bne;
} catch(IOException ioe) {
...// an IOE usually involves resources such as files, so extra cleanup is needed
}
}
Despite all the comments added above, the create scenario is fairly simple: just remember that the block file and meta file are created under the rbw directory.
After the block is created under the rbw directory, the datanode also reports the block being received to the namenode via DataNode#notifyNamenodeReceivingBlock(). This method merely puts the block into a buffer; the BPServiceActor thread reports it asynchronously.
We won't expand on it here; a similar method, DataNode#notifyNamenodeReceivedBlock(), is covered later.
Receiving the block: BlockReceiver#receiveBlock()
BlockReceiver#receiveBlock():
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
...// parameter setup
try {
// for a client-initiated write request (here, a block create), start the PacketResponder to send acks
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
responder.start(); // start thread to processes responses
}
// synchronously receive packets, writing the block file and meta file
while (receivePacket() >= 0) {}
// at this point the node has received all packets; wait until all acks are sent, then close the responder
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}
...// block replication (omitted)
} catch (IOException ioe) {
if (datanode.isRestarting()) {
LOG.info("Shutting down for restart (" + block + ").");
} else {
LOG.info("Exception for " + block, ioe);
throw ioe;
}
} finally {
...// cleanup
}
}
Synchronously receiving packets: BlockReceiver#receivePacket()
Let's look at BlockReceiver#receivePacket() first.
Strictly speaking, BlockReceiver#receivePacket() receives packets from upstream and continues the pipelined write to the downstream nodes:
private int receivePacket() throws IOException {
// read the next packet
packetReceiver.receiveNextPacket(in);
PacketHeader header = packetReceiver.getHeader();
...// omitted
...// validate the packet header
long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock();
final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock();
...// omitted
// if the data needs neither immediate persistence nor verification, we can immediately delegate to the PacketResponder thread to return a SUCCESS ack, then verify and persist afterwards
if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
...// pipelined write (omitted)
ByteBuffer dataBuf = packetReceiver.getDataSlice();
ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
if (lastPacketInBlock || len == 0) { // an empty packet signals either a heartbeat or the end of the block
// in both cases we may try to flush previously received data to disk
if (syncBlock) {
flushOrSync(true);
}
} else { // 否則,需要持久化packet
final int checksumLen = diskChecksum.getChecksumSize(len);
final int checksumReceivedLen = checksumBuf.capacity();
...// if this is the last node in the pipeline, verify the received packet before persisting (using the packet's own checksum mechanism)
...// on checksum error, delegate to the PacketResponder thread to return an ERROR_CHECKSUM ack
final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
&& streams.isTransientStorage();
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
...// if the last checksum chunk is partial, load and adjust the old meta file content so the crc can be recomputed later
// write the block file
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
// write the meta file
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
lastCrc = null;
} else if (partialCrc != null) { // the checksum chunk is partial (part of it was received earlier)
...// recompute the crc
...// update lastCrc
checksumOut.write(buf);
partialCrc = null;
} else { // the checksum chunk is complete
...// update lastCrc
checksumOut.write(checksumBuf.array(), offset, checksumLen);
}
...// omitted
}
} catch (IOException iex) {
datanode.checkDiskErrorAsync();
throw iex;
}
}
// conversely, if immediate persistence or verification of the received data was required, both have been completed by this point, so we can delegate to the PacketResponder thread to return a SUCCESS ack
// if sync was requested, put in queue for pending acks here
// (after the fsync finished)
if (responder != null && (syncBlock || shouldVerifyChecksum())) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
...// if the response time has been exceeded, proactively send an IN_PROGRESS ack to prevent a timeout
...// throttler (omitted)
// before the whole block has been sent, the client may send packets carrying data, and also empty packets to keep the heartbeat alive or to mark the end of the block
// therefore, when the lastPacketInBlock flag is true we must return a negative value rather than 0, to distinguish this from the cases before the last packet arrives
return lastPacketInBlock?-1:len;
}
...
private boolean shouldVerifyChecksum() {
// for client writes, only the last node in the pipeline satisfies `mirrorOut == null`
return (mirrorOut == null || isDatanode || needsChecksumTranslation);
}
BlockReceiver#shouldVerifyChecksum() mainly matters for pipelined writes; with only one datanode in this article, `mirrorOut == null` always holds.
The code above looks long, but it does only four main jobs:
- receive the packet
- verify the packet
- persist the packet
- delegate ack sending to the PacketResponder thread
BlockReceiver#receivePacket(), the PacketResponder thread, and PacketResponder#ackQueue together form a producer-consumer model. The objects produced and consumed are acks: BlockReceiver#receivePacket() is the producer, and the PacketResponder thread is the consumer.
A quick glance at PacketResponder#enqueue():
void enqueue(final long seqno, final boolean lastPacketInBlock,
final long offsetInBlock, final Status ackStatus) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime(), ackStatus);
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
synchronized(ackQueue) {
if (running) {
ackQueue.addLast(p);
ackQueue.notifyAll();
}
}
}
ackQueue is a LinkedList, which is not thread-safe.
On building a producer-consumer model from a non-thread-safe container, see implementation three in my post Java實現(xiàn)生產(chǎn)者-消費者模型.
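For a self-contained sketch of that pattern (generic Java, not HDFS code): guard the non-thread-safe queue with a single monitor and use wait/notifyAll on that same monitor:
import java.util.LinkedList;

// Minimal producer-consumer over a non-thread-safe LinkedList,
// in the same style as ackQueue: synchronize on the queue itself.
class AckQueue<T> {
  private final LinkedList<T> queue = new LinkedList<>();

  // producer side, cf. PacketResponder#enqueue()
  void enqueue(T item) {
    synchronized (queue) {
      queue.addLast(item);
      queue.notifyAll(); // wake any consumer blocked in waitForHead()
    }
  }

  // consumer side, cf. PacketResponder#waitForAckHead(): peek without removing
  T waitForHead() throws InterruptedException {
    synchronized (queue) {
      while (queue.isEmpty()) {
        queue.wait(); // releases the monitor while waiting
      }
      return queue.getFirst();
    }
  }

  // cf. PacketResponder#removeAckHead(): remove only after processing succeeded
  void removeHead() {
    synchronized (queue) {
      queue.removeFirst();
      queue.notifyAll(); // wake anyone blocked in a close()-style wait
    }
  }
}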
Asynchronously sending acks: the PacketResponder thread
As the counterpart of BlockReceiver#receivePacket(), the PacketResponder thread receives acks from the downstream node and relays responses upstream along the pipeline.
PacketResponder#run():
public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
// if this node is not the last in the pipeline and the downstream node is healthy, read an ack from downstream
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
ack.readFields(downstreamIn);
...// statistics (omitted)
...// OOB handling (ignored for now)
seqno = ack.getSeqno();
}
// if a normal ack arrived from downstream, or this node is the last in the pipeline, consume a pkt from the queue (i.e., an ack enqueued by BlockReceiver#receivePacket())
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
pkt = waitForAckHead(seqno);
if (!isRunning()) {
break;
}
// pipelined writes use seqno to keep packets ordered: a node considers a packet correctly received if and only if the seqno correctly received downstream equals the seqno this node has finished processing; the same applies upstream
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected=" + expected
+ ", received=" + seqno);
}
...// statistics (omitted)
lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
...// exception handling
} catch (IOException ioe) {
...// exception handling
}
...// exit on interrupt
// if this is the last packet, transition the block to the FINALIZED state and close the BlockReceiver
if (lastPacketInBlock) {
finalizeBlock(startTime);
}
// at this point ack.seqno == pkt.seqno necessarily holds; build a new ack and send it upstream
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
// the head element has been fully processed; dequeue it
// pkt == null in only one case: PacketResponder#isRunning() returned false, i.e., the PacketResponder thread is shutting down; then nothing needs to be dequeued regardless of whether the queue has elements
if (pkt != null) {
removeAckHead();
}
} catch (IOException e) {
...// exception handling
} catch (Throwable e) {
...// exception handling
}
}
LOG.info(myString + " terminating");
}
To summarize, the core work of the PacketResponder thread is:
- receive acks from the downstream node
- compare ack.seqno with pkt.seqno at the current head of the queue
- if they match, send an ack upstream
- if this is the last packet, transition the block to the FINALIZED state
We've inadvertently covered the pipeline-response logic along the way...
A quick glance at the peek and dequeue methods used by the PacketResponder thread:
// peek at the queue head
Packet waitForAckHead(long seqno) throws InterruptedException {
synchronized(ackQueue) {
while (isRunning() && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
ackQueue.wait();
}
return isRunning() ? ackQueue.getFirst() : null;
}
}
...
// dequeue
private void removeAckHead() {
synchronized(ackQueue) {
ackQueue.removeFirst();
ackQueue.notifyAll();
}
}
Items are enqueued at the tail and dequeued from the head.
- After a peek finds the queue non-empty, the queue is guaranteed to stay non-empty with the same head element until something is dequeued (the PacketResponder thread is the only consumer).
- Therefore, the first dequeue after a peek always removes exactly the element that the peek observed.
Next we need to look at PacketResponder#finalizeBlock():
private void finalizeBlock(long startTime) throws IOException {
// close the BlockReceiver and clean up resources
BlockReceiver.this.close();
...// log
block.setNumBytes(replicaInfo.getNumBytes());
// closing the block on the datanode side is delegated to FsDatasetImpl#finalizeBlock()
datanode.data.finalizeBlock(block);
// closing the block on the namenode side is delegated to DataNode#closeBlock()
datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
...// log
}
Closing the block from the datanode's perspective: FsDatasetImpl#finalizeBlock()
FsDatasetImpl#finalizeBlock():
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
}
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
// been opened for append but never modified
return;
}
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
...
private synchronized FinalizedReplica finalizeReplica(String bpid,
ReplicaInfo replicaInfo) throws IOException {
FinalizedReplica newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
ReplicaState.FINALIZED) { // block recovery (omitted)
newReplicaInfo = (FinalizedReplica)
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
} else {
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
// recall from the BlockReceiver.<init>() analysis: the block we created is in the RBW state and its block file sits under the rbw directory (in fact, its actual location doesn't matter; see below)
File f = replicaInfo.getBlockFile();
if (v == null) {
throw new IOException("No volume for temporary file " + f +
" for block " + replicaInfo);
}
// perform the state transition of the block file and meta file on the FsVolumeImpl volume
File dest = v.addFinalizedBlock(
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
// this replica now represents the final block replica, in the FINALIZED state
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
...// omitted
}
volumeMap.add(bpid, newReplicaInfo);
return newReplicaInfo;
}
FsVolumeImpl#addFinalizedBlock():
File addFinalizedBlock(String bpid, Block b,
File f, long bytesReservedForRbw)
throws IOException {
releaseReservedSpace(bytesReservedForRbw);
return getBlockPoolSlice(bpid).addBlock(b, f);
}
Remember the relationship between FsVolumeImpl and BlockPoolSlice from the analysis of the datanode startup process? Here the operation is delegated further to BlockPoolSlice#addBlock():
File addBlock(Block b, File f) throws IOException {
File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
if (!blockDir.exists()) {
if (!blockDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + blockDir);
}
}
File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
...// statistics (omitted)
return blockFile;
}
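Incidentally, DatanodeUtil.idToBlockDir() derives a two-level subdirectory from the block ID, so the finalized tree never degenerates into one huge flat directory. A sketch from memory, assuming the 256x256 block-ID-based layout that HDFS-6482 introduced in 2.6.0 (the exact shifts and masks may differ):
// Sketch (from memory, assumption) of DatanodeUtil#idToBlockDir in 2.6.0.
public static File idToBlockDir(File root, long blockId) {
  int d1 = (int) ((blockId >> 16) & 0xff);
  int d2 = (int) ((blockId >> 8) & 0xff);
  // e.g., .../finalized/subdir18/subdir52
  return new File(root, "subdir" + d1 + File.separator + "subdir" + d2);
}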
As we can see, a BlockPoolSlice only manages blocks in the FINALIZED state. BlockPoolSlice in turn relies on the static helper FsDatasetImpl.moveBlockFiles():
static File moveBlockFiles(Block b, File srcfile, File destdir)
throws IOException {
final File dstfile = new File(destdir, b.getBlockName());
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
try {
NativeIO.renameTo(srcmeta, dstmeta);
} catch (IOException e) {
throw new IOException("Failed to move meta file for " + b
+ " from " + srcmeta + " to " + dstmeta, e);
}
try {
NativeIO.renameTo(srcfile, dstfile);
} catch (IOException e) {
throw new IOException("Failed to move block file for " + b
+ " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
}
...// logging
return dstfile;
}
The block file and meta file are simply moved from their original directory (the rbw directory, corresponding to the RBW state) to the finalized directory (corresponding to the FINALIZED state). Two points are worth noting:
- Thread safety is guaranteed by FsDatasetImpl#finalizeReplica() (it is synchronized).
- Nothing in the FsDatasetImpl#finalizeReplica() flow cares about the block's original location; the state-transition logic itself guarantees correctness.
At this point, the block write on the datanode side is complete. However, the metadata on the namenode has not been updated yet, so the datanode still has to report the received block to the namenode.
Closing the block from the namenode's perspective: DataNode#closeBlock()
DataNode#closeBlock():
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
// report the received block to the namenode
bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
// add the new block to the blockScanner's scan scope (not discussed here)
FsVolumeSpi volume = getFSDataset().getVolume(block);
if (blockScanner != null && !volume.isTransientStorage()) {
blockScanner.addBlock(block);
}
}
BPOfferService#notifyNamenodeReceivedBlock():
void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block);
// received blocks (additions) and deleted blocks (removals) are reported together, both wrapped as ReceivedDeletedBlockInfo
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
delHint);
// every BPServiceActor sends the report to the namenode it is responsible for
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlock(bInfo, storageUuid, true);
}
}
BPServiceActor#notifyNamenodeBlock():
void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
String storageUuid, boolean now) {
synchronized (pendingIncrementalBRperStorage) {
// update pendingIncrementalBRperStorage
addPendingReplicationBlockInfo(
bInfo, dn.getFSDataset().getStorage(storageUuid));
// sendImmediateIBR is a volatile variable controlling whether an incremental block report (IBR) is sent immediately
sendImmediateIBR = true;
// now was passed in as true, so all threads blocked on pendingIncrementalBRperStorage will be woken up next
if (now) {
pendingIncrementalBRperStorage.notifyAll();
}
}
}
The heart of this method is pendingIncrementalBRperStorage, which tracks the blocks received and deleted between two reports. It is a buffer: once the received block is put into the buffer, the notification is considered complete (though not necessarily successful); another thread reads the buffer and reports to the namenode asynchronously.
I haven't read that much source code, but this buffer-based design is very common in HDFS and YARN. A buffer decouples the two ends, which not only improves extensibility but also lets the two sides work at different rates and batch sizes. With pendingIncrementalBRperStorage, the producer drops in blocks sporadically and piecemeal, while the consumer processes them periodically and in batches; batched reporting relieves RPC pressure while still maintaining reasonable timeliness.
With an IDE it is easy to confirm that only the BPServiceActor threads, which send heartbeats to their respective namenodes, block on pendingIncrementalBRperStorage. How this thread performs the actual report is analyzed below.
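Distilled to its essence, the pattern looks roughly like the following generic sketch (illustrative names, not HDFS code): producers return immediately after mutating the buffer, while a single reporter thread drains it in batches:
import java.util.ArrayList;
import java.util.List;

// Generic sketch of the buffer-plus-flag pattern used around
// pendingIncrementalBRperStorage (all names here are illustrative).
class IncrementalReportBuffer<T> {
  private final List<T> pending = new ArrayList<>();
  private volatile boolean sendImmediately = false;

  // producer: cheap, returns immediately from the caller's perspective
  void notifyItem(T item) {
    synchronized (pending) {
      pending.add(item);
      sendImmediately = true;
      pending.notifyAll(); // cut the reporter's wait short
    }
  }

  // consumer: wait up to waitMillis, then drain whatever has accumulated
  List<T> awaitAndDrain(long waitMillis) throws InterruptedException {
    synchronized (pending) {
      if (!sendImmediately && waitMillis > 0) {
        pending.wait(waitMillis); // a spurious wakeup is harmless: the outer loop re-checks
      }
      List<T> batch = new ArrayList<>(pending);
      pending.clear();
      sendImmediately = false;
      return batch;
    }
  }
}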
PacketResponder#close()
From the analysis of BlockReceiver#receivePacket() and the PacketResponder thread, by the time the node has received all packets, some acks may not yet have been sent.
Therefore PacketResponder#close() is called, which waits for all acks to be sent before shutting the responder down:
public void close() {
synchronized(ackQueue) {
// a non-empty ackQueue means some acks have not yet been sent
while (isRunning() && ackQueue.size() != 0) {
try {
ackQueue.wait();
} catch (InterruptedException e) {
running = false;
Thread.currentThread().interrupt();
}
}
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": closing");
}
// notify the PacketResponder thread blocked in PacketResponder#waitForAckHead() so it can detect the shutdown condition
running = false;
ackQueue.notifyAll();
}
// ???
synchronized(this) {
running = false;
notifyAll();
}
}
I don't understand what the second synchronized block (the one marked // ??? above) is for... explanations welcome.
The BPServiceActor thread
As discussed above, we next analyze how the BPServiceActor thread reads the pendingIncrementalBRperStorage buffer and performs the actual report.
BPServiceActor#offerService() calls pendingIncrementalBRperStorage.wait(). Because blocking and wakeups are involved, a straight top-down walkthrough is impossible; we instead start from the point where the thread wakes up:
// if no report is needed right now, wait for a while
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) {
try {
// the BPServiceActor thread wakes up here, then exits the synchronized block
pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
}
} // synchronized
Readers who have seen my post 條件隊列大法好:使用wait、notify和notifyAll的正確姿勢 might think the if(){wait} pattern here is incorrect. Revisit the "version2: premature wakeup" part of that post, consider HDFS's heartbeat mechanism, and think about why the code here is fine. Better yet, this is exactly how it should be written here.
If no report is needed at the moment, the BPServiceActor thread waits for a while; it is precisely this waiting window that gives the wakeup in BPServiceActor#notifyNamenodeBlock() its meaning.
After waking up, the BPServiceActor thread continues the heartbeat loop:
while (shouldRun()) {
try {
final long startTime = now();
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
Assume the heartbeat interval has not yet been reached, so the if block is not executed.
This is where the volatile variable sendImmediateIBR, modified in BPServiceActor#notifyNamenodeBlock(), comes into play:
// if sendImmediateIBR is true, immediately report received and deleted blocks
if (sendImmediateIBR ||
(startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
// report received and deleted blocks
reportReceivedDeletedBlocks();
// update lastDeletedReport
lastDeletedReport = startTime;
}
// then perform a full block report
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
// process the commands returned by the namenode
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
Interestingly, received and deleted blocks are first reported separately, and that RPC does not wait for a return value from the namenode; then a full report follows, and this time the RPC's return value is awaited.
So although block additions and deletions are reported incrementally, each incremental report here is followed by a full report, which keeps the cost of incremental reporting quite high. To improve concurrency, BPServiceActor#notifyNamenodeBlock() returns immediately after updating the buffer, without caring whether the report succeeds. Nor is a failed report a problem: before reporting, the block has already been transitioned to FINALIZED, persisted to disk, and recorded in the buffer; if the report fails it can be retried, and if the datanode dies before reporting, it re-reports after restarting. Consistency is guaranteed either way.
Setting aside the full-report logic for now, let's look only at the incremental report, BPServiceActor#reportReceivedDeletedBlocks():
private void reportReceivedDeletedBlocks() throws IOException {
// build the reports and reset sendImmediateIBR to false
ArrayList<StorageReceivedDeletedBlocks> reports =
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
synchronized (pendingIncrementalBRperStorage) {
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) {
final DatanodeStorage storage = entry.getKey();
final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
if (perStorageMap.getBlockInfoCount() > 0) {
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
}
}
sendImmediateIBR = false;
}
// if there is nothing to report, return immediately
if (reports.size() == 0) {
return;
}
// otherwise send the report via RPC to the namenode this actor serves
boolean success = false;
try {
bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpos.getBlockPoolId(),
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
success = true;
} finally {
// if the report failed, put the block add/delete info back into the buffer to await re-reporting
if (!success) {
synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(report.getStorage());
perStorageMap.putMissingBlockInfos(report.getBlocks());
sendImmediateIBR = true;
}
}
}
}
}
Two points to note:
- The BPServiceActor thread reports regardless of whether its namenode is active or standby (although it ignores commands from a standby namenode).
- When success ends up false, the namenode may actually have received the report; putting the information back into the buffer then leads to a duplicate report, but that is harmless, in both respects:
  - Re-reporting a deleted block: the namenode finds it holds no record of the block, concludes it has already been deleted, and ignores the report.
  - Re-reporting a received block: the namenode finds the newly reported block identical to the one already stored, and likewise ignores the report.
Summary
One client + one datanode forms the smallest possible pipeline. This article walked through the block-write process over this minimal pipeline in the error-free case; with that in place, analyzing pipelined writes with errors becomes much easier.
Original post: 源碼|HDFS之DataNode:寫數(shù)據(jù)塊(1)
Author: 猴子007
Source: https://monkeysayhi.github.io
This article is published under the Creative Commons Attribution-ShareAlike 4.0 International license. Reposting, adaptation, and commercial use are welcome, provided the attribution and the link to this article are retained.