[Data Pipeline] Airflow DAG | 数据质量检查PyDeequ

发布于:2025-06-20 ⋅ 阅读:(19) ⋅ 点赞:(0)

在这里插入图片描述

第五章:Airflow DAG(批量任务编排)

欢迎回到数据探险之旅!

在前几章中,我们构建了数据流水线的核心组件:

现在设想我们的咖啡数据工厂:不同机器(Spark作业)执行特定任务(加载青铜层、清洗白银层、构建黄金表)。

如何确保这些作业按正确顺序执行?青铜层加载必须*先于*白银层清洗完成,白银层就绪*之后*才能构建黄金层。我们需要整个流程自动运行(例如每日执行)以保证数据新鲜度。

这就引出核心概念:Airflow DAG(批量任务编排)

Airflow是什么?为什么需要编排?

想象项目经理需要协调项目任务:

  • 任务定义
  • 责任人分配
  • 任务依赖关系
  • 执行计划

Apache Airflow 正是数据流水线的"项目经理",用于定义、调度和监控任务序列。

  • 编排即自动化协调复杂工作流,特别适合需要严格顺序执行的批量处理流水线

核心组件:Airflow DAG

Airflow中,工作流定义为DAG(有向无环图):

  • :由节点(任务)和边(依赖关系)组成的流程图
  • 有向:依赖关系单向流动(任务A→任务B表示A先于B执行)
  • 无环:禁止循环依赖,确保流程可终止

Airflow DAG本质是Python脚本,定义:

  1. 工作流唯一标识
  2. 调度计划(开始时间/执行频率)
  3. 具体任务步骤
  4. 任务间执行顺序

定义流水线DAG

项目中的DAG定义于airflow/dags/spark_job_airflow.py,该脚本指导Airflow按序执行Spark作业:

# 来源: airflow/dags/spark_job_airflow.py

import airflow.utils.dates
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    'spark-batch-job',          # 1. DAG唯一标识
    default_args=default_args,  # 2. 任务默认参数
    schedule_interval='@daily', # 3. 每日执行
    catchup=False               # 4. 禁止补跑历史任务
) as dag:
    # 任务定义区
    # 依赖关系定义区

关键参数解析:

  1. schedule_interval='@daily':每日触发,支持cron表达式
  2. catchup=False防止Airflow补跑start_date之前未执行的任务

定义任务节点

在DAG代码块内,使用SparkSubmitOperator定义各处理阶段:

# 在with DAG(...) as dag代码块内:

bronze_layer_load = SparkSubmitOperator(
    task_id="bronze_layer_load",  # 任务唯一ID
    conn_id="spark",              # Airflow的Spark连接配置
    application=str(Path("scripts/bronze_dimension_fact_load.py")),  # Spark脚本路径
    packages="org.apache.hadoop:hadoop-aws:3.3.4,mysql:mysql-connector-java:8.0.30"  # 依赖包
)

silver_layer_transform = SparkSubmitOperator(
    task_id="silver_layer_dimension_transform",
    conn_id="spark",
    application=str(Path("scripts/silver_dimensions.py")),
)
# 黄金层任务定义类似...

参数说明:

  • conn_id="spark":指向docker-compose.yaml中配置的Spark Master服务
  • application:Spark作业脚本路径(需与容器内路径一致)
  • packages:Spark作业依赖的第三方库(如MinIO连接器)

定义任务依赖

使用位移运算符>>定义执行顺序:

  • task_a >> task_b:任务A成功后才执行B
  • [task_a, task_b] >> task_c:A和B都成功后才执行C

项目中的依赖关系体现数据流动逻辑:

# 在with DAG(...) as dag代码块内:

bronze_layer_load >> bronze_data_quality_check  # 青铜层质检

bronze_data_quality_check >> [silver_layer_dim, silver_layer_fact]  # 并行执行白银层转换

[silver_layer_dim, silver_layer_fact] >> silver_data_quality_check  # 白银层质检

silver_data_quality_check >> [gold_dim_payment, gold_dim_stores, gold_dim_products]  # 并行构建黄金维度表

[gold_dim_payment, gold_dim_stores, gold_dim_products] >> gold_fact_orders  # 最后构建黄金事实表

可视化依赖关系图:

在这里插入图片描述

Airflow运行机制

Airflow组件协作流程:
在这里插入图片描述

关键组件:

  • 调度器触发定时任务,分配任务给工作节点
  • 工作节点执行具体Operator(如提交Spark作业)
  • 元数据库存储任务状态和日志

Docker Compose配置

Airflow服务在docker-compose-batch.yaml中的定义:

# 来源: docker-compose-batch.yaml

services:
  postgres:  # 元数据库
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
  
  airflow-webserver:  # 监控界面
    ports: ["8080:8080"]
    depends_on: [airflow-scheduler]
  
  airflow-scheduler:  # 调度核心
    command: >
      bash -c "airflow db init && airflow scheduler"
    volumes:  # 挂载DAG目录
      - ./airflow/dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts

关键配置:

  • 挂载本地dagsscripts目录到容器内
  • 调度器初始化数据库并启动

监控界面

访问http://localhost:8080可查看:

  • DAG运行状态
  • 任务日志详情
  • 手动触发/暂停流水线
  • 依赖关系可视化

总结

Airflow DAG是批量处理流水线的控制中枢,通过Python脚本定义任务流程

使用SparkSubmitOperator触发Spark作业,依赖关系运算符>>确保执行顺序,推动数据从源系统数据湖流向黄金层

Airflow的调度器和监控界面为流水线提供自动化管理与可视化支持。

下一章:数据质量检查


第六章:数据质量检查

欢迎回到数据流水线构建之旅!在前几章中,我们已通过第五章:Airflow DAG(批量任务编排)确保第二章:Spark作业(数据处理)按序执行,推动数据从第一章:MySQL数据库(源系统)第三章:MinIO存储(数据湖)流向第四章:数据层(青铜、白银、黄金)

但若数据本身存在缺陷怎么办?例如咖啡订单缺少客户ID、商品ID重复或价格出现负值?这些问题若未被检测,将导致黄金层数据污染,引发错误报表与商业决策失误。这就是数据质量检查至关重要的原因

数据质量检查的定义

数据质量检查是流水线各阶段执行的自动化检测机制,旨在识别数据缺陷,确保分析数据的可靠性。其作用类似于工厂质检员:

  • 青铜层:验证原始数据基础结构
  • 白银层:确保清洗后数据符合质量标准

常见数据质量问题

我们的检查聚焦以下问题类型:

  1. 空值检测(完整性):关键字段(如ID、价格)是否缺失
  2. 重复记录(唯一性):唯一标识(如订单ID)是否重复
  3. 非法值(有效性):数值范围(如价格>0)是否合理
  4. 模式变更(一致性):数据结构是否与预期匹配

PyDeequ质量检测框架

我们采用亚马逊开发的**PyDeequ**库进行高效数据验证。该库提供结构化约束定义能力,例如:

# 来源: scripts/batch/data_quality/silver_validation.py
check_product = Check(spark, CheckLevel.Error, "slv.products") \
    .hasCompleteness("product_id", lambda x: x >= 1.0) \  # 产品ID完整性检测
    .hasMin("unit_price", lambda x: x >= 0)  # 单价非负检测

此代码定义了两个约束:产品ID字段完整率需达100%,单价最小值需≥0

质量检测实现架构

项目包含两个专用检测脚本:

  1. bronze_validation.py:青铜层基础检测
  2. silver_validation.py:白银层高级检测(使用PyDeequ)

青铜层检测实现

# 来源: scripts/batch/data_quality/bronze_validation.py
def check_table_quality(df, table_name: str, null_cols=[], unique_cols=[]) -> bool:
    # 空值检测
    for col in null_cols:
        if not df.filter(f"{col} IS NULL").isEmpty():
            logger.warning(f"[{table_name}] {col}列存在空值")
    
    # 唯一性检测
    for col in unique_cols:
        dup_count = df.groupBy(col).count().filter("count > 1").count()
        if dup_count > 0:
            logger.error(f"[{table_name}] {col}列发现{dup_count}条重复记录")

该函数通过PySpark原生API检测空值与重复

白银层检测实现

# 来源: scripts/batch/data_quality/silver_validation.py
verification_result = VerificationSuite(spark) \
    .onData(product_df) \
    .addCheck(check_product) \
    .run()  # 执行PyDeequ约束检测
  
# 结果解析
result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
failed_checks = result_df.filter("constraint_status = 'Failure'")

该流程通过PyDeequ生成详细检测报告( 调库侠😋

质量检测与任务编排集成

质量检测任务已集成至Airflow DAG,形成质检关卡:
在这里插入图片描述

关键依赖代码:

# 来源: airflow/dags/spark_job_airflow.py
bronze_layer_load >> bronze_data_quality_check  # 青铜加载后执行质检
bronze_data_quality_check >> silver_transformation  # 质检通过执行白银转换
silver_transformation >> silver_data_quality_check  # 白银转换后执行质检

该逻辑确保任一阶段质检失败即终止后续流程

总结

数据质量检查通过自动化检测机制,在青铜层进行基础校验,在白银层实施高级约束,形成多层次质检体系。

结合Airflow的流程控制,构建起数据流水线的质量防线。PyDeequ的应用显著提升了检测效率与可维护性,为数据可靠性提供坚实保障
下一章:Kafka消息系统(实时流处理)


网站公告

今日签到

点亮在社区的每一天
去签到