Spark源码中的volatile
分享几个spark中的案例关于volatile,希望对大家有帮助
示例 1:保证线程之间的可见性
1.SparkContext
的停止状态 (stopped
)
这是最直接、最经典的 volatile
应用场景,完美对应了图中提到的“保证线程间的可见性”。
源码位置:
org.apache.spark.SparkContext
核心代码:
// 标记SparkContext是否已经被关闭 @volatile private var stopped: Boolean = false// 检查状态的函数,会被多个线程调用 def isStopped: Boolean = stopped// 关闭SparkContext的函数 def stop(): Unit = {if (!stopped) {// ... (一些同步控制逻辑,例如synchronized块,确保关闭操作只执行一次)stopped = true // 设置volatile变量,立即可见} }
为什么需要
volatile
:Spark 中有很多后台线程(如心跳线程、状态汇报线程、作业执行线程等)。
当用户调用
sc.stop()
在主线程中设置stopped = true
后,必须立即让所有其他后台线程看到这个变化,这样它们才能快速终止自己的任务,释放资源。如果没有
volatile
,某个后台线程可能会因为看不到状态变化而继续运行,导致资源无法释放或程序无法正常退出。
示例 2:禁止指令的重排序
当然有!Spark 核心源码中还有几个非常经典且重要的 volatile
应用场景,完美体现了“禁止指令重排序”的特性。让我们深入分析这些高级用法:
1. UnsafeShuffleWriter
中的内存页分配(核心性能优化)
这是 Spark Shuffle 过程中最关键的优化点之一,直接关系到 shuffle 性能。
源码位置:org.apache.spark.shuffle.UnsafeShuffleWriter
核心代码:
private volatile ShuffleInMemorySorter inMemSorter;@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {// 初始化内存排序器(延迟初始化)if (inMemSorter == null) {// 双重检查锁定模式(Double-Checked Locking)synchronized (this) {if (inMemSorter == null) {// 创建内存排序器(涉及多个步骤的复杂初始化)inMemSorter = new ShuffleInMemorySorter(...);}}}// 使用inMemSorter处理记录...while (records.hasNext()) {inMemSorter.insertRecord(...);}
}
为什么需要 volatile
禁止重排序?
双重检查锁定(DCL)模式:这是经典的并发设计模式
初始化重排序风险:
创建
ShuffleInMemorySorter
对象需要多个步骤:a. 分配内存
b. 初始化内部状态
c. 设置引用
如果没有
volatile
,JVM 可能重排序这些步骤,导致其他线程看到inMemSorter != null
,但对象还未完全初始化
volatile
的作用:写屏障:确保在
inMemSorter = new ...
之前的所有初始化操作完成读屏障:确保其他线程读取
inMemSorter
时看到完全初始化的对象
2. TaskMemoryManager
中的内存页表(安全发布)
这是 Spark 内存管理的核心,确保内存页的安全分配。
源码位置:org.apache.spark.memory.TaskMemoryManager
核心代码:
private volatile MemoryBlock[] pageTable = new MemoryBlock[INITIAL_PAGE_NUMBER];public long allocatePage(long size, MemoryConsumer consumer) {int pageNumber = -1;synchronized (this) {// 查找空闲页...pageNumber = findFreePage();// 创建内存块对象(非原子操作)MemoryBlock newPage = new MemoryBlock(...);// !!! 关键点:将新页加入页表pageTable[pageNumber] = newPage; // 对volatile数组的写入}return newPage.getBaseOffset();
}
为什么需要 volatile
禁止重排序?
对象构造重排序:
MemoryBlock
构造可能被重排序:先分配内存地址,后初始化内部状态如果没有
volatile
,其他线程可能看到pageTable[pageNumber]
不为 null,但对象未完全初始化
volatile
数组的特殊性:volatile
只保证数组引用本身的可见性,不保证数组元素的可见性但这里结合
synchronized
块,确保:a) 构造在同步块内完成(避免重排序)
b)
volatile
保证数组更新立即可见
3. DAGScheduler
中的事件队列(状态一致性)
保证事件处理系统状态的一致性。
源码位置:org.apache.spark.scheduler.DAGScheduler
核心代码:
private val eventQueue: BlockingQueue[DAGSchedulerEvent] = new LinkedBlockingQueue[DAGSchedulerEvent]()
@volatile private var stopped = falseprivate val eventThread = new Thread("dag-scheduler-event-loop") {override def run(): Unit = {while (!stopped) {// 从队列取出事件val event = eventQueue.take()try {// 处理事件doOnReceive(event)} catch {case NonFatal(e) => ...}}}
}
为什么需要 volatile
禁止重排序?
启动顺序问题:
def start(): Unit = {eventThread.start()// 初始化操作(可能耗时)initializeComponents() stopped = false // 如果没有volatile,可能被重排序到start()之前 }
volatile
的屏障作用:确保
stopped = false
不会被重排序到eventThread.start()
之前防止事件线程在初始化完成前就开始处理事件