生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts
生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts
Flyte 是一个开源编排器,用于构建生产级数据和机器学习流水线。它以 Kubernetes 作为底层平台,注重可扩展性和可重复性。借助 Flyte,用户团队可以使用 Python SDK 构建流水线,并将其无缝部署在云端和本地环境中,从而实现分布式处理和高效的资源利用。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。
核心概念
Flyte 是一个用于构建和编排计算机集群中互连软件进程执行过程的平台。在 Flyte 术语中,软件进程称为 任务,任务之间的连接组织结构称为 工作流。工作流中的任务通过输入和输出相互连接,一个任务的输出成为另一个任务的输入。
更准确地说,Flyte 中的工作流是由 节点 组成的 有向无环图(DAG),其中每个节点是执行单元,节点之间的边表示数据流动。最常见的节点类型是任务节点(封装任务),此外还有工作流节点(封装子工作流)和分支节点。在大多数情况下,我们简称为工作流是任务的 DAG。
您可以使用 Flytekit SDK 在 Python 中定义任务和工作流。Flytekit SDK 提供一组装饰器和类,使您能够以易于理解和使用的方式定义任务和工作流。定义完成后,任务和工作流将被部署到您的 Flyte 实例(我们称之为 注册 到实例),在那里它们会被编译成可在 Flyte 集群上执行的形式。
除了任务和工作流之外,Flyte 的另一个重要概念是 启动计划。启动计划类似于模板,可用于定义工作流的输入。触发启动计划将使用指定参数启动其关联的工作流。
定义任务和工作流
使用 Flytekit SDK 时,任务和工作流通过 @fl.task
和 @fl.workflow
装饰器定义为 Python 函数:
import flytekit as fl@fl.task
def task_1(a: int, b: int, c: int) -> int:return a + b + c@fl.task
def task_2(m: int, n: int) -> int:return m * n@fl.task
def task_3(x: int, y: int) -> int:return x - y@fl.workflow
def my_workflow(a: int, b: int, c: int, m: int, n: int) -> int:x = task_1(a=a, b=b, c=c)y = task_2(m=m, n=n)return task_3(x=x, y=y)
这里我们使用 @fl.task
定义了三个任务,并使用 @fl.workflow
定义了一个工作流。该工作流调用 task_1
和 task_2
,然后将结果传递给 task_3
,最后输出 task_3
的结果。
当工作流注册后,Flyte 会根据任务之间的输入/输出依赖关系将工作流编译为有向无环图(DAG)。该 DAG 用于以正确顺序执行任务,并利用可能的并行性。例如,上述工作流会生成以下 DAG:
类型注解是必需的
Flyte 与普通 Python 的一个重要区别在于:Flyte 中所有输入和输出 必须进行类型注解。这是因为任务是强类型的,输入和输出的类型会在部署时进行验证。
详见任务具有强类型。
工作流 不是 完整的 Python 函数
工作流的定义必须是有效的 Python 函数,因此在开发期间可以像普通 Python 函数一样本地运行。但 仅允许使用 Python 语法的子集,因为它必须被编译成 DAG 并部署到 Flyte 上执行。
从技术上讲,工作流函数的语言是 Python 的子集,属于领域特定语言(DSL)。
详见工作流。
注册任务和工作流
使用 pyflyte
或 flytectl
在命令行注册
大多数情况下,工作流和任务(可能还包括启动计划等其他内容)在项目代码中定义,并使用 pyflyte
或 flytectl
进行批量注册。例如:
pyflyte register ./workflows --project my_project --domain development
任务也可以单独注册,但通常与使用它们的工作流一起注册。
详见运行代码。
使用 FlyteRemote
在 Python 中注册
与所有 Flyte 命令行操作一样,您也可以通过编程方式使用 FlyteRemote
注册工作流和任务,具体方法包括 FlyteRemote.register_script
、FlyteRemote.register_workflow
和 FlyteRemote.register_task
。
注册结果
当上述代码注册到 Flyte 后,会创建五个对象:
- 任务
workflows.my_example.task_1
、workflows.my_example.task_2
和workflows.my_example.task_3
(详见任务基础) - 工作流
workflows.my_example.my_workflow
- 默认启动计划
workflows.my_example.my_workflow
(详见启动计划)
注意任务和工作流名称源自定义它们的 Python 代码的路径、文件名和函数名:<文件夹>.<文件>.<函数>
。工作流的默认启动计划始终与工作流同名。
修改任务和工作流
通过修改代码中的定义并重新注册来变更任务和工作流。当注册具有相同项目、域和名称的任务或工作流时,将创建该实体的新版本。
查看任务和工作流
在 UI 中查看工作流
在侧边栏选择 Workflows 可显示项目中注册的所有工作流列表。可通过名称搜索工作流。
点击列表中的工作流进入工作流视图,该视图包含以下部分:
- Recent Workflow Versions(最近工作流版本):该工作流的近期版本列表。选择版本可查看工作流版本视图,显示 DAG 和所有任务版本列表。可通过单选按钮切换版本
- All Executions in the Workflow(工作流所有执行记录):该工作流的所有执行记录列表。点击执行记录跳转至执行视图
- Launch Workflow 按钮:在工作流视图右上角点击可运行带默认输入的工作流
在 UI 中查看任务
在侧边栏选择 Tasks 可显示项目中注册的所有任务列表。可通过名称搜索任务。勾选 Show Only Archived Tasks 可筛选已归档任务。
点击列表中的任务进入任务视图,包含以下部分:
- Inputs & Outputs(输入输出):该任务最新版本的输入输出名称和类型
- Recent Task Versions(最近任务版本):该任务的近期版本列表。选择版本可查看任务版本视图,显示任务详情和所有版本列表。可通过单选按钮切换版本。详见任务
- All Executions in the Task(任务所有执行记录):该任务的所有执行记录列表。点击执行记录跳转至执行视图
- Launch Task 按钮:在任务视图右上角点击可运行带默认输入的任务
使用 flytectl
在命令行查看工作流
查看项目域内所有工作流:
$ flytectl get workflows \--project <project-id> \--domain <domain>
查看特定工作流:
$ flytectl get workflow \--project <project-id> \--domain <domain> \<workflow-name><workflow-version>
详见 Flytectl CLI。
使用 flytectl
在命令行查看任务
查看项目域内所有任务:
$ flytectl get tasks \--project <project-id> \--domain <domain>
查看特定任务:
$ flytectl get task \--project <project-id> \--domain <domain> \<task-name><task-version>
详见 Flytectl CLI。
使用 FlyteRemote
在 Python 中查看任务和工作流
使用 FlyteRemote.fetch_workflow
或 FlyteRemote.client.get_workflow
方法获取工作流。详见 FlyteRemote
。
使用 FlyteRemote.fetch_task
或 FlyteRemote.client.get_task
方法获取任务。详见 FlyteRemote
。
运行任务和工作流
在 UI 中运行任务或工作流
在 UI 中运行工作流:点击工作流视图中的 Launch Workflow 按钮。
在 UI 中运行独立任务:点击任务视图中的 Launch Task 按钮。
使用 pyflyte
或 python
在本地命令行运行
可以通过像调用普通 Python 函数一样执行 Flyte 工作流或任务。例如在代码中添加:
if __name__ == "__main__":my_workflow(a=1, b=2, c=3, m=4, n=5)
若文件保存为 my_example.py
,可通过以下命令本地运行:
python my_example.py
或使用 pyflyte
命令行工具运行:
pyflyte run my_example.py my_workflow --a 1 --b 2 --c 3 --m 4 --n 5
此方式的优势在于可通过命令行参数指定输入值。更多运行细节详见开发周期。
使用 pyflyte
在远程命令行运行
要在 Flyte 安装环境中远程运行工作流,请使用以下命令(需确保 FLYTECTL_CONFIG 配置正确):
pyflyte run --remote my_example.py my_workflow --a 1 --b 2 --c 3 --m 4 --n 5
使用 FlyteRemote
在 Python 中远程运行
在 Python 中远程运行工作流或任务时,使用 FlyteRemote.execute
方法。详见 FlyteRemote
获取更多选项和细节。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。