HDFS Reads: Block Retrieval Walkthrough (Part 1)

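I. The HDFS read flow

  • First obtain the file stream
    FSDataInputStream fsIn = fileSystem.open(new Path("path"));  // fileSystem is a FileSystem instance
  • Then read the file contents
    fsIn.read(buf, off, toRead);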
  • The overall flow is shown in the figure:

    [Figure: HDFS read flow chart (image.png)]

II. How the HDFS client opens a file stream

1. Opening the file stream: FileSystem.open
  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    // The two-argument open(Path, int) called here is implemented by DistributedFileSystem.open
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }
2. DistributedFileSystem.open (DistributedFileSystem extends FileSystem)
  @Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
        // Delegates to DFSClient.open
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

3. DFSClient.open
  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    // Verify that this client has not been closed
    checkOpen();
    //    Get block info from namenode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      // Build and return the DFSInputStream; the constructor logic is examined next
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }
4. The DFSInputStream constructor
 DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      /**
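       * Read caching strategy. Two parameters, readDropBehind and readahead, control it.
       * Reads normally hit the disk: each read loads a page of data (512 bytes or more)
       * into memory and transfers it to the client.
       * readDropBehind means "drop after read": cached data is discarded as soon as it has
       * been read. This saves memory when many users read files concurrently, at the cost
       * of more frequent disk operations. With it disabled, data stays cached in memory after
       * a read, which speeds up repeated reads of the same file but uses more memory.
       * readahead means prefetching: when enabled, the DataNode reads some extra bytes ahead
       * within a single disk read, which lowers I/O latency for sequential reads.
       * This feature requires native libraries to be enabled on the DataNode; otherwise it
       * has no effect.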
       */
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    // Fetch the block information
    openInfo();
  }
5. DFSInputStream.openInfo()
 void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      // Fetch the block list and get the length of the last block
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      // Defaults to 3 retries (dfs.client.retry.times.get-last-block-length)
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          // Defaults to 4000 ms (dfs.client.retry.interval-ms.get-last-block-length)
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }
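The retry behaviour of openInfo() can be modelled outside Hadoop. What follows is a minimal sketch, not Hadoop code: the class LastBlockLengthRetrySketch, the method fetchWithRetry and the LongSupplier standing in for fetchLocatedBlocksAndGetLastBlockLength() are all hypothetical names. One deliberate difference: the sketch fails only when the length is still -1 once the retries are used up, whereas the quoted method tests the retry counter.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.LongSupplier;

/** Hypothetical model of the "retry while the last block length is -1" loop. */
class LastBlockLengthRetrySketch {

    /**
     * @param fetch      stand-in for fetchLocatedBlocksAndGetLastBlockLength(); returns -1 when unknown
     * @param retries    maximum number of extra attempts after the first fetch
     * @param intervalMs pause between attempts, in milliseconds
     */
    static long fetchWithRetry(LongSupplier fetch, int retries, long intervalMs) {
        long length = fetch.getAsLong();
        while (length == -1 && retries > 0) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting to retry", e);
            }
            length = fetch.getAsLong();
            retries--;
        }
        if (length == -1) {
            // Still unknown after all attempts: give up, as openInfo() does.
            throw new UncheckedIOException(
                new IOException("Could not obtain the last block locations."));
        }
        return length;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated NameNode: the length is unknown for the first two calls.
        long length = fetchWithRetry(() -> calls[0]++ < 2 ? -1L : 1024L, 3, 10L);
        System.out.println("length=" + length + " after " + calls[0] + " calls");
    }
}
```

The values 3 and 10 ms in main are illustrative only; the article's defaults are 3 retries and 4000 ms.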
6. DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength()
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    // The client asks the NameNode for the block information
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }
    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
    return lastBlockBeingWrittenLength;
  }
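Two decisions in this method are easy to miss: the old and new block lists are compared only over the positions they share, so a file that has grown still passes the check, and an incomplete last block yields 0, -1 or a length read from a DataNode. A minimal sketch of both, using plain strings for block IDs; BlockListCheckSketch and its method names are hypothetical and not part of Hadoop.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of the checks in fetchLocatedBlocksAndGetLastBlockLength(). */
class BlockListCheckSketch {

    /** Returns true when both lists agree on every position they share. */
    static boolean samePrefix(List<String> oldIds, List<String> newIds) {
        Iterator<String> oldIter = oldIds.iterator();
        Iterator<String> newIter = newIds.iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                return false; // the real method throws "Blocklist for ... has changed!"
            }
        }
        return true;
    }

    /**
     * Length of a last block that is still being written.
     * lengthFromDatanode stands in for the result of readBlockLength(last).
     */
    static long lastBlockLength(int locationCount, long blockSize, long lengthFromDatanode) {
        if (locationCount == 0) {
            // Nothing written yet: 0. Otherwise the locations are simply not reported yet: -1.
            return blockSize == 0 ? 0 : -1;
        }
        return lengthFromDatanode;
    }

    public static void main(String[] args) {
        System.out.println(samePrefix(Arrays.asList("b1", "b2"), Arrays.asList("b1", "b2", "b3")));
        System.out.println(lastBlockLength(0, 512, 0));
    }
}
```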
7. Fetching block information: DFSClient.getLocatedBlocks()
  public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    // Ends up calling DFSClient.callGetBlockLocations
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
  }
  /*
   * This is just a wrapper around callGetBlockLocations, but non-static so that
   * we can stub it out for tests.
   */
  @VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
      // Remote RPC call to the NameNode server
      return callGetBlockLocations(namenode, src, start, length);
    } finally {
      scope.close();
    }
  }
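The (start, length) pair passed to getLocatedBlocks describes a byte range, and the NameNode answers with the blocks that overlap it. The sketch below only illustrates that overlap rule on an array of block sizes. It is an assumption-laden model, not the NameNode's implementation: BlockRangeSketch and blocksInRange are hypothetical names, and length is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: which block indexes overlap the byte range [offset, offset + length). */
class BlockRangeSketch {

    static List<Integer> blocksInRange(long[] blockSizes, long offset, long length) {
        List<Integer> result = new ArrayList<>();
        long pos = 0; // file offset at which the current block starts
        for (int i = 0; i < blockSizes.length; i++) {
            long blockEnd = pos + blockSizes[i];
            boolean endsAfterOffset = blockEnd > offset;
            // Written as a subtraction so that a very large length cannot overflow.
            boolean startsBeforeRangeEnd = pos < offset || pos - offset < length;
            if (endsAfterOffset && startsBeforeRangeEnd) {
                result.add(i);
            }
            pos = blockEnd;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] sizes = {128, 128, 44}; // illustrative block sizes in arbitrary units
        System.out.println(blocksInRange(sizes, 0, 130));
        System.out.println(blocksInRange(sizes, 130, 10));
    }
}
```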
8. The remote RPC call: DFSClient.callGetBlockLocations()
/**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      // Send the request to the NameNode through ClientProtocol (ClientNamenodeProtocolTranslatorPB)
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }

9. The RPC to the NameNode: ClientNamenodeProtocolTranslatorPB.getBlockLocations()
@Override
  public LocatedBlocks getBlockLocations(String src, long offset, long length)
      throws AccessControlException, FileNotFoundException,
      UnresolvedLinkException, IOException {
     
    GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
        .newBuilder()
        .setSrc(src)
        .setOffset(offset)
        .setLength(length)
        .build();
    try {
      // The RPC mechanics themselves are not analyzed here
      // This ends up invoking NameNodeRpcServer.getBlockLocations
      //rpcProxy: localhost/127.0.0.1:51397, ProtobufRpcEngine, ClientNamenodeProtocolPB
      GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null, req);
      return resp.hasLocations() ? PBHelper.convert(resp.getLocations()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

III. How the NameNode server side looks up block information

1. The NameNode entry point: NameNodeRpcServer.getBlockLocations()
  public LocatedBlocks getBlockLocations(String src, long offset,  long length) 
      throws IOException {
    checkNNStartup();
    metrics.incrGetBlockLocations();
    // Delegates to FSNamesystem.getBlockLocations()
    return namesystem.getBlockLocations(getClientMachine(),  src, offset, length);
  }
2. Getting the sorted blocks: FSNamesystem.getBlockLocations()
/**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
      long offset, long length) throws IOException {
    checkOperation(OperationCategory.READ);
    GetBlockLocationsResult res = null;
    FSPermissionChecker pc = getPermissionChecker();
    readLock();
    try {
      checkOperation(OperationCategory.READ);
      // Get the blocks within the specified range
      res = getBlockLocations(pc, srcArg, offset, length, true, true);
    } catch (AccessControlException e) {
      logAuditEvent(false, "open", srcArg);
      throw e;
    } finally {
      readUnlock();
    }

    logAuditEvent(true, "open", srcArg);

    if (res.updateAccessTime()) {
      byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
          srcArg);
      String src = srcArg;
      writeLock();
      final long now = now();
      try {
        checkOperation(OperationCategory.WRITE);
        /**
         * Resolve the path again and update the atime only when the file
         * exists.
         *
         * XXX: Races can still occur even after resolving the path again.
         * For example:
         *
         * <ul>
         *   <li>Get the block location for "/a/b"</li>
         *   <li>Rename "/a/b" to "/c/b"</li>
         *   <li>The second resolution still points to "/a/b", which is
         *   wrong.</li>
         * </ul>
         *
         * The behavior is incorrect but consistent with the one before
         * HDFS-7463. A better fix is to change the edit log of SetTime to
         * use inode id instead of a path.
         */
        src = dir.resolvePath(pc, srcArg, pathComponents);
        final INodesInPath iip = dir.getINodesInPath(src, true);
        INode inode = iip.getLastINode();
        boolean updateAccessTime = inode != null &&
            now > inode.getAccessTime() + getAccessTimePrecision();
        if (!isInSafeMode() && updateAccessTime) {
          boolean changed = FSDirAttrOp.setTimes(dir,
              inode, -1, now, false, iip.getLatestSnapshotId());
          if (changed) {
            getEditLog().logTimes(src, -1, now);
          }
        }
      } catch (Throwable e) {
        LOG.warn("Failed to update the access time of " + src, e);
      } finally {
        writeUnlock();
      }
    }

    LocatedBlocks blocks = res.blocks;
    // Sort the DataNodes holding each block's replicas by network-topology distance to the client
    if (blocks != null) {
      blockManager.getDatanodeManager().sortLocatedBlocks(
          clientMachine, blocks.getLocatedBlocks());
      // lastBlock is not part of getLocatedBlocks(), might need to sort it too
      LocatedBlock lastBlock = blocks.getLastLocatedBlock();
      if (lastBlock != null) {
        ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
        blockManager.getDatanodeManager().sortLocatedBlocks(
            clientMachine, lastBlockList);
      }
    }
    // Return the sorted blocks to the client
    return blocks;
  }
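The sorting step orders each block's replicas so that the closest DataNode comes first. The sketch below is a simplified model of that idea and makes no claim about how sortLocatedBlocks is implemented: ReplicaSortSketch, Node and the 0/1/2 weights (same host, same rack, anything else) are assumptions chosen for illustration, and the real method may apply further rules that are not modelled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model: order replicas by how close they are to the client. */
class ReplicaSortSketch {

    static final class Node {
        final String host;
        final String rack;

        Node(String host, String rack) {
            this.host = host;
            this.rack = rack;
        }
    }

    /** Assumed weights: 0 = same host, 1 = same rack, 2 = any other rack. */
    static int weight(Node client, Node replica) {
        if (replica.host.equals(client.host)) {
            return 0;
        }
        if (replica.rack.equals(client.rack)) {
            return 1;
        }
        return 2;
    }

    /** Returns the replica host names, nearest first; ties keep their original order. */
    static List<String> sortByDistance(Node client, List<Node> replicas) {
        List<Node> sorted = new ArrayList<>(replicas);
        sorted.sort(Comparator.comparingInt((Node r) -> weight(client, r)));
        List<String> hosts = new ArrayList<>();
        for (Node n : sorted) {
            hosts.add(n.host);
        }
        return hosts;
    }

    public static void main(String[] args) {
        Node client = new Node("h1", "/rack1");
        List<Node> replicas = Arrays.asList(
            new Node("h3", "/rack2"), new Node("h2", "/rack1"), new Node("h1", "/rack1"));
        System.out.println(sortByDistance(client, replicas));
    }
}
```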

3. Getting the blocks: the internal FSNamesystem.getBlockLocations() overload
/**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   * @throws IOException
   */
  GetBlockLocationsResult getBlockLocations(
      FSPermissionChecker pc, String src, long offset, long length,
      boolean needBlockToken, boolean checkSafeMode) throws IOException {
    if (offset < 0) {
      throw new HadoopIllegalArgumentException(
          "Negative offset is not supported. File: " + src);
    }
    if (length < 0) {
      throw new HadoopIllegalArgumentException(
          "Negative length is not supported. File: " + src);
    }
    final GetBlockLocationsResult ret = getBlockLocationsInt(
        pc, src, offset, length, needBlockToken);

    if (checkSafeMode && isInSafeMode()) {
      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
        // if safemode & no block locations yet then throw safemodeException
        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
          SafeModeException se = new SafeModeException(
              "Zero blocklocations for " + src, safeMode);
          if (haEnabled && haContext != null &&
              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
            throw new RetriableException(se);
          } else {
            throw se;
          }
        }
      }
    }
    return ret;
  }
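The safe-mode branch above boils down to a small decision: outside safe mode nothing happens, while in safe mode a block with no reported locations produces a SafeModeException, wrapped in a RetriableException when this NameNode is the active one in an HA setup. A minimal sketch of that decision; SafeModeCheckSketch, Outcome and the single haActive flag (standing in for the three HA conditions in the quoted code) are hypothetical.

```java
/** Hypothetical model of the safe-mode check in getBlockLocations(). */
class SafeModeCheckSketch {

    enum Outcome { OK, SAFE_MODE_EXCEPTION, RETRIABLE_EXCEPTION }

    /**
     * @param inSafeMode        stands in for checkSafeMode && isInSafeMode()
     * @param locationsPerBlock number of reported locations for each returned block
     * @param haActive          stands in for "HA enabled and this NameNode is ACTIVE"
     */
    static Outcome check(boolean inSafeMode, int[] locationsPerBlock, boolean haActive) {
        if (!inSafeMode) {
            return Outcome.OK;
        }
        for (int locations : locationsPerBlock) {
            if (locations == 0) {
                // A block nobody has reported yet: the client should be told to come back later.
                return haActive ? Outcome.RETRIABLE_EXCEPTION : Outcome.SAFE_MODE_EXCEPTION;
            }
        }
        return Outcome.OK;
    }

    public static void main(String[] args) {
        System.out.println(check(true, new int[] {3, 0}, true));
        System.out.println(check(true, new int[] {3, 2}, true));
    }
}
```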
4. Finally: FSNamesystem.getBlockLocationsInt()
private GetBlockLocationsResult getBlockLocationsInt(
      FSPermissionChecker pc, final String srcArg, long offset, long length,
      boolean needBlockToken)
      throws IOException {
    String src = srcArg;
    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    src = dir.resolvePath(pc, srcArg, pathComponents);
    final INodesInPath iip = dir.getINodesInPath(src, true);
    // Resolve the path to an INodeFile
    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
    if (isPermissionEnabled) {
      dir.checkPathAccess(pc, iip, FsAction.READ);
      checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
    }

    // Compute the file length
    final long fileSize = iip.isSnapshot() ? inode.computeFileSize(iip.getPathSnapshotId()) : inode.computeFileSizeNotIncludingLastUcBlock();
    boolean isUc = inode.isUnderConstruction();
    if (iip.isSnapshot()) {
      // if src indicates a snapshot file, we need to make sure the returned
      // blocks do not exceed the size of the snapshot file.
      length = Math.min(length, fileSize - offset);
      isUc = false;
    }

    final FileEncryptionInfo feInfo =
        FSDirectory.isReservedRawName(srcArg) ? null
            : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
    // Build the LocatedBlocks through the BlockManager
    final LocatedBlocks blocks = blockManager.createLocatedBlocks(
        inode.getBlocks(iip.getPathSnapshotId()), fileSize,
        isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
    // Set caching information for the located blocks.
    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
      cacheManager.setCachedLocations(lb);
    }
    final long now = now();
    boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
        && !iip.isSnapshot()
        && now > inode.getAccessTime() + getAccessTimePrecision();
    return new GetBlockLocationsResult(updateAccessTime, blocks);
  }
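Two small pieces of arithmetic in getBlockLocationsInt() are worth isolating: the requested length is clamped for snapshot files, and the access time is refreshed only when the previous one is older than the configured precision. A minimal sketch under stated assumptions; AccessTimeSketch and its method names are hypothetical, and the precision value used in main is illustrative only.

```java
/** Hypothetical model of the length clamp and access-time test in getBlockLocationsInt(). */
class AccessTimeSketch {

    /** For snapshot files, never return blocks beyond the snapshot's size. */
    static long clampToSnapshot(long length, long fileSize, long offset) {
        return Math.min(length, fileSize - offset);
    }

    /** Mirrors the updateAccessTime expression; all times are in milliseconds. */
    static boolean shouldUpdateAccessTime(boolean accessTimeSupported, boolean inSafeMode,
                                          boolean isSnapshot, long now, long lastAccess,
                                          long precisionMs) {
        return accessTimeSupported
            && !inSafeMode
            && !isSnapshot
            && now > lastAccess + precisionMs;
    }

    public static void main(String[] args) {
        long precisionMs = 3_600_000L; // illustrative precision of one hour
        System.out.println(clampToSnapshot(1_000L, 300L, 100L));
        System.out.println(
            shouldUpdateAccessTime(true, false, false, 10_000_000L, 1_000_000L, precisionMs));
    }
}
```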

IV. BlockManager builds the LocatedBlocks

1. Building the LocatedBlocks: BlockManager.createLocatedBlocks()
/** Create a LocatedBlocks. */
  public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
      final long fileSizeExcludeBlocksUnderConstruction,
      final boolean isFileUnderConstruction, final long offset,
      final long length, final boolean needBlockToken,
      final boolean inSnapshot, FileEncryptionInfo feInfo)
      throws IOException {
    assert namesystem.hasReadLock();
    if (blocks == null) {
      return null;
    } else if (blocks.length == 0) {
      return new LocatedBlocks(0, isFileUnderConstruction,
          Collections.<LocatedBlock>emptyList(), null, false, feInfo);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
      }
      final AccessMode mode = needBlockToken? AccessMode.READ: null;
      // Build the list of LocatedBlock objects from blocks
      // createLocatedBlockList also calls createLocatedBlock internally
      final List<LocatedBlock> locatedblocks = createLocatedBlockList(
          blocks, offset, length, Integer.MAX_VALUE, mode);

      final LocatedBlock lastlb;
      final boolean isComplete;
      if (!inSnapshot) {
        final BlockInfoContiguous last = blocks[blocks.length - 1];
        final long lastPos = last.isComplete()?
            fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
            : fileSizeExcludeBlocksUnderConstruction;
        lastlb = createLocatedBlock(last, lastPos, mode);
        isComplete = last.isComplete();
      } else {
        lastlb = createLocatedBlock(blocks,
            fileSizeExcludeBlocksUnderConstruction, mode);
        isComplete = true;
      }
      return new LocatedBlocks(
          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
          locatedblocks, lastlb, isComplete, feInfo);
    }
  }
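The lastPos expression is easier to follow with numbers. The file size passed in excludes a last block that is still under construction, so a complete last block has to be subtracted from it, while an under-construction last block starts exactly at that size. A minimal sketch of the arithmetic; LastBlockOffsetSketch and lastBlockOffset are hypothetical names and the sizes are made up for the example.

```java
/** Hypothetical model of the lastPos computation in createLocatedBlocks(). */
class LastBlockOffsetSketch {

    /**
     * @param fileSizeExcludingUc file size that does not count an under-construction last block
     * @param lastBlockBytes      number of bytes in the last block
     * @param lastComplete        whether the last block is complete
     * @return the file offset at which the last block starts
     */
    static long lastBlockOffset(long fileSizeExcludingUc, long lastBlockBytes, boolean lastComplete) {
        return lastComplete ? fileSizeExcludingUc - lastBlockBytes : fileSizeExcludingUc;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Three complete blocks of 128 MB, 128 MB and 44 MB: the size is 300 MB.
        System.out.println(lastBlockOffset(300 * mb, 44 * mb, true) / mb);
        // Same file while the third block is still being written: the size counts only 256 MB.
        System.out.println(lastBlockOffset(256 * mb, 44 * mb, false) / mb);
    }
}
```

Both calls describe the same layout and give the same start offset for the last block, which is the point of the conditional.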
2. Finally resolving the block information: BlockManager.createLocatedBlock()
  /** @return a LocatedBlock for the given block */
  private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
      ) throws IOException {
    if (blk instanceof BlockInfoContiguousUnderConstruction) {
      if (blk.isComplete()) {
        throw new IOException(
            "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
            + ", blk=" + blk);
      }
      final BlockInfoContiguousUnderConstruction uc =
          (BlockInfoContiguousUnderConstruction) blk;
      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
      return new LocatedBlock(eb, storages, pos, false);
    }


    // get block locations
    // Count the DataNodes that hold a replica of blk which cannot be read (corrupt replicas)
    final int numCorruptNodes = countNodes(blk).corruptReplicas();
    // Count the corrupt replicas of blk as recorded in corruptReplicas, the in-memory map from
    // each corrupt block to the DataNodes holding a bad replica and the reason for the corruption.
    // Normally this count equals numCorruptNodes.
    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
    if (numCorruptNodes != numCorruptReplicas) {
      LOG.warn("Inconsistent number of corrupt replicas for "
          + blk + " blockMap has " + numCorruptNodes
          + " but corrupt replicas map has " + numCorruptReplicas);
    }
    // Count the DataNodes that store a replica of blk (the block's locations)
    final int numNodes = blocksMap.numNodes(blk);
    // If the number of corrupt replicas equals the number of replicas, mark the block itself as corrupt
    final boolean isCorrupt = numCorruptNodes == numNodes;
    // If isCorrupt is true, return every DataNode; otherwise return only the DataNodes holding usable replicas
    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
    final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
    int j = 0;
    if (numMachines > 0) {
      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
        final DatanodeDescriptor d = storage.getDatanodeDescriptor();
        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
        if (isCorrupt || (!replicaCorrupt))
          machines[j++] = storage;
      }
    }
    assert j == machines.length :
      "isCorrupt: " + isCorrupt + 
      " numMachines: " + numMachines +
      " numNodes: " + numNodes +
      " numCorrupt: " + numCorruptNodes +
      " numCorruptRepls: " + numCorruptReplicas;
    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
    // Return a LocatedBlock built from blk and its DataNodes
    return new LocatedBlock(eb, machines, pos, isCorrupt);
  }
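The replica selection at the end of createLocatedBlock() can be restated as: hand back only the healthy replicas, unless every replica is corrupt, in which case hand back all of them and flag the block. A minimal sketch of that rule using a map from DataNode name to a corrupt flag; ReplicaSelectionSketch and its methods are hypothetical names, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of how createLocatedBlock() picks the DataNodes to return. */
class ReplicaSelectionSketch {

    /** A block counts as corrupt when every one of its replicas is corrupt. */
    static boolean isBlockCorrupt(Map<String, Boolean> replicaCorrupt) {
        int corrupt = 0;
        for (boolean c : replicaCorrupt.values()) {
            if (c) {
                corrupt++;
            }
        }
        return corrupt == replicaCorrupt.size();
    }

    /** Returns all DataNodes for a corrupt block, otherwise only those with a healthy replica. */
    static List<String> selectMachines(Map<String, Boolean> replicaCorrupt) {
        boolean blockCorrupt = isBlockCorrupt(replicaCorrupt);
        List<String> machines = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : replicaCorrupt.entrySet()) {
            if (blockCorrupt || !e.getValue()) {
                machines.add(e.getKey());
            }
        }
        return machines;
    }

    public static void main(String[] args) {
        Map<String, Boolean> replicas = new LinkedHashMap<>();
        replicas.put("dn1", false);
        replicas.put("dn2", true); // this replica is corrupt
        replicas.put("dn3", false);
        System.out.println(selectMachines(replicas) + " corrupt=" + isBlockCorrupt(replicas));
    }
}
```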
