当前位置: 首页 > news >正文

生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts

生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts

Flyte 是一个开源编排器,用于构建生产级数据和机器学习流水线。它以 Kubernetes 作为底层平台,注重可扩展性和可重复性。借助 Flyte,用户团队可以使用 Python SDK 构建流水线,并将其无缝部署在云端和本地环境中,从而实现分布式处理和高效的资源利用。

文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。

Flyte

核心概念

Flyte 是一个用于构建和编排计算机集群中互连软件进程执行过程的平台。在 Flyte 术语中,软件进程称为 任务,任务之间的连接组织结构称为 工作流。工作流中的任务通过输入和输出相互连接,一个任务的输出成为另一个任务的输入。

更准确地说,Flyte 中的工作流是由 节点 组成的 有向无环图(DAG),其中每个节点是执行单元,节点之间的边表示数据流动。最常见的节点类型是任务节点(封装任务),此外还有工作流节点(封装子工作流)和分支节点。在大多数情况下,我们简称为工作流是任务的 DAG。

您可以使用 Flytekit SDK 在 Python 中定义任务和工作流。Flytekit SDK 提供一组装饰器和类,使您能够以易于理解和使用的方式定义任务和工作流。定义完成后,任务和工作流将被部署到您的 Flyte 实例(我们称之为 注册 到实例),在那里它们会被编译成可在 Flyte 集群上执行的形式。

除了任务和工作流之外,Flyte 的另一个重要概念是 启动计划。启动计划类似于模板,可用于定义工作流的输入。触发启动计划将使用指定参数启动其关联的工作流。

定义任务和工作流

使用 Flytekit SDK 时,任务和工作流通过 @fl.task@fl.workflow 装饰器定义为 Python 函数:

import flytekit as fl@fl.task
def task_1(a: int, b: int, c: int) -> int:return a + b + c@fl.task
def task_2(m: int, n: int) -> int:return m * n@fl.task
def task_3(x: int, y: int) -> int:return x - y@fl.workflow
def my_workflow(a: int, b: int, c: int, m: int, n: int) -> int:x = task_1(a=a, b=b, c=c)y = task_2(m=m, n=n)return task_3(x=x, y=y)

这里我们使用 @fl.task 定义了三个任务,并使用 @fl.workflow 定义了一个工作流。该工作流调用 task_1task_2,然后将结果传递给 task_3,最后输出 task_3 的结果。

当工作流注册后,Flyte 会根据任务之间的输入/输出依赖关系将工作流编译为有向无环图(DAG)。该 DAG 用于以正确顺序执行任务,并利用可能的并行性。例如,上述工作流会生成以下 DAG:

工作流 DAG

类型注解是必需的

Flyte 与普通 Python 的一个重要区别在于:Flyte 中所有输入和输出 必须进行类型注解。这是因为任务是强类型的,输入和输出的类型会在部署时进行验证。

详见任务具有强类型。

工作流 不是 完整的 Python 函数

工作流的定义必须是有效的 Python 函数,因此在开发期间可以像普通 Python 函数一样本地运行。但 仅允许使用 Python 语法的子集,因为它必须被编译成 DAG 并部署到 Flyte 上执行。

从技术上讲,工作流函数的语言是 Python 的子集,属于领域特定语言(DSL)。

详见工作流。

注册任务和工作流

使用 pyflyteflytectl 在命令行注册

大多数情况下,工作流和任务(可能还包括启动计划等其他内容)在项目代码中定义,并使用 pyflyteflytectl 进行批量注册。例如:

pyflyte register ./workflows --project my_project --domain development

任务也可以单独注册,但通常与使用它们的工作流一起注册。

详见运行代码。

使用 FlyteRemote 在 Python 中注册

与所有 Flyte 命令行操作一样,您也可以通过编程方式使用 FlyteRemote 注册工作流和任务,具体方法包括 FlyteRemote.register_scriptFlyteRemote.register_workflowFlyteRemote.register_task

注册结果

当上述代码注册到 Flyte 后,会创建五个对象:

  • 任务 workflows.my_example.task_1workflows.my_example.task_2workflows.my_example.task_3(详见任务基础)
  • 工作流 workflows.my_example.my_workflow
  • 默认启动计划 workflows.my_example.my_workflow(详见启动计划)

注意任务和工作流名称源自定义它们的 Python 代码的路径、文件名和函数名:<文件夹>.<文件>.<函数>。工作流的默认启动计划始终与工作流同名。

修改任务和工作流

通过修改代码中的定义并重新注册来变更任务和工作流。当注册具有相同项目、域和名称的任务或工作流时,将创建该实体的新版本。

查看任务和工作流

在 UI 中查看工作流

在侧边栏选择 Workflows 可显示项目中注册的所有工作流列表。可通过名称搜索工作流。

点击列表中的工作流进入工作流视图,该视图包含以下部分:

  • Recent Workflow Versions(最近工作流版本):该工作流的近期版本列表。选择版本可查看工作流版本视图,显示 DAG 和所有任务版本列表。可通过单选按钮切换版本
  • All Executions in the Workflow(工作流所有执行记录):该工作流的所有执行记录列表。点击执行记录跳转至执行视图
  • Launch Workflow 按钮:在工作流视图右上角点击可运行带默认输入的工作流

在 UI 中查看任务

在侧边栏选择 Tasks 可显示项目中注册的所有任务列表。可通过名称搜索任务。勾选 Show Only Archived Tasks 可筛选已归档任务。

点击列表中的任务进入任务视图,包含以下部分:

  • Inputs & Outputs(输入输出):该任务最新版本的输入输出名称和类型
  • Recent Task Versions(最近任务版本):该任务的近期版本列表。选择版本可查看任务版本视图,显示任务详情和所有版本列表。可通过单选按钮切换版本。详见任务
  • All Executions in the Task(任务所有执行记录):该任务的所有执行记录列表。点击执行记录跳转至执行视图
  • Launch Task 按钮:在任务视图右上角点击可运行带默认输入的任务

使用 flytectl 在命令行查看工作流

查看项目域内所有工作流:

$ flytectl get workflows \--project <project-id> \--domain <domain>

查看特定工作流:

$ flytectl get workflow \--project <project-id> \--domain <domain> \<workflow-name><workflow-version>

详见 Flytectl CLI。

使用 flytectl 在命令行查看任务

查看项目域内所有任务:

$ flytectl get tasks \--project <project-id> \--domain <domain>

查看特定任务:

$ flytectl get task \--project <project-id> \--domain <domain> \<task-name><task-version>

详见 Flytectl CLI。

使用 FlyteRemote 在 Python 中查看任务和工作流

使用 FlyteRemote.fetch_workflowFlyteRemote.client.get_workflow 方法获取工作流。详见 FlyteRemote

使用 FlyteRemote.fetch_taskFlyteRemote.client.get_task 方法获取任务。详见 FlyteRemote

运行任务和工作流

在 UI 中运行任务或工作流

在 UI 中运行工作流:点击工作流视图中的 Launch Workflow 按钮。

在 UI 中运行独立任务:点击任务视图中的 Launch Task 按钮。

使用 pyflytepython 在本地命令行运行

可以通过像调用普通 Python 函数一样执行 Flyte 工作流或任务。例如在代码中添加:

if __name__ == "__main__":my_workflow(a=1, b=2, c=3, m=4, n=5)

若文件保存为 my_example.py,可通过以下命令本地运行:

python my_example.py

或使用 pyflyte 命令行工具运行:

pyflyte run my_example.py my_workflow --a 1 --b 2 --c 3 --m 4 --n 5

此方式的优势在于可通过命令行参数指定输入值。更多运行细节详见开发周期。

使用 pyflyte 在远程命令行运行

要在 Flyte 安装环境中远程运行工作流,请使用以下命令(需确保 FLYTECTL_CONFIG 配置正确):

pyflyte run --remote my_example.py my_workflow --a 1 --b 2 --c 3 --m 4 --n 5

使用 FlyteRemote 在 Python 中远程运行

在 Python 中远程运行工作流或任务时,使用 FlyteRemote.execute 方法。详见 FlyteRemote 获取更多选项和细节。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

相关文章:

  • 需求管理缺乏持续改进机制,如何建立
  • 计算机视觉----时域频域在图像中的意义、傅里叶变换在图像中的应用、卷积核的频域解释
  • 黑白浮生项目测试报告
  • 【入门】纸盒的最大体积是多少?
  • docker部署WeDataSphere开源大数据平台
  • redis数据结构-10(ZREM、ZSCORE、ZINCRBY)
  • 以价值为导向的精准数据治理实践,赋能业务决策
  • 移动端前端开发调试工具/webkit调试工具/小程序调试工具WebDebugX使用教程
  • 第十五届蓝桥杯国赛Python A组题解
  • 【认知思维】沉没成本谬误:为何难以放弃已投入的资源
  • 山东大学软件学院计算机图形学2025期末考题回忆版
  • 使用bitNet架构
  • 普通IT的股票交易成长史--20250513复盘
  • CAN(控制器局域网络)协议详解
  • Confusion2(Python反序列化+JWT)
  • 【前端】【JavaScript】【总复习】四万字详解JavaScript知识体系
  • 【数据结构】栈
  • 【MyBatis-8】MyBatis对象关联查询详解:高效处理复杂关系映射
  • Altium Designer AD如何输出PIN带网络名的PDF装配图
  • 内存中的“BANK”
  • 美国务卿鲁比奥将前往土耳其参加俄乌会谈
  • 成都警方通报:8岁男孩落水父母下水施救,父亲遇难
  • 日本广岛大学一处拆迁工地发现疑似未爆弹
  • 王毅人民日报撰文:共商发展振兴,共建中拉命运共同体
  • 长沙通报一出租房疑存非法代孕:查封涉事场所,相关人员被控制
  • 巫蛊:文化的历史暗流