当前位置: 首页 > news >正文

[Data Pipeline] Airflow DAG | 数据质量检查PyDeequ

在这里插入图片描述

第五章:Airflow DAG(批量任务编排)

欢迎回到数据探险之旅!

在前几章中,我们构建了数据流水线的核心组件:

  • 数据起点(第一章:MySQL数据库(源系统))
  • 数据处理引擎(第二章:Spark作业(数据处理))
  • 分级存储体系(第三章:MinIO存储(数据湖) 第四章:数据层(青铜、白银、黄金))。

现在设想我们的咖啡数据工厂:不同机器(Spark作业)执行特定任务(加载青铜层、清洗白银层、构建黄金表)。

如何确保这些作业按正确顺序执行?青铜层加载必须*先于*白银层清洗完成,白银层就绪*之后*才能构建黄金层。我们需要整个流程自动运行(例如每日执行)以保证数据新鲜度。

这就引出核心概念:Airflow DAG(批量任务编排)

Airflow是什么?为什么需要编排?

想象项目经理需要协调项目任务:

  • 任务定义
  • 责任人分配
  • 任务依赖关系
  • 执行计划

Apache Airflow 正是数据流水线的"项目经理",用于定义、调度和监控任务序列。

  • 编排即自动化协调复杂工作流,特别适合需要严格顺序执行的批量处理流水线

核心组件:Airflow DAG

Airflow中,工作流定义为DAG(有向无环图):

  • :由节点(任务)和边(依赖关系)组成的流程图
  • 有向:依赖关系单向流动(任务A→任务B表示A先于B执行)
  • 无环:禁止循环依赖,确保流程可终止

Airflow DAG本质是Python脚本,定义:

  1. 工作流唯一标识
  2. 调度计划(开始时间/执行频率)
  3. 具体任务步骤
  4. 任务间执行顺序

定义流水线DAG

项目中的DAG定义于airflow/dags/spark_job_airflow.py,该脚本指导Airflow按序执行Spark作业:

# 来源: airflow/dags/spark_job_airflow.pyimport airflow.utils.dates
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperatorwith DAG('spark-batch-job',          # 1. DAG唯一标识default_args=default_args,  # 2. 任务默认参数schedule_interval='@daily', # 3. 每日执行catchup=False               # 4. 禁止补跑历史任务
) as dag:# 任务定义区# 依赖关系定义区

关键参数解析:

  1. schedule_interval='@daily':每日触发,支持cron表达式
  2. catchup=False防止Airflow补跑start_date之前未执行的任务

定义任务节点

在DAG代码块内,使用SparkSubmitOperator定义各处理阶段:

# 在with DAG(...) as dag代码块内:bronze_layer_load = SparkSubmitOperator(task_id="bronze_layer_load",  # 任务唯一IDconn_id="spark",              # Airflow的Spark连接配置application=str(Path("scripts/bronze_dimension_fact_load.py")),  # Spark脚本路径packages="org.apache.hadoop:hadoop-aws:3.3.4,mysql:mysql-connector-java:8.0.30"  # 依赖包
)silver_layer_transform = SparkSubmitOperator(task_id="silver_layer_dimension_transform",conn_id="spark",application=str(Path("scripts/silver_dimensions.py")),
)
# 黄金层任务定义类似...

参数说明:

  • conn_id="spark":指向docker-compose.yaml中配置的Spark Master服务
  • application:Spark作业脚本路径(需与容器内路径一致)
  • packages:Spark作业依赖的第三方库(如MinIO连接器)

定义任务依赖

使用位移运算符>>定义执行顺序:

  • task_a >> task_b:任务A成功后才执行B
  • [task_a, task_b] >> task_c:A和B都成功后才执行C

项目中的依赖关系体现数据流动逻辑:

# 在with DAG(...) as dag代码块内:bronze_layer_load >> bronze_data_quality_check  # 青铜层质检bronze_data_quality_check >> [silver_layer_dim, silver_layer_fact]  # 并行执行白银层转换[silver_layer_dim, silver_layer_fact] >> silver_data_quality_check  # 白银层质检silver_data_quality_check >> [gold_dim_payment, gold_dim_stores, gold_dim_products]  # 并行构建黄金维度表[gold_dim_payment, gold_dim_stores, gold_dim_products] >> gold_fact_orders  # 最后构建黄金事实表

可视化依赖关系图:

在这里插入图片描述

Airflow运行机制

Airflow组件协作流程:
在这里插入图片描述

关键组件:

  • 调度器触发定时任务,分配任务给工作节点
  • 工作节点执行具体Operator(如提交Spark作业)
  • 元数据库存储任务状态和日志

Docker Compose配置

Airflow服务在docker-compose-batch.yaml中的定义:

# 来源: docker-compose-batch.yamlservices:postgres:  # 元数据库image: postgres:13environment:POSTGRES_USER: airflowPOSTGRES_PASSWORD: airflowairflow-webserver:  # 监控界面ports: ["8080:8080"]depends_on: [airflow-scheduler]airflow-scheduler:  # 调度核心command: >bash -c "airflow db init && airflow scheduler"volumes:  # 挂载DAG目录- ./airflow/dags:/opt/airflow/dags- ./scripts:/opt/airflow/scripts

关键配置:

  • 挂载本地dagsscripts目录到容器内
  • 调度器初始化数据库并启动

监控界面

访问http://localhost:8080可查看:

  • DAG运行状态
  • 任务日志详情
  • 手动触发/暂停流水线
  • 依赖关系可视化

总结

Airflow DAG是批量处理流水线的控制中枢,通过Python脚本定义任务流程

使用SparkSubmitOperator触发Spark作业,依赖关系运算符>>确保执行顺序,推动数据从源系统经数据湖流向黄金层。

Airflow的调度器和监控界面为流水线提供自动化管理与可视化支持。

下一章:数据质量检查


第六章:数据质量检查

欢迎回到数据流水线构建之旅!在前几章中,我们已通过第五章:Airflow DAG(批量任务编排)确保第二章:Spark作业(数据处理)按序执行,推动数据从第一章:MySQL数据库(源系统)经第三章:MinIO存储(数据湖)流向第四章:数据层(青铜、白银、黄金)。

但若数据本身存在缺陷怎么办?例如咖啡订单缺少客户ID、商品ID重复或价格出现负值?这些问题若未被检测,将导致黄金层数据污染,引发错误报表与商业决策失误。这就是数据质量检查至关重要的原因

数据质量检查的定义

数据质量检查是流水线各阶段执行的自动化检测机制,旨在识别数据缺陷,确保分析数据的可靠性。其作用类似于工厂质检员:

  • 青铜层:验证原始数据基础结构
  • 白银层:确保清洗后数据符合质量标准

常见数据质量问题

我们的检查聚焦以下问题类型:

  1. 空值检测(完整性):关键字段(如ID、价格)是否缺失
  2. 重复记录(唯一性):唯一标识(如订单ID)是否重复
  3. 非法值(有效性):数值范围(如价格>0)是否合理
  4. 模式变更(一致性):数据结构是否与预期匹配

PyDeequ质量检测框架

我们采用亚马逊开发的**PyDeequ**库进行高效数据验证。该库提供结构化约束定义能力,例如:

# 来源: scripts/batch/data_quality/silver_validation.py
check_product = Check(spark, CheckLevel.Error, "slv.products") \.hasCompleteness("product_id", lambda x: x >= 1.0) \  # 产品ID完整性检测.hasMin("unit_price", lambda x: x >= 0)  # 单价非负检测

此代码定义了两个约束:产品ID字段完整率需达100%,单价最小值需≥0

质量检测实现架构

项目包含两个专用检测脚本:

  1. bronze_validation.py:青铜层基础检测
  2. silver_validation.py:白银层高级检测(使用PyDeequ)

青铜层检测实现

# 来源: scripts/batch/data_quality/bronze_validation.py
def check_table_quality(df, table_name: str, null_cols=[], unique_cols=[]) -> bool:# 空值检测for col in null_cols:if not df.filter(f"{col} IS NULL").isEmpty():logger.warning(f"[{table_name}] {col}列存在空值")# 唯一性检测for col in unique_cols:dup_count = df.groupBy(col).count().filter("count > 1").count()if dup_count > 0:logger.error(f"[{table_name}] {col}列发现{dup_count}条重复记录")

该函数通过PySpark原生API检测空值与重复

白银层检测实现

# 来源: scripts/batch/data_quality/silver_validation.py
verification_result = VerificationSuite(spark) \.onData(product_df) \.addCheck(check_product) \.run()  # 执行PyDeequ约束检测# 结果解析
result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
failed_checks = result_df.filter("constraint_status = 'Failure'")

该流程通过PyDeequ生成详细检测报告( 调库侠😋

质量检测与任务编排集成

质量检测任务已集成至Airflow DAG,形成质检关卡:
在这里插入图片描述

关键依赖代码:

# 来源: airflow/dags/spark_job_airflow.py
bronze_layer_load >> bronze_data_quality_check  # 青铜加载后执行质检
bronze_data_quality_check >> silver_transformation  # 质检通过执行白银转换
silver_transformation >> silver_data_quality_check  # 白银转换后执行质检

该逻辑确保任一阶段质检失败即终止后续流程

总结

数据质量检查通过自动化检测机制,在青铜层进行基础校验,在白银层实施高级约束,形成多层次质检体系。

结合Airflow的流程控制,构建起数据流水线的质量防线。PyDeequ的应用显著提升了检测效率与可维护性,为数据可靠性提供坚实保障
下一章:Kafka消息系统(实时流处理)

相关文章:

  • Linux 并发编程:从线程池到单例模式的深度实践
  • android 省市区联动选择
  • Kafka性能调优全攻略:从JVM参数到系统优化
  • Angular--Hello(TODO)
  • 基于C#的Baumer相机二次开发教程
  • 主流防火墙策略绕过漏洞的修复方案与加固实践
  • 宽度优先遍历(bfs)(2)——fllodfill算法
  • QLoRA (Quantized Low-Rank Adaptation)浅析
  • 火山引擎项亮:机器学习与智能推荐平台多云部署解决方案正式发布
  • 0_1排序与搜索
  • 【unitrix】 3.2 位取反运算(not.rs)
  • 【音视频】PJSIP库——示例简介、C++类说明
  • 【 感知集群】大规模分布式基础设施的AI赋能蓝图
  • Spring AOP @Before (前置通知): 在目标方法执行前做什么?
  • ChatGPT上瘾,大脑萎缩47%!?
  • Windows本地部署wordpress
  • 矩阵置零C++
  • Mac电脑-Office 2024 长期支持版(Excel、Word、PPT)
  • 《Go语言圣经》map
  • F接口基础.go
  • 网站产品动效怎么做/网络营销的传播手段
  • 手把手教你建网站/橘子seo历史查询
  • 哪网站建设好/抖音搜索引擎推广
  • 德国网站的后缀名/长沙网站seo推广公司
  • 三亚网站怎么制作/宁波seo网络推广主要作用
  • 网站的建设项目是什么/奶盘seo伪原创工具