Spark 缓存(Caching)
Spark 缓存机制详解
1. 缓存的核心作用
- 加速计算:通过将重复使用的数据集存储在内存或磁盘,避免重复计算
- 优化迭代算法:适用于机器学习训练、图计算等需要多次访问同一数据集的场景
- 减少I/O开销:对于频繁访问的外部数据源,缓存后可降低读取成本
2. 持久化级别对比
级别 | 存储方式 | 序列化 | 适用场景 |
---|---|---|---|
MEMORY_ONLY | 仅内存 | 否 | 内存充足的小数据集 |
MEMORY_AND_DISK | 内存+磁盘溢出 | 否 | 内存受限的较大数据集 |
MEMORY_ONLY_SER | 内存(序列化存储) | 是 | 内存优化场景 |
DISK_ONLY | 仅磁盘 | 是 | 超大数据集 |
3. 代码实现示例
from pyspark import StorageLevel# 创建DataFrame
df = spark.read.parquet("hdfs://data/large_dataset")# 缓存方式一(默认MEMORY_AND_DISK)
df.cache().count() # 立即触发缓存# 缓存方式二(指定存储级别)
df.persist(StorageLevel.MEMORY_ONLY_SER)# 释放缓存
df.unpersist()
4. 使用场景判断
✅ 推荐缓存:
- 循环使用的中间结果(迭代算法)
- 被多次访问的广播连接表
- 需要快速访问的预处理数据
❌ 避免缓存:
- 仅单次使用的数据集
- 大于集群可用内存50%的数据量
- 频繁更新的动态数据
5. 性能优化技巧
- 缓存前使用
.filter()
或.select()
精简数据 - 对宽表优先使用序列化存储(节省30%-50%内存)
- 监控存储管理器:
print(spark.sparkContext.uiWebUrl) # 查看Storage选项卡
- 配合
checkpoint
使用:切断RDD血缘关系,避免堆栈溢出
6. 缓存失效场景
- JVM内存不足时自动逐出
- 节点故障导致分区丢失
- 调用
unpersist()
主动释放 - 应用结束时自动清除
7. 高级配置参数
spark.storage.memoryFraction=0.6 # 内存分配比例
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.memory.offHeap.enabled=true # 启用堆外内存
spark.memory.offHeap.size=2g
通过合理使用缓存,典型场景可提升作业性能3-10倍。建议结合Spark UI监控缓存命中率和内存使用情况,动态调整存储策略。