当前位置: 首页 > news >正文

Dagster中的Ops与Assets:数据管道构建的两种选择

Dagster是一个强大的数据编排平台,它提供了多种工具来帮助数据工程师构建可靠的数据管道。在Dagster中,Ops和Assets是两种核心概念,用于定义数据处理逻辑。本文将全面介绍Ops的概念、特性及其使用方法,特别补充了Op上下文和Op工厂等重要内容,并解释为什么对于新用户我们推荐优先使用Assets。

在这里插入图片描述

什么是Ops?

Ops是Dagster中的基本计算单元,代表一个独立的数据处理任务。每个Op应该执行相对简单的任务,例如:

  • 从其他数据集派生新数据集
  • 执行数据库查询
  • 在远程集群中启动Spark作业
  • 查询API并将结果存储到数据仓库
  • 发送电子邮件或Slack消息

Ops的核心特性

1. 灵活的执行策略

Ops是独立于执行策略的逻辑单元,这使得它们可以在开发和生产环境之间无缝转换。Ops可以组合成图(graphs),并通过jobs绑定到适当的执行器上,实现单机执行或在集群中分布式执行。

2. 可插拔的外部系统集成

对于需要与外部系统交互的数据管道,Dagster提供了资源(resources)抽象层。你可以针对抽象资源(如数据库)编写Op逻辑,然后在job级别绑定具体的资源定义。这样,开发阶段可以使用本地替代方案,而生产环境则使用云服务。

3. 输入和输出管理

Ops具有明确的输入和输出,类似于Python函数的参数和返回值。这些输入输出可以附加Dagster类型进行运行时验证,并可以通过IO Manager管理数据存储,实现不同执行环境间的I/O策略切换和中间数据的高效缓存。

4. 配置能力

数据管道中的操作通常需要参数化配置。Ops允许通过配置模式(config schema)定义这些参数,使Ops更加灵活和可重用。例如,可以通过配置指定API端点:

from dagster import Configclass MyOpConfig(Config):api_endpoint: str@op
def my_configurable_op(config: MyOpConfig):data = requests.get(f"{config.api_endpoint}/data").json()return data

5. 事件流

Ops在执行过程中会发出一系列事件,包括默认事件(如开始执行)和通过事件API报告的自定义事件(如数据资产创建、数据质量检查结果等)。这些事件流可以在Dagster UI中可视化,便于调试、检查和实时监控。

6. 可测试性

Ops的设计使其易于测试,可以单独测试或在管道中测试。资源API还允许在需要时替换外部系统(如数据库)的存根(stub)。

定义和使用Ops

使用@op装饰器定义Ops:

@op
def my_op():return "hello"

输入和输出

Ops通过参数接收输入,通过返回值产生输出:

@op
def add(a: int, b: int) -> int:return a + b

对于多输出,可以使用Out对象:

@op(out={"sum": Out(), "product": Out()})
def math_ops(a: int, b: int):yield Output(a + b, "sum")yield Output(a * b, "product")

配置

为Ops添加配置:

from dagster import Config, opclass GreetingConfig(Config):name: str@op(config_schema=GreetingConfig)
def greet(context, config: GreetingConfig):context.log.info(f"Hello, {config.name}!")return f"Hello, {config.name}!"

Op上下文(Op Context)

在编写Op时,用户可以可选地提供一个上下文参数(通常命名为context)。当这个参数被提供时,Dagster会在Op执行时自动注入一个上下文对象,该对象提供了访问系统信息的能力,如日志记录器、当前运行ID等。

上下文对象的作用

  1. 日志记录:通过context.log记录不同级别的日志信息
  2. 访问运行信息:获取当前运行的ID、作业名称等信息
  3. 资源访问:在某些情况下可以访问配置的资源
  4. 错误处理:提供更丰富的错误报告能力

使用示例

from dagster import op, OpExecutionContext@op
def context_op(context: OpExecutionContext):# 记录info级别日志context.log.info(f"My run ID is {context.run_id}")# 记录debug级别日志(默认可能不显示)context.log.debug("This is a debug message")# 在实际业务逻辑中使用上下文try:result = do_something()return resultexcept Exception as e:context.log.error(f"Operation failed: {str(e)}")raise

Op工厂模式

在实际项目中,我们经常需要创建多个相似的Ops,或者需要动态生成Ops。这时,Op工厂模式就非常有用。Op工厂允许我们通过函数来生成Ops,而不是为每个Op手动编写装饰器。

工厂模式的应用场景

  1. 参数化Op创建:当需要创建多个相似但配置不同的Ops时
  2. 动态Op生成:根据运行时条件或配置动态生成Ops
  3. 代码复用:避免重复的Op定义代码

创建Op工厂

from dagster import op, OpDefinitiondef create_math_op(name: str, operation):"""创建数学运算Op的工厂函数Args:name (str): 新Op的名称operation (callable): 数学运算函数Returns:OpDefinition: 生成的Op定义"""@op(name=name)def math_op(a: float, b: float) -> float:return operation(a, b)return math_op# 使用工厂创建具体的Ops
add_op = create_math_op("add", lambda a, b: a + b)
multiply_op = create_math_op("multiply", lambda a, b: a * b)# 或者更复杂的工厂函数
def advanced_op_factory(config_schema=None, tags=None):"""更高级的Op工厂,支持配置和标签Args:config_schema: Op的配置模式tags: 要附加到Op的标签Returns:函数:接受compute函数并返回OpDefinition"""def decorator(compute_fn):op_def = op(name=compute_fn.__name__,config_schema=config_schema,tags=tags)(compute_fn)return op_defreturn decorator# 使用高级工厂
@advanced_op_factory(config_schema={"precision": int},tags={"team": "analytics"}
)
def divide(a: float, b: float, context) -> float:precision = context.op_config["precision"]result = a / breturn round(result, precision)

工厂模式的注意事项

  1. 性能考虑:工厂模式会引入额外的函数调用层,但在大多数情况下影响可以忽略
  2. 类型提示:使用工厂创建的Ops可能需要额外的类型提示处理
  3. 文档:确保为工厂函数和生成的Ops提供清晰的文档

为什么推荐Assets而非Ops?

虽然Ops功能强大,但对于新用户我们推荐优先使用Assets:

  1. 更高级的抽象:Assets提供了数据版本控制、血缘追踪和自动缓存等高级功能
  2. 声明式API:Assets允许以更声明式的方式定义数据管道
  3. 内置集成:Assets与Dagster的其他功能(如调度、监控)有更好的集成
  4. 简化复杂性:对于大多数用例,Assets可以简化数据管道的定义和维护

结论

Ops是Dagster中强大的计算单元,适合处理复杂的数据处理逻辑。然而,对于新用户或构建标准数据管道的场景,Assets提供了更高级的抽象和更简化的开发体验。随着对Dagster的深入理解,用户可以根据需要选择使用Ops来处理更复杂的场景。

无论选择哪种方式,Dagster都提供了丰富的工具和灵活性来构建可靠、可维护的数据管道。特别是Op上下文和Op工厂模式等高级特性,为复杂的数据工程需求提供了强大的支持。

相关文章:

  • C语言中的自定义类型 —— 结构体.位段.联合体和枚举
  • 深入理解Redis SDS:高性能字符串的终极设计指南
  • 用PyTorch搭建卷积神经网络实现MNIST手写数字识别
  • 《ATPL地面培训教材13:飞行原理》——第3章:基础空气动力学理论
  • 广义线性模型三剑客:线性回归、逻辑回归与Softmax分类的统一视角
  • 【查看.ipynp 文件】
  • 文献总结:TPAMI端到端自动驾驶综述——End-to-End Autonomous Driving: Challenges and Frontiers
  • 基于Springboot+Mysql的校园博客系统(含LW+PPT+源码+系统演示视频+安装说明)
  • 信息安全导论:解码社会工程学攻击的隐形战争
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】1.1 数据库核心概念与PostgreSQL技术优势
  • 软件工程实践
  • 基于 Dify + vLLM插件 + Qwen3 构建问答机器人Docker版
  • 机器人--MCU
  • MySQL数据操作全攻略:DML增删改与DQL高级查询实战指南
  • Oracle RAC ‘Metrics Global Cache Blocks Lost‘告警解决处理
  • 火语言RPA--DestoonV8商品发布
  • Qt 中实现观察者模式(Observer Pattern)
  • ros2 humble 控制真实机械臂(以lerobot为例)
  • 【Unity】XLua访问C#文件
  • 人工智能助力工业制造:迈向智能制造的未来
  • 吴清:巴菲特即将退休,但价值投资、长期投资、理性投资、努力回报投资者等理念不会退休
  • 独家专访|白先勇:我的家乡不是哪个地点,是中国传统文化
  • 印度扩大对巴措施:封锁巴基斯坦名人账号、热门影像平台
  • 英国传统两党受挫地方选举后反思,改革党异军突起“突破想象”
  • 新华每日电讯头版聚焦上海:科创高地向未来
  • 泽连斯基:美乌矿产协议将提交乌拉达批准