[Data Pipeline] docs | Mysql源 | Spark处理

发布于:2025-06-20 ⋅ 阅读:(18) ⋅ 点赞:(0)

链接:https://github.com/lnynhi02/coffee-sales-data-pipeline

https://github.com/wszel/coffee-shop-chain-aws-data-academy

在这里插入图片描述

这个项目是一个咖啡销售数据管道

用于从数据源收集数据,进行数据处理和分析,并将结果存储在数据库中。

该项目使用了 Apache Airflow 作为工作流引擎,Kafka 作为消息队列,MySQL 作为数据库。

主要功能点

  1. 从数据源收集咖啡销售数据
  2. 使用 Apache Airflow 管理数据处理工作流
  3. 使用 Kafka 作为消息队列进行实时数据处理
  4. 将处理后的数据存储在 MySQL 数据库中

技术栈

  • Apache Airflow
  • Apache Kafka
  • MySQL
  • Python

为什么选择使用 Kafka 作为消息队列?

1. 高吞吐量与低延迟

  • Kafka 每秒可处理百万级消息,适合咖啡销售场景(如促销活动时订单激增)
  • 毫秒级延迟保证实时数据处理能力,满足销售数据分析的时效性需求

2. 持久化与可靠性

  • 消息持久化存储(默认保留7天),避免数据丢失风险
  • 副本机制确保节点故障时数据不中断,保障交易完整性

3. 水平扩展能力

  • 通过增加 Broker 节点实现无缝扩容
  • 分区机制允许并行消费,适应业务规模增长

4. 生态集成优势

  • 与 Airflow 工作流无缝协作(通过 KafkaOperator)
  • 支持多种数据格式(JSON/Avro),兼容 MySQL 数据存储
  • 提供 Connect API 简化数据源对接

5. 实时流处理支持

  • Kafka Streams 可直接处理实时数据流
  • 与 Flink/Spark Streaming 集成,为后续销售预测等场景预留扩展空间

在该项目中,Kafka 作为数据缓冲层,有效解耦了数据采集(销售终端)与数据处理(Airflow ETL),同时为实时看板、库存预警等场景提供实时数据支撑。其分布式架构设计特别适合需要处理突发流量(如节日促销)的零售业务场景。


咖啡销售数据管道

一个咖啡销售数据的综合性数据管道

  • 包含由Airflow编排的批处理管道,可从MySQL提取数据,
  • 使用Spark进行转换处理,将数据存储于数据湖 MinIO的不同数据层(Bronze/Silver/Gold)
  • 通过自动化数据质量检查确保数据质量。
  • 同时配备基于KafkaRedis实时流处理系统,可即时响应新订单,
    支持实时商品推荐等功能。
  • 整个系统通过Docker Compose进行定义和管理。

架构

在这里插入图片描述

章节目录

  1. MySQL数据库(源系统)
  2. Spark作业(数据处理)
  3. MinIO存储(数据湖)
  4. 数据分层(Bronze/Silver/Gold)
  5. Airflow DAG(批处理编排)
  6. 数据质量检查
  7. Kafka消息(实时流)
  8. Redis缓存/存储
  9. Docker Compose环境

第一章:MySQL数据库(源系统)

欢迎进入咖啡销售数据管道之旅的第一章!

  • 每个数据管道都需要一个起点——原始数据最初存储的位置。在我们的项目中,这个主要起点之一就是**MySQL数据库**。

  • 想象我们的咖啡店场景。当顾客购买一杯美味的拿铁或一袋咖啡豆时,该交易需要被记录在某个地方。这个"地方"通常是连接销售点(POS)系统的数据库。我们的MySQL数据库就如同咖啡店的数字记录员

  • 可以将其想象成一个大型有序的文件柜,每个销售记录、产品详情、门店位置和支付方式都被整齐地存储在不同的文件夹(我们称之为)中。

为何称之为"源系统"?

  • 在数据管道领域,"源系统"即数据产生的源头
  • 它是我们抽取数据进行处理和分析的来源
  • 我们的MySQL数据库是所有交易数据(如订单)和重要查询信息(如产品列表或门店地址)的主要来源。

存储哪些类型的数据?

我们的MySQL数据库保存着咖啡店运营的基础信息。根据项目代码,它包含以下表结构:

  • orders: 每笔交易的详细信息(购买者、时间、地点、支付方式)
  • order_details: 每笔订单包含的具体商品、数量、价格等
  • stores: 各实体咖啡门店的位置信息
  • products: 咖啡及其他销售商品的详情
  • product_category: 商品分类如’咖啡’、‘烘焙食品’、‘周边商品’
  • payment_method: 支付方式(银行卡、现金等)
  • diamond_customers: 高价值客户信息(查询表)

这些表由项目中的脚本定义创建(scripts/database/create_table.py)。以下代码片段展示了表的创建逻辑:

# 来源:scripts/database/create_table.py

TABLES = {}
TABLES['stores'] = (
    "CREATE TABLE `stores` (" \
    "   `id` int," \
    "   `name` varchar(20) NOT NULL,"
    # ... 其他字段 ...
    "   `updated_at` DATETIME," \
    "   PRIMARY KEY (`id`)" \
    ") ENGINE=InnoDB"
)

TABLES['orders'] = (
    "CREATE TABLE `orders` (" \
    "   `id` varchar(250) NOT NULL," \
    "   `timestamp` datetime NOT NULL," \
    # ... 其他字段 ...
    "   PRIMARY KEY (`id`)," \
    # ... 外键约束 ...
    ") ENGINE=InnoDB"
)

# ... 其他表定义 ...

def create_table(cursor):
    """在MySQL数据库中创建表"""
    for table_name in TABLES:
        table_description = TABLES[table_name]
        try:
            print(f"创建表 {table_name}: ", end='')
            cursor.execute(table_description)
        except:
            # ... 错误处理 ...
            pass
        else:
            print("成功")

这个Python脚本包含构建我们"文件柜"的指令(SQL CREATE TABLE语句),定义了每个文件夹(表)的结构和每个信息单元(字段)的命名规范。

管道如何与源系统交互?

数据管道通过以下方式与MySQL数据库交互:

  1. 初始数据加载:通过脚本(scripts/database/load_static_file.py)从CSV文件加载静态信息(如门店、商品、支付方式)到MySQL,填充查询表
  2. 实时写入模拟:脚本(scripts/database/generate_data.py)模拟POS系统持续向ordersorder_details表写入新订单数据,保持源数据更新
  3. 批量读取:主批处理作业(使用Spark)连接MySQL读取表数据,这是数据流出源系统进入管道处理的主要方式

以下代码片段展示了批处理脚本如何连接读取MySQL表:

# 来源:scripts/batch/bronze_dimension_fact_load.py

def read_mysql_table(spark: SparkSession, table: str):
    host = os.getenv("MYSQL_HOST")
    user = os.getenv("MYSQL_USER")
    password = os.getenv("MYSQL_PASSWORD")
    database = os.getenv("MYSQL_DATABASE")

    return spark.read \
        .format("jdbc") \
        .option("url", f"jdbc:mysql://{host}:3306/{database}?user={user}&password={password}") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", table) \
        .load()

# ... 后续脚本内容 ...

def incremental_load_orders(spark: SparkSession) -> None:
    # ... 日志记录 ...
    orders_df = read_mysql_table(spark, "orders") # <<< 读取orders表
    details_df = read_mysql_table(spark, "order_details") # <<< 读取order_details表
    # ... 后续处理流程 ...

这段代码展示了数据处理引擎(Spark)如何使用read_mysql_table函数:

  • 通过指定数据库地址(host)、凭证信息和表名
  • 使用.load()命令将表数据载入Spark可处理的DataFrame结构。

数据流出流程(高层抽象)

以下是数据从MySQL源系统进入批处理管道的简化流程:
在这里插入图片描述

总结

MySQL数据库是我们批处理数据管道的核心起点,它以结构化方式(通过表)存储所有咖啡店交易数据和静态信息,作为数据管道提取原始数据的"源系统"。
下一章我们将探索数据处理环节。

下一章:Spark作业(数据处理)


第二章:Spark Jobs(数据处理)

  • 第一章:MySQL数据库(源系统)中,我们了解到旅程始于MySQL数据库——这个如同咖啡店文件柜般存储原始销售数据的系统

  • 原始数据无法直接生成精美报表或洞察结论,需要经过清洗、关联和结构化处理。这正是本章核心概念Spark Jobs的用武之地。

管道中的Spark作业是什么?

若将数据管道比作工厂,MySQL数据库是原材料(数据)仓库,那么Spark作业就是车间里的工人加工设备

它们接收原始数据,通过多道工序生产精炼材料或成品

在我们的咖啡销售项目中,"Spark作业"本质上是使用Apache Spark库的Python脚本,主要承担以下数据处理重任:

  • 从源系统(如MySQL)或存储层(MinIO存储读取数据
  • 清洗脏数据(如去除多余字符)
  • 关联不同表数据(如订单与商品详情匹配)
  • 转换数据(如计算总额或创建新字段)
  • 将处理结果写入数据湖MinIO存储)的下一阶段

管道中的每个批处理步骤(后续由Airflow DAG编排)通常对应独立的Spark作业脚本,专精于特定数据处理层级或任务。

为何选择Spark?

为何不使用普通Python脚本或数据库查询?

  • 大数据处理Spark专为跨机器集群(“集群”)处理海量数据设计。尽管当前示例数据量较小,真实场景常需处理单机无法承载的巨型数据集
  • 高效性:Spark优先内存计算,避免反复磁盘I/O带来的性能损耗
  • Python友好通过pyspark,数据从业者可用熟悉的Python语言编写强大处理逻辑

类比理解:小型数据集用电子表格,中型用数据库查询,海量数据则需Spark这类工业级工具。

Spark引擎初始化

Spark作业执行前需建立与Spark处理引擎的连接并配置数据源访问,通过创建SparkSession实现。

以下代码片段来自批处理脚本(scripts/batch/bronze_dimension_fact_load.py):

# 来源:scripts/batch/bronze_dimension_fact_load.py

from pyspark.sql import SparkSession

def create_SparkSession() -> SparkSession:
    return SparkSession.builder \
        .appName("数据摄取 - MySQL到Minio") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
        .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.hadoop.fs.s3a.S3AFileSystem") \
        # ... 其他配置 ...
        .getOrCreate()

# 脚本主流程调用:
# spark = create_SparkSession()

create_SparkSession函数如同设置Spark工厂的控制面板

  1. appName(...):命名作业(便于监控)
  2. .config(...):配置关键参数(如连接MinIO存储的S3兼容设置)
  3. .getOrCreate():复用现有会话或创建新会话

获取spark对象后,即可指挥Spark处理数据。

Spark Jobs核心任务

通过项目脚本片段解析Spark作业的五大核心任务:

1. 数据读取

从源系统或前序处理阶段读取数据

从MySQL读取:

# 来源:scripts/batch/bronze_dimension_fact_load.py

def read_mysql_table(spark: SparkSession, table: str):
    host = os.getenv("MYSQL_HOST")  # 从环境变量获取连接信息
    user = os.getenv("MYSQL_USER")
    password = os.getenv("MYSQL_PASSWORD")
    database = os.getenv("MYSQL_DATABASE")

    return spark.read \
        .format("jdbc") \
        .option("url", f"jdbc:mysql://{host}:3306/{database}?user={user}&password={password}") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", table) \
        .load()

# 调用示例:
# orders_df = read_mysql_table(spark, "orders")

通过·JDBC连接器读取MySQL指定表数据
生成Spark DataFrame(内存表结构)。

从MinIO读取:

# 来源:scripts/batch/silver_dimensions.py

def read_bronze_layer(spark, table):
    # 示例路径:s3a://bronze-layer/brz.stores
    return spark.read.parquet(f"s3a://bronze-layer/{table}")

# 调用示例:
# source_df = read_bronze_layer(spark, table="brz.stores")

从MinIO的"bronze-layer"存储桶读取Parquet格式文件s3a://前缀触发S3兼容连接器。

2. 数据清洗

处理原始数据中的异常或错误

# 来源:scripts/batch/silver_dimensions.py (cleaned_stores函数)

# source_df读取自brz.stores
cleaned_df = source_df.withColumn("city_cleaned", expr("regexp_replace(city, '\\\\r$', '')"))

# ... 后续筛选输出列 ...

withColumn创建新列city_cleaned使用正则表达式移除city字段末尾的\r字符

3. 数据关联

整合多源数据

# 来源:scripts/batch/silver_dimensions.py (cleand_products函数)

# product读取自brz.products
# product_category读取自brz.product_category

join_df = product.join(
    product_category,
    product["category_id"] == product_category["id"],
    how="left"
)
# ... 筛选关联结果特定列 ...

基于category_idid字段左连接商品表与分类表,保留所有商品并补充分类信息。

4. 数据转换

计算新值或改变数据结构

# 来源:scripts/batch/bronze_dimension_fact_load.py (incremental_load_orders函数)

# new_orders是包含新订单的DataFrame
enriched_orders = new_orders.withColumn("year", year("timestamp")) \
                            .withColumn("month", month("timestamp")) \
                            .withColumn("day", dayofmonth("timestamp"))

timestamp字段提取年、月、日生成新列,便于后续分区查询。

5. 数据写入

将处理结果写入下一阶段

# 来源:scripts/batch/silver_dimensions.py (cleand_stores函数)

# output_df是清洗后的DataFrame
output_df.write.mode("overwrite").parquet(f"{silver_path}/{table}")
  • .mode("overwrite"):覆盖写入模式
  • .parquet(...):指定Parquet格式及MinIO存储路径(如s3a://silver-layer/slv.stores

分区写入示例:

# 来源:scripts/batch/bronze_dimension_fact_load.py (incremental_load_orders函数)

# enriched_orders包含年、月、日字段
enriched_orders.write.partitionBy("year", "month", "day").mode("append").parquet(orders_path)

partitionBy按年月日分区存储,提升日期范围查询效率。

Spark Jobs执行流程(高层抽象)

在这里插入图片描述

Python脚本作为驱动器指挥SparkSession执行步骤

SparkSession将任务分发至集群资源(本项目中通过Docker Compose模拟)。

实际数据处理在Spark引擎内完成,读写操作面向MinIO存储MySQL等数据源。

代码结构范式

Spark Jobs脚本遵循统一范式:

  1. 导入依赖库(pyspark.sqloslogging等)
  2. 配置日志系统
  3. 定义create_SparkSession()函数
  4. 定义数据读取辅助函数(read_bronze_layerread_mysql_table等)
  5. 定义核心转换/加载函数(如cleand_storesincremental_load_orders
  6. main()函数编排流程:
    • 调用create_SparkSession()
    • 按序执行转换/加载函数
    • 最终调用spark.stop()
  7. if __name__ == "__main__": main()入口

该结构可见于scripts/batch/silver_dimensions.pyscripts/batch/gold_fact_orders.py等文件。

总结

Spark Jobs是数据管道的核心处理器,通过Python脚本驱动Apache Spark高效执行数据读取、清洗、转换、关联和写入任务。

它们将数据从源头(MySQL数据库)或中间存储(MinIO数据湖)转化为可供分析的形态,逐层推进数据精炼过程。

下一章我们将探索处理结果的存储之地——数据湖

下一章:MinIO存储(数据湖)


网站公告

今日签到

点亮在社区的每一天
去签到