血缘元数据采集开放标准:OpenLineage Integrations Apache Spark Main Concepts Installation
OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。
Apache Spark
该集成已知适用于最新 Spark 版本以及其他 Apache Spark 3.*。如需获取受支持版本的最新信息,请查看此处。
该集成通过 OpenLineageSparkListener
使用 SparkListener
接口,提供全面的监控方案。 它会监听 SparkContext 发出的事件,提取与作业和数据集相关的元数据,并利用 RDD 和 DataFrame 的依赖图。方法可有效从各种数据源收集信息,包括文件系统源(如 S3 和 GCS)、JDBC 后端以及 Redshift 和 BigQuery 等数据仓库。
核心概念
Main Concepts
Spark 作业通常在机器集群上运行。
在单台机器上运行的 “driver” 应用构建作业图——例如从数据源读取数据,过滤、转换、关联记录,再将结果写入某个接收端——并管理这些作业的执行。
Spark 的基本抽象是弹性分布式数据集(RDD),它封装了对记录的分布式读取和修改。
虽然可以直接使用 RDD,但更常见的是使用 Spark Dataset 或 DataFrame API,后者通过显式 Schema 提升性能,并支持以 SQL 方式与数据集交互。
DataFrame 的声明式 API 允许 Spark 在执行前通过分析和操作抽象查询计划来优化作业。
在 Spark 中收集血缘
收集血缘需要在 driver 应用中挂接到 Spark 的 ListenerBus
,并在执行事件发生时收集和分析它们。
无论是原生 RDD 还是 DataFrame 作业,在执行期间都会向 listener bus 发送事件。
这些事件暴露了作业结构,包括优化后的查询计划,使 Spark 集成能够分析作业中消费和产出的数据集,并提取存储属性——如 GCS 或 S3 中的位置、关系数据库或仓库(如 Redshift、BigQuery)中的表名,以及 Schema。
除了数据集和作业血缘外,Spark SQL 作业还会报告逻辑计划,这些计划可以在多次作业运行之间进行比较,以追踪查询计划的重要变化,这些变化可能影响作业的正确性或速度。
单个 Spark 应用可能执行多个作业。
Spark OpenLineage 集成将一个 Spark 作业映射为一个 OpenLineage Job。
应用在启动时会被分配一个 Run id,每个执行的作业都会将该应用的 Run id 作为其父作业运行。
因此,一个应用如果读取一个或多个源数据集,写入一个中间数据集,然后转换该中间数据集并写入最终输出数据集,将报告三个作业:
- 父应用作业
- 读取源数据并创建中间数据集的初始作业
- 消费中间数据集并产生最终输出的最终作业
如下图所示:
安装
Installation
要将 OpenLineage Spark 集成到您的应用,可选择以下方式:
- 与 Apache Spark 应用项目一起打包
- 将 JAR 放入
${SPARK_HOME}/jars
目录 - 在
spark-submit / spark-shell / pyspark
中使用--jars
选项 - 在
spark-submit / spark-shell / pyspark
中使用--packages
选项
与 Apache Spark 应用项目一起打包
本方法未演示如何配置
OpenLineageSparkListener
。请参见 配置 一节。
Maven 用户,在 pom.xml
中添加:
<dependency><groupId>io.openlineage</groupId><artifactId>openlineage-spark_${SCALA_BINARY_VERSION}</artifactId><version>{{PREPROCESSOR:OPENLINEAGE_VERSION}}</version>
</dependency>
Gradle 用户,在 build.gradle
中添加:
implementation("io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:{{PREPROCESSOR:OPENLINEAGE_VERSION}}")
将 JAR 放入 ${SPARK_HOME}/jars
目录
本方法未演示如何配置
OpenLineageSparkListener
。请参见 配置 一节。
- 从 Maven Central 下载 JAR 及其校验文件。
- 使用校验文件验证 JAR 完整性。
- 验证成功后,将 JAR 移至
${SPARK_HOME}/jars
。
以下脚本自动完成下载与验证:
#!/usr/bin/env bashif [ -z "$SPARK_HOME" ]; thenecho "SPARK_HOME 未设置。请将其定义为 Spark 安装目录。"exit 1
fiOPENLINEAGE_SPARK_VERSION='{{PREPROCESSOR:OPENLINEAGE_VERSION}}'
SCALA_BINARY_VERSION='2.13' # 示例 Scala 版本
ARTIFACT_ID="openlineage-spark_${SCALA_BINARY_VERSION}"
JAR_NAME="${ARTIFACT_ID}-${OPENLINEAGE_SPARK_VERSION}.jar"
CHECKSUM_NAME="${JAR_NAME}.sha512"
BASE_URL="https://repo1.maven.org/maven2/io/openlineage/${ARTIFACT_ID}/${OPENLINEAGE_SPARK_VERSION}"curl -O "${BASE_URL}/${JAR_NAME}"
curl -O "${BASE_URL}/${CHECKSUM_NAME}"echo "$(cat ${CHECKSUM_NAME}) ${JAR_NAME}" | sha512sum -cif [ $? -eq 0 ]; thenmv "${JAR_NAME}" "${SPARK_HOME}/jars"
elseecho "校验失败。"exit 1
fi
在 spark-submit / spark-shell / pyspark
中使用 --jars
选项
本方法未演示如何配置
OpenLineageSparkListener
。请参见 配置 一节。
- 从 Maven Central 下载 JAR 及其校验文件。
- 使用校验文件验证 JAR 完整性。
- 验证成功后,使用
--jars
选项提交 Spark 应用。
以下脚本演示该流程:
#!/usr/bin/env bashOPENLINEAGE_SPARK_VERSION='{{PREPROCESSOR:OPENLINEAGE_VERSION}}'
SCALA_BINARY_VERSION='2.13' # 示例 Scala 版本
ARTIFACT_ID="openlineage-spark_${SCALA_BINARY_VERSION}"
JAR_NAME="${ARTIFACT_ID}-${OPENLINEAGE_SPARK_VERSION}.jar"
CHECKSUM_NAME="${JAR_NAME}.sha512"
BASE_URL="https://repo1.maven.org/maven2/io/openlineage/${ARTIFACT_ID}/${OPENLINEAGE_SPARK_VERSION}"curl -O "${BASE_URL}/${JAR_NAME}"
curl -O "${BASE_URL}/${CHECKSUM_NAME}"echo "$(cat ${CHECKSUM_NAME}) ${JAR_NAME}" | sha512sum -cif [ $? -eq 0 ]; thenspark-submit --jars "path/to/${JAR_NAME}" \# ... 其他选项
elseecho "校验失败。"exit 1
fi
在 spark-submit / spark-shell / pyspark
中使用 --packages
选项
本方法未演示如何配置
OpenLineageSparkListener
。请参见 配置 一节。
Spark 允许在运行时使用 spark-submit
的 --packages
选项添加依赖包。
该选项会在运行时自动从 Maven Central(或其他配置的仓库)下载包,并将其加入 Spark 应用的 classpath。
OPENLINEAGE_SPARK_VERSION='{{PREPROCESSOR:OPENLINEAGE_VERSION}}'
SCALA_BINARY_VERSION='2.13' # 示例 Scala 版本spark-submit --packages "io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:{{PREPROCESSOR:OPENLINEAGE_VERSION}}" \# ... 其他选项
1.8.0 及更早版本仅支持 Apache Spark 的 Scala 2.12 变体,且 artifact 名称中不含 Scala 版本后缀。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。