1. Background
In YARN, resource scheduling is the core function. The post at https://blog.51cto.com/u_15327484/7835300 introduced ResourceScheduler, YARN's central scheduling interface, which schedules resources in its nodeUpdate method and hands results back through its allocate method.
The most commonly used ResourceScheduler implementations are FairScheduler and CapacityScheduler, and both have very long scheduling paths. This article walks through the scheduling flow alongside the source code; some details are omitted.
2. Scheduler Structure
- Queue: both FairScheduler and CapacityScheduler maintain a queue hierarchy. Each queue owns a share of resources, and access can be restricted so that only designated users run jobs in it.
- Application: each queue holds multiple applications, each tracking the resources it has already been granted and the resources it still requests.
- Container: each resource request under an application is encapsulated as a container.
When the ResourceManager receives a nodeUpdate request, it picks a suitable queue, then an application, then a container to schedule, in that order. The structure is as follows:
3. CapacityScheduler Scheduling Flow
CapacityScheduler was designed to hand resources first to the queue with the lowest utilization. The scheduling steps are as follows (a simplified sketch follows the list):
- Select a queue. Starting from the root queue, traverse depth-first: at each level pick the child queue with the lowest resource utilization, which keeps utilization balanced so that every queue has jobs running. If that child is a leaf queue, select it; otherwise repeat the process with the child as the new root until a least-utilized leaf queue is found.
- Select an application. In the leaf queue chosen in step 1, take the earliest-submitted application (the actual sort key is the application ID; the earlier the submission, the smaller the ID).
- Select a container. In the application chosen in step 2, take its highest-priority container request. Among requests of equal priority, prefer the best locality, trying node-local, then rack-local, then off-switch.
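Before diving into the source, here is a minimal, self-contained sketch of this three-level selection (a hypothetical Queue type with invented fields, not Hadoop code):
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Toy model of the selection order: least-used leaf queue -> earliest app.
class SelectionSketch {
  static class Queue {
    double usedCapacity;                        // fraction of capacity in use
    List<Queue> children = new ArrayList<>();   // empty for a leaf queue
    List<Integer> pendingAppIds = new ArrayList<>();
    boolean isLeaf() { return children.isEmpty(); }
  }

  // Step 1: depth-first descent, always into the least-used child.
  static Queue pickLeaf(Queue q) {
    if (q.isLeaf()) return q;
    Queue least = Collections.min(q.children,
        Comparator.comparingDouble((Queue c) -> c.usedCapacity));
    return pickLeaf(least);
  }

  // Step 2: the earliest-submitted application has the smallest application ID.
  static int pickApp(Queue leaf) {
    return Collections.min(leaf.pendingAppIds);
  }

  // Step 3 (locality: node-local -> rack-local -> off-switch) appears in
  // assignContainersOnNode, shown later in this article.
}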
On the server side, CapacityScheduler handles the NODE_UPDATE scheduler event. If asynchronous scheduling is not enabled, scheduling runs while the heartbeat is being processed:
public void handle(SchedulerEvent event) {
  switch(event.getType()) {
  case NODE_UPDATE:
  {
    NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
    RMNode node = nodeUpdatedEvent.getRMNode();
    nodeUpdate(node);
    // If asynchronous scheduling is not enabled, schedule while processing the
    // heartbeat; otherwise a background thread performs the scheduling
    if (!scheduleAsynchronously) {
      // Start scheduling
      allocateContainersToNode(getNode(node.getNodeID()));
    }
  }
  break;
  // other cases omitted
  }
}
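The scheduleAsynchronously flag checked above corresponds to the following CapacityScheduler setting (disabled by default):
<property>
  <name>yarn.scheduler.capacity.schedule-asynchronously.enable</name>
  <value>true</value>
</property>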
In allocateContainersToNode, resource assignment then starts from the root (parent) queue:
if (node.getReservedContainer() == null) {
//Check that at least one minimum-sized container still fits on this node; otherwise its resources are exhausted
if (calculator.computeAvailableContainers(node.getAvailableResource(),
minimumAllocation) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
//The root queue starts assigning resources
root.assignContainers(clusterResource, node, false);
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
" is reserved by application " +
node.getReservedContainer().getContainerId().getApplicationAttemptId()
);
}
The calculator used for the resource arithmetic is selected by the configuration below. DominantResourceCalculator accounts for both CPU and memory, while the default DefaultResourceCalculator considers memory only:
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
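As a quick illustration of the difference, here is a standalone sketch with made-up numbers; computeAvailableContainers is the same API called in the snippet above:
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class CalculatorDemo {
  public static void main(String[] args) {
    Resource available = Resources.createResource(8192, 2);    // 8 GB, 2 vcores
    Resource perContainer = Resources.createResource(1024, 1); // 1 GB, 1 vcore

    // Memory only: 8192 / 1024 = 8 containers; the 2-vcore limit is ignored
    System.out.println(new DefaultResourceCalculator()
        .computeAvailableContainers(available, perContainer));

    // Dominant resource: min(8192/1024, 2/1) = 2 containers
    System.out.println(new DominantResourceCalculator()
        .computeAvailableContainers(available, perContainer));
  }
}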
Now to the core flow: a suitable child queue must be picked under the root. ParentQueue#assignContainers is called, which in turn calls assignContainersToChildQueues to decide which child queue to schedule. It iterates over the child queues; the earlier a child queue sorts in the collection, the sooner it gets a chance:
//Iterate over the child queues; the earlier a queue sorts, the sooner it is tried
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
CSQueue childQueue = iter.next();
if(LOG.isDebugEnabled()) {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
}
//子队列开始调度资源
assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
assignment.getResource() + ", " + assignment.getType());
}
// If we do assign, remove the queue and re-insert in-order to re-sort
if (Resources.greaterThan(
resourceCalculator, cluster,
assignment.getResource(), Resources.none())) {
// Remove and re-insert to sort
iter.remove();
LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
" stats: " + childQueue);
childQueues.add(childQueue);
if (LOG.isDebugEnabled()) {
printChildQueues();
}
break;
}
}
The child queues are held in a TreeSet ordered by queue usage: the queue with the smallest used capacity sorts first and is scheduled first:
protected final Set<CSQueue> childQueues;
this.childQueues = new TreeSet<CSQueue>(queueComparator);
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
return -1;
} else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
return 1;
}
return q1.getQueuePath().compareTo(q2.getQueuePath());
}
};
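One subtlety worth noting: a TreeSet does not re-sort an element whose sort key changes in place, which is why assignContainersToChildQueues removes a queue through the iterator and re-inserts it after a successful assignment. A small standalone sketch (hypothetical Q class, not Hadoop code):
import java.util.Comparator;
import java.util.TreeSet;

public class ResortDemo {
  static class Q {
    final String path; double used;
    Q(String path, double used) { this.path = path; this.used = used; }
  }

  public static void main(String[] args) {
    TreeSet<Q> queues = new TreeSet<>(
        Comparator.comparingDouble((Q q) -> q.used).thenComparing(q -> q.path));
    Q a = new Q("root.a", 0.1), b = new Q("root.b", 0.3);
    queues.add(a);
    queues.add(b);

    // Simulate an assignment to a: remove first, mutate the key, re-insert.
    // (Hadoop removes via the iterator, which is also safe after the key changes.)
    queues.remove(a);
    a.used = 0.5;
    queues.add(a);

    System.out.println(queues.first().path); // root.b now sorts first
  }
}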
Once the descent reaches a leaf queue, the leaf must pick an application to receive resources. The leaf implementation is LeafQueue; as shown below (from LeafQueue#assignContainers), it iterates over the active applications and schedules each one's requests in priority order:
//The earlier an application sorts in the set, the sooner it is scheduled
for (FiCaSchedulerApp application : activeApplications) {
synchronized (application) {
// Check if this resource is on the blacklist
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
continue;
}
//Walk the distinct priorities of the application's resource requests; higher-priority requests are scheduled first
// Schedule in priority order
for (Priority priority : application.getPriorities()) {
ResourceRequest anyRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (null == anyRequest) {
continue;
}
// Required resource
Resource required = anyRequest.getCapability();
// Do we need containers at this 'priority'?
if (application.getTotalRequiredResources(priority) <= 0) {
continue;
}
if (!this.reservationsContinueLooking) {
if (!needContainers(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
continue;
}
}
Set<String> requestedNodeLabels =
getRequestLabelSetByExpression(anyRequest
.getNodeLabelExpression());
// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the highest
// priority request as the target.
// This works since we never assign lower priority requests
// before all higher priority ones are serviced.
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
required, requestedNodeLabels);
// Check queue max-capacity limit
if (!canAssignToThisQueue(clusterResource, required,
labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
return NULL_ASSIGNMENT;
}
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
application, true, requestedNodeLabels)) {
break;
}
// Inform the application it is about to get a scheduling opportunity
application.addSchedulingOpportunity(priority);
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null, needToUnreserve);
//... remainder omitted
}
Priority is defined below. Requests sort ascending by the priority integer, and in YARN a smaller value means a higher priority, so higher-priority requests sort first and are scheduled first:
final Set<Priority> priorities = new ConcurrentSkipListSet<Priority>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
public static class Comparator
implements java.util.Comparator<org.apache.hadoop.yarn.api.records.Priority> {
@Override
public int compare(org.apache.hadoop.yarn.api.records.Priority o1, org.apache.hadoop.yarn.api.records.Priority o2) {
return o1.getPriority() - o2.getPriority();
}
}
Applications are likewise held in a TreeSet: the smaller the application ID, the earlier it sorts and the sooner it is scheduled. A smaller ID means an earlier submission time:
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
static final Comparator<FiCaSchedulerApp> applicationComparator =
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationId().compareTo(a2.getApplicationId());
}
};
With an application chosen, a suitable container request is picked. As shown below, LeafQueue schedules node-local, then rack-local, then off-switch requests, in that order of preference:
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();
// Data-local
ResourceRequest nodeLocalResourceRequest =
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
}
}
// Rack-local
ResourceRequest rackLocalResourceRequest =
application.getResourceRequest(priority, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
}
}
// Off-switch
ResourceRequest offSwitchResourceRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
return new CSAssignment(
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer, needToUnreserve),
NodeType.OFF_SWITCH);
}
return SKIP_ASSIGNMENT;
}
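One related detail: the downgrade from node-local to rack-local and off-switch is not immediate. The scheduling-opportunity counter incremented earlier via application.addSchedulingOpportunity is compared against a missed-opportunity threshold before locality is relaxed, controlled by the following CapacityScheduler setting (default 40):
<property>
  <name>yarn.scheduler.capacity.node-locality-delay</name>
  <value>40</value>
</property>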
Once a container has been allocated, FiCaSchedulerApp#allocate is invoked:
RMContainer allocatedContainer =
application.allocate(type, node, priority, request, container);
which records the newly allocated container:
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
When the ApplicationMaster heartbeats for its allocation result, it obtains these containers from FiCaSchedulerApp#getAllocation and then contacts the NodeManagers to launch them.
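For completeness, a minimal sketch of the AM side using the public AMRMClient API (registration details and error handling omitted; this is an illustrative sketch, not the RM-internal code above):
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class AmAllocateSketch {
  public static void main(String[] args) throws Exception {
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(new Configuration());
    rmClient.start();
    rmClient.registerApplicationMaster("", 0, "");

    // Each allocate() call is one heartbeat; the response carries the containers
    // the scheduler placed into newlyAllocatedContainers since the last call.
    AllocateResponse response = rmClient.allocate(0.0f);
    List<Container> allocated = response.getAllocatedContainers();
    // Next step (not shown): start each container on its NodeManager via NMClient.
  }
}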
4. FairScheduler Scheduling Flow
FairScheduler also schedules at the same three levels (queue, application, container) but sorts differently at each.
FSParentQueue#assignContainer picks the child queue; child queues are compared with the FairShareComparator:
private static final FairShareComparator COMPARATOR =
    new FairShareComparator();

// Inside FSParentQueue#assignContainer:
Resource assigned = Resources.none();
TreeSet<FSQueue> sortedChildQueues = new TreeSet<>(policy.getComparator());
readLock.lock();
try {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Node " + node.getNodeName() + " offered to parent queue: " +
        getName() + " visiting " + childQueues.size() + " children");
  }
  sortedChildQueues.addAll(childQueues);
  for (FSQueue child : sortedChildQueues) {
    assigned = child.assignContainer(node);
    if (!Resources.equals(assigned, Resources.none())) {
      break;
    }
  }
} finally {
  readLock.unlock();
}
FairShareComparator considers memory only, not CPU. To keep allocation fair, its ordering rules are:
- If one queue needs resources and the other does not, the one that needs resources sorts first.
- If both need resources, compare the fraction of minShare in use: the queue using the smaller fraction of its minShare sorts first (i.e., try to bring every queue up to its minShare). For a queue, minShare is its minResources, the amount of resources the queue is guaranteed.
- Otherwise the queue using the smaller share of resources sorts first; with equal shares, compare usage divided by queue weight, smaller first, so a larger weight and a smaller usage both take precedence.
- If still tied, the earlier submission wins, i.e., the smaller app ID sorts first.
private static class FairShareComparator implements Comparator<Schedulable>,
Serializable {
private static final long serialVersionUID = 5564969375856699313L;
@Override
public int compare(Schedulable s1, Schedulable s2) {
int res = compareDemand(s1, s2);
// Share resource usages to avoid duplicate calculation
Resource resourceUsage1 = null;
Resource resourceUsage2 = null;
if (res == 0) {
  // Resource usage of each schedulable
  resourceUsage1 = s1.getResourceUsage();
  resourceUsage2 = s2.getResourceUsage();
  // Compare the fraction of minShare in use
  res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
}
if (res == 0) {
  // Compare usage relative to weight (fair-share usage)
  res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
}
// Break the tie by submit time
if (res == 0) {
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
}
// Break the tie by job name
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
return res;
}
}
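A tiny worked example of rules 2 and 3 above (invented numbers, memory only, matching the comparator's memory-only view):
public class FairShareDemo {
  public static void main(String[] args) {
    // Rule 2: fraction of minShare in use, smaller first.
    double aMinShareRatio = 4.0 / 10.0;  // queue A: 4 GB used of a 10 GB minShare
    double bMinShareRatio = 6.0 / 10.0;  // queue B: 6 GB used of a 10 GB minShare
    System.out.println(aMinShareRatio < bMinShareRatio ? "A first" : "B first");

    // Rule 3: usage divided by weight, smaller first (so heavier weight wins).
    double aUseToWeight = 40.0 / 2.0;    // queue A: 40 GB used, weight 2
    double bUseToWeight = 30.0 / 1.0;    // queue B: 30 GB used, weight 1
    System.out.println(aUseToWeight < bUseToWeight ? "A first" : "B first");
  }
}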
Compared with CapacityScheduler, which only looks for the queue with the smallest resource usage, FairScheduler is more elaborate: it compares usage relative to each queue's allocation.
With queue ordering covered, look at application ordering, which uses the same FairShareComparator:
private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
TreeSet<FSAppAttempt> pendingForResourceApps =
new TreeSet<>(policy.getComparator());
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
if (!Resources.isNone(app.getPendingDemand()) &&
(assignment || app.shouldCheckForStarvation())) {
pendingForResourceApps.add(app);
}
}
} finally {
readLock.unlock();
}
return pendingForResourceApps;
}
The ordering rules for applications are:
- If one application needs resources and the other does not, the one that needs resources sorts first.
- In the application implementation FSAppAttempt, minShare is Resources.none(), i.e. zero, so applications have no minShare concept; in practice, the application with the lower memory usage sorts first.
- When two applications use the same amount of memory, the weight breaks the tie: in FSAppAttempt the weight is the application's priority, and the application with the smaller usage/priority ratio runs first.
- If still tied, the earlier submission wins, i.e., the smaller app ID sorts first.
Container requests are again preferred in node-local, rack-local, off-switch order, which is not expanded here.
5. Comparing FairScheduler and CapacityScheduler
In summary, CapacityScheduler sorts queues by resource utilization and sorts applications FIFO. In newer versions (e.g., Hadoop 3.3.5), application ordering also supports fair: all applications are sorted by resource-usage fraction ascending, then by submission time.
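Per-queue application ordering in CapacityScheduler is switched through the ordering-policy property (also visible in the configuration example in section 6):
<property>
  <name>yarn.scheduler.capacity.<queue-path>.ordering-policy</name>
  <value>fair</value>
</property>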
FairScheduler uses FairShareComparator for both queue ordering and application ordering (other policies can also be configured):
This article covers only these two core differences; other aspects are left for later.
- Note: besides heartbeat-driven scheduling, CapacityScheduler can also schedule asynchronously, and FairScheduler offers continuous scheduling. However, the community found that continuous scheduling severely degrades scheduling performance once a cluster exceeds roughly 100 NodeManagers (https://issues.apache.org/jira/browse/YARN-6487), and it is deprecated.
6. FairScheduler and CapacityScheduler Configuration
| FairScheduler setting | CapacityScheduler setting | Description |
|---|---|---|
| minResources | yarn.scheduler.capacity.&lt;queue-path&gt;.capacity | Minimum queue resources |
| maxResources | yarn.scheduler.capacity.&lt;queue-path&gt;.maximum-capacity | Maximum queue resources |
| aclSubmitApps | yarn.scheduler.capacity.&lt;queue-path&gt;.acl_submit_applications | Queue submission ACL |
| aclAdministerApps | yarn.scheduler.capacity.&lt;queue-path&gt;.acl_administer_queue | Queue administration ACL |
| maxAMShare | yarn.scheduler.capacity.&lt;queue-path&gt;.maximum-am-resource-percent | AM resource cap |
| queueMaxAppsDefault | yarn.scheduler.capacity.&lt;queue-path&gt;.maximum-applications | Maximum number of apps |
| maxRunningApps | yarn.scheduler.capacity.&lt;queue-path&gt;.max-parallel-apps | Maximum number of running apps |
| maxContainerAllocation ("X mb, Y vcores") | yarn.scheduler.capacity.&lt;queue-path&gt;.maximum-allocation-mb / yarn.scheduler.capacity.&lt;queue-path&gt;.maximum-allocation-vcores | Maximum container size |
| yarn.scheduler.fair.preemption | yarn.resourcemanager.scheduler.monitor.enable | Enable preemption; default false |
| yarn.scheduler.fair.allow-undeclared-pools | yarn.scheduler.capacity.&lt;queue-path&gt;.auto-create-child-queue.enabled | Allow auto-created queues; FS default true, CS default false |
| yarn.scheduler.fair.user-as-default-queue | yarn.scheduler.capacity.queue-mappings | User-to-queue mapping; FS default true |
FairScheduler configuration example:
<pool name="quque1">
<minSharePreemptionTimeout>600</minSharePreemptionTimeout>
<maxResources>2400000 mb,1200 vcores</maxResources>
<aclAdministerApps>quque1</aclAdministerApps>
<aclSubmitApps>quque1</aclSubmitApps>
<minResources>16000 mb,8 vcores</minResources>
</pool>
Web UI example:
CapacityScheduler configuration example:
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.maximum-applications</name>
<value>10000</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.minimum-user-limit-percent</name>
<value>100</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.acl_administer_queue</name>
<value>gzsushixuan</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.capacity</name>
<value>[memory=16000,vcores=8]</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.user-limit-enable</name>
<value>false</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.ordering-policy</name>
<value>fair</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.acl_submit_applications</name>
<value>gzsushixuan</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.user-limit-factor</name>
<value>10000</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.maximum-capacity</name>
<value>[memory=2400000,vcores=1200]</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.state</name>
<value>RUNNING</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.gzsushixuan.maximum-am-resource-percent</name>
<value>300.0</value>
</property>
Web UI example:
CapacityScheduler configuration explained:
- yarn.scheduler.capacity.&lt;queue-path&gt;.capacity: the queue's minimum (guaranteed) resources, shown in the web UI as Absolute Configured Capacity.
- yarn.scheduler.capacity.&lt;queue-path&gt;.maximum-capacity: the queue's maximum resources, shown in the web UI as Absolute Configured Max Capacity.
- yarn.scheduler.capacity.&lt;queue-path&gt;.user-limit-factor: the maximum share of the queue a single user may consume; if queues are named after individual users, this can be set to 100%.
- yarn.scheduler.capacity.resource-calculator: the default, org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, accounts for memory only, while DominantResourceCalculator accounts for both memory and CPU.
- If queues are configured with absolute resources and the capacities of all queues sum to more than the cluster total, a queue's effective minimum (Effective Capacity) = (its configured minimum / the sum of all queues' configured resources) × total cluster resources.
- Under the same condition, a queue's effective maximum (Effective Max Capacity) = (its configured maximum / the sum of all queues' configured resources) × total cluster resources.
- When a queue is configured with absolute resources, a maximum-applications value set on a leaf queue is ignored and recomputed as: maximum-applications = the queue's resource fraction × yarn.scheduler.capacity.maximum-applications (default 10000). For example, a queue holding 10% of cluster resources ends up with 0.1 × 10000 = 1000.