首页 > 其他分享 >ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排

ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排

时间:2024-02-06 16:57:34浏览次数:29  
标签:DAG name ACK 任务 Fan split Argo

作者:庄宇

什么是 Fan-out Fan-in

在工作流编排过程中,为了加快大任务处理的效率,可以使用 Fan-out Fan-in 任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。

由上图,可以使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,子任务的拆分方式分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也可以理解为 MapReduce。每个子任务为 Map,最后聚合结果为 Reduce。

静态 DAG: 拆分的子任务分类是固定的,例如:在数据收集场景中,同时收集数据库 1 和数据库 2 中的数据,最后聚合结果。

动态 DAG: 拆分的子任务分类是动态的,取决于前一个任务的输出结果,例如:在数据处理场景中,任务 A 可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务 Bn 处理,当所有子任务 Bn 运行结束后,在子任务 C 中聚合结果,具体启动多少个子任务 B 取决由任务 A 的输出结果。根据实际的业务场景,可以在任务 A 中自定义子任务的拆分规则。

ACK One 分布式工作流 Argo 集群

在实际的业务场景中,为了加快大任务的执行,提升效率,往往需要将一个大任务分解成数千个子任务,为了保证数千个子任务的同时运行,需要调度数万核的 CPU 资源,叠加多任务需要竞争资源,一般 IDC 的离线任务集群难以满足需求。例如:自动驾驶仿真任务,修改算法后的回归测试,需要对所有驾驶场景仿真,每个小驾驶场景的仿真可以由一个子任务运行,开发团队为加快迭代速度,要求所有子场景测试并行执行。

如果您在数据处理,仿真计算和科学计算等场景中,需要使用动态 DAG 的方式编排任务,或者同时需要调度数万核的 CPU 资源加快任务运行,您可以使用阿里云 ACK One 分布式工作流 Argo 集群 [ 1]

ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow [ 2] ,提供售后支持,支持动态 DAG Fan-out Fan-in 任务编排,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,运行完成后及时回收资源节省成本。支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

本文介绍使用 Argo Workflow 编排动态 DAG Fan-out Fan-in 任务。

Argo Workflow 编排 Fan-out Fan-in 任务

我们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 对象存储中的一个大日志文件,并将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。

  1. 创建分布式工作流 Argo 集群 [ 3]

  2. 挂载阿里云 OSS 存储卷,工作流可以像操作本地文件一样,操作阿里云 OSS 上的文件。参考:工作流使用存储卷 [ 4]

  3. 使用以下工作流 YAML 创建一个工作流,参考:创建工作流 [ 5] 。具体说明参见注释。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dynamic-dag-map-reduce-
spec:
  entrypoint: main
  # claim a OSS PVC, workflow can read/write file in OSS through PVC. 
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  # how many tasks to split, default is 5.
  arguments:
    parameters:
      - name: numParts
        value: "5"
  templates:
    - name: main
      # DAG definition.
      dag:
        tasks:
          # split log files to several small files, based on numParts.
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          # multiple map task to count words in each small file.
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            # run as a loop, partId from split task json outputs.
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
            depends: "map"
    # The `split` task split the big log file to several small files. Each file has a unique ID (partId).
    # Finally, it dumps a list of partId to stdout as output parameters
    - name: split
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["split.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # One `map` per partID is started. Finds its own "part file" and processes it.
    - name: map
      inputs:
        parameters:
          - name: partId
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["count.py"]
        env:
        - name: PART_ID
          value: "{{inputs.parameters.partId}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # The `reduce` task takes the "results directory" and returns a single result.
    - name: reduce
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["merge.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
      outputs:
        artifacts:
          - name: result
            path: /mnt/vol/result.json
  1. 动态 DAG 实现

1)split 任务在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子任务要处理的 partId,例如:

["0", "1", "2", "3", "4"]

2)map 任务使用 withParam 引用 split 任务的输出,并解析 json 字符串获得所有 {{item}},并使用每个 {{item}} 作为输入参数启动多个 map 任务。

          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'

更多定义方式,请参考开源 Argo Workflow 文档 [ 6]

  1. 工作流运行后,通过分布式工作流 Argo 集群控制台 [ 7] 查看任务 DAG 流程与运行结果。

  1. 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中间结果目录,result.json 为最终结果文件。

  1. 示例中的源代码可以参考:AliyunContainerService GitHub argo-workflow-examples [ 8]

总结

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,提供售后支持,加固控制面实现数万子任务(Pod)稳定高效调度运行,数据面支持无服务器方式调度云上大规模算力,无需运维集群或者节点,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

欢迎加入 ACK One 客户交流钉钉群与我们进行交流。(钉钉群号:35688562

相关链接:

[1] 阿里云 ACK One 分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/overview-12

[2] Argo Workflow

https://argo-workflows.readthedocs.io/en/latest/

[3] 创建分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/create-a-workflow-cluster

[4] 工作流使用存储卷

https://help.aliyun.com/zh/ack/use-volumes

[5] 创建工作流

https://help.aliyun.com/zh/ack/create-a-workflow

[6] 开源 Argo Workflow 文档

https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/

[7] 分布式工作流 Argo 集群控制台

https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Fcs.console.aliyun.com%2Fone%3Fspm%3Da2c4g.11186623.0.0.7e2f1428OwzMip#/argowf/cluster/detail

[8] AliyunContainerService GitHub argo-workflow-examples

https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count

标签:DAG,name,ACK,任务,Fan,split,Argo
From: https://www.cnblogs.com/alisystemsoftware/p/18009981

相关文章

  • 修改代码 使fan.index(CoolWay) CoolWay不存在于fan时输出0
    您提供的这段代码是一个很好的例子,它使用了Python字典的get方法。这个方法会返回字典中指定键的值,如果键不存在,则返回默认值。在您的代码中,如果FinishTime.month在dic_monthToTemp字典中存在,就返回对应的温度,否则返回默认温度22。根据这个思路,我们可以修改您的代码来满足您的需求。......
  • Jackson序列化clob数据
    1.情景展示在java当中,有时候我们不得不用jdbc来读取数据库数据,而不是通过mybatis框架。这样就遇到一个问题:如果表字段的数据类型为clob时,使用springboot默认进行序列化时,会报错。如何解决?2.具体分析在springboot中,其默认的序列化类时Jackson。既然Jackson的默认序列化规......
  • 堆栈与堆(Stack vs Heap):有什么区别?
    编写有效的代码需要了解堆栈和堆内存,这使其成为学习编程的重要组成部分。不仅如此,新程序员或职场老手都应该完全熟悉堆栈内存和堆内存之间的区别,以便编写有效且优化的代码。这篇博文将对这两种内存分配技术进行全面的比较。通过本文的结论,我们将对堆栈和堆内存有一个透彻的了解,从而......
  • [EFI]DELL-7472电脑 Hackintosh 黑苹果efi引导文件
    硬件型号驱动情况主板DELL-7472处理器IntelCorei7-8550U已驱动内存16GBRAMDDR4已驱动硬盘PNYSSDNVME500GB已驱动显卡IntelUHDGraphics620已驱动声卡瑞昱RealtekALC256@英特尔HighDefinitionAudio控制器已驱动网卡瑞昱RTL8168/8111/8112GigabitEthernetContro......
  • YouTrack 用户登录提示 JIRA 错误
    就算输入正确的用户名和密码,我们也得到了下面的错误信息:youtrackCannotretrieveJIRAuserprofiledetails.   解决办法出现这个问题是因为YouTrack在当前的系统重有JIRA的导入关联。需要把这个导入关联取消掉。找到后台配置的导入关联,然后禁用持续导入功能。   这样......
  • Argocd学习
    argocd官网文档链接ArgoCD官网文档在K8S集群使用argocd命令将集群添加到argcd的cluster列表中argocdclusteraddkubernetes-admin@iamdemo--nameiamdemo--kubeconfig/root/.kube/config遇到一个添加失败的问题,通过修改/root/.kube/config的server:https://iamdemo.tp......
  • 自动化运维工具【SaltStack】
    SaltStack管理工具允许管理员对多个操作系统创建一个一致的管理系统,包括VMwarevSphere环境。SaltStack作用于仆从和主拓扑。SaltStack与特定的命令结合使用可以在一个或多个下属执行。主要用的语言为python二、SaltStack的配置使用自动化软件,实现在server1中显示server2中执......
  • 修改项目中packages包目录
    修改项目中packages包目录修改.NET项目中引用的Packages包目录目录修改项目中packages包目录一、解决的问题二、操作步骤三、注意事项四、相关参考通常情况下在.NET项目中会在解决方案同级的目录中生成一个packages包,解决方案各项目中引用的nuget包都会下载缓存到packages目录......
  • Stack(栈)
    特性先进后出,后进先出头文件#include<stack>基本使用定义stack<int>s;//建立一个栈s,其内部元素类型是int.使用s.push(a);//将a压进栈。s.pop();//将s的栈顶元素弹出。s.top();//查询s的栈顶元素。s.size();//查询s的元素个数。s.empty();//查询s是否为空。......
  • hdu 1312 Red and Black (BFS模板题)
    Problem-1312(hdu.edu.cn)BFS模板题#include<iostream>#include<queue>usingnamespacestd;typedeflonglongll;constintINF=0x3f3f3f3f;intwx,hy,num;charroom[25][25];#defineCHECK(x,y)(x>=0&&x<wx&&y>=0&&am......