spark组件-spark core(批处理)-rdd持久化机制
Spark持久化机制是Spark框架中用于提升计算性能的核心特性,主要通过将RDD(弹性分布式数据集)的计算结果保存起来,避免重复计算的开销。
持久化原理与触发时机
RDD采用惰性求值方式,每次调用行动算子(Action)都会从头开始计算。持久化机制通过在节点内存或磁盘中缓存RDD的partition数据,使得后续对该RDD的反复操作可以直接使用缓存数据,而无需重新计算。需要注意的是,调用cache()
或persist()
方法时并不会立即缓存数据,而是等到后续第一个行动算子触发计算后,该RDD才会被持久化。此机制对于需要多次重复使用同一数据集的迭代计算和交互式查询场景尤为重要。
持久化操作方法
Spark提供了cache()
和persist()
两种方法来实现RDD持久化:
cache()
方法是persist()
的一种简化形式,其底层调用的是persist(MEMORY_ONLY)
,即将数据以未序列化的Java对象形式持久化到内存中。persist()
方法允许手动选择不同的存储级别,通过传入StorageLevel
对象来指定。
如果需要清除缓存,可以调用unpersist()
方法。
存储级别
通过persist()
方法可以指定多种存储级别,以适应不同的需求,主要包括:
- MEMORY_ONLY:将RDD以未序列化的Java对象形式存储于内存。这是
cache()
的默认级别,内存不足时部分分区数据将不被缓存。 - MEMORY_AND_DISK:内存不足时,将部分分区数据写入磁盘。
- MEMORY_ONLY_SER与MEMORY_AND_DISK_SER:将RDD以序列化的字节数组形式存储,节省内存空间但增加CPU开销。
- DISK_ONLY:仅将数据保存到磁盘。
带有“_2”后缀的级别(如MEMORY_ONLY_2
)会在两个不同节点上保存副本,提高容错性。
容错机制
RDD的缓存存在丢失风险,但其容错机制保证了计算的正确性。该机制基于RDD的血统(Lineage)信息,当缓存数据丢失时,Spark只需要通过依赖关系重新计算丢失的分区数据即可,无需重算整个RDD。
Checkpoint机制
除了cache
和persist
,Spark还提供了Checkpoint机制。它与持久化的主要区别在于:
- 数据管理:Checkpoint将数据永久写入HDFS等可靠文件系统,需用户管理;而
cache
和persist
的数据由Spark自动管理(包括创建和回收)。 - 血统影响:Checkpoint会切断RDD的血统关系,避免依赖链过长;而
cache
和persist
会保留血统。 - 计算次数:由于Checkpoint通常在Job完成后由另一个专门Job执行,可能导致RDD被计算两次。因此建议在使用Checkpoint前先对RDD进行缓存,以避免重复计算。Checkpoint适用于Lineage过长或包含宽依赖的场景,提供更强的容错保障
例子:
1.cache()
public class Test01_Cache {public static void main(String[] args) {// 1. 创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("WordCount");// 2. 创建contextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 业务代码JavaRDD<Tuple2<String, Integer>> javaRDD = sc.parallelize(Arrays.asList(new Tuple2<>("a", 1),new Tuple2<>("a", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 1),new Tuple2<>("b", 2)),2);JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {System.out.println("---------------------------");return new Tuple2<>(stringIntegerTuple2._1, stringIntegerTuple2._2