当前位置: 首页 > news >正文

Dagster Pipes系列-1:调用外部Python脚本

本文是"Dagster Pipes教程"的第一部分,介绍如何通过Dagster资产调用外部Python脚本并集成到数据管道中。首先,创建Dagster资产subprocess_asset,利用PipesSubprocessClient资源执行外部脚本external_code.py,实现跨进程的数据处理。通过dagster dev启动UI,可在Dagster界面中监控子进程的执行状态和日志输出,包括标准输出(stdout)内容。本文详细讲解了资产定义、资源注入及命令执行的完整流程,为后续修改外部代码以支持Dagster Pipes通信奠定基础。此方法适用于需要将现有脚本集成到Dagster数据管道的场景,提升自动化与可观测性。完成本部分后,读者可继续学习第二部分,掌握如何增强外部脚本与Dagster的交互能力。

教程概述

本教程将指导你完成以下步骤:

  1. 创建一个调用外部Python脚本的Dagster资产
  2. 定义必要的Dagster资源(resources)
  3. 在Dagster UI中运行并查看结果
    在这里插入图片描述

前提条件

在开始之前,请确保你已经:

  • 安装了Dagster
  • 创建了一个名为external_code.py的独立Python脚本,内容如下:
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2],"item_id": [432, 878]})total_orders = len(orders_df)print(f"processing total {total_orders} orders")

第一步:定义Dagster资产

首先,在与external_code.py相同的目录下创建一个名为dagster_code.py的新文件。

1.1 创建资产定义

将以下代码复制到dagster_code.py中:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()

代码解析:

  • 我们创建了一个名为subprocess_asset的资产
  • 使用AssetExecutionContext作为上下文参数,它提供了系统信息如资源、配置和日志记录
  • 指定了PipesSubprocessClient资源
  • 构建了一个命令列表来执行外部脚本
  • 使用pipes_subprocess_client.run()方法在管道会话中同步执行子进程

1.2 从资产调用外部代码

上述代码中的关键部分是:

pipes_subprocess_client.run(command=cmd,context=context
).get_materialize_result()

这段代码做了什么:

  • PipesSubprocessClient资源暴露了一个run方法
  • 当资产执行时,这个方法会在管道会话中同步执行子进程
  • 返回一个PipesClientCompletedInvocation对象
  • 可以使用get_materialize_result()方法访问子进程报告的MaterializeResult事件

第二步:定义Definitions对象

为了让Dagster工具(如CLI、UI和Dagster+)能够加载和访问资产及子进程资源,我们需要创建一个Definitions对象。

dagster_code.py文件末尾添加以下代码:

from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

此时,dagster_code.py文件应该如下所示:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

第三步:从Dagster UI运行子进程

现在,让我们在Dagster UI中执行我们创建的子进程资产。

  1. 在新的命令行会话中运行以下命令启动UI:

    dagster dev -f dagster_code.py
    
  2. 点击右上角的"Materialize"按钮来运行你的代码

  3. 导航到"Run details"页面,在这里你可以看到运行的日志

  4. external_code.py中,我们有一个打印语句将输出到stdout。Dagster会在UI的原始计算日志视图中显示这些内容。

  5. 要查看stdout日志,切换日志部分到stdout:

在这里插入图片描述

下一步

到目前为止,你已经创建了一个调用外部Python脚本的Dagster资产,在子进程中执行了代码,并在Dagster UI中查看了结果。接下来,你将学习如何修改外部代码以与Dagster Pipes配合工作,将信息发送回Dagster。

总结

通过本教程的第一部分,我们实现了:

  • 创建了一个Dagster资产来调用外部Python脚本
  • 配置了必要的资源来支持子进程执行
  • 在Dagster UI中成功运行并查看了结果

这个基础设置为你在后续步骤中实现更复杂的管道通信打下了良好的基础。

相关文章:

  • 按钮导航组件 | 纯血鸿蒙组件库AUI
  • 基于STM32、HAL库的DPS368XTSA1气压传感器 驱动程序设计
  • Java高频面试之并发编程-16
  • 设置环境变量启动jar报
  • 基于SpringBoot的蜗牛兼职网设计与实现|源码+数据库+开发说明文档
  • Qt Creator 配置 Android 编译环境
  • 火山RTC 6 自定义视频
  • 深入解析MySQL联合查询(UNION):案例与实战技巧
  • 区块链技术构建电子发票平台“税链”
  • JVM之垃圾回收器
  • 开源 RPA 工具深度解析与官网指引
  • 【Git】GitHub上传图片遇到的问题
  • Spark,序列化反序列化
  • C# 基础 try-catch代码块
  • 「华为」人形机器人赛道投资首秀!
  • 单片机学习Day08--相邻流水灯
  • 【落羽的落羽 C++】stack和queue、deque、priority_queue、仿函数
  • MySQL 8.0安装(压缩包方式)
  • 热部署与双亲委派
  • Oracle数据库局部性HANG处理过程
  • 寒武纪陈天石:公司的产品力获得了行业客户广泛认可,市场有望迎来新增量需求
  • 贵州省总工会正厅级副主席梁伟被查,曾任贵州省纪委副书记
  • 长期对组织隐瞒真实年龄,广元市城发集团原董事韩治成被双开
  • 上海“电子支付费率成本为0”背后:金融服务不仅“快”和“省”,更有“稳”和“准”
  • 马上评丨规范隐藏式车门把手,重申安全高于酷炫
  • 长江画派创始人之一、美术家鲁慕迅逝世,享年98岁