HDFS Read Path Analysis

Date: 2023-10-30 23:32:30
1. Background

The three articles https://blog.51cto.com/u_15327484/8023493, https://blog.51cto.com/u_15327484/8089923, and https://blog.51cto.com/u_15327484/8095971 covered how an HDFS file write behaves on the client, NameNode, and DataNode sides.

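Reading a file from HDFS is simpler by comparison:

  1. Fetch the location information for the first blocks of the file.
  2. Pick the DataNode closest to the client and read the block from it.
  3. When a block has been read completely, move on to the next block, calling ClientProtocol.getBlockLocations() to fetch its location if the client has not cached it yet.
  4. Once all blocks have been read, call HdfsDataInputStream.close() to close the input stream.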

The HDFS read flow is shown below:

[Figure: HDFS read flow (Untitled.png)]

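2. Example application code for reading an HDFS file

As shown below, the code first calls FileSystem.get() to obtain a DistributedFileSystem object, then calls DistributedFileSystem.open to open the file, and finally wraps the returned FSDataInputStream in a BufferedReader and reads the content line by line with readLine: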

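	// Imports needed: java.io.BufferedReader, java.io.IOException, java.io.InputStreamReader,
	// java.nio.charset.StandardCharsets, org.apache.hadoop.conf.Configuration,
	// org.apache.hadoop.fs.FSDataInputStream, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path
	public void testRead() {
		// Assumes fs.defaultFS in the loaded configuration points at hdfs://localhost:9000
		Configuration conf = new Configuration();
		Path p = new Path("hdfs://localhost:9000/a.txt");
		// try-with-resources closes the reader and the underlying stream even if a read fails
		try (FSDataInputStream in = FileSystem.get(conf).open(p);
				BufferedReader buff = new BufferedReader(
						new InputStreamReader(in, StandardCharsets.UTF_8))) { // assumes UTF-8 text
			String str;
			while ((str = buff.readLine()) != null) {
				System.out.println(str);
			}
		} catch (IllegalArgumentException | IOException e) {
			e.printStackTrace();
		}
	}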

3. Client-side open

DistributedFileSystem calls DFSClient.open to create the FSDataInputStream input stream:

DFSClient dfs;

public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    statistics.incrementReadOps(1);
    storageStatistics.incrementOpCounter(OpType.OPEN);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p) throws IOException {
        final DFSInputStream dfsis =
            dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

DFSClient.open first fetches, by default, the location information for the first 10 blocks, then creates a DFSInputStream from those blocks:

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException {
    checkOpen();
    //    Get block info from namenode
    try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
      // Fetch location information for the first 10 blocks
      LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
      // Create the DFSInputStream
      return openInternal(locatedBlocks, src, verifyChecksum);
    }
  }

// Create the DFSInputStream
private DFSInputStream openInternal(LocatedBlocks locatedBlocks, String src,
      boolean verifyChecksum) throws IOException {
      // (omitted)
      return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
  }

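DFSClient.getLocatedBlocks() returns the blocks that overlap the byte range [start, start + dfsClientConf.getPrefetchSize()) of the file. Here start is 0 and dfsClientConf.getPrefetchSize() is 10 * BlockSize, so each call returns location information for roughly 10 blocks: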

public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
  }
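
The range arithmetic above can be sketched in plain Java. This is only an illustration, not HDFS code: the class name PrefetchWindow, the 128 MB block size, and the 25-block file are assumptions made for the example.

```java
// Sketch only: models which block indices a getBlockLocations-style byte range covers.
// BLOCK_SIZE and PREFETCH_BLOCKS are assumed values, not read from a real cluster.
final class PrefetchWindow {
    static final long BLOCK_SIZE = 128L * 1024 * 1024; // assumed block size
    static final long PREFETCH_BLOCKS = 10;            // prefetch size = 10 * block size

    /** Index of the block that contains byte offset start. */
    static long firstBlock(long start) {
        return start / BLOCK_SIZE;
    }

    /** Index of the last block overlapping [start, start + length), capped at the file length. */
    static long lastBlock(long start, long length, long fileLength) {
        if (start < 0 || length <= 0 || start >= fileLength) {
            throw new IllegalArgumentException("range does not overlap the file");
        }
        long endExclusive = Math.min(start + length, fileLength);
        return (endExclusive - 1) / BLOCK_SIZE;
    }

    public static void main(String[] args) {
        long prefetch = PREFETCH_BLOCKS * BLOCK_SIZE;
        long fileLength = 25 * BLOCK_SIZE; // hypothetical 25-block file
        for (long start = 0; start < fileLength; start += prefetch) {
            System.out.println("blocks " + firstBlock(start) + ".."
                + lastBlock(start, prefetch, fileLength));
        }
    }
}
```

For the hypothetical 25-block file, three requests are enough to cover it: blocks 0..9, 10..19, and 20..24.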

This ends in a call to ClientProtocol.getBlockLocations, which fetches the information for the first 10 blocks:

static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length)
      throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

The DFSInputStream holds the block information and the file path:

DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
      LocatedBlocks locatedBlocks) throws IOException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    this.locatedBlocks = locatedBlocks;
    openInfo(false);
  }

void openInfo(boolean refreshLocatedBlocks) throws IOException {
    final DfsClientConf conf = dfsClient.getConf();
    synchronized(infoLock) {
      lastBlockBeingWrittenLength =
          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
      int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          waitFor(conf.getRetryIntervalForGetLastBlockLength());
          // Re-fetch the block locations and the last block's length
          lastBlockBeingWrittenLength =
              fetchLocatedBlocksAndGetLastBlockLength(true);
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (lastBlockBeingWrittenLength == -1
          && retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }
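
The retry logic in openInfo follows a common bounded-retry pattern. Below is a minimal standalone sketch of that pattern, assuming a hypothetical LongSupplier that stands in for fetchLocatedBlocksAndGetLastBlockLength and returns -1 while the length is unknown. The class and method names are invented for the illustration.

```java
import java.io.IOException;
import java.util.function.LongSupplier;

// Sketch only: bounded retry while a fetch keeps reporting "unknown" (-1).
final class LastBlockLengthRetry {
    /**
     * Calls fetch once, then up to `retries` more times while it returns -1,
     * sleeping sleepMillis between attempts. Throws if the length is still unknown.
     */
    static long fetchWithRetry(LongSupplier fetch, int retries, long sleepMillis)
            throws IOException, InterruptedException {
        long length = fetch.getAsLong();
        while (length == -1 && retries > 0) {
            Thread.sleep(sleepMillis);   // give datanodes time to report their blocks
            length = fetch.getAsLong();  // hypothetical stand-in for the real re-fetch
            retries--;
        }
        if (length == -1) {
            throw new IOException("Could not obtain the last block locations.");
        }
        return length;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated fetch: unknown on the first two calls, then a length of 1024.
        long len = fetchWithRetry(() -> calls[0]++ < 2 ? -1L : 1024L, 3, 10L);
        System.out.println("length=" + len + " after " + calls[0] + " calls");
    }
}
```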

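4. Client-side read

Calling DFSInputStream.read() goes through DFSInputStream.readWithStrategy(). Inside it, blockSeekTo locates the block that contains file offset pos and picks the nearest DataNode holding it, and readBuffer reads the block data from that DataNode: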

protected synchronized int readWithStrategy(ReaderStrategy strategy)
      throws IOException {
    dfsClient.checkOpen();
    if (closed.get()) {
      throw new IOException("Stream closed");
    }

    int len = strategy.getTargetLength();
    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
    failures = 0;
    if (pos < getFileLength()) {
      int retries = 2;
      while (retries > 0) {
        try {
          // currentNode can be left as null if previous read had a checksum
          // error on the same block. See HDFS-3067
          if (pos > blockEnd || currentNode == null) {
            // Locate the block that contains offset pos and choose the nearest datanode
            currentNode = blockSeekTo(pos);
          }
          // (omitted)
          // Read block data from the datanode
          int result = readBuffer(strategy, realLen, corruptedBlocks);

          if (result >= 0) {
            pos += result;
          } else {
            // got a EOS from reader though we expect more data on it.
            throw new IOException("Unexpected EOS from the reader");
          }
          // (omitted)
    return -1;
  }
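
The excerpt passes a realLen to readBuffer, but the line that computes it is omitted. A plausible reading, stated here as an assumption and not as the actual HDFS source, is that a single read is clamped so it never crosses the end of the current block. The sketch below illustrates that clamp; ReadClamp is a hypothetical name.

```java
// Sketch only: clamps one read request to the end of the current block.
// Assumes blockEnd is the inclusive file offset of the block's last byte.
final class ReadClamp {
    /**
     * Bytes to request in one read: no more than the caller asked for (len)
     * and no further than the last byte of the current block.
     */
    static int realLen(long pos, long blockEnd, int len) {
        if (pos > blockEnd) {
            throw new IllegalArgumentException("pos is past the current block");
        }
        return (int) Math.min(len, blockEnd - pos + 1L);
    }

    public static void main(String[] args) {
        // Block covers offsets 0..127; the caller asks for 64 bytes at offset 100.
        System.out.println(realLen(100, 127, 64));
        // The same request at offset 0 fits entirely inside the block.
        System.out.println(realLen(0, 127, 64));
    }
}
```

Under this assumption, a request that spans two blocks is served by two or more read calls, and the next call triggers blockSeekTo because pos has moved past blockEnd.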

blockSeekTo finds the highest-priority DataNode for the block and builds the blockReader:

private synchronized DatanodeInfo blockSeekTo(long target)
      throws IOException {
      // Find the block that contains byte offset target
      LocatedBlock targetBlock = getBlockAt(target);

   
      // Choose the highest-priority datanode for the block
      DNAddrPair retval = chooseDataNode(targetBlock, null);
      chosenNode = retval.info;
      InetSocketAddress targetAddr = retval.addr;
      StorageType storageType = retval.storageType;
      // Latest block if refreshed by chooseDatanode()
      targetBlock = retval.block;

      // Build the BlockReader for this block
        blockReader = getBlockReader(targetBlock, offsetIntoBlock,
            targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
            storageType, chosenNode);
       
  }
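
Because target is a byte offset, not a block index, the client has to map it to a block and to an offset inside that block (the offsetIntoBlock used above). The sketch below shows one way to do that mapping with a binary search over block start offsets. It is a simplified model with hypothetical names, not the LocatedBlocks implementation.

```java
import java.util.Arrays;

// Sketch only: maps a file offset to (block index, offset inside that block).
// startOffsets must be sorted ascending; sizes[i] is the length of block i.
final class BlockLookup {
    /** Index of the block whose byte range contains offset, or -1 if none does. */
    static int findBlock(long[] startOffsets, long[] sizes, long offset) {
        int idx = Arrays.binarySearch(startOffsets, offset);
        if (idx < 0) {
            idx = -idx - 2; // insertion point minus one: the block starting before offset
        }
        if (idx < 0 || offset >= startOffsets[idx] + sizes[idx]) {
            return -1;
        }
        return idx;
    }

    /** Offset of the target byte relative to the start of its block. */
    static long offsetIntoBlock(long[] startOffsets, int blockIndex, long offset) {
        return offset - startOffsets[blockIndex];
    }

    public static void main(String[] args) {
        long[] starts = {0, 128, 256}; // toy layout: two full blocks and a short last block
        long[] sizes = {128, 128, 50};
        int idx = findBlock(starts, sizes, 300);
        System.out.println("block " + idx + ", offsetIntoBlock "
            + offsetIntoBlock(starts, idx, 300));
    }
}
```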

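getBlockReader builds the blockReader. If short-circuit local reads are enabled and the client runs on the same host as the DataNode, it builds a BlockReaderLocal that reads the block from local disk; otherwise it falls back to a BlockReaderRemote that reads the block from the remote DataNode: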

if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
        if (clientContext.getUseLegacyBlockReaderLocal()) {
          reader = getLegacyBlockReaderLocal();
          if (reader != null) {
            LOG.trace("{}: returning new legacy block reader local.", this);
            return reader;
          }
        } else {
          reader = getBlockReaderLocal();
          if (reader != null) {
            LOG.trace("{}: returning new block reader local.", this);
            return reader;
          }
        }
      }
      if (scConf.isDomainSocketDataTraffic()) {
        reader = getRemoteBlockReaderFromDomain();
        if (reader != null) {
          LOG.trace("{}: returning new remote block reader using UNIX domain "
              + "socket on {}", this, pathInfo.getPath());
          return reader;
        }
      }

Creating the remote reader calls Sender.readBlock, which sends a READ_BLOCK request to the DataNode:

public static BlockReader newBlockReader(String file,
      ExtendedBlock block,
      Token<BlockTokenIdentifier> blockToken,
      long startOffset, long len,
      boolean verifyChecksum,
      String clientName,
      Peer peer, DatanodeID datanodeID,
      PeerCache peerCache,
      CachingStrategy cachingStrategy,
      int networkDistance) throws IOException {
    // in and out will be closed when sock is closed (by the caller)
    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        peer.getOutputStream()));
    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
        verifyChecksum, cachingStrategy);

public void readBlock(final ExtendedBlock blk,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException {

    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
        .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName,
            blockToken))
        .setOffset(blockOffset)
        .setLen(length)
        .setSendChecksums(sendChecksum)
        .setCachingStrategy(getCachingStrategy(cachingStrategy))
        .build();
    // Send the READ_BLOCK request
    send(out, Op.READ_BLOCK, proto);
  }

This article takes the remote read path through BlockReaderRemote as the example.

A call to DFSInputStream.read() eventually reaches BlockReaderRemote.read(), which calls readNextPacket to read a packet:

public synchronized int read(ByteBuffer buf) throws IOException {
    if (curDataSlice == null ||
        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
      // Read the next packet
      readNextPacket();
    }
    // (omitted)

    return nRead;
  }

readNextPacket reads a packet and verifies it against its checksums:

private void readNextPacket() throws IOException {
    //Read packet headers.
    // Read the packet header and the packet data
    packetReceiver.receiveNextPacket(in);

    PacketHeader curHeader = packetReceiver.getHeader();
    curDataSlice = packetReceiver.getDataSlice();
    assert curDataSlice.capacity() == curHeader.getDataLen();

    LOG.trace("DFSClient readNextPacket got header {}", curHeader);

    // Sanity check the lengths
   
    if (!curHeader.sanityCheck(lastSeqNo)) {
      throw new IOException("BlockReader: error in packet header " +
          curHeader);
    }

    if (curHeader.getDataLen() > 0) {
      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
      int checksumsLen = chunks * checksumSize;

      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
          "checksum slice capacity=" +
              packetReceiver.getChecksumSlice().capacity() +
              " checksumsLen=" + checksumsLen;

      lastSeqNo = curHeader.getSeqno();
      if (verifyChecksum && curDataSlice.remaining() > 0) {
        // N.B.: the checksum error offset reported here is actually
        // relative to the start of the block, not the start of the file.
        // This is slightly misleading, but preserves the behavior from
        // the older BlockReader.
        // Verify the received data against its checksums
        checksum.verifyChunkedSums(curDataSlice,
            packetReceiver.getChecksumSlice(),
            filename, curHeader.getOffsetInBlock());
      }
      bytesNeededToFinish -= curHeader.getDataLen();
    }
  }
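
The chunk arithmetic and the per-chunk verification can be tried out in isolation. The sketch below uses java.util.zip.CRC32 as a stand-in for the checksum type negotiated by HDFS, and a hypothetical class name. It illustrates the idea of verifyChunkedSums and is not the Hadoop implementation.

```java
import java.util.zip.CRC32;

// Sketch only: one checksum per fixed-size chunk, verified chunk by chunk.
final class ChunkedChecksumSketch {
    /** Same formula as in readNextPacket: chunks needed to cover dataLen bytes. */
    static int chunkCount(int dataLen, int bytesPerChecksum) {
        return dataLen == 0 ? 0 : 1 + (dataLen - 1) / bytesPerChecksum;
    }

    /** Computes one CRC32 value per chunk; the last chunk may be shorter. */
    static long[] computeSums(byte[] data, int bytesPerChecksum) {
        long[] sums = new long[chunkCount(data.length, bytesPerChecksum)];
        for (int i = 0; i < sums.length; i++) {
            int off = i * bytesPerChecksum;
            int len = Math.min(bytesPerChecksum, data.length - off);
            CRC32 crc = new CRC32();
            crc.update(data, off, len);
            sums[i] = crc.getValue();
        }
        return sums;
    }

    /** Index of the first chunk whose checksum does not match, or -1 if all match. */
    static int firstCorruptChunk(byte[] data, long[] expected, int bytesPerChecksum) {
        long[] actual = computeSums(data, bytesPerChecksum);
        if (actual.length != expected.length) {
            throw new IllegalArgumentException("checksum count does not match data length");
        }
        for (int i = 0; i < actual.length; i++) {
            if (actual[i] != expected[i]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] data = new byte[1300];
        for (int i = 0; i < data.length; i++) data[i] = (byte) (i % 251);
        long[] sums = computeSums(data, 512);   // sender side
        data[600] ^= 0x01;                      // simulate corruption in transit
        System.out.println("corrupt chunk: " + firstCorruptChunk(data, sums, 512));
    }
}
```

Checksumming per chunk means a mismatch can be pinned to a small region of the packet instead of invalidating the whole block.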

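5. DataNode handling of the READ_BLOCK request

Based on the analysis of the DataNode threading model in https://blog.51cto.com/u_15327484/8095971, the DataNode handles a read request with the following threading model:

[Figure: DataNode threading model for read requests (Untitled 1.png)]

On the DataNode, DataXceiver.readBlock handles the client's READ_BLOCK request. Its main job is to construct a BlockSender and call sendBlock to send the data: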

public void readBlock(final ExtendedBlock block,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException {
    previousOpClientName = clientName;
    long read = 0;
    updateCurrentThreadName("Sending block " + block);
    OutputStream baseStream = getOutputStream();
    DataOutputStream out = getBufferedOutputStream();
    checkAccess(out, true, block, blockToken, Op.READ_BLOCK,
        BlockTokenIdentifier.AccessMode.READ);

    // send the block
    BlockSender blockSender = null;
    // (omitted)
        blockSender = new BlockSender(block, blockOffset, length,
            true, false, sendChecksum, datanode, clientTraceFmt,
            cachingStrategy);
     // (omitted)

      read = blockSender.sendBlock(out, baseStream, null); // send data
     // (omitted)
  }

sendBlock in turn calls doSendBlock to send the data:

private long doSendBlock(DataOutputStream out, OutputStream baseStream,
        DataTransferThrottler throttler) throws IOException {
      // (omitted)
      ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
      // (omitted)
      while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
        manageOsCache();
        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
            transferTo, throttler);
        offset += len;
        totalRead += len + (numberOfChunks(len) * checksumSize);
        seqno++;
      }
      // (omitted)
    return totalRead;
  }
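
The accounting in this loop can be modelled without any I/O. The sketch below assumes that each packet carries at most maxChunksPerPacket whole chunks, which is how the excerpt reads but is not shown in full. All names and the numbers in main are hypothetical.

```java
// Sketch only: models the offset/totalRead bookkeeping of the send loop.
final class PacketAccounting {
    /** Number of checksum chunks needed to cover dataLen bytes. */
    static long numberOfChunks(long dataLen, int chunkSize) {
        return (dataLen + chunkSize - 1) / chunkSize;
    }

    /** Returns {packetCount, totalBytesIncludingChecksums} for the range [offset, endOffset). */
    static long[] simulate(long offset, long endOffset, int chunkSize,
                           int checksumSize, int maxChunksPerPacket) {
        long packets = 0;
        long totalRead = 0;
        while (endOffset > offset) {
            // Assumed packet sizing: as much data as fits in maxChunksPerPacket chunks.
            long len = Math.min(endOffset - offset, (long) chunkSize * maxChunksPerPacket);
            offset += len;
            totalRead += len + numberOfChunks(len, chunkSize) * checksumSize;
            packets++;
        }
        return new long[] {packets, totalRead};
    }

    public static void main(String[] args) {
        // 1300 data bytes, 512-byte chunks, 4-byte checksums, 2 chunks per packet.
        long[] r = simulate(0, 1300, 512, 4, 2);
        System.out.println("packets=" + r[0] + " totalRead=" + r[1]);
    }
}
```

With these numbers the loop sends two packets (1024 and 276 data bytes) and counts 1312 bytes in total, which is the 1300 data bytes plus three 4-byte checksums.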

One detail in sendPacket is worth noting: it honors the dfs.datanode.transferTo.allowed setting, and when transferTo is true the packet data is sent with zero copy:

if (transferTo) {
        SocketOutputStream sockOut = (SocketOutputStream)out;
        // First write header and checksums
        sockOut.write(buf, headerOff, dataOff - headerOff);

        // no need to flush since we know out is not a buffered stream
        FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
        LongWritable waitTime = new LongWritable();
        LongWritable transferTime = new LongWritable();
        fileIoProvider.transferToSocketFully(
            ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
            dataLen, waitTime, transferTime);
        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
        blockInPosition += dataLen;
      } else {
        // normal transfer
        out.write(buf, headerOff, dataOff + dataLen - headerOff);
      }

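Zero copy means the data does not pass through the application: data already in the kernel is moved straight into the socket buffer:

[Figure: zero-copy data transfer (Untitled 2.png)]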
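
In Java this mechanism is exposed through FileChannel.transferTo. The sketch below is a minimal standalone illustration of the API, not the DataNode's transferToSocketFully. It writes to an in-memory channel so that it runs anywhere, which means the JVM falls back to an ordinary copy here; the kernel-level shortcut only applies to targets such as socket channels. The class name and the temp file are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch only: sends a byte range of a file to a channel with FileChannel.transferTo.
final class TransferToSketch {
    /** Transfers count bytes starting at position; returns the number actually sent. */
    static long transferFully(FileChannel src, long position, long count,
                              WritableByteChannel dst) throws IOException {
        long done = 0;
        while (done < count) {
            // transferTo may move fewer bytes than requested, so loop until done.
            long n = src.transferTo(position + done, count - done, dst);
            if (n <= 0) {
                break; // end of file reached or the target accepted nothing
            }
            done += n;
        }
        return done;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("block", ".bin"); // stand-in for a block file
        Files.write(tmp, "header|payload".getBytes(StandardCharsets.US_ASCII));
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            // Skip the 7-byte "header|" prefix and send only the payload.
            long sent = transferFully(ch, 7, 7, Channels.newChannel(sink));
            System.out.println(sent + " bytes: " + sink.toString("US-ASCII"));
        } finally {
            Files.delete(tmp);
        }
    }
}
```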

When dfs.datanode.transferTo.allowed is disabled, the data is staged through the application-level buf, which is less efficient:

if (!transferTo) { // normal transfer
      try {
        ris.readDataFully(buf, dataOff, dataLen);
      } catch (IOException ioe) {
        if (ioe.getMessage().startsWith(EIO_ERROR)) {
          throw new DiskFileCorruptException("A disk IO error occurred", ioe);
        }
        throw ioe;
      }

      if (verifyChecksum) {
        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
      }
    }

This is the traditional data transfer path:

[Figure: traditional data transfer through an application buffer (Untitled 3.png)]

From: https://blog.51cto.com/u_15327484/8097899