并发编程——08 Semaphore源码分析
1 概述
-
Semaphore 是基于 AQS + CAS 实现的,可根据构造参数的布尔值,选择使用公平锁,还是非公平锁。Semaphore 默认使用非公平锁;
2 构造函数
// AQS的实现
private final Sync sync;// 默认使用非公平锁
public Semaphore(int permits) {sync = new NonfairSync(permits);
}// 根据fair布尔值选择使用公平锁还是非公平锁
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
-
sync
是Semaphore
内部用于实现同步控制的核心组件,它基于 **AQS(AbstractQueuedSynchronizer,抽象队列同步器)**实现。AQS 是 Java 并发包中同步组件(如锁、信号量)的“基石”,通过维护同步状态和等待队列来实现线程的同步与协作; -
构造函数
public Semaphore(int permits)
:- 入参
permits
表示信号量的许可数量(即同时允许多少个线程访问共享资源); - 该构造函数默认创建
NonfairSync
实例(非公平锁实现); - 非公平锁的特点:线程获取许可时不会严格遵循“先到先得”,新线程可能直接抢占许可,导致等待队列中的线程长时间阻塞,但吞吐量通常更高;
- 入参
-
构造函数
public Semaphore(int permits, boolean fair)
:- 入参
fair
是布尔值,用于指定是否使用公平锁; - 若
fair
为true
,则创建FairSync
实例(公平锁实现);若为false
,则创建NonfairSync
实例(非公平锁); - 公平锁的特点:线程会严格按照“等待时间先后”获取许可,保证了等待队列中线程的公平性,但由于需要维护队列顺序,吞吐量可能略低。
- 入参
3 公平锁与非公平锁
-
Semaphore 中公平锁与非公平锁的实现,可以在
tryAcquireShared()
方法中找到两种锁的区别;
3.1 NonfairSync
-
Semaphore#NonfairSync#tryAcquireShared(int acquires)
// 非公平锁,获取信号量 protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); }
- 该方法是
NonfairSync
(非公平锁实现类)中对 AQS 共享式获取逻辑的实现; - 它直接调用
nonfairTryAcquireShared(acquires)
方法,把“非公平获取信号量”的核心逻辑委托给该方法处理;
- 该方法是
-
Semaphore#Sync#nonfairTryAcquireShared(int acquires)
// 非公平锁,获取信号量 final int nonfairTryAcquireShared(int acquires) {// 自旋for (;;) {// 获取Semaphore中可用的信号量数int available = getState();// 当前可用信号量数 - acquiresint remaining = available - acquires;// 可用信号量数不足 或 CAS操作获取信号量失败,返回 当前可用信号量数 - acquiresif (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }
-
这个方法通过自旋 + CAS 实现非公平的信号量获取,步骤如下:
-
自旋(
for(;;)
循环):不断尝试获取信号量,直到成功或确定无法获取; -
获取当前可用信号量:通过
getState()
方法获取Semaphore
中当前可用的信号量数量(available
)。getState()
是 AQS 提供的方法,用于维护同步状态(这里同步状态代表可用信号量的数量); -
计算剩余信号量:
remaining = available - acquires
,其中acquires
是线程要获取的信号量数量; -
CAS 尝试更新状态:
-
若
remaining < 0
,说明可用信号量不足,直接返回remaining
(表示获取失败); -
若
remaining ≥ 0
,则通过compareAndSetState(available, remaining)
尝试原子性地将“可用信号量”从available
更新为remaining
。若 CAS 成功,返回remaining
(表示获取成功);若 CAS 失败,说明有其他线程同时修改了信号量状态,继续自旋重试。
-
-
-
3.2 FairSync
-
Semaphore#FairSync#tryAcquireShared()
:该方法是FairSync
(公平锁实现类)对 AQS 共享式获取逻辑的实现,核心是保证线程“先到先得”的公平性;protected int tryAcquireShared(int acquires) {// 自旋for (;;) {// 等待队列中挂起线程,返回-1 (根据返回的-1,将当前线程添加到等待队列中)if (hasQueuedPredecessors())return -1;// 尝试获取Semaphore的信号量,下面与非公平锁逻辑相同int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }
-
第 5 行的
hasQueuedPredecessors()
是 AQS 提供的方法,用于判断当前线程是否有“前驱节点”(即等待队列中存在比当前线程更早等待的线程);-
若返回
true
,说明等待队列中已有更早的线程在等待,当前线程直接返回-1
(表示获取失败,会被 AQS 加入等待队列); -
这一步是公平锁与非公平锁的核心区别:公平锁会严格检查等待队列的顺序,避免“插队”,而非公平锁则直接尝试抢占;
-
-
在通过
hasQueuedPredecessors()
确认“可以尝试抢占”后,后续逻辑与非公平锁类似:-
自旋(
for(;;)
循环):不断尝试获取信号量,直到成功或确定无法获取; -
获取并计算信号量:通过
getState()
获取当前可用信号量(available
),再计算获取acquires
个信号量后的剩余量(remaining = available - acquires
); -
CAS 原子更新:若
remaining ≥ 0
,通过compareAndSetState(available, remaining)
尝试原子性更新信号量状态;若成功则返回remaining
(获取成功),若失败则继续自旋重试;若remaining < 0
,则直接返回remaining
(获取失败)。
-
4 acquire()
-
Semaphore 默认实现的是非公平锁,下面就按非公平锁的实现进行源码分析;
-
Semaphore#acquire()
:入口方法public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }
-
作用:尝试获取 1 个信号量,若信号量不足则阻塞线程;支持响应线程中断;
-
实现:委托给
sync
(基于 AQS 的同步组件)的acquireSharedInterruptibly(1)
方法执行,1
表示要获取的信号量数量;
-
-
AQS#acquireSharedInterruptibly(int arg)
:共享式可中断获取逻辑public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }
-
中断检查:
if (Thread.interrupted())
先检查线程是否被中断,若已中断则抛出InterruptedException
; -
尝试获取资源:调用
tryAcquireShared(arg)
(由Semaphore
的公平/非公平锁实现,如前所述的FairSync
或NonfairSync
的逻辑)。若返回值< 0
,说明获取失败,进入doAcquireSharedInterruptibly(arg)
处理阻塞逻辑;
-
-
AQS#doAcquireSharedInterruptibly(int arg)
:共享式阻塞获取(可中断)private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); // 获取前驱节点if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) { // 获取成功setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)p.next = null; // 断开原头节点引用,帮助GCfailed = false;return;}}// 若获取失败,判断是否需要挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 挂起线程并检查中断throw new InterruptedException();}} finally {if (failed)cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求} }
-
加入等待队列:
addWaiter(Node.SHARED)
将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部; -
自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试
tryAcquireShared(arg)
; -
线程挂起与中断:若获取失败,通过
shouldParkAfterFailedAcquire
判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt()
会返回true
,进而抛出InterruptedException
;
-
-
AQS#setHeadAndPropagate(Node node, int propagate)
:设置头节点并传播唤醒(共享模式关键)private void setHeadAndPropagate(Node node, int propagate) {Node h = head;setHead(node); // 将当前节点设为新的头节点// 若剩余资源>0、原头节点状态异常(或为null),则传播唤醒后续共享节点if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared()) // 若后续节点是共享模式,唤醒它doReleaseShared();} }
-
设置头节点:
setHead(node)
将当前节点标记为新的头节点(头节点代表“已获取资源并执行完毕”的线程); -
传播唤醒逻辑:由于
Semaphore
是共享锁,获取资源的线程需要“传播”唤醒后续等待的共享节点。若满足propagate > 0
(剩余资源充足)或队列状态异常等条件,会调用doReleaseShared()
唤醒后续节点,保证共享资源的并发获取效率。
-
5 release()
-
Semaphore 默认实现的是非公平锁,下面就按非公平锁的实现进行源码分析;
-
Semaphore#release()
:入口方法public void release() {sync.releaseShared(1); }
-
作用:归还 1 个信号量;
-
实现:委托给
sync
(基于 AQS 的同步组件)的releaseShared(1)
方法执行,1
表示要归还的信号量数量;
-
-
AQS#releaseShared(int arg)
:共享式释放逻辑public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 尝试归还信号量doReleaseShared(); // 唤醒等待队列中的线程return true;}return false; }
-
尝试归还资源:调用
tryReleaseShared(arg)
(由Semaphore
实现),若归还成功则进入下一步; -
唤醒等待线程:调用
doReleaseShared()
唤醒 AQS 等待队列中阻塞的线程,让它们有机会获取刚归还的信号量;
-
-
Semaphore#Sync#tryReleaseShared(int releases)
:信号量归还的核心逻辑protected final boolean tryReleaseShared(int releases) {for (;;) { // 自旋int current = getState(); // 获取当前可用信号量(AQS的同步状态)int next = current + releases; // 计算归还后的信号量总数if (next < current) // 防止int溢出(若next为负,说明超出int最大值)throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) // CAS原子更新信号量return true;} }
-
自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,确保“归还信号量”的操作是原子的(避免多线程同时归还时的状态冲突);
-
状态更新:
getState()
获取当前信号量数量,next = current + releases
计算归还后的数量,再通过compareAndSetState
原子性更新状态;
-
-
AQS#doReleaseShared()
:唤醒等待队列的共享线程private void doReleaseShared() {for (;;) { // 自旋Node h = head; // 获取等待队列的头节点if (h != null && h != tail) { // 队列非空且有等待线程int ws = h.waitStatus;if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // CAS更新状态失败,继续自旋unparkSuccessor(h); // 唤醒头节点的后继线程} // 处理JDK1.5的bug:将头节点状态设为PROPAGATE,保证共享模式下的唤醒传播else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head) // 头节点未变化,说明唤醒操作完成break;} }
-
自旋保证唤醒可靠性:循环处理队列,确保唤醒操作能覆盖所有需要唤醒的线程。
-
状态判断与唤醒:若头节点状态为
SIGNAL
,则通过unparkSuccessor(h)
唤醒其后继线程;同时处理共享模式下的状态传播(PROPAGATE
),保证多个共享线程能依次被唤醒。
-