大数据(4)-spark

发布于:2025-07-01 ⋅ 阅读:(22) ⋅ 点赞:(0)

spark核心架构

img

Spark使用Spark RDD、Spark SQL、 Spark Streaming,MLlib,GraphX成功解决了大数据领城中,离线批处理、交互式查询、实时流计算、机器学习与图计算等最重要的任务和问题。 Spark除了一站式的特点之外,另外一个最重要的特点,就是基于内存进行计算,从而让它的速度可以达到MapReduce、Hive的数倍甚至数十倍! Spark,是一种通用的大数据计算框架,也正如传统大数据技术Hadoop的MapReduce、Hive引擎,以及Storm流式实时计算引擎等,Spark包含了大数据领城常见的各种计算框架:比如Spark Core用于离线计算,Spark SQL用于交互式查询,Spark Streaming用于实时流式计算,Spark MILlib用于机器学习,Spark GraphX用于图计算。

spark特性

(1)spark 计算速度快

spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存在进行计算,相比于hadoop的mapreduce效率提升了100倍。

(2)易于使用

spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。

通用的大数据解决方案

相较于以前离线任务采用mapreduce实现,实时任务采用storm实现,目前这些都可以通过spark来实现,降低来开发的成本。同时spark 通过spark SQL降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。

(4)支持多种的资源管理模式

学习使用中可以采用local 模型进行任务的调试,在正式环境中又提供了standalone,yarn等模式,方便用户选择合适的资源管理模式进行适配。

(3)Spark与MR

Spark相较于MapReduce速度快的最主要原因就在于,MapReduce的计算模型太死板,必须是map-reduce模式,有时候即使完成一些诸如过滤之类的操作,也必须经过map-reduce过程,这样就必须经过shuffle过程。而MapReduce的shuffle过程是最消耗性能的,因为shuffle中间的过程必须基于磁盘来读写。

而Spark的shuffle虽然也要基于磁盘,但是其大量transformation操作,比如单纯的map或者filter等操作,可以直接基于内存进行pipeline操作,速度性能自然大大提升。 但是Spark也有其劣势。由于Spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候(比如一次操作针对10亿以上级别),在没有进行调优的情况下,可能会出现各种各样的问题,比如OOM内存溢出等等。导致Spark程序可能都无法完全运行起来,就报错挂掉了,而MapReduce即使是运行缓慢,但是至少可以慢慢运行完。 此外,Spark由于是新崛起的技术新秀,因此在大数据领域的完善程度,肯定不如MapReduce,比如基于HBase、Hive作为离线批处理程序的输入输出,Spark就远没有MapReduce来的完善。实现起来非常麻烦。

对比维度 Spark MapReduce
计算模型 基于内存的DAG执行引擎 严格的Map-Reduce模型
性能 比MapReduce快10-100倍 较慢,适合批处理
API丰富度 丰富的API(SQL/Streaming/ML/Graph) 只有基本的Map和Reduce API
容错性 基于RDD血统(lineage) 基于磁盘的中间结果
适用场景 迭代算法、交互式查询、流处理 超大规模批处理
稳定性 内存不足时可能失败 更稳定,适合长时间运行作业
生态系统 快速发展但某些集成不如MR成熟 Hadoop原生,集成最好

Spark SQL实际上并不能完全替代Hive,因为Hive是一种基于HDFS的数据仓库,并且提供了基于SQL模型的,针对存储了大数据的数据仓库,进行分布式交互查询的查询引擎。 ​ 严格的来说, Spark SQL能够替代的,是Hive的查询引擎,而不是Hive本身,实际上即使在生产环境下, SparkSQL也是针对Hive数据仓库中的数据进行查询, Spark本身自己是不提供存储的,自然也不可能替代Hive作为数据仓库的这个功能。 ​ Spark SQL的一个优点,相较于Hive查询引擎来说,就是速度快,同样的SQL语句,可能使用Hive的查询引擎,由于其底层基于 MapReduce,必须经过shuffle过程走磁盘,因此速度是非常缓慢的。很多复杂的SQL语句,在hive中执行都需要一个小时以上的时间。而 Spark SQLSpark由于其底层基于自身的基于内存的特点,因此速度达到了Hive查询引擎的数倍以上。

SQLSpark由于身处技术堆栈内,也是基于RDD来工作,因此可以与 Spark的其他组件无缝整合使用,配合起来实现许多复杂的功能。比如 Spark SQL支持可以直接针对hdfs文件执行sq语句。

spark架构

(1)spark 基础配置

sparkContext是spark应用程序的入口,spark应用程序的提交和执行离不开sparkContext,它隐藏了网络通信,分布式部署,消息通信,存储体系,计算存储等,开发人员只需要通过sparkContext等api进行开发即可。

sparkRpc基于netty实现,分为异步和同步两种方式。事件总线主要用于sparkContext组件间的交换,它属于监听者模式,采用异步调用。度量系统主要用于系统的运行监控。

(2)spark 存储系统

它用于管理spark运行中依赖的数据存储方式和存储位置,spark的存储系统优先考虑在各节点以内存的方式存储数据,内存不足时将数据写入磁盘中,这也是spark计算性能高的重要原因。

我们可以灵活的控制数据存储在内存还是磁盘中,同时可以通过远程网络调用将结果输出到远程存储中,比如hdfs,hbase等。

(3)spark 调度系统

spark 调度系统主要由DAGScheduler和TaskScheduler组成。

DAGScheduler 主要是把一个Job根据RDD间的依赖关系,划分为多个Stage,对于划分后的每个Stage都抽象为一个或多个Task组成的任务集,并交给TaskScheduler来进行进一步的任务调度。而TaskScheduler 负责对每个具体的Task进行调度。

具体调度算法有FIFO,FAIR:

FIFO调度:先进先出,这是Spark默认的调度模式。 FAIR调度:支持将作业分组到池中,并为每个池设置不同的调度权重,任务可以按照权重来决定执行顺序

(4)Spark基本工作原理

Spark基本工作原理的理解,其最主要的是要搞清楚什么是RDD以及RDD的特性。深刻理解了RDD的特性,也就理解了数据在spark中是如何被处理的(spark的基本工作原理)

那么RDD是什么,官方说法: RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。

最简单的理解: RDD就是源数据的抽象,或者叫映射,或者就代表。也就是说,数据要被spark进行处理,在处理之前的首要任务就是要将数据映射成RDD,对于spark来说,RDD才是我们处理数据的规则,我只认RDD,只有RDD,通过spark的计算引擎,才能发挥巨大的威力!

(1)分布式数据集 RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。

RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。

(2)弹性 RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。

(3)迭代式处理 对节点1、2、3、4上的数据进行处理完成之后,可能会移动到其他的节点内存中继续处理!Spark 与Mr最大的不同在与迭代式计算模型:Mr分为两个阶段,map和reduce,两个阶段处理完了就结束了,所以我们在一个job中能做的处理很有限,只能在map和reduce中处理;而spark计算过程可以分为n个阶段,因为他是内存迭代式的,我们在处理完一个阶段之后,可以继续往下处理很多阶段,而不是两个阶段。所以Spark相较于MR,计算模型可以提供更强大的功能。

(4)容错性 RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。三、Spark作业运行流程 (1)Spark作业运行流程 spark应用程序以进程集合为单位在分布式集群上运行,通过driver程序的main方法创建sparkContext的对象与集群进行交互。具体运行流程如下:

sparkContext向cluster Manager申请CPU,内存等计算资源。 cluster Manager分配应用程序执行所需要的资源,在worker节点创建executor。 sparkContext将程序代码和task任务发送到executor上进行执行,代码可以是编译成的jar包或者python文件等。接着sparkContext会收集结果到Driver端。

(2) Spark RDD迭代过程 sparkContext创建RDD对象,计算RDD间的依赖关系,并组成一个DAG有向无环图。

img

DAGScheduler将DAG划分为多个stage,并将stage对应的TaskSet提交到集群的管理中心,stage的划分依据是RDD中的宽窄依赖,spark遇见宽依赖就会划分为一个stage,每个stage中包含来一个或多个task任务,避免多个stage之间消息传递产生的系统开销。 ​ taskScheduler 通过集群管理中心为每一个task申请资源并将task提交到worker的节点上进行执行。 ​ worker上的executor执行具体的任务。

(5)spark运行时角色

Spark Streaming 运行时的角色(standalone 模式)主要有:

Master:主要负责整体集群资源的管理和应用程序调度;

Worker:负责单个节点的资源管理,driver 和 executor 的启动等;

Driver:用户入口程序执行的地方,即 SparkContext 执行的地方,主要是 DGA 生成、stage 划分、task 生成及调度;

Executor:负责执行 task,反馈执行状态和执行结果。

spark Shuffle 解析

Spark Shuffle 分为两种:

一种是基于 Hash 的 Shuffle; 另一种是基于 Sort 的 Shuffle。

在 MapReduce 框架中,Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序的性能。Spark 也有 Map 阶段和 Reduce 阶段,因此也会出现Shuffle。

HashShuffleManager

shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“划分”。所谓“划分”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

Spark SortShuffleManager 详细解析

SortShuffleManager 是 Spark 自 1.2 版本以来的默认 Shuffle 实现,它通过引入排序和文件合并机制,有效解决了早期 HashShuffleManager 存在的文件数量过多的问题。

一、核心设计思想

1. 基本目标

  • 减少 Shuffle 文件数量:从 MR 减少到 2M(M=map任务数)

  • 支持高效聚合:通过排序优化 reduce 端的聚合操作

  • 内存效率:平衡内存使用和磁盘 I/O

2. 关键创新

  • 排序合并:map 端输出先排序后写入磁盘

  • 索引文件:快速定位 reduce 需要的数据位置

  • 磁盘文件复用:一个 map 任务只产生一个数据文件

二、Shuffle Write 阶段详解

1. 写入流程

###

三、Shuffle Read 阶段详解

3. 关键配置参数

参数 默认值 说明
spark.shuffle.file.buffer 32KB 写缓冲区大小
spark.shuffle.spill.batchSize 10000 溢写批处理大小
spark.reducer.maxSizeInFlight 48MB 每次拉取数据量
spark.shuffle.compress true 是否压缩输出
spark.shuffle.spill.compress true 是否压缩溢写文件

五、性能优化技巧

# 增加写缓冲区大小(默认32KB)
spark.shuffle.file.buffer=64k
​
# 调整溢写阈值
spark.shuffle.spill.initialMemoryThreshold=5MB
spark.shuffle.spill.batchSize=10000
​
# 启用压缩(默认true)
spark.shuffle.compress=true
spark.io.compression.codec=snappy

2. 读阶段优化

# 增加每次拉取数据量(默认48MB)
spark.reducer.maxSizeInFlight=96m
​
# 调整重试参数
spark.shuffle.io.maxRetries=3
spark.shuffle.io.retryWait=5s
​
# 控制并行拉取数
spark.reducer.maxReqsInFlight=10

3. 内存管理

# 调整shuffle内存比例
spark.shuffle.memoryFraction=0.2
​
# 启用内存溢出(默认true)
spark.shuffle.spill=true
​
# 设置堆外内存(Tungsten模式)
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=1g

六、与 HashShuffle 对比

特性 SortShuffle HashShuffle
文件数量 2*M M*R
排序
内存使用 中等
聚合效率
适用场景 通用 小数据集
稳定性 低(OOM风险)

spark streaming与Flink

SortShuffleManager 通过精心设计的排序和文件合并机制,在文件数量、内存使用和计算效率之间取得了良好平衡,成为 Spark 默认的 Shuffle 实现。理解其内部工作原理有助于开发者在实际应用中更好地进行性能调优和问题排查。

Flink 和 Spark Streaming 都是用于实时流式数据处理的分布式计算框架,但两者的基本设计思想和内部执行机制有些不同。

Flink 基于流的理念,采用了基于数据流模型的核心运行时引擎。它可以对无界和有界数据流进行有状态的计算。Flink 使用了链式操作来表达运算逻辑,并基于流水线的方式进行任务调度。

Spark Streaming 则是通过微批处理的方式来实现对实时数据流的处理。它将数据流切分成很小的批数据,然后提交给 Spark 执行批处理任务。Spark Streaming 基于 RDD 来表达运算逻辑,并通过 Spark 的任务调度机制进行调度。

Flink 的内部把流处理算法表示为数据流图,并以流水线的方式持续运算。而 Spark Streaming 是将流任务拆解为一个个小批的 Spark 任务,这些批任务按时间顺序执行。


网站公告

今日签到

点亮在社区的每一天
去签到