当前位置: 首页 > news >正文

生产级编排AI工作流套件:Flyte全面使用指南 — Development cycle

生产级编排AI工作流套件:Flyte全面使用指南 — Development cycle

Flyte 是一个开源编排器,用于构建生产级数据和机器学习流水线。它以 Kubernetes 作为底层平台,注重可扩展性和可重复性。借助 Flyte,用户团队可以使用 Python SDK 构建流水线,并将其无缝部署在云端和本地环境中,从而实现分布式处理和高效的资源利用。

文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。

Flyte

开发周期

本节介绍如何为 Flyte 开发可用于生产的工作流程。

认证

使用命令行界面(CLI)与 Flyte 交互时需要进行身份认证。认证方式根据本地或远程工作环境有所不同。本指南将介绍不同的认证机制,并帮助您选择最适合的解决方案。

开始认证前,请确保已安装 Pyflyte CLI。详情参见本地设置。

认证方法

Pyflyte CLI 支持三种认证机制:

认证方法本地可用?远程可用?使用场景
PKCE(默认)✅ 是❌ 否本地浏览器环境最佳选择
DeviceFlow✅ 是✅ 是无浏览器的远程环境(如 SSH 会话)
ClientSecret✅ 是✅ 是CI/CD 流水线或自动化场景

使用 pyflyte create login --host <union-host-url> 命令时默认采用 PKCE 方式。

1. PKCE(Proof Key of Code Exchange)

PKCE 是默认认证方式。运行 Pyflyte CLI 命令时,会自动打开浏览器窗口进行认证。

认证流程:

  • 执行 Pyflyte CLI 命令
  • 自动重定向至默认浏览器完成登录

示例配置:

admin:endpoint: https://<YourOrg>.hosted.unionai.cloud  # 组织专属端点insecure: false                                   # 启用安全连接authType: Pkce                                    # 认证类型
logger:show-source: true                                 # 显示日志来源level: 0                                          # 日志级别

PKCE 需要本地浏览器支持,因此不适用于通过 SSH 会话连接的远程机器。

2. DeviceFlow(远程环境最佳方案)

在无浏览器的远程机器上使用 DeviceFlow 认证方式。该方法会生成可在本地浏览器打开的认证链接。

认证流程:

  • 执行 Pyflyte CLI 命令
  • CLI 返回认证 URL
  • 在本地浏览器打开 URL 完成登录

示例配置:

admin:endpoint: dns:///<YourOrg>.hosted.unionai.cloud   # DNS 端点配置insecure: false                                   # 启用安全连接authType: DeviceFlow                              # 认证类型
logger:show-source: true                                 # 显示日志来源level: 0                                          # 日志级别

认证过程中,Flyte 会尝试将认证令牌存储至操作系统的密钥管理服务。在基于 Linux 的 SSH 会话环境中,若发现每次运行都需要浏览器认证,可能需要执行 pip install keyringpip install keyrings.alt 安装密钥管理服务。

3. ClientSecret(CI/CD 自动化场景推荐)

ClientSecret 是无头认证方案,专为自动化流程和 CI/CD 场景设计。

ClientSecret 认证配置步骤:

  1. 创建 API 密钥:

    pyflyte create api-key admin --name my-custom-name
    

    输出将包含 Client ID 和 API Key。请妥善保存 API Key(后续不可见)。

  2. 配置 ClientSecret 认证:

    • 使用本地文件

      admin:endpoint: dns:///<YourOrg>.hosted.unionai.cloud   # DNS 端点配置insecure: false                                   # 启用安全连接authType: ClientSecret                            # 认证类型clientId: <YourClientId>                         # 客户端IDclientSecretLocation: /path/to/secret.txt         # 密钥文件路径
      logger:show-source: true                                 # 显示日志来源level: 0                                          # 日志级别
      
    • 使用环境变量

      admin:endpoint: dns:///<YourOrg>.hosted.unionai.cloud   # DNS 端点配置insecure: false                                   # 启用安全连接authType: ClientSecret                            # 认证类型clientId: <YourClientId>                         # 客户端IDclientSecretEnvVar: FLYTE_API_KEY                # 环境变量名称
      logger:show-source: true                                 # 显示日志来源level: 0                                          # 日志级别
      
  3. 设置环境变量(环境变量方式):

    export FLYTE_API_KEY="<SECRET>"
    

重要:切勿将 API 密钥提交至版本控制系统。建议使用环境变量或安全凭证库管理。

管理认证配置

Pyflyte CLI 默认读取 ~/.flyte/config.yaml 配置文件。可通过以下方式覆盖默认配置:

  • 设置环境变量:

    export FLYTECTL_CONFIG=~/.my-config-location/my-config.yaml
    
  • 使用 --config 参数:

    pyflyte --config ~/.my-config-location/my-config.yaml run my_script.py my_workflow
    

认证问题排查

  • 切换认证方式:更新 ~/.flyte/config.yaml 或使用其他配置文件
  • 重复登录提示:Linux 系统使用 DeviceFlow 时,请安装密钥服务 pip install keyring keyrings.alt

项目结构

有效组织工作流项目仓库是确保可扩展性、协作性和易维护性的关键。以下是构建 Flyte 工作流项目仓库的最佳实践,涵盖任务组织、工作流管理、依赖处理和文档管理。

推荐目录结构

典型的 Flyte 工作流项目结构如下所示:

├── .github/workflows/
├── .gitignore
├── docs/
│   └── README.md
├── src/
│   ├── core/                    # 用例专用的核心逻辑
│   │   ├── __init__.py
│   │   ├── model.py
│   │   ├── data.py
│   │   └── structs.py
│   ├── tasks/                   # 包含独立任务
│   │   ├── __init__.py
│   │   ├── preprocess.py
│   │   ├── fit.py
│   │   ├── test.py
│   │   └── plot.py
│   ├── workflows/               # 包含工作流定义
│   │   ├── __init__.py
│   │   ├── inference.py
│   │   └── train.py
│   └── orchestration/           # 辅助构造(如 secrets、images)
│       ├── __init__.py
│       └── constants.py
├── uv.lock
└── pyproject.toml

该结构旨在确保每个项目组件都有清晰、合理的归属位置,方便团队成员查找和修改文件。

组织任务和工作流

在 Flyte 中,任务是工作流的构建模块,因此需要直观地组织它们:

  • 任务:将每个任务存储在 tasks/ 目录下的独立文件中。如果多个任务密切相关,可考虑将它们分组到模块中。或者,每个任务可以有自己的模块以实现更细粒度的组织,并使用子目录对相似任务进行分组

  • 工作流:将组合成端到端流程的工作流存储在 workflows/ 目录中。这种分离确保工作流与核心任务逻辑独立组织,促进模块化和重用

编排目录的辅助构造

应包含 orchestration/union_utils/ 等目录来存放辅助工作流编排的构造。该目录可包含以下类型的辅助文件:

  • Secrets:定义 Flyte 中访问密钥(如 API keys)的方式

  • ImageSpec:简化容器管理的工具,无需直接编写 Dockerfiles

工作流专用核心逻辑

使用 core/ 目录存放工作流专用的业务逻辑。这将核心应用代码与工作流编排代码分离,提高可维护性,并帮助新成员更快理解核心功能。

__init__.py 的重要性

在每个目录中添加 __init__.py 文件至关重要:

  • 导入功能:这些文件使目录成为 Python 包,实现跨模块的正确导入

  • Flyte 快速注册:执行快速注册时,Flyte 将第一个没有 __init__.py 的目录视为根目录。Flyte 会将根目录及其内容打包成 tarball,简化注册流程,避免每次代码变更都重建容器镜像

Monorepo vs 多仓库:结构选择

与多团队协作时有两种主要选择:

  • Monorepo:所有团队共享单一仓库,可简化依赖管理并支持共享构造。但会增加权限管理和不同团队版本控制的复杂度

  • 多仓库:为每个团队或项目创建独立仓库可提高隔离性和控制力。这种情况下,建议为多个团队使用的构造创建可安装的共享包,确保一致性而无需合并代码库

CI/CD

GitHub Action 应实现以下功能:

  • 在合并到领域分支时注册(需要时升级)
  • 在合并输入 YAML 时执行
  • 将 git SHA 注入为实体版本

文档与文档字符串

建议编写清晰的文档字符串,因为它们会自动传播到 Flyte UI。这为查看 UI 中工作流和任务的用户提供有用上下文,减少查阅源代码解释的需求。

项目与域

项目(Project)和域(Domain)是 Flyte 中用于组织工作流的核心分类单元。

项目定义了具有共同功能目标的任务(task)、工作流(workflow)、启动计划(launch plan)等实体的分组。域则代表项目实体在开发周期中经历的不同阶段环境。

默认情况下,Flyte 提供三个预配置域:developmentstagingproduction。在初始配置时,您可以根据需求自定义 Flyte 实例的域设置。如需详细信息,请联系 Flyte 团队。

项目与域之间是正交关系,即一个项目可存在于多个域中,一个域也可包含多个项目。

示例配置结构:

DevelopmentStagingProduction
Project 1workflow_1 (v2.0)workflow_1 (v1.0)workflow_1 (v1.0)
Project 2workflow_2 (v2.0)workflow_2 (v1.0)workflow_2 (v1.0)

项目

项目代表与特定团队、业务领域或应用程序相关的独立工作流集合。每个项目相互隔离,但工作流可以通过引用其他项目中的实体(工作流或任务)实现通用资源复用。

域代表 Flyte 组织中与项目集正交的独立环境(如 development/staging/production)。每个域可配置专属的权限策略、密钥管理、执行历史缓存和资源配额,避免对其他项目或域造成意外影响。

通过域可实现环境间的明确隔离,确保开发测试活动不会干扰生产环境工作流。生产域提供"纯净"环境,避免开发阶段的缓存执行导致意外行为。同时可配置生产环境专属的外部数据源密钥。

何时使用不同的 Flyte 项目?

项目用于组织特定团队、业务领域或应用程序相关的独立工作流。通常每个独立团队或 ML 产品应建立专属 Flyte 项目。虽然项目间保持隔离,但团队可通过跨项目引用实体(工作流或任务)复用通用资源。例如,某个团队可创建通用模型训练任务。但这需要高级协作机制和统一的编码规范。

在 Flyte 中配置工作流时,有效运用项目是管理环境、权限和资源分配的关键。以下是组织工作流的最佳实践建议。

项目与域:组合威力

Flyte 通过项目-域组合为工作流创建独立配置环境。这种组合支持:

  • 专属权限控制
    通过基于角色的访问控制(RBAC),可为用户分配针对特定项目-域组合的精细化权限(如 contributor 或 admin 角色)。这种细粒度控制确保权限分配既精准又安全。更多详情请参见此处。

  • 资源与执行监控
    每个项目-域组合拥有独立的监控看板,可追踪资源利用率、执行状态和性能指标。这有助于保持对工作流执行的可见性并确保最佳性能。更多详情请参见此处。

  • 资源配额与分配
    通过为每个项目-域组合设置配额,Flyte 可确保工作流不会超出指定限制,防止项目或域意外占用他人资源。您还可以为每个组合配置内存、CPU 和存储等专属资源默认值,满足不同工作流的特殊需求。更多详情请参见此处和此处。

  • 密钥管理配置
    Flyte 支持在项目-域层级配置密钥,确保 API 密钥等敏感信息仅能被特定工作流访问。这种隔离机制提升了安全性,降低了跨环境未授权访问风险。更多详情请参见此处。

域:清晰的环境隔离

域代表 Flyte 中的独立环境(如 development/staging/production),通过明确隔离防止跨环境干扰。这种结构确保开发测试环境的变更不会影响生产工作流。使用域进行阶段划分,可使工作流从开发到生产部署的全过程实现可控演进。

项目:按团队/业务/应用组织工作流

Flyte 项目用于组织特定团队、业务功能或应用程序相关的独立工作流。通过将项目映射到组织结构,可简化权限管理并促进跨团队/用例的工作流隔离。虽然支持跨项目引用,但建议保持项目内工作流的独立性以避免复杂度。

Flyte 的 CLI 工具和 SDK 提供便捷的项目/域指定方式:

  • CLI 命令
    pyflyteuctl CLI 的大多数命令中,可使用 --project--domain 参数指定目标项目域组合。更多详情请参见此处和此处。

  • Python SDK
    使用 flytekit SDK 时,可通过 FlyteRemote 编程指定项目域组合,确保所有操作发生在目标环境。更多详情请参见此处。

构建工作流

何时应分解任务?

将任务分解为更小任务有多个理由。这样做可以带来更好的计算性能、改进缓存性能,并充分利用可中断任务的优势。但分解任务需要承担任务间的开销成本,包括节点启动和数据下载。某些情况下,可以通过使用Actors来缓解这些成本。

差异化的运行时需求

首先,分解任务支持操作间的异构环境。例如,可能存在一个训练机器学习模型的大型任务,随后使用该模型在测试数据上进行批量推理。然而训练模型通常需要比推理更多的内存。因此在大规模场景下,将这个大任务分解为两个任务:(1) 训练模型和 (2) 运行批量推理,实际上是有益的。这样可以为第二个任务申请更少内存,从而降低工作流成本。若处理更大规模数据,可通过map_task进一步分解批量推理任务以实现并行化,大幅缩短该步骤运行时间。总的来说,分解任务通过定义资源、依赖和执行并行性,提供了基础设施灵活性。

改进缓存性能

其次,可将大任务分解为更小任务以实现"细粒度"缓存。每个独立任务都提供自动化的"检查点"系统。通过将大型工作流分解为多个自然任务,可以最小化多次串行工作流执行中的冗余工作。这在快速迭代开发阶段尤其有用,用户可能在短时间内多次运行相同工作流。"细粒度"缓存将显著提升本地和远程工作流执行效率。

利用可中断任务

最后,可通过"细粒度"缓存充分利用可中断任务。可中断任务会尝试在可能情况下使用Spot实例或Spot虚拟机运行。这些节点具有可中断性,意味着任务可能因其他组织高价竞用而失败。但相比不可中断的按需实例/虚拟机,Spot实例成本显著更低。通过"细粒度"缓存,可在享受可中断任务成本优势的同时,最小化任务中断带来的影响。

何时应并行化任务?

总体原则是尽早且频繁并行化。如前所述,Flyte的许多强大特性(如缓存和工作流恢复)都基于任务级别实现。将任务分解为更小单元并进行并行化,可构建高性能且具备容错能力的工作流。

需要注意的例外是超短时任务,其Pod启动和清理的开销可能抵消并行化优势。但通过Actors实现可复用容器后,这些开销将被透明消除,在付出前期环境搭建成本后实现双赢。在任何情况下,批处理输入输出以分摊开销都是有效策略。需注意保持批次内输入顺序和批次间顺序,以确保可靠的缓存命中。

并行化结构

Flyte的两个主要并行化结构是映射任务和动态工作流。它们目标相似但实现方式迥异,各有优势。

动态工作流更类似for循环,按顺序迭代输入。其并行度受整体工作流并行度控制。

映射任务效率更高且不保证执行顺序。它们拥有独立于整体工作流的并发设置,可设置子任务的最低失败阈值。二者的详细差异说明可参考此处,协同使用示例参见此处。

何时应启用缓存?

当任务主体逻辑稳定后应启用缓存。缓存键由任务签名隐式生成(主要基于输入输出)。若任务主体变更但签名未改,使用相同输入将产生缓存命中。这在迭代任务核心功能并期待不同下游输入时可能导致意外行为。此外,缓存不会检查FlyteFile等内容。若相同URI输入对应完全不同的内容,也会产生缓存命中。因此建议添加显式缓存键以便随时使缓存失效。

尽管存在这些注意事项,缓存在工作流开发阶段仍是巨大的时间节省工具。缓存上游任务可快速运行工作流直至当前迭代节点。在复杂并行化场景(如调试大型映射任务的失败状态)中,缓存也极具价值。生产环境中,若集群资源紧张,缓存可使工作流通过多次重试逐步完成,因为每次运行会有更多任务成功返回。虽然这不是理想场景,但缓存有助于缓解生产故障的影响。考虑到这些因素,绝大多数场景都值得启用缓存。

设置生产项目

在 Flyte 中,您的工作按以下层次结构进行组织:

  • Organization(组织):您的 Flyte 实例,可通过特定 URL(如 flyte.my-company.com)访问
  • Domains(域):组织内通常包含三个域:development(开发)、staging(预发布)和 production(生产),用于在开发过程中组织代码。您可以通过初始配置自定义域集合
  • Projects(项目):与域正交的项目结构,用于将代码组织为逻辑组。您可以根据需要创建任意数量的项目

一个具体的工作流会归属于特定项目。例如,假设 my_workflowmy_project 项目中的工作流。

当您开始开发 my_workflow 时,通常会在项目域 my_project/development 中注册该工作流。

随着工作流版本的迭代演进,您可以将 my_workflow 升级到 my_project/staging 域,最终部署到 my_project/production 域。

升级操作只需通过将工作流重新注册到新的项目域即可完成。

术语规范

日常使用中,“project” 一词可能指代以下不同概念:

  • Flyte 中承载工作流集合的实体
  • 本地开发工作流的目录
  • 存储工作流代码的 GitHub(或其他 SCM)仓库

为避免混淆,本指南采用以下命名规范:

  • Flyte project:Flyte 实例中承载工作流集合的实体(常简称为 project
  • Local project:本地开发工作流的目录(通常为 GitHub 仓库的工作目录)

创建 Flyte 项目

确保已安装 flytectl CLI 并正确配置 Flyte 集群连接。执行以下命令创建新项目:

$ flytectl create project \--id "my-project" \--labels "my-label=my-project" \--description "My Flyte project" \--name "My project"

使用 pyflyte init 创建本地生产项目目录

在入门指南中,我们曾使用 pyflyte init 基于 flyte-simple 模板创建本地项目。

此处我们将使用 flyte-production 模板执行相同操作:

pyflyte init --template union-production my-project

目录结构

生成的 basic-example 目录将包含以下文件结构:

├── LICENSE
├── README.md
├── docs
│   └── docs.md
├── pyproject.toml
├── src
│   ├── core
│   │   ├── __init__.py
│   │   └── core.py
│   ├── orchestration
│   │   ├── __init__.py
│   │   └── orchestration.py
│   ├── tasks
│   │   ├── __init__.py
│   │   └── say_hello.py
│   └── workflows
│       ├── __init__.py
│       └── hello_world.py
└── uv.lock

您可以为生产项目创建自定义的目录结构和规范,但此模板提供了良好的起点。

特别需要注意 workflows 子目录及其包含的 __init__.py 文件。我们将在注册流程章节详细讨论其作用。

本地依赖管理

开发环境配置

在开发周期中,您需要能够在本地机器和 Flyte 远程环境运行工作流。为此,请确保两个环境均已安装所需依赖。本节将说明本地依赖安装方法。关于 Flyte 环境依赖配置,请参考 ImageSpec。

pyproject.toml 中定义依赖

推荐使用 uv 工具 进行项目与依赖管理。

最佳实践是在 pyproject.toml 文件的 dependencies 字段中声明依赖项:

[project]
name = "union-simple"
version = "0.1.0"
description = "A simple Flyte project"
readme = "README.md"
requires-python = ">=3.9,<3.13"
dependencies = ["union"]

创建 Python 虚拟环境

请确保已正确设置包含所需依赖的 Python 虚拟环境。

使用 uv 安装依赖:

uv sync

激活虚拟环境:

source .venv/bin/activate

activateuv run 对比
在本地项目中运行 Pyflyte CLI 时,必须在项目关联的虚拟环境中执行。

使用 uv 在虚拟环境中运行 pyflyte 的两种方式:

  1. 使用前缀命令:
uv run pyflyte ...
  1. 先激活环境再直接执行:
source .venv/bin/activate
pyflyte ...

本文示例默认采用第二种方式。

后续步骤

完成本地依赖安装后,您可使用 pyflyte run 在本地运行工作流。

下一步需确保相同依赖已在 Flyte 远程环境可用。

ImageSpec

在开发周期中,您需要确保工作流既能在本地机器运行,也能在 Flyte 远程环境运行,因此必须保证两种环境都安装了所需的依赖项。

本文将说明如何为 Flyte 远程环境配置工作流依赖项。关于本地依赖项的配置方法,请参阅本地依赖项。

当工作流部署到 Flyte 时,每个任务都在 Kubernetes 集群的独立容器中运行。您需要通过 ImageSpec 类在容器镜像定义中指定任务依赖项。例如:

import flytekit as flimage_spec = union.ImageSpec(name="say-hello-image",requirements="uv.lock",
)@fl.task(container_image=image_spec)
def say_hello(name: str) -> str:return f"Hello, {name}!"@fl.workflow
def hello_world_wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting

此处的 ImageSpec 类用于指定 say_hello 任务使用的容器镜像。

  • name 参数指定镜像名称,该名称将用于在容器注册表中标识镜像
  • requirements 参数指定依赖项文件的路径(相对于执行 pyflyte runpyflyte register 命令的目录),支持以下文件类型:
    • requirements.txt 文件
    • 通过 uv sync 命令生成的 uv.lock 文件
    • 通过 poetry install 命令生成的 poetry.lock 文件
    • pyproject.toml 文件

当执行 pyflyte runpyflyte register 命令时,Flyte 将自动构建 ImageSpec 块中定义的容器镜像(同时注册代码中定义的任务和工作流)。

运行代码

设置开发环境

如果尚未完成环境设置,请按照入门指南完成以下操作:

  • 登录Flyte平台
  • 配置本地开发环境

运行代码的CLI命令

Pyflyte CLI和Flytectl CLI提供以下命令用于不同开发阶段的代码部署与运行:

  1. pyflyte run:在本地Python环境即时部署并运行单个脚本
  2. pyflyte run --remote:在Flyte云平台即时部署并运行单个脚本
  3. pyflyte register:将多个脚本部署到Flyte并通过Web界面运行
  4. pyflyte package + flytectl register:用于生产环境部署和CI/CD流水线脚本

如需在部署至Flyte前进行本地集群测试,可使用命令2、3或4并指定本地集群目标。详见本地集群运行指南。

注册模式概览

以下图表展示不同注册模式的对比:

注册模式示意图

使用pyflyte run在本地运行脚本

开发过程中可使用以下命令在本地Python环境快速测试工作流:

pyflyte run workflows/example.py wf --name 'Albert'

参数说明:

  • workflows/example.py:目标Python文件
  • wf:文件中要运行的工作流名称
  • --name 'Albert':传递的命名参数及值

该命令适用于快速本地基础验证,详见pyflyte run详情。

使用pyflyte run --remote在Flyte运行脚本

通过以下命令在Flyte云平台快速运行工作流:

pyflyte run --remote --project basic-example --domain development workflows/example.py wf --name 'Albert'

参数说明:

  • --project basic-example:项目名称
  • --domain development:目标域
  • workflows/example.py:脚本文件路径
  • wf:工作流名称

执行流程:

  1. 构建ImageSpec定义的容器镜像
  2. 推送镜像到指定容器注册表(需确保Flyte可访问,如使用GitHub容器注册表需设为公开)
  3. 打包代码并部署至Flyte指定项目域
  4. 在Flyte平台运行工作流

详见pyflyte run远程执行详情。

通过flytectl运行任务

生成执行规范文件

flytectl launch task --project flytesnacks --domain development --name workflows.example.generate_normal_df --version v1

更新输入参数文件

iamRoleARN: 'arn:aws:iam::12345678:role/defaultrole'
inputs:n: 200mean: 0.0sigma: 1.0
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: workflows.example.generate_normal_df
version: "v1"

创建执行实例

flytectl create execution -p flytesnacks -d development --execFile exec_spec.yaml

监控执行状态

flytectl get execution -p flytesnacks -d development <execid>

通过flytectl运行工作流

工作流需通过启动计划(launchplan)执行。默认启动计划与工作流同名,参数默认值相同。任务也可通过启动命令执行,但启动计划不可关联任务。

通过flytectl运行启动计划

生成执行规范文件

flytectl get launchplan -p flytesnacks -d development myapp.workflows.example.my_wf --execFile exec_spec.yaml

更新输入参数

inputs:name: "adam"

创建执行实例

flytectl create execution -p flytesnacks -d development --execFile exec_spec.yaml

监控执行状态

flytectl get execution -p flytesnacks -d development <execid>

使用pyflyte register部署代码至Flyte

pyflyte register workflows --project basic-example --domain development

执行流程:

  1. 构建ImageSpec定义的容器镜像
  2. 打包workflows目录代码并部署(需确保目录包含__init__.py文件)

该命令仅部署不运行,可通过Web界面触发。适用于全量工作流测试部署。

快速注册机制

当容器镜像已存在注册表且仅修改工作流/任务代码时,快速注册机制:

  • 打包指定目录为.tar.gz文件(包含protobuf序列化规范)
  • 注册至集群并上传至对象存储(如S3/GCS)

执行时Flyte自动注入代码覆盖原镜像内容。

环境变量说明
WORKDIR设置代码解压路径,PYTHONPATH需包含源码路径。推荐使用src目录结构并设置绝对导入。

执行监控

基础监控

flytectl get execution -p flytesnacks -d development <execid>

详细信息查看

flytectl get execution -p flytesnacks -d development <execid> --details

格式输出

flytectl get execution -p flytesnacks -d development <execid> --details -o yaml

输出结果查看:

"outputUri": "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-<execid>/n0/data/0/outputs.pb"

生产环境部署

使用pyflyte package打包

pyflyte --pkgs workflows package

生成flyte-package.tgz文件(需__init__.py文件)。多目录打包:

pyflyte --pkgs DIR1 --pkgs DIR2 package ...

模块缺失处理:

pyflyte --pkgs <dir1> package --source ./src -f

使用flytectl register注册

$ flytectl register files \--project basic-example \--domain development \--archive flyte-package.tgz \--version "$(git rev-parse HEAD)"

参数说明:

  • --project:目标项目
  • --domain:目标域(development/staging/production)
  • --archive:打包文件路径
  • --version:推荐使用Git SHA

注册方法对比

方法适用场景
pyflyte register单集群快速迭代
package+register多集群/生产环境/CI-CD流水线

编程接口
可通过FlyteRemote对象实现等效操作。

镜像管理策略

ImageSpec行为取决于注册方式:

  • 快速注册:不复制源码文件
  • 非快速注册:默认复制源码文件
  • 指定source_root时强制复制文件

自定义镜像构建

推荐使用envd构建器,也可通过模板生成Dockerfile:

pyflyte init --template basic-template-dockerfile

构建脚本示例:

./docker_build.sh -p PROJECT_NAME -r REGISTRY -v VERSION

镜像推送示例(GitHub容器注册表):

docker login ghcr.io
docker push TAG

CI/CD与GitHub Actions集成

Flyte提供两个GitHub Action:

  • flyte-setup-action:安装flytectl
  • flyte-register-action:使用flytectl register注册包

最佳实践

  • 版本策略
    • 特性分支:{分支名}-{短提交哈希}
    • 主分支:main-{短提交哈希}
    • 正式发布:语义版本号
  • 镜像管理
    • 使用--image参数指定运行时镜像
    • 避免硬编码镜像地址

覆盖参数

with_overrides 方法允许在执行时为任务、子工作流和子启动计划指定参数覆盖。这在需要修改任务、子工作流或子启动计划的行为而不更改原始定义时非常有用。

任务参数

调用任务时,可以在with_overrides中指定以下参数:

  • accelerator: 指定加速器
  • cache_serialize: 启用缓存序列化
  • cache_version: 指定缓存版本
  • cache: 启用缓存
  • container_image: 指定容器镜像
  • interruptible: 指定任务是否可中断
  • limits: 指定资源限制
  • name: 为任务执行指定特定名称,该名称将显示在UI的工作流流程图中(参见下文)
  • node_name: 为任务的DAG节点指定特定名称,该名称将显示在UI的工作流流程图中(参见下文)
  • requests: 指定资源请求
  • retries: 指定任务重试次数
  • task_config: 指定任务配置
  • timeout: 指定任务超时时间

例如,如果某个任务未启用缓存,可以在执行时通过with_overrides启用缓存:

my_task(a=1, b=2, c=3).with_overrides(cache=True)

使用namenode_name参数

在任务上使用with_overridesname参数是一个特别有用的功能。例如,使用with_overrides(name="my_task")可以为任务执行指定特定名称,该名称将显示在UI中。指定的名称可以在调用时选择或生成,而无需修改任务定义。

@fl.workflow
def wf() -> int:my_task(a=1, b=1, c=1).with_overrides(name="my_task_1")my_task(a=2, b=2, c=2).with_overrides(name="my_task_2", node_name="my_node_2")return my_task(a=1, b=1, c=1)

上述代码将在UI中生成如下工作流展示:

覆盖名称

还有一个相关参数node_name可用于为任务的DAG节点指定特定名称。DAG节点名称通常自动生成如n0n1n2等,显示在工作流表格的node列中。覆盖node_name后会自动替换为指定名称:

覆盖节点名称

注意代码中指定的node_namemy_node_2,但在UI中显示为my-node-2。这是因为Kubernetes节点名称不能包含下划线,Flyte会自动将名称转换为符合Kubernetes规范的格式。

子工作流和子启动计划参数

在从高层工作流中调用工作流或启动计划时(即调用子工作流或子启动计划),可以在with_overrides中指定以下参数:

  • cache_serialize: 启用缓存序列化
  • cache_version: 指定缓存版本
  • cache: 启用缓存

pyflyte run 的详细说明

pyflyte run 命令用于在本地 Python 环境或 Flyte 平台上运行特定 workflow 或 task。本节将讨论该命令的使用细节及原理。

参数传递

使用以下语法可以通过 pyflyte run 执行特定 workflow:

pyflyte run <路径/脚本.py> <workflow_或_task_函数名>

关键字参数可通过以下方式传递:

--<关键字> <>

例如,我们使用脚本 example.py、workflow wf 和命名参数 name 调用 pyflyte run

pyflyte run example.py wf --name 'Albert'

这里 Albert 会被传递给 name 参数。

当参数名为 snake_case 格式时,需转换为 kebab-case 格式。例如,如果代码接受 last_name 参数,则命令:

pyflyte run example.py wf --last-name 'Einstein'

会将值 Einstein 传递给该参数。

为何选择 pyflyte run 而非直接使用 python?

可以在脚本末尾添加 main 守卫:

if __name__ == "__main__":training_workflow(hyperparameters={"C": 0.1})

这样就能通过 python example.py 运行,但参数需要硬编码。

若想动态传递参数,代码会变得更冗长:

if __name__ == "__main__":import jsonfrom argparse import ArgumentParserparser = ArgumentParser()parser.add_argument("--hyperparameters", type=json.loads)...  # 添加其他选项args = parser.parse_args()training_workflow(hyperparameters=args.hyperparameters)

pyflyte run 可以避免这种冗长的代码,通过简洁的方式传递参数运行 workflow。

使用交互式任务进行调试

通过交互式任务,您可以直接在 UI 的嵌入式 Visual Studio Code IDE 中实时检查和调试任务代码。

在代码中启用交互式任务

要启用交互式任务,需要:

  • 包含 flytekitplugins-flyteinteractive 作为依赖项
  • 在需要交互化的任务上使用 @vscode 装饰器

应用 @vscode 装饰器后,该装饰器会在运行时将任务转换为 Visual Studio Code 服务器。此过程会覆盖任务函数体的标准执行流程,转而启动 Visual Studio Code 服务器。

无需配置 ingress 或端口转发
Flyte 交互式任务功能是对开源 FlyteInteractive 插件 的改进版本。相比开源版本,移除了 ingress 配置和端口转发的需求,提供更无缝的调试体验。

基础示例

以下示例演示简单工作流中的交互式任务。

requirements.txt

requirements.txt 文件被本节所有示例使用:

flytekit
flytekitplugins-flyteinteractive

example.py

"""使用交互式任务(@vscode)的Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)@fl.task(container_image=image)
@vscode
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting

注册并运行工作流

要将代码注册到 Flyte 项目并运行工作流,请按照 运行代码 中的说明操作

访问 IDE

  1. 在工作流页面选择第一个任务(本示例中任务名为 say_hello)。任务信息面板将显示在页面右侧
  2. 等待任务进入 Running 状态并出现 VSCode (User) 链接
  3. 点击 VSCode (User) 链接

VSCode链接

检查任务代码

IDE 打开后,您可以在编辑器中查看任务代码。

检查代码

交互式调试

要在 VSCode 中运行任务,点击 IDE 左侧导航栏的 Run and debug 图标,选择 Interactive Debugging 配置。

交互式调试

点击配置下拉框旁的 Play 按钮运行任务。这将使用前序任务的输入参数运行任务。要检查中间状态,可在 Python 代码中设置断点并使用调试器跟踪。

任务输出不会写入 Flyte 存储
请注意在调试阶段任务完全在 VSCode 中运行,不会将输出写入 Flyte 存储。

更新代码

您可以在 VSCode 环境中编辑代码并重新运行任务以查看更改。但请注意,这些更改不会自动持久化存储,需要手动将更改复制回本地环境。

恢复任务

调试完成后,可通过执行 Resume Task 配置使用更新后的代码恢复任务。这将终止代码服务器,使用前序任务的输入参数运行任务,并将输出写入 Flyte 存储。

请记得持久化代码
恢复任务前请确保代码已持久化(例如提交到 GitHub),因为此后将失去与 VSCode 服务器的连接。

恢复任务

辅助 Python 文件

您会注意到除了代码外,VSCode 文件浏览器中还有一些系统自动生成的附加文件:

flyteinteractive_interactive_entrypoint.py

flyteinteractive_interactive_entrypoint.py 脚本实现了我们之前使用的 Interactive Debugging 操作:

交互入口点

flyteinteractive_resume_task.py

flyteinteractive_resume_task.py 脚本实现了我们之前使用的 Resume Task 操作:

恢复任务脚本

launch.json

.vscode 目录中的 launch.json 文件配置了 Interactive DebuggingResume Task 操作:

launch.json

集成终端

除了使用辅助文件定义的便捷函数,您还可以直接通过集成终端使用 python <脚本名称>.py 运行 Python 代码脚本(本例中为 python hello.py)。

交互终端

安装扩展

与本地 VSCode 类似,您可以安装各种扩展来辅助开发。由于法律原因,可用扩展与官方 VSCode 不同,托管在 Open VSX Registry。

Python 和 Jupyter 扩展默认已安装。可以通过定义配置对象并将其传递给 @vscode 装饰器来添加其他扩展,如下所示:

example-extensions.py

"""使用扩展的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import COPILOT_EXTENSION, VscodeConfig, vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)config = VscodeConfig()
config.add_extensions(COPILOT_EXTENSION) # 使用预定义的URL
config.add_extensions("https://open-vsx.org/api/vscodevim/vim/1.27.0/file/vscodevim.vim-1.27.0.vsix"
) # 从 Open VSX 复制原始URL@fl.task(container_image=image)
@vscode(config=config)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting

资源管理

为管理资源,VSCode 服务器在空闲一段时间(无活跃 HTTP 连接)后会自动终止。空闲状态通过心跳文件监控。

max_idle_seconds 参数可用于设置 VSCode 服务器在终止前允许的最大空闲秒数。

example-manage-resources.py

"""使用 max_idle_seconds 参数的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)@fl.task(container_image=image)
@vscode(max_idle_seconds=60000)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting

前置与后置钩子

交互式任务还支持注册在 VSCode 启动前后执行的函数,可用于需要设置或清理的任务。

example-pre-post-hooks.py

"""使用前置后置钩子的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)def set_up_proxy():print("set up")def push_code():print("push code")@fl.task(container_image=image)
@vscode(pre_execute=set_up_proxy, post_execute=push_code)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting

仅在任务失败时启动 VSCode

系统可配置为仅在任务失败后启动 VSCode(防止任务终止以便检查),通过将 run_task_first 参数设为 True 实现。

example-run-task-first.py

"""使用 run_task_first 参数的交互式任务(@vscode)Flyte工作流示例"""import flytekit as fl
from flytekitplugins.flyteinteractive import vscodeimage = fl.ImageSpec(registry="<my-image-registry>",name="interactive-tasks-example",base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",requirements="requirements.txt"
)@fl.task(container_image=image)
@vscode(run_task_first=True)
def say_hello(name: str) -> str:s = f"Hello, {name}!"return s@fl.workflow
def wf(name: str = "world") -> str:greeting = say_hello(name=name)return greeting

调试执行问题

通过检查任务和工作流执行日志链接可进行深度调试。

使用 --details 标志可查看带有日志链接的节点执行:

└── n1 - FAILED - 2021-06-30 08:51:07.3111846 +0000 UTC - 2021-06-30 08:51:17.192852 +0000 UTC└── Attempt :0└── Task - FAILED - 2021-06-30 08:51:07.3111846 +0000 UTC - 2021-06-30 08:51:17.192852 +0000 UTC└── Logs :└── Name :Kubernetes Logs (User)└── URI :http://localhost:30082/#/log/flytectldemo-development/f3a5a4034960f4aa1a09-n1-0/pod?namespace=flytectldemo-development

此外,可检查 \<项目>-\<域> 命名空间中启动的 Pod:

kubectl get pods -n <project>-<domain>

启动的 Pod 会以执行名称作为前缀,并带有 nodeId 后缀:

NAME                        READY   STATUS             RESTARTS   AGE
f65009af77f284e50959-n0-0   0/1     ErrImagePull       0          18h

例如,上述输出中 STATUS 显示存在镜像拉取问题。

任务资源验证

在 Flyte 中,当您尝试执行包含不可满足资源请求的工作流时,系统会立即终止执行而非允许其无限期排队等待。

我们通过 executions service 拦截执行创建请求以验证其资源需求是否可被满足。若验证不通过,将快速失败并返回如下错误信息:

Request failed with status code 400 rpc error: code = InvalidArgument desc = no node satisfies task 'workflows.fotd.fotd_directory' resource requests

在本地集群中运行

在本地 Kubernetes 集群中运行

虽然最终您将在 Flyte 的 Kubernetes 集群中运行工作流,但在本地机器上的集群中测试工作流会非常方便。

首先请确保已安装 Docker(或其他兼容 OCI 标准的容器引擎),并确保_守护进程正在运行_。

使用 flytectl 启动演示集群:

flytectl demo start

配置

flytectl 在本地容器引擎中启动集群时,会将配置信息写入 ~/.flyte/ 目录。

最重要的配置文件是 ~/.flyte/config-sandbox.yaml,其中包含目标 Kubernetes 集群的连接信息:

admin:endpoint: localhost:30080  # 集群端点地址authType: Pkce             # 认证类型insecure: true             # 使用非安全连接
console:endpoint: http://localhost:30080  # 控制台地址
logger:show-source: true          # 显示日志来源level: 0                   # 日志级别

当前配置指向本地 Docker 实例(localhost:30080),后续可修改为指向 Flyte 集群。

使用 flytectlpyflyte 时需指定目标集群配置,可通过两种方式实现:

  1. 显式指定配置文件路径
    • flytectl --config ~/.flyte/config-sandbox.yaml <命令>
    • pyflyte --config ~/.flyte/config-sandbox.yaml <命令>
  2. 设置环境变量
    • export FLYTECTL_CONFIG=~/.flyte/config-sandbox.yaml

本文档假设您已设置 FLYTECTL_CONFIG 环境变量。

启动工作流

添加 --remote 参数即可在本地集群中运行工作流:

$ pyflyte run --remote \workflows/example.py \training_workflow \--hyperparameters '{"C": 0.1}'

命令输出将包含工作流执行页面的 URL。

查看结果

访问 pyflyte run 生成的 URL,即可在 Flyte UI 中查看工作流执行详情。

使用默认镜像的本地集群

pyflyte run --remote my_file.py my_workflow

(需配置 pyflyte 指向 flytectl demo start 启动的本地集群)

  • 任务代码在本地集群的默认镜像环境中运行
  • Python 代码在运行时动态注入容器
  • 仅支持依赖项已包含在默认镜像中的 Python 代码(参见说明文档)
  • 包含本地 S3 存储
  • 支持部分(非全部)插件
  • 单工作流即时执行
  • 工作流注册到默认项目
  • 适用于演示场景

使用自定义镜像的本地集群

$ pyflyte run --remote \--image my_cr.io/my_org/my_image:latest \my_file.py \my_workflow

(需配置 pyflyte 指向 flytectl demo start 启动的本地集群)

  • 任务代码在自定义镜像(my_cr.io/my_org/my_image:latest)环境中运行
  • Python 代码在运行时动态注入容器
  • 支持任意 Python 依赖项(通过自定义镜像控制)
  • 包含本地 S3 存储
  • 支持部分(非全部)插件
  • 单工作流即时执行
  • 工作流注册到默认项目
  • 适用于开发周期中的高级测试场景

Jupyter notebooks

Flyte 支持在交互式 Jupyter notebook 环境中开发、运行和调试任务(task)及工作流(workflow),这能加快构建数据驱动或机器学习驱动应用时的迭代速度。

在单元格中编写工作流与任务

在 notebook 中构建任务和工作流时,您可以像常规操作一样在单元格中编写代码。

通过点击运行按钮,您可以在本地运行这些单元格中的代码(即在 notebook 本身而非 Flyte 平台上运行),这与普通 notebook 操作一致。

启用 notebook 向 Flyte 注册工作流

要使 notebook 中的任务和工作流能轻松注册到 Flyte 实例并运行,您需要设置一个_交互式_FlyteRemote 对象,然后通过它调用远程执行:

首先,在单元格中创建交互式 FlyteRemote 对象:

from flytekit.configuration import Config
from flytekit.remote import FlyteRemoteremote = FlyteRemote(config=Config.auto(),default_project="default",default_domain="development",interactive_mode_enabled=True,
)

当在 Jupyter notebook 环境中运行时,必须将 interactive_mode_enabled 标志设置为 True,以启用工作流的交互式注册和执行功能。

接着,在另一个单元格中设置执行调用:

execution = remote.execute(my_task, inputs={"name": "Joe"})
execution = remote.execute(my_wf, inputs={"name": "Anne"})

交互式 FlyteRemote 客户端会在 notebook 中重新定义实体时(包括重新执行包含实体定义的单元格,即使实体未发生变更)自动重新注册该实体。此行为有效支持了在 Jupyter notebook 中迭代开发和调试任务及工作流的需求。

数据看板(Decks)

数据看板功能允许在任务代码中展示定制化的数据可视化效果。看板以 HTML 形式渲染,当工作流运行时可直接在 Flyte UI 中显示。

该功能为可选特性,启用时需在任务参数中将 enable_deck 设为 True

初始化配置

首先导入必要依赖:

import flytekit as fl
from flytekit.deck.renderer import MarkdownRenderer
from sklearn.decomposition import PCA
import plotly.express as px
import plotly

渲染器独立于 flytekit 主包。要使用上述导入的 MarkdownRenderer,需先在本地 Python 环境中安装 flytekitplugins-deck-standard 包,并将其包含在 ImageSpec 中(如下所示)。

镜像配置

ImageSpec 中声明所需依赖:

custom_image = fl.ImageSpec(packages=["flytekitplugins-deck-standard","markdown","pandas","pillow","plotly","pyarrow","scikit-learn","ydata_profiling",],
)

任务定义

创建名为 pca 的新看板,并渲染 Markdown 内容与 主成分分析(PCA) 图表:

@fl.task(enable_deck=True, container_image=custom_image)
def pca_plot():iris_df = px.data.iris()X = iris_df[["sepal_length", "sepal_width", "petal_length", "petal_width"]]pca = PCA(n_components=3)components = pca.fit_transform(X)total_var = pca.explained_variance_ratio_.sum() * 100fig = px.scatter_3d(components,x=0,y=1,z=2,color=iris_df["species"],title=f"总解释方差: {total_var:.2f}%",labels={"0": "PC 1", "1": "PC 2", "2": "PC 3"},)main_deck = fl.Deck("pca", MarkdownRenderer().to_html("### 主成分分析"))main_deck.append(plotly.io.to_html(fig))

注意使用 append 方法将 Plotly 图表追加至 Markdown 看板。

执行输出

预期输出包含 deck.html 文件路径信息:

{"asctime": "2023-07-11 13:16:04,558", "name": "flytekit", "levelname": "INFO", "message": "pca_plot 任务创建看板 HTML 文件至 file:///var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-0_8qfjdd/sandbox/local_flytekit/c085853af5a175edb17b11cd338cbd61/deck.html"}

联合看板图表

在 Flyte 实例执行该任务后,可通过任务视图点击 Deck 按钮访问看板:

联合看板按钮

看板标签页

每个看板至少包含三个标签页:输入(input)、输出(output)和默认(default)。输入输出页用于渲染任务的输入输出数据,默认页可用于创建折线图、散点图、Markdown 文本等自定义渲染。支持创建额外标签页。

看板渲染器

渲染器独立于 flytekit 主包。使用时需先在本地 Python 环境安装 flytekitplugins-deck-standard 包,并将其包含在 ImageSpec 中。

数据框分析渲染器

生成 Pandas DataFrame 的统计分析报告:

import flytekit as fl
import pandas as pd
from flytekitplugins.deck.renderer import FrameProfilingRenderer@fl.task(enable_deck=True, container_image=custom_image)
def frame_renderer() -> None:df = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})fl.Deck("数据框渲染器", FrameProfilingRenderer().to_html(df=df))

数据框渲染器

顶部数据框渲染器

将 DataFrame 渲染为 HTML 表格:

import flytekit as fl
from typing import Annotated
from flytekit.deck import TopFrameRenderer@fl.task(enable_deck=True, container_image=custom_image)
def top_frame_renderer() -> Annotated[pd.DataFrame, TopFrameRenderer(1)]:return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4])

顶部数据框渲染器

Markdown 渲染器

将 Markdown 字符串转换为 HTML:

import flytekit as fl
from flytekit.deck import MarkdownRenderer@fl.task(enable_deck=True, container_image=custom_image)
def markdown_renderer() -> None:fl.current_context().default_deck.append(MarkdownRenderer().to_html("安装 flytekit 的命令: ```import flytekit```"))

Markdown 渲染器

箱线图渲染器

将 DataFrame 行数据分组渲染为箱线图,可视化数据分布:

import flytekit as fl
from flytekitplugins.deck.renderer import BoxRenderer@fl.task(enable_deck=True, container_image=custom_image)
def box_renderer() -> None:iris_df = px.data.iris()fl.Deck("箱线图", BoxRenderer("sepal_length").to_html(iris_df))

箱线图渲染器

图像渲染器

FlyteFilePIL.Image.Image 对象转换为 base64 编码的 HTML 可显示图像:

import flytekit as fl
from flytekitplugins.deck.renderer import ImageRenderer@fl.task(enable_deck=True, container_image=custom_image)
def image_renderer(image: fl.FlyteFile) -> None:fl.Deck("图像渲染器", ImageRenderer().to_html(image_src=image))@fl.workflow
def image_renderer_wf(image: fl.FlyteFile = "https://bit.ly/3KZ95q4") -> None:image_renderer(image=image)

图像渲染器

表格渲染器

将 Pandas DataFrame 转换为 HTML 表格:

import flytekit as fl
from flytekitplugins.deck.renderer import TableRenderer@fl.task(enable_deck=True, container_image=custom_image)
def table_renderer() -> None:fl.Deck("表格渲染器",TableRenderer().to_html(df=pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}), table_width=50),)

表格渲染器

贡献新渲染器

欢迎通过 renderer.py 提交能增强数据可视化的新渲染器。我们鼓励您发起 Pull Request 共同完善 Flyte 看板生态系统!

自定义渲染器

可创建继承 to_html 方法的渲染器类。以下示例创建 DataFrame 摘要渲染器:

class DataFrameSummaryRenderer:def to_html(self, df: pd.DataFrame) -> str:assert isinstance(df, pd.DataFrame)return df.describe().to_html()

使用 Annotated 类型覆盖默认渲染器:

try:from typing import Annotated
except ImportError:from typing_extensions import Annotated@task(enable_deck=True)
def iris_data(sample_frac: Optional[float] = None,random_state: Optional[int] = None,
) -> Annotated[pd.DataFrame, DataFrameSummaryRenderer()]:data = px.data.iris()if sample_frac is not None:data = data.sample(frac=sample_frac, random_state=random_state)md_text = ("# 鸢尾花数据集\n""本任务使用 `plotly` 包加载鸢尾花数据集")flytekit.current_context().default_deck.append(MarkdownRenderer().to_html(md_text))flytekit.Deck("箱线图", BoxRenderer("sepal_length").to_html(data))return data

流式看板

使用 Deck.publish() 实现看板实时流式更新:

import flytekit as fl@task(enable_deck=True)
def t_deck():fl.Deck.publish()

该功能创建实时看板,在任务成功前可通过刷新按钮查看动态更新。

联合看板成功演示视频

https://youtu.be/LJaBP0mdFeE

联合看板失败演示视频

https://youtu.be/xaBF6Jlzjq0

FlyteRemote

FlyteRemote Python API 提供与 Pyflyte CLI 相似的功能,允许您从 Python 代码中管理 Flyte 工作流、任务、启动计划和构件。

FlyteRemote 的主要用例是实现 Flyte 实体的自动化部署。因此,它适用于在实际 Flyte 工作流和任务代码之外的脚本中使用,例如 CI/CD 管道脚本。

重要提示:不要在任务代码内部使用 FlyteRemote

创建 FlyteRemote 对象

确保已安装 Flytekit SDK,导入 FlyteRemote 类并按如下方式创建对象:

from pyflyte import FlyteRemoteremote = FlyteRemote()

当使用无参构造函数创建时,FlyteRemote 默认使用本地环境的现有配置连接 Flyte,即与该环境中 Pyflyte CLI 使用的配置相同(参见 Pyflyte CLI 配置搜索路径)。

默认情况下,与 Pyflyte CLI 相同,所有操作将应用于默认项目 flytesnacks 和默认域 development

您也可以通过显式指定包含 Flyte 实例连接信息的 flytekit.configuration.Config 对象、项目及域来初始化 FlyteRemote。此外,构造函数支持指定文件上传位置(等效于默认原始数据前缀):

from pyflyte import FlyteRemote
from flytekit.configuration import Configremote = FlyteRemote(config=Config.for_endpoint(endpoint="union.example.com"),default_project="my-project",default_domain="my-domain",data_upload_location="<s3|gs|abs>://my-bucket/my-prefix",
)

这里我们使用 Config.for_endpoint 方法指定连接端点 URL。配置 Config 对象有多种方式,总体上与使用 config.yaml 文件为 Pyflyte CLI 指定连接时具有相同的选项。

使用客户端密钥认证

在 CI/CD 管道或通过 SSH 运行 FlyteRemote 脚本时,可能无法使用浏览器进行默认认证流程。这种情况下可使用客户端密钥认证方法建立 Flyte 连接。创建 API 密钥后,按如下方式初始化 FlyteRemote

from pyflyte import FlyteRemote
from flytekit.configuration import Config, PlatformConfigremote = FlyteRemote(config=Config(platform=PlatformConfig(endpoint="union.example.com",insecure=False,client_id="<your-client-id>",  # 此处为 API 密钥名称client_credentials_secret="<your-client-secret>",  # 此处为 API 密钥auth_mode="client_credentials",)),)

详细说明参见 flytekit.configuration.Config API 文档

FlyteRemote 示例

注册并运行工作流

以下示例展示如何注册并运行工作流,同时获取其输出结果:

├── remote.py
└── workflow├── __init__.py└── example.py

将被注册并运行在 Flyte 上的工作流代码位于 workflow 目录,包含空文件 __init__.py 和存放工作流与任务代码的 example.py

import os
import flytekit as fl@fl.task()
def create_file(message: str) -> fl.FlyteFile:with open("data.txt", "w") as f:f.write(message)return fl.FlyteFile(path="data.txt")@fl.workflow
def my_workflow(message: str) -> fl.FlyteFile:f = create_file(message)return f

remote.py 文件包含 FlyteRemote 逻辑,不属于工作流代码,应在本地机器运行:

import flytekit as fl
from workflow.example import my_workflowdef run_workflow():remote = fl.FlyteRemote()remote.fast_register_workflow(entity=my_workflow)execution = remote.execute(entity=my_workflow,inputs={"message": "Hello, world!"},wait=True)output = execution.outputs["o0"]print(output)with open(output, "r") as f:read_lines = f.readlines()print(read_lines)

my_workflow 工作流和 create_file 任务被注册并运行。工作流完成后,输出结果将传回 run_workflow 函数并打印。

输出结果也可通过 UI 在 create_file 任务详情视图的 Outputs 选项卡查看:

输出结果

以上步骤展示了使用 FlyteRemote 注册和运行工作流的最简方法。更多选项和细节请参考 API 参考 > FlyteRemote > 入口点。

获取输出结果

默认情况下 FlyteRemote.execute 是非阻塞的,但也可以通过设置 wait=True 使其同步等待任务/工作流完成(如前例所示)。

打印执行对应的 Flyte 控制台 URL:

print(f"Execution url: {remote.generate_console_url(execution)}")

使用 sync() 方法同步执行对象与远程状态:

synced_execution = remote.sync(execution)
print(synced_execution.inputs)  # 打印输入参数

也可以在启动执行后等待并获取输出:

completed_execution = remote.wait(execution)
print(completed_execution.outputs)  # 打印输出结果

终止工作流的所有运行中执行

此示例展示如何终止指定工作流名称的所有运行中执行:

import flytekit as fl
from dataclasses import dataclass
import json
from flytekit.configuration import Config
from flytekit.models.core.execution import NodeExecutionPhase@dataclass
class Execution:name: strlink: strSOME_LARGE_LIMIT = 5000
PHASE = NodeExecutionPhase.RUNNING
WF_NAME = "your_workflow_name"
EXECUTIONS_TO_IGNORE = ["some_execution_name_to_ignore"]
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint=ENDPOINT),default_project=PROJECT,default_domain=DOMAIN,
)executions_of_interest = []executions = remote.recent_executions(limit=SOME_LARGE_LIMIT)for e in executions:if e.closure.phase == PHASE:if e.spec.launch_plan.name == WF_NAME:if e.id.name not in EXECUTIONS_TO_IGNORE:execution_on_interest = Execution(name=e.id.name, link=f"https://{ENDPOINT}/console/projects/{PROJECT}/domains/{DOMAIN}/executions/{e.id.name}")executions_of_interest.append(execution_on_interest)remote.terminate(e, cause="Terminated manually via script.")with open('terminated_executions.json', 'w') as f:json.dump([{'name': e.name, 'link': e.link} for e in executions_of_interest], f, indent=2)print(f"Terminated {len(executions_of_interest)} executions.")

重新运行工作流的所有失败执行

此示例展示如何识别指定工作流在特定时间后的所有失败执行,并使用相同输入和固定工作流版本重新运行:

import datetime
import pytz
import flytekit as fl
from flytekit.models.core.execution import NodeExecutionPhaseSOME_LARGE_LIMIT = 5000
WF_NAME = "your_workflow_name"
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"
VERSION = "your_target_workflow_version"remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint=ENDPOINT),default_project=PROJECT,default_domain=DOMAIN,
)executions = remote.recent_executions(limit=SOME_LARGE_LIMIT)failures = [\NodeExecutionPhase.FAILED,\NodeExecutionPhase.ABORTED,\NodeExecutionPhase.FAILING,\
]# 最近一次成功执行的时间
date = datetime.datetime(2024, 10, 30, tzinfo=pytz.UTC)# 按名称过滤执行
filtered = [execution for execution in executions if execution.spec.launch_plan.name == WF_NAME]# 按状态过滤执行
failed = [execution for execution in filtered if execution.closure.phase in failures]# 按时间窗口过滤执行
windowed = [execution for execution in failed if execution.closure.started_at > date]# 获取每个执行的输入参数
inputs = [remote.sync(execution).inputs for execution in windowed]# 获取新版本工作流实体
workflow = remote.fetch_workflow(name=WF_NAME, version=VERSION)# 为每个失败执行重新运行新工作流
[remote.execute(workflow, inputs=X) for X in inputs]

使用 Filter 筛选执行

此示例展示如何使用 Filter 筛选特定执行:

from flytekit.models import filters
import flytekit as flWF_NAME = "your_workflow_name"
LP_NAME = "your_launchplan_name"
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"remote = fl.FlyteRemote.for_endpoint(ENDPOINT)# 仅查询当前项目的执行
project_filter = filters.Filter.from_python_std(f"eq(workflow.name,{WF_NAME})")
project_executions = remote.recent_executions(project=PROJECT, domain=DOMAIN, filters=[project_filter])# 查询最近成功且耗时8-16分钟的执行
latest_success = remote.recent_executions(limit=1,filters=[\filters.Equal("launch_plan.name", LP_NAME),\filters.Equal("phase", "SUCCEEDED"),\filters.GreaterThan("duration", 8 * 60),\filters.LessThan("duration", 16 * 60),\],
)

通过 FlyteRemote 启动新版本任务

import flytekit as fl
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, SerializationSettings# FlyteRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)# 获取任务
task = remote.fetch_task(name="workflows.example.generate_normal_df", version="v1")task = remote.register_task(entity=flyte_task,serialization_settings=SerializationSettings(image_config=None),version="v2",
)# 运行任务
execution = remote.execute(task, inputs={"n": 200, "mean": 0.0, "sigma": 1.0}, execution_name="task-execution", wait=True
)# 或使用execution_name_prefix避免重复执行名称
execution = remote.execute(task, inputs={"n": 200, "mean": 0.0, "sigma": 1.0}, execution_name_prefix="flyte", wait=True
)# 检查执行状态
# 'inputs'和'outputs'对应任务执行
input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()

通过 FlyteRemote 启动工作流

由于底层会获取并触发默认启动计划,工作流可通过UnionRemote执行:

import flytekit as fl
from flytekit.configuration import Config# UnionRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)# 获取工作流
workflow = remote.fetch_workflow(name="workflows.example.wf", version="v1")# 执行
execution = remote.execute(workflow, inputs={"mean": 1}, execution_name="workflow-execution", wait=True
)# 或使用execution_name_prefix避免重复执行名称
execution = remote.execute(workflow, inputs={"mean": 1}, execution_name_prefix="flyte", wait=True
)

通过 FlyteRemote 启动启动计划

可通过FlyteRemote以编程方式启动启动计划:

import flytekit as fl
from flytekit.configuration import Config# UnionRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)# 获取启动计划
lp = remote.fetch_launch_plan(name="workflows.example.wf", version="v1", project="flytesnacks", domain="development"
)# 执行
execution = remote.execute(lp, inputs={"mean": 1}, execution_name="lp-execution", wait=True
)# 或使用execution_name_prefix避免重复执行名称
execution = remote.execute(lp, inputs={"mean": 1}, execution_name_prefix="flyte", wait=True
)

检查执行状态

通过FlyteRemote可获取执行的输入输出并进行检查:

import flytekit as fl
from flytekit.configuration import Config# UnionRemote对象是API的主要入口点
remote = fl.FlyteRemote(config=Config.for_endpoint(endpoint="flyte.example.net"),default_project="flytesnacks",default_domain="development",
)execution = remote.fetch_execution(name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development"
)input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()# inputs和outputs对应顶层执行或工作流本身
# 获取特定输出(如模型文件):
model_file = execution.outputs["model_file"]
with open(model_file) as f:...# 使用UnionRemote.sync()在执行过程中同步实体对象状态
synced_execution = remote.sync(execution, sync_nodes=True)
node_keys = synced_execution.node_executions.keys()# node_executions将递归获取所有底层节点执行
# 获取特定节点执行的输出:
node_execution_output = synced_execution.node_executions["n1"].outputs["model_file"]

从 Airflow 迁移到 Flyte

许多 Airflow Operator 和 Sensor 已在 Flyte 上完成测试,但部分功能可能无法按预期工作。如遇问题,请提交 issue
或通过 Slack
联系 Flyte 社区。

Flyte 能够在不修改代码的情况下将 Airflow 任务编译为 Flyte 任务,这使得迁移 Airflow DAG 到 Flyte 的工作量降至最低。

除了迁移能力外,Flyte 用户还能无缝集成 Airflow 任务到工作流中,充分利用 Airflow Operator 和 Sensor 的生态系统。通过将强大的 Airflow 生态系统与 Flyte 的可扩展性、版本控制和可复现性等核心能力相结合,用户可以更轻松地运行复杂的数据和机器学习工作流。更多信息请参阅 Airflow 连接器文档

现有 Flyte 用户指南

即使您已在 Flyte 上运行工作流且无迁移需求,仍可将 Airflow 任务集成至 Flyte 工作流。例如,Airflow 提供 Google Cloud Dataproc Operators
支持,可便捷地在 Google Cloud Dataproc 集群上执行 Spark 作业。您无需在 Flyte 中开发自定义插件,即可直接集成 Airflow 的 Dataproc Operator 来执行 Spark 作业。

前置条件

  • 在 Python 环境中安装 flytekitplugins-airflow
  • 在 Flyte 集群中启用 {ref}Airflow 连接器<deployment-connector-setup-airflow>

实施步骤

1. 在 Flyte 工作流中定义 Airflow 任务

Flytekit 会将 Airflow 任务编译为 Flyte 任务,因此可以在 Flyte 工作流中使用任意 Airflow Sensor 或 Operator:

from flytekit import task, workflow
from airflow.sensors.filesystem import FileSensor@task
def say_hello() -> str:return "Hello, World!"@workflow
def airflow_wf():flyte_task = say_hello()airflow_task = FileSensor(task_id="sensor", filepath="/")airflow_task >> flyte_taskif __name__ == "__main__":print(f"Running airflow_wf() {airflow_wf()}")
2. 本地测试工作流

本地运行前需配置 Airflow 连接
,通过设置 AIRFLOW_CONN_{CONN_ID} 环境变量实现。例如:

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

虽然 Airflow 本身不支持本地执行,但包含 Airflow 任务的工作流可在本地运行,这对生产环境前的测试和调试非常有用:

AIRFLOW_CONN_FS_DEFAULT="/" pyflyte run workflows.py airflow_wf

部分 Airflow Operator 需要特定权限才能执行。例如 DataprocCreateClusterOperator 需要 dataproc.clusters.create 权限。本地运行 Airflow 任务时,可能需要配置本地权限以确保任务成功执行。

3. 迁移工作流至生产环境

生产环境中建议将连接信息存储在 密钥后端
,并确保连接器 Pod 具有访问外部密钥后端所需的权限(IAM 角色)。

完成本地测试后,可使用 --remote 标志在 Flyte 集群上执行工作流。此时 Flyte 会在 Kubernetes 集群中创建 Pod 来运行 say_hello 任务,并通过 Airflow 连接器运行 Airflow 的 BashOperator 任务:

pyflyte run --remote workflows.py airflow_wf

测试

flytekit Python SDK 提供了一些实用程序,方便您在测试套件中测试任务和工作流程。更多详细信息,请参阅 API 参考中的 flytekit.testing 模块。

模拟任务

许多编写的任务可以在本地运行,但部分任务可能无法本地执行,通常是因为这些任务依赖于仅在后端可用的第三方服务。Hive任务是典型示例,因为大多数开发者无法从本地开发环境访问执行Hive查询的服务。然而,能够本地运行调用此类任务的工作流仍然非常有用。为此,flytekit提供了多种实用工具来应对这种情况。

例如,这是一个通用SQL任务(默认未连接任何数据存储且未被任何插件处理),若要在单元测试中使用必须进行模拟:

import datetimeimport pandas
from flytekit import SQLTask, TaskMetadata, kwtypes, task, workflow
from flytekit.testing import patch, task_mock
from flytekit.types.schema import FlyteSchemasql = SQLTask("my-query",query_template="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds }}' LIMIT 10",inputs=kwtypes(ds=datetime.datetime),outputs=kwtypes(results=FlyteSchema),metadata=TaskMetadata(retries=2),
)@task
def t1() -> datetime.datetime:return datetime.datetime.now()

假设存在使用这两个任务的工作流:

@workflow
def my_wf() -> FlyteSchema:dt = t1()return sql(ds=dt)

若无模拟,调用工作流通常会抛出异常。但使用返回MagicMock对象的task_mock构造器,我们可以覆盖返回值:

def test_demonstrate_mock():with task_mock(sql) as mock:mock.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all()

另一个实用工具patch提供相同功能,但采用传统Python补丁风格,其中第一个参数是MagicMock对象:

@patch(sql)
def test_demonstrate_patch(mock_sql):mock_sql.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all()

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

相关文章:

  • mysql中limit深度分页详细剖析【爽文】
  • 架构师论文《论模型驱动架构软件开发方法及其应用》
  • 【软件测试】性能测试 —— 工具篇 LoadRunner 介绍与使用
  • Ansible模块——服务管理和设置定时任务
  • 一字典两世界:优雅移除 `NSDictionary` 指定键的最佳实践
  • 腾讯 CodeBuddy 杀入 AI 编程赛道,能否撼动海外工具霸主地位?
  • C++类与对象--2 对象的初始化和清理
  • tp5 关键词搜索商品时进行关键词拆分
  • [Linux]我在Linux世界觉醒了指令系统
  • JavaScript基础-创建对象的三种方式
  • 【C++重载操作符与转换】文本查询示例
  • 数据库故障排查指南:解决常见问题,保障数据安全与稳定
  • 数据分析_Python
  • PyTorch实现三元组损失Triplet Loss
  • 为什么 Docker 建议关闭 Swap
  • 基于多头自注意力机制(MHSA)增强的YOLOv11主干网络—面向高精度目标检测的结构创新与性能优化
  • Elasticsearch Fetch阶段面试题
  • Springboot构建项目时lombok不生效
  • 51单片机仿真突然出问题
  • Almalinux中出现ens33 ethernet 未托管 -- lo loopback 未托管 --如何处理:
  • 淮安市车桥中学党总支书记王习元逝世,终年51岁
  • 体坛联播|热刺追平单赛季输球纪录,世俱杯或创收20亿美元
  • 中欧互动中的合作与分歧:务实需求将克服泛安全化的“政治钟摆”
  • 秦洪看盘|风格有所转变,热钱回流高弹性品种
  • 国税总局上海市税务局回应刘晓庆被举报涉嫌偷漏税:正依法依规办理
  • 占地57亩的“潮汕豪宅”面临强制拆除:曾被实施没收,8年间举行5次听证会