Spark执行计划与UI分析

发布于:2025-08-11 ⋅ 阅读:(16) ⋅ 点赞:(0)

1.Spark任务阶段划分

1.1 job,stage与task

  • 首先根据action()操作顺序将应用划分为作业job。
  • 根据每个job的逻辑处理流程中的ShuffleDependency依赖关系,将job划分为执行阶段stage。
  • 在每个stage中,根据最后生成的RDD的分区个数生成多个计算任务task。

1.2 job划分

举一个简单的例子,在下面这段代码中:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, col

# 初始化SparkSession
spark = SparkSession.builder.appName("MultiJobStageTaskExample").getOrCreate()

# 读取数据(Transformation,不触发Job)
orders = spark.read.csv(
    "orders.csv",
    header=True,
    inferSchema=True
).select("用户ID", "订单金额", "支付方式")

users = spark.read.csv(
    "users.csv",
    header=True,
    inferSchema=True
).select("用户ID", "所在城市")

# 缓存重复使用的数据集(优化性能)
orders.cache()
users.cache()


# --------------------------
# Job 1:计算不同支付方式的订单数和总金额
# --------------------------
payment_analysis = orders.groupBy("支付方式") \
    .agg(
        count("用户ID").alias("订单数"),  # 聚合操作(宽依赖,触发Shuffle)
        sum("订单金额").alias("总金额")
    )

# Action操作:触发Job 1
payment_result = payment_analysis.collect()  # Job 1
print("支付方式分析结果:", payment_result)


# --------------------------
# Job 2:计算每个城市的平均订单金额
# --------------------------
city_analysis = orders.join(users, on="用户ID", how="inner") \  #  join是宽依赖(Shuffle)
    .groupBy("所在城市") \  # 再次宽依赖(Shuffle)
    .agg(
        sum("订单金额").alias("城市总金额"),
        count("用户ID").alias("城市订单数")
    ) \
    .withColumn("平均订单金额", col("城市总金额") / col("城市订单数"))

# Action操作:触发Job 2
city_analysis.write.csv("city_avg_order")  # Job 2


# --------------------------
# Job 3:统计高消费用户(订单总金额>10000)的分布
# --------------------------
high_value_users = orders.groupBy("用户ID") \  # 宽依赖(Shuffle)
    .agg(sum("订单金额").alias("用户总消费")) \
    .filter(col("用户总消费") > 10000) \  # 过滤(窄依赖)
    .join(users, on="用户ID", how="inner")  # 宽依赖(Shuffle)

# Action操作:触发Job 3
high_value_count = high_value_users.count()  # Job 3
print("高消费用户数量:", high_value_count)

spark.stop()

根据payment_analysis.collect(),city_analysis.write.csv(“city_avg_order”)和high_value_count = high_value_users.count(),这段代码被划分成了三个job。

1.3 stage和task划分

如下图所示,在一个job中,出现了shuffle操作,就会划分一个stage。再根据每个stage中的分区数量划分task数量。
在这里插入图片描述

2.任务执行时机

  • job的提交时间与action()被调用的时间有关,当应用程序执行到rdd.action()时,就会立即将rdd.action()形成的job提交给Spark。这其实也就是为什么有的时候写完代码没有运行的原因,因为没写action()操作,job不会被提交到Spark
  • 仅当上游的stage都执行完成后,再执行下游的stage。如果stage之间没有依赖,则并行执行,例如stage1和stage0是并行执行,当且仅当两者执行后,stage2才开始执行。
  • stage中每个task因为是独立而且同构的,可以并行运行没有先后之分。

3.task内部数据存储与流动

task是根据分区来划分的,而一个分区中有很多个record,根据不同record之间的关系,存储的方式也不同:
在这里插入图片描述
这是一个task的执行流程的几种不同的情况:

  • 第一个流程:record之间并没有相互依赖,因此可以进行流式处理,即record1处理成record1’之后就可以将record1从内存中删掉,而不用关心record2和record3处理到哪里了。
  • 第二个流程:f()流程无相互依赖,但是g()流程有相互依赖,也就是说record1在处理成record1’‘后,record1’‘会被保存到内存中,直到record2’‘和record3’'被处理完成。
  • 第三个流程:同理,在record1,record2和record3都被算出之后,才能执行f(),而在执行g()时,record1’,record2’和record3’才不会相互依赖。
  • 第四个流程:无法进行流水线处理,每处理完一个操作,才能回收该操作的输入结果。

4.根据sparkUI了解Spark执行计划

4.1查看job和stage

在spark的首界面可以看到当前正在执行的job:
在这里插入图片描述
点击job的链接,可以看到当前job中的stage数量:
在这里插入图片描述
其中stage 0包含3个task,共Shuffle Write了376.0B,stage 1包含4个task,共Shuffle Write了988.0B,而stage 2包含3个task,一共Shuffle Read了1364.0B=376.0B+988.0B。

4.2 查看DAG图

将Job链接中界面上的DAG Visualization展开,可以看到正在执行的DAG图:
在这里插入图片描述
每个黑色实心圆圈代表一个RDD,但这个图稍显混乱,stage 0中parallelize操作生成的RDD应该是被stage 2中的partitionBy处理的,与stage 1中的parallelize无关,也就是stage 0到stage 2的横箭头并没有在stage1中作停留生成一个RDD
如果想进一步了解黑色实心圆圈代表哪些RDD,则可以进入stage的UI界面:
在这里插入图片描述
这张图展示了每个操作会生成哪些RDD(如join()操作生成了CoGroupedRDD及两个MapPartitionsRDD),但没有展示stage之间的连接关系。但是没有展示Stage的连接关系。

4.3查看task

在某个stage界面,可以看到该stage的task信息:
在这里插入图片描述
stage 0包含3个task,每个task都进行了Shuffle Write,写入了2~3个record,也就是说Spark UI中也会统计Shuffle Write/Read的record数目。
在这里插入图片描述
stage 1包含4个task,每个task都进行了ShuffleWrite,写入了2个record。
在这里插入图片描述
stage 2包含3个task,每个task从上游的stage 0/1那里Shuffle Read了5~6个record。


网站公告

今日签到

点亮在社区的每一天
去签到