当前位置: 首页 > news >正文

Spark源码中的CAS思想

最近在看黑马视频学习CAS相关的知识,刚好也在学Spark,凑一起学了。

类中几个核心的方法:

CAS(Compare-And-Swap)是一种重要的无锁(lock-free)并发编程思想,它在高性能框架 like Spark 中无处不在。Spark 在其核心代码中大量使用了 Java 并发包(java.util.concurrent)中的原子变量(Atomic Variables),这些原子变量的底层实现正是基于 CPU 的 CAS 指令。

阅读 Spark 源码中几个体现 CAS 思想的典型例子。

1. 核心概念:Java 原子变量

Spark 中 CAS 思想最直接的体现就是广泛使用 java.util.concurrent.atomic包下的类,例如:

  • AtomicReference

  • AtomicInteger

  • AtomicLong

  • AtomicBoolean

这些类的 compareAndSet()getAndSet()getAndIncrement()等方法都是基于 CAS 实现的。

2. 源码实例分析

实例 1:RPC 端点状态控制 (RpcEndpoint.scala)

org.apache.spark.rpc包中,RpcEndpoint的生命周期状态管理就使用了 CAS 来确保线程安全。

​核心思想​​: 一个 RpcEndpoint 的状态(如:stopped)可能会被多个线程同时修改(例如,同时调用 stop()方法),但必须保证它只能被成功停止一次。

​简化代码逻辑​​:

private val stopped = new AtomicBoolean(false)def stop(): Unit = {// 使用 CAS 操作来尝试将状态从 false 改为 true// 如果成功,说明我是第一个执行 stop 的线程,可以执行关闭逻辑// 如果失败,说明其他线程已经执行了 stop,我直接返回即可if (stopped.compareAndSet(false, true)) {// 执行真正的资源释放和关闭逻辑...doRealStop()}
}

​源码位置​​: 你可以查看 org.apache.spark.rpc.RpcEndpoint相关的实现类,状态控制通常采用这种模式。

实例 2: 工具类中的等待机制 (Utils.scala)

org.apache.spark.util.Utils中,有一个方法 waitUntil,它使用 CAS 和自旋循环来等待某个条件被满足。

​核心思想​​: 不断检查一个条件,直到它成立。使用 volatile 变量保证可见性,循环检查而非锁阻塞,效率更高。

​简化代码逻辑​​:

def waitUntil(p: => Boolean, timeout: Long = 100, maxTries: Int = 600): Boolean = {var tries = 0while (!p && tries < maxTries) {Thread.sleep(timeout)tries += 1 // 这个递增操作本身不是原子的,但这里循环条件不依赖于它,所以没问题}p
}

虽然这个例子中没有直接的 AtomicInteger,但它体现了 CAS 的“循环尝试”核心思想。更复杂的版本可能会用 AtomicInteger来记录 tries

​源码位置​​: org.apache.spark.util.Utils#waitUntil

实例 3: 事件循环总线 (LiveListenerBus.scala)

Spark 的事件监听总线 LiveListenerBus用于异步传递各种事件(如任务开始、结束等)。它的 started状态也使用 CAS 来管理。

​核心思想​​: 确保监听总线只能被启动一次。

​简化代码逻辑​​:

private val started = new AtomicBoolean(false)def start() {// 尝试将 started 从 false 设置为 trueif (started.compareAndSet(false, true)) {// 成功,执行启动逻辑thread.start()} else {// 失败,说明已经启动,抛出异常或忽略logError("LiveListenerBus already started!")}
}

​源码位置​​: org.apache.spark.scheduler.LiveListenerBus#start

实例 4: 任务内存管理 (TaskMemoryManager.scala)

这是最复杂也是最能体现 CAS 优势的例子。在 org.apache.spark.memory.TaskMemoryManager中,管理着每个 Task 的内存分配。其中有一个核心结构叫 PageTable,它使用 AtomicReferenceArray来存储内存页(MemoryBlock)。

​核心思想​​: 多个线程可能同时为同一个 Task 申请或释放内存页。使用 CAS 可以无锁地管理页表的分配和释放,极大提升了并发性能。如果使用传统的锁,内存分配将成为巨大的性能瓶颈。

​简化代码逻辑​​:

// 页表是一个原子引用数组
private val pageTable = new AtomicReferenceArray[MemoryBlock](maxPages)// 分配一个新页的近似逻辑
def allocatePage(): MemoryBlock = {var pageNumber: Int = -1// 循环尝试,找到一个空位do {val foundPage = findFreePage() // 寻找一个空页号if (foundPage == -1) {return null // 没找到}// 关键:使用 CAS 操作,尝试将 pageTable 中 foundPage 位置从 null 设置为一个“占位符”// 如果成功,说明这个页被我抢到了;如果失败,说明被其他线程抢走了,继续循环寻找} while (!pageTable.compareAndSet(foundPage, null, placeholder))// 成功抢到页,执行实际的内存分配...val page = allocRealMemory()// 最后,将占位符替换为真正的内存页对象pageTable.set(foundPage, page)page
}

​源码位置​​: org.apache.spark.memory.TaskMemoryManager。这里的代码非常复杂,但 AtomicReferenceArray.compareAndSet是其无锁化的基石。

总结

在 Spark 源码中,CAS 思想主要体现在以下几个方面:

  1. ​状态标志位管理​​: 例如组件(RpcEndpoint, ListenerBus)的启动、停止状态。使用 AtomicBoolean.compareAndSet确保状态转换的原子性和线程安全,避免重复启动/关闭。

  2. ​无锁计数器和累加器​​: 使用 AtomicInteger.getAndIncrement等进行计数,避免 synchronized带来的性能开销。

  3. ​无锁数据结构​​: 最典型的例子是 TaskMemoryManager中的页表管理,使用 AtomicReferenceArray实现高性能的无锁内存分配,这是支撑 Spark 高效内存计算的关键之一。

  4. ​乐观锁控制​​: 广泛采用“循环尝试”的模式,先读取值,计算新值,然后通过 CAS 更新。如果失败(说明期间值被其他线程修改),则回滚并重试。

Spark 是一个高性能、高并发的分布式计算框架。在分布式环境下,锁(synchronized)容易导致线程阻塞和死锁,成为性能瓶颈。而 CAS 是一种​​乐观锁​​,它假设竞争不激烈,直接尝试更新,如果失败就重试。在大多数情况下,这种无锁操作的速度远快于锁操作,从而极大地提升了框架的并发性能和吞吐量。

这些代码可以在 Spark 源码项目中搜索 AtomiccompareAndSet等关键字,会发现大量的应用场景。

http://www.dtcms.com/a/395612.html

相关文章:

  • webpack-dev-server使用
  • 现有项目添加CMake
  • c语言学习_数组使用_扫雷2
  • 轻量级KVM管理工具 —— 筑梦之路
  • 第十四章:数据分析基础库NumPy(一)
  • 课题学习——SimCSE
  • gitee.com 有raw.githubusercontent.com一样的机制吗?
  • AI原生未来:新商业机会全景洞察与商业模式深度解构
  • Spark源码中的volatile
  • IDEA运行/调试配置找不到对应脚本的命令
  • 测试duckdb的C插件模板的编译加工和加载
  • 如何用AI工具开发一个轻量化CRM系统(二):需求分析
  • ARM架构学习9——LM75温度传感器+ADC转换器
  • 再见,Windows 10:升级 Windows 11 的必要性!
  • 数据结构从入门到实战——算法的时间复杂度
  • Rust字符串
  • 【图文详解】强化学习最新进展以及核心技术突破方向、核心技术架构
  • Linux SSH 安全加固与批量管理:密钥认证 + 自动化脚本 + OpenSSH 升级
  • 一个可以直接跑满本地带宽文件分享工具 开箱即用,可用于局域网内分享文件和文件夹
  • 探索AI无人直播技术:自动化带来的新机遇
  • Codeforces Round 1051 (Div. 2) D1题 题解记录
  • 计算机视觉、图像处理国际学术会议
  • redhat7.4升级到Oracle Linux8.10
  • PEFT库实战快速入门
  • PyTorch 核心知识手册:神经网络构建与训练基础
  • DeepSeek对数学工具的分类(2025.1.13)
  • 2025年9月打磨机器人新技术解析与常见知名品牌推荐
  • STM32开发(WiFi - ESP8266)
  • ArcGIS 车辆轨迹跟踪 视频制作 第一人称视觉跟踪
  • Ansible自动化运维平台部署