Spark源码中的CAS思想
最近在看黑马视频学习CAS相关的知识,刚好也在学Spark,凑一起学了。
类中几个核心的方法:
CAS(Compare-And-Swap)是一种重要的无锁(lock-free)并发编程思想,它在高性能框架 like Spark 中无处不在。Spark 在其核心代码中大量使用了 Java 并发包(java.util.concurrent
)中的原子变量(Atomic Variables),这些原子变量的底层实现正是基于 CPU 的 CAS 指令。
阅读 Spark 源码中几个体现 CAS 思想的典型例子。
1. 核心概念:Java 原子变量
Spark 中 CAS 思想最直接的体现就是广泛使用 java.util.concurrent.atomic
包下的类,例如:
AtomicReference
AtomicInteger
AtomicLong
AtomicBoolean
这些类的 compareAndSet()
、getAndSet()
、getAndIncrement()
等方法都是基于 CAS 实现的。
2. 源码实例分析
实例 1:RPC 端点状态控制 (RpcEndpoint.scala
)
在 org.apache.spark.rpc
包中,RpcEndpoint
的生命周期状态管理就使用了 CAS 来确保线程安全。
核心思想: 一个 RpcEndpoint 的状态(如:stopped
)可能会被多个线程同时修改(例如,同时调用 stop()
方法),但必须保证它只能被成功停止一次。
简化代码逻辑:
private val stopped = new AtomicBoolean(false)def stop(): Unit = {// 使用 CAS 操作来尝试将状态从 false 改为 true// 如果成功,说明我是第一个执行 stop 的线程,可以执行关闭逻辑// 如果失败,说明其他线程已经执行了 stop,我直接返回即可if (stopped.compareAndSet(false, true)) {// 执行真正的资源释放和关闭逻辑...doRealStop()}
}
源码位置: 你可以查看 org.apache.spark.rpc.RpcEndpoint
相关的实现类,状态控制通常采用这种模式。
实例 2: 工具类中的等待机制 (Utils.scala
)
在 org.apache.spark.util.Utils
中,有一个方法 waitUntil
,它使用 CAS 和自旋循环来等待某个条件被满足。
核心思想: 不断检查一个条件,直到它成立。使用 volatile 变量保证可见性,循环检查而非锁阻塞,效率更高。
简化代码逻辑:
def waitUntil(p: => Boolean, timeout: Long = 100, maxTries: Int = 600): Boolean = {var tries = 0while (!p && tries < maxTries) {Thread.sleep(timeout)tries += 1 // 这个递增操作本身不是原子的,但这里循环条件不依赖于它,所以没问题}p
}
虽然这个例子中没有直接的 AtomicInteger
,但它体现了 CAS 的“循环尝试”核心思想。更复杂的版本可能会用 AtomicInteger
来记录 tries
。
源码位置: org.apache.spark.util.Utils#waitUntil
实例 3: 事件循环总线 (LiveListenerBus.scala
)
Spark 的事件监听总线 LiveListenerBus
用于异步传递各种事件(如任务开始、结束等)。它的 started
状态也使用 CAS 来管理。
核心思想: 确保监听总线只能被启动一次。
简化代码逻辑:
private val started = new AtomicBoolean(false)def start() {// 尝试将 started 从 false 设置为 trueif (started.compareAndSet(false, true)) {// 成功,执行启动逻辑thread.start()} else {// 失败,说明已经启动,抛出异常或忽略logError("LiveListenerBus already started!")}
}
源码位置: org.apache.spark.scheduler.LiveListenerBus#start
实例 4: 任务内存管理 (TaskMemoryManager.scala
)
这是最复杂也是最能体现 CAS 优势的例子。在 org.apache.spark.memory.TaskMemoryManager
中,管理着每个 Task 的内存分配。其中有一个核心结构叫 PageTable
,它使用 AtomicReferenceArray
来存储内存页(MemoryBlock)。
核心思想: 多个线程可能同时为同一个 Task 申请或释放内存页。使用 CAS 可以无锁地管理页表的分配和释放,极大提升了并发性能。如果使用传统的锁,内存分配将成为巨大的性能瓶颈。
简化代码逻辑:
// 页表是一个原子引用数组
private val pageTable = new AtomicReferenceArray[MemoryBlock](maxPages)// 分配一个新页的近似逻辑
def allocatePage(): MemoryBlock = {var pageNumber: Int = -1// 循环尝试,找到一个空位do {val foundPage = findFreePage() // 寻找一个空页号if (foundPage == -1) {return null // 没找到}// 关键:使用 CAS 操作,尝试将 pageTable 中 foundPage 位置从 null 设置为一个“占位符”// 如果成功,说明这个页被我抢到了;如果失败,说明被其他线程抢走了,继续循环寻找} while (!pageTable.compareAndSet(foundPage, null, placeholder))// 成功抢到页,执行实际的内存分配...val page = allocRealMemory()// 最后,将占位符替换为真正的内存页对象pageTable.set(foundPage, page)page
}
源码位置: org.apache.spark.memory.TaskMemoryManager
。这里的代码非常复杂,但 AtomicReferenceArray.compareAndSet
是其无锁化的基石。
总结
在 Spark 源码中,CAS 思想主要体现在以下几个方面:
状态标志位管理: 例如组件(RpcEndpoint, ListenerBus)的启动、停止状态。使用
AtomicBoolean.compareAndSet
确保状态转换的原子性和线程安全,避免重复启动/关闭。无锁计数器和累加器: 使用
AtomicInteger.getAndIncrement
等进行计数,避免synchronized
带来的性能开销。无锁数据结构: 最典型的例子是
TaskMemoryManager
中的页表管理,使用AtomicReferenceArray
实现高性能的无锁内存分配,这是支撑 Spark 高效内存计算的关键之一。乐观锁控制: 广泛采用“循环尝试”的模式,先读取值,计算新值,然后通过 CAS 更新。如果失败(说明期间值被其他线程修改),则回滚并重试。
Spark 是一个高性能、高并发的分布式计算框架。在分布式环境下,锁(synchronized
)容易导致线程阻塞和死锁,成为性能瓶颈。而 CAS 是一种乐观锁,它假设竞争不激烈,直接尝试更新,如果失败就重试。在大多数情况下,这种无锁操作的速度远快于锁操作,从而极大地提升了框架的并发性能和吞吐量。
这些代码可以在 Spark 源码项目中搜索 Atomic
、compareAndSet
等关键字,会发现大量的应用场景。