1. 背景
在https://blog.51cto.com/u_15327484/8023493文章中,介绍了HDFS创建文件时,客户端执行的操作。对于NameNode而言,在创建文件的过程中,它会接受客户端以下rpc请求:
- create
- addBlock
- complete
本文将详细介绍这三个RPC在NameNode端的处理流程,同时扩展介绍Namenode相关架构。
2. NameNode文件系统实现及其class
在NameNode服务端中,有三个类最重要:
- NameNodeRpcServer:负责处理rpc请求,根据请求对FSNamesystem进行增删改查。
- FSNamesystem:负责维护NameNode文件系统树,文件系统最核心的类。
- NameNode:负责启动NameNode服务,启动初始化的作用。
NameNodeRpcServer
如下,可以看到,NameNodeRpcServer就包含了FSNamesystem和NameNode成员:
public class NameNodeRpcServer implements NamenodeProtocols {
// Dependencies from other parts of NN.
protected final FSNamesystem namesystem;
protected final NameNode nn;
}
NameNodeRpcServer实现NamenodeProtocols协议,如下所示,该协议包含ClientProtocol和DatanodeProtocol,专用于处理客户端请求和datanode请求:
public interface NamenodeProtocols
extends ClientProtocol,
DatanodeProtocol,
DatanodeLifelineProtocol,
NamenodeProtocol,
RefreshAuthorizationPolicyProtocol,
ReconfigurationProtocol,
RefreshUserMappingsProtocol,
RefreshCallQueueProtocol,
GenericRefreshProtocol,
GetUserMappingsProtocol,
HAServiceProtocol,
TraceAdminProtocol {
}
NameNode
通过NameNode方法定义就可以看出来,它主要负责NameNode服务的生命周期维护:
例如,在NameNodeRpcServer执行rpc请求时,首先会检查namenode启动的状态:
private void checkNNStartup() throws IOException {
if (!this.nn.isStarted()) {
String message = NameNode.composeNotStartedMessage(this.nn.getRole());
throw new RetriableException(message);
}
}
FSNamesystem
FSNamesystem是HDFS最核心的类,它用于维护HDFS文件系统树FSDirectory。如下,FSDirectory是其成员变量:
public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNodeMXBean, ReplicatedBlocksMBean, ECBlockGroupsMBean {
FSDirectory dir;
}
FSDirectory中维护一个INodeDirectory类型的rootDir变量,它是文件系统树的根:
public class FSDirectory implements Closeable {
INodeDirectory rootDir;
}
在INodeDirectory类中,维护一个INode列表类型的children成员,它可以是目录类型INodeDirectory,也可以是文件类型INodeFile:
public class INodeDirectory extends INodeWithAdditionalFields
implements INodeDirectoryAttributes {
private List<INode> children = null;
}
通过FSNamesystem-》FSDirectory-〉INodeDirectory-》List<INode>的依赖关系,组成了HDFS的文件系统树。如下可以看到,INode实现类中,可以是INodeDirectory,也可以是INodeFile:
注意:对于INodeFile,都会保存该文件对应的block信息,以BlockInfo[]数组表示block列表:
public class INodeFile extends INodeWithAdditionalFields
implements INodeFileAttributes, BlockCollection {
private BlockInfo[] blocks;
}
FSNamesystem类中,它转换rpc请求,将其转变为对文件系统树的处理操作。以appendFile
这个rpc请求为例,经过NameNodeRpcServer#appendFile -》FSDirAppendOp#appendFile -〉FSDirAppendOp#prepareFileForAppend。最终就是对FSNamesystem的文件树的block修改状态信息:
static LocatedBlock prepareFileForAppend(final FSNamesystem fsn,
final INodesInPath iip, final String leaseHolder,
final String clientMachine, final boolean newBlock,
final boolean writeToEditLog, final boolean logRetryCache)
throws IOException {
//省略
//申请client对于该文件的租约
fsn.getLeaseManager().addLease(
file.getFileUnderConstructionFeature().getClientName(), file.getId());
LocatedBlock ret = null;
if (!newBlock) {
//修改FSNamesystem中INodeFile的状态为UnderConstruction
FSDirectory fsd = fsn.getFSDirectory();
ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
//省略
return ret;
}
3. RPC处理过程:create
对于RPC处理,会经过以下几个不重要的流程NameNodeRpcServer.create() → FSNamesystem.startFile() → FSNamesystem.startFileInt()。
FSNamesystem.startFileInt()方法中,执行流程如下:
- 通过string类型的路径,在FSDirectory中获取其INode信息。
- FSDirWriteFileOp.startFile创建文件。
- 将op操作记录到editlog中。
private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
String ecPolicyName, boolean logRetryCache) throws IOException {
//省略
INodesInPath iip = null;
boolean skipSync = true; // until we do something that might create edits
HdfsFileStatus stat = null;
BlocksMapUpdateInfo toRemoveBlocks = null;
//省略
// 获取路径中的inodes,INodesInPath中包含了从根目录到当前文件的各级inode信息
iip = FSDirWriteFileOp.resolvePathForStartFile(
dir, pc, src, flag, createParent);
//省略
// 创建文件
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
clientMachine, flag, createParent, replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
//省略
writeUnlock("create");
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
// edit log落盘,实际上就是预写日志
getEditLog().logSync();
// 如果覆盖文件,则需要清理对应block
if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
}
}
}
return stat;
}
FSDirWriteFileOp.startFile()中,如果文件已存在,先删掉block和文件相关信息,在调用addFile方法创建文件:
FSDirectory fsd = fsn.getFSDirectory();
long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,toRemoveINodes, toRemoveUCFiles, now());
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate,
ecPolicyName);
// 设置存储策略
setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
// 预写日志
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
FSDirWriteFileOp.addFile()中,创建文件对应的InodeFile,加入到命名空间中:
// 创建inode
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
blockType);
newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine);
// 将inode加入命名空间中
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
最终将文件创建的结果返回给客户端,客户端因此创建文件输出流FSDataOutputStream。
4. RPC处理过程:addBlock
客户端在初次向datanode发送数据,或者写满一个datanode时,会调用addBlock方法,NameNode会指定下一个写入的block所在的datanode,客户端会将数据写到这些datanode中。
在namenode处理addBlock请求时,进行三步骤:
- 根据指定的存储策略,选择block每个副本所属的datanode。
- 根据datanode与客户端的网络距离进行排序,客户端优先与排序靠前的datanode建立连接。
- 更新FSNamesystem中的block信息。
NameNodeRpcServer.addBlock()方法通过FSNamesystem.getAdditionalBlock方法,创建包含dataNode的block信息返回给客户端:
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
checkNNStartup();
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
return locatedBlock;
}
FSNamesystem.getAdditionalBlock()方法负责选择datanode节点,并将block信息存储到元数据中:
// 选择目标存储节点
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
blockManager, src, excludedNodes, favoredNodes, flags, r);
checkOperation(OperationCategory.WRITE);
writeLock();
LocatedBlock lb;
try {
checkOperation(OperationCategory.WRITE);
// block加入blocksMap,记录DataNode正在传输的block数等操作
lb = FSDirWriteFileOp.storeAllocatedBlock(
this, src, fileId, clientName, previous, targets);
最终执行BlockManager.chooseTarget4NewBlock()方法选择block副本所属datanode,其流程如下:
- 根据dfs.block.replicator.classname配置获取副本选择策略,默认为BlockPlacementPolicyDefault.class。
- BlockPlacementPolicyDefault策略中,如果客户端在datanode上运行,第一个副本选择该datanode。
- 第二个副本其他机架中选择。
- 第三个副本存放在第二副本所在的机架,但不属于同一节点。
- 第四副本及更多副本随机算则datanode节点存储。
首先,BlockManager。chooseTarget4NewBlock方法根据配置获取副本选择策略,并创建BlockPlacementPolicyDefault副本放置策略:
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final Node client,
final Set<Node> excludedNodes,
final long blocksize,
final List<String> favoredNodes,
final byte storagePolicyID,
final BlockType blockType,
final ErasureCodingPolicy ecPolicy,
final EnumSet<AddBlockFlag> flags) throws IOException {
// 优先选择节点
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
// 异构存储策略,用于选择不同类型的存储类型
// 默认为HOT,所有副本都保存到DISK类型的存储介质中
final BlockStoragePolicy storagePolicy =
storagePolicySuite.getPolicy(storagePolicyID);
// 块放置策略,CONTIGUOUS类型块的默认放置策略(为BlockPlacementPolicyDefault)
final BlockPlacementPolicy blockplacement =
placementPolicies.getPolicy(blockType);
// 选择DataNode
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
favoredDatanodeDescriptors, storagePolicy, flags);
//省略
return targets;
}
最终执行BlockPlacementPolicyDefault.chooseTargetInOrder一次选择副本位置:
第一个副本为client本机(如果client为DataNode),第二个副本从其它机架中随机选择, 第三个副本在第二个副本同机架中随机选择,如果副本数量大于3,剩下的副本都随机选择
protected Node chooseTargetInOrder(int numOfReplicas,
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final boolean newBlock,
EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException {
final int numOfResults = results.size();
if (numOfResults == 0) {
/*
* 1.如果本机使DataNode,直接选本机;
* 2.如果不是,则在本机架随机选一个;
* 3.如果随机选的节点不满足条件(stale、负载大于平均负载的两倍(isGoodDatanode()方法)、空间不足等),则在所有节点中随机选择一个
*/
DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
storageTypes, true);
writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
: null;
// 如果只要求一个副本,直接返回
if (--numOfReplicas == 0) {
return writer;
}
}
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
// 第二个节点要在不同的机架上选取
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
if (clusterMap.isOnSameRack(dn0, dn1)) {
// 如果前两个节点在同一机架,第三个节点尝试选择其它机架上的
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else if (newBlock){
// new block 如果前两个节点不在同一机架,且这是个新块,第三个节点选择第二个节点相同机架上的
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else {
// 否则第三个节点选择第一个节点相同机架上的
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
// 如果副本总数大于3,剩下的副本随机选择
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
}
BlockPlacementPolicyDefault.getPipeline()方法计算每个dn与client的距离,由近到远进行排序。距离的计算方法为:对于两个DN,从DN出发,经过一个rack,距离+1,经过root,距离+1。
例如,DN1和DN3,它们经历了DN1→Rack1→Root→Rack2→DN3,总距离是4。即跨机架的网络距离是4。
对于DN1和DN2,它们经历了DN1→Rack1→DN2,总距离是2。
对于客户端和DataNode在同节点的情况,它们没有跨节点,总距离是0。
- 如果writer请求方本身不在一个datanode上,则默认选取第一个datanode作为起始节点。
- 根据贪心算法,通过两层循环,依次找到距离client最近的节点。
int index=0;
// 如果writer请求方本身不在一个datanode上,则默认选取第一个datanode作为起始节点
if (writer == null || !clusterMap.contains(writer)) {
writer = storages[0].getDatanodeDescriptor();
}
// 遍历所有的storeages,计算最近距离目标的storage
for(; index < storages.length; index++) {
// 获取当前index下标所属的Storage为最近距离的目标storage
DatanodeStorageInfo shortestStorage = storages[index];
// 计算最短距离,getDistance 的逻辑 :返回两个节点之间的距离
//假设一个节点到其父节点的距离为1
//两个节点之间的距离是通过将它们的距离相加来计算的
//和他们最近的共同祖先。
int shortestDistance = clusterMap.getDistance(writer, shortestStorage.getDatanodeDescriptor());
int shortestIndex = index;
for(int i = index + 1; i < storages.length; i++) {
// 遍历计算当前的距离
int currentDistance = clusterMap.getDistance(writer,
storages[i].getDatanodeDescriptor());
if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
shortestStorage = storages[i];
shortestIndex = i;
}
}
//找到新的最短距离的storage,并进行下标替换
if (index != shortestIndex) {
storages[shortestIndex] = storages[index];
storages[index] = shortestStorage;
}
writer = shortestStorage.getDatanodeDescriptor();
}
}
return storages;
最后一步,FSDirWriteFileOp.addBlock方法将block放到INodeFile中,变更Block状态为UnderConstruction:
private static BlockInfo addBlock(FSDirectory fsd, String path,
INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets,
BlockType blockType) throws IOException {
fsd.writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
//省略
// check quota limits and updated space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
numLocations, true);
blockInfo = new BlockInfoStriped(block, ecPolicy);
blockInfo.convertToBlockUnderConstruction(
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
} else {
// check quota limits and updated space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
fileINode.getFileReplication(), true);
short numLocations = fileINode.getFileReplication();
blockInfo = new BlockInfoContiguous(block, numLocations);
blockInfo.convertToBlockUnderConstruction(
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
}
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
//将block加入到InodeFile中
fileINode.addBlock(blockInfo);
//省略
}
5. RPC处理过程:complete
所有数据传输完成后,客户端向namenode发送complete请求,结束文件创建流程。要注意,commit和complete区别:
- commit:客户端报告其已经完成该块所有数据的传输,但DataNode还没有增量报告给NameNode其block信息。
- complete:NameNode已经接收到了满足配置文件中要求的最小副本数的DataNode汇报其拥有此块。
处理过程中,FSNamesystem.completeFileInternal记录了整体流程:
- fsn.checkFileProgress检测倒数第二个块是否commit。
- fsn.commitOrCompleteLastBlock用于commit最后一个块。
- 等待dn汇报了文件所有block的最小副本数给namenode。
- fsn.finalizeINodeFileUnderConstruction就持久化inode,移除UnderConstruction信息。删除lease,增加edit log。
private static boolean completeFileInternal(
FSNamesystem fsn, INodesInPath iip,
String holder, Block last, long fileId)
throws IOException {
assert fsn.hasWriteLock();
final String src = iip.getPath();
final INodeFile pendingFile;
INode inode = null;
try {
// 目标文件的inode
inode = iip.getLastINode();
// 检查lease,inode 2 inodefile
pendingFile = fsn.checkLease(iip, holder, fileId);
} catch (LeaseExpiredException lee) {
if (inode != null && inode.isFile() &&
!inode.asFile().isUnderConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
final Block realLastBlock = inode.asFile().getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: " +
"request from " + holder + " to complete inode " + fileId +
"(" + src + ") which is already closed. But, it appears to be " +
"an RPC retry. Returning success");
return true;
}
}
throw lee;
}
// Check the state of the penultimate block. It should be completed
// before attempting to complete the last one.
// 判断文件是否能继续操作(addBlock、complete等)
// 这里是判断倒数第二个块是否已经complete,否则不能尝试complete最后一个块
if (!fsn.checkFileProgress(src, pendingFile, false)) {
return false;
}
// commit the last block and complete it if it has minimum replicas
// commit最后一个块,可以的话(已经有配置文件设置最小副本数的DataNode通知NameNode自己拥有此快)complete它
fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
// 这里第三个入参是true,表示判断文件中所有的块是否都已complete
// 但当numCommittedAllowed不为0时,最后numCommittedAllowed个块可以是COMMIT
if (!fsn.checkFileProgress(src, pendingFile, true)) {
return false;
}
/*
* numCommittedAllowed(配置文件设置)指的是,只有倒数numCommittedAllowed个块状态为COMMIT,
* 再往前的块状态都为COMPLETE时,才可以继续操作(addBlock、complete等)。默认的值是0,即只有
* 上一个块COMPLETE之后,才可以申请下一个块或者完成文件
*/
// 当配置文件中numCommittedAllowed参数不为0时需要将COMMIT但没有COMPLETE的块加入到pendingReconstruction中
fsn.addCommittedBlocksToPending(pendingFile);
/*
* 1、持久化inode(移除UnderConstruction信息)
* 2、删除lease
* 3、edit log
*/
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.CURRENT_STATE_ID, true);
return true;
}
6. 总结
- namenode接收客户端create请求。在FSDirectory中增加文件对应的INodeFile节点。
- namenode接收客户端addBlock请求。默认选择BlockPlacementPolicyDefault副本策略:第一个副本为client本机(如果client为DataNode),第二个副本从其它机架中随机选择, 第三个副本在第二个副本同机架中随机选择,如果副本数量大于3,剩下的副本都随机选择。将这些dn按照与client网络距离远近进行排序。在FSDirectory中将block信息记录到INodeFile节点中。
- namenode接收客户端complete请求。commit最后一个block,等待dn汇报了文件所有block的最小副本数给namenode后,移除Inode的UnderConstruction状态信息。