当前位置: 首页 > news >正文

Dagster系列教程:快速掌握数据资产定义

数据资产是管理数据生命周期的核心单元,通过持久化存储(如文件、数据库表)与动态生成逻辑的绑定,实现版本控制、依赖追踪与高效协作。Dagster 提供声明式语法,允许开发者通过 @asset 装饰器定义资产,关联代码与数据生成过程,并支持依赖关系声明(deps 参数)自动解析执行顺序。资产可配置化(继承 Config 类)以适应动态参数需求,结合上下文管理(AssetExecutionContext)记录日志与运行追踪,层级化资产键(key_prefix)适配复杂存储场景。本文将带你从基础定义出发,学习如何利用 Dagster 构建可靠、可维护的数据管道,解锁数据驱动决策的潜力。

Dagster数据资产概述

资产是持久存储的对象,例如表、文件或持久化的机器学习模型。资产定义是对资产的描述,包括该资产应当存在的状态以及如何生成和更新该资产的方法。
在这里插入图片描述

资产定义支持数据管理的声明性方法,其中代码是关于应该存在哪些数据资产以及如何计算这些资产的事实来源。

资产定义包括以下内容:

  • 一个 AssetKey,用于引用资产。
  • 一组上游资产键,这些键指向资产定义的内容所源自的资产。
  • 一个 Python 函数,负责从其上游依赖项中计算资产的内容并存储结果。

注意:在幕后,Python 函数是一个操作(op)。操作是 Dagster 中的一个高级主题,但并非开始使用 Dagster 所必需的知识。资产定义和操作之间的关键区别在于,资产定义了解其依赖关系,而操作则不会。直到将操作放入DAG图中时,操作才会与依赖关系建立关联。

物化资产是运行其操作(op)并将结果保存到持久存储的行为。您可以通过Dagster UI或调用Python api来初始化物化。

相关API

名称描述
@asset定义数据资产的装饰器
AssetsDefinition一个类,用于表示一个或多个资产定义,通常由一个单一函数实现或观察这些定义的全部内容。该函数通常会将这些定义组合在一起进行处理或观察。

数据资产定义

基本资产定义

创建资产定义最简单的方法是使用@asset装饰器。

import json
import os

from dagster import asset

@asset
def my_asset():
    os.makedirs("data", exist_ok=True)
    with open("data/my_asset.json", "w") as f:
        json.dump([1, 2, 3], f)

默认情况下,被装饰函数的名称(即 my_asset)会被用作资产的键。被装饰的函数负责生成资产的内容。在本示例中,这个资产并不依赖于任何其他资产。

资产依赖关系

资产定义可以依赖于其他资产定义。在本节中,我们将向您展示如何定义:

  • 基本资产依赖关系
  • 跨代码位置的资产依赖关系

**定义基本依赖关系
可以通过在下游资产的 @asset 装饰器中传递上游资产到 deps 参数来定义两个资产之间的依赖关系。
在本示例中,资产 sugary_cereals 通过从 cereals 表中选择记录来创建一个新表(sugary_cereals)。然后,资产 shopping_list 通过从 sugary_cereals 中选择记录来创建一个新表(shopping_list):

from dagster import asset


@asset
def sugary_cereals() -> None:
    execute_query(
        "CREATE TABLE sugary_cereals AS SELECT * FROM cereals WHERE sugar_grams > 10"
    )


@asset(deps=[sugary_cereals])
def shopping_list() -> None:
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")

跨代码位置定义资产依赖关系
资产可以在不同的代码位置依赖于其他资产。以 code_location_1 为例:

# code_location_1.py
import json

from dagster import Definitions, asset


@asset
def code_location_1_asset():
    with open("/data/code_location_1_asset.json", "w+") as f:
        json.dump(5, f)


defs = Definitions(assets=[code_location_1_asset])

在code_location_2中,我们可以通过它的资产键来引用它:

# code_location_2.py
import json

from dagster import AssetKey, Definitions, asset


@asset(deps=["code_location_1_asset"])
def code_location_2_asset():
    with open("/data/code_location_1_asset.json") as f:
        x = json.load(f)

    with open("/data/code_location_2_asset.json", "w+") as f:
        json.dump(x + 6, f)


defs = Definitions(assets=[code_location_2_asset])

资产配置

在 Dagster 中,资产可以指定一个配置模式。这使得您能够在运行时为资产提供值。

资产函数可以为资产的配置指定一个带注释的配置参数。配置类(它继承自 Config(它继承自 pydantic.BaseModel))指定了资产的配置模式。
例如,以下下游资产会查询通过配置定义的 API 端点:

from dagster import Config, asset


class MyDownstreamAssetConfig(Config):
    api_endpoint: str


@asset
def my_downstream_asset(config: MyDownstreamAssetConfig):
    data = requests.get(f"{config.api_endpoint}/data").json()
    ...

请参考 Config 模式文档以获取更多配置信息和示例。

资产上下文

在编写资产时,用户可以选择提供一个名为 context 的第一个参数。当此参数被提供时,Dagster 会向资产主体提供一个 AssetExecutionContext 对象,该对象提供了对系统信息的访问权限,例如日志记录器和当前运行 ID。
例如,要访问日志记录器并记录一条信息消息:

from dagster import AssetExecutionContext, asset


@asset
def context_asset(context: AssetExecutionContext):
    context.log.info(f"My run ID is {context.run.run_id}")
    ...

重试失败的资产

如果在资产执行过程中出现异常,您可以使用重试策略在同一次运行中自动重试该资产。在以下示例中,我们指定了重试的次数以及每次重试之间等待的时间:

from dagster import (
    AssetExecutionContext,
    Backoff,
    Jitter,
    RetryPolicy,
    RetryRequested,
    asset,
)


@asset(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    )
)
def retried_asset(context: AssetExecutionContext):
    context.log.info("Retry me!")

多组件资产键

资产通常是在具有层次命名空间(如文件系统)的系统中的对象。由于这一点,对于资产键来说,使用字符串列表而不是单个字符串通常是有意义的。要使用多部分资产键定义资产,请使用带有字符串列表的 key_prefix 参数。完整的资产键是通过将 key_prefix 添加到资产名称(默认为装饰函数的名称)之前形成的。

from dagster import AssetIn, asset


@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]


@asset(ins={"upstream_asset": AssetIn(key_prefix=["one", "two", "three"])})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]

  1. 运行 upstream_asset:生成数据 [1, 2, 3],其完整键为 ("one", "two", "three", "upstream_asset", "<timestamp>")
  2. 运行 downstream_asset:通过 AssetInkey_prefix 匹配到上游资产的输出,接收数据 [1, 2, 3]。执行 upstream_asset + [4],生成结果 [1, 2, 3, 4]
  • key_prefix 适用于需要显式控制资产版本或分区的场景。
  • 在简单依赖关系中(如本示例),直接通过资产名称匹配更简洁。
  • 理解资产键(Asset Key)的构成对调试和血缘分析至关重要。

最后总结

Dagster 通过声明式编程与自动化编排简化数据工程复杂度,核心价值在于将数据资产定义为可观测、可复用的代码单元。资产依赖自动解析减少人工干预,配置化与上下文管理提升灵活性,重试策略保障任务稳定性,层级化资产键适配复杂存储需求。物化记录与血缘分析为调试和合规提供支持,使数据团队能专注于业务逻辑而非运维细节。实践中,建议从基础资产定义入手,逐步引入依赖管理与配置化能力,结合 Dagster UI 监控运行状态。这一工具不仅加速数据管道开发,更通过系统化的资产管理思维,推动数据驱动决策的规模化落地。

http://www.dtcms.com/a/111612.html

相关文章:

  • 数据库系统概述 | 第二章课后习题答案
  • 计算机系统---CPU
  • 嵌入式系统应用-拓展-相关开发软件说明
  • 常见的微信个人号二次开发功能
  • Unity:平滑输入(Input.GetAxis)
  • 【Cursor】切换主题
  • JS API
  • 【软考中级软件设计师】数据表示:原码、反码、补码、移码、浮点数
  • sward V1.0.8版本发布,全面支持各种附件上传预览
  • 初识数据结构——算法效率的“两面性”:时间与空间复杂度全解析
  • yolov12检测 聚类轨迹运动速度
  • 与总社团联合会合作啦
  • Linux的: /proc/sys/net/ipv6/conf/ 笔记250404
  • 操作系统面经(一)
  • 2025年【陕西省安全员C证】报名考试及陕西省安全员C证找解析
  • Qt QTableView QAbstractTableModel实现复选框+代理实现单元格编辑
  • 进行性核上性麻痹:饮食调理为健康护航
  • SpringBoot项目报错: 缺少 Validation
  • 【NLP 55、投机采样加速推理】
  • 在线考试系统带万字文档java项目java课程设计java毕业设计springboot项目
  • 【matplotlib参数调整】
  • 2011-2019年各省地方财政国土资源气象等事务支出决策数数据
  • 如何理解缓存一致性?
  • Linux 安装 MySQL8数据库
  • LLM面试题六
  • Linux随机数
  • React: hook相当于函数吗?
  • 算法设计学习9
  • 【Groovy快速上手 ONLY ONE】Groovy与Java的核心差异
  • 常见的ETL工具分类整理