Spark的persist和cache
在 Spark 中,cache() 和 persist() 都是用于“数据持久化”的 API——核心作用是将计算后的 RDD/DataFrame 存储到内存或磁盘,避免重复计算(比如多次复用同一个数据集时,不用重新执行前面的转换逻辑),是 Spark 性能优化的核心手段之一。
结合你的数仓/ETL 工作场景(比如直播数据清洗、聚合、关联),用“核心区别+使用场景+实操细节”讲清二者的关系和用法:
一、先搞懂:为什么需要 cache/persist?(解决重复计算的痛点)
Spark 中 RDD/DataFrame 是“惰性求值”的——只有执行 action 操作(如 count()、show()、write())时,才会触发实际计算。如果同一个数据集被多次使用(比如清洗后的点击数据,既要用于聚合统计,又要用于关联维表),默认会重复执行所有前置转换逻辑(如过滤、去重),导致大量冗余计算。
举个例子(直播点击数据处理):
// 1. 读取并清洗直播点击数据(转换操作,惰性求值)
val cleanedClicks = spark.read.json("hdfs:///live/click_log").filter("商品ID is not null") // 过滤空值.dropDuplicates("用户ID", "商品ID", "click_time") // 去重重复点击// 2. 第一次使用:统计各商品点击量(action 操作,触发计算)
val clickCount = cleanedClicks.groupBy("商品ID").count()
clickCount.write.parquet("hdfs:///live/click_count")// 3. 第二次使用:关联商品维表补全信息(action 操作,再次触发计算)
val clickWithDim = cleanedClicks.join(productDim, "商品ID")
clickWithDim.write.parquet("hdfs:///live/click_with_dim")
- 问题:
cleanedClicks被使用了 2 次,每次action都会重新执行“读取→过滤→去重”的完整流程,效率极低; - 解决:用
cache()或persist()持久化cleanedClicks,第一次计算后就把数据存起来,第二次直接复用,避免重复计算。
二、cache() 和 persist() 的核心关系与区别
1. 核心关系:cache() 是 persist() 的“简化版”
cache()本质是persist(StorageLevel.MEMORY_ONLY)的简写——即默认只将数据持久化到内存中;persist()是更灵活的 API,支持手动指定“存储级别”(内存、磁盘、序列化、副本数等),覆盖更多场景。
2. 关键区别(表格清晰对比)
| 对比维度 | cache() | persist() |
|---|---|---|
| 存储级别 | 固定为「内存-only」(不序列化) | 可指定多种存储级别(如内存+磁盘、序列化存储) |
| 灵活性 | 低(仅一种存储方式) | 高(支持 12 种存储级别,按需选择) |
| 底层实现 | 调用 persist(StorageLevel.MEMORY_ONLY) | 直接接收 StorageLevel 参数,实现更灵活 |
| 适用场景 | 数据量小、内存充足,需快速复用 | 数据量大(内存放不下)、需容错(多副本)、需长期复用 |
| 数据格式 | 原始对象格式(读取快,内存占用大) | 可选择序列化格式(内存占用小,读取略慢) |
3. Spark 核心存储级别(persist() 可指定)
常用存储级别(重点记前 4 个):
| 存储级别 | 含义 | 适用场景 |
|---|---|---|
| MEMORY_ONLY(cache 默认) | 仅存内存,不序列化,无副本 | 数据量小、内存充足,追求最快读取速度 |
| MEMORY_AND_DISK | 优先存内存,内存放不下存磁盘,不序列化 | 数据量略大(内存不够),需平衡速度和容量 |
| MEMORY_ONLY_SER | 仅存内存,序列化(对象转字节),无副本 | 数据量大,内存紧张,允许轻微读取延迟 |
| MEMORY_AND_DISK_SER | 优先存内存,内存放不下存磁盘,序列化 | 数据量很大,需严格控制内存占用 |
| DISK_ONLY | 仅存磁盘,不序列化 | 数据量极大(内存完全放不下),长期复用 |
带 _2 后缀(如 MEMORY_ONLY_2) | 对应级别+2个副本(分布式存储在不同节点) | 需高容错(避免节点宕机导致数据丢失) |
- 序列化的作用:将 Java 对象转换成字节数组,内存占用减少 30%-70%,但读取时需要反序列化,有轻微性能开销;
- 副本的作用:多节点存储相同数据,节点宕机时可直接复用副本,不用重新计算,适合关键任务。
三、实操用法(Scala/Spark SQL 示例)
1. 基础用法(Scala 代码)
// 1. cache() 用法(简单,默认内存存储)
val cleanedClicks = spark.read.json("hdfs:///live/click_log").filter("商品ID is not null").dropDuplicates().cache() // 持久化到内存// 2. persist() 用法(指定存储级别)
import org.apache.spark.storage.StorageLevel
val cleanedClicks = spark.read.json("hdfs:///live/click_log").filter("商品ID is not null").dropDuplicates().persist(StorageLevel.MEMORY_AND_DISK_SER) // 优先内存,放不下存磁盘,序列化// 3. 触发计算(cache/persist 是惰性的,需 action 触发持久化)
cleanedClicks.count() // 第一次 action:执行计算并持久化数据// 4. 后续复用:直接读取持久化数据,不重复计算
cleanedClicks.groupBy("商品ID").count().write.parquet(...)
cleanedClicks.join(productDim, "商品ID").write.parquet(...)// 5. 不再使用时,释放资源(避免内存/磁盘占用)
cleanedClicks.unpersist() // 手动释放持久化数据
2. Spark SQL 中的用法(临时视图持久化)
在数仓 ETL 中,你可能用 Spark SQL 处理数据,可通过 CACHE TABLE 语句持久化临时视图:
-- 1. 创建临时视图并清洗数据
CREATE TEMPORARY VIEW cleaned_click_view AS
SELECT * FROM parquet.`hdfs:///live/click_log`
WHERE 商品ID IS NOT NULL
DISTINCT 用户ID, 商品ID, click_time;-- 2. 持久化视图(默认 MEMORY_ONLY,对应 cache())
CACHE TABLE cleaned_click_view;-- 3. 或指定存储级别(对应 persist())
CACHE TABLE cleaned_click_view OPTIONS ('storageLevel' 'MEMORY_AND_DISK_SER');-- 4. 多次复用视图,无需重复计算
SELECT 商品ID, COUNT(*) FROM cleaned_click_view GROUP BY 商品ID;
SELECT c.*, p.商品名称 FROM cleaned_click_view c JOIN product_dim p ON c.商品ID = p.商品ID;-- 5. 释放资源
UNCACHE TABLE cleaned_click_view;
四、关键注意点(避坑指南,结合你的业务场景)
1. cache/persist 是“惰性的”,需 action 触发持久化
- 仅调用
cleanedClicks.cache()不会实际存储数据,必须执行count()、show()等 action 操作,才会触发计算并持久化; - 示例:如果只写
cache()而不执行 action,后续复用依然会重复计算。
2. 数据更新后,持久化数据不会自动刷新
- 如果持久化的数据集依赖的源数据(如 HDFS 上的日志文件)被修改,Spark 不会自动更新持久化的数据,依然会使用旧数据;
- 解决:修改源数据后,需调用
unpersist()释放旧数据,再重新计算并持久化。
3. 内存不足时,cache() 会导致 OOM,persist() 更安全
cache()仅存内存,若数据量超过内存,会抛出 OutOfMemoryError(OOM);- 建议:生产环境中,除了小数据集(如维表),优先用
persist(MEMORY_AND_DISK)或MEMORY_AND_DISK_SER,避免 OOM。
4. 持久化后的数据会跨任务复用
- 同一个 Spark 应用中,多个 Job(如多个
write()操作)可以复用持久化的数据; - 但不同 Spark 应用(如不同的 ETL 任务)无法复用——持久化数据仅在当前应用的生命周期内有效,应用结束后自动释放。
5. 维表关联场景:优先用 persist 持久化维表
- 数仓中维表(如商品维表、商家维表)通常较小且被多次关联,用
persist(MEMORY_ONLY)持久化后,可触发 Spark 的Broadcast Join(广播 join),避免 Shuffle,大幅提升关联效率。
五、使用场景对比(什么时候用 cache?什么时候用 persist?)
用 cache() 的场景
- 数据集小(如几万条维表数据),内存充足;
- 需频繁复用(如多次关联、统计),追求最快读取速度;
- 示例:直播商品维表(
product_dim),数据量小且被多个 ETL 任务关联。
用 persist() 的场景
- 数据量大(如上亿条直播点击日志),内存放不下;
- 需容错(关键任务,避免节点宕机导致数据丢失),可指定
_2副本; - 内存紧张,需控制内存占用(用序列化存储级别
_SER); - 示例:清洗后的直播点击日志(
cleanedClicks),数据量极大,需用于聚合、关联、写入多个表。
六、性能优化建议(结合数仓 ETL 实操)
- 只持久化需要复用的数据:不要盲目对所有数据集持久化,仅对多次使用的数据集(如清洗后的中间结果、维表)进行持久化,避免浪费资源;
- 选择合适的存储级别:
- 小数据集(维表):
MEMORY_ONLY(最快); - 中大数据集(中间结果):
MEMORY_AND_DISK_SER(平衡内存和速度); - 超大数据集(原始日志):
DISK_ONLY(仅当需长期复用且内存完全不够时);
- 小数据集(维表):
- 及时释放资源:数据集不再使用时,调用
unpersist()释放内存/磁盘(尤其是 Spark 应用运行时间长、数据集多的场景); - 避免持久化临时中间结果:如果数据集仅使用一次(如读取后直接写入,无复用),无需持久化,反而会增加存储开销。
总结
- 核心关系:
cache()是persist()的简化版,仅支持内存存储;persist()更灵活,支持多存储级别; - 核心作用:避免重复计算,提升复用数据集的任务性能;
- 数仓实操建议:维表用
cache(),中间大数据集用persist(MEMORY_AND_DISK_SER),关键任务加副本(_2),用完及时unpersist()。
结合你的直播数仓场景,比如“清洗后的点击数据需用于聚合、关联、写入多个表”,直接用 persist(MEMORY_AND_DISK_SER) 持久化,既能避免重复计算,又能防止 OOM,是性价比最高的选择。
