当前位置: 首页 > news >正文

并发编程——08 Semaphore源码分析

1 概述

  • Semaphore 是基于 AQS + CAS 实现的,可根据构造参数的布尔值,选择使用公平锁,还是非公平锁。Semaphore 默认使用非公平锁;

    在这里插入图片描述

2 构造函数

// AQS的实现
private final Sync sync;// 默认使用非公平锁
public Semaphore(int permits) {sync = new NonfairSync(permits);
}// 根据fair布尔值选择使用公平锁还是非公平锁
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
  • syncSemaphore 内部用于实现同步控制的核心组件,它基于 **AQS(AbstractQueuedSynchronizer,抽象队列同步器)**实现。AQS 是 Java 并发包中同步组件(如锁、信号量)的“基石”,通过维护同步状态和等待队列来实现线程的同步与协作;

  • 构造函数 public Semaphore(int permits)

    • 入参 permits 表示信号量的许可数量(即同时允许多少个线程访问共享资源);
    • 该构造函数默认创建 NonfairSync 实例(非公平锁实现);
    • 非公平锁的特点:线程获取许可时不会严格遵循“先到先得”,新线程可能直接抢占许可,导致等待队列中的线程长时间阻塞,但吞吐量通常更高;
  • 构造函数 public Semaphore(int permits, boolean fair)

    • 入参 fair 是布尔值,用于指定是否使用公平锁
    • fairtrue,则创建 FairSync 实例(公平锁实现);若为 false,则创建 NonfairSync 实例(非公平锁);
    • 公平锁的特点:线程会严格按照“等待时间先后”获取许可,保证了等待队列中线程的公平性,但由于需要维护队列顺序,吞吐量可能略低。

3 公平锁与非公平锁

  • Semaphore 中公平锁与非公平锁的实现,可以在tryAcquireShared()方法中找到两种锁的区别;

    在这里插入图片描述

3.1 NonfairSync

  • Semaphore#NonfairSync#tryAcquireShared(int acquires)

    // 非公平锁,获取信号量
    protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
    }
    
    • 该方法是 NonfairSync(非公平锁实现类)中对 AQS 共享式获取逻辑的实现;
    • 它直接调用 nonfairTryAcquireShared(acquires) 方法,把“非公平获取信号量”的核心逻辑委托给该方法处理;
  • Semaphore#Sync#nonfairTryAcquireShared(int acquires)

    // 非公平锁,获取信号量
    final int nonfairTryAcquireShared(int acquires) {// 自旋for (;;) {// 获取Semaphore中可用的信号量数int available = getState();// 当前可用信号量数 - acquiresint remaining = available - acquires;// 可用信号量数不足 或 CAS操作获取信号量失败,返回  当前可用信号量数 - acquiresif (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
    }
    
    • 这个方法通过自旋 + CAS 实现非公平的信号量获取,步骤如下:

      • 自旋(for(;;) 循环):不断尝试获取信号量,直到成功或确定无法获取;

      • 获取当前可用信号量:通过 getState() 方法获取 Semaphore 中当前可用的信号量数量(available)。getState() 是 AQS 提供的方法,用于维护同步状态(这里同步状态代表可用信号量的数量);

      • 计算剩余信号量remaining = available - acquires,其中 acquires 是线程要获取的信号量数量;

      • CAS 尝试更新状态

        • remaining < 0,说明可用信号量不足,直接返回 remaining(表示获取失败);

        • remaining ≥ 0,则通过 compareAndSetState(available, remaining) 尝试原子性地将“可用信号量”从 available 更新为 remaining。若 CAS 成功,返回 remaining(表示获取成功);若 CAS 失败,说明有其他线程同时修改了信号量状态,继续自旋重试。

3.2 FairSync

  • Semaphore#FairSync#tryAcquireShared():该方法是 FairSync(公平锁实现类)对 AQS 共享式获取逻辑的实现,核心是保证线程“先到先得”的公平性;

    protected int tryAcquireShared(int acquires) {// 自旋for (;;) {// 等待队列中挂起线程,返回-1 (根据返回的-1,将当前线程添加到等待队列中)if (hasQueuedPredecessors())return -1;// 尝试获取Semaphore的信号量,下面与非公平锁逻辑相同int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
    }
    
  • 第 5 行的 hasQueuedPredecessors() 是 AQS 提供的方法,用于判断当前线程是否有“前驱节点”(即等待队列中存在比当前线程更早等待的线程)

    • 若返回 true,说明等待队列中已有更早的线程在等待,当前线程直接返回 -1(表示获取失败,会被 AQS 加入等待队列);

    • 这一步是公平锁与非公平锁的核心区别:公平锁会严格检查等待队列的顺序,避免“插队”,而非公平锁则直接尝试抢占;

  • 在通过 hasQueuedPredecessors() 确认“可以尝试抢占”后,后续逻辑与非公平锁类似:

    • 自旋(for(;;) 循环):不断尝试获取信号量,直到成功或确定无法获取;

    • 获取并计算信号量:通过 getState() 获取当前可用信号量(available),再计算获取 acquires 个信号量后的剩余量(remaining = available - acquires);

    • CAS 原子更新:若 remaining ≥ 0,通过 compareAndSetState(available, remaining) 尝试原子性更新信号量状态;若成功则返回 remaining(获取成功),若失败则继续自旋重试;若 remaining < 0,则直接返回 remaining(获取失败)。

4 acquire()

  • Semaphore 默认实现的是非公平锁,下面就按非公平锁的实现进行源码分析;

  • Semaphore#acquire():入口方法

    public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }
    
    • 作用:尝试获取 1 个信号量,若信号量不足则阻塞线程;支持响应线程中断;

    • 实现:委托给 sync(基于 AQS 的同步组件)的 acquireSharedInterruptibly(1) 方法执行,1 表示要获取的信号量数量;

  • AQS#acquireSharedInterruptibly(int arg):共享式可中断获取逻辑

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
    }
    
    • 中断检查if (Thread.interrupted())先检查线程是否被中断,若已中断则抛出 InterruptedException

    • 尝试获取资源:调用 tryAcquireShared(arg)(由 Semaphore 的公平/非公平锁实现,如前所述的 FairSyncNonfairSync 的逻辑)。若返回值 < 0,说明获取失败,进入 doAcquireSharedInterruptibly(arg) 处理阻塞逻辑;

  • AQS#doAcquireSharedInterruptibly(int arg):共享式阻塞获取(可中断)

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); // 获取前驱节点if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) { // 获取成功setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)p.next = null; // 断开原头节点引用,帮助GCfailed = false;return;}}// 若获取失败,判断是否需要挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 挂起线程并检查中断throw new InterruptedException();}} finally {if (failed)cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求}
    }
    
    • 加入等待队列addWaiter(Node.SHARED) 将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部;

    • 自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试 tryAcquireShared(arg)

    • 线程挂起与中断:若获取失败,通过 shouldParkAfterFailedAcquire 判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt() 会返回 true,进而抛出 InterruptedException

  • AQS#setHeadAndPropagate(Node node, int propagate):设置头节点并传播唤醒(共享模式关键)

    private void setHeadAndPropagate(Node node, int propagate) {Node h = head;setHead(node); // 将当前节点设为新的头节点// 若剩余资源>0、原头节点状态异常(或为null),则传播唤醒后续共享节点if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared()) // 若后续节点是共享模式,唤醒它doReleaseShared();}
    }
    
    • 设置头节点setHead(node) 将当前节点标记为新的头节点(头节点代表“已获取资源并执行完毕”的线程);

    • 传播唤醒逻辑:由于 Semaphore 是共享锁,获取资源的线程需要“传播”唤醒后续等待的共享节点。若满足 propagate > 0(剩余资源充足)或队列状态异常等条件,会调用 doReleaseShared() 唤醒后续节点,保证共享资源的并发获取效率。

5 release()

  • Semaphore 默认实现的是非公平锁,下面就按非公平锁的实现进行源码分析;

  • Semaphore#release():入口方法

    public void release() {sync.releaseShared(1);
    }
    
    • 作用:归还 1 个信号量

    • 实现:委托给 sync(基于 AQS 的同步组件)的 releaseShared(1) 方法执行,1 表示要归还的信号量数量;

  • AQS#releaseShared(int arg):共享式释放逻辑

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 尝试归还信号量doReleaseShared(); // 唤醒等待队列中的线程return true;}return false;
    }
    
    • 尝试归还资源:调用 tryReleaseShared(arg)(由 Semaphore 实现),若归还成功则进入下一步;

    • 唤醒等待线程:调用 doReleaseShared() 唤醒 AQS 等待队列中阻塞的线程,让它们有机会获取刚归还的信号量;

  • Semaphore#Sync#tryReleaseShared(int releases):信号量归还的核心逻辑

    protected final boolean tryReleaseShared(int releases) {for (;;) { // 自旋int current = getState(); // 获取当前可用信号量(AQS的同步状态)int next = current + releases; // 计算归还后的信号量总数if (next < current) // 防止int溢出(若next为负,说明超出int最大值)throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) // CAS原子更新信号量return true;}
    }
    
    • 自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,确保“归还信号量”的操作是原子的(避免多线程同时归还时的状态冲突);

    • 状态更新getState() 获取当前信号量数量,next = current + releases 计算归还后的数量,再通过 compareAndSetState 原子性更新状态;

  • AQS#doReleaseShared():唤醒等待队列的共享线程

    private void doReleaseShared() {for (;;) { // 自旋Node h = head; // 获取等待队列的头节点if (h != null && h != tail) { // 队列非空且有等待线程int ws = h.waitStatus;if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // CAS更新状态失败,继续自旋unparkSuccessor(h); // 唤醒头节点的后继线程} // 处理JDK1.5的bug:将头节点状态设为PROPAGATE,保证共享模式下的唤醒传播else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head) // 头节点未变化,说明唤醒操作完成break;}
    }
    
    • 自旋保证唤醒可靠性:循环处理队列,确保唤醒操作能覆盖所有需要唤醒的线程。

    • 状态判断与唤醒:若头节点状态为 SIGNAL,则通过 unparkSuccessor(h) 唤醒其后继线程;同时处理共享模式下的状态传播(PROPAGATE),保证多个共享线程能依次被唤醒。

http://www.dtcms.com/a/358327.html

相关文章:

  • 免费在线图片合成视频工具 ,完全免费
  • 文件夹命名软件,批量操作超简单
  • 美团8-30:编程题
  • 深入解析前缀和算法:原理、实现与应用
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(六)
  • react组件
  • C++优先级队列priority_queue的模拟实现
  • Trailing Zeros (计算 1 ~ n 中质因子 p 的数量)
  • Java全栈开发面试实战:从基础到高并发的全面解析
  • Redis数据类型概览:除了五大基础类型还有哪些?
  • leetcode643. 子数组最大平均数 I
  • AI-调查研究-65-机器人 机械臂控制技术的前世今生:从PLC到MPC
  • vscode+cmake+mingw64+opencv环境配置
  • wpf之依赖属性
  • 具有类人先验知识的 Affordance-觉察机器人灵巧抓取
  • C++_多态和虚构
  • 卡片一放,服务直达!实现信息零层级触达
  • Python实现京东商品数据自动化采集的实用指南
  • (双指针)Leetcode283.移动零-替换数字类别+Leetcode15. 三数之和
  • UI前端大数据可视化实战策略:如何设计符合用户认知的数据可视化界面?
  • 【计算机网络】HTTP是什么?
  • Ansible Playbook 调试与预演指南:从语法检查到连通性排查
  • 一体化步进伺服电机在汽车线束焊接设备中的应用案例
  • MongoDB 源码编译与调试:深入理解存储引擎设计 内容详细
  • HarmonyOS元服务开发
  • 深入解析HarmonyOS:UIAbility与Page的生命周期协同
  • TensorFlow 面试题及详细答案 120道(71-80)-- 性能优化与调试
  • 坚鹏请教DEEPSEEK:请问中国领先的AI智能体服务商有哪些?知行学
  • 深度学习系列 | Seq2Seq端到端翻译模型
  • 离线大文件与断点续传:ABP + TUS + MinIO/S3