Spark教程6:Spark 底层执行原理详解

发布于:2025-06-23 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、整体架构概述

Spark 采用主从架构(Master-Slave),主要组件包括:

  • Driver Program:运行用户应用的 main 函数,负责创建 SparkContext、分析作业、调度任务。
  • Cluster Manager:资源管理器,如 YARN、Mesos、Standalone
  • Worker Node:集群中的工作节点,负责执行具体任务。
  • ExecutorWorker 节点上的进程,负责运行任务并缓存数据。

执行流程

  1. 用户提交应用,Driver 启动并创建 SparkContext
  2. SparkContext 连接 Cluster Manager,请求资源。
  3. Cluster Manager 分配资源,在 Worker 节点上启动 Executor
  4. Driver 将任务分发给 Executor 执行。
  5. ExecutorDriver 汇报任务状态和结果。
二、核心组件详解
1. SparkContext
  • Spark 应用的入口,负责与 Cluster Manager 通信,协调资源分配。
  • 管理 RDD 的依赖关系(血统图),并生成 DAG(有向无环图)。
2. DAG Scheduler
  • 将作业(Job)分解为多个阶段(Stage),每个阶段包含多个任务(Task)。
  • 根据 RDD 的依赖关系划分 Stage
    • 宽依赖(如 shuffle)会触发新的 Stage
    • 窄依赖(如 map、filter)会被合并到同一个 Stage
3. Task Scheduler
  • Task 分配给具体的 Executor 执行。
  • 负责任务调度、重试失败的任务,以及处理推测执行(Speculative Execution)。
4. Executor
  • 负责执行 Task,并将结果返回给 Driver
  • 维护内存缓存,存储 RDD 分区数据。
三、作业执行流程
1. DAG 生成与 Stage 划分
# 示例代码
rdd = sc.textFile("data.txt")  # 读取文件,创建 RDD
words = rdd.flatMap(lambda line: line.split())  # 转换操作
pairs = words.map(lambda word: (word, 1))  # 转换操作
counts = pairs.reduceByKey(lambda a, b: a + b)  # 触发 Shuffle
counts.collect()  # 动作操作,触发作业执行

执行流程

  1. collect() 触发作业提交。
  2. DAG Scheduler 将作业划分为两个 Stage
    • Stage 1:执行 textFile、flatMap、map 操作。
    • Stage 2:执行 reduceByKeycollect 操作,依赖于 Stage 1 的输出。
2. Task 调度与执行
  • ShuffleMapTask:执行 Stage 1 的任务,输出中间结果(Shuffle 文件)。
  • ResultTask:执行 Stage 2 的任务,读取 Shuffle 文件并聚合结果。
3. 内存管理
  • Storage Memory:存储缓存的 RDDDataFrame
  • Execution Memory:执行 Shuffle、聚合、排序等操作的内存。
  • User Memory:用户代码使用的内存。
四、Shuffle 机制详解
1. Shuffle 过程
  1. Map 端

    • 将数据分区并写入内存缓冲区。
    • 缓冲区满时溢写到磁盘,生成多个小文件。
    • 最终合并所有小文件为一个大文件,并生成索引。
  2. Reduce 端

    • 从各个 Map 任务拉取属于自己的数据。
    • 合并数据并按 key 排序。
    • 执行聚合或其他操作。
2. Shuffle 优化
  • Sort Shuffle:默认实现,减少文件数量。
  • Tungsten-Sort Shuffle:基于内存管理框架 Tungsten,提高效率。
  • 自适应执行(Spark 3.0+):动态调整 Shuffle 分区数。
五、内存管理机制
1. 统一内存管理(Unified Memory Management)
  • Spark 1.6+ 引入,StorageExecution 内存可相互借用:
    # 内存配置参数
    spark.memory.fraction = 0.6  # 统一内存占堆内存的比例
    spark.memory.storageFraction = 0.5  # Storage 内存占统一内存的比例
    
2. Tungsten 优化
  • 堆外内存:减少 GC 压力,提高内存访问效率。
  • 二进制格式:直接操作二进制数据,避免 Java 对象开销。
六、容错机制
1. Lineage(血统)
  • RDD 记录其创建过程(依赖关系),当部分分区丢失时,可通过重新计算恢复。
2. Checkpoint
  • RDD 写入可靠存储(如 HDFS),切断血统关系,用于长依赖链的 RDD
    rdd.checkpoint()  # 设置检查点
    
3. 任务重试
  • Task 失败时,Task Scheduler 会自动重试(默认 4 次)。
七、调度策略
1. 任务调度
  • FIFO(默认):先进先出。
  • FAIR:公平调度,支持多作业共享资源。
    # 启用公平调度
    spark.conf.set("spark.scheduler.mode", "FAIR")
    
2. 推测执行
  • 当某个任务执行缓慢时,会在其他节点启动副本任务,取最先完成的结果。
    # 启用推测执行
    spark.conf.set("spark.speculation", "true")
    
八、性能优化关键点
1. 数据本地性
  • PROCESS_LOCAL:数据在同一 JVM 内,最快。
  • NODE_LOCAL:数据在同一节点,但需跨进程传输。
  • RACK_LOCAL:数据在同一机架的不同节点。
  • ANY:数据在任意位置。
2. 并行度调整
  • 根据集群资源设置合理的并行度:
    # 设置默认并行度
    spark.conf.set("spark.default.parallelism", 200)
    
3. 内存调优
  • 调整 Executor 内存和堆外内存:
    spark.executor.memory = 8g
    spark.memory.offHeap.enabled = true
    spark.memory.offHeap.size = 2g
    
九、高级特性
1. Catalyst 优化器
  • Spark SQL 的查询优化器,将 SQL 查询转换为高效的物理执行计划:
    • 分析:解析 SQL 语句,检查表和列是否存在。
    • 逻辑优化:应用规则优化逻辑计划(如谓词下推、投影修剪)。
    • 物理计划生成:生成多个物理计划并选择最优。
    • 代码生成:将执行计划编译为 Java 字节码。
2. Tungsten 项目
  • 优化内存和 CPU 利用率:
    • 二进制数据处理,减少内存占用。
    • 避免 Java 对象开销,直接操作内存。
十、监控与调试工具
1. Spark UI
  • 查看作业、阶段、任务的执行情况,内存使用等指标。
2. 事件日志
  • 记录作业执行的详细信息,可用于离线分析:
    # 启用事件日志
    spark.eventLog.enabled = true
    spark.eventLog.dir = "hdfs:///spark-logs"
    
3. Spark 性能调优工具
  • Shuffle 调优:分析 Shuffle 性能瓶颈。
  • SQL 执行计划分析:查看 SQL 查询的优化过程。

网站公告

今日签到

点亮在社区的每一天
去签到