当前位置: 首页 > news >正文

血缘元数据采集开放标准:OpenLineage Guides 使用 Apache Airflow® 和 OpenLineage + Marquez 入门

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

使用 Apache Airflow® 和 OpenLineage + Marquez 入门

Getting Started with Apache Airflow® and OpenLineage+Marquez

本教程将指导你配置 Apache Airflow® 以将 OpenLineage 事件发送到 Marquez,并通过一个真实的故障排查场景进行探索。

目录

  • 前提条件
  • 获取并启动 Marquez
  • 配置 Airflow 将 OpenLineage 事件发送到 Marquez
  • 编写 Airflow DAG
  • 在 Marquez 中查看已收集的血缘
  • 使用 Marquez 排查失败的 DAG
  • 后续步骤
  • 反馈

前提条件

开始前,请确保已安装:

  • Docker 17.05+
  • Apache Airflow 2.7+ 本地运行。

如需在本地轻松安装并运行 Airflow 以用于开发,请参阅:快速开始。

获取并启动 Marquez

  1. 创建 Marquez 目录,然后通过运行以下命令检出 Marquez 源码:

    MacOS/Linux

    git clone https://github.com/MarquezProject/marquez && cd marquez
    

    Windows

    git config --global core.autocrlf false
    git clone https://github.com/MarquezProject/marquez && cd marquez
    
  2. Airflow 和 Marquez 都需要 5432 端口用于其元数据库,但 Marquez 服务更易于配置。你也可以即时为数据库服务分配一个新端口。要使用 2345 端口启动 Marquez,请运行:

    MacOS/Linux

    ./docker/up.sh --db-port 2345
    

    Windows

    验证 Postgres 和 Bash 是否在 PATH 中,然后运行:

    sh ./docker/up.sh --db-port 2345
    
  3. 要查看 Marquez UI 并验证其运行状态,请打开 http://localhost:3000。该 UI 允许你:

    • 查看跨平台依赖关系,即你可在生态系统中查看生成或消费关键表的工具中的作业。
    • 查看当前和先前作业运行的运行级元数据,使你能够看到作业的最新状态和数据集的更新历史。
    • 获取资源使用情况的高级视图,使你能够查看操作中的趋势。

配置 Airflow 将 OpenLineage 事件发送到 Marquez

  1. 要配置 Airflow 以将 OpenLineage 事件发送到 Marquez,你需要修改本地 Airflow 环境并添加依赖。首先,定义一个 OpenLineage 传输。一种方法是使用环境变量。要使用 http 并将事件发送到本地端口 5000 上运行的 Marquez API,请运行:

    MacOS/Linux

    export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    

    Windows

    set AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    
  2. 你还需要为 Airflow 作业定义一个命名空间。它可以是任意字符串。请运行:

    MacOS/Linux

    export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    

    Windows

    set AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    
  3. 要将所需的 Airflow OpenLineage Provider 包添加到你的 Airflow 环境,请运行:

    MacOS/Linux

    pip install apache-airflow-providers-openlineage
    

    Windows

    pip install apache-airflow-providers-openlineage
    
  4. 要完成本教程,你还需要在 Airflow 中启用本地 Postgres 操作。为此,请运行:

    MacOS/Linux

    pip install apache-airflow-providers-postgres
    

    Windows

    pip install apache-airflow-providers-postgres
    
  5. 在本地 Postgres 实例中创建一个数据库,并使用默认 ID (postgres_default) 创建一个 Airflow Postgres 连接。如需前者帮助,请参阅:Postgres 文档。如需后者帮助,请参阅:管理连接。

编写 Airflow DAG

在此步骤中,你将创建两个新的 Airflow DAG,它们执行简单任务,并将其添加到你现有的 Airflow 实例。counter DAG 每分钟将列值加 1,而 sum DAG 每五分钟计算一次总和。这将形成一个包含两个作业和两个数据集的简单管道。

  1. dags/ 目录下,创建一个名为 counter.py 的文件,并添加以下代码:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.dates import days_ago@dag(schedule='*/1 * * * *',start_date=days_ago(1),catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that generates a new count value equal to 1.'
    )def counter():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS counts (value INTEGER);''',)query2 = PostgresOperator(task_id='inc',postgres_conn_id='postgres_default',sql='''INSERT INTO "counts" (value) VALUES (1);''',)query1 >> query2counter()
  2. dags/ 目录下,创建一个名为 sum.py 的文件,并添加以下代码:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.dates import days_ago@dag(start_date=days_ago(1),schedule='*/5 * * * *',catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that sums the total of generated count values.'
    )def sum():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS sums (value INTEGER);''')query2 = PostgresOperator(task_id='total',postgres_conn_id='postgres_default',sql='''INSERT INTO sums (value)SELECT SUM(value) FROM counts;''')query1 >> query2sum()
  3. 重启 Airflow 以应用更改。然后,取消暂停两个 DAG。

在 Marquez 中查看已收集的血缘

  1. 要查看 Marquez 从 Airflow 收集的血缘,请访问 http://localhost:3000 打开 Marquez UI。然后,使用左上角搜索栏搜索 counter.inc 作业。要查看 counter.inc 的血缘元数据,请从下拉列表中点击该作业:

    image

  2. 查看 counter.inc 的血缘图,你应看到 <database>.public.counts 作为输出数据集,sum.total 作为下游作业:

    image

使用 Marquez 排查失败的 DAG

  1. 在此步骤中,你将模拟由于跨 DAG 依赖项更改导致的管道中断,并了解来自 OpenLineage + Marquez 的增强血缘如何使架构更改的故障排查变得轻松。

    假设 Team A 拥有 DAG counterTeam A 更新 counter 以将 counts 表中的 values 列重命名为 value_1_to_10,但未将架构更改正确传达给拥有 sum 的团队。

    counter 应用以下更改以模拟破坏性更改:

    query1 = PostgresOperator(
    -   task_id='if_not_exists',
    +   task_id='alter_name_of_column',postgres_conn_id='example_db',sql='''
    -   CREATE TABLE IF NOT EXISTS counts (
    -     value INTEGER
    -   );''',
    +   ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
    +   '''
    )
    
    query2 = PostgresOperator(task_id='inc',postgres_conn_id='example_db',sql='''
    -    INSERT INTO counts (value)
    +    INSERT INTO counts (value_1_to_10)VALUES (1)''',
    )
    

    正如 sum 的所有者 Team B 所做的那样,注意 Marquez 中 DataOps 视图的失败运行:

    image

    Team B 只能猜测 DAG 失败的可能原因,因为 DAG 最近没有更改。因此,团队决定检查 Marquez。

  2. 在 Marquez 中,导航到 Datasets 视图,并从右上角的命名空间下拉菜单中选择你的 Postgres 实例。然后,点击 <database>.public.counts 数据集并检查图表。你将在节点上找到架构:

    image

  3. 假设你不认识该列,并希望了解其原始名称及更改时间。点击节点将打开详情抽屉。在那里,使用版本历史查找架构更改的运行:

    image

  4. 在 Airflow 中,通过更新计算计数总和的任务以使用新列名来修复中断的下游 DAG:

    query2 = PostgresOperator(task_id='total',postgres_conn_id='example_db',sql='''
    -    INSERT INTO sums (value)
    -       SELECT SUM(value) FROM counts;
    +       SELECT SUM(value_1_to_10) FROM counts;'''
    )
    
  5. 重新运行 DAG。在 Marquez 中,通过查看 DataOps 视图中最近的运行历史来验证修复:

    image

后续步骤

  • 查看用于收集 Airflow DAG 元数据的 Marquez HTTP API,并学习如何使用 OpenLineage 构建自己的集成。
  • 查看可与 Airflow 一起使用的 openlineage-spark 集成。

反馈

你觉得本指南如何?请在 OpenLineage Slack 或 Marquez Slack 中告诉我们。你也可以通过 提交拉取请求 直接提出更改。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

http://www.dtcms.com/a/360885.html

相关文章:

  • IPC 进程间通信 interprocess communicate
  • 【macOS】垃圾箱中文件无法清理的“含特殊字符文件名”的方法
  • 应用平台更新:可定制目录、基于Git的密钥管理与K8s项目自动化管理
  • Python 爬虫基础教学
  • C#/.NET/.NET Core技术前沿周刊 | 第 52 期(2025年8.25-8.31)
  • C++ 面试高频考点 力扣 35. 搜索插入位置 二分查找 左右端点查找 题解 每日一题
  • RocksDB 在 macOS M 系列 上运行时报错的解决方案
  • 【公告】更新预告
  • vite基础讲解
  • 超越Transformer:语言模型未来的认知革命与架构重构
  • Golang之GoWorld深度解析:基于Go语言的分布式游戏服务器框架
  • 新启航技术白皮书:激光频率梳如何实现 130mm 深孔 2μm 级无遮挡 3D 轮廓测量
  • OpenCV-Python Tutorial : A Candy from Official Main Page(五)
  • 使用Spring Boot对接印度股票市场API开发实践
  • Burp Suite 插件 | 提供强大的框架自动化安全扫描功能。目前支持1000+POC、支持动态加载POC、指定框架扫描。
  • 一体化运维平台自动化模块:3 大场景解放运维双手
  • 开发中使用——鸿蒙CoreSpeechKit语音识别
  • 复杂计算任务的智能轮询优化实战
  • 教育项目管理工具新趋势:可视化与自动化如何提升效率?
  • 使用ansible的playbook完成以下操作
  • TFS-2010《Fuzzy PCA-Guided Robust k-Means Clustering》
  • macOS中Homebrew安装PHP的详细步骤(五)
  • React学习教程,从入门到精通, React 入门指南:创建 React 应用程序的语法知识点(7)
  • 反物质量子比特初探
  • [免费]基于Python的气象天气预报数据可视化分析系统(Flask+echarts+爬虫) 【论文+源码+SQL脚本】
  • 【iOS】关键字复习
  • 【iOS】折叠cell
  • 量子電腦組裝
  • FunASR开源部署中文实时语音听写服务(CPU)
  • 配送算法19 Two Fast Heuristics for Online Order Dispatching