首页 > 其他分享 >Flyte工作流平台调研(二)——核心概念说明

Flyte工作流平台调研(二)——核心概念说明

时间:2024-12-31 11:56:59浏览次数:10  
标签:启动 平台 工作 任务 Flyte 节点 输入 调研

Flyte 是一个面向数据和机器学习工作流的开源分布式处理平台,它通过任务(Task)和工作流(Workflow)的概念,为用户提供了一种构建、调试和运行可重复、可扩展的工作流的能力。本文基于 Flyte 的核心概念,结合 Flyte 官网和我的调研内容,详细介绍 Flyte 的体系结构及其核心功能。

Task(任务):Flyte 的最小执行单元

概念

任务是 Flyte 的完全独立的执行单元,也是其核心实体。任务是构建工作流的基本模块,同时也是扩展 Flyte 功能的切入点,任务主要封装了用户的代码。

特点:

  1. 唯一标识:每个任务必须拥有一个唯一的名字和版本号,用于识别。
  2. 接口签名:任务可以定义输入输出变量(如编程语言中的方法签名)。
  3. 可重复性:相同输入多次执行,必须返回一致的输出。
  4. 退出机制:任务必须拥有明确优雅的退出逻辑,避免资源泄露。
  5. 缓存能力:Flyte 支持缓存任务执行结果,减少重复计算。
  6. 自定义扩展:Flyte 支持用户定义自定义任务类型,灵活扩展任务功能。

注意:

  • 任务应该尽量避免直接访问外部系统(如 Web 服务),以提升可重复性。
  • 任务通常封装在容器(如 Docker 镜像)中运行,确保环境一致性。

扩展任务 (Extending Task)

插件 (Plugins)

Flyte 提供了一个可扩展的模型,用于以与执行无关的语言表达任务。它包含了一些一流的任务插件(例如:Papermill、Great Expectations 等),这些插件用于执行 Flyte 任务。几乎所有操作都可以作为“插件”引入到 Flyte 中,例如:

  • 在分布式数据仓库(如 Redshift、Hive、Snowflake 等)上运行查询的任务。
  • 在计算引擎(如 Spark、Flink、AWS Sagemaker、AWS Batch、Kubernetes Pods 等)上运行的任务。
  • 调用 Web 服务的任务。

Flyte 提供了一些默认配置,例如运行简单的 Python 函数无需托管服务,Flyte 知道如何在 Kubernetes 上执行此类任务。这种方式适用于机器学习中绝大多数的任务,Flyte 能够在 Kubernetes 上以超大规模高效运行。

任务类型 (Types)

由于任务的执行单元因任务类型而异,因此 Flyte 支持在系统中定义不同类型的任务。Flyte 提供了一组经过验证的任务类型,同时允许灵活定义新类型。

内在特性 (Inherent Features)

容错性 (Fault Tolerance)

在任何分布式系统中,失败是不可避免的。Flyte 的目标是允许用户设计容错系统(例如工作流)。任务通过以下两个参数实现容错:

  1. 重试机制 (Retries)

    • 系统重试 (System Retries)
      系统重试针对系统级别的可恢复失败(例如下游服务失败或网络中断)。当任务因系统错误失败时,系统会根据设定的重试次数重新尝试任务。

      • 下游系统重试 (Downstream System Retry)
        针对下游服务(或远程服务)不可用的情况,例如服务返回 500 错误或网络中断等,系统会重新尝试执行任务。
      • 瞬态失败重试 (Transient Failure Retry)
        针对用户无法感知的瞬态失败提供容错能力。Flyte 会尝试持续保存任务状态,直到达到最大重试次数。如果重试失败,则会进行回退(Backoff)。
    • 用户重试 (User Retries)
      如果任务执行失败,可以根据用户在 TaskMetadata 中定义的重试次数重新尝试。重试次数不得超过 10 次。

    注意:可恢复失败会被重试并计入任务的重试计数,而不可恢复失败(例如用户异常)则不会被重试。

  2. 超时 (Timeouts)
    系统为任务定义了默认超时时间,任务作者可以自定义超时时间。一旦任务超时,会被标记为失败,但仍会根据重试策略进行重试。

缓存/记忆化 (Caching/Memoization)

Flyte 支持任务输出的记忆化,以确保相同的任务调用不会重复执行,从而节省计算资源和时间。例如,如果希望多次运行相同的代码,可以复用之前的输出,而不是重新计算。

重试与 Spot 实例

Flyte 支持在 Spot(抢占式)实例上运行任务,同时提供以下重试策略:

  1. 系统重试

    • Spot 实例抢占。

    • 网络问题。

    • 服务不可用。

    • 硬件故障。

    最后一次重试会自动在非抢占式实例上运行,以确保任务完成。

  2. 用户重试
    由用户在 @task 装饰器中定义,例如:

@task(retries=3)  # 用户重试次数设置为 3 次
def my_task() -> None:
    ...

替代重试行为 (Alternative Retry Behavior)

从 1.10.0 开始,Flyte 提供了一种简化的重试行为,系统和用户重试会共享单一的重试预算 (Retry Budget)。要启用此功能:

  1. 在 Helm 配置中设置 configmap.core.propeller.node-config.ignore-retry-causetrue
  2. 在任务装饰器中定义重试次数,作为总重试预算。

这种方法使重试行为更加简单和可预测,同时保持可靠性。

示例

Flyte中的任务在其自己的容器中运行,并在Kubernetes的Pod上执行。任务可以分为以下两种类型:

  1. 与Python函数相关联的任务:执行任务等同于执行该Python函数。
  2. 不与Python函数相关联的任务:例如SQL查询、便携任务(如SageMaker中的预构建算法)或调用API的服务。

Flyte为任务提供了许多插件,包括Athena等后端插件。

以下示例展示了如何编写并执行一个Python函数任务。

开始

首先,从flytekit库中导入task

# basics/task.py 
from flytekit import task

使用@task装饰器是定义PythonFunctionTask任务的必要步骤。任务本质上是一个普通的Python函数,但要求所有的输入和输出必须明确注解其类型。有关支持的类型,请参阅类型系统部分。

以下示例创建了一个计算回归线斜率的任务:

@task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

注意:Flytekit会为输出变量分配一个默认名称,比如o0。如果有多个输出,输出变量会按顺序编号,从0开始,例如o0, o1, o2,以此类推。


本地运行

你可以像调用普通的Python函数一样调用一个Flyte任务:

if __name__ == "__main__":
    print(slope(x=[-3, 0, 3], y=[7, 4, -2]))

注意:调用Flyte任务时,需要使用关键字参数为对应的参数指定值。


使用pyflyte run本地运行

以下是运行代码的命令:

pyflyte run \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/basics/basics/task.py \
  slope --x '[-3,0,3]' --y '[7,4,-2]'

在Flyte集群上远程运行

如果你想在Flyte集群上远程运行任务,只需在pyflyte run命令中添加--remote标志:

pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/basics/basics/task.py \
  slope --x '[-3,0,3]' --y '[7,4,-2]'

注意

Workflow(工作流):任务的有序编排

概念

工作流是由节点封装的工作单元组成的有向无环图 (DAG)。工作流的具体实例(通常绑定了输入参数)被称为工作流执行 (Workflow Executions),或简称为执行 (Executions)。换句话说,工作流是有序任务执行的模板。

Flyte 工作流使用 Protobuf 定义,Flytekit SDK 提供了编写工作流的工具。用户可以将工作流定义为节点的集合。工作流中的节点可以生成输出,而后续节点可以将这些输出作为输入。这些依赖关系决定了工作流的结构。

使用 SDK 编写的工作流无需显式定义节点来封装执行单元(任务、子工作流、启动计划);这些节点会由 SDK 在注册时自动注入。

特点:

  1. 节点化结构:工作流由多个节点(Node)组成,每个节点封装一个任务或子工作流。
  2. 任务重用:相同的任务定义可以在不同的工作流中重复使用。
  3. 灵活并行:工作流允许独立的任务节点并行执行,优化执行效率。
  4. 嵌套支持:工作流可以包含子工作流,甚至触发外部工作流。
  5. 版本管理:工作流和任务都支持版本管理,确保执行时的一致性。

执行逻辑:

  • 当某节点的输入数据准备就绪时,节点立即触发执行。
  • Flyte 会根据节点间的依赖关系,自动安排任务的并行与串行执行。

结构 (Structure)

工作流接受输入、生成输出,并在不同项目和域中重用任务定义。每个工作流都有一个默认的启动计划 (Launchplan),其名称与工作流名称相同。

工作流的结构非常灵活:

  1. 节点可以并行执行。
  2. 相同的任务定义可以在不同的工作流中复用。
  3. 单个工作流可以包含任何组合的任务类型。
  4. 工作流可以只包含一个功能节点。
  5. 工作流可以包含多个以各种方式排列的节点。
  6. 工作流可以启动其他工作流。

在执行时,只要节点的输入可用,其执行就会被触发。

当可能时,工作流节点会自然地并行运行。例如,当工作流中有 5 个独立的节点(即这些节点不会消费其他节点生成的输出)时,Flyte 会根据数据和资源限制并行运行这些节点。

Flyte 特定结构 (Flyte-Specific Structure)

在注册过程中,Flyte 会验证工作流结构并保存工作流。注册过程会更新工作流图表 (Workflow Graph)。编译后的工作流总会在工作流图表中注入一个开始节点和结束节点。此外,错误处理器 (Failure Handler) 会捕获并处理执行失败。

版本控制 (Versioning)

与任务类似,工作流也是有版本控制的。注册后的工作流是不可变的,即由特定 {Project, Domain, Name, Version} 组合定义的工作流实例不能被更新。工作流中引用的任务版本是不可变的,并且绑定到具体任务的版本。

示例

工作流用于将多个任务链接在一起。它们可以被写成Python函数,但需要明确区分任务(Task)和工作流(Workflow):

  • 任务的主体在运行时会在Kubernetes集群、查询引擎(如BigQuery)或托管服务(如AWS Batch或SageMaker)上执行计算。
  • 工作流的主体不执行计算,而是用来组织任务。工作流的主体会在注册阶段执行,注册过程涉及将打包(序列化)的代码上传到Flyte后端,从而使工作流可以被触发。

有关注册的更多信息,请参阅注册文档。

开始

首先,从flytekit库中导入task()workflow()

# basics/workflow.py
from flytekit import task, workflow

我们定义了slopeintercept任务,分别用于计算回归线的斜率和截距:

# basics/workflow.py
@task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)


@task
def intercept(x: list[int], y: list[int], slope: float) -> float:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept

定义一个工作流来建立任务之间的依赖关系。与任务一样,工作流也是强类型的:

# basics/workflow.py
@workflow
def simple_wf(x: list[int], y: list[int]) -> float:
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value)
    return intercept_value

工作流的解析与执行

@workflow装饰器将Flyte任务封装起来,基本上表示延迟求值的Promise。在解析过程中,函数调用会被推迟到执行时。函数调用会生成Promises,这些Promises可以被传递给下游的函数,但在工作流内部并不可访问。真正的求值发生在工作流执行时。

工作流可以通过本地运行来立即求值,也可以通过工具如pyflyteflytectl或UI触发求值。尽管带有@workflow装饰器的工作流看起来像Python函数,但它实际上是一个类Python的领域专用语言(DSL)。当遇到一个带有@task装饰器的Python函数时,会生成一个Promise对象。这个Promise对象不存储任务的实际输出,其内容只有在执行时才能获得。此外,传递给工作流的输入也都是Promise,您只能将Promise传递给任务、工作流或其他Flyte结构。

注意

有关动态工作流的更多信息,请参考动态工作流。与简单工作流不同,动态工作流的输入是预先物化的。但在动态工作流中,每个任务调用仍然会生成一个Promise,该Promise依然是延迟求值的。请注意,工作流可以包含任务、其他工作流和动态工作流。

本地运行工作流

您可以像调用普通Python函数一样调用一个工作流,并提供必要的输入:

# basics/workflow.py
if __name__ == "__main__":
    print(f"Running simple_wf() {simple_wf(x=[-3, 0, 3], y=[7, 4, -2])}")

使用pyflyte run本地运行

运行工作流的命令如下:

pyflyte run \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/basics/basics/workflow.py \
  simple_wf --x '[-3,0,3]' --y '[7,4,-2]'

在Flyte集群上远程运行

若要在Flyte集群上远程运行工作流,只需在pyflyte run命令中添加--remote标志:

pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/basics/basics/workflow.py \
  simple_wf --x '[-3,0,3]' --y '[7,4,-2]'

独立调试任务

虽然工作流通常由多个任务构成,并通过共享输入和输出建立依赖关系,但在开发和迭代任务逻辑时,隔离单个任务的执行是非常有利的。每次为此目的创建新的工作流定义可能会显得繁琐。然而,独立执行单个任务,不受工作流限制,是轻松迭代任务逻辑的一种便捷方法。

使用partial为任务提供默认参数

您可以使用functools.partial()函数为任务的参数赋予默认值或常量值:

# basics/workflow.py
import functools

@workflow
def simple_wf_with_partial(x: list[int], y: list[int]) -> float:
    partial_task = functools.partial(slope, x=x)
    return partial_task(y=y)

Node(节点):工作流中的工作单元

节点是工作流中执行单元或工作的一个表示。通常,一个节点封装了一个任务的实例,但它也可以包含一个完整的子工作流或触发外部工作流。节点可以拥有输入和输出,这些输入和输出被用来协调任务的输入和输出。此外,节点的输出可以作为工作流中其他节点的输入。

任务始终被封装在节点中。与任务类似,节点也有多种类型,取决于它们的目标。这些目标包括任务节点、工作流节点和分支节点。

  1. 任务节点 (Task Nodes):在工作流中引用的任务始终被封装在节点中。这适用于所有任务类型。例如,一个数组任务会被封装在一个单独的节点中。
  2. 工作流节点 (Workflow Nodes):一个节点可以包含一个完整的子工作流。由于工作流的执行始终需要一个启动计划 (launch plan),工作流节点会引用一个启动计划来触发其所包含的工作流。
  3. 分支节点 (Branch Nodes):分支节点改变工作流图的流程。在运行时,会根据条件进行评估以决定控制流的走向。

Launch Plan(启动计划):工作流的触发方式

概念

启动计划用于帮助执行工作流。一个工作流可以与多个启动计划及其版本相关联,但每个启动计划始终与一个特定的工作流绑定。在创建启动计划后,便于共享和执行这些计划。

启动计划提供了一种模板化 Flyte 工作流调用的方法。启动计划包含一组绑定的工作流输入,这些输入作为参数传递以创建执行。启动计划不一定包含工作流所需的所有输入,但启动计划始终是触发执行所必需的。在执行时,可以提供额外的输入参数以补充启动计划中静态输入值。

除了模板化的输入外,启动计划还允许用户以一个或多个调度计划运行工作流。每个启动计划可以选择性地定义一个调度计划(该计划可以通过禁用启动计划轻松禁用),并可以设置通知。有关通知的详细信息,请参考[通知文档 (Notifications)]。

工作流与启动计划之间的关联

每个工作流都有一个默认启动计划,其名称与工作流名称相同。默认启动计划是创建新工作流时(在代码中)生成的。一个启动计划版本只能与一个工作流版本绑定,这意味着启动计划版本不能重复使用。这是因为新的启动计划版本包含了特定工作流版本的映射关系。

注意
用户很少直接与默认启动计划交互。

例如,假设我们有工作流 A 的版本 1,以及启动计划 A 和 B 的版本 1,启动计划 B 的版本 2:

  • 工作流 A 可以关联到启动计划 A(版本 1)。
  • 工作流 A 可以关联到启动计划 B(不同的启动计划名称;版本 1)。
  • 工作流 A 可以关联到启动计划 B(版本 2)。

启动计划的功能

  1. 一键调用:通过预定义输入和友好的启动计划名称,一键调用工作流。
  2. 多种调度:为每个工作流定义多个具有不同默认输入值的调度计划。
  3. 启用/禁用调度:轻松启用或禁用调度计划。
  4. 动态或静态创建:通过 FlyteClient 动态创建,或使用 Flyte SDK 静态创建。
  5. 通知关联:将不同的通知与工作流关联。
  6. 限制输入:通过 fixed_inputs 参数限制工作流启动时的输入。
  7. 版本控制:一个启动计划可以有多个版本(具有相同名称),但只有一个活跃版本。调度仅反映在活跃的启动计划版本上。

启动计划输入 (Launch Plan Inputs)

通常,启动计划的输入与其相关工作流定义的输入相对应,即变量类型和名称需要匹配。启动计划不能引入核心工作流定义中未定义的输入。然而,启动计划输入与工作流输入略有不同,启动计划输入分为默认输入固定输入两类。

默认输入 (Default Inputs)

默认输入的行为类似于默认工作流输入。顾名思义,默认输入在执行时提供默认的工作流输入值,如果没有动态提供的值,则使用默认输入。

固定输入 (Fixed Inputs)

固定输入无法被覆盖。如果通过启动计划执行工作流,并尝试用动态输入重新定义启动计划的固定输入,则执行创建请求会失败。

示例

概述

启动计划链接工作流所需的部分或全部输入,并可附加运行时覆盖选项,例如通知、调度等。它们的功能包括:

  • 在多个时间点调度同一工作流,并可选择预定义输入。
  • 运行特定工作流,但更改通知设置。
  • 共享具有预定义输入的工作流,允许其他用户启动执行。
  • 共享工作流,并允许其他用户覆盖某些输入。
  • 共享工作流,同时确保某些输入保持不变。

启动计划是调用工作流执行的唯一方式。当工作流被序列化并注册时,会生成一个默认启动计划。该默认启动计划可以绑定默认的工作流输入以及项目 Flytekit 配置中定义的运行时选项(例如用户角色)。

注意

  • 要克隆并运行本页面中的示例代码,请参考 Flytesnacks 仓库
  • 首先,导入必要的库:
# basics/launch_plan.py
from flytekit import LaunchPlan, current_context

workflow.py 文件中导入要为其创建启动计划的工作流:

# basics/launch_plan.py
from .workflow import simple_wf

创建启动计划过程

1. 创建一个没有输入的默认启动计划(序列化期间):

# basics/launch_plan.py
default_lp = LaunchPlan.get_default_launch_plan(current_context(), simple_wf)

2. 本地运行启动计划:

# basics/launch_plan.py
default_lp(x=[-3, 0, 3], y=[7, 4, -2])

3. 创建启动计划并指定默认输入:

# basics/launch_plan.py
simple_wf_lp = LaunchPlan.create(
    name="simple_wf_lp", workflow=simple_wf, default_inputs={"x": [-3, 0, 3], "y": [7, 4, -2]}
)

4. 本地触发启动计划:

# basics/launch_plan.py
simple_wf_lp()

5. 覆盖默认输入:

# basics/launch_plan.py
simple_wf_lp(x=[3, 5, 3], y=[-3, 2, -2])

6. 锁定启动计划输入,防止执行期间被覆盖:

# basics/launch_plan.py
simple_wf_lp_fixed_inputs = LaunchPlan.get_or_create(
    name="fixed_inputs", workflow=simple_wf, fixed_inputs={"x": [-3, 0, 3]}
)

注意

  • 在一个启动计划中,可以结合使用默认输入和固定输入。
  • 启动计划还可以用于在特定节奏上运行工作流。有关详细信息,请参考[调度文档 (Schedules)]。

Schedule(调度):工作流的定时触发

概述

工作流可以通过与启动计划 (Launch Plans) 关联的调度自动运行。

  • 对于特定的 {Project, Domain, Name} 组合,仅一个启动计划版本可以处于激活状态,这意味着一个启动计划只能有一个激活的调度。
  • 一个工作流版本可以关联多个调度,只要这些调度存在于不同启动计划的版本中。
  • 创建新调度会生成启动计划的新版本。如果需要更改调度,必须创建该启动计划的新版本,因为调度无法直接编辑。

FlyteAdmin 会跟踪新添加的调度,并在所有启动计划版本中将其他调度设置为“非激活”状态。

以前被设置为非激活状态的启动计划版本可以手动使用,方法是点击启动按钮并选择特定的启动计划版本。

定义调度方式

调度可以通过 cron_expressionrate_unit 定义。


Cron 表达式 (Cron Expression)

Cron 表达式字符串使用如下语法,并在启动计划注册时进行验证。

格式 (Format)

一个 Cron 表达式通过 5 个用空格分隔的字段定义一组时间:

字段名称必填允许值允许的特殊字符
分钟 (Minutes)0-59* / , -
小时 (Hours)0-23* / , -
日期 (Day of month)1-31* / , - ?
月份 (Month)1-12 或 JAN-DEC* / , -
星期 (Day of week)0-6 或 SUN-SAT* / , - ?

注意MonthDay of week 字段不区分大小写。

Cron 调度示例

如果 Cron 表达式有误,则会导致调度触发失败。例如:

cron_lp_every_min_of_hour = LaunchPlan.get_or_create(
    name="my_cron_scheduled_lp",
    workflow=date_formatter_wf,
    schedule=CronSchedule(
        schedule="@hourly",  # 每小时的开始运行
        kickoff_time_input_arg="kickoff_time",
    ),
)

固定时间间隔调度 (Fixed Rate Schedules)

除了使用 Cron 表达式,还可以使用固定时间间隔调度。

  • 可以通过 timedelta 来指定调度的间隔时间(支持分钟、小时、天和周)。

以下是以分钟为间隔的示例:

fixed_rate_lp_minutes = LaunchPlan.get_or_create(
    name="my_fixed_rate_lp_minutes",
    workflow=positive_wf,
    schedule=FixedRate(duration=timedelta(minutes=30)),
)

以下是以天为间隔的示例:

fixed_rate_lp_days = LaunchPlan.get_or_create(
    name="my_fixed_rate_lp_days",
    workflow=positive_wf,
    schedule=FixedRate(duration=timedelta(days=1)),
    fixed_inputs={"name": "you"},
)

速率单位 (Rate Unit)

调度还可以使用固定速率定义,支持以天 (days)、小时 (hours)、分钟 (minutes) 为单位。

Execution(执行):工作流的实例化

执行是工作流、节点或任务的实例,由用户请求的执行或调度执行在系统中创建。执行 ID 在特定的项目域 (Project Domain) 内是唯一的,确保同一域中没有两个执行共享相同的 ID。

使用 Flytectl 的典型流程

当通过 UI/Flytecli 或其他无状态系统触发工作流执行时,系统会首先调用 getLaunchPlan 接口,检索匹配指定版本的启动计划。启动计划定义包含为工作流声明的所有输入变量的定义。

  1. 用户端验证输入
    用户侧组件确保所有必需的输入都已提供,并向 FlyteAdmin 服务发出执行请求。

  2. FlyteAdmin 验证输入
    FlyteAdmin 服务验证输入,确保所有输入参数都已指定,并且(如有需要)在声明的范围内。

  3. 工作流转译
    FlyteAdmin 随后获取之前验证并编译的工作流封闭定义 (Workflow Closure),并将其转译为可执行格式,其中包含所有输入参数。

  4. 执行启动
    转译后的工作流被部署在 Kubernetes 上,同时在数据库中创建一条执行记录。

Flyte 的工作流生命周期

Flyte 工作流的生命周期可以分为三个主要阶段:定义、拆分、执行。

1. 定义工作流

  • 用户通过 FlyteKit(SDK)定义任务和工作流,描述任务的输入、输出及依赖关系。
  • 提交时,Flyte 编译器会将用户定义的 Python 代码转换为中间表示,生成工作流模板。

2. 任务拆分

  • Flyte 编译器解析任务间的依赖关系,生成任务执行图。
  • 对于无依赖关系的任务,Flyte 自动并行调度。

3. Kubernetes 上的执行

  • Flyte 将任务映射为 Kubernetes Pod,并根据任务定义的 Docker 镜像启动容器执行。
  • FlytePropeller 负责工作流调度,根据依赖关系触发任务运行。
  • 任务结果存储在共享数据存储中,后续任务可以直接访问。

Flyte 的独特优势

  1. 可扩展性:支持自定义任务类型,适配不同的业务场景。
  2. 任务复用:相同的任务可以在多个工作流中重复使用,减少重复开发。
  3. 高效并行:通过节点并行执行提升工作流效率。
  4. 版本控制:任务和工作流的版本管理保证了执行过程中的一致性和可追溯性。
  5. 缓存机制:任务执行结果可以缓存,避免重复计算。

Flyte 的核心概念为数据和机器学习工作流提供了强大的支持,尤其适用于复杂的分布式任务管理场景。通过任务和工作流的抽象化,Flyte 将用户从底层调度细节中解放出来,让开发者可以专注于业务逻辑和模型开发。在实际使用 Flyte 的过程中,掌握这些核心概念,能够帮助我们更加高效地构建和管理工作流,释放生产力的潜能!

后记

Flyte调研其他文章:

Flyte工作流平台调研(一)——整体架构

Flyte工作流平台调研(三)——核心组件原理

Flyte工作流平台调研(四)——服务部署和扩展集成

Flyte工作流平台调研(五)——跟Ray框架对比

Flyte工作流平台调研(六)——核心源码走读

标签:启动,平台,工作,任务,Flyte,节点,输入,调研
From: https://blog.csdn.net/weixin_43837507/article/details/144843274

相关文章