首页 > 其他分享 >HDFS写流程分析:客户端发送数据

HDFS写流程分析:客户端发送数据

时间:2023-10-25 18:07:37浏览次数:35  
标签:HDFS 创建 chunk 写入 packet 发送数据 block 客户端

1. 背景

在Hadoop Yarn中,App、AppAttempt、Container、Node都有自己的生命周期,因此Yarn实现了一套状态机进行管理。通过状态机的管理后,用户可以直观看到App、AppAttempt、Container、Node的状态,其状态切换也更规范。但是状态机也导致Yarn的代码可能性很差,无法很好调试。

在HDFS中就不需要维护状态机,对于HDFS的操作,只有成功和失败。因此,在代码分析上,更容易对代码进行阅读。

通过https://blog.51cto.com/u_15327484/7995505文章了解到HDFS是GFS之上进行了一些简化,本文在此基础上,分析客户端向DataNode写数据的详细过程,文章最后会总结客户端发送数据的流程。

同时,本文通过代码分析,会解释以下难点:

  1. Datanode响应客户端的数据粒度。是按Block响应?还是按packet响应?还是按chunk响应?
  2. 客户端如何切分block?block在哪里切分成packet?packet哪里切分成chunk?
  3. 一个packet64KB,实际每个chunk写入packet的大小为516Byte,它们不是倍数关系,是不是一个packet没办法装满了?(来自强迫症患者的疑问)

2. HDFS客户端写操作流程概览

  1. 客户端创建DistributedFileSystem,请求Namenode创建文件。
  2. 向FSDataOutputStream写入要发送给DataNode的文件数据。
  3. FSDataOutputStream中,DataStream线程先向Namenode申请创建Block,并根据返回的block位置信息,与datanode建立连接。
  4. DataStream向DataNode发送packet。如果Block写入数据量达到128MB,就跳转到步骤3。

Untitled.png

3. 写流程源码分析

3.1 写操作典型业务代码

进行写操作时,一般业务代码包含以下几步骤:

  1. 设置配置文件。
  2. 根据配置文件创建HDFS客户端对象。
  3. 基于HDFS客户端对象执行write写操作。

如下所示:

public void create() throws URISyntaxException, IOException, InterruptedException {
        // 配置文件
        Configuration conf = new Configuration();
        // 获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://{namenode IP}:9000"), conf, 访问的用户);
        // 创建文件并写入数据
        FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));
        out.write("Hello, HDFS".getBytes());
        out.flush();
        // 关闭流
        fs.close();
    }

3.2 根据URL创建HDFS客户端

FileSystem是一个抽象类,它提供get方法获取文件系统客户端实现类对象。之所以没有直接new一个HDFS客户端,是因为Hadoop提供多种文件系统的访问,例如alluxio、S3a。FileSystem读取url和Configuration即可创建对应的文件系统客户端。Hadoop自带的FileSystem实现类有如下:

Untitled 1.png

本文文档只用于研究HDFS,因此这里FileSystem.get方法访问的是HDFS文件系统的客户端:

public abstract class FileSystem extends Configured
    implements Closeable, DelegationTokenIssuer {

public static FileSystem get(final URI uri, final Configuration conf,
        final String user) throws IOException, InterruptedException {
    String ticketCachePath =
      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
    UserGroupInformation ugi =
        UserGroupInformation.getBestUGI(ticketCachePath, user);
    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws IOException {
        return get(uri, conf);
      }
    });
  }
}

由于FileSystem是一个抽象类,返回的对象必须是其子类。get方法负责创建FileSystem子类,如下,调用getFileSystemClass方法,它会获取core-site.xml中fs.hdfs.impl的配置,默认为org.apache.hadoop.hdfs.DistributedFileSystem,后续通过反射创建DistributedFileSystem对象:

private static FileSystem createFileSystem(URI uri, Configuration conf)
      throws IOException {
    Tracer tracer = FsTracer.get(conf);
    try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
      scope.addKVAnnotation("scheme", uri.getScheme());
      //查询core-site.xml中fs.hdfs.impl的配置,默认是org.apache.hadoop.hdfs.DistributedFileSystem
      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
      //通过反射创建FileSystem子类DistributedFileSystem对象
      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
      //DistributedFileSystem初始化操作
      fs.initialize(uri, conf);
      return fs;
    }
  }

查看DistributedFileSystem类,可以发现,它包含最核心的成员是DFSClient,DFSClient负责执行真正的请求namenode和datanode读写操作:

public class DistributedFileSystem extends FileSystem
    implements KeyProviderTokenIssuer {
  //工作目录,一般为/user/{USER},类似linux中,hadoop用户的工作目录是/home/hadoop。默认情况下,使用hdfs dfs -ls . 即可访问/user/{User}家目录
  private Path workingDir;
  //集群地址,一般为hdfs://{集群名}
  private URI uri;
  //用来连接namenode和datanode最核心的客户端
  DFSClient dfs;
  //检验数据是否正确写入或读取
  private boolean verifyChecksum = true;
  //客户端审计信息
  private DFSOpsCountStatistics storageStatistics;
}

3.3 客户端请求namenode创建文件

创建好HDFS客户端对象后,业务方此时会调用.create方法新建一个文件以为后续写数据作准备。

在执行create方法创建文件时,有几个非常重要的参数可以留意。

  1. permission:默认情况下,为默认权限与默认umask异或运算。即666-022=644
  2. overwrite:是否覆盖原有有文件,默认为true,即进行覆盖。
  3. bufferSize:写入数据时使用的缓冲区大小。默认情况下读取io.file.buffer.size配置,单位byte,默认4KB。
  4. replication:默认情况下,读取dfs.replication获取文件副本数,默认3。获取默认值的方法由DistributedFileSystem#getDefaultReplication提供。
  5. blockSize:默认情况下:读取file.blocksize获取每个block大小,默认为64MB,可以设置为128MB。获取默认值的方法由DistributedFileSystem#getDefaultBlockSize提供。
  6. favoredNodes:数据块首选节点,默认为空。即不指定首选节点。
public HdfsDataOutputStream create(final Path f,
      final FsPermission permission, final boolean overwrite,
      final int bufferSize, final short replication, final long blockSize,
      final Progressable progress, final InetSocketAddress[] favoredNodes)
      throws IOException {
    statistics.incrementWriteOps(1);
    storageStatistics.incrementOpCounter(OpType.CREATE);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
      @Override
      public HdfsDataOutputStream doCall(final Path p) throws IOException {
        //创建文件
        final DFSOutputStream out = dfs.create(getPathName(f), permission,
            overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                : EnumSet.of(CreateFlag.CREATE),
            true, replication, blockSize, progress, bufferSize, null,
            favoredNodes);
        //准备开始写数据
        return dfs.createWrappedOutputStream(out, statistics);
      }
      //省略
    }.resolve(this, absF);
  }

create方法最终调用DFSOutputStream#newStreamForCreate方法,创建文件并启动写入流程:

  1. 通过执行dfsClient.namenode.create()方法向namenode服务端发起请求创建文件。
  2. 创建DFSOutputStream对象,该对象内部有一个DataStreamer线程。向DFSOutputStream写入的数据会发送给DataStreamer线程。
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress,
      DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
      throws IOException {
    try (TraceScope ignored =
             dfsClient.newPathTraceScope("newStreamForCreate", src)) {
      HdfsFileStatus stat = null;

      // Retry the create if we get a RetryStartFileException up to a maximum
      // number of times
      boolean shouldRetry = true;
      int retryCount = CREATE_RETRY_COUNT;
      while (shouldRetry) {
        shouldRetry = false;
        try {
          //创建文件
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
          break;
      //省略
      }
      final DFSOutputStream out;
      if(stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
      } else {
        out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      }
      //启动发送数据的线程
      out.start();
      return out;
    }
  }

首先,查看创建文件的请求流程。dfsClient.namenode成员类型实际上是客户端与Namenode的协议ClientProtocol,通过Haddoop RPC可以知道,要想访问namenode,必须创建ClientProtocol的代理对象。

在DFSClient的对象创建过程汇中,可以看到,代理对象的创建是在NameNodeProxiesClient#createProxyWithLossyRetryHandler方法中,它指定创建ClientProtocol.class的代理对象:

public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    DataEncryptionKeyFactory, KeyProviderTokenIssuer {
final ClientProtocol namenode;
ProxyAndInfo<ClientProtocol> proxyInfo = null;
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
          nameNodeUri, ClientProtocol.class, numResponseToDrop,
          nnFallbackToSimpleAuth);
this.namenode = proxyInfo.getProxy();
}

进入NameNodeProxiesClient,它先创建ProxyProvider,再由ProxyProvider创建代理对象:


AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
        createFailoverProxyProvider(config, nameNodeUri, xface, true,
            fallbackToSimpleAuth);
T proxy = (T) Proxy.newProxyInstance(
          failoverProxyProvider.getInterface().getClassLoader(),
          new Class[]{xface}, dummyHandler);

ProxyProvider的实现类由hdfs-site.xml中的dfs.client.failover.proxy.provider.{集群名}配置指定,默认是org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。

在ConfiguredFailoverProxyProvider中,创建了active namenode和standby namenode两个proxy对象。当通过RetryProxy创建的代理对象执行rpc失败后,会通过performFailover方法切换到另外一个proxy对象,这涉及到Java Proxy机制,不详细研究。

protected final List<NNProxyInfo<T>> proxies;

ConfiguredFailoverProxyProvider最终调用NameNodeProxiesClient#createProxyWithAlignmentContext创建proxy对象,如下,最终的基于ClientNamenodeProtocolPB创建proxy,后续就是protobuf生成代码,不详细研究:

public static ClientProtocol createProxyWithAlignmentContext(
      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
      boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
      AlignmentContext alignmentContext)
      throws IOException {
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
        ProtobufRpcEngine.class);

    final RetryPolicy defaultPolicy =
        RetryUtils.getDefaultRetryPolicy(
            conf,
            HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
            HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
            HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
            HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
            SafeModeException.class.getName());

    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth, alignmentContext).getProxy();

    if (withRetries) { // create the proxy with retries
      Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
      ClientProtocol translatorProxy =
          new ClientNamenodeProtocolTranslatorPB(proxy);
      return (ClientProtocol) RetryProxy.create(
          ClientProtocol.class,
          new DefaultFailoverProxyProvider<>(ClientProtocol.class,
              translatorProxy),
          methodNameToPolicyMap,
          defaultPolicy);
    } else {
      return new ClientNamenodeProtocolTranslatorPB(proxy);
    }
  }

基于协议,namenode会返回文件的元数据给client。比如:路径、权限、拥有者、组、大小、修改时间,block位置信息。详细元数据如下所示:

    private long length                    = 0L;
    private boolean isdir                  = false;
    private int replication                = 0;
    private long blocksize                 = 0L;
    private long mtime                     = 0L;
    private long atime                     = 0L;
    private FsPermission permission        = null;
    private EnumSet<Flags> flags           = EnumSet.noneOf(Flags.class);
    private String owner                   = null;
    private String group                   = null;
    private byte[] symlink                 = null;
    private byte[] path                    = EMPTY_NAME;
    private long fileId                    = -1L;
    private int childrenNum                = 0;
    private FileEncryptionInfo feInfo      = null;
    private byte storagePolicy             =
        HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
    private ErasureCodingPolicy ecPolicy   = null;
    private LocatedBlocks locations        = null;

在客户端创建完Namenode代理对象后,会启动DFSOutputStream中DataStreamer线程。客户端会将数据通过该线程向DataNode中进行写入。将在下一节中详细介绍DataStreamer线程:

     out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      //启动发送数据的线程
      out.start();

3.4 客户端向datanode发送数据

标签:HDFS,创建,chunk,写入,packet,发送数据,block,客户端
From: https://blog.51cto.com/u_15327484/8023493

相关文章

  • 使用命令操作HDFS文件系统
    HDFS文件系统基本信息HDFS作为分布式存储的文件系统,有其对数据的路径表达方式。HDFS同Linux系统一样,均是以/作为根目录的组织形式Linux: /usr/local/hello.txtHDFS: /usr/local/hello.txt命令行#老版本用法hadoopfs[genericoptions]#新版本用法hdfsdfs[generi......
  • 10月23日简易服务器与客户端通信
    目录简易服务器与客户端通信简易服务器与客户端通信此代码可以用于两者互相通信(就是互相发信息)服务器代码#导入一个socket模块来建立一个简单的通信服务器importsocket#socket.AF_INET表示使用IPv4地址族,这是Internet上常用的地址族。socket.SOCK_STREAM表示创建一......
  • 客户端桌面显示信息-----工具Bginfo
    上几天接个朋友的请求,帮忙做个设置,现实每台计算机的信息在桌面,以便管理。我想,应该不难,上网找了一下,还是可以解决的。微软有个工具Bginfo,能显示计算机的系统信息在桌面的。对于教学计算机来说,有些情况是很有必要的。例如显示学生的计算机信息在桌面,以防止学生更改系统设置,或者方便......
  • Eureka注册中心(服务端)和Eureka注册中心(客户端)
    搭建EurekaServer(服务端)1.搭建EurekaServer服务步骤如下:创建项目引入依赖<!--eureka服务端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-serve......
  • 传奇客户端常见补丁功能解析
    NewopUI.Pak基础补丁如血条界面框等很多用到这个补丁ChrSel.pak游戏登陆界面,选择人物界面图Effect.pak沙的门和墙破碎图Hair.pak人物头发图Magic.pak魔法效果图Magic2.pak补充魔法效果图,召唤神兽时符和雷电的图MagIcon.pak技能栏里的魔法缩略图Mon1.pak-Mon**.pak各种传奇......
  • 版本管理客户端工具SourceTree
      [使用]1.设置SSH客户端工具>选项 设置OpenSSH, SSH密钥这一栏自然会去选择当前用户下的.ssh目录下的id_rsa这个私钥: ......
  • 在客户端存储登录令牌 token 步骤
    原理:  是服务器创建的对象(一般存储当前登录用户的编号);加密为定长字符串;发送给客户端;客户端存储起来;等待下一次请求时,提交自己的身份令牌;服务器读取该令牌,解密从而获取其中的用户信息一-类似于银行卡客户端在登录成功后保存令牌:  uni.setStorageSync(userToken'......
  • Linux 实现OpenSSL 服务器端客户端通信
    1.OpenSSL安装详情参考博文:https://blog.csdn.net/qq_39521181/article/details/964576732.SSL在学习openssl编程之前,先了解一下什么是SSL,有助于后续的学习。SSL是一个缩写,代表的是SecureSocketsLayer。它是支持在Internet上进行安全通信的标准,并且将数据密码术集成到了......
  • eas_客户端查询提示过滤条件不完整或比较值过长请检查
    问题如图所示,这里票据号码是标准元数据字段,查看客户端dep,显示字段长度为30,这里实际测试,查询超过30位就会提示这个,这里需要修改2个元数据,复制并修改xxx.entity和xxx.table文件里的对应字段的长度,然后打包成私包,部署到server/lib/metas/sp的目录下,元数据的路径和原来保持一致 ......
  • openGauss学习笔记-104 openGauss 数据库管理-管理数据库安全-客户端接入之SSL证书管
    openGauss学习笔记-104openGauss数据库管理-管理数据库安全-客户端接入之SSL证书管理-证书替换openGauss默认配置了通过openssl生成的安全证书、私钥。并且提供证书替换的接口,方便用户进行证书的替换。104.1操作场景openGauss默认配置了SSL连接所需要的安全的证书、私钥,用户......