首页 > 其他分享 >Yarn federation原理与实践

Yarn federation原理与实践

时间:2023-10-21 22:01:54浏览次数:26  
标签:federation 权重 实践 yarn YARN 集群 Yarn router

1. 背景

随着业务的增长,Yarn集群也不断扩展。节点数增多、请求增多、队列增多,造成调度性能线性下降。如下是三个集群的性能数据:

集群 队列数量 平均调度耗时 最大每秒调度数量CPS
集群A 2706 3.8 ms 483
集群B 620 940 微秒 1150
集群C 399 676 微秒 1013

对于集群A,其调度耗时已经非常高了,吞吐量也是腰斩。最大内存使用率72%,平均内存使用率45%;最大vcore使用率66%,平均vcore使用率40%。

为了避免由于队列的增加导致调度性能下降,无法通过打label的方式进行缓解,这是因为即使有label,依然是全队列排序。因此只能调研Yarn Federation,将一个集群拆分成为两个Yarn集群,每个Yarn集群设置一半的队列。

2. Yarn Federation简介

YARN Federation 架构与 HDFS Federation 的架构有点相似,使用联邦模式,将多个 YARN 集群联合成一个超大 YARN 集群,可以将 YARN 集群扩展至数万计算节点。每个子 YARN 集群都有自己 RM 和计算节点。对用户屏蔽了底层集群存在,用户见到的是一个超大 YARN 集群,可以在联合集群上进行提交作业。而在实际底层,联合系统与各子集群 RM 进行协商,为作业提供计算资源,允许作业跨集群运行。

Client 端向 YARN Router 提交作业,Router 访问 State-Store,获取作业的运行策略。根据策略确定哪个 ResourceManager 作为 HomeCluster。 AppMaster 通过 AMRMProxy 向 ResourceManager 申请资源。 AppMaster 申请到资源后,通过 AMRMProxy 实现跨集群启动 task 任务,实现任务的运行。

Untitled.png

它包含以下几个组件:

  1. SubCluster:SubCluster 就是设计中的 YARN 子集群,一般子集群要具有高可用性,即可以容忍 RM,NM 宕机,实现伤害最小化。如果一个子集群整个挂掉,联邦机制应该保证这个子集群上的作业被重新提交到另一个子集群上。在联邦集群里,子集群是一个扩展单位。我们可以通过添加一个或者多个联邦来扩展一个联邦集群。而且,各自独立的子集群可以决定是否并入联邦系统,贡献出自己一部分的资源。
  2. Router:Router 为联邦架构下对外提供服务的接口,实现了 ApplicationClientProtocol 担当了以前 RM 的角色,屏蔽了底层多个子集群 RMs 的存在。对客户端的请求进行转发,根据策略将请求转发至子集群。该组件是无状态的,一个 Federation 集群可以配置一组,但最少配置一个。用户提交应用时首先会访问其中一个 Router,然后 Router 会先从 State Store 中获得所有子集群信息(active rm 和 其他一些使用率信息),之后根据配置的路由策略将应用程序提交请求转发到对应的 RM 上。一个作业在哪一个子集群中启动,这个子集群就被称为该作业的"home sub-cluster",其它子集群都是该作业的"secondary sub-cluster"。
  3. StateStore:Federation 集群状态存储组件,其中主要记录了所有 sub-cluster 的信息。目前提供了 Memory、MySQL、Zookeeper 几种存储实现,可以根据自己的场景选择一种。一般使用 Zookeeper 存储。内容包括:
    • policies: 策略信息,确定任务如何在子集群 (SubCluster) 中运行。
    • memberships:子集群 (SubCluster) 信息,通过在子集群中配置(YARN.federation.state-store.heartbeat-interval-secs), 从而向 YARN Router 汇报。
    • applications: 具体作业的 application 信息。
  4. AMRMProxy:AMRMProxy 是保证应用可以跨子集群运行的关键组件,充当应用程序和多个 RM 通讯的桥梁。AMRMProxy 会在每个 NM 上启动,实现了 ApplicationMasterProtocol,充当 AM 向 RM 的代理。YARN Federation 架构应用不允许和子集群中的 RM 直接通信,应用被限定只能与 AMRMProxy 通信,以此来实现多 RMs 的透明访问。

本文将介绍Yarn Router处理客户端请求过程,以及AMRMProxy服务。

3. Router向子集群提交作业请求过程

Router服务实现了ApplicationClientProtocol,它能够接受用户请求,用户提交时无需更改代码。Router接收到用户提交的作业后,调用RouterClientRMService#submitApplication方法开始处理请求:

public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
    return pipeline.getRootInterceptor().submitApplication(request);
  }

真正执行作业提交的是FederationClientInterceptor#submitApplication。它进行如下步骤:

  1. 获取用户提交作业时指定的队列,从zk的/federationstore/policies/{queue}中获取子集群选择策略。
  2. 从zk的/federationstore/memberships中获取所有子集群的信息。
  3. 根据子集群选择策略,选择合适的子集群提交应用。

首先,调用ZookeeperFederationStateStore#getPolicy方法。从/federationstore/policies/{queue}中获取请求队列指定的策略:

private SubClusterPolicyConfiguration getPolicy(final String queue)
      throws YarnException {
    //查询zk的基础路径为/federationstore/policies,后面加上请求时指定的队列名
    String policyZNode = getNodePath(policiesZNode, queue);

    SubClusterPolicyConfiguration policy = null;
    //获取zk中队列的策略
    byte[] data = get(policyZNode);
    if (data != null) {
      try {
        policy = new SubClusterPolicyConfigurationPBImpl(
            SubClusterPolicyConfigurationProto.parseFrom(data));
      } catch (InvalidProtocolBufferException e) {
        String errMsg = "Cannot parse policy at " + policyZNode;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
    }
    return policy;
  }

将策略存储到缓存中:

private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap,
      Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
      SubClusterPolicyConfiguration conf)
      throws FederationPolicyInitializationException {

    FederationPolicyInitializationContext context =
        new FederationPolicyInitializationContext(conf, subClusterResolver,
            federationFacade, null);
    String newType = context.getSubClusterPolicyConfiguration().getType();
    FederationRouterPolicy routerPolicy = policyMap.get(queue);

    FederationPolicyManager federationPolicyManager =
        FederationPolicyUtils.instantiatePolicyManager(newType);
    // set queue, reinit policy if required (implementation lazily check
    // content of conf), and cache it
    federationPolicyManager.setQueue(queue);
    routerPolicy =
        federationPolicyManager.getRouterPolicy(context, routerPolicy);

    // we need the two put to be atomic (across multiple threads invoking
    // this and reset operations)
    synchronized (this) {
      policyMap.put(queue, routerPolicy);
      cachedConfs.put(queue, conf);
    }
  }

RouterPolicyFacade#getHomeSubcluster根据zk的策略,获取子集群:

FederationRouterPolicy policy = policyMap.get(queue);
return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);

FederationRouterPolicy定义了getHomeSubcluster方法专用于获取子集群:

SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blackListSubClusters) throws YarnException;

其实现类如下所示:

  1. UniformRandomRouterPolicy:随机挑选子集群。
  2. WeightedRandomRouterPolicy:基于配置的权重随机挑选子集群。例如:权重配置为SC-1: 0.8, SC-2: 0.2,80%的作业路由到SC-1,20%的作业路由到SC-2。
  3. PriorityRouterPolicy:基于配置的权重选择权重最大的子集群。例如:权重配置为SC-1: 0.8, SC-2: 0.2,作业全部路由到SC-1。
  4. HashBasedRouterPolicy:基于作业队列名称的hash值挑选子集群,可以保证一个队列的所有作业都提交到同一个子集群。
  5. LoadBasedRouterPolicy:基于子集群的负载,选择可用内存最多的子集群。
  6. RejectRouterPolicy:拒绝作业请求,场景是拒绝某个队列的作业提交。

Untitled 1.png

以WeightedRandomRouterPolicy为例。先调用ZookeeperFederationStateStore#getPolicy 从/federationstore/memberships中获取子集群信息:

public GetSubClustersInfoResponse getSubClusters(
      GetSubClustersInfoRequest request) throws YarnException {
    List<SubClusterInfo> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(membershipZNode)) {
        SubClusterId subClusterId = SubClusterId.newInstance(child);
        SubClusterInfo info = getSubclusterInfo(subClusterId);
        if (!request.getFilterInactiveSubClusters() ||
            info.getState().isActive()) {
          result.add(info);
        }
      }
    } catch (Exception e) {
      String errMsg = "Cannot get subclusters: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    return GetSubClustersInfoResponse.newInstance(result);
  }

PriorityRouterPolicy再根据集群选择策略,选择合适的子集群。重点解释下权重选择算法:

假设有三个子集群,权重分别时0.1,0.3,0.6,总权重是1。算法每次执行时,随机选择一个数字,通过算法的逻辑,当随机数处于[0,0.1],返回权重0.1的下标;当随机数处于[0.1,0.4],返回权重0.3的下标;当随机数处于[0.4,1.0],返回权重0.6的下标。可以根据源码+以下例子理解算法:

  1. 随机数字就是0~1的随机数*1,它处于[0,0.1]的概率是10%,直接返回该权重对应的下标。当随机数大于0.1时,不会选择该节点,开始选择下一个节点。
  2. 对于[0.1,0.4]区间的随机数,它大于0.1,且[0.1,0.4]-0.1都小于0.3。因此0.3的权重的选择概率也是30%,直接返回该权重对应的下标。当随机数大于0.4时,不会选择该节点,开始选择下一个节点。
  3. 对于[0.4,1.0]区间的随机数,[0.4,1.0]-0.1-0.3都大于0,因此选择权重0.6,0.6权重的选择概率也是60%,直接返回该权重对应的下标。

最终WeightedRandomRouterPolicy根据返回的权重下标获取对应子集群并返回:

public SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blacklist) throws YarnException {
    //将子集群和权重放到队列中
    ArrayList<Float> weightList = new ArrayList<>();
    ArrayList<SubClusterId> scIdList = new ArrayList<>();
    for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
      if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
        continue;
      }
      if (entry.getKey() != null
          && activeSubclusters.containsKey(entry.getKey().toId())) {
        weightList.add(entry.getValue());
        scIdList.add(entry.getKey().toId());
      }
    }
    //获取基于权重选择的下标
    int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList);
    if (pickedIndex == -1) {
      throw new FederationPolicyException(
          "No positive weight found on active subclusters");
    }
    //根据下标选择子集群
    return scIdList.get(pickedIndex);
}

//获取基于权重选择的下标
public static int getWeightedRandom(ArrayList<Float> weights) {
    int i;
    float totalWeight = 0;
    for (i = 0; i < weights.size(); i++) {
      if (weights.get(i) > 0) {
        totalWeight += weights.get(i);
      }
    }
    if (totalWeight == 0) {
      return -1;
    }
    //比totalWeight小的大于0的随机数字
    //RAND.nextFloat()在[0,1]之间
    float samplePoint = RAND.nextFloat() * totalWeight;
    int lastIndex = 0;
    for (i = 0; i < weights.size(); i++) {
      if (weights.get(i) > 0) {
        //如果数字小于权重,返回权重对应的下表
        if (samplePoint <= weights.get(i)) {
          return i;
        } else {
          //如果数字大于权重,则减去该权重,继续下次遍历
          lastIndex = i;
          samplePoint -= weights.get(i);
        }
      }
    }
    // This can only happen if samplePoint is very close to totoalWeight and
    // float rounding kicks in during subtractions
    return lastIndex;
  }
//根据下表返回对应子集群
return scIdList.get(pickedIndex);

FederationClientInterceptor#submitApplication最终提交应用到子集群中:

ApplicationClientProtocol clientRMProxy =
          getClientRMProxyForSubCluster(subClusterId);

      SubmitApplicationResponse response = null;
      try {
        response = clientRMProxy.submitApplication(request);
      } catch (Exception e) {
        LOG.warn("Unable to submit the application " + applicationId
            + "to SubCluster " + subClusterId.getId(), e);
      }

4. AppMaster跨子集群申请container

Yarn Federation支持一个应用在不同的子集群中申请资源。

对于Yarn原有的架构,AM只能向一个ResourceMananger上发送资源请求,为了支持Yarn Federation跨子集群的资源请求,在NodeMananger上新增了AMRMProxy服务。该服务拦截所在NodeManager的AM请求,向多个RM发起资源请求,从而屏蔽AM与所有ResourceManager的直接交互。对于AM而言,AMRMProxy就是RM上的ApplicationMasterService。

新增AMRMProxy这个额外的服务,对于Hadoop而言又新增了复杂性。但是这时不可避免的,因为如果不增加这个服务,那么就需要变更ApplicationMasterProtocol协议。那么所有计算组件都要适配,这个改动量太大,Hadoop只能妥协,自己新增AMRMProxy来这支持新变更。

对于AMRMProxy而言,可以通过设置策略的方式来指定向哪些子集群发起调度。有以下4种策略:

  1. BroadcastAMRMProxyPolicy:向所有子集群发送资源请求。
  2. HomeAMRMProxyPolicy:只向作业的home subcluster发送资源请求。
  3. LocalityMulticastAMRMProxyPolicy:优先考虑调度本地性,没有本地性的资源请求会根据配置的权重发送,不会向权重配置为0的子集群发送请求(即使满足调度本地性)。
  4. RejectAMRMProxyPolicy:拒绝任何请求,作业无法调度到任何集群。

对于生产环境而言,对于不同的Yarn子集群,一般都不需要进行跨集群申请资源。跨集群还有一些缺点,比如要在其他子集群中添加队列,这会降低调度性能;还会在启动集群中启动AppMaster,这会浪费资源。

一般使用LocalityMulticastAMRMProxyPolicy策略即可。对于权重的设置,往往只会有一个集群是1,其他集群都是0。即每个应用只会调度到一个集群中。

由于没有跨集群的需求,暂不研究AMRMProxy的执行流程。

5. 部署实践

标签:federation,权重,实践,yarn,YARN,集群,Yarn,router
From: https://blog.51cto.com/u_15327484/7969763

相关文章

  • java项目实践-jsp-finter-监听器-day19
    目录1.jsp2.过滤器3.listener监听器1.jspservle逻辑处理方便html页面表现麻烦jsp页面表现方便但是逻辑处理麻烦JSP是一种页面技术JSP本质上是servlet类通过JSP引擎翻译成servletjsp约等于java+html注意:jsp不是访问静态的html文件index.jsp修改成如下代码:<%-......
  • 苏格拉底问答、实践过程截图、遇到问题解决问题截图,代码链接
    苏格拉底问答实践过程遇到问题及解决代码......
  • 《软件工程:方法与实践》读书笔记3
    1.瀑布模型是最早出现的软件开发模型,在软件工程中占有重要的地位,它提供了软件开发的基本框架。瀑布模型的本质是一次通过,即每个活动只执行一次,最后得到软件产品,也称为“线性顺序模型”或者“传统生命周期”。其过程是从上一项活动接收该项活动的工作对象作为输入,利用这一输入实施......
  • 数据采集与融合技术实践第三次实验
    数据采集与融合技术实践第三次实验Gitee:https://gitee.com/lululusc/crawl_project/tree/master/作业3作业1要求指定一个网站,爬取这个网站中的所有的所有图片,例如中国气象网(http://www.weather.com.cn/(要求:指定--个网站,爬取这个网站中的所有的所有图片,例如中国气象网)结......
  • 优维产品最佳实践第12期:IT资源管理首页丰富
    背景当我们进入平台后,默认跳转至IT资源管理首页,因此该页面的优化与丰富将极大的提高平台使用者的体验和效率。优化后的首页可以更好地展示常用模型、小产品、外部系统、以及保存的所有关系查询和快速查询条件,使用户能够更快捷、方便地找到所需内容。丰富前:丰富后:功能1.关系查询和......
  • 花生好车基于 KubeSphere 的微服务架构实践
    公司简介花生好车成立于2015年6月,致力于打造下沉市场汽车出行解决方案第一品牌。通过自建直营渠道,瞄准下沉市场,现形成以直租、批售、回租、新能源汽车零售,四大业务为核心驱动力的汽车新零售平台,目前拥有门店600余家,覆盖400余座城市,共设有25个中心仓库。目前已为超40万......
  • 花生好车基于 KubeSphere 的微服务架构实践
    公司简介花生好车成立于2015年6月,致力于打造下沉市场汽车出行解决方案第一品牌。通过自建直营渠道,瞄准下沉市场,现形成以直租、批售、回租、新能源汽车零售,四大业务为核心驱动力的汽车新零售平台,目前拥有门店600余家,覆盖400余座城市,共设有25个中心仓库。目前已为超40......
  • java项目实践-cookie-session-day18
    目录1.cookie2.session3.servletcontext4.servletConfig1.cookiecookie在客户端(浏览器)中保持http状态的信息技术本质是浏览器缓存每次发请求的时候在请求头中带给服务端常见的应用场景:实现7天免登录浏览器F12request.setCharacterEncoding("utf-8");......
  • 2023-10-19 第22本书《Jenkins 2.x 实践指南》
    囫囵吞枣读完的,也算听有收获的,他这里面介绍了ansible集成进jenkins感觉有点搞头。收益是:方便部署到多台服务器。明天继续研究一下把。 总体讲的还行,继续看看把。 主要是,我最近有点疲惫,没啥状态。心里乱乱的。 ......
  • Netty实践 -- echo
    Netty实践学习netty,可以从netty源码的netty-example模块开始。netty-example有一个例子echo,非常适合入门学习。这里稍微改造一下,用作示例学习。引入依赖包:<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId>......