首页 > 其他分享 >Pass Artifact between tfx compoents when running with kubeflow pipeline

Pass Artifact between tfx compoents when running with kubeflow pipeline

时间:2024-01-31 17:27:15浏览次数:32  
标签:pipeline pv name compoents Artifact tfx kubeflow root

What is Artifact?

An Artifact is a file or directory produced by a tfx component, which can be passed to a downstream component, and then the downstream component can use it.

How does tfx pass an Artifact between components?

tfx pipeline has an argument "pipeline_root" when instantiating, this is the directory where components will put output Artifacts in, and when one component completes executing, it stores its output Artifacts in pipeline_root, and records the uri (namely the path) of each output Artifact in database metadb. when a downstream component has an Input Artifact, namely it needs an Output Artifact of its upperdstream component, It queries the database metadb accoding to the Input Artifact.channel (namely information about which pipeline, which pipeline-run, which pipeline node the Artifact belongs to. ) , to find the Artifact record in the database metadb, and in the Artifact record, there is its uri, and the downstream component reads the Input Artifact's data from its uri.


tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(metadata_path),
components=components,
)

when run tfx pipelne using kubeflow pipeline, how to pass an Artifact between components?

  When run tfx pipelne using kubeflow pipeline, the pipeline runs on kubernetes cluster, one component runs in one pod, and a container in a pod has standalone file system. Even thoug pipeline_root is same in each component's container, since they belong to standaloine file systems, they are different directories. So one component can not read the Artifact from its uri (pipeline_root/xxx) since the Artifact is stored at pipeiine_root/xxx of another comtainer's file system.

pipeline_root needs to be a directory which can be read and written by all components of the pipeline, One solution is to mount one PersistentVolume to each component's container direcotry (pipeline_root) , and the PersistentVolume should be nfs (network files system), since normally components of a pipeline run on different computers (namely nodes) in the kubernetes cluster.

# create resource Persitent Volume tfx_pv in kubernetes cluster
kubectl apply -f tfx_pv.yaml

# create resource Persistent Volume Claim tfx_pv_claim in kubernetes cluster
# Attention: tfx_pv_claim needs to be in the same namespace with the object who wants to use it,
# in this case, the components of tfx pipeline.
# a pv claim will wait for the first comsumer (namely a pod who uses this pv claim) before binding to an available pv (persistent volume).
kubeclt apply -f tfx_pv_claim.yaml


# pipeline.yaml

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: detect-anomolies-on-wafer-tfdv-schema-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.0, 
  pipelines.kubeflow.org/pipeline_compilation_time: '2024-01-07T22:16:36.438482',
  pipelines.kubeflow.org/pipeline_spec: '{"description": "Constructs a Kubeflow
  pipeline.", "inputs": [{"default": "pipelines/detect_anomolies_on_wafer_tfdv_schema",
  "name": "pipeline-root"}], "name": "detect_anomolies_on_wafer_tfdv_schema"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.0}
spec:
  entrypoint: detect-anomolies-on-wafer-tfdv-schema

  ...
  volumes:
  - name: tfx-pv
    persistentVolumeClaim:
      claimName: tfx-pv-claim

  templates:
  - name: detect-anomolies-on-wafer-tfdv-schema
    inputs:
    parameters:
    - {name: pipeline-root}

    dag:
      tasks:
      - name: importexamplegen
        template: importexamplegen
        arguments:
        parameters:
        - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
        volumes:
        - name: wafer-data
        - name: tfx-pv

      - name: pusher
        template: pusher
        dependencies: [trainer]
        arguments:
        parameters:
        - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
        volumes:
        - name: tfx-pv

      - name: schema-importer
        template: schema-importer
        arguments:
        parameters:
        - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
        volumes:
        - name: tfx-pv
        - name: schema-path

      - name: statisticsgen
        template: statisticsgen
        dependencies: [importexamplegen]
        arguments:
        parameters:
        - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
        volumes:
        - name: tfx-pv

      - name: trainer
        template: trainer
        dependencies: [importexamplegen, transform]
        arguments:
        parameters:
        - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
        volumes:
        - name: trainer-module
        - name: tfx-pv

      - name: transform
        template: transform
        dependencies: [importexamplegen, schema-importer]
        arguments:
        parameters:
        - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
        volumes:
        - name: transform-module
        - name: tfx-pv
        - name: schema-path

  - name: importexamplegen
    container:
      ...
      volumeMounts:
      - mountPath: /maye/trainEvalData
        name: wafer-data
      - mountPath: /tfx/tfx_pv
        name: tfx-pv

# tfx_pv_claim.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: tfx-pv-claim
namespace: kubeflow
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
# tfx_pv.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
name: tfx-pv
spec:
capacity:
storage: 5Gi
volumeMode: Filesystem
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Retain
storageClassName: local-storage
nfs:
server: nfs-server-ip
path: /home/maye/nfs/tfx_pv
nodeAffinity:
required:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- maye-inspiron-5547

标签:pipeline,pv,name,compoents,Artifact,tfx,kubeflow,root
From: https://www.cnblogs.com/zhenxia-jiuyou/p/17999702

相关文章

  • Unity:Couldn't open include file 'Packages/com.unity.render-pipelines.universal/S
    初学Shader,Unity报错↑,总之是找不到ulsl的Core文件,网上找的其他解决方案例如删除文件夹再生成或者改变shader的某些属性,但是根本找不到。最终找到原因是项目类型不同,要把传统3D升级成URP项目。解决办法:安装UniversalRP拓展1.在unity中打开需要升级的场景,SaveAs一份并打开该场......
  • Apache Geode‘s Integration with Apache Kafka: Building HighThroughput, LowLaten
    1.背景介绍在当今的大数据时代,高性能、高吞吐量和低延迟的数据处理能力已经成为企业和组织的核心需求。ApacheGeode和ApacheKafka都是开源社区提供的强大工具,它们各自擅长于不同的数据处理场景。Geode是一个高性能的分布式缓存和计算引擎,它可以处理大量数据并提供低延迟的访......
  • ax650使用ax-pipeline进行推理
    ax650使用ax-pipeline进行推理搭建交叉编译环境拉取ax-pipeline源码及子模块gitclone--recursivehttps://github.com/AXERA-TECH/ax-pipeline.git下载sdkcdax-pipeline./download_ax_bsp.shax650cdax650n_bsp_sdkwgethttps://github.com/ZHEQIUSHUI/assets/re......
  • Tekton pipelineruns 基础
    pipelineruns概述PipelineRun允许你在集群上实例化和执行Pipeline。一个Pipeline指定一个或多个Tasks,按照期望的执行顺序执行。PipelineRun按照指定的顺序执行Pipeline中的Tasks,直到所有Tasks都成功执行或失败。PipelineRun会自动为Pipeline中的每个Task创建相应的taskrun。pipeli......
  • Tekton Pipelines 基础
    Pipelines概述Pipeline是Tasks的集合,作为持续集成流的一部分,您可以定义并按照特定的执行顺序排列这些Tasks。Pipeline中的每个Tasks在Kubernetes集群上作为Pod执行。您可以配置各种执行条件来满足您的业务需求。Pipeline使用When表达式when表达式input:被评估的内容,支持使用静态......
  • Pipeline模式应用
    本文记录Pipeline设计模式在业务流程编排中的应用前言Pipeline模式意为管道模式,又称为流水线模式。旨在通过预先设定好的一系列阶段来处理输入的数据,每个阶段的输出即是下一阶段的输入。本案例通过定义PipelineProduct(管道产品),PipelineJob(管道任务),PipelineNode(管道节点),完成一整......
  • day11 Jenkins Pipeline语法-Jenkins基于Gitlab的授权认证 (4.3.1-4.4)
    一、JenkinsPipeline语法上JenkinsPipeline语法Jenkins有多种方式实现交付流水线。其中,JenkinsPipeline是一种比较流行的方式,它提供一个DSL(DomainSpecificLanguage的缩写,)来描述交付流水线。官网地址:https://www.jenkins.io/doc/book/pipeline/syntax/1、什么是Jenkin......
  • Tekton pipelineruns 基础
    pipelineruns概述PipelineRun允许你在集群上实例化和执行Pipeline。一个Pipeline指定一个或多个Tasks,按照期望的执行顺序执行。PipelineRun按照指定的顺序执行Pipeline中的Tasks,直到所有Tasks都成功执行或失败。PipelineRun会自动为Pipeline中的每个Task创建相应的taskrun。pi......
  • Tekton Pipelines 基础
    Pipelines概述Pipeline是Tasks的集合,作为持续集成流的一部分,您可以定义并按照特定的执行顺序排列这些Tasks。Pipeline中的每个Tasks在Kubernetes集群上作为Pod执行。您可以配置各种执行条件来满足您的业务需求。Pipeline使用When表达式when表达式input:被评估的内容,支持使用......
  • Vulkan/Graphics Pipelines
    渲染是vulkan最基础的功能,也是众多图形化应用最核心的部分。vulkan的渲染过程可以当作是通过执行不同阶段的命令以此来在展示设备上渲染出图片的过程。 vulkan中,渲染管线可以看作是一条生产流水线,命令在管线的开头进入,并且在管线内不同阶段执行。每个阶段都有诸如变换,读取命令......