Spark 缓存(Caching)是一种重要的性能优化技术,它允许将频繁使用的数据集持久化到内存或磁盘中,避免重复计算。
缓存的基本概念
为什么要使用缓存?
1.避免重复计算:对于需要多次使用的 RDD/DataFrame/Dataset,缓存后只需计算一次
2.加速迭代算法:机器学习等迭代算法中,重复使用同一数据集时可显著提升性能
3.优化执行计划:减少从数据源重复读取数据的开销
缓存方法
主要缓存API
val rdd = sc.parallelize(1 to 100)
// 缓存方法
rdd.cache() // 等同于 persist(MEMORY_ONLY)
rdd.persist() // 使用默认存储级别
rdd.persist(StorageLevel.MEMORY_AND_DISK) // 指定存储级别
// 取消缓存
rdd.unpersist()
存储级别(StorageLevel)
存储级别 | 描述 |
`MEMORY_ONLY` | 只存储在内存中(默认) |
`MEMORY_AND_DISK` | 内存存不下时溢出到磁盘 |
`MEMORY_ONLY_SER` | 序列化后存储内存(节省空间但增加CPU开销) |
`MEMORY_AND_DISK_SER` | 序列化存储,内存不足时存磁盘 |
`DISK_ONLY` | 只存储在磁盘 |
`OFF_HEAP` | 使用堆外内存 |
缓存的最佳实践
1. 选择性缓存:
只缓存会被多次使用的数据集
避免缓存一次性使用的数据
2. 合理选择存储级别:
内存充足时使用 `MEMORY_ONLY`
大数据集且内存有限时使用 `MEMORY_AND_DISK`
对象较大时考虑序列化存储(`_SER`)
3. 及时释放:
使用 `unpersist()` 释放不再需要的缓存
避免不必要的内存占用
4. 监控缓存使用:
通过 Spark UI 查看缓存大小和命中率
调整缓存策略基于实际使用情况
缓存示例
// 读取大数据集
val logs = spark.read.csv("huge-log-file.csv")
// 过滤并缓存常用数据
val errorLogs = logs.filter($"level" === "ERROR").cache()
// 多次使用缓存数据
val errorCount = errorLogs.count()
val recentErrors = errorLogs.filter($"date" > "2023-01-01")
// 使用完成后释放
errorLogs.unpersist()
注意事项
1. 缓存不保证数据一定在内存中(可能因内存压力被LRU淘汰)
2. 缓存是惰性的,第一次action操作时才会真正缓存
3. 序列化缓存可节省空间但增加CPU开销
4. 在Spark UI的Storage标签页可以查看缓存状态
合理使用缓存可以显著提高Spark应用性能,但需要根据数据大小、访问模式和集群资源进行适当配置。