Spark Core基础与源码剖析全景手册
Spark Core基础与源码剖析全景手册
Spark作为大数据领域的明星计算引擎,其核心原理、源码实现与调优方法一直是面试和实战中的高频考点。本文将系统梳理Spark Core与Hadoop生态的关系、经典案例、聚合与分区优化、算子底层原理、集群架构和源码剖析,结合流程图、源码片段和速记口诀,帮助你快速掌握Spark核心知识。
1. Spark Core与Hadoop生态复习
1.1 Spark Core概述
Spark Core作用
Spark Core是Spark的内核,负责RDD(弹性分布式数据集)管理、任务调度、内存管理和容错机制等,是所有Spark组件的基础。
核心特性
- RDD(弹性分布式数据集):核心数据抽象,支持分布式、不可变、容错。
- 懒加载(Lazy Evaluation):转换操作不会立即执行,触发Action时才真正计算。
- 容错机制:DAG血缘追踪,自动重算丢失分区。
- 内存计算:极大提升大数据处理速度。
口诀:RDD弹性,懒加载,血缘容错,快如闪电。
1.2 Hadoop生态系统梳理
- HDFS:分布式文件存储
- MapReduce:分布式计算模型
- YARN:资源调度框架
- 生态组件:Hive(数据仓库)、HBase(NoSQL)、Zookeeper、Sqoop、Flume、Oozie等
口诀:三驾马车(HDFS、MR、YARN),生态百花齐放。
1.3 Spark核心术语
- RDD:不可变、分区、弹性容错的数据集
- Partition:RDD的基本分片单位
- Stage:DAG中的阶段,窄依赖划分
- Task:作用于Partition的计算单元
- Job:用户提交的完整计算逻辑
口诀:Job拆Stage,Stage分Task,Task跑分区,RDD串血缘。
1.4 HadoopRDD源码剖析
Spark通过HadoopRDD与Hadoop生态(如HDFS、HBase)对接,底层读取数据采用Hadoop InputFormat。
getPartitions源码
override def getPartitions: Array[Partition] = {val jobContext = new JobContextImpl(conf, jobId)val inputFormat = inputFormatClass.newInstance()val rawSplits = inputFormat.getSplits(jobContext)val result = new Array[Partition](rawSplits.size)for (i <- 0 until rawSplits.size) {result(i) = new HadoopPartition(id, i, rawSplits(i))}result
}
- 流程:Hadoop切分→Spark封装分区→数据对接
compute源码
override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {val hadoopPartition = split.asInstanceOf[HadoopPartition]val attemptId = newTaskAttemptID()val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)val inputFormat = inputFormatClass.newInstance()val reader = inputFormat.createRecordReader(hadoopPartition.inputSplit.value, hadoopAttemptContext)reader.initialize(hadoopPartition.inputSplit.value, hadoopAttemptContext)new Iterator[(K, V)] {private var havePair = falseprivate var finished = falseprivate def getNext(): Boolean = {if (!finished && reader.nextKeyValue()) { havePair = true; true }else { finished = true; false }}override def hasNext: Boolean = if (!havePair) getNext() else trueoverride def next(): (K, V) = {if (!hasNext) throw new NoSuchElementException("End of stream")havePair = false(reader.getCurrentKey, reader.getCurrentValue)}}
}
- 流程图:
InputFormat.getSplits → InputSplit[] → HadoopPartition[]
HadoopPartition → RecordReader → (K,V) Iterator
口诀:分片分区,读器遍历,KV产出,迭代输出。
2. Spark常用案例与算子剖析
2.1 WordCount源码分析与图解
val lines = sc.textFile("file.txt")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect().foreach(println)
- 流程图:
文本 → flatMap → map → reduceByKey
行 拆词 (词,1) 词频聚合
口诀:读拆映聚,词频统计。
关键算子源码
- textFile(HadoopRDD + map(_._2))
- flatMap/map(MapPartitionsRDD)
- reduceByKey(combineByKey底层实现)
2.2 常用集合操作API
- map:逐元素映射
- flatMap:映射并扁平化
- filter:条件过滤
- groupByKey:按key分组
- reduceByKey:按key聚合
口诀:映射分组,过滤聚合,操作灵活。
2.3 PV/UV分析案例
val pv = logs.count()
val uv = logs.map(_.userId).distinct().count()
- PV:count计数
- UV:distinct去重后计数
口诀:PV计数,UV去重。
2.4 RDD源码结构与血缘
- 核心属性:partitions, dependencies, compute, iterator
- 依赖类型:NarrowDependency(窄依赖)、ShuffleDependency(宽依赖)
口诀:分区依赖,懒计算,血缘追溯,容错重算。
3. 聚合与分区优化源码剖析
3.1 聚合API与底层实现
reduceByKey
:底层调用combineByKey
aggregateByKey
:可设初值,分区内/间聚合combineByKey
:三步(初始、分区内、分区间)
口诀:简聚reduce,初值aggregate,灵活combine。
combineByKey源码
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,numPartitions: Int
): RDD[(K, C)] = {new ShuffledRDD[K, V, C](self, part, serializer).setAggregator(new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners))
}
口诀:首遇建桶,内聚合并,跨分区合。
3.2 分区调优
- 参数:
spark.default.parallelism
,numPartitions
- 方法:
repartition
(全shuffle),coalesce
(减少分区,少shuffle)
口诀:分区合理,快慢有度,合并coalesce,重分repartition。
4. Shuffle底层实现与优化
4.1 Shuffle写入(map端)
- 流程:分桶(partitioner)、排序(可选)、序列化与溢写(ExternalSorter)、输出(data file + index file)
- 文件结构:
shuffle_0_0.data
:所有桶数据合一shuffle_0_0.index
:记录每个桶起止偏移
口诀:分桶排序,内存溢写,索引分隔,文件合一。
4.2 Shuffle读取(reduce端)
- 流程:获取元数据(MapOutputTracker)、远程拉取(ShuffleBlockFetcherIterator)、聚合排序、容错重试
口诀:元数据定位,远程拉取,聚合排序,自动容错。
4.3 ShuffleManager对比
- SortShuffleManager(默认):合桶存储,节省空间,支持排序
- HashShuffleManager:每桶一文件,文件爆炸,不支持排序
- Tungsten/UnsafeShuffleManager:堆外内存,性能优先
口诀:排序合桶,节省空间;哈希分桶,文件爆炸;Tungsten堆外,性能优先。
5. 聚合算子物理执行与优化
5.1 reduceByKey底层流程
- 分区内聚合(mergeValue)
- shuffle分桶写文件
- reduce端拉取分桶数据并聚合(mergeCombiners)
流程简图:
分区1: (A,1),(A,2),(B,1)
分区2: (A,3),(B,2)
→ 分区内聚合 → (A,3),(B,1); (A,3),(B,2)
→ shuffle分桶
→ reduce端聚合 → (A,6),(B,3)
口诀:分区先合,桶间再聚。
5.2 combineByKey三步
- createCombiner:首次遇key建立桶
- mergeValue:分区内聚合
- mergeCombiners:跨分区聚合
口诀:首次建桶,分区内合,跨区再聚,减少传输。
6. 调度系统深度剖析
6.1 DAGScheduler与TaskScheduler
- DAGScheduler:将RDD操作DAG划分为Stage,管理Stage依赖
- TaskScheduler:负责将Stage中的分区转为Task,分发到Executor
源码流程:
// DAGScheduler.submitJob
val finalStage = newStage(...)
submitStage(finalStage)def submitStage(stage: Stage): Unit = {if (stage.parents.isEmpty) {taskScheduler.submitTasks(taskSet)} else {stage.parents.foreach(submitStage)}
}
口诀:DAG划分,Stage递进,Task分发,本地优先。
7. 容错、推测执行与参数调优
7.1 容错与推测执行
- 血缘(Lineage)容错:RDD依赖链可重算丢失分区
- Shuffle文件丢失:Driver可重算map task
- 推测执行:检测慢task,允许冗余执行,避免慢节点拖后腿
口诀:血缘追溯,丢失重算,推测执行,容错加速。
7.2 重点参数
spark.shuffle.compress
:是否压缩spark.shuffle.file.buffer
:文件缓冲区spark.reducer.maxSizeInFlight
:reduce端拉取并发量spark.shuffle.sort.bypassMergeThreshold
:bypass优化spark.speculation
:推测执行开关
口诀:压缩节流,缓冲调优,推测补位,参数先行。
8. 集群架构与部署运维
8.1 角色与架构
- Driver:任务提交与调度
- Executor:执行Task与缓存
- Cluster Manager:YARN/Standalone/K8s
- Worker:Standalone模式下运行Executor
口诀:Driver调度,Executor计算,Manager分配。
8.2 资源调度与高可用
- 参数:
spark.executor.memory
、spark.executor.cores
、spark.driver.memory
- HA:Standalone支持Zookeeper主备
- History Server:历史作业追踪与调优
口诀:内存CPU,合理分配。主备高可用,ZK做协调。历史追踪,日志可查。
8.3 YARN集群搭建与JAR包管理
- YARN模式:client/cluster
- 调优参数:
yarn.nodemanager.resource.memory-mb
- JAR包管理:
--jars
、--packages
、推荐HDFS分发
口诀:YARN调度,参数适配。依赖合规,HDFS分发。
9. 经典面试与实战答题模板
-
reduceByKey底层流程?
分区内本地聚合,shuffle分桶写文件,reduce端拉取分桶数据再聚合,采用索引+数据文件结构,丢失可血缘重算,慢任务可推测执行。 -
SortShuffle与HashShuffle区别?
SortShuffle合桶存储、索引分隔、文件少、支持排序;HashShuffle每桶一文件,文件数多,不支持排序。 -
Shuffle读写的网络协议和容错?
基于Netty RPC,reduce端并发拉取数据,失败自动重试或重算map task。
10. 速记口诀大合集
口诀 | 适用场景 | 详细解释 |
---|---|---|
RDD弹性,懒加载,血缘容错,快如闪电 | Spark Core本质 | RDD灵活、延迟、容错、快 |
分片分区,读器遍历,KV产出,迭代输出 | HadoopRDD源码 | 分片分区、RecordReader迭代KV |
读拆映聚,词频统计 | WordCount | 读文件、拆词、映射、聚合 |
分区依赖,懒计算,血缘追溯,容错重算 | RDD血缘与依赖 | 窄依赖、懒执行、血缘追溯、自动容错 |
分区先聚,跨区再合 | reduceByKey等聚合算子 | 先本地聚合,后跨节点聚合 |
三步合并,聚合核心,shuffle分发 | combineByKey底层 | 初始、分区内、分区间三步合并 |
分桶排序,内存溢写,索引分隔,文件合一 | shuffle写端 | 分桶排序、溢写磁盘、索引分隔、合一文件 |
元数据定位,远程拉取,聚合排序,自动容错 | shuffle读端 | 查元数据、拉取聚合、自动重试 |
路径规范,偏移定位,RPC拉取,容错重算 | shuffle文件与拉取 | 路径规范、index偏移、RPC拉取、重算 |
首次建桶,分区内合,跨区再聚,减少传输 | combineByKey聚合流程 | 本地聚合减少数据量,跨区再聚合 |
血缘追溯,丢失重算,推测执行,容错加速 | 容错与推测执行 | 血缘可重算,慢任务推测执行 |
DAGScheduler划分,Task分发,本地优先 | 调度原理 | DAG分Stage,Task分发本地优先 |
结语
本手册结合源码、流程、架构、调度、底层实现与调优要点,辅以口诀助记,既适合Spark初学者体系化学习,也为有经验者面试、查漏补缺与实战调优提供一站式参考。
如需**某一环节(如DAGScheduler状态流转、推测执行源码、具体shuffle二进制结构、Executor资源分配源码等)**进一步源码剖析,欢迎留言或私信交流!
关注我,获取更多大数据实战与源码剖析干货!