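1. Background
In an HDFS cluster, new datanodes are frequently brought online to relieve a shortage of cluster capacity. The older datanodes, however, tend to sit at a high utilization level, sometimes so full that they can no longer accept writes, while the new datanodes are nearly empty. Writes to the old machines fail, cluster traffic concentrates on the new datanodes, and network latency on those new datanodes rises.
To address this, the Balancer tool can be run on a schedule to move blocks from highly utilized datanodes to lightly utilized ones.
Balancing comes in two forms:
- Moving blocks between datanodes.
- Moving blocks between the disks of a single datanode.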
2. HDFS DataNode Balancer
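The DataNode Balancer moves blocks from highly utilized datanodes to idle datanodes.
The main steps are:
- The Balancer client first fetches the disk usage of every DataNode from the NameNode.
- The Balancer client uses that usage information to compute a block migration plan between datanodes.
- It fetches from the NameNode the list of blocks to move off each source node, migrates the blocks, and deletes the old copies.
- It then moves on to the next iteration, until the iterations are exhausted or the cluster is balanced.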
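2.1 DataNode Balancer command options
hdfs --config /hadoop-client/conf balancer
-threshold 10 \\ balancing target: how far, in percentage points, a datanode's disk utilization may deviate from the cluster average; valid range 0 to 100
-policy datanode \\ balancing policy; the default, datanode, balances at the datanode level
-exclude -f /tmp/ip1.txt \\ empty by default; the listed IPs do not take part in balancing; -f reads the list from a file
-include -f /tmp/ip2.txt \\ empty by default; only the listed IPs take part in balancing; -f reads the list from a file
-idleiterations 5 \\ maximum number of consecutive iterations in which no bytes are moved before the balancer exits; default 5
Typically the balancer is started on a schedule in the morning and stopped in the evening, so that it does not threaten cluster stability during the early-morning peak of offline batch jobs.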
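2.2 Basic execution flow of the DataNode Balancer
When the Balancer process starts, it calls Balancer.run, which performs the following key operations in order:
- It reads dfs.heartbeat.interval and dfs.namenode.replication.interval and computes the pause between balancing iterations as heartbeat interval * 2 + replication interval. With both settings at their default of 3 seconds, the pause is 9 seconds.
- It loops over balancing iterations until an iteration ends with the done flag still set. The idle-iteration limit chosen by the client (p.maxIdleIteration) is handed to the namenode connectors.
- Within each iteration, if there are several namenodes, it balances each of them in turn.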
/**
 * Balance all namenodes.
 * For each iteration,
 * for each namenode,
 * execute a {@link Balancer} to work through all datanodes once.
 */
static int run(Collection<URI> namenodes, final Parameters p,
    Configuration conf) throws IOException, InterruptedException {
  // dfs.heartbeat.interval and dfs.namenode.replication.interval (both in seconds)
  // set the pause between balancing iterations: heartbeat * 2000 ms + replication * 1000 ms
  // (9 s when both are at their 3 s defaults)
  final long sleeptime =
      conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
          DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
      conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
  List<NameNodeConnector> connectors = Collections.emptyList();
  try {
    connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
        Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration);
    boolean done = false;
    // Keep iterating until an iteration finishes with done still true
    for(int iteration = 0; !done; iteration++) {
      done = true;
      Collections.shuffle(connectors);
      // With several namenodes, balance each of them in turn
      for(NameNodeConnector nnc : connectors) {
        final Balancer b = new Balancer(nnc, p, conf);
        // Run one balancing pass
        final Result r = b.runOneIteration();
        // ... (omitted, up to and including the end of this inner loop)
      if (!done) {
        Thread.sleep(sleeptime);
      }
    }
  }
}
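The interval arithmetic in Balancer.run can be checked in isolation. The following is a minimal sketch, not Hadoop code: the class BalancerSleepTime and the method sleepMillis are hypothetical names, and the 3-second inputs assume both settings are at their usual stock defaults (verify against your Hadoop version).

```java
// Sketch only: mirrors the arithmetic in Balancer.run, not the Hadoop API itself.
class BalancerSleepTime {
    // Hypothetical helper: both arguments are in seconds, the result is in milliseconds.
    static long sleepMillis(long heartbeatIntervalSec, long replicationIntervalSec) {
        return heartbeatIntervalSec * 2000 + replicationIntervalSec * 1000;
    }

    public static void main(String[] args) {
        // Assumption: both settings are left at 3 seconds.
        long ms = sleepMillis(3, 3);
        System.out.println("pause between iterations: " + ms + " ms"); // 9000 ms
    }
}
```

Raising either setting lengthens the pause, and the heartbeat interval counts twice as much as the replication interval.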
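Balancer.runOneIteration then carries out the balancing itself:
- It first computes each node's usage, which is the basis for deciding what to move.
- It plans the moves between datanodes from that usage.
- It performs the moves.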
/** Run an iteration for all datanodes. */
Result runOneIteration() {
  try {
    final List<DatanodeStorageReport> reports = dispatcher.init();
    // Initialize the datanodes and compute how many bytes still have to be moved
    final long bytesLeftToMove = init(reports);
    /* Decide all the nodes that will participate in the block move and
     * the number of bytes that need to be moved from one node to another
     * in this iteration. Maximum bytes to be moved per node is
     * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
     */
    // Decide which full node moves data to which idle node; each node moves at most 10 GB
    final long bytesBeingMoved = chooseStorageGroups();
    /* For each pair of <source, target>, start a thread that repeatedly
     * decide a block to be moved and its proxy source,
     * then initiates the move until all bytes are moved or no more block
     * available to move.
     * Exit no byte has been moved for 5 consecutive iterations.
     */
    // Dispatch the balancing work for each node
    if (!dispatcher.dispatchAndCheckContinue()) {
      return newResult(ExitStatus.NO_MOVE_PROGRESS, bytesLeftToMove, bytesBeingMoved);
    }
    // ... (omitted)
}
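2.3 Classifying DataNode usage
When the Balancer runs its init method, it obtains each DataNode's usage from the namenode and, using the Balancer's run parameters, assigns every DataNode a usage category. The decision rests on two formulas:
- utilization deviation of a datanode = datanode utilization - average utilization.
- threshold deviation of a datanode = |utilization deviation| - threshold.
For example, with the threshold set to 10%, a datanode whose utilization exceeds the average by more than 10 percentage points is over the allowed limit, and blocks should be balanced away from it.
HDFS divides DataNode usage into four categories:
- over-utilized: utilization deviation > 0 and threshold deviation > 0.
- above-average: utilization deviation > 0 and threshold deviation <= 0.
- below-average: utilization deviation <= 0 and threshold deviation <= 0.
- under-utilized: utilization deviation <= 0 and threshold deviation > 0.
Blocks on datanodes in the first two categories need to be moved to datanodes in the last two: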
final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
for(DatanodeStorageReport r : reports) {
  final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
  final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
  for(StorageType t : StorageType.getMovableTypes()) {
    final Double utilization = policy.getUtilization(r, t);
    if (utilization == null) { // datanode does not have such storage type
      continue;
    }
    final double average = policy.getAvgUtilization(t);
    if (utilization >= average && !isSource) {
      LOG.info(dn + "[" + t + "] has utilization=" + utilization
          + " >= average=" + average
          + " but it is not specified as a source; skipping it.");
      continue;
    }
    final double utilizationDiff = utilization - average;
    final long capacity = getCapacity(r, t);
    final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
    final long maxSize2Move = computeMaxSize2Move(capacity,
        getRemaining(r, t), utilizationDiff, maxSizeToMove);
    final StorageGroup g;
    if (utilizationDiff > 0) {
      final Source s = dn.addSource(t, maxSize2Move, dispatcher);
      // above-average
      if (thresholdDiff <= 0) { // within threshold
        aboveAvgUtilized.add(s);
      // over-utilized
      } else {
        overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
        overUtilized.add(s);
      }
      g = s;
    } else {
      g = dn.addTarget(t, maxSize2Move);
      // below-average
      if (thresholdDiff <= 0) { // within threshold
        belowAvgUtilized.add(g);
      // under-utilized
      } else {
        underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
        underUtilized.add(g);
      }
    }
    dispatcher.getStorageGroupMap().put(g);
  }
}
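The classification rule can be tried out on plain numbers. This is a minimal sketch under stated assumptions: UtilizationClassifier, Category and classify are hypothetical names, the inputs are made-up percentages, and only the two comparisons from the excerpt above are reproduced (no capacity or byte accounting).

```java
// Sketch only: reproduces the two comparisons from the Balancer excerpt on plain doubles.
class UtilizationClassifier {
    enum Category { OVER_UTILIZED, ABOVE_AVERAGE, BELOW_AVERAGE, UNDER_UTILIZED }

    // All three arguments are percentages, e.g. 75.0 for 75%.
    static Category classify(double utilization, double average, double threshold) {
        double utilizationDiff = utilization - average;
        double thresholdDiff = Math.abs(utilizationDiff) - threshold;
        if (utilizationDiff > 0) {
            // Source side: within the threshold means above-average, otherwise over-utilized.
            return thresholdDiff <= 0 ? Category.ABOVE_AVERAGE : Category.OVER_UTILIZED;
        }
        // Target side: within the threshold means below-average, otherwise under-utilized.
        return thresholdDiff <= 0 ? Category.BELOW_AVERAGE : Category.UNDER_UTILIZED;
    }

    public static void main(String[] args) {
        double average = 50.0;   // made-up cluster average
        double threshold = 10.0; // as in "-threshold 10"
        for (double u : new double[] {75.0, 55.0, 45.0, 20.0}) {
            System.out.println(u + "% -> " + classify(u, average, threshold));
        }
    }
}
```

With an average of 50% and a threshold of 10, the four sample nodes fall into the four categories in order: over-utilized, above-average, below-average, under-utilized. A node exactly at the threshold boundary (60%) still counts as above-average because the comparison is `<= 0`.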
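2.4 Choosing the target node
- Datanodes in the same node group are preferred as targets (only when the cluster is node-group aware).
- If none qualifies, a datanode on the same rack is chosen.
- If that also fails, any other datanode is chosen.
Note: two datanodes are in the same group when they share the same parent node in the topology; that parent may be a switch or a router. The topology is defined by NetworkTopology: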
/**
 * The class represents a cluster of computer with a tree hierarchical
 * network topology.
 * For example, a cluster may be consists of many data centers filled
 * with racks of computers.
 * In a network topology, leaves represent data nodes (computers) and inner
 * nodes represent switches/routers that manage traffic in/out of data centers
 * or racks.
 */
The method is as follows:
private long chooseStorageGroups() {
  // First, match nodes on the same node group if cluster is node group aware
  if (dispatcher.getCluster().isNodeGroupAware()) {
    chooseStorageGroups(Matcher.SAME_NODE_GROUP);
  }
  // Then, match nodes on the same rack
  chooseStorageGroups(Matcher.SAME_RACK);
  // At last, match all remaining nodes
  chooseStorageGroups(Matcher.ANY_OTHER);
  return dispatcher.bytesToMove();
}
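Within each of these scopes, sources and targets are paired according to their usage category:
- First, overUtilized nodes are matched to underUtilized nodes.
- Next, the remaining overUtilized nodes are matched to belowAvgUtilized nodes.
- Finally, the remaining underUtilized nodes are matched with aboveAvgUtilized nodes, which act as the sources.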
private void chooseStorageGroups(final Matcher matcher) {
  /* first step: match each overUtilized datanode (source) to
   * one or more underUtilized datanodes (targets).
   */
  LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
  chooseStorageGroups(overUtilized, underUtilized, matcher);
  /* match each remaining overutilized datanode (source) to
   * below average utilized datanodes (targets).
   * Note only overutilized datanodes that haven't had that max bytes to move
   * satisfied in step 1 are selected
   */
  LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
  chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
  /* match each remaining underutilized datanode (target) to
   * above average utilized datanodes (source).
   * Note only underutilized datanodes that have not had that max bytes to
   * move satisfied in step 1 are selected.
   */
  LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
  chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}
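Taken together, the two chooseStorageGroups methods produce a fixed sequence of matching passes: an outer loop over topology scopes and an inner loop over category pairings. The sketch below only enumerates that sequence as strings. MatchingOrder and passes are hypothetical names, and no real matching against a topology is performed.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: lists the order of matching passes, without doing any real matching.
class MatchingOrder {
    static List<String> passes(boolean nodeGroupAware) {
        // Outer loop: topology scopes, narrowest first.
        List<String> scopes = new ArrayList<>();
        if (nodeGroupAware) {
            scopes.add("SAME_NODE_GROUP");
        }
        scopes.add("SAME_RACK");
        scopes.add("ANY_OTHER");

        // Inner loop: category pairings, written as source -> target.
        String[] pairings = {
            "overUtilized -> underUtilized",
            "overUtilized -> belowAvgUtilized",
            "aboveAvgUtilized -> underUtilized"
        };

        List<String> order = new ArrayList<>();
        for (String scope : scopes) {
            for (String pairing : pairings) {
                order.add(scope + ": " + pairing);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        for (String pass : passes(true)) {
            System.out.println(pass);
        }
    }
}
```

A node-group-aware cluster therefore goes through nine passes and any other cluster through six, with the most heavily loaded sources always getting the first pick of targets inside each scope.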
2.5 How the balance thread pool runs
Dispatcher.dispatchBlockMoves hands out balancing jobs through a thread pool:
for (int j = 0; j < futures.length; j++) {
  final Source s = i.next();
  final long delay = dSec * 1000;
  futures[j] = dispatchExecutor.submit(new Runnable() {
    @Override
    public void run() {
      s.dispatchBlocks(delay);
    }
  });
  // ... (omitted)
}
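dispatchBlocks first decides which block on the source node can be moved, performs the block move, and finally refreshes the source's block list:
// Decide which block on the source node can be moved
final PendingMove p = chooseNextMove();
// Perform the block move
executePendingMove(p);
// Refresh the block list for this source datanode
final long received = getBlockList();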
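A candidate move is checked as follows:
- Are the source and target the same? The move is considered only if they differ.
- Do the source and target use the same storage type (for example RAM_DISK, DISK, SSD, ARCHIVE)? In the code this is a check that the target's storage type equals the expected targetStorageType; the move is considered only if it does.
- Does the target node already hold a replica of the block? If so, the block is not moved.
- Would the move reduce the number of racks holding the block? If so, the block is not moved.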
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
    StorageType targetStorageType, DBlock block) {
  // Source and target must differ; moving a block onto itself is pointless
  if (source.equals(target)) {
    return false;
  }
  // The target's storage type (RAM_DISK, DISK, SSD, ARCHIVE, ...) must match the expected one
  if (target.storageType != targetStorageType) {
    return false;
  }
  // check if the block is moved or not
  if (movedBlocks.contains(block.getBlock())) {
    return false;
  }
  final DatanodeInfo targetDatanode = target.getDatanodeInfo();
  if (source.getDatanodeInfo().equals(targetDatanode)) {
    // the block is moved inside same DN
    return true;
  }
  // If the target node already holds a replica of this block, do not move it
  // check if block has replica in target node
  for (StorageGroup blockLocation : block.getLocations()) {
    if (blockLocation.getDatanodeInfo().equals(targetDatanode)) {
      return false;
    }
  }
  if (cluster.isNodeGroupAware()
      && isOnSameNodeGroupWithReplicas(source, target, block)) {
    return false;
  }
  // Do not move if that would reduce the number of racks holding the block
  if (reduceNumOfRacks(source, target, block)) {
    return false;
  }
  return true;
}
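The last check, whether a move would reduce the number of racks holding the block, is easy to reason about on a toy model. The sketch below is a simplified illustration and not Hadoop's reduceNumOfRacks: RackCountCheck and reducesRackCount are hypothetical names, and racks are represented as plain strings rather than topology nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Sketch only: a simplified model of the rack check, not Hadoop's implementation.
class RackCountCheck {
    // replicaRacks holds the rack of every current replica, including the one on the source.
    static boolean reducesRackCount(List<String> replicaRacks, String sourceRack, String targetRack) {
        int before = new HashSet<>(replicaRacks).size();
        List<String> after = new ArrayList<>(replicaRacks);
        after.remove(sourceRack); // drop one replica from the source's rack
        after.add(targetRack);    // and place it on the target's rack
        return new HashSet<>(after).size() < before;
    }

    public static void main(String[] args) {
        List<String> racks = Arrays.asList("/rack1", "/rack2", "/rack2");
        // Moving the only /rack1 replica onto /rack2 leaves a single rack: rejected.
        System.out.println(reducesRackCount(racks, "/rack1", "/rack2")); // true
        // Moving one /rack2 replica onto /rack3 spreads the block further: allowed.
        System.out.println(reducesRackCount(racks, "/rack2", "/rack3")); // false
    }
}
```

The point of the check is that balancing must never trade away fault tolerance: a move that evens out disk usage but concentrates all replicas on fewer racks is refused.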
Finally, the move itself is carried out by sending a replaceBlock request:
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
    Token<BlockTokenIdentifier> accessToken) throws IOException {
  new Sender(out).replaceBlock(eb, target.storageType, accessToken,
      source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode,
      null);
}
As the DataTransferProtocol definition shows, replaceBlock moves the block and notifies the namenode to delete the old copy:
* Receive a block from a source datanode
* and then notifies the namenode
* to remove the copy from the original datanode.
* Note that the source datanode and the original datanode can be different.
* It is used for balancing purpose.
3. Disk Balancer
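DiskBalancer works within a single datanode only, so it needs no elaborate placement strategy. The workflow is as follows, where dnip stands for the datanode's IP address:
# Generate the migration plan file
hdfs diskbalancer -plan dnip -bandwidth 100 -thresholdPercentage 10 -maxerror 10 -out /system/diskbalancer/dn14
# Start the migration
hdfs diskbalancer -execute /system/diskbalancer/dn14/dnip.plan.json
# Query progress
hdfs diskbalancer -query dnip -v
# Cancel the migration
hdfs diskbalancer -cancel /system/diskbalancer/dn14/dnip.plan.json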
4. Mover
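The article at https://blog.51cto.com/u_15327484/8193991 describes how to set a file's storage policy with setStoragePolicy. Once the policy is set, the mover tool has to be run to migrate the file's blocks onto disks of the storage type that the policy calls for.
The mover proceeds as follows. It first looks up the file's storage policy: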
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
if (policy == null) {
  LOG.warn("Failed to get the storage policy of file " + fullPath);
  return;
}
List<StorageType> types = policy.chooseStorageTypes(
    status.getReplication());
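Based on the policy, it finds the datanode that the block should move to and performs the move through the Dispatcher. The rest of the flow is the same as for the DataNode Balancer and is not repeated here: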
boolean chooseTarget(DBlock db, Source source,
    List<StorageType> targetTypes, Matcher matcher) {
  final NetworkTopology cluster = dispatcher.getCluster();
  for (StorageType t : targetTypes) {
    final List<StorageGroup> targets = storages.getTargetStorages(t);
    Collections.shuffle(targets);
    for (StorageGroup target : targets) {
      if (matcher.match(cluster, source.getDatanodeInfo(),
          target.getDatanodeInfo())) {
        final PendingMove pm = source.addPendingMove(db, target);
        if (pm != null) {
          dispatcher.executePendingMove(pm);
          return true;
        }
      }
    }
  }
  return false;
}
From: https://blog.51cto.com/u_15327484/8219259