当前位置: 首页 > news >正文

生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts Launch plans

生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts Launch plans

Flyte 是一个开源编排器,用于构建生产级数据和机器学习流水线。它以 Kubernetes 作为底层平台,注重可扩展性和可重复性。借助 Flyte,用户团队可以使用 Python SDK 构建流水线,并将其无缝部署在云端和本地环境中,从而实现分布式处理和高效的资源利用。

文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。

Flyte

启动计划

启动计划是工作流调用的模板。它将以下元素整合在一起:

  • 一个工作流
  • 启动该工作流所需的(可能不完整)输入参数集合
  • 可选的通知和调度计划

当调用时,启动计划会传递输入参数来启动工作流。如果启动计划未包含所有必需的工作流输入参数,则需要在执行时提供额外的输入参数。

默认启动计划

每个工作流自动附带一个_默认启动计划_。该启动计划不定义任何默认输入参数,因此所有参数必须在执行时提供。默认启动计划始终与其对应工作流同名。

启动计划具有版本控制

与任务和工作流一样,启动计划具有版本控制。可以更新启动计划来更改输入参数集合、调度计划或通知配置。每次更新都会创建新的启动计划版本。

自定义启动计划

除了默认启动计划外,可以为任何工作流定义额外的启动计划。通常,一个工作流可以关联多个启动计划,但每个启动计划只能关联一个特定工作流。

查看工作流的启动计划

要查看指定工作流的启动计划,在UI中导航至工作流页面并点击Launch Workflow。从Launch Plan下拉菜单中可选择用于启动工作流的启动计划。默认情况下会选中默认启动计划。如果未为该工作流定义任何自定义启动计划,则仅显示默认计划。若已定义自定义启动计划,它们将与默认计划一起显示在下拉菜单中。更多细节请参考运行启动计划。

注册启动计划

通过命令行注册启动计划

大多数情况下,启动计划与项目代码中的工作流和任务一起定义,并通过CLI与其他实体一起批量注册(参见运行代码)。

使用FlyteRemote在Python中注册启动计划

与所有Flyte命令行操作类似,您也可以通过编程方式使用FlyteRemote注册启动计划,具体方法是调用FlyteRemote.register_launch_plan

注册结果

当上述代码注册到Flyte时,会创建四个对象:

  • 任务 workflows.launch_plan_example.my_task
  • 工作流 workflows.launch_plan_example.my_workflow
  • 默认启动计划 workflows.launch_plan_example.my_workflow(注意其名称与工作流相同)
  • 自定义启动计划 my_workflow_custom_lp(即我们在代码中定义的那个)

修改启动计划

通过修改代码中的定义并重新注册来更改启动计划。当重新注册具有相同项目(project)、域(domain)和名称的启动计划时,将创建该启动计划的新版本。

定义启动计划

您可以使用 LaunchPlan 类 定义启动计划。

以下是一个定义启动计划的简单示例:

import flytekit as fl@fl.workflow
def my_workflow(a: int, b: str) -> str:return f"Result: {a} and {b}"# 创建默认启动计划
default_lp = @fl.LaunchPlan.get_or_create(workflow=my_workflow)# 创建命名启动计划
named_lp = @fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_custom_launch_plan"
)

默认与固定输入

默认输入可在执行时被覆盖,而固定输入不可修改。

import flytekit as fl# 带默认输入的启动计划
lp_with_defaults = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_defaults",default_inputs={"a": 42, "b": "default_value"}
)# 带固定输入的启动计划
lp_with_fixed = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_fixed",fixed_inputs={"a": 100}  # 'a' 将始终为 100,只有 'b' 可被指定
)# 组合默认与固定输入
lp_combined = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="combined_inputs",default_inputs={"b": "default_string"},fixed_inputs={"a": 200}
)

定时执行

import fl
from datetime import timedelta
from flytekit.core.schedule import CronSchedule, FixedRate# 使用 cron 调度(每周一 UTC 时间 10:00 AM 运行)
cron_lp = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="weekly_monday",default_inputs={"a": 1, "b": "weekly"},schedule=CronSchedule(schedule="0 10 * * 1",  # Cron 表达式: 分钟 小时 日 月 周几kickoff_time_input_arg=None)
)# 使用固定频率调度(每 6 小时运行)
fixed_rate_lp = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="every_six_hours",default_inputs={"a": 1, "b": "periodic"},schedule=FixedRate(duration=timedelta(hours=6))
)

标签与注解

标签和注解有助于组织管理,可用于过滤或添加元数据。

import fl
from flytekit.models.common import Labels, Annotations# 添加标签和注解
lp_with_metadata = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_metadata",default_inputs={"a": 1, "b": "metadata"},labels=Labels({"team": "data-science", "env": "staging"}),annotations=Annotations({"description": "测试用启动计划", "owner": "jane.doe"})
)

执行参数

import fl# 设置最大并行度限制并发任务执行
lp_with_parallelism = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_parallelism",default_inputs={"a": 1, "b": "parallel"},max_parallelism=10  # 最多允许 10 个任务节点并发执行
)# 禁用该启动计划的执行缓存
lp_no_cache = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="no_cache",default_inputs={"a": 1, "b": "fresh"},overwrite_cache=True  # 总是全新执行,忽略缓存结果
)# 注册时自动激活
lp_auto_activate = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="auto_active",default_inputs={"a": 1, "b": "active"},auto_activate=True  # 注册后立即激活启动计划
)

安全与认证

我们可以覆盖用于执行启动计划的认证角色(IAM 角色或 Kubernetes 服务账户)。

import fl
from flytekit.models.common import AuthRole
from flytekit import SecurityContext# 为启动计划设置认证角色
lp_with_auth = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_auth",default_inputs={"a": 1, "b": "secure"},auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/my-execution-role")
)# 设置安全上下文
lp_with_security = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_security",default_inputs={"a": 1, "b": "context"},security_context=SecurityContext(run_as=SecurityContext.K8sServiceAccount(name="my-service-account"))
)

原始输出数据配置

from flytekit.models.common import RawOutputDataConfig# 配置大型输出存储位置
lp_with_output_config = LaunchPlan.get_or_create(workflow=my_workflow,name="with_output_config",default_inputs={"a": 1, "b": "output"},raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://my-bucket/workflow-outputs/")
)

完整整合示例

以下是一个较为全面的示例。该自定义启动计划包含:

comprehensive_lp = LaunchPlan.get_or_create(workflow=my_workflow,name="comprehensive_example",default_inputs={"b": "configurable"},fixed_inputs={"a": 42},schedule=CronSchedule(schedule="0 9 * * *"),  # 每日 UTC 时间 9:00 AMnotifications=[\Notification(\phases=["SUCCEEDED", "FAILED"],\email=EmailNotification(recipients_email=["team@example.com"])\)\],labels=Labels({"env": "production", "team": "data"}),annotations=Annotations({"description": "每日数据处理"}),max_parallelism=20,overwrite_cache=False,auto_activate=True,auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/workflow-role"),raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://results-bucket/daily-run/")
)

这些示例展示了 Flyte 中启动计划的灵活性,您可以根据工作流需求自定义执行参数、输入、调度等多种配置。

查看启动计划

通过用户界面查看启动计划

在侧边栏选择Launch Plans,将显示项目与域中所有已注册启动计划的列表:

启动计划列表

您可以通过以下方式筛选:

  • 按名称搜索启动计划
  • 过滤仅显示已归档的启动计划

启动计划表格列定义如下:

  • 名称:启动计划的名称。点击可查看具体详情
  • 触发器
    • 若启动计划处于激活状态,会显示绿色Active徽章。激活状态下,所有关联的调度将生效并按计划触发
    • 显示是否包含触发器。可通过右上角Has Triggers复选框过滤含触发器的启动计划
  • 最后执行:最近一次执行的时间戳(包含调度触发、手动触发等所有方式)
  • 最近10次执行:以可视化方式展示最近10次执行记录(包含所有触发方式)

点击列表条目可进入具体启动计划视图:

启动计划详情

在此界面可查看:

  • 启动计划详情(最新版本)
    • 预期输入:启动计划的输入输出类型
    • 固定输入:显示预定义的输入值(如有)
  • 启动计划版本:该计划的所有历史版本列表
  • 所有执行记录:该计划的所有执行历史

右上角显示激活状态(若激活则显示具体激活版本),并提供版本切换或完全停用的控制选项。详见激活与停用

通过uctl命令行查看启动计划

查看项目与域中所有启动计划:

$ uctl get launchplans \--project <project-id> \--domain <domain>

查看具体启动计划:

$ uctl get launchplan \--project <project-id> \--domain <domain> \<launch-plan-name>

更多详情请参考Uctl CLI文档

通过Python的FlyteRemote查看启动计划

使用FlyteRemote.client.list_launch_plans_paginated方法获取启动计划列表。

通知

一个启动计划(launch plan)可以关联一个或多个通知,当该启动计划关联的工作流(workflow)执行完成时,这些通知会被触发。

共有三种类型的通知:

  • Email: 向指定收件人发送电子邮件
  • PagerDuty: 向配置的PagerDuty服务发送通知(需指定接收方)。PagerDuty将根据您的配置转发通知
  • Slack: 向指定Slack频道的关联邮箱地址发送通知。此功能要求预先配置Slack账户以接收通知

可以根据工作流执行的不同最终状态发送对应的通知。可选状态包括:

  • WorkflowExecutionPhase.ABORTED(执行中止)
  • WorkflowExecutionPhase.FAILED(执行失败)
  • WorkflowExecutionPhase.SUCCEEDED(执行成功)
  • WorkflowExecutionPhase.TIMED_OUT(执行超时)

示例:

from datetime import datetimeimport flytekit as flfrom flytekit import (WorkflowExecutionPhase,Email,PagerDuty,Slack
)@fl.task
def add_numbers(a: int, b: int, c: int) -> int:return a + b + c@fl.task
def generate_message(s: int, kickoff_time: datetime) -> str:return f"sum: {s} at {kickoff_time}"@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:return generate_message(add_numbers(a, b, c),kickoff_time,)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},notifications=[\Email(\phases=[WorkflowExecutionPhase.FAILED],\recipients_email=["me@example.com", "you@example.com"],\),\PagerDuty(\phases=[WorkflowExecutionPhase.SUCCEEDED],\recipients_email=["myboss@example.com"],\),\Slack(\phases=[\WorkflowExecutionPhase.SUCCEEDED,\WorkflowExecutionPhase.ABORTED,\WorkflowExecutionPhase.TIMED_OUT,\],\recipients_email=["your_slack_channel_email"],\),\],
)

调度计划

启动计划允许您对工作流的定时调用进行调度。一个启动计划可以关联一个或多个调度方案,但同一时间最多只能有一个调度处于激活状态。如果在启动计划上激活了调度,系统将按照预定时间自动调用工作流,并使用启动计划提供的输入参数。

要为启动计划添加调度方案,请按以下方式向启动计划添加调度对象:

from datetime import timedeltaimport flytekit as fl
from flytekit import FixedRate@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:return my_task(a=a, b=b, c=c)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=FixedRate(duration=timedelta(minutes=10))
)

这里我们指定了FixedRate调度方案,系统将每10分钟调用一次工作流。固定频率调度也可以使用天数或小时数来定义。

或者,您也可以指定CronSchedule:

import flytekit as fl
from flytekit import CronSchedule@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:return my_task(a=a, b=b, c=c)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=CronSchedule(schedule="*/10 * * * *")
)

kickoff_time_input_arg

FixedRateCronSchedule都可以接受名为kickoff_time_input_arg的可选参数。

该参数用于指定工作流输入参数的名称。每次系统通过此调度调用工作流时,调用的时间将通过指定的参数传递给工作流。例如:

from datetime import datetime, timedeltaimport flytekit as fl
from flytekit import FixedRate@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime ) -> str:return f"sum: {my_task(a=a, b=b, c=c)} at {kickoff_time}"fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=FixedRate(duration=timedelta(minutes=10),kickoff_time_input_arg="kickoff_time")
)

在此示例中,每次调度调用my_workflow时,调用时间都会通过kickoff_time参数传递。

激活与停用

您可以为启动计划设置激活/停用状态。具体规则如下:

  • 在具有相同名称的启动计划版本中,最多只能有一个版本处于激活状态,其他所有版本均为停用状态

  • 若激活包含执行计划的启动计划版本,其关联的执行计划也会自动激活,工作流将按照该计划自动触发

  • 当包含执行计划的启动计划版本处于停用状态时,其关联的执行计划也会停用,不会用于触发工作流程

未关联执行计划的启动计划也可以设置激活版本。对于此类非计划型启动计划,激活状态可作为版本标识,用于区分不同版本。例如,管理逻辑可依据此状态决定使用哪个版本进行新调用。

新注册的启动计划首个版本默认为停用状态。若该版本包含执行计划,该计划同样处于停用状态。一旦激活,该版本将保持激活状态,即使后续注册新版本也不会改变其状态。

包含执行计划的启动计划版本可通过以下方式激活:用户界面、uctl命令行工具或FlyteRemote

通过用户界面激活/停用启动计划

激活操作步骤:

  1. 进入启动计划视图
  2. 点击屏幕右上角的添加激活启动计划按钮:

激活计划

  1. 在弹出的模态框中选择要激活的版本:

激活计划

该列表仅显示包含执行计划的版本。注意同一时间只能激活一个版本(即最多一个执行计划)。

选择版本后点击更新即可激活该版本及其执行计划。系统将根据执行计划定期触发工作流。

注意

  • 未关联执行计划的启动计划无法通过UI激活
  • UI不支持管理无执行计划的启动计划状态,需使用uctlFlyteRemote

停用操作步骤:

  1. 定位到包含激活计划的启动计划
  2. 点击Active launch plan旁的**…**图标
  3. 选择"停用"选项:

停用计划

  1. 在确认模态框中完成停用操作

注意

  • 未关联执行计划的启动计划无法通过UI停用
  • 需使用uctlFlyteRemote管理无执行计划的启动计划状态

使用uctl命令行工具管理启动计划状态

激活命令:

$ uctl update launchplan \--activate \--project <project-id> \--domain <domain> \<launch-plan-name> \--version <launch-plan-version>

停用命令:

$ uctl update launchplan \--deactivate \--project <project-id> \--domain <domain> \<launch-plan-name> \--version <launch-plan-version>

详细说明请参考Uctl CLI文档

使用Python的FlyteRemote管理启动计划状态

激活示例代码:

from union.remote import FlyteRemote
from flytekit.configuration import Configremote = FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(ame=<launch-plan-name>, version=<launch-plan-version>).id
remote.client.update_launch_plan(launch_plan.id, "ACTIVE")

停用示例代码:

from union.remote import FlyteRemote
from flytekit.remote import Configremote = FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(ame=<launch-plan-name>, version=<launch-plan-version>)
remote.client.update_launch_plan(launch_plan.id, "INACTIVE")

运行启动计划

在用户界面中运行启动计划

要调用启动计划,请进入工作流列表,选择目标工作流,点击启动工作流。在新执行对话框中,从启动计划下拉菜单中选择目标启动计划,然后点击启动

使用 uctl 命令行运行启动计划

要通过命令行调用启动计划,首先生成启动计划的执行规范文件:

$ uctl get launchplan \--project <project-id>--domain <domain> \<launch-plan-name> \--execFile <execution-spec-file-name>.yaml

然后使用以下命令执行启动计划:

$ uctl create execution \--project <project-id> \--domain <domain> \--execFile <execution-spec-file-name>.yaml

更多细节请参阅 Uctl CLI。

使用 FlyteRemote 在 Python 中运行启动计划

以下代码使用 FlyteRemote 执行启动计划:

import flytekit as fl
from flytekit.remote import Configremote = fl.FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(name=<launch-plan-name>, version=<launch-plan-version>)
remote.execute(launch_plan, inputs=<inputs>)

更多细节请参阅 FlyteRemote。

子启动计划

上述调用示例假设您希望将启动计划作为项目中的顶级实体运行。但您也可以从_工作流内部_调用启动计划,创建_子启动计划_。这将使被调用的启动计划触发其对应工作流,并向该工作流传递指定的参数。

这与子工作流的情况不同——当您在一个工作流函数中调用另一个工作流函数时,子工作流会成为父工作流执行图的一部分,并共享相同的 execution ID 和执行上下文。而调用子启动计划时,会启动一个完整的顶级工作流,该工作流拥有独立的 execution ID 和执行上下文。

更多细节请参阅子工作流与子启动计划。

引用启动计划

引用启动计划是指引用先前已定义、序列化并注册的启动计划。您可以跨项目引用其他启动计划,创建使用他人声明的启动计划的工作流。

创建引用启动计划时,请务必验证工作流接口是否与引用工作流的接口一致。

引用启动计划无法在本地运行。若需本地测试,请使用模拟实现。

示例

本例演示如何为Flytesnacks仓库中的simple_wf工作流创建引用启动计划。

  1. 克隆Flytesnacks仓库:

    git clone git@github.com:flyteorg/flytesnacks.git
    
  2. 进入basics目录:

    cd flytesnacks/examples/basics
    
  3. 注册simple_wf工作流:

    pyflyte register --project flytesnacks --domain development --version v1 basics/workflow.py.
    
  4. 创建simple_wf_ref_lp.py文件并复制以下代码:

    import flytekit as fl
    from flytekit import reference_launch_plan@reference_launch_plan(project="flytesnacks",domain="development",name="basics.workflow.simple_wf",version="v1",
    )def simple_wf_lp(x: list[int], y: list[int]
    ) -> float:return 1.0@fl.workflow
    def run_simple_wf() -> float:x = [-8, 2, 4]y = [-2, 4, 7]return simple_wf_lp(x=x, y=y)
    
  5. 注册run_simple_wf工作流:

    pyflyte register simple_wf_ref_lp.py
    
  6. 在Flyte UI中运行run_simple_wf工作流

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

相关文章:

  • MinerU本地化部署可视化界面
  • 【前端】[vue3] [uni-app]使用 vantUI 框架
  • 在24GB显存大小的GPU上运行27GB的Pytorch模型
  • 05-SpringBoot
  • python报错:在int中找不到引用value错误问题原因及解决方案
  • SpringBoot通过虚拟路径指定文件上传下载目录
  • 【深度剖析】安踏体育的数字化转型(上篇2)
  • 网站推荐(第四期)
  • 淘宝商家层级存在流量上限怎么办,如何突破流量上限?
  • 从数据包到可靠性:UDP/TCP协议的工作原理分析
  • 从零开始学习three.js(19):一文详解three.js中的辅助类Helper
  • PCL PolygonMesh 与 TextureMesh 源码阅读与简单测试
  • 从前序与中序遍历序列构造二叉树(中等)
  • ubuntu 更新华为源
  • 网络世界的“百变身份“:动态IP让连接更自由
  • DevExpressWinForms-RichEditControl-基础应用
  • FreeSWITCH Jitter Buffer 技术解析与应用指南
  • 【CanMV K230】AI_CUBE1.4
  • 利用边缘计算和工业计算机实现智能视频分析
  • AI日报 · 2025年05月16日|Google DeepMind推出AlphaEvolve,能自主设计高级算法的编码代理
  • 体坛联播|热刺追平单赛季输球纪录,世俱杯或创收20亿美元
  • 打击网络侵权盗版!四部门联合启动“剑网2025”专项行动
  • 《日出》华丽的悲凉,何赛飞和赵文瑄演绎出来了
  • 清雪车司机未拉手刹下车导致溜车被撞亡,事故调查报告发布
  • 微软宣布全球裁员约3%:涉及约6000人,侧重经理层
  • 中国科学院院士、我国航天液体火箭技术专家朱森元逝世