引入
上一篇我们看完了NameNode的启动源码,对于NameNode我们已经很熟悉了,今天我们接着来看看它的“得力干将”——DataNode。
首先,自然还是从元数据管理篇提到的DataNode类(org.apache.hadoop.hdfs.server.datanode.DataNode)开始。不过在深入启动源码前,我们先看看它的源码注释:
DataNode is a class (and program) that stores a set of blocks for a DFS deployment. A single deployment can have one or many DataNodes. Each DataNode communicates regularly with a single NameNode. It also communicates with client code and other DataNodes from time to time.
DataNodes store a series of named blocks. The DataNode allows client code to read these blocks, or to write new block data. The DataNode may also, in response to instructions from its NameNode, delete blocks or copy blocks to/from other DataNodes.
The DataNode maintains just one critical table:
block-> stream of bytes (of BLOCK_SIZE or less)
This info is stored on a local disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards.
DataNodes spend their lives in an endless loop of asking the NameNode for something to do. A NameNode cannot connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode.
DataNodes maintain an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.
翻译:
DataNode(数据节点)是一个类(也是一个程序),它为HDFS部署存储Block块。一次集群里可以有一个或多个 DataNode。每个 DataNode 会定期与 NameNode 进行通信。它还会不时地与客户端代码和其他 DataNode 进行通信。
DataNode 存储一系列有索引的Block块。DataNode允许客户端代码读取这些Block块,或者写入新的Block块。DataNode还可能根据NameNode的指令,删除Block块或者在其他DataNode之间复制Block块。
数据节点只维护一张关键的表:
Block块 -> 字节流(大小为Block块大小或更小)
这些数据都存储在本地磁盘上。DataNode在启动时以及之后每隔一段时间向NameNode报告该表的内容。
DataNode会一直循环向NameNode询问要做的事情。(通过心跳)NameNode不能直接连接到DataNode;NameNode只是通过,由DataNode调用的函数,所返回的值,来向DataNode传达指令。
DataNode维护着一个开放的服务器socket,以便客户端代码或其他DataNode能够读取/写入数据。此服务器的主机/端口会报告给NameNode,然后NameNode将该信息发送给可能感兴趣的客户端或其他DataNode。
我们来总结一下:
- 一个集群里面可以有很多个DataNode,这些DataNode就是用来存储数据的。
- DataNode启动了以后会周期性的跟NameNode进行通信(心跳,块汇报)
- NameNode不能直接操作DataNode,而是通过心跳返回值指令的方式去操作的DataNode。
- DataNode启动了以后开放了一个socket的服务(RPC),等待别人去调用他。
通过前面篇章的学习,再看源码注释的内容,会发现正好验证了我们所掌握的知识,下面我们开始进入今天的主题。
启动过程
有了上一篇文章的经验,我们的第一目标,自然是找DataNode类的main方法啦。
对应代码如下:
public static void main(String args[]) {
if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
System.exit(0);
}
secureMain(args, null);
}
继续往里走,secureMain实现代码如下:
public static void secureMain(String args[], SecureResources resources) {
... ...
// 初始化DataNode
DataNode datanode = createDataNode(args, null, resources);
if (datanode != null) {
// 阻塞起来,也就是JPS能看到datanode的原因啦
datanode.join();
}
... ...
}
可以看到createDataNode(args, null, resources)创建返回了DataNode对象,我们继续看看它的源码:
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
// 实例化DataNode
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
//启动DataNode后台线程
dn.runDatanodeDaemon();
}
return dn;
}
在上面代码中,instantiateDataNode创建返回了DataNode对象,其实在创建DataNode的构造中,还会初始化DataXceiver服务、HttpServer服务、DataNode PRC 服务及向NameNode注册并进行心跳汇报,然后再通过 dn.runDatanodeDaemon() 方法启动DataXceiver服务。
DataXceiver服务用于接收客户端写数据和通信。
下面我们就来看看 instantiateDataNode(args, conf, resources) 实现代码:
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
... ...
// 根据配置获取DataNode 数据存储位置
Collection<StorageLocation> dataLocations = getStorageLocations(conf);
UserGroupInformation.setConfiguration(conf);
... ...
//创建返回DataNode对象
return makeInstance(dataLocations, conf, resources);
}
makeInstance方法会携带DataNode 数据存储位置,创建DataNode对象。
makeInstance源码如下:
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
DataNodeDiskChecker dataNodeDiskChecker =
new DataNodeDiskChecker(permission);
// 检查数据目录可用
List<StorageLocation> locations =
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
DefaultMetricsSystem.initialize("DataNode");
assert locations.size() > 0 : "number of data directories should be > 0";
// 至少配置的一个数据目录,可用就返回创建DataNode
return new DataNode(conf, locations, resources);
}
进入DataNode构造可以看到startDataNode方法:
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
... ...
// 启动datanode
startDataNode(conf, dataDirs, resources);
... ...
//这里使用了构建者设计模式
datanodeNetworkCounts =
CacheBuilder.newBuilder()
.maximumSize(dncCacheMaxSize)
.build(new CacheLoader<String, Map<String, Long>>() {
@Override
public Map<String, Long> load(String key) throws Exception {
final Map<String, Long> ret = new HashMap<String, Long>();
ret.put("networkErrors", 0L);
return ret;
}
});
}
通过上面源码,可以看到在DataNode 对象的构造中,执行了startDataNode方法,通过它去初始化各种服务,并向NameNode注册信息。
在StartDataNode方法中主要可以分为如下4个过程:(强迫症犯了,就想和NameNode的启动都划分成4个,见谅
标签:HDFS,...,源码,DataNode,conf,new,NameNode From: https://blog.csdn.net/qq_41478243/article/details/145183859