一、hdfs讀取流程
- 先獲取文件流:FSDataInputStream fsIn = FileSystem.open("path")
- 然后讀取文件內容:fsIn.read(buf, off, toRead)
流程圖如下
image.png
二、hdfs客戶端打開文件流過程
1. 打開文件流FileSystem.open
/**
 * Opens an FSDataInputStream at the indicated Path, using the buffer
 * size configured via {@code io.file.buffer.size} (default 4096).
 * @param f the file to open
 */
public FSDataInputStream open(Path f) throws IOException {
// Forward to the two-argument overload, which subclasses such as
// DistributedFileSystem override.
final int bufferSize = getConf().getInt("io.file.buffer.size", 4096);
return open(f, bufferSize);
}
2.繼承了FileSystem的 DistributedFileSystem.open
// DistributedFileSystem.open: counts the read op, normalizes the path,
// then resolves symlinks while opening a DFSInputStream via DFSClient.
@Override
public FSDataInputStream open(Path f, final int bufferSize)
throws IOException {
statistics.incrementReadOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
// Delegates to DFSClient.open; the resulting DFSInputStream is then
// wrapped (e.g. for transparent encryption) before being returned.
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
// Invoked when the path resolves to a symlink into another
// file system: retry the open there.
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
3. DFSClient.open如下
/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places. Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 */
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
throws IOException, UnresolvedLinkException {
// Verify this client has not been closed before reading.
checkOpen();
// Get block info from namenode
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
try {
// The DFSInputStream constructor fetches the block locations (see below).
return new DFSInputStream(this, src, verifyChecksum);
} finally {
scope.close();
}
}
4. DFSInputStream 構造方法
// Constructor: records the client/source/checksum settings, picks the
// read caching strategy, then fetches block locations from the NameNode.
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
/**
 * Read caching strategy, controlled by the readDropBehind and readahead
 * settings. A read is usually a disk operation: each read loads a page
 * (512 bytes or more) into memory and transfers it to the client.
 * readDropBehind means "drop cache after read": page-cache data is
 * discarded immediately after the read, which saves memory under
 * concurrent multi-user reads at the cost of more frequent disk I/O.
 * With it disabled, data stays cached after the read, so repeated reads
 * of the same file are faster but consume more memory.
 * readahead enables prefetching: the DataNode reads extra bytes ahead
 * in one disk operation, lowering I/O latency for sequential reads.
 * NOTE: this feature requires native libraries on the DataNode,
 * otherwise it has no effect.
 */
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
// Fetch the block list (and last-block length) from the NameNode.
openInfo();
}
5. DFSInputStream.openInfo()
/**
 * Fetches the file's located blocks from the NameNode and resolves the
 * length of the last block if it is still being written.
 *
 * After a cluster restart, DataNodes may not yet have reported the
 * under-construction last block, in which case the fetch returns -1;
 * retry a configurable number of times before giving up.
 */
void openInfo() throws IOException, UnresolvedLinkException {
  synchronized(infoLock) {
    // Fetch the block list and the length of the last (possibly
    // under-construction) block.
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    // Retry count, default 3: dfs.client.retry.times.get-last-block-length
    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        // Retry interval, default 4000 ms:
        // dfs.client.retry.interval-ms.get-last-block-length
        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    // BUGFIX: fail only when the length is still unknown. The original
    // condition (retries == 0 alone) also threw when the FINAL retry
    // succeeded — the counter was exhausted but a valid length had just
    // been fetched — discarding a successful open.
    if (retriesForLastBlockLength == 0 && lastBlockBeingWrittenLength == -1) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
6. DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength()
// Asks the NameNode for the file's block locations and returns the length
// of the last block when it is under construction; returns 0 when the last
// block is complete or empty, and -1 when its locations are not yet known.
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
// Client asks the NameNode for the block locations of src.
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
if (locatedBlocks != null) {
// On a re-fetch, verify the block list has not changed underneath us.
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0) {
// if the length is zero, then no data has been written to
// datanode. So no need to wait for the locations.
return 0;
}
// Locations unknown (e.g. DataNodes have not reported after restart).
return -1;
}
// Ask a DataNode for the current length of the in-progress block.
final long len = readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
return lastBlockBeingWrittenLength;
}
7. 獲取block信息DFSClient.getLocatedBlocks()
/**
 * Fetch the located blocks for {@code src} starting at {@code start},
 * using the client's configured prefetch size as the range length.
 */
public LocatedBlocks getLocatedBlocks(String src, long start)
throws IOException {
// Forward to the three-argument overload, which issues the RPC
// through DFSClient.callGetBlockLocations.
final long prefetch = dfsClientConf.prefetchSize;
return getLocatedBlocks(src, start, prefetch);
}
/*
 * This is just a wrapper around callGetBlockLocations, but non-static so that
 * we can stub it out for tests.
 */
@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
throws IOException {
TraceScope scope = getPathTraceScope("getBlockLocations", src);
try {
// Remote RPC call to the NameNode server.
return callGetBlockLocations(namenode, src, start, length);
} finally {
scope.close();
}
}
8. 遠程RPC調用DFSClient.callGetBlockLocations()
/**
 * @see ClientProtocol#getBlockLocations(String, long, long)
 */
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
String src, long start, long length)
throws IOException {
try {
// Issue the request to the NameNode through the ClientProtocol proxy
// (ClientNamenodeProtocolTranslatorPB).
return namenode.getBlockLocations(src, start, length);
} catch(RemoteException re) {
// Unwrap the server-side exception types that callers expect to catch.
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
}
}
9. RPC NN ClientNamenodeProtocolTranslatorPB.getBlockLocations()
// Protobuf translator: packs the Java call into a protobuf request, sends
// it over RPC, and converts the response back into LocatedBlocks.
@Override
public LocatedBlocks getBlockLocations(String src, long offset, long length)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
.newBuilder()
.setSrc(src)
.setOffset(offset)
.setLength(length)
.build();
try {
// RPC transport details are not analyzed here; the server-side handler
// is NameNodeRpcServer.getBlockLocations.
// rpcProxy example: localhost/127.0.0.1:51397, ProtobufRpcEngine, ClientNamenodeProtocolPB
GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null, req);
return resp.hasLocations() ? PBHelper.convert(resp.getLocations()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
三、hdfs namenode服務端獲取block信息
1. namenode獲取block信息NameNodeRpcServer.getBlockLocations()
// Server-side RPC entry point on the NameNode for block-location requests.
public LocatedBlocks getBlockLocations(String src, long offset, long length)
throws IOException {
checkNNStartup();
metrics.incrGetBlockLocations();
// Delegates to FSNamesystem.getBlockLocations().
return namesystem.getBlockLocations(getClientMachine(), src, offset, length);
}
2. 獲取排序后block FSNamesystem.getBlockLocations()
/**
 * Get block locations within the specified range.
 * @see ClientProtocol#getBlockLocations(String, long, long)
 */
LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
long offset, long length) throws IOException {
checkOperation(OperationCategory.READ);
GetBlockLocationsResult res = null;
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
// Fetch the blocks covering the requested byte range (under read lock).
res = getBlockLocations(pc, srcArg, offset, length, true, true);
} catch (AccessControlException e) {
logAuditEvent(false, "open", srcArg);
throw e;
} finally {
readUnlock();
}
logAuditEvent(true, "open", srcArg);
if (res.updateAccessTime()) {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
srcArg);
String src = srcArg;
writeLock();
final long now = now();
try {
checkOperation(OperationCategory.WRITE);
/**
 * Resolve the path again and update the atime only when the file
 * exists.
 *
 * XXX: Races can still occur even after resolving the path again.
 * For example:
 *
 * <ul>
 * <li>Get the block location for "/a/b"</li>
 * <li>Rename "/a/b" to "/c/b"</li>
 * <li>The second resolution still points to "/a/b", which is
 * wrong.</li>
 * </ul>
 *
 * The behavior is incorrect but consistent with the one before
 * HDFS-7463. A better fix is to change the edit log of SetTime to
 * use inode id instead of a path.
 */
src = dir.resolvePath(pc, srcArg, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
INode inode = iip.getLastINode();
boolean updateAccessTime = inode != null &&
now > inode.getAccessTime() + getAccessTimePrecision();
if (!isInSafeMode() && updateAccessTime) {
boolean changed = FSDirAttrOp.setTimes(dir,
inode, -1, now, false, iip.getLatestSnapshotId());
if (changed) {
getEditLog().logTimes(src, -1, now);
}
}
} catch (Throwable e) {
LOG.warn("Failed to update the access time of " + src, e);
} finally {
writeUnlock();
}
}
LocatedBlocks blocks = res.blocks;
// Sort each block's replica DataNodes by network-topology distance
// from the client machine.
if (blocks != null) {
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
if (lastBlock != null) {
ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, lastBlockList);
}
}
// Return the sorted blocks to the client.
return blocks;
}
3. 獲取blocks FSNamesystem.getBlockLocations()
/**
 * Get block locations within the specified range.
 * @see ClientProtocol#getBlockLocations(String, long, long)
 * @throws IOException
 */
GetBlockLocationsResult getBlockLocations(
FSPermissionChecker pc, String src, long offset, long length,
boolean needBlockToken, boolean checkSafeMode) throws IOException {
// Validate the requested byte range before doing any work.
if (offset < 0) {
throw new HadoopIllegalArgumentException(
"Negative offset is not supported. File: " + src);
}
if (length < 0) {
throw new HadoopIllegalArgumentException(
"Negative length is not supported. File: " + src);
}
final GetBlockLocationsResult ret = getBlockLocationsInt(
pc, src, offset, length, needBlockToken);
if (checkSafeMode && isInSafeMode()) {
for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
// if safemode & no block locations yet then throw safemodeException
if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
SafeModeException se = new SafeModeException(
"Zero blocklocations for " + src, safeMode);
if (haEnabled && haContext != null &&
haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
// On an active HA NameNode, signal the client it may retry.
throw new RetriableException(se);
} else {
throw se;
}
}
}
}
return ret;
}
4. 最后調用FSNamesystem.getBlockLocationsInt()
// Core lookup: resolves the path to an INodeFile, checks permissions, and
// builds the LocatedBlocks result through the BlockManager.
private GetBlockLocationsResult getBlockLocationsInt(
FSPermissionChecker pc, final String srcArg, long offset, long length,
boolean needBlockToken)
throws IOException {
String src = srcArg;
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = dir.resolvePath(pc, srcArg, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
// Resolve the path to its INodeFile.
final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.READ);
checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
}
// Compute the file length: snapshot size for snapshot paths, otherwise
// the size excluding the last under-construction block.
final long fileSize = iip.isSnapshot() ? inode.computeFileSize(iip.getPathSnapshotId()) : inode.computeFileSizeNotIncludingLastUcBlock();
boolean isUc = inode.isUnderConstruction();
if (iip.isSnapshot()) {
// if src indicates a snapshot file, we need to make sure the returned
// blocks do not exceed the size of the snapshot file.
length = Math.min(length, fileSize - offset);
isUc = false;
}
final FileEncryptionInfo feInfo =
FSDirectory.isReservedRawName(srcArg) ? null
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
// Build the LocatedBlocks via the BlockManager.
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
inode.getBlocks(iip.getPathSnapshotId()), fileSize,
isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
// Set caching information for the located blocks.
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
cacheManager.setCachedLocations(lb);
}
final long now = now();
boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
&& !iip.isSnapshot()
&& now > inode.getAccessTime() + getAccessTimePrecision();
return new GetBlockLocationsResult(updateAccessTime, blocks);
}
四、 BlockManager創(chuàng)建LocatedBlocks
1. 獲取LocatedBlocks的方法BlockManager.createLocatedBlocks()
/** Create a LocatedBlocks. */
public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
final long fileSizeExcludeBlocksUnderConstruction,
final boolean isFileUnderConstruction, final long offset,
final long length, final boolean needBlockToken,
final boolean inSnapshot, FileEncryptionInfo feInfo)
throws IOException {
assert namesystem.hasReadLock();
if (blocks == null) {
return null;
} else if (blocks.length == 0) {
// Empty file: nothing to locate.
return new LocatedBlocks(0, isFileUnderConstruction,
Collections.<LocatedBlock>emptyList(), null, false, feInfo);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
}
final AccessMode mode = needBlockToken? AccessMode.READ: null;
// Build the LocatedBlock list for the requested range;
// createLocatedBlockList also calls createLocatedBlock internally.
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
blocks, offset, length, Integer.MAX_VALUE, mode);
final LocatedBlock lastlb;
final boolean isComplete;
if (!inSnapshot) {
final BlockInfoContiguous last = blocks[blocks.length - 1];
final long lastPos = last.isComplete()?
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
: fileSizeExcludeBlocksUnderConstruction;
lastlb = createLocatedBlock(last, lastPos, mode);
isComplete = last.isComplete();
} else {
lastlb = createLocatedBlock(blocks,
fileSizeExcludeBlocksUnderConstruction, mode);
isComplete = true;
}
return new LocatedBlocks(
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
locatedblocks, lastlb, isComplete, feInfo);
}
}
2. 最終獲取block信息BlockManager.createLocatedBlock()
/** @return a LocatedBlock for the given block */
private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
) throws IOException {
if (blk instanceof BlockInfoContiguousUnderConstruction) {
if (blk.isComplete()) {
throw new IOException(
"blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
+ ", blk=" + blk);
}
// Under-construction block: use the expected storage locations rather
// than the reported ones.
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, storages, pos, false);
}
// get block locations
// Number of DataNodes on which this block's replica is unreadable,
// derived from the per-node replica state.
final int numCorruptNodes = countNodes(blk).corruptReplicas();
// Number of corrupt replicas recorded in corruptReplicasMap, which maps a
// corrupt Block to the set of DataNodes (with reasons) reporting it
// corrupt. Normally this equals numCorruptNodes.
final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
if (numCorruptNodes != numCorruptReplicas) {
LOG.warn("Inconsistent number of corrupt replicas for "
+ blk + " blockMap has " + numCorruptNodes
+ " but corrupt replicas map has " + numCorruptReplicas);
}
// Total number of DataNodes storing a replica of this block.
final int numNodes = blocksMap.numNodes(blk);
// If every replica is corrupt, mark the whole block as corrupt.
final boolean isCorrupt = numCorruptNodes == numNodes;
// If isCorrupt, return all DataNodes; otherwise only the DataNodes
// holding readable replicas.
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
int j = 0;
if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!replicaCorrupt))
machines[j++] = storage;
}
}
assert j == machines.length :
"isCorrupt: " + isCorrupt +
" numMachines: " + numMachines +
" numNodes: " + numNodes +
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
// Return a LocatedBlock pairing the block with its DataNode storages.
return new LocatedBlock(eb, machines, pos, isCorrupt);
}
