当前位置: 首页 > news >正文

血缘元数据采集开放标准:OpenLineage Integrations Apache Spark Configuration Usage

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

Apache Spark

该集成已知适用于最新 Spark 版本以及其他 Apache Spark 3.*。如需获取受支持版本的最新信息,请查看此处。

该集成通过 OpenLineageSparkListener 使用 SparkListener 接口,提供全面的监控方案。 它会监听 SparkContext 发出的事件,提取与作业和数据集相关的元数据,并利用 RDD 和 DataFrame 的依赖图。方法可有效从各种数据源收集信息,包括文件系统源(如 S3 和 GCS)、JDBC 后端以及 Redshift 和 BigQuery 等数据仓库。

使用方法

Usage

import Tabs from ‘@theme/Tabs’;
import TabItem from ‘@theme/TabItem’;

配置 OpenLineage Spark 集成非常简单,它使用 Spark 内建的配置机制。
不过,Databricks 用户需特别注意,以确保在集群关闭后仍能兼容且不会破坏 Spark UI。

可选方式:

  1. 直接在应用内设置属性。
  2. 通过 CLI 使用 --conf 选项。
  3. 将属性写入 ${SPARK_HOME}/conf/spark-defaults.conf 文件。
直接在应用内设置属性

以下示例展示在构造 SparkSession 时如何直接在应用内设置属性。

设置 config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") 至关重要。若缺少此项,OpenLineage Spark 集成将不会被触发,导致集成失效。

Databricks
Databricks 用户必须在 spark.extraListeners 中同时包含 io.openlineage.spark.agent.OpenLineageSparkListenercom.databricks.backend.daemon.driver.DBCEventLoggingListener。若未添加,集群关闭后将无法访问 Spark UI。

Scala
import org.apache.spark.sql.SparkSessionobject OpenLineageExample extends App {val spark = SparkSession.builder().appName("OpenLineageExample")// 此行至关重要.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener").config("spark.openlineage.transport.type", "http").config("spark.openlineage.transport.url", "http://localhost:5000").config("spark.openlineage.namespace", "spark_namespace").config("spark.openlineage.parentJobNamespace", "airflow_namespace").config("spark.openlineage.parentJobName", "airflow_dag.airflow_task").config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx").getOrCreate()// ... 您的代码spark.stop()
}// Databricks
import org.apache.spark.sql.SparkSessionobject OpenLineageExample extends App {val spark = SparkSession.builder().appName("OpenLineageExample")// 此行至关重要.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener").config("spark.openlineage.transport.type", "http").config("spark.openlineage.transport.url", "http://localhost:5000").config("spark.openlineage.namespace", "spark_namespace").config("spark.openlineage.parentJobNamespace", "airflow_namespace").config("spark.openlineage.parentJobName", "airflow_dag.airflow_task").config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx").getOrCreate()// ... 您的代码spark.stop()
}
Python
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("OpenLineageExample") \.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \.config("spark.openlineage.transport.type", "http") \.config("spark.openlineage.transport.url", "http://localhost:5000") \.config("spark.openlineage.namespace", "spark_namespace") \.config("spark.openlineage.parentJobNamespace", "airflow_namespace") \.config("spark.openlineage.parentJobName", "airflow_dag.airflow_task") \.config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx") \.getOrCreate()# ... 您的代码spark.stop()# Databricks
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("OpenLineageExample") \.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener") \.config("spark.openlineage.transport.type", "http") \.config("spark.openlineage.transport.url", "http://localhost:5000") \.config("spark.openlineage.namespace", "spark_namespace") \.config("spark.openlineage.parentJobNamespace", "airflow_namespace") \.config("spark.openlineage.parentJobName", "airflow_dag.airflow_task") \.config("spark.openlineage.parentRunId", "xxxx-xxxx-xxxx-xxxx") \.getOrCreate()# ... 您的代码spark.stop()
通过 CLI 使用 --conf 选项

以下示例展示如何在使用 spark-submit 时通过 --conf 选项进行配置。

Databricks 记得同时添加 com.databricks.backend.daemon.driver.DBCEventLoggingListener 与 OpenLineage 监听器。

spark-submit \--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \--conf "spark.openlineage.transport.type=http" \--conf "spark.openlineage.transport.url=http://localhost:5000" \--conf "spark.openlineage.namespace=spark_namespace" \--conf "spark.openlineage.parentJobNamespace=airflow_namespace" \--conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \--conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \# ... 其他选项# Databricks
spark-submit \--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener" \--conf "spark.openlineage.transport.type=http" \--conf "spark.openlineage.transport.url=http://localhost:5000" \--conf "spark.openlineage.namespace=spark_namespace" \--conf "spark.openlineage.parentJobNamespace=airflow_namespace" \--conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \--conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \# ... 其他选项
将属性写入 ${SPARK_HOME}/conf/spark-defaults.conf 文件

若该文件不存在,您可能需要手动创建。若文件已存在,强烈建议在修改前备份,特别是 Spark 安装被多人共用时。此处配置错误可能对 Spark 安装造成严重影响,尤其在共享环境中。

以下示例展示如何向 spark-defaults.conf 添加属性。

Databricks 用户需在 spark.extraListeners 中同时包含 com.databricks.backend.daemon.driver.DBCEventLoggingListener

spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.namespace=MyNamespace

Databricks 示例:

spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener,com.databricks.backend.daemon.driver.DBCEventLoggingListener
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.namespace=MyNamespace

spark.extraListeners 配置参数非追加。若通过 CLI 或 SparkSession#config 设置 spark.extraListeners,将替换 spark-defaults.conf 中的值。若您在 spark-defaults.conf 中设置了默认值,又想为特定作业覆盖,请务必注意。

对于 spark.openlineage.namespace 等配置参数,可在 spark-defaults.conf 中提供默认值。该默认值可在运行时被应用通过前述方法覆盖。然而,强烈建议将 spark.openlineage.parentRunIdspark.openlineage.parentJobName 等动态或易变参数通过 CLI 或 SparkSession#config 在运行时设置。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

http://www.dtcms.com/a/351284.html

相关文章:

  • 重写BeanFactory初始化方法并行加载Bean
  • 信息网络安全视角下的在线问卷调查系统设计与实践(国内问卷调查)
  • 记一个Mudbus TCP 帮助类
  • Linux 内核 Workqueue 原理与实现及其在 KFD SVM功能的应用
  • LeetCode - 844. 比较含退格的字符串
  • LeetCode 438. 找到字符串中所有的字母异位词
  • 微算法科技(NASDAQ:MLGO)通过修改 Grover 算法在可重构硬件上实现动态多模式搜索
  • LeetCode - 946. 验证栈序列
  • 智慧园区:从技术赋能到价值重构,解锁园区运营新范式
  • 透视光合组织大会:算力生态重构金融AI落地新实践
  • 亚马逊类目合规风暴:高压清洗机品类整顿背后的运营重构与风险防御
  • 便携屏选购指南:常见作用、移动性优势及多场景应用详解
  • 前端性能优化新维度:渲染流水线深度解析
  • 【前端开发实战】从零开始开发Chrome浏览器扩展 - 快乐传播者项目完整教程
  • DeepSeek分析
  • spring如何通过实现BeanPostProcessor接口计算并打印每一个bean的加载耗时
  • 【数据结构】树和二叉树——二叉树
  • pytorch_grad_cam 库学习笔记—— Ablation-CAM 算法的基类 AblationCAM 和 AblationLayer
  • OneCode RAD:揭秘前端开发的配置化魔法
  • 【RAGFlow代码详解-14】知识图谱处理
  • Linux之SELinux 概述、SSH 密钥登录、服务器初始化
  • IUV5G专网排障(下)
  • 开源大模型本地部署
  • [Mysql数据库] 知识点总结3
  • 基于Android的电影院订票选座系统、基于Android的电影院管理系统app#基于Android的电影在线订票系统
  • 玩转QEMU硬件模拟器 - vexpress-a9开发板模拟开发
  • 深入浅出理解支持向量机:从原理到应用,解锁分类算法的核心密码
  • 宝石组合(蓝桥杯)
  • UX 设计入门终章:让洞察落地!用用户流程图、IA 和旅程图,设计用户与产品的互动故事
  • 介绍一下 bev fusion 网络结构