1. 背景
在https://blog.51cto.com/u_15327484/7920197文章中,将调度器从FairScheduler迁移到CapacityScheduler。
CapacityScheduler在默认情况下,当接受到NodeManager心跳请求时,会调用nodeUpdate方法开始进行资源调度,这种调度方式称为心跳调度,也称同步调度。
心跳调度实现起来简单,但是有很大的弊端。它及其依赖NodeManager心跳的稳定,一旦NodeManager和ResourceManager网络产生波动,那么ResourceManager的调度也会发生延迟,这种调度方式十分不稳定。
CapacityScheduler提供了异步调度功能,ResourceManager可以配置多个线程,在后台定时地对请求进行资源调度,完全不依赖于心跳。
2. 异步调度概览
对于异步调度,CapacityScheduler会设置若干线程,每个线程中,依次扫描所有节点,寻找合适的队列,选择合适的应用,最后选择应用中的某一资源请求进行资源调度,其层级关系如下:
for node in allNodes:
Go to parentQueue
Go to leafQueue
for application in leafQueue.applications:
for resource-request in application.resource-requests
try to schedule on node
3. 异步调度原理
默认情况下,不开启异步调度功能时,scheduleAsynchronously为false,nodeUpdate可以正常执行allocateContainersToNode方法开始调度流程。
一旦开启异步调度,ResourceManager处理好心跳信息后直接退出,不进行调度:
protected void nodeUpdate(RMNode rmNode) {
long begin = System.nanoTime();
readLock.lock();
try {
setLastNodeUpdateTime(Time.now());
super.nodeUpdate(rmNode);
} finally {
readLock.unlock();
}
// Try to do scheduling
//如果没有开启异步调度,那么就开始进行调度;否则调度会在异步线程中执行
if (!scheduleAsynchronously) {
writeLock.lock();
try {
// reset allocation and reservation stats before we start doing any
// work
updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
CSAssignment.NULL_ASSIGNMENT);
allocateContainersToNode(rmNode.getNodeID(), true);
} finally {
writeLock.unlock();
}
}
相应地,CapacityScheduler增加了死循环的异步线程AsyncScheduleThread队列,线程执行时,如果正在进行调度的数量不超过MaxPendingBacklogs,那么就开始调度:
private List<AsyncScheduleThread> asyncSchedulerThreads;
public void run() {
int debuggingLogCounter = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!runSchedules.get()) {
Thread.sleep(100);
} else {
// Don't run schedule if we have some pending backlogs already
//如果正在进行调度的数量超过MaxPendingBacklogs,就停1ms后再尝试
if (cs.getAsyncSchedulingPendingBacklogs()
> cs.asyncMaxPendingBacklogs) {
Thread.sleep(1);
} else{
//开始调度
schedule(cs);
if(LOG.isDebugEnabled()) {
// Adding a debug log here to ensure that the thread is alive
// and running fine.
if (debuggingLogCounter++ > 10000) {
debuggingLogCounter = 0;
LOG.debug("AsyncScheduleThread[" + getName() + "] is running!");
}
}
}
}
} catch (InterruptedException ie) {
// keep interrupt signal
Thread.currentThread().interrupt();
}
}
LOG.info("AsyncScheduleThread[" + getName() + "] exited!");
}
调度过程中,依次对所有节点进行调度:
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
}
节点选择好后,选择队列:
// Try to assign to most 'under-served' sub-queue
for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
candidates.getPartition()); iter.hasNext(); ) {
CSQueue childQueue = iter.next();
//省略
CSAssignment childAssignment = childQueue.assignContainers(cluster,
candidates, childLimits, schedulingMode);
//省略
}
选择好队列后,选择应用:
for (Iterator<FiCaSchedulerApp> assignmentIterator =
orderingPolicy.getAssignmentIterator(sel);
assignmentIterator.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();
//省略
// Try to schedule
assignment = application.assignContainers(clusterResource,
candidates, currentResourceLimits, schedulingMode, null);
//省略
}
选择好应用后,选择资源请求:
// Schedule in priority order
for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
ContainerAllocation result = allocate(clusterResource, candidates,
schedulingMode, resourceLimits, schedulerKey, null);
AllocationState allocationState = result.getAllocationState();
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource, result,
null, node);
}
4. 调度器压测
在https://blog.51cto.com/u_15327484/7939194文章中,介绍了Yarn SLS的使用。本次压测基于SLS,相关参数如下:
- sls 工具 1000 并发。
- YARN: CapacityScheduler。
- 运行时长:2000 sec。
- 模拟app数量:1618。
- 模拟节点数:635。
对于不同数量的异步线程,分别测试其调度性能。从测试结果来看,调度线程数设置为10最合适,对比同步调度性能提升 38%:
平均调度耗时 | |
---|---|
同步调度 | 24ms |
异步调度(1线程数) | 17.2ms |
异步调度(5线程数) | 16.3ms |
异步调度(10线程数) | 14.8ms |
异步调度(20线程数) | 14.7ms |
异步调度(100线程数) | 16.9ms |
基于此,在yarn-site.xml中新增异步调度相关配置:
yarn.scheduler.capacity.schedule-asynchronously.enable=true(是否开启异步调度,默认为false)
yarn.scheduler.capacity.schedule-asynchronously.maximum-threads=1(调度线程数,默认为1)
yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms=5(调度间隔时间,默认为5ms)
yarn.scheduler.capacity.schedule-asynchronously.maximum-pending-backlogs=10000(调度队列最大容量,默认为100)
4. 上线效果review
将异步线程上限到集群A后。变更后,平均container调度耗时减少 4.6%,最大调度吞吐量提升 10.5%,平均container pending数下降 7.6%:
指标 | 变更前 | 变更后 |
---|---|---|
container调度耗时 | 平均 994 微秒 | 平均 948 微秒 |
container调度数量 | 最大 1043/s | 最大 1153/s |
container pengding数量 | 平均 509K | 平均 470K |
将异步线程上限到集群B后。变更后,平均AM调度耗时减少 35%,平均container调度耗时减少 18%,最大调度吞吐量提高 300%。
指标 | 变更前 | 变更后 |
---|---|---|
AM调度耗时 | 平均 705ms | 平均 452ms |
container调度耗时 | 平均 338 微秒 | 平均 275 微秒 |
container调度吞吐量 | 最大 330 | 最大 1308 |
总结:集群B优化效果更好,原因是队列数更少。
5. 优化思考
目前异步调度循环逻辑的次序是先遍历所有节点,再遍历所有资源请求:
for node in allNodes:
Go to parentQueue
Go to leafQueue
for application in leafQueue.applications:
for resource-request in application.resource-requests
try to schedule on node
这样做的缺点是一旦有请求对节点有亲和性要求,那么不满足时节点要求时,遍历结束。再次遍历知道找到合适的节点进行调度。
可以看到这种调度方式会有大量无效的循环执行。为了优化,社区提出了全局调度:https://issues.apache.org/jira/browse/YARN-5139。
在获取到资源的请求后,再在节点队列中选择节点:
Go to parentQueue
Go to leafQueue
for application in leafQueue.applications:
for resource-request in application.resource-requests
for node in nodes (node candidates sorted according to resource-request)
try to schedule on node
不过这个patch还没有合并到主干,说明该功能可能存在不稳定的情况,需要社区有其实践经验后再上线。
标签:node,异步,schedule,CapacityScheduler,调度,application,线程 From: https://blog.51cto.com/u_15327484/7941708