血缘元数据采集开放标准:OpenLineage Integrations Apache Spark Configuration Usage
OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。
Apache Spark
该集成已知适用于最新 Spark 版本以及其他 Apache Spark 3.*。如需获取受支持版本的最新信息,请查看此处。
该集成通过 OpenLineageSparkListener
使用 SparkListener
接口,提供全面的监控方案。 它会监听 SparkContext 发出的事件,提取与作业和数据集相关的元数据,并利用 RDD 和 DataFrame 的依赖图。方法可有效从各种数据源收集信息,包括文件系统源(如 S3 和 GCS)、JDBC 后端以及 Redshift 和 BigQuery 等数据仓库。
使用方法
Usage
import Tabs from ‘@theme/Tabs’;
import TabItem from ‘@theme/TabItem’;
配置 OpenLineage Spark 集成非常简单,它使用 Spark 内建的配置机制。
不过,Databricks 用户需特别注意,以确保在集群关闭后仍能兼容且不会破坏 Spark UI。
可选方式:
- 直接在应用内设置属性。
- 通过 CLI 使用
--conf
选项。 - 将属性写入
${SPARK_HOME}/conf/spark-defaults.conf
文件。
直接在应用内设置属性
以下示例展示在构造 SparkSession
时如何直接在应用内设置属性。
设置
config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
至关重要。若缺少此项,OpenLineage Spark 集成将不会被触发,导致集成失效。Databricks
Databricks 用户必须在spark.extraListeners
中同时包含io.openlineage.spark.agent.OpenLineageSparkListener
与com.databricks.backend.daemon.driver.DBCEventLoggingListener
。若未添加,集群关闭后将无法访问 Spark UI。
Scala
import org.apache.spark.sql.SparkSessionobject OpenLineageExample extends App {val spark = SparkSession.builder().appName("OpenLineageExample")// 此行至关重要.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener").config("spark.openlineage.transport.type", "http").config("spark.openlineage.transport.url", "http://localhost:5000").config("spark.openlineage.namespace", "spark_namespace").config("spark.openlineage.parentJobNamespace", "airflow_namespace").config("spark.openlineage.parentJobName", "airflow_dag.airflow_task").config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx").getOrCreate()// ... 您的代码spark.stop()
}// Databricks
import org.apache.spark.sql.SparkSessionobject OpenLineageExample extends App {val spark = SparkSession.builder().appName("OpenLineageExample")// 此行至关重要.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener").config("spark.openlineage.transport.type", "http").config("spark.openlineage.transport.url", "http://localhost:5000").config("spark.openlineage.namespace", "spark_namespace").config("spark.openlineage.parentJobNamespace", "airflow_namespace").config("spark.openlineage.parentJobName", "airflow_dag.airflow_task").config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx").getOrCreate()// ... 您的代码spark.stop()
}
Python
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("OpenLineageExample") \.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \.config("spark.openlineage.transport.type", "http") \.config("spark.openlineage.transport.url", "http://localhost:5000") \.config("spark.openlineage.namespace", "spark_namespace") \.config("spark.openlineage.parentJobNamespace", "airflow_namespace") \.config("spark.openlineage.parentJobName", "airflow_dag.airflow_task") \.config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx") \.getOrCreate()# ... 您的代码spark.stop()# Databricks
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("OpenLineageExample") \.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener") \.config("spark.openlineage.transport.type", "http") \.config("spark.openlineage.transport.url", "http://localhost:5000") \.config("spark.openlineage.namespace", "spark_namespace") \.config("spark.openlineage.parentJobNamespace", "airflow_namespace") \.config("spark.openlineage.parentJobName", "airflow_dag.airflow_task") \.config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx") \.getOrCreate()# ... 您的代码spark.stop()
通过 CLI 使用 --conf
选项
以下示例展示如何在使用 spark-submit
时通过 --conf
选项进行配置。
Databricks 记得同时添加
com.databricks.backend.daemon.driver.DBCEventLoggingListener
与 OpenLineage 监听器。
spark-submit \--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \--conf "spark.openlineage.transport.type=http" \--conf "spark.openlineage.transport.url=http://localhost:5000" \--conf "spark.openlineage.namespace=spark_namespace" \--conf "spark.openlineage.parentJobNamespace=airflow_namespace" \--conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \--conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \# ... 其他选项# Databricks
spark-submit \--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener" \--conf "spark.openlineage.transport.type=http" \--conf "spark.openlineage.transport.url=http://localhost:5000" \--conf "spark.openlineage.namespace=spark_namespace" \--conf "spark.openlineage.parentJobNamespace=airflow_namespace" \--conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \--conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \# ... 其他选项
将属性写入 ${SPARK_HOME}/conf/spark-defaults.conf
文件
若该文件不存在,您可能需要手动创建。若文件已存在,强烈建议在修改前备份,特别是 Spark 安装被多人共用时。此处配置错误可能对 Spark 安装造成严重影响,尤其在共享环境中。
以下示例展示如何向 spark-defaults.conf
添加属性。
Databricks 用户需在
spark.extraListeners
中同时包含com.databricks.backend.daemon.driver.DBCEventLoggingListener
。
spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.namespace=MyNamespace
Databricks 示例:
spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.namespace=MyNamespace
spark.extraListeners
配置参数非追加。若通过 CLI 或SparkSession#config
设置spark.extraListeners
,将替换spark-defaults.conf
中的值。若您在spark-defaults.conf
中设置了默认值,又想为特定作业覆盖,请务必注意。对于
spark.openlineage.namespace
等配置参数,可在spark-defaults.conf
中提供默认值。该默认值可在运行时被应用通过前述方法覆盖。然而,强烈建议将spark.openlineage.parentRunId
、spark.openlineage.parentJobName
等动态或易变参数通过 CLI 或SparkSession#config
在运行时设置。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。