当前位置: 首页 > news >正文

血缘元数据采集开放标准:OpenLineage Integrations Apache Spark Main Concepts Installation

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

Apache Spark

该集成已知适用于最新 Spark 版本以及其他 Apache Spark 3.*。如需获取受支持版本的最新信息,请查看此处。

该集成通过 OpenLineageSparkListener 使用 SparkListener 接口,提供全面的监控方案。 它会监听 SparkContext 发出的事件,提取与作业和数据集相关的元数据,并利用 RDD 和 DataFrame 的依赖图。方法可有效从各种数据源收集信息,包括文件系统源(如 S3 和 GCS)、JDBC 后端以及 Redshift 和 BigQuery 等数据仓库。

核心概念

Main Concepts

Spark 作业通常在机器集群上运行。
在单台机器上运行的 “driver” 应用构建作业图——例如从数据源读取数据,过滤、转换、关联记录,再将结果写入某个接收端——并管理这些作业的执行。
Spark 的基本抽象是弹性分布式数据集(RDD),它封装了对记录的分布式读取和修改。
虽然可以直接使用 RDD,但更常见的是使用 Spark Dataset 或 DataFrame API,后者通过显式 Schema 提升性能,并支持以 SQL 方式与数据集交互。
DataFrame 的声明式 API 允许 Spark 在执行前通过分析和操作抽象查询计划来优化作业。

在 Spark 中收集血缘

收集血缘需要在 driver 应用中挂接到 Spark 的 ListenerBus,并在执行事件发生时收集和分析它们。
无论是原生 RDD 还是 DataFrame 作业,在执行期间都会向 listener bus 发送事件。
这些事件暴露了作业结构,包括优化后的查询计划,使 Spark 集成能够分析作业中消费和产出的数据集,并提取存储属性——如 GCS 或 S3 中的位置、关系数据库或仓库(如 Redshift、BigQuery)中的表名,以及 Schema。
除了数据集和作业血缘外,Spark SQL 作业还会报告逻辑计划,这些计划可以在多次作业运行之间进行比较,以追踪查询计划的重要变化,这些变化可能影响作业的正确性或速度。

单个 Spark 应用可能执行多个作业。
Spark OpenLineage 集成将一个 Spark 作业映射为一个 OpenLineage Job。
应用在启动时会被分配一个 Run id,每个执行的作业都会将该应用的 Run id 作为其父作业运行。
因此,一个应用如果读取一个或多个源数据集,写入一个中间数据集,然后转换该中间数据集并写入最终输出数据集,将报告三个作业:

  • 父应用作业
  • 读取源数据并创建中间数据集的初始作业
  • 消费中间数据集并产生最终输出的最终作业

如下图所示:

image

安装

Installation

要将 OpenLineage Spark 集成到您的应用,可选择以下方式:

  • 与 Apache Spark 应用项目一起打包
  • 将 JAR 放入 ${SPARK_HOME}/jars 目录
  • spark-submit / spark-shell / pyspark 中使用 --jars 选项
  • spark-submit / spark-shell / pyspark 中使用 --packages 选项
与 Apache Spark 应用项目一起打包

本方法未演示如何配置 OpenLineageSparkListener。请参见 配置 一节。

Maven 用户,在 pom.xml 中添加:

<dependency><groupId>io.openlineage</groupId><artifactId>openlineage-spark_${SCALA_BINARY_VERSION}</artifactId><version>{{PREPROCESSOR:OPENLINEAGE_VERSION}}</version>
</dependency>

Gradle 用户,在 build.gradle 中添加:

implementation("io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:{{PREPROCESSOR:OPENLINEAGE_VERSION}}")
将 JAR 放入 ${SPARK_HOME}/jars 目录

本方法未演示如何配置 OpenLineageSparkListener。请参见 配置 一节。

  1. 从 Maven Central 下载 JAR 及其校验文件。
  2. 使用校验文件验证 JAR 完整性。
  3. 验证成功后,将 JAR 移至 ${SPARK_HOME}/jars

以下脚本自动完成下载与验证:

#!/usr/bin/env bashif [ -z "$SPARK_HOME" ]; thenecho "SPARK_HOME 未设置。请将其定义为 Spark 安装目录。"exit 1
fiOPENLINEAGE_SPARK_VERSION='{{PREPROCESSOR:OPENLINEAGE_VERSION}}'
SCALA_BINARY_VERSION='2.13'        # 示例 Scala 版本
ARTIFACT_ID="openlineage-spark_${SCALA_BINARY_VERSION}"
JAR_NAME="${ARTIFACT_ID}-${OPENLINEAGE_SPARK_VERSION}.jar"
CHECKSUM_NAME="${JAR_NAME}.sha512"
BASE_URL="https://repo1.maven.org/maven2/io/openlineage/${ARTIFACT_ID}/${OPENLINEAGE_SPARK_VERSION}"curl -O "${BASE_URL}/${JAR_NAME}"
curl -O "${BASE_URL}/${CHECKSUM_NAME}"echo "$(cat ${CHECKSUM_NAME})  ${JAR_NAME}" | sha512sum -cif [ $? -eq 0 ]; thenmv "${JAR_NAME}" "${SPARK_HOME}/jars"
elseecho "校验失败。"exit 1
fi
spark-submit / spark-shell / pyspark 中使用 --jars 选项

本方法未演示如何配置 OpenLineageSparkListener。请参见 配置 一节。

  1. 从 Maven Central 下载 JAR 及其校验文件。
  2. 使用校验文件验证 JAR 完整性。
  3. 验证成功后,使用 --jars 选项提交 Spark 应用。

以下脚本演示该流程:

#!/usr/bin/env bashOPENLINEAGE_SPARK_VERSION='{{PREPROCESSOR:OPENLINEAGE_VERSION}}'
SCALA_BINARY_VERSION='2.13'        # 示例 Scala 版本
ARTIFACT_ID="openlineage-spark_${SCALA_BINARY_VERSION}"
JAR_NAME="${ARTIFACT_ID}-${OPENLINEAGE_SPARK_VERSION}.jar"
CHECKSUM_NAME="${JAR_NAME}.sha512"
BASE_URL="https://repo1.maven.org/maven2/io/openlineage/${ARTIFACT_ID}/${OPENLINEAGE_SPARK_VERSION}"curl -O "${BASE_URL}/${JAR_NAME}"
curl -O "${BASE_URL}/${CHECKSUM_NAME}"echo "$(cat ${CHECKSUM_NAME})  ${JAR_NAME}" | sha512sum -cif [ $? -eq 0 ]; thenspark-submit --jars "path/to/${JAR_NAME}" \# ... 其他选项
elseecho "校验失败。"exit 1
fi
spark-submit / spark-shell / pyspark 中使用 --packages 选项

本方法未演示如何配置 OpenLineageSparkListener。请参见 配置 一节。

Spark 允许在运行时使用 spark-submit--packages 选项添加依赖包。
该选项会在运行时自动从 Maven Central(或其他配置的仓库)下载包,并将其加入 Spark 应用的 classpath。

OPENLINEAGE_SPARK_VERSION='{{PREPROCESSOR:OPENLINEAGE_VERSION}}'
SCALA_BINARY_VERSION='2.13'        # 示例 Scala 版本spark-submit --packages "io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:{{PREPROCESSOR:OPENLINEAGE_VERSION}}" \# ... 其他选项

1.8.0 及更早版本仅支持 Apache Spark 的 Scala 2.12 变体,且 artifact 名称中不含 Scala 版本后缀。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

http://www.dtcms.com/a/349520.html

相关文章:

  • 05 开发环境和远程仓库Gitlab准备
  • 【spring进阶】spring应用内方法调用时长统计
  • 【数据结构】串——(一)
  • 36 NoSQL 注入
  • Docker 部署 GitLab 并开启 SSH 使用详解
  • 【Java后端】Java 多线程:从原理到实战,再到高频面试题
  • Claude Code 使用及配置智能体
  • 【科研绘图系列】R语言绘制代谢物与临床表型相关性的森林图
  • 从零到一:现代化充电桩App的React前端参考
  • 将FGUI的Shader全部预热后,WebGL平台没有加载成功
  • 基于MalConv的恶意软件检测系统设计与实现
  • 大模型 transformer 步骤
  • 《拉康精神分析学中的欲望辩证法:能指的拓扑学与主体的解构性重构》
  • 计算机大数据技术不会?医院体检数据可视化分析系统Django+Vue全栈方案
  • 不止效率工具:AI 在文化创作中如何重构 “灵感逻辑”?
  • 【DFS 或 BFS 或拓扑排序 - LeetCode】329. 矩阵中的最长递增路径
  • 【图像算法 - 23】工业应用:基于深度学习YOLO12与OpenCV的仪器仪表智能识别系统
  • 基于视觉的果园无人机导航:一种基于干预模仿学习与VAE控制器的真实世界验证
  • 机器人中的李代数是什么
  • 抖音多账号运营新范式:巨推AI如何解锁流量矩阵的商业密码
  • 量子计算驱动的Python医疗诊断编程前沿展望(下)
  • 数据结构:单向链表的逆置;双向循环链表;栈,输出栈,销毁栈;顺序表和链表的区别和优缺点;0825
  • 平安产险青海分公司启动2025年“乡风文明100行动” 首站落地海东市乐都区土官沟村
  • 【C++详解】哈希表概念与实现 开放定址法和链地址法、处理哈希冲突、哈希函数介绍
  • Redis缓存雪崩缓存击穿缓存穿透的处理方式
  • [React]Antd Upload组件上传多个文件
  • 阿里云安装postgre数据库
  • Vim 的 :term命令:终端集成的终极指南
  • 中介者模式及优化
  • Flink 状态 RocksDBListState(写入时的Merge优化)