生产级编排AI工作流套件:Flyte全面使用指南 — Development cycle
生产级编排AI工作流套件:Flyte全面使用指南 — Development cycle
Flyte 是一个开源编排器,用于构建生产级数据和机器学习流水线。它以 Kubernetes 作为底层平台,注重可扩展性和可重复性。借助 Flyte,用户团队可以使用 Python SDK 构建流水线,并将其无缝部署在云端和本地环境中,从而实现分布式处理和高效的资源利用。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。
开发周期
本节介绍如何为 Flyte 开发可用于生产的工作流程。
认证
使用命令行界面(CLI)与 Flyte 交互时需要进行身份认证。认证方式根据本地或远程工作环境有所不同。本指南将介绍不同的认证机制,并帮助您选择最适合的解决方案。
开始认证前,请确保已安装 Pyflyte CLI。详情参见本地设置。
认证方法
Pyflyte CLI 支持三种认证机制:
认证方法 | 本地可用? | 远程可用? | 使用场景 |
---|---|---|---|
PKCE(默认) | ✅ 是 | ❌ 否 | 本地浏览器环境最佳选择 |
DeviceFlow | ✅ 是 | ✅ 是 | 无浏览器的远程环境(如 SSH 会话) |
ClientSecret | ✅ 是 | ✅ 是 | CI/CD 流水线或自动化场景 |
使用 pyflyte create login --host <union-host-url>
命令时默认采用 PKCE 方式。
1. PKCE(Proof Key of Code Exchange)
PKCE 是默认认证方式。运行 Pyflyte CLI 命令时,会自动打开浏览器窗口进行认证。
认证流程:
- 执行 Pyflyte CLI 命令
- 自动重定向至默认浏览器完成登录
示例配置:
admin:endpoint: https://<YourOrg>.hosted.unionai.cloud # 组织专属端点insecure: false # 启用安全连接authType: Pkce # 认证类型
logger:show-source: true # 显示日志来源level: 0 # 日志级别
PKCE 需要本地浏览器支持,因此不适用于通过 SSH 会话连接的远程机器。
2. DeviceFlow(远程环境最佳方案)
在无浏览器的远程机器上使用 DeviceFlow 认证方式。该方法会生成可在本地浏览器打开的认证链接。
认证流程:
- 执行 Pyflyte CLI 命令
- CLI 返回认证 URL
- 在本地浏览器打开 URL 完成登录
示例配置:
admin:endpoint: dns:///<YourOrg>.hosted.unionai.cloud # DNS 端点配置insecure: false # 启用安全连接authType: DeviceFlow # 认证类型
logger:show-source: true # 显示日志来源level: 0 # 日志级别
认证过程中,Flyte 会尝试将认证令牌存储至操作系统的密钥管理服务。在基于 Linux 的 SSH 会话环境中,若发现每次运行都需要浏览器认证,可能需要执行 pip install keyring
或 pip install keyrings.alt
安装密钥管理服务。
3. ClientSecret(CI/CD 自动化场景推荐)
ClientSecret 是无头认证方案,专为自动化流程和 CI/CD 场景设计。
ClientSecret 认证配置步骤:
-
创建 API 密钥:
pyflyte create api-key admin --name my-custom-name
输出将包含 Client ID 和 API Key。请妥善保存 API Key(后续不可见)。
-
配置 ClientSecret 认证:
-
使用本地文件:
admin:endpoint: dns:///<YourOrg>.hosted.unionai.cloud # DNS 端点配置insecure: false # 启用安全连接authType: ClientSecret # 认证类型clientId: <YourClientId> # 客户端IDclientSecretLocation: /path/to/secret.txt # 密钥文件路径 logger:show-source: true # 显示日志来源level: 0 # 日志级别
-
使用环境变量:
admin:endpoint: dns:///<YourOrg>.hosted.unionai.cloud # DNS 端点配置insecure: false # 启用安全连接authType: ClientSecret # 认证类型clientId: <YourClientId> # 客户端IDclientSecretEnvVar: FLYTE_API_KEY # 环境变量名称 logger:show-source: true # 显示日志来源level: 0 # 日志级别
-
-
设置环境变量(环境变量方式):
export FLYTE_API_KEY="<SECRET>"
重要:切勿将 API 密钥提交至版本控制系统。建议使用环境变量或安全凭证库管理。
管理认证配置
Pyflyte CLI 默认读取 ~/.flyte/config.yaml
配置文件。可通过以下方式覆盖默认配置:
-
设置环境变量:
export FLYTECTL_CONFIG=~/.my-config-location/my-config.yaml
-
使用
--config
参数:pyflyte --config ~/.my-config-location/my-config.yaml run my_script.py my_workflow
认证问题排查
- 切换认证方式:更新
~/.flyte/config.yaml
或使用其他配置文件 - 重复登录提示:Linux 系统使用 DeviceFlow 时,请安装密钥服务
pip install keyring keyrings.alt
项目结构
有效组织工作流项目仓库是确保可扩展性、协作性和易维护性的关键。以下是构建 Flyte 工作流项目仓库的最佳实践,涵盖任务组织、工作流管理、依赖处理和文档管理。
推荐目录结构
典型的 Flyte 工作流项目结构如下所示:
├── .github/workflows/
├── .gitignore
├── docs/
│ └── README.md
├── src/
│ ├── core/ # 用例专用的核心逻辑
│ │ ├── __init__.py
│ │ ├── model.py
│ │ ├── data.py
│ │ └── structs.py
│ ├── tasks/ # 包含独立任务
│ │ ├── __init__.py
│ │ ├── preprocess.py
│ │ ├── fit.py
│ │ ├── test.py
│ │ └── plot.py
│ ├── workflows/ # 包含工作流定义
│ │ ├── __init__.py
│ │ ├── inference.py
│ │ └── train.py
│ └── orchestration/ # 辅助构造(如 secrets、images)
│ ├── __init__.py
│ └── constants.py
├── uv.lock
└── pyproject.toml
该结构旨在确保每个项目组件都有清晰、合理的归属位置,方便团队成员查找和修改文件。
组织任务和工作流
在 Flyte 中,任务是工作流的构建模块,因此需要直观地组织它们:
-
任务:将每个任务存储在
tasks/
目录下的独立文件中。如果多个任务密切相关,可考虑将它们分组到模块中。或者,每个任务可以有自己的模块以实现更细粒度的组织,并使用子目录对相似任务进行分组 -
工作流:将组合成端到端流程的工作流存储在
workflows/
目录中。这种分离确保工作流与核心任务逻辑独立组织,促进模块化和重用
编排目录的辅助构造
应包含 orchestration/
或 union_utils/
等目录来存放辅助工作流编排的构造。该目录可包含以下类型的辅助文件:
-
Secrets:定义 Flyte 中访问密钥(如 API keys)的方式
-
ImageSpec:简化容器管理的工具,无需直接编写 Dockerfiles
工作流专用核心逻辑
使用 core/
目录存放工作流专用的业务逻辑。这将核心应用代码与工作流编排代码分离,提高可维护性,并帮助新成员更快理解核心功能。
__init__.py
的重要性
在每个目录中添加 __init__.py
文件至关重要:
-
导入功能:这些文件使目录成为 Python 包,实现跨模块的正确导入
-
Flyte 快速注册:执行快速注册时,Flyte 将第一个没有
__init__.py
的目录视为根目录。Flyte 会将根目录及其内容打包成 tarball,简化注册流程,避免每次代码变更都重建容器镜像
Monorepo vs 多仓库:结构选择
与多团队协作时有两种主要选择:
-
Monorepo:所有团队共享单一仓库,可简化依赖管理并支持共享构造。但会增加权限管理和不同团队版本控制的复杂度
-
多仓库:为每个团队或项目创建独立仓库可提高隔离性和控制力。这种情况下,建议为多个团队使用的构造创建可安装的共享包,确保一致性而无需合并代码库
CI/CD
GitHub Action 应实现以下功能:
- 在合并到领域分支时注册(需要时升级)
- 在合并输入 YAML 时执行
- 将 git SHA 注入为实体版本
文档与文档字符串
建议编写清晰的文档字符串,因为它们会自动传播到 Flyte UI。这为查看 UI 中工作流和任务的用户提供有用上下文,减少查阅源代码解释的需求。
项目与域
项目(Project)和域(Domain)是 Flyte 中用于组织工作流的核心分类单元。
项目定义了具有共同功能目标的任务(task)、工作流(workflow)、启动计划(launch plan)等实体的分组。域则代表项目实体在开发周期中经历的不同阶段环境。
默认情况下,Flyte 提供三个预配置域:development
、staging
和 production
。在初始配置时,您可以根据需求自定义 Flyte 实例的域设置。如需详细信息,请联系 Flyte 团队。
项目与域之间是正交关系,即一个项目可存在于多个域中,一个域也可包含多个项目。
示例配置结构:
Development | Staging | Production | |
---|---|---|---|
Project 1 | workflow_1 (v2.0) | workflow_1 (v1.0) | workflow_1 (v1.0) |
Project 2 | workflow_2 (v2.0) | workflow_2 (v1.0) | workflow_2 (v1.0) |
项目
项目代表与特定团队、业务领域或应用程序相关的独立工作流集合。每个项目相互隔离,但工作流可以通过引用其他项目中的实体(工作流或任务)实现通用资源复用。
域
域代表 Flyte 组织中与项目集正交的独立环境(如 development/staging/production)。每个域可配置专属的权限策略、密钥管理、执行历史缓存和资源配额,避免对其他项目或域造成意外影响。
通过域可实现环境间的明确隔离,确保开发测试活动不会干扰生产环境工作流。生产域提供"纯净"环境,避免开发阶段的缓存执行导致意外行为。同时可配置生产环境专属的外部数据源密钥。
何时使用不同的 Flyte 项目?
项目用于组织特定团队、业务领域或应用程序相关的独立工作流。通常每个独立团队或 ML 产品应建立专属 Flyte 项目。虽然项目间保持隔离,但团队可通过跨项目引用实体(工作流或任务)复用通用资源。例如,某个团队可创建通用模型训练任务。但这需要高级协作机制和统一的编码规范。
在 Flyte 中配置工作流时,有效运用项目和域是管理环境、权限和资源分配的关键。以下是组织工作流的最佳实践建议。
项目与域:组合威力
Flyte 通过项目-域组合为工作流创建独立配置环境。这种组合支持:
-
专属权限控制
通过基于角色的访问控制(RBAC),可为用户分配针对特定项目-域组合的精细化权限(如 contributor 或 admin 角色)。这种细粒度控制确保权限分配既精准又安全。更多详情请参见此处。 -
资源与执行监控
每个项目-域组合拥有独立的监控看板,可追踪资源利用率、执行状态和性能指标。这有助于保持对工作流执行的可见性并确保最佳性能。更多详情请参见此处。 -
资源配额与分配
通过为每个项目-域组合设置配额,Flyte 可确保工作流不会超出指定限制,防止项目或域意外占用他人资源。您还可以为每个组合配置内存、CPU 和存储等专属资源默认值,满足不同工作流的特殊需求。更多详情请参见此处和此处。 -
密钥管理配置
Flyte 支持在项目-域层级配置密钥,确保 API 密钥等敏感信息仅能被特定工作流访问。这种隔离机制提升了安全性,降低了跨环境未授权访问风险。更多详情请参见此处。
域:清晰的环境隔离
域代表 Flyte 中的独立环境(如 development/staging/production),通过明确隔离防止跨环境干扰。这种结构确保开发测试环境的变更不会影响生产工作流。使用域进行阶段划分,可使工作流从开发到生产部署的全过程实现可控演进。
项目:按团队/业务/应用组织工作流
Flyte 项目用于组织特定团队、业务功能或应用程序相关的独立工作流。通过将项目映射到组织结构,可简化权限管理并促进跨团队/用例的工作流隔离。虽然支持跨项目引用,但建议保持项目内工作流的独立性以避免复杂度。
Flyte 的 CLI 工具和 SDK 提供便捷的项目/域指定方式:
-
CLI 命令
在pyflyte
和uctl
CLI 的大多数命令中,可使用--project
和--domain
参数指定目标项目域组合。更多详情请参见此处和此处。 -
Python SDK
使用flytekit
SDK 时,可通过FlyteRemote
编程指定项目域组合,确保所有操作发生在目标环境。更多详情请参见此处。
构建工作流
何时应分解任务?
将任务分解为更小任务有多个理由。这样做可以带来更好的计算性能、改进缓存性能,并充分利用可中断任务的优势。但分解任务需要承担任务间的开销成本,包括节点启动和数据下载。某些情况下,可以通过使用Actors来缓解这些成本。
差异化的运行时需求
首先,分解任务支持操作间的异构环境。例如,可能存在一个训练机器学习模型的大型任务,随后使用该模型在测试数据上进行批量推理。然而训练模型通常需要比推理更多的内存。因此在大规模场景下,将这个大任务分解为两个任务:(1) 训练模型和 (2) 运行批量推理,实际上是有益的。这样可以为第二个任务申请更少内存,从而降低工作流成本。若处理更大规模数据,可通过map_task
进一步分解批量推理任务以实现并行化,大幅缩短该步骤运行时间。总的来说,分解任务通过定义资源、依赖和执行并行性,提供了基础设施灵活性。
改进缓存性能
其次,可将大任务分解为更小任务以实现"细粒度"缓存。每个独立任务都提供自动化的"检查点"系统。通过将大型工作流分解为多个自然任务,可以最小化多次串行工作流执行中的冗余工作。这在快速迭代开发阶段尤其有用,用户可能在短时间内多次运行相同工作流。"细粒度"缓存将显著提升本地和远程工作流执行效率。
利用可中断任务
最后,可通过"细粒度"缓存充分利用可中断任务。可中断任务会尝试在可能情况下使用Spot实例或Spot虚拟机运行。这些节点具有可中断性,意味着任务可能因其他组织高价竞用而失败。但相比不可中断的按需实例/虚拟机,Spot实例成本显著更低。通过"细粒度"缓存,可在享受可中断任务成本优势的同时,最小化任务中断带来的影响。
何时应并行化任务?
总体原则是尽早且频繁并行化。如前所述,Flyte的许多强大特性(如缓存和工作流恢复)都基于任务级别实现。将任务分解为更小单元并进行并行化,可构建高性能且具备容错能力的工作流。
需要注意的例外是超短时任务,其Pod启动和清理的开销可能抵消并行化优势。但通过Actors实现可复用容器后,这些开销将被透明消除,在付出前期环境搭建成本后实现双赢。在任何情况下,批处理输入输出以分摊开销都是有效策略。需注意保持批次内输入顺序和批次间顺序,以确保可靠的缓存命中。
并行化结构
Flyte的两个主要并行化结构是映射任务和动态工作流。它们目标相似但实现方式迥异,各有优势。
动态工作流更类似for
循环,按顺序迭代输入。其并行度受整体工作流并行度控制。
映射任务效率更高且不保证执行顺序。它们拥有独立于整体工作流的并发设置,可设置子任务的最低失败阈值。二者的详细差异说明可参考此处,协同使用示例参见此处。
何时应启用缓存?
当任务主体逻辑稳定后应启用缓存。缓存键由任务签名隐式生成(主要基于输入输出)。若任务主体变更但签名未改,使用相同输入将产生缓存命中。这在迭代任务核心功能并期待不同下游输入时可能导致意外行为。此外,缓存不会检查FlyteFile
等内容。若相同URI输入对应完全不同的内容,也会产生缓存命中。因此建议添加显式缓存键以便随时使缓存失效。
尽管存在这些注意事项,缓存在工作流开发阶段仍是巨大的时间节省工具。缓存上游任务可快速运行工作流直至当前迭代节点。在复杂并行化场景(如调试大型映射任务的失败状态)中,缓存也极具价值。生产环境中,若集群资源紧张,缓存可使工作流通过多次重试逐步完成,因为每次运行会有更多任务成功返回。虽然这不是理想场景,但缓存有助于缓解生产故障的影响。考虑到这些因素,绝大多数场景都值得启用缓存。
设置生产项目
在 Flyte 中,您的工作按以下层次结构进行组织:
- Organization(组织):您的 Flyte 实例,可通过特定 URL(如
flyte.my-company.com
)访问 - Domains(域):组织内通常包含三个域:
development
(开发)、staging
(预发布)和production
(生产),用于在开发过程中组织代码。您可以通过初始配置自定义域集合 - Projects(项目):与域正交的项目结构,用于将代码组织为逻辑组。您可以根据需要创建任意数量的项目
一个具体的工作流会归属于特定项目。例如,假设 my_workflow
是 my_project
项目中的工作流。
当您开始开发 my_workflow
时,通常会在项目域 my_project/development
中注册该工作流。
随着工作流版本的迭代演进,您可以将 my_workflow
升级到 my_project/staging
域,最终部署到 my_project/production
域。
升级操作只需通过将工作流重新注册到新的项目域即可完成。
术语规范
日常使用中,“project” 一词可能指代以下不同概念:
- Flyte 中承载工作流集合的实体
- 本地开发工作流的目录
- 存储工作流代码的 GitHub(或其他 SCM)仓库
为避免混淆,本指南采用以下命名规范:
- Flyte project:Flyte 实例中承载工作流集合的实体(常简称为 project)
- Local project:本地开发工作流的目录(通常为 GitHub 仓库的工作目录)
创建 Flyte 项目
确保已安装 flytectl
CLI 并正确配置 Flyte 集群连接。执行以下命令创建新项目:
$ flytectl create project \--id "my-project" \--labels "my-label=my-project" \--description "My Flyte project" \--name "My project"
使用 pyflyte init
创建本地生产项目目录
在入门指南中,我们曾使用 pyflyte init
基于 flyte-simple
模板创建本地项目。
此处我们将使用 flyte-production
模板执行相同操作:
pyflyte init --template union-production my-project
目录结构
生成的 basic-example
目录将包含以下文件结构:
├── LICENSE
├── README.md
├── docs
│ └── docs.md
├── pyproject.toml
├── src
│ ├── core
│ │ ├── __init__.py
│ │ └── core.py
│ ├── orchestration
│ │ ├── __init__.py
│ │ └── orchestration.py
│ ├── tasks
│ │ ├── __init__.py
│ │ └── say_hello.py
│ └── workflows
│ ├── __init__.py
│ └── hello_world.py
└── uv.lock
您可以为生产项目创建自定义的目录结构和规范,但此模板提供了良好的起点。
特别需要注意 workflows
子目录及其包含的 __init__.py
文件。我们将在注册流程章节详细讨论其作用。
本地依赖管理
开发环境配置
在开发周期中,您需要能够在本地机器和 Flyte 远程环境运行工作流。为此,请确保两个环境均已安装所需依赖。本节将说明本地依赖安装方法。关于 Flyte 环境依赖配置,请参考 ImageSpec。
在 pyproject.toml
中定义依赖
推荐使用 uv 工具
进行项目与依赖管理。
最佳实践是在 pyproject.toml
文件的 dependencies
字段中声明依赖项:
[project]
name = "union-simple"
version = "0.1.0"
description = "A simple Flyte project"
readme = "README.md"
requires-python = ">=3.9,<3.13"
dependencies = ["union"]
创建 Python 虚拟环境
请确保已正确设置包含所需依赖的 Python 虚拟环境。
使用 uv
安装依赖:
uv sync
激活虚拟环境:
source .venv/bin/activate
activate
与 uv run
对比
在本地项目中运行 Pyflyte CLI 时,必须在项目关联的虚拟环境中执行。
使用 uv
在虚拟环境中运行 pyflyte
的两种方式:
- 使用前缀命令:
uv run pyflyte ...
- 先激活环境再直接执行:
source .venv/bin/activate
pyflyte ...
本文示例默认采用第二种方式。
后续步骤
完成本地依赖安装后,您可使用 pyflyte run
在本地运行工作流。
下一步需确保相同依赖已在 Flyte 远程环境可用。
ImageSpec
在开发周期中,您需要确保工作流既能在本地机器运行,也能在 Flyte 远程环境运行,因此必须保证两种环境都安装了所需的依赖项。
本文将说明如何为 Flyte 远程环境配置工作流依赖项。关于本地依赖项的配置方法,请参阅本地依赖项。
当工作流部署到 Flyte 时,每个任务都在 Kubernetes 集群的独立容器中运行。您需要通过 ImageSpec
类在容器镜像定义中指定任务依赖项。例如:
import flytekit as flimage_spec = union.ImageSpec(name="say-hello-image",requirements="uv.lock",
)@fl.task(container_image=image_spec)
def say_hello(name: str) -> str:return f"Hello, {name}!"@fl.workflow
def hello_world_wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting
此处的 ImageSpec
类用于指定 say_hello
任务使用的容器镜像。
name
参数指定镜像名称,该名称将用于在容器注册表中标识镜像requirements
参数指定依赖项文件的路径(相对于执行pyflyte run
或pyflyte register
命令的目录),支持以下文件类型:requirements.txt
文件- 通过
uv sync
命令生成的uv.lock
文件 - 通过
poetry install
命令生成的poetry.lock
文件 pyproject.toml
文件
当执行 pyflyte run
或 pyflyte register
命令时,Flyte 将自动构建 ImageSpec
块中定义的容器镜像(同时注册代码中定义的任务和工作流)。
运行代码
设置开发环境
如果尚未完成环境设置,请按照入门指南完成以下操作:
- 登录Flyte平台
- 配置本地开发环境
运行代码的CLI命令
Pyflyte CLI和Flytectl CLI提供以下命令用于不同开发阶段的代码部署与运行:
pyflyte run
:在本地Python环境即时部署并运行单个脚本pyflyte run --remote
:在Flyte云平台即时部署并运行单个脚本pyflyte register
:将多个脚本部署到Flyte并通过Web界面运行pyflyte package
+flytectl register
:用于生产环境部署和CI/CD流水线脚本
如需在部署至Flyte前进行本地集群测试,可使用命令2、3或4并指定本地集群目标。详见本地集群运行指南。
注册模式概览
以下图表展示不同注册模式的对比:
使用pyflyte run
在本地运行脚本
开发过程中可使用以下命令在本地Python环境快速测试工作流:
pyflyte run workflows/example.py wf --name 'Albert'
参数说明:
workflows/example.py
:目标Python文件wf
:文件中要运行的工作流名称--name 'Albert'
:传递的命名参数及值
该命令适用于快速本地基础验证,详见pyflyte run详情。
使用pyflyte run --remote
在Flyte运行脚本
通过以下命令在Flyte云平台快速运行工作流:
pyflyte run --remote --project basic-example --domain development workflows/example.py wf --name 'Albert'
参数说明:
--project basic-example
:项目名称--domain development
:目标域workflows/example.py
:脚本文件路径wf
:工作流名称
执行流程:
- 构建
ImageSpec
定义的容器镜像 - 推送镜像到指定容器注册表(需确保Flyte可访问,如使用GitHub容器注册表需设为公开)
- 打包代码并部署至Flyte指定项目域
- 在Flyte平台运行工作流
详见pyflyte run远程执行详情。
通过flytectl运行任务
生成执行规范文件
flytectl launch task --project flytesnacks --domain development --name workflows.example.generate_normal_df --version v1
更新输入参数文件
iamRoleARN: 'arn:aws:iam::12345678:role/defaultrole'
inputs:n: 200mean: 0.0sigma: 1.0
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: workflows.example.generate_normal_df
version: "v1"
创建执行实例
flytectl create execution -p flytesnacks -d development --execFile exec_spec.yaml
监控执行状态
flytectl get execution -p flytesnacks -d development <execid>
通过flytectl运行工作流
工作流需通过启动计划(launchplan)执行。默认启动计划与工作流同名,参数默认值相同。任务也可通过启动命令执行,但启动计划不可关联任务。
通过flytectl运行启动计划
生成执行规范文件
flytectl get launchplan -p flytesnacks -d development myapp.workflows.example.my_wf --execFile exec_spec.yaml
更新输入参数
inputs:name: "adam"
创建执行实例
flytectl create execution -p flytesnacks -d development --execFile exec_spec.yaml
监控执行状态
flytectl get execution -p flytesnacks -d development <execid>
使用pyflyte register
部署代码至Flyte
pyflyte register workflows --project basic-example --domain development
执行流程:
- 构建
ImageSpec
定义的容器镜像 - 打包
workflows
目录代码并部署(需确保目录包含__init__.py
文件)
该命令仅部署不运行,可通过Web界面触发。适用于全量工作流测试部署。
快速注册机制
当容器镜像已存在注册表且仅修改工作流/任务代码时,快速注册机制:
- 打包指定目录为
.tar.gz
文件(包含protobuf序列化规范) - 注册至集群并上传至对象存储(如S3/GCS)
执行时Flyte自动注入代码覆盖原镜像内容。
环境变量说明
WORKDIR
设置代码解压路径,PYTHONPATH
需包含源码路径。推荐使用src
目录结构并设置绝对导入。
执行监控
基础监控
flytectl get execution -p flytesnacks -d development <execid>
详细信息查看
flytectl get execution -p flytesnacks -d development <execid> --details
格式输出
flytectl get execution -p flytesnacks -d development <execid> --details -o yaml
输出结果查看:
"outputUri": "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-<execid>/n0/data/0/outputs.pb"
生产环境部署
使用pyflyte package
打包
pyflyte --pkgs workflows package
生成flyte-package.tgz
文件(需__init__.py
文件)。多目录打包:
pyflyte --pkgs DIR1 --pkgs DIR2 package ...
模块缺失处理:
pyflyte --pkgs <dir1> package --source ./src -f
使用flytectl register
注册
$ flytectl register files \--project basic-example \--domain development \--archive flyte-package.tgz \--version "$(git rev-parse HEAD)"
参数说明:
--project
:目标项目--domain
:目标域(development/staging/production)--archive
:打包文件路径--version
:推荐使用Git SHA
注册方法对比
方法 | 适用场景 |
---|---|
pyflyte register | 单集群快速迭代 |
package +register | 多集群/生产环境/CI-CD流水线 |
编程接口
可通过FlyteRemote对象实现等效操作。
镜像管理策略
ImageSpec
行为取决于注册方式:
- 快速注册:不复制源码文件
- 非快速注册:默认复制源码文件
- 指定
source_root
时强制复制文件
自定义镜像构建
推荐使用envd
构建器,也可通过模板生成Dockerfile:
pyflyte init --template basic-template-dockerfile
构建脚本示例:
./docker_build.sh -p PROJECT_NAME -r REGISTRY -v VERSION
镜像推送示例(GitHub容器注册表):
docker login ghcr.io
docker push TAG
CI/CD与GitHub Actions集成
Flyte提供两个GitHub Action:
flyte-setup-action
:安装flytectlflyte-register-action
:使用flytectl register
注册包
最佳实践
- 版本策略:
- 特性分支:
{分支名}-{短提交哈希}
- 主分支:
main-{短提交哈希}
- 正式发布:语义版本号
- 特性分支:
- 镜像管理:
- 使用
--image
参数指定运行时镜像 - 避免硬编码镜像地址
- 使用
覆盖参数
with_overrides
方法允许在执行时为任务、子工作流和子启动计划指定参数覆盖。这在需要修改任务、子工作流或子启动计划的行为而不更改原始定义时非常有用。
任务参数
调用任务时,可以在with_overrides
中指定以下参数:
accelerator
: 指定加速器cache_serialize
: 启用缓存序列化cache_version
: 指定缓存版本cache
: 启用缓存container_image
: 指定容器镜像interruptible
: 指定任务是否可中断limits
: 指定资源限制name
: 为任务执行指定特定名称,该名称将显示在UI的工作流流程图中(参见下文)node_name
: 为任务的DAG节点指定特定名称,该名称将显示在UI的工作流流程图中(参见下文)requests
: 指定资源请求retries
: 指定任务重试次数task_config
: 指定任务配置timeout
: 指定任务超时时间
例如,如果某个任务未启用缓存,可以在执行时通过with_overrides
启用缓存:
my_task(a=1, b=2, c=3).with_overrides(cache=True)
使用name
和node_name
参数
在任务上使用with_overrides
的name
参数是一个特别有用的功能。例如,使用with_overrides(name="my_task")
可以为任务执行指定特定名称,该名称将显示在UI中。指定的名称可以在调用时选择或生成,而无需修改任务定义。
@fl.workflow
def wf() -> int:my_task(a=1, b=1, c=1).with_overrides(name="my_task_1")my_task(a=2, b=2, c=2).with_overrides(name="my_task_2", node_name="my_node_2")return my_task(a=1, b=1, c=1)
上述代码将在UI中生成如下工作流展示:
还有一个相关参数node_name
可用于为任务的DAG节点指定特定名称。DAG节点名称通常自动生成如n0
、n1
、n2
等,显示在工作流表格的node
列中。覆盖node_name
后会自动替换为指定名称:
注意代码中指定的node_name
为my_node_2
,但在UI中显示为my-node-2
。这是因为Kubernetes节点名称不能包含下划线,Flyte会自动将名称转换为符合Kubernetes规范的格式。
子工作流和子启动计划参数
在从高层工作流中调用工作流或启动计划时(即调用子工作流或子启动计划),可以在with_overrides
中指定以下参数:
cache_serialize
: 启用缓存序列化cache_version
: 指定缓存版本cache
: 启用缓存
pyflyte run 的详细说明
pyflyte run
命令用于在本地 Python 环境或 Flyte 平台上运行特定 workflow 或 task。本节将讨论该命令的使用细节及原理。
参数传递
使用以下语法可以通过 pyflyte run
执行特定 workflow:
pyflyte run <路径/脚本.py> <workflow_或_task_函数名>
关键字参数可通过以下方式传递:
--<关键字> <值>
例如,我们使用脚本 example.py
、workflow wf
和命名参数 name
调用 pyflyte run
:
pyflyte run example.py wf --name 'Albert'
这里 Albert
会被传递给 name
参数。
当参数名为 snake_case
格式时,需转换为 kebab-case
格式。例如,如果代码接受 last_name
参数,则命令:
pyflyte run example.py wf --last-name 'Einstein'
会将值 Einstein
传递给该参数。
为何选择 pyflyte run 而非直接使用 python?
可以在脚本末尾添加 main
守卫:
if __name__ == "__main__":training_workflow(hyperparameters={"C": 0.1})
这样就能通过 python example.py
运行,但参数需要硬编码。
若想动态传递参数,代码会变得更冗长:
if __name__ == "__main__":import jsonfrom argparse import ArgumentParserparser = ArgumentParser()parser.add_argument("--hyperparameters", type=json.loads)... # 添加其他选项args = parser.parse_args()training_workflow(hyperparameters=args.hyperparameters)
pyflyte run
可以避免这种冗长的代码,通过简洁的方式传递参数运行 workflow。
使用交互式任务进行调试
通过交互式任务,您可以直接在 UI 的嵌入式 Visual Studio Code IDE 中实时检查和调试任务代码。
在代码中启用交互式任务
要启用交互式任务,需要:
- 包含
flytekitplugins-flyteinteractive
作为依赖项 - 在需要交互化的任务上使用
@vscode
装饰器
应用 @vscode
装饰器后,该装饰器会在运行时将任务转换为 Visual Studio Code 服务器。此过程会覆盖任务函数体的标准执行流程,转而启动 Visual Studio Code 服务器。
无需配置 ingress 或端口转发
Flyte 交互式任务功能是对开源 FlyteInteractive 插件 的改进版本。相比开源版本,移除了 ingress 配置和端口转发的需求,提供更无缝的调试体验。
基础示例
以下示例演示简单工作流中的交互式任务。
requirements.txt
本 requirements.txt
文件被本节所有示例使用:
flytekit
flytekitplugins-flyteinteractive
example.py
"""使用交互式任务(@vscode)的Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)@fl.task(container_image=image)
@vscode
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting
注册并运行工作流
要将代码注册到 Flyte 项目并运行工作流,请按照 运行代码 中的说明操作
访问 IDE
- 在工作流页面选择第一个任务(本示例中任务名为
say_hello
)。任务信息面板将显示在页面右侧 - 等待任务进入 Running 状态并出现 VSCode (User) 链接
- 点击 VSCode (User) 链接
检查任务代码
IDE 打开后,您可以在编辑器中查看任务代码。
交互式调试
要在 VSCode 中运行任务,点击 IDE 左侧导航栏的 Run and debug 图标,选择 Interactive Debugging 配置。
点击配置下拉框旁的 Play 按钮运行任务。这将使用前序任务的输入参数运行任务。要检查中间状态,可在 Python 代码中设置断点并使用调试器跟踪。
任务输出不会写入 Flyte 存储
请注意在调试阶段任务完全在 VSCode 中运行,不会将输出写入 Flyte 存储。
更新代码
您可以在 VSCode 环境中编辑代码并重新运行任务以查看更改。但请注意,这些更改不会自动持久化存储,需要手动将更改复制回本地环境。
恢复任务
调试完成后,可通过执行 Resume Task 配置使用更新后的代码恢复任务。这将终止代码服务器,使用前序任务的输入参数运行任务,并将输出写入 Flyte 存储。
请记得持久化代码
恢复任务前请确保代码已持久化(例如提交到 GitHub),因为此后将失去与 VSCode 服务器的连接。
辅助 Python 文件
您会注意到除了代码外,VSCode 文件浏览器中还有一些系统自动生成的附加文件:
flyteinteractive_interactive_entrypoint.py
flyteinteractive_interactive_entrypoint.py
脚本实现了我们之前使用的 Interactive Debugging 操作:
flyteinteractive_resume_task.py
flyteinteractive_resume_task.py
脚本实现了我们之前使用的 Resume Task 操作:
launch.json
.vscode
目录中的 launch.json
文件配置了 Interactive Debugging 和 Resume Task 操作:
集成终端
除了使用辅助文件定义的便捷函数,您还可以直接通过集成终端使用 python <脚本名称>.py
运行 Python 代码脚本(本例中为 python hello.py
)。
安装扩展
与本地 VSCode 类似,您可以安装各种扩展来辅助开发。由于法律原因,可用扩展与官方 VSCode 不同,托管在 Open VSX Registry。
Python 和 Jupyter 扩展默认已安装。可以通过定义配置对象并将其传递给 @vscode
装饰器来添加其他扩展,如下所示:
example-extensions.py
"""使用扩展的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import COPILOT_EXTENSION, VscodeConfig, vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)config = VscodeConfig()
config.add_extensions(COPILOT_EXTENSION) # 使用预定义的URL
config.add_extensions("https://open-vsx.org/api/vscodevim/vim/1.27.0/file/vscodevim.vim-1.27.0.vsix"
) # 从 Open VSX 复制原始URL@fl.task(container_image=image)
@vscode(config=config)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting
资源管理
为管理资源,VSCode 服务器在空闲一段时间(无活跃 HTTP 连接)后会自动终止。空闲状态通过心跳文件监控。
max_idle_seconds
参数可用于设置 VSCode 服务器在终止前允许的最大空闲秒数。
example-manage-resources.py
"""使用 max_idle_seconds 参数的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)@fl.task(container_image=image)
@vscode(max_idle_seconds=60000)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting
前置与后置钩子
交互式任务还支持注册在 VSCode 启动前后执行的函数,可用于需要设置或清理的任务。
example-pre-post-hooks.py
"""使用前置后置钩子的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)def set_up_proxy():print("set up")def push_code():print("push code")@fl.task(container_image=image)
@vscode(pre_execute=set_up_proxy, post_execute=push_code)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting
仅在任务失败时启动 VSCode
系统可配置为仅在任务失败后启动 VSCode(防止任务终止以便检查),通过将 run_task_first
参数设为 True
实现。
example-run-task-first.py
"""使用 run_task_first 参数的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)@fl.task(container_image=image)
@vscode(run_task_first=True)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting
调试执行问题
通过检查任务和工作流执行日志链接可进行深度调试。
使用 --details
标志可查看带有日志链接的节点执行:
└── n1 - FAILED - 2021-06-30 08:51:07.3111846 +0000 UTC - 2021-06-30 08:51:17.192852 +0000 UTC└── Attempt :0└── Task - FAILED - 2021-06-30 08:51:07.3111846 +0000 UTC - 2021-06-30 08:51:17.192852 +0000 UTC└── Logs :└── Name :Kubernetes Logs (User)└── URI :http://localhost:30082/#/log/flytectldemo-development/f3a5a4034960f4aa1a09-n1-0/pod?namespace=flytectldemo-development
此外,可检查 \<项目>-\<域>
命名空间中启动的 Pod:
kubectl get pods -n <project>-<domain>
启动的 Pod 会以执行名称作为前缀,并带有 nodeId
后缀:
NAME READY STATUS RESTARTS AGE
f65009af77f284e50959-n0-0 0/1 ErrImagePull 0 18h
例如,上述输出中 STATUS
显示存在镜像拉取问题。
任务资源验证
在 Flyte 中,当您尝试执行包含不可满足资源请求的工作流时,系统会立即终止执行而非允许其无限期排队等待。
我们通过 executions service 拦截执行创建请求以验证其资源需求是否可被满足。若验证不通过,将快速失败并返回如下错误信息:
Request failed with status code 400 rpc error: code = InvalidArgument desc = no node satisfies task 'workflows.fotd.fotd_directory' resource requests
在本地集群中运行
在本地 Kubernetes 集群中运行
虽然最终您将在 Flyte 的 Kubernetes 集群中运行工作流,但在本地机器上的集群中测试工作流会非常方便。
首先请确保已安装 Docker(或其他兼容 OCI 标准的容器引擎),并确保_守护进程正在运行_。
使用 flytectl
启动演示集群:
flytectl demo start
配置
当 flytectl
在本地容器引擎中启动集群时,会将配置信息写入 ~/.flyte/
目录。
最重要的配置文件是 ~/.flyte/config-sandbox.yaml
,其中包含目标 Kubernetes 集群的连接信息:
admin:endpoint: localhost:30080 # 集群端点地址authType: Pkce # 认证类型insecure: true # 使用非安全连接
console:endpoint: http://localhost:30080 # 控制台地址
logger:show-source: true # 显示日志来源level: 0 # 日志级别
当前配置指向本地 Docker 实例(localhost:30080
),后续可修改为指向 Flyte 集群。
使用 flytectl
或 pyflyte
时需指定目标集群配置,可通过两种方式实现:
- 显式指定配置文件路径
flytectl --config ~/.flyte/config-sandbox.yaml <命令>
pyflyte --config ~/.flyte/config-sandbox.yaml <命令>
- 设置环境变量
export FLYTECTL_CONFIG=~/.flyte/config-sandbox.yaml
本文档假设您已设置 FLYTECTL_CONFIG
环境变量。
启动工作流
添加 --remote
参数即可在本地集群中运行工作流:
$ pyflyte run --remote \workflows/example.py \training_workflow \--hyperparameters '{"C": 0.1}'
命令输出将包含工作流执行页面的 URL。
查看结果
访问 pyflyte run
生成的 URL,即可在 Flyte UI 中查看工作流执行详情。
使用默认镜像的本地集群
pyflyte run --remote my_file.py my_workflow
(需配置 pyflyte
指向 flytectl demo start
启动的本地集群)
- 任务代码在本地集群的默认镜像环境中运行
- Python 代码在运行时动态注入容器
- 仅支持依赖项已包含在默认镜像中的 Python 代码(参见说明文档)
- 包含本地 S3 存储
- 支持部分(非全部)插件
- 单工作流即时执行
- 工作流注册到默认项目
- 适用于演示场景
使用自定义镜像的本地集群
$ pyflyte run --remote \--image my_cr.io/my_org/my_image:latest \my_file.py \my_workflow
(需配置 pyflyte
指向 flytectl demo start
启动的本地集群)
- 任务代码在自定义镜像(
my_cr.io/my_org/my_image:latest
)环境中运行 - Python 代码在运行时动态注入容器
- 支持任意 Python 依赖项(通过自定义镜像控制)
- 包含本地 S3 存储
- 支持部分(非全部)插件
- 单工作流即时执行
- 工作流注册到默认项目
- 适用于开发周期中的高级测试场景
Jupyter notebooks
Flyte 支持在交互式 Jupyter notebook 环境中开发、运行和调试任务(task)及工作流(workflow),这能加快构建数据驱动或机器学习驱动应用时的迭代速度。
在单元格中编写工作流与任务
在 notebook 中构建任务和工作流时,您可以像常规操作一样在单元格中编写代码。
通过点击运行按钮,您可以在本地运行这些单元格中的代码(即在 notebook 本身而非 Flyte 平台上运行),这与普通 notebook 操作一致。
启用 notebook 向 Flyte 注册工作流
要使 notebook 中的任务和工作流能轻松注册到 Flyte 实例并运行,您需要设置一个_交互式_FlyteRemote 对象,然后通过它调用远程执行:
首先,在单元格中创建交互式 FlyteRemote 对象:
from flytekit.configuration import Config
from flytekit.remote import FlyteRemoteremote = FlyteRemote(config=Config.auto(),default_project="default",default_domain="development",interactive_mode_enabled=True,
)
当在 Jupyter notebook 环境中运行时,必须将 interactive_mode_enabled
标志设置为 True
,以启用工作流的交互式注册和执行功能。
接着,在另一个单元格中设置执行调用:
execution = remote.execute(my_task, inputs={"name": "Joe"})
execution = remote.execute(my_wf, inputs={"name": "Anne"})
交互式 FlyteRemote 客户端会在 notebook 中重新定义实体时(包括重新执行包含实体定义的单元格,即使实体未发生变更)自动重新注册该实体。此行为有效支持了在 Jupyter notebook 中迭代开发和调试任务及工作流的需求。
数据看板(Decks)
数据看板功能允许在任务代码中展示定制化的数据可视化效果。看板以 HTML 形式渲染,当工作流运行时可直接在 Flyte UI 中显示。
该功能为可选特性,启用时需在任务参数中将 enable_deck
设为 True
。
初始化配置
首先导入必要依赖:
import flytekit as fl
from flytekit.deck.renderer import MarkdownRenderer
from sklearn.decomposition import PCA
import plotly.express as px
import plotly
渲染器独立于 flytekit
主包。要使用上述导入的 MarkdownRenderer
,需先在本地 Python 环境中安装 flytekitplugins-deck-standard
包,并将其包含在 ImageSpec
中(如下所示)。
镜像配置
在 ImageSpec
中声明所需依赖:
custom_image = fl.ImageSpec(packages=["flytekitplugins-deck-standard","markdown","pandas","pillow","plotly","pyarrow","scikit-learn","ydata_profiling",],
)
任务定义
创建名为 pca
的新看板,并渲染 Markdown 内容与 主成分分析(PCA) 图表:
@fl.task(enable_deck=True, container_image=custom_image)
def pca_plot():iris_df = px.data.iris()X = iris_df[["sepal_length", "sepal_width", "petal_length", "petal_width"]]pca = PCA(n_components=3)components = pca.fit_transform(X)total_var = pca.explained_variance_ratio_.sum() * 100fig = px.scatter_3d(components,x=0,y=1,z=2,color=iris_df["species"],title=f"总解释方差: {total_var:.2f}%",labels={"0": "PC 1", "1": "PC 2", "2": "PC 3"},)main_deck = fl.Deck("pca", MarkdownRenderer().to_html("### 主成分分析"))main_deck.append(plotly.io.to_html(fig))
注意使用 append
方法将 Plotly 图表追加至 Markdown 看板。
执行输出
预期输出包含 deck.html
文件路径信息:
{"asctime": "2023-07-11 13:16:04,558", "name": "flytekit", "levelname": "INFO", "message": "pca_plot 任务创建看板 HTML 文件至 file:///var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-0_8qfjdd/sandbox/local_flytekit/c085853af5a175edb17b11cd338cbd61/deck.html"}
在 Flyte 实例执行该任务后,可通过任务视图点击 Deck 按钮访问看板:
看板标签页
每个看板至少包含三个标签页:输入(input)、输出(output)和默认(default)。输入输出页用于渲染任务的输入输出数据,默认页可用于创建折线图、散点图、Markdown 文本等自定义渲染。支持创建额外标签页。
看板渲染器
渲染器独立于 flytekit
主包。使用时需先在本地 Python 环境安装 flytekitplugins-deck-standard
包,并将其包含在 ImageSpec
中。
数据框分析渲染器
生成 Pandas DataFrame 的统计分析报告:
import flytekit as fl
import pandas as pd
from flytekitplugins.deck.renderer import FrameProfilingRenderer@fl.task(enable_deck=True, container_image=custom_image)
def frame_renderer() -> None:df = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})fl.Deck("数据框渲染器", FrameProfilingRenderer().to_html(df=df))
顶部数据框渲染器
将 DataFrame 渲染为 HTML 表格:
import flytekit as fl
from typing import Annotated
from flytekit.deck import TopFrameRenderer@fl.task(enable_deck=True, container_image=custom_image)
def top_frame_renderer() -> Annotated[pd.DataFrame, TopFrameRenderer(1)]:return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4])
Markdown 渲染器
将 Markdown 字符串转换为 HTML:
import flytekit as fl
from flytekit.deck import MarkdownRenderer@fl.task(enable_deck=True, container_image=custom_image)
def markdown_renderer() -> None:fl.current_context().default_deck.append(MarkdownRenderer().to_html("安装 flytekit 的命令: ```import flytekit```"))
箱线图渲染器
将 DataFrame 行数据分组渲染为箱线图,可视化数据分布:
import flytekit as fl
from flytekitplugins.deck.renderer import BoxRenderer@fl.task(enable_deck=True, container_image=custom_image)
def box_renderer() -> None:iris_df = px.data.iris()fl.Deck("箱线图", BoxRenderer("sepal_length").to_html(iris_df))
图像渲染器
将 FlyteFile
或 PIL.Image.Image
对象转换为 base64 编码的 HTML 可显示图像:
import flytekit as fl
from flytekitplugins.deck.renderer import ImageRenderer@fl.task(enable_deck=True, container_image=custom_image)
def image_renderer(image: fl.FlyteFile) -> None:fl.Deck("图像渲染器", ImageRenderer().to_html(image_src=image))@fl.workflow
def image_renderer_wf(image: fl.FlyteFile = "https://bit.ly/3KZ95q4") -> None:image_renderer(image=image)
表格渲染器
将 Pandas DataFrame 转换为 HTML 表格:
import flytekit as fl
from flytekitplugins.deck.renderer import TableRenderer@fl.task(enable_deck=True, container_image=custom_image)
def table_renderer() -> None:fl.Deck("表格渲染器",TableRenderer().to_html(df=pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}), table_width=50),)
贡献新渲染器
欢迎通过 renderer.py 提交能增强数据可视化的新渲染器。我们鼓励您发起 Pull Request 共同完善 Flyte 看板生态系统!
自定义渲染器
可创建继承 to_html
方法的渲染器类。以下示例创建 DataFrame 摘要渲染器:
class DataFrameSummaryRenderer:def to_html(self, df: pd.DataFrame) -> str:assert isinstance(df, pd.DataFrame)return df.describe().to_html()
使用 Annotated 类型覆盖默认渲染器:
try:from typing import Annotated
except ImportError:from typing_extensions import Annotated@task(enable_deck=True)
def iris_data(sample_frac: Optional[float] = None,random_state: Optional[int] = None,
) -> Annotated[pd.DataFrame, DataFrameSummaryRenderer()]:data = px.data.iris()if sample_frac is not None:data = data.sample(frac=sample_frac, random_state=random_state)md_text = ("# 鸢尾花数据集\n""本任务使用 `plotly` 包加载鸢尾花数据集")flytekit.current_context().default_deck.append(MarkdownRenderer().to_html(md_text))flytekit.Deck("箱线图", BoxRenderer("sepal_length").to_html(data))return data
流式看板
使用 Deck.publish()
实现看板实时流式更新:
import flytekit as fl@task(enable_deck=True)
def t_deck():fl.Deck.publish()
该功能创建实时看板,在任务成功前可通过刷新按钮查看动态更新。
联合看板成功演示视频
https://youtu.be/LJaBP0mdFeE
联合看板失败演示视频
https://youtu.be/xaBF6Jlzjq0
FlyteRemote
FlyteRemote
Python API 提供与 Pyflyte CLI 相似的功能,允许您从 Python 代码中管理 Flyte 工作流、任务、启动计划和构件。
FlyteRemote
的主要用例是实现 Flyte 实体的自动化部署。因此,它适用于在实际 Flyte 工作流和任务代码之外的脚本中使用,例如 CI/CD 管道脚本。
重要提示:不要在任务代码内部使用 FlyteRemote
。
创建 FlyteRemote 对象
确保已安装 Flytekit SDK,导入 FlyteRemote
类并按如下方式创建对象:
from pyflyte import FlyteRemoteremote = FlyteRemote()
当使用无参构造函数创建时,FlyteRemote
默认使用本地环境的现有配置连接 Flyte,即与该环境中 Pyflyte CLI 使用的配置相同(参见 Pyflyte CLI 配置搜索路径)。
默认情况下,与 Pyflyte CLI 相同,所有操作将应用于默认项目 flytesnacks
和默认域 development
。
您也可以通过显式指定包含 Flyte 实例连接信息的 flytekit.configuration.Config
对象、项目及域来初始化 FlyteRemote
。此外,构造函数支持指定文件上传位置(等效于默认原始数据前缀):
from pyflyte import FlyteRemote
from flytekit.configuration import Configremote = FlyteRemote(config=Config.for_endpoint(endpoint="union.example.com"),default_project="my-project",default_domain="my-domain",data_upload_location="<s3|gs|abs>://my-bucket/my-prefix",
)
这里我们使用 Config.for_endpoint
方法指定连接端点 URL。配置 Config
对象有多种方式,总体上与使用 config.yaml
文件为 Pyflyte CLI 指定连接时具有相同的选项。
使用客户端密钥认证
在 CI/CD 管道或通过 SSH 运行 FlyteRemote
脚本时,可能无法使用浏览器进行默认认证流程。这种情况下可使用客户端密钥认证方法建立 Flyte 连接。创建 API 密钥后,按如下方式初始化 FlyteRemote
:
from pyflyte import FlyteRemote
from flytekit.configuration import Config, PlatformConfigremote = FlyteRemote(config=Config(platform=PlatformConfig(endpoint="union.example.com",insecure=False,client_id="<your-client-id>", # 此处为 API 密钥名称client_credentials_secret="<your-client-secret>", # 此处为 API 密钥auth_mode="client_credentials",)),)
详细说明参见 flytekit.configuration.Config
API 文档
FlyteRemote 示例
注册并运行工作流
以下示例展示如何注册并运行工作流,同时获取其输出结果:
├── remote.py
└── workflow├── __init__.py└── example.py
将被注册并运行在 Flyte 上的工作流代码位于 workflow
目录,包含空文件 __init__.py
和存放工作流与任务代码的 example.py
:
import os
import flytekit as fl@fl.task()
def create_file(message: str) -> fl.FlyteFile:with open("data.txt", "w") as f:f.write(message)return fl.FlyteFile(path="data.txt")@fl.workflow
def my_workflow(message: str) -> fl.FlyteFile:f = create_file(message)return f
remote.py
文件包含 FlyteRemote
逻辑,不属于工作流代码,应在本地机器运行:
import flytekit as fl
from workflow.example import my_workflowdef run_workflow():remote = fl.FlyteRemote()remote.fast_register_workflow(entity=my_workflow)execution = remote.execute(entity=my_workflow,inputs={"message": "Hello, world!"},wait=True)output = execution.outputs["o0"]print(output)with open(output, "r") as f:read_lines = f.readlines()print(read_lines)
my_workflow
工作流和 create_file
任务被注册并运行。工作流完成后,输出结果将传回 run_workflow
函数并打印。
输出结果也可通过 UI 在 create_file
任务详情视图的 Outputs 选项卡查看:
以上步骤展示了使用 FlyteRemote
注册和运行工作流的最简方法。更多选项和细节请参考 API 参考 > FlyteRemote > 入口点。
获取输出结果
默认情况下 FlyteRemote.execute
是非阻塞的,但也可以通过设置 wait=True
使其同步等待任务/工作流完成(如前例所示)。
打印执行对应的 Flyte 控制台 URL:
print(f"Execution url: {remote.generate_console_url(execution)}")
使用 sync()
方法同步执行对象与远程状态:
synced_execution = remote.sync(execution)
print(synced_execution.inputs) # 打印输入参数
也可以在启动执行后等待并获取输出:
completed_execution = remote.wait(execution)
print(completed_execution.outputs) # 打印输出结果
终止工作流的所有运行中执行
此示例展示如何终止指定工作流名称的所有运行中执行:
import flytekit as fl
from dataclasses import dataclass
import json
from flytekit.configuration import Config
from flytekit.models.core.execution import NodeExecutionPhase@dataclass
class Execution:name: strlink: strSOME_LARGE_LIMIT = 5000
PHASE = NodeExecutionPhase.RUNNING
WF_NAME = "your_workflow_name"
EXECUTIONS_TO_IGNORE = ["some_execution_name_to_ignore"]
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint=ENDPOINT),default_project=PROJECT,default_domain=DOMAIN,
)executions_of_interest = []executions = remote.recent_executions(limit=SOME_LARGE_LIMIT)for e in executions:if e.closure.phase == PHASE:if e.spec.launch_plan.name == WF_NAME:if e.id.name not in EXECUTIONS_TO_IGNORE:execution_on_interest = Execution(name=e.id.name, link=f"https://{ENDPOINT}/console/projects/{PROJECT}/domains/{DOMAIN}/executions/{e.id.name}")executions_of_interest.append(execution_on_interest)remote.terminate(e, cause="Terminated manually via script.")with open('terminated_executions.json', 'w') as f:json.dump([{'name': e.name, 'link': e.link} for e in executions_of_interest], f, indent=2)print(f"Terminated {len(executions_of_interest)} executions.")
重新运行工作流的所有失败执行
此示例展示如何识别指定工作流在特定时间后的所有失败执行,并使用相同输入和固定工作流版本重新运行:
import datetime
import pytz
import flytekit as fl
from flytekit.models.core.execution import NodeExecutionPhaseSOME_LARGE_LIMIT = 5000
WF_NAME = "your_workflow_name"
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"
VERSION = "your_target_workflow_version"remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint=ENDPOINT),default_project=PROJECT,default_domain=DOMAIN,
)executions = remote.recent_executions(limit=SOME_LARGE_LIMIT)failures = [\NodeExecutionPhase.FAILED,\NodeExecutionPhase.ABORTED,\NodeExecutionPhase.FAILING,\
]# 最近一次成功执行的时间
date = datetime.datetime(2024, 10, 30, tzinfo=pytz.UTC)# 按名称过滤执行
filtered = [execution for execution in executions if execution.spec.launch_plan.name == WF_NAME]# 按状态过滤执行
failed = [execution for execution in filtered if execution.closure.phase in failures]# 按时间窗口过滤执行
windowed = [execution for execution in failed if execution.closure.started_at > date]# 获取每个执行的输入参数
inputs = [remote.sync(execution).inputs for execution in windowed]# 获取新版本工作流实体
workflow = remote.fetch_workflow(name=WF_NAME, version=VERSION)# 为每个失败执行重新运行新工作流
[remote.execute(workflow, inputs=X) for X in inputs]
使用 Filter 筛选执行
此示例展示如何使用 Filter
筛选特定执行:
from flytekit.models import filters
import flytekit as flWF_NAME = "your_workflow_name"
LP_NAME = "your_launchplan_name"
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"remote = fl.FlyteRemote.for_endpoint(ENDPOINT)# 仅查询当前项目的执行
project_filter = filters.Filter.from_python_std(f"eq(workflow.name,{WF_NAME})")
project_executions = remote.recent_executions(project=PROJECT, domain=DOMAIN, filters=[project_filter])# 查询最近成功且耗时8-16分钟的执行
latest_success = remote.recent_executions(limit=1,filters=[\filters.Equal("launch_plan.name", LP_NAME),\filters.Equal("phase", "SUCCEEDED"),\filters.GreaterThan("duration", 8 * 60),\filters.LessThan("duration", 16 * 60),\],
)
通过 FlyteRemote 启动新版本任务
import flytekit as fl
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, SerializationSettings# FlyteRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)# 获取任务
task = remote.fetch_task(name="workflows.example.generate_normal_df", version="v1")task = remote.register_task(entity=flyte_task,serialization_settings=SerializationSettings(image_config=None),version="v2",
)# 运行任务
execution = remote.execute(task, inputs={"n": 200, "mean": 0.0, "sigma": 1.0}, execution_name="task-execution", wait=True
)# 或使用execution_name_prefix避免重复执行名称
execution = remote.execute(task, inputs={"n": 200, "mean": 0.0, "sigma": 1.0}, execution_name_prefix="flyte", wait=True
)# 检查执行状态
# 'inputs'和'outputs'对应任务执行
input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()
通过 FlyteRemote 启动工作流
由于底层会获取并触发默认启动计划,工作流可通过UnionRemote执行:
import flytekit as fl
from flytekit.configuration import Config# UnionRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)# 获取工作流
workflow = remote.fetch_workflow(name="workflows.example.wf", version="v1")# 执行
execution = remote.execute(workflow, inputs={"mean": 1}, execution_name="workflow-execution", wait=True
)# 或使用execution_name_prefix避免重复执行名称
execution = remote.execute(workflow, inputs={"mean": 1}, execution_name_prefix="flyte", wait=True
)
通过 FlyteRemote 启动启动计划
可通过FlyteRemote以编程方式启动启动计划:
import flytekit as fl
from flytekit.configuration import Config# UnionRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)# 获取启动计划
lp = remote.fetch_launch_plan(name="workflows.example.wf", version="v1", project="flytesnacks", domain="development"
)# 执行
execution = remote.execute(lp, inputs={"mean": 1}, execution_name="lp-execution", wait=True
)# 或使用execution_name_prefix避免重复执行名称
execution = remote.execute(lp, inputs={"mean": 1}, execution_name_prefix="flyte", wait=True
)
检查执行状态
通过FlyteRemote可获取执行的输入输出并进行检查:
import flytekit as fl
from flytekit.configuration import Config# UnionRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)execution = remote.fetch_execution(name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development"
)input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()# inputs和outputs对应顶层执行或工作流本身
# 获取特定输出(如模型文件):
model_file = execution.outputs["model_file"]
with open(model_file) as f:...# 使用UnionRemote.sync()在执行过程中同步实体对象状态
synced_execution = remote.sync(execution, sync_nodes=True)
node_keys = synced_execution.node_executions.keys()# node_executions将递归获取所有底层节点执行
# 获取特定节点执行的输出:
node_execution_output = synced_execution.node_executions["n1"].outputs["model_file"]
从 Airflow 迁移到 Flyte
许多 Airflow Operator 和 Sensor 已在 Flyte 上完成测试,但部分功能可能无法按预期工作。如遇问题,请提交 issue
或通过 Slack
联系 Flyte 社区。
Flyte 能够在不修改代码的情况下将 Airflow 任务编译为 Flyte 任务,这使得迁移 Airflow DAG 到 Flyte 的工作量降至最低。
除了迁移能力外,Flyte 用户还能无缝集成 Airflow 任务到工作流中,充分利用 Airflow Operator 和 Sensor 的生态系统。通过将强大的 Airflow 生态系统与 Flyte 的可扩展性、版本控制和可复现性等核心能力相结合,用户可以更轻松地运行复杂的数据和机器学习工作流。更多信息请参阅 Airflow 连接器文档
。
现有 Flyte 用户指南
即使您已在 Flyte 上运行工作流且无迁移需求,仍可将 Airflow 任务集成至 Flyte 工作流。例如,Airflow 提供 Google Cloud Dataproc Operators
支持,可便捷地在 Google Cloud Dataproc 集群上执行 Spark 作业。您无需在 Flyte 中开发自定义插件,即可直接集成 Airflow 的 Dataproc Operator 来执行 Spark 作业。
前置条件
- 在 Python 环境中安装
flytekitplugins-airflow
- 在 Flyte 集群中启用 {ref}
Airflow 连接器<deployment-connector-setup-airflow>
实施步骤
1. 在 Flyte 工作流中定义 Airflow 任务
Flytekit 会将 Airflow 任务编译为 Flyte 任务,因此可以在 Flyte 工作流中使用任意 Airflow Sensor 或 Operator:
from flytekit import task, workflow
from airflow.sensors.filesystem import FileSensor@task
def say_hello() -> str:return "Hello, World!"@workflow
def airflow_wf():flyte_task = say_hello()airflow_task = FileSensor(task_id="sensor", filepath="/")airflow_task >> flyte_taskif __name__ == "__main__":print(f"Running airflow_wf() {airflow_wf()}")
2. 本地测试工作流
本地运行前需配置 Airflow 连接
,通过设置 AIRFLOW_CONN_{CONN_ID}
环境变量实现。例如:
export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1¶m2=val2'
虽然 Airflow 本身不支持本地执行,但包含 Airflow 任务的工作流可在本地运行,这对生产环境前的测试和调试非常有用:
AIRFLOW_CONN_FS_DEFAULT="/" pyflyte run workflows.py airflow_wf
部分 Airflow Operator 需要特定权限才能执行。例如 DataprocCreateClusterOperator
需要 dataproc.clusters.create
权限。本地运行 Airflow 任务时,可能需要配置本地权限以确保任务成功执行。
3. 迁移工作流至生产环境
生产环境中建议将连接信息存储在 密钥后端
,并确保连接器 Pod 具有访问外部密钥后端所需的权限(IAM 角色)。
完成本地测试后,可使用 --remote
标志在 Flyte 集群上执行工作流。此时 Flyte 会在 Kubernetes 集群中创建 Pod 来运行 say_hello
任务,并通过 Airflow 连接器运行 Airflow 的 BashOperator
任务:
pyflyte run --remote workflows.py airflow_wf
测试
flytekit Python SDK 提供了一些实用程序,方便您在测试套件中测试任务和工作流程。更多详细信息,请参阅 API 参考中的 flytekit.testing
模块。
模拟任务
许多编写的任务可以在本地运行,但部分任务可能无法本地执行,通常是因为这些任务依赖于仅在后端可用的第三方服务。Hive任务是典型示例,因为大多数开发者无法从本地开发环境访问执行Hive查询的服务。然而,能够本地运行调用此类任务的工作流仍然非常有用。为此,flytekit提供了多种实用工具来应对这种情况。
例如,这是一个通用SQL任务(默认未连接任何数据存储且未被任何插件处理),若要在单元测试中使用必须进行模拟:
import datetimeimport pandas
from flytekit import SQLTask, TaskMetadata, kwtypes, task, workflow
from flytekit.testing import patch, task_mock
from flytekit.types.schema import FlyteSchemasql = SQLTask("my-query",query_template="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds }}' LIMIT 10",inputs=kwtypes(ds=datetime.datetime),outputs=kwtypes(results=FlyteSchema),metadata=TaskMetadata(retries=2),
)@task
def t1() -> datetime.datetime:return datetime.datetime.now()
假设存在使用这两个任务的工作流:
@workflow
def my_wf() -> FlyteSchema:dt = t1()return sql(ds=dt)
若无模拟,调用工作流通常会抛出异常。但使用返回MagicMock
对象的task_mock
构造器,我们可以覆盖返回值:
def test_demonstrate_mock():with task_mock(sql) as mock:mock.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all()
另一个实用工具patch
提供相同功能,但采用传统Python补丁风格,其中第一个参数是MagicMock
对象:
@patch(sql)
def test_demonstrate_patch(mock_sql):mock_sql.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all()
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。