首页 > 其他分享 >HDFS Balancer存储水位稳定性原理与实践

HDFS Balancer存储水位稳定性原理与实践

时间:2023-11-06 23:33:09浏览次数:49  
标签:HDFS 存储 Balancer 节点 source datanode 迁移 final block

1. 背景

在HDFS分布式系统中,经常会上线新的datanode以环境集群容量不足的问题。但是往往旧datanode水位较高,甚至爆满无法写入,新datanode非常空闲,导致旧机器无法写入数据,集群的流量集中到新datanode中,造成新datanode网络延迟。

为了解决上述问题,可以通过Balancer工具定时讲高水位datanode的block迁移到低水位的datanode中。

balancer有两种场景:

  1. datanode间block迁移。
  2. 单个datanode中磁盘间block迁移。

2. HDFS DataNode Balancer

DataNode Balancer将高水位datanode的block移动到空闲datanode中:

Untitled.png

主要步骤如下:

  1. Balancer客户端先从Name Node中获取所有的Data Node的磁盘使用情况。
  2. Balance客户端通过磁盘存储情况计算datanode block迁移策略。
  3. 从NameNode 获取要移动source node 的block 列表,迁移block,删除旧block。
  4. 进行下一次迭代,直到迭代次数完成或者达到平衡。

2.1 DataNode Balancer命令解析

hdfs --config /hadoop-client/conf balancer
-threshold  10                    \\集群平衡的条件,datanode间磁盘使用率相差阈值,区间选择:0~100
-policy datanode                  \\默认为datanode,datanode级别的平衡策略
-exclude  -f  /tmp/ip1.txt        \\默认为空,指定该部分ip不参与balance, -f:指定输入为文件
-include  -f  /tmp/ip2.txt        \\默认为空,只允许该部分ip参与balance,-f:指定输入为文件
 -idleiterations  5               \\迭代次数,默认为 5

一般会在上午定时开启balancer,在晚上关闭balancer,避免凌晨离线计算高峰期影响集群稳定性。

2.2 DataNode Balancer执行基本流程

Balancer进程启动时,会执行Balancer.run方法。它依次完成以下重要操作:

  1. 通过获取dfs.heartbeat.interval和dfs.namenode.replication.interval的配置,计算每次balance的迭代时间间隔,默认为2min。
  2. 按照客户端指定的迭代次数循环balance。
  3. 每次迭代时,如果包含多个集群,对每个集群依次进行balance。
/**
   * Balance all namenodes.
   * For each iteration,
   * for each namenode,
   * execute a {@link Balancer} to work through all datanodes once.  
   */
  static int run(Collection<URI> namenodes, final Parameters p,
      Configuration conf) throws IOException, InterruptedException {
      // 配置dfs.heartbeat.interval和dfs.namenode.replication.interval,可以控制每次balance 的间隔,默认 2mins
    final long sleeptime =
        conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
        conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
            DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
    
    List<NameNodeConnector> connectors = Collections.emptyList();
    try {
      connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 
            Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration);

      boolean done = false;
      //按照客户端指定的迭代次数循环balance
      for(int iteration = 0; !done; iteration++) {
        done = true;
        Collections.shuffle(connectors);
        //如果包含多个集群,对每个集群依次进行balance
        for(NameNodeConnector nnc : connectors) {
          final Balancer b = new Balancer(nnc, p, conf);
          // 运行一次balance 操作
          final Result r = b.runOneIteration();
          //省略
        if (!done) {
          Thread.sleep(sleeptime);
        }
      }
    } 
  }

Balancer.runOneIteration开始进行执行balance流程:

  1. 先计算每个节点的使用情况,作为判断迁移的依据。
  2. 根据使用情况规划datanode间迁移策略。
  3. 执行迁移操作。
/** Run an iteration for all datanodes. */
  Result runOneIteration() {
    try {
      final List<DatanodeStorageReport> reports = dispatcher.init();
      // 初始化 datanodes 计算需要迁移的容量和节点
      final long bytesLeftToMove = init(reports);

       /* Decide all the nodes that will participate in the block move and
       * the number of bytes that need to be moved from one node to another
       * in this iteration. Maximum bytes to be moved per node is
       * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
       */
      //计算确定具体哪个满节点迁移至哪个空闲节点,每个节点最大迁移10GB
      final long bytesBeingMoved = chooseStorageGroups();

      /* For each pair of <source, target>, start a thread that repeatedly 
       * decide a block to be moved and its proxy source, 
       * then initiates the move until all bytes are moved or no more block
       * available to move.
       * Exit no byte has been moved for 5 consecutive iterations.
       */
      // 进行调度各个节点的balance操作
      if (!dispatcher.dispatchAndCheckContinue()) {
        return newResult(ExitStatus.NO_MOVE_PROGRESS, bytesLeftToMove, bytesBeingMoved);
      }
      //省略
  }

2.3 DataNode使用情况分类

Balancer在执行init方法时,会从namenode中获取每个DataNode使用情况,对于Balancer执行参数判断每个DataNode的使用类型。它基于以下两个公式作为判断依据:

  1. datanode使用率偏离量=datanode使用率-平均使用率。
  2. datanode阈值偏离量=datanode使用率偏离量的绝对值-阈值偏离量。

例如,阈值偏离量设置为10%,如果datanode使用率-平均使用率超过10%,就说明这个datanode使用量超过了使用阈值,应该对上面的block进行balance。

HDFS讲DataNode的使用情况分为以下四类:

  1. over-utilized:过度使用,datanode阈值偏离量>0。
  2. above-average:高于平均线使用。datanode使用率偏离量>0但是datanode阈值偏离量<0。
  3. below-average:低于平均线使用。datanode使用率偏离量<0但是datanode阈值偏离量<0。
  4. under-utilized:未充分使用。datanode使用率偏离量<0但是datanode阈值偏离量>0。

需要将前两种情况的datanode的block迁移到后两种datanode上:

final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
for(DatanodeStorageReport r : reports) {
      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
      final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
      for(StorageType t : StorageType.getMovableTypes()) {
        final Double utilization = policy.getUtilization(r, t);
        if (utilization == null) { // datanode does not have such storage type 
          continue;
        }
        
        final double average = policy.getAvgUtilization(t);
        if (utilization >= average && !isSource) {
          LOG.info(dn + "[" + t + "] has utilization=" + utilization
              + " >= average=" + average
              + " but it is not specified as a source; skipping it.");
          continue;
        }

        final double utilizationDiff = utilization - average;
        final long capacity = getCapacity(r, t);
        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
        final long maxSize2Move = computeMaxSize2Move(capacity,
            getRemaining(r, t), utilizationDiff, maxSizeToMove);

        final StorageGroup g;
        if (utilizationDiff > 0) {
          final Source s = dn.addSource(t, maxSize2Move, dispatcher);
          //过度使用
          if (thresholdDiff <= 0) { // within threshold
            aboveAvgUtilized.add(s);
          //高于平均线使用
          } else {
            overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
            overUtilized.add(s);
          }
          g = s;
        } else {
          g = dn.addTarget(t, maxSize2Move);
          //低于平均线使用
          if (thresholdDiff <= 0) { // within threshold
            belowAvgUtilized.add(g);
          //未充分使用
          } else {
            underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
            underUtilized.add(g);
          }
        }
        dispatcher.getStorageGroupMap().put(g);
      }
    }

2.4 迁移目的节点选择

  1. 同一个组上的datanode优先进行迁移。
  2. 如果不满足,就选择在同一机架上的datanode进行迁移。
  3. 如果不满足,就随便在一台datanode上迁移。

注意:同一组是指两个datanode上一层是同一节点,这个节点可能是交换机或者路由器。由NetworkTopology定义:

The class represents a cluster of computer with a tree hierarchical
 * network topology.
 * For example, a cluster may be consists of many data centers filled 
 * with racks of computers.
 * In a network topology, leaves represent data nodes (computers) and inner
 * nodes represent switches/routers that manage traffic in/out of data centers
 * or racks.

方法如下:

private long chooseStorageGroups() {
    // First, match nodes on the same node group if cluster is node group aware
    if (dispatcher.getCluster().isNodeGroupAware()) {
      chooseStorageGroups(Matcher.SAME_NODE_GROUP);
    }
    
    // Then, match nodes on the same rack
    chooseStorageGroups(Matcher.SAME_RACK);
    // At last, match all remaining nodes
    chooseStorageGroups(Matcher.ANY_OTHER);
    
    return dispatcher.bytesToMove();
  }

在这些datanode间,需要按照容量进行迁移判断:

  1. overUtilized优先迁移到underUtilized。
  2. 剩下的节点中,overUtilized优先迁移到belowAvgUtilized。
  3. 剩下的节点中,underUtilized优先迁移到aboveAvgUtilized。
private void chooseStorageGroups(final Matcher matcher) {
    /* first step: match each overUtilized datanode (source) to
     * one or more underUtilized datanodes (targets).
     */
    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
    chooseStorageGroups(overUtilized, underUtilized, matcher);
    
    /* match each remaining overutilized datanode (source) to 
     * below average utilized datanodes (targets).
     * Note only overutilized datanodes that haven't had that max bytes to move
     * satisfied in step 1 are selected
     */
    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);

    /* match each remaining underutilized datanode (target) to 
     * above average utilized datanodes (source).
     * Note only underutilized datanodes that have not had that max bytes to
     * move satisfied in step 1 are selected.
     */
    LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
  }

2.5 balance线程池执行流程

Dispatcher.dispatchBlockMoves通过线程池分发balance作业:

for (int j = 0; j < futures.length; j++) {
      final Source s = i.next();
      final long delay = dSec * 1000;
      futures[j] = dispatchExecutor.submit(new Runnable() {
        @Override
        public void run() {
          s.dispatchBlocks(delay);
        }
      });

dispatchBlocks方法会先确定source 节点 哪个block可以进行迁移,执行 block 迁移,最后再更新datanode最新的block信息:

// 确定source 节点 哪个block可以进行迁移
final PendingMove p = chooseNextMove();
// 执行 block 迁移
executePendingMove(p);
//更新datanode最新的block信息
final long received = getBlockList();

在检查节点是否可以迁移时:

  1. source节点和target节点是否是相同, 若不相同,迁移。
  2. source节点和target节点的存储类型是否相同,比如RAM_DISK,DISK,SSD,ARCHIVE,若相同,迁移。
  3. 判断是否该节点有block 的副本, 若有副本,则不迁移。
  4. 迁移后,若会导致block 的Rack 数减少,则不迁移。
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
      StorageType targetStorageType, DBlock block) {
      // source节点和target节点是否是相同, 若不相同,迁移
    if (source.equals(target)) {
      return false;
    }
    //source节点和target节点的存储类型是否相同,比如RAM_DISK,DISK,SSD,ARCHIVE,  若相同,迁移
    if (target.storageType != targetStorageType) {
      return false;
    }
    // check if the block is moved or not
    if (movedBlocks.contains(block.getBlock())) {
      return false;
    }

    final DatanodeInfo targetDatanode = target.getDatanodeInfo();
    if (source.getDatanodeInfo().equals(targetDatanode)) {
      // the block is moved inside same DN
      return true;
    }
     // 判断是否该节点有block 的副本, 若有副本,则不迁移
    // check if block has replica in target node
    for (StorageGroup blockLocation : block.getLocations()) {
      if (blockLocation.getDatanodeInfo().equals(targetDatanode)) {
        return false;
      }
    }

    if (cluster.isNodeGroupAware()
        && isOnSameNodeGroupWithReplicas(source, target, block)) {
      return false;
    }
     // 迁移后,若会导致block 的Rack 数减少,则不迁移
    if (reduceNumOfRacks(source, target, block)) {
      return false;
    }
    return true;
  }

最后,在迁移时,发送replaceBlock请求:

private void sendRequest(DataOutputStream out, ExtendedBlock eb,
        Token<BlockTokenIdentifier> accessToken) throws IOException {
      new Sender(out).replaceBlock(eb, target.storageType, accessToken,
          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode,
          null);
    }

通过DataTransferProtocol定义可以看到,它负责讲block进行迁移,并通知namenode删除旧block:

* Receive a block from a source datanode
* and then notifies the namenode
* to remove the copy from the original datanode.
* Note that the source datanode and the original datanode can be different.
* It is used for balancing purpose.

3. Disk Balancer

DiskBalacer只是在单个dn内,它无需复杂的策略选择。执行流程:

#生成plan迁移计划文件
hdfs diskbalancer  -plan dn的ip地址 -bandwidth 100 -thresholdPercentage 10 -maxerror 10  -out /system/diskbalancer/dn14
#开始迁移
hdfs diskbalancer -execute /system/diskbalancer/dn14/dnip.plan.json
#查询进度
hdfs diskbalancer -query dnip -v
#取消迁移
hdfs diskbalancer -cancel /system/diskbalancer/dn14/dnip.plan.json

4. Mover

https://blog.51cto.com/u_15327484/8193991文章中,有讲到通过setStoragePolicy参数设置文件存储策略。在设置好文件的策略后,需要执行mover工具迁移到目标策略对应的磁盘中。

mover执行过程如下。先获取文件的存储策略:

final BlockStoragePolicy policy = blockStoragePolicies[policyId];
      if (policy == null) {
        LOG.warn("Failed to get the storage policy of file " + fullPath);
        return;
      }
      List<StorageType> types = policy.chooseStorageTypes(
          status.getReplication());

根据策略找到block要迁移的dn,使用Dispatcher进行迁移,后续流程和DataNode Balancer一致,不重复:

boolean chooseTarget(DBlock db, Source source,
        List<StorageType> targetTypes, Matcher matcher) {
      final NetworkTopology cluster = dispatcher.getCluster(); 
      for (StorageType t : targetTypes) {
        final List<StorageGroup> targets = storages.getTargetStorages(t);
        Collections.shuffle(targets);
        for (StorageGroup target : targets) {
          if (matcher.match(cluster, source.getDatanodeInfo(),
              target.getDatanodeInfo())) {
            final PendingMove pm = source.addPendingMove(db, target);
            if (pm != null) {
              dispatcher.executePendingMove(pm);
              return true;
            }
          }
        }
      }
      return false;
    }
  }

标签:HDFS,存储,Balancer,节点,source,datanode,迁移,final,block
From: https://blog.51cto.com/u_15327484/8219259

相关文章

  • HF Hub 现已加入存储区域功能
    我们在企业版Hub服务方案中推出了存储区域(StorageRegions)功能。通过此功能,用户能够自主决定其组织的模型和数据集的存储地点,这带来两大显著优势,接下来的内容会进行简要介绍:法规和数据合规,此外还能增强数字主权性能提升(下载和上传速度更快,减少延迟)目前,我们支持以下几......
  • 【躬行】-深度缓冲和模板缓冲是怎么存储的?
    概述最近在工作中需要实现一个功能,用到了模板测试。但奇怪的是,模板测试竟然不起作用!在解决问题的过程中,发现了一些有趣的知识点。通过本文,可以了解在unity中,深度缓冲和模板缓冲到底是怎么存储的。测试环境的搭建Unity版本:2021.3.16f1URP版本:12.1.8RenderDoc:1.29需要注意的是......
  • 视频集中存储/云存储EasyCVR启动后查询端口是否被占用出错,该如何解决?
    安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力,也具备接入AI智能分析的能......
  • 视频集中存储/云存储EasyCVR启动后查询端口是否被占用出错,该如何解决?
    安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力,也具备接入AI智能分析的......
  • 视频直播场景下对象存储的应用
    本文分享自天翼云开发者社区《视频直播场景下对象存储的应用》,作者:王****宇视频直播是当前比较火的互联网应用场景,越来越多的人通过直播进行娱乐和营销带货。国家规定,直播带货类需提供不低于3年的存档回看能力,而其他直播内容也需要提供不低于60天的视频保存能力。具体要求可参看:......
  • Apache Paimon 实时数据湖 Streaming Lakehouse 的存储底座
    摘要:本文整理自阿里云开源大数据表存储团队负责人,阿里巴巴高级技术专家李劲松(之信),在StreamingLakehouseMeetup的分享。内容主要分为四个部分:流计算邂逅数据湖PaimonCDC实时入湖Paimon不止CDC入湖总结与生态一、流计算邂逅数据湖流计算1.0实时预处理流计算1.0架构截止......
  • 金额存储不能用float
    一、简介    金额存储不能使用float类型。    publicclassFloatTest{publicstaticvoidmain(String[]args){floatf1=6.6f;floatf2=1.3f;System.out.println(f1+f2);}}   以上结果为:7.8999996,直接......
  • TDengine 3.2.0.0 重磅发布!S3 存储 + IP 白名单正式上线
    自3.0版本发布以来,在研发人员和社区用户的不断努力下,TDengine做了大量更新,产品稳定性和易用性也在不断提升。近日,TDengine3.2.0.0成功发布,本文将向大家简单介绍一下该版本涉及到的重大更新。开源的时序数据库功能更新(所有版本同步更新):1、查询性能优化:优化partitionby普......
  • 存储云服务中弹性文件服务(SFS)的一些总结
    1.概念简单地说,即按需扩展的高性能文件存储,并且可共享里面的所有数据,可把它看作是一个大的文件夹。采用的是FTP/SFTP协议,且要访问该文件夹的时候只需在本地进行访问即可,即减少了访问时长。2.地位可为ECS,BMS,CCE等提供服务,也可被共享其中的数据。3.优势弹性扩展操作简单,低......
  • Hadoop整合AWS S3和Google gcs对象存储实践
    1.背景https://blog.51cto.com/u_15327484/8193991介绍了海外Hadoop集群一般将冷数据放入到AWSS3或者存放到Googlegcs对象存储中。这些对象存储都提供了各自的客户端进行访问,例如awss3的客户端命令就是awss3;gcs的客户端命令是gsutil。这些命令一般需要直接登陆到授权机器中执......