1. 背景
在Hadoop Yarn中,App、AppAttempt、Container、Node都有自己的生命周期,因此Yarn实现了一套状态机进行管理。通过状态机的管理后,用户可以直观看到App、AppAttempt、Container、Node的状态,其状态切换也更规范。但是状态机也导致Yarn的代码可能性很差,无法很好调试。
在HDFS中就不需要维护状态机,对于HDFS的操作,只有成功和失败。因此,在代码分析上,更容易对代码进行阅读。
通过https://blog.51cto.com/u_15327484/7995505文章了解到HDFS是GFS之上进行了一些简化,本文在此基础上,分析客户端向DataNode写数据的详细过程,文章最后会总结客户端发送数据的流程。
同时,本文通过代码分析,会解释以下难点:
- Datanode响应客户端的数据粒度。是按Block响应?还是按packet响应?还是按chunk响应?
- 客户端如何切分block?block在哪里切分成packet?packet哪里切分成chunk?
- 一个packet64KB,实际每个chunk写入packet的大小为516Byte,它们不是倍数关系,是不是一个packet没办法装满了?(来自强迫症患者的疑问)
2. HDFS客户端写操作流程概览
- 客户端创建DistributedFileSystem,请求Namenode创建文件。
- 向FSDataOutputStream写入要发送给DataNode的文件数据。
- FSDataOutputStream中,DataStream线程先向Namenode申请创建Block,并根据返回的block位置信息,与datanode建立连接。
- DataStream向DataNode发送packet。如果Block写入数据量达到128MB,就跳转到步骤3。
3. 写流程源码分析
3.1 写操作典型业务代码
进行写操作时,一般业务代码包含以下几步骤:
- 设置配置文件。
- 根据配置文件创建HDFS客户端对象。
- 基于HDFS客户端对象执行write写操作。
如下所示:
public void create() throws URISyntaxException, IOException, InterruptedException {
// 配置文件
Configuration conf = new Configuration();
// 获取文件系统
FileSystem fs = FileSystem.get(new URI("hdfs://{namenode IP}:9000"), conf, 访问的用户);
// 创建文件并写入数据
FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));
out.write("Hello, HDFS".getBytes());
out.flush();
// 关闭流
fs.close();
}
3.2 根据URL创建HDFS客户端
FileSystem是一个抽象类,它提供get方法获取文件系统客户端实现类对象。之所以没有直接new一个HDFS客户端,是因为Hadoop提供多种文件系统的访问,例如alluxio、S3a。FileSystem读取url和Configuration即可创建对应的文件系统客户端。Hadoop自带的FileSystem实现类有如下:
本文文档只用于研究HDFS,因此这里FileSystem.get方法访问的是HDFS文件系统的客户端:
public abstract class FileSystem extends Configured
implements Closeable, DelegationTokenIssuer {
public static FileSystem get(final URI uri, final Configuration conf,
final String user) throws IOException, InterruptedException {
String ticketCachePath =
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
}
}
由于FileSystem是一个抽象类,返回的对象必须是其子类。get方法负责创建FileSystem子类,如下,调用getFileSystemClass方法,它会获取core-site.xml中fs.hdfs.impl的配置,默认为org.apache.hadoop.hdfs.DistributedFileSystem,后续通过反射创建DistributedFileSystem对象:
private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
Tracer tracer = FsTracer.get(conf);
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
scope.addKVAnnotation("scheme", uri.getScheme());
//查询core-site.xml中fs.hdfs.impl的配置,默认是org.apache.hadoop.hdfs.DistributedFileSystem
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
//通过反射创建FileSystem子类DistributedFileSystem对象
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
//DistributedFileSystem初始化操作
fs.initialize(uri, conf);
return fs;
}
}
查看DistributedFileSystem类,可以发现,它包含最核心的成员是DFSClient,DFSClient负责执行真正的请求namenode和datanode读写操作:
public class DistributedFileSystem extends FileSystem
implements KeyProviderTokenIssuer {
//工作目录,一般为/user/{USER},类似linux中,hadoop用户的工作目录是/home/hadoop。默认情况下,使用hdfs dfs -ls . 即可访问/user/{User}家目录
private Path workingDir;
//集群地址,一般为hdfs://{集群名}
private URI uri;
//用来连接namenode和datanode最核心的客户端
DFSClient dfs;
//检验数据是否正确写入或读取
private boolean verifyChecksum = true;
//客户端审计信息
private DFSOpsCountStatistics storageStatistics;
}
3.3 客户端请求namenode创建文件
创建好HDFS客户端对象后,业务方此时会调用.create方法新建一个文件以为后续写数据作准备。
在执行create方法创建文件时,有几个非常重要的参数可以留意。
- permission:默认情况下,为默认权限与默认umask异或运算。即666-022=644
- overwrite:是否覆盖原有有文件,默认为true,即进行覆盖。
- bufferSize:写入数据时使用的缓冲区大小。默认情况下读取io.file.buffer.size配置,单位byte,默认4KB。
- replication:默认情况下,读取dfs.replication获取文件副本数,默认3。获取默认值的方法由DistributedFileSystem#getDefaultReplication提供。
- blockSize:默认情况下:读取file.blocksize获取每个block大小,默认为64MB,可以设置为128MB。获取默认值的方法由DistributedFileSystem#getDefaultBlockSize提供。
- favoredNodes:数据块首选节点,默认为空。即不指定首选节点。
public HdfsDataOutputStream create(final Path f,
final FsPermission permission, final boolean overwrite,
final int bufferSize, final short replication, final long blockSize,
final Progressable progress, final InetSocketAddress[] favoredNodes)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
@Override
public HdfsDataOutputStream doCall(final Path p) throws IOException {
//创建文件
final DFSOutputStream out = dfs.create(getPathName(f), permission,
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE),
true, replication, blockSize, progress, bufferSize, null,
favoredNodes);
//准备开始写数据
return dfs.createWrappedOutputStream(out, statistics);
}
//省略
}.resolve(this, absF);
}
create方法最终调用DFSOutputStream#newStreamForCreate方法,创建文件并启动写入流程:
- 通过执行dfsClient.namenode.create()方法向namenode服务端发起请求创建文件。
- 创建DFSOutputStream对象,该对象内部有一个DataStreamer线程。向DFSOutputStream写入的数据会发送给DataStreamer线程。
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress,
DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
throws IOException {
try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate", src)) {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
//创建文件
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
break;
//省略
}
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
}
//启动发送数据的线程
out.start();
return out;
}
}
首先,查看创建文件的请求流程。dfsClient.namenode成员类型实际上是客户端与Namenode的协议ClientProtocol,通过Haddoop RPC可以知道,要想访问namenode,必须创建ClientProtocol的代理对象。
在DFSClient的对象创建过程汇中,可以看到,代理对象的创建是在NameNodeProxiesClient#createProxyWithLossyRetryHandler方法中,它指定创建ClientProtocol.class的代理对象:
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataEncryptionKeyFactory, KeyProviderTokenIssuer {
final ClientProtocol namenode;
ProxyAndInfo<ClientProtocol> proxyInfo = null;
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
nameNodeUri, ClientProtocol.class, numResponseToDrop,
nnFallbackToSimpleAuth);
this.namenode = proxyInfo.getProxy();
}
进入NameNodeProxiesClient,它先创建ProxyProvider,再由ProxyProvider创建代理对象:
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(config, nameNodeUri, xface, true,
fallbackToSimpleAuth);
T proxy = (T) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[]{xface}, dummyHandler);
ProxyProvider的实现类由hdfs-site.xml中的dfs.client.failover.proxy.provider.{集群名}配置指定,默认是org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。
在ConfiguredFailoverProxyProvider中,创建了active namenode和standby namenode两个proxy对象。当通过RetryProxy创建的代理对象执行rpc失败后,会通过performFailover方法切换到另外一个proxy对象,这涉及到Java Proxy机制,不详细研究。
protected final List<NNProxyInfo<T>> proxies;
ConfiguredFailoverProxyProvider最终调用NameNodeProxiesClient#createProxyWithAlignmentContext创建proxy对象,如下,最终的基于ClientNamenodeProtocolPB创建proxy,后续就是protobuf生成代码,不详细研究:
public static ClientProtocol createProxyWithAlignmentContext(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
SafeModeException.class.getName());
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth, alignmentContext).getProxy();
if (withRetries) { // create the proxy with retries
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
ClientProtocol translatorProxy =
new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol) RetryProxy.create(
ClientProtocol.class,
new DefaultFailoverProxyProvider<>(ClientProtocol.class,
translatorProxy),
methodNameToPolicyMap,
defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}
基于协议,namenode会返回文件的元数据给client。比如:路径、权限、拥有者、组、大小、修改时间,block位置信息。详细元数据如下所示:
private long length = 0L;
private boolean isdir = false;
private int replication = 0;
private long blocksize = 0L;
private long mtime = 0L;
private long atime = 0L;
private FsPermission permission = null;
private EnumSet<Flags> flags = EnumSet.noneOf(Flags.class);
private String owner = null;
private String group = null;
private byte[] symlink = null;
private byte[] path = EMPTY_NAME;
private long fileId = -1L;
private int childrenNum = 0;
private FileEncryptionInfo feInfo = null;
private byte storagePolicy =
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
private ErasureCodingPolicy ecPolicy = null;
private LocatedBlocks locations = null;
在客户端创建完Namenode代理对象后,会启动DFSOutputStream中DataStreamer线程。客户端会将数据通过该线程向DataNode中进行写入。将在下一节中详细介绍DataStreamer线程:
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
//启动发送数据的线程
out.start();