首页 > 其他分享 >CapacityScheduler异步调度功能实践

CapacityScheduler异步调度功能实践

时间:2023-10-19 19:08:18浏览次数:29  
标签:node 异步 schedule CapacityScheduler 调度 application 线程

1. 背景

https://blog.51cto.com/u_15327484/7920197文章中,将调度器从FairScheduler迁移到CapacityScheduler。

CapacityScheduler在默认情况下,当接受到NodeManager心跳请求时,会调用nodeUpdate方法开始进行资源调度,这种调度方式称为心跳调度,也称同步调度。

心跳调度实现起来简单,但是有很大的弊端。它及其依赖NodeManager心跳的稳定,一旦NodeManager和ResourceManager网络产生波动,那么ResourceManager的调度也会发生延迟,这种调度方式十分不稳定。

CapacityScheduler提供了异步调度功能,ResourceManager可以配置多个线程,在后台定时地对请求进行资源调度,完全不依赖于心跳。

2. 异步调度概览

对于异步调度,CapacityScheduler会设置若干线程,每个线程中,依次扫描所有节点,寻找合适的队列,选择合适的应用,最后选择应用中的某一资源请求进行资源调度,其层级关系如下:

for node in allNodes:
   Go to parentQueue
      Go to leafQueue
        for application in leafQueue.applications:
           for resource-request in application.resource-requests
              try to schedule on node

3. 异步调度原理

默认情况下,不开启异步调度功能时,scheduleAsynchronously为false,nodeUpdate可以正常执行allocateContainersToNode方法开始调度流程。

一旦开启异步调度,ResourceManager处理好心跳信息后直接退出,不进行调度:

protected void nodeUpdate(RMNode rmNode) {
  long begin = System.nanoTime();
  readLock.lock();
  try {
    setLastNodeUpdateTime(Time.now());
    super.nodeUpdate(rmNode);
  } finally {
    readLock.unlock();
  }

// Try to do scheduling
//如果没有开启异步调度,那么就开始进行调度;否则调度会在异步线程中执行
if (!scheduleAsynchronously) {
    writeLock.lock();
    try {
       // reset allocation and reservation stats before we start doing any
      // work
      updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
          CSAssignment.NULL_ASSIGNMENT);
      allocateContainersToNode(rmNode.getNodeID(), true);
    } finally {
      writeLock.unlock();
    }
  }

相应地,CapacityScheduler增加了死循环的异步线程AsyncScheduleThread队列,线程执行时,如果正在进行调度的数量不超过MaxPendingBacklogs,那么就开始调度:

private List<AsyncScheduleThread> asyncSchedulerThreads;

public void run() {
      int debuggingLogCounter = 0;
      while (!Thread.currentThread().isInterrupted()) {
        try {
          if (!runSchedules.get()) {
            Thread.sleep(100);
          } else {
            // Don't run schedule if we have some pending backlogs already
            //如果正在进行调度的数量超过MaxPendingBacklogs,就停1ms后再尝试
            if (cs.getAsyncSchedulingPendingBacklogs()
                > cs.asyncMaxPendingBacklogs) {
              Thread.sleep(1);
            } else{
              //开始调度
              schedule(cs);
              if(LOG.isDebugEnabled()) {
                // Adding a debug log here to ensure that the thread is alive
                // and running fine.
                if (debuggingLogCounter++ > 10000) {
                  debuggingLogCounter = 0;
                  LOG.debug("AsyncScheduleThread[" + getName() + "] is running!");
                }
              }
            }
          }
        } catch (InterruptedException ie) {
          // keep interrupt signal
          Thread.currentThread().interrupt();
        }
      }
      LOG.info("AsyncScheduleThread[" + getName() + "] exited!");
    }

调度过程中,依次对所有节点进行调度:

for (FiCaSchedulerNode node : nodes) {
      if (current++ >= start) {
        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
          continue;
        }
        cs.allocateContainersToNode(node.getNodeID(), false);
      }
    }

节点选择好后,选择队列:

// Try to assign to most 'under-served' sub-queue
    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
        candidates.getPartition()); iter.hasNext(); ) {
      CSQueue childQueue = iter.next();
      //省略
      CSAssignment childAssignment = childQueue.assignContainers(cluster,
          candidates, childLimits, schedulingMode);
      //省略
    }

选择好队列后,选择应用:

for (Iterator<FiCaSchedulerApp> assignmentIterator =
         orderingPolicy.getAssignmentIterator(sel);
         assignmentIterator.hasNext(); ) {
      FiCaSchedulerApp application = assignmentIterator.next();
      //省略
      // Try to schedule
      assignment = application.assignContainers(clusterResource,
          candidates, currentResourceLimits, schedulingMode, null);
      //省略
    }

选择好应用后,选择资源请求:

// Schedule in priority order
      for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
        ContainerAllocation result = allocate(clusterResource, candidates,
            schedulingMode, resourceLimits, schedulerKey, null);

        AllocationState allocationState = result.getAllocationState();
        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
          continue;
        }
        return getCSAssignmentFromAllocateResult(clusterResource, result,
            null, node);
      }

4. 调度器压测

https://blog.51cto.com/u_15327484/7939194文章中,介绍了Yarn SLS的使用。本次压测基于SLS,相关参数如下:

  1. sls 工具 1000 并发。
  2. YARN: CapacityScheduler。
  3. 运行时长:2000 sec。
  4. 模拟app数量:1618。
  5. 模拟节点数:635。

对于不同数量的异步线程,分别测试其调度性能。从测试结果来看,调度线程数设置为10最合适,对比同步调度性能提升 38%:

平均调度耗时
同步调度 24ms
异步调度(1线程数) 17.2ms
异步调度(5线程数) 16.3ms
异步调度(10线程数) 14.8ms
异步调度(20线程数) 14.7ms
异步调度(100线程数) 16.9ms

基于此,在yarn-site.xml中新增异步调度相关配置:

yarn.scheduler.capacity.schedule-asynchronously.enable=true(是否开启异步调度,默认为false)
yarn.scheduler.capacity.schedule-asynchronously.maximum-threads=1(调度线程数,默认为1)
yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms=5(调度间隔时间,默认为5ms)
yarn.scheduler.capacity.schedule-asynchronously.maximum-pending-backlogs=10000(调度队列最大容量,默认为100)

4. 上线效果review

将异步线程上限到集群A后。变更后,平均container调度耗时减少 4.6%,最大调度吞吐量提升 10.5%,平均container pending数下降 7.6%:

指标 变更前 变更后
container调度耗时 平均 994 微秒 平均 948 微秒
container调度数量 最大 1043/s 最大 1153/s
container pengding数量 平均 509K 平均 470K

将异步线程上限到集群B后。变更后,平均AM调度耗时减少 35%,平均container调度耗时减少 18%,最大调度吞吐量提高 300%。

指标 变更前 变更后
AM调度耗时 平均 705ms 平均 452ms
container调度耗时 平均 338 微秒 平均 275 微秒
container调度吞吐量 最大 330 最大 1308

总结:集群B优化效果更好,原因是队列数更少。

5. 优化思考

目前异步调度循环逻辑的次序是先遍历所有节点,再遍历所有资源请求:

for node in allNodes:
   Go to parentQueue
      Go to leafQueue
        for application in leafQueue.applications:
           for resource-request in application.resource-requests
              try to schedule on node

这样做的缺点是一旦有请求对节点有亲和性要求,那么不满足时节点要求时,遍历结束。再次遍历知道找到合适的节点进行调度。

可以看到这种调度方式会有大量无效的循环执行。为了优化,社区提出了全局调度:https://issues.apache.org/jira/browse/YARN-5139

在获取到资源的请求后,再在节点队列中选择节点:

Go to parentQueue
    Go to leafQueue
      for application in leafQueue.applications:
        for resource-request in application.resource-requests
          for node in nodes (node candidates sorted according to resource-request)
            try to schedule on node

不过这个patch还没有合并到主干,说明该功能可能存在不稳定的情况,需要社区有其实践经验后再上线。

标签:node,异步,schedule,CapacityScheduler,调度,application,线程
From: https://blog.51cto.com/u_15327484/7941708

相关文章

  • celery包结构、celery延迟任务和定时任务、django中使用celery、接口缓存、双写一致性
    celery包结构project├──celery_task #celery包  这个包可以放在任意位置│├──__init__.py#包文件│├──celery.py#celery连接和配置相关文件,且名字必须叫celery.py│└──tasks.py#所有任务函数│├──add_task.p......
  • CompletableFuture异步优化代码
    CompletableFuture异步编排优化代码我们在项目开发中,有可能遇到一个接口需要调用N个服务的接口。比如用户请求获取订单信息,需要调用用户信息、商品信息、物流信息等接口,最后再汇总数据统一返回。如果使用串行的方法按照顺序挨个调用接口,这样接口的响应的速度就很慢。如果并行调用......
  • 38 异步组件
    异步,需要时才被加载<template><div><KeepAlive><component:is="tabComponent"></component></KeepAlive><button@click="change">切换组件</button></div></template>......
  • 使用腾讯云 Cloud studio 实现调度百度AI实现文字识别
    前言今天我们也来高大上一下,玩一把人工智能。那就是免费调用百度AI实现图片上面的文字识别。相对于Python的第三方库,百度人工智能要更强大,毕竟人工智能不是那么容易搞的。要调用,其实很简单,关键的代码只需要三行。但需要先注册百度AI,获得ID和密钥。注册也很简单,百度AI社区有详细......
  • python 处理异步物化视图同时执行导致内存溢出问题
    python处理异步物化视图同时执行导致内存溢出问题一、前提:因为物化视图过多,同时物化视图到时间同时爆发,导致CPU爆满,所以采用datax自带的调度服务来执行python命令二、直接看代码:importpymysqlimportpymssqlimportdatetimeimporttimeclassMaterialized_plan:d......
  • Xxl-job 分布式调度任务使用
    Xxl-job分布式调度任务使用一.XXL-job产品介绍1.简介:XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展.作者是大众点评的许雪里.目前最新版本为v2.x.2.特性功能:1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;2、动态:支......
  • 使用Guava的ListenableFuture完成异步多线程任务并返回结果
    privatestaticExecutorServiceexecutors=newThreadPoolExecutor(5,20,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>(10),newThreadFactoryBuilder().setNameFormat("抓数据线程-%d").build());publicstaticvoidmain(String[]arg......
  • mongo异步python库Motor
    mongo异步python库Motor使用该第三方库前,先了解pythonasyncio安装python3-mpipinstallmotor​#motor版本要求:python>=3.5pymongo>=3.12创建客户端client=motor.motor_asyncio.AsyncIOMotorClient('localhost',27017)orclient=motor.motor_asyncio.AsyncIOM......
  • Yarn Scheduler调度器解析
    1.背景在Yarn中,资源调度是最核心的功能。在https://blog.51cto.com/u_15327484/7835300文章中,介绍了Yarn调度的核心接口ResourceScheduler通过nodeUpdate方法调度资源,通过allocate方法获取调度结果。ResourceScheduler常用的ResourceScheduler实现就是FairScheduler和CapacitySc......
  • 三、资源调度
    三、资源调度1.为什么要资源调度在传统架构中,我们总在考虑或者面临一个问题,我们的应用需要部署在哪里?我们的应用现在在哪里运行着?有一个服务不可访问了,去哪里排查?诸如此类的问题总是会出现在工作中。但是在使用Kubernetes部署应用后,我们无须再关心应用到底部署在了哪台服务器,也无......