在 Spark 中,缓存(Caching)是一种优化技术,用于将中间计算结果存储在内存或磁盘中,避免重复计算,从而显著提升迭代计算或交互式查询的性能。以下是关于 Spark 缓存的详细介绍:
一、为什么需要缓存?
Spark 的 RDD 操作是惰性的,每次触发行动算子(如collect()
、count()
)时都会重新计算整个 DAG(有向无环图)。对于需要多次使用的 RDD(如迭代算法或交互式查询),这种重复计算会造成极大的资源浪费。缓存可以将中间结果持久化,避免重复计算。
二、缓存的基本用法
Spark 提供了两种缓存方法:
cache()
:默认将 RDD 存储在内存中(等同于persist(StorageLevel.MEMORY_ONLY)
)。persist(StorageLevel)
:可指定存储级别(如内存、磁盘、序列化等)。
示例:缓存 RDD
python
运行
from pyspark import SparkContext
sc = SparkContext("local", "CacheExample")
# 创建RDD
rdd = sc.textFile("hdfs://path/to/large/file.txt")
# 缓存RDD
rdd.cache() # 等同于 rdd.persist(StorageLevel.MEMORY_ONLY)
# 第一次行动操作:触发计算并缓存结果
count1 = rdd.count() # 第一次计算,耗时较长
# 第二次行动操作:直接使用缓存结果
count2 = rdd.count() # 直接从缓存读取,耗时极短
三、存储级别(StorageLevel)
Spark 支持多种存储级别,可根据数据规模和内存情况选择:
存储级别 | 说明 |
---|---|
MEMORY_ONLY |
默认级别,将 RDD 作为反序列化的 Java 对象存储在内存中。内存不足时部分数据会被丢弃。 |
MEMORY_ONLY_SER |
将 RDD 作为序列化的 Java 对象存储(占用空间更小)。 |
MEMORY_AND_DISK |
优先存储在内存中,内存不足时溢写到磁盘。 |
MEMORY_AND_DISK_SER |
类似MEMORY_AND_DISK ,但数据序列化存储。 |
DISK_ONLY |
只存储在磁盘上。 |
MEMORY_ONLY_2 |
类似MEMORY_ONLY ,但数据复制到两个节点。 |
OFF_HEAP |
存储在 Tungsten 堆外内存中(需要启用堆外内存)。 |
示例:指定存储级别
python
运行
from pyspark import StorageLevel
# 存储在内存和磁盘,序列化
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 只存储在磁盘
rdd.persist(StorageLevel.DISK_ONLY)
四、缓存的生命周期
- 缓存何时触发?
首次触发行动算子时,Spark 会计算 RDD 并将结果缓存。 - 缓存何时失效?
- 手动调用
unpersist()
释放缓存。 - 内存不足时,Spark 会根据 LRU(最近最少使用)策略自动删除部分缓存数据。
- Spark 应用结束时,缓存自动释放。
- 手动调用
示例:释放缓存
python
运行
rdd.unpersist() # 释放缓存
五、缓存的最佳实践
优先缓存频繁使用的 RDD
如迭代算法中的中间结果(如 MLlib 模型训练)或交互式查询中的数据集。选择合适的存储级别
- 内存充足时,使用
MEMORY_ONLY
(性能最优)。 - 数据量大时,使用
MEMORY_ONLY_SER
或MEMORY_AND_DISK_SER
(节省内存)。 - 对容错要求高的场景,使用带副本的存储级别(如
MEMORY_ONLY_2
)。
- 内存充足时,使用
避免过度缓存
缓存过多数据会导致内存压力,触发频繁的 GC 或数据溢写磁盘,反而降低性能。缓存后进行重分区
缓存后可使用coalesce()
减少分区数,降低后续任务的调度开销:python
运行
rdd_cached = rdd.cache() rdd_optimized = rdd_cached.coalesce(10) # 合并为10个分区
监控缓存使用情况
通过 Spark UI 查看缓存状态:- Storage 标签页显示各 RDD 的缓存大小、分区数和存储位置。
- 监控内存使用,避免 OOM(内存溢出)。
六、缓存 vs 检查点(Checkpointing)
特性 | 缓存(Cache) | 检查点(Checkpoint) |
---|---|---|
存储位置 | 内存或磁盘(Executor 节点) | HDFS 等可靠存储(外部系统) |
容错性 | 节点故障时可能丢失数据,需重新计算 | 数据永久存储,节点故障不影响 |
性能 | 速度快(内存读取) | 速度较慢(需写入外部存储) |
用途 | 短期重用中间结果 | 长期保存关键结果(如长时间迭代的中间点) |
触发方式 | 首次行动算子自动触发 | 手动调用checkpoint() 并触发行动算子 |
示例:使用检查点
python
运行
# 设置检查点目录
sc.setCheckpointDir("hdfs://path/to/checkpoint_dir")
# 标记RDD为检查点
rdd.checkpoint()
# 触发检查点(必须有行动算子)
rdd.count() # 此时RDD会被写入检查点目录
七、常见问题与解决方法
内存不足导致缓存失效
- 解决方案:改用
MEMORY_AND_DISK_SER
存储级别,或增加内存资源。
- 解决方案:改用
缓存数据丢失
- 解决方案:使用
MEMORY_ONLY_2
存储级别(数据复制到两个节点),或结合检查点。
- 解决方案:使用
缓存未生效
- 检查是否在行动算子前调用了
cache()
或persist()
。 - 确认 RDD 是否被重复创建(每次
map()
、filter()
等转换操作都会生成新 RDD)。
- 检查是否在行动算子前调用了
八、总结
缓存是 Spark 中提升性能的重要手段,尤其适合迭代计算和交互式查询。合理使用缓存可以显著减少计算开销,但需根据数据规模和内存情况选择合适的存储级别,并注意监控和管理缓存数据。
分享
除了textFile,Spark还有哪些方法可以创建RDD?
除了内存,还可以将RDD持久化到哪些存储介质上?
缓存的RDD是否会一直占用内存或磁盘空间?