深入AQS源码:解密Condition的await与signal
在Java并发编程中,ReentrantLock
配合Condition
,是我们替代synchronized
和wait/notify
的常用工具。它提供了更灵活的线程等待和唤醒机制。那么,当我们调用condition.await()
时,线程到底经历了什么?signal()
又是如何唤醒它的?要回答这些问题,就必须深入AQS(AbstractQueuedSynchronizer
)的源码,跟踪一个线程的全过程。
await()
的执行路径
当一个已经获取了锁的线程调用condition.await()
时,它会进入一段精心设计的等待旅程。
public final void await() throws InterruptedException {if (Thread.interrupted()) throw new InterruptedException();// 1. 创建节点并加入条件队列Node node = addConditionWaiter();// 2. 释放锁(完全释放,包括重入次数)int savedState = fullyRelease(node);int interruptMode = 0;// 3. 自旋等待:直到节点被移动到同步队列(通过signal)或线程被中断while (!isOnSyncQueue(node)) {LockSupport.park(this); // 挂起线程if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;}// 4. 重新竞争锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 5. 清理无效节点if (node.nextWaiter != null) unlinkCancelledWaiters();// 6. 处理中断if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
}
我们来分解一下这个过程。首先,代码会调用addConditionWaiter()
,将当前线程包装成一个Node
节点,并加入到条件队列中。
private Node addConditionWaiter() {Node t = lastWaiter;// 清除被取消的尾节点if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 将当前线程保存在Node中Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;else// 队尾插入t.nextWaiter = node;// 更新lastWaiterlastWaiter = node;return node;
}
可以看到,addConditionWaiter
的逻辑就是将新节点追加到条件队列这个链表的末尾。这个条件队列由firstWaiter
和lastWaiter
指针维护,专门用来存放调用了await()
的线程。
节点入队后,线程必须释放它当前持有的锁,这由fullyRelease()
完成。
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {// 成功释放同步状态failed = false;return savedState;} else {// 不成功释放同步状态抛出异常throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}
这个方法会彻底释放当前线程持有的锁,无论重入了多少次,并将原始的同步状态savedState
保存下来,以便后续恢复。锁释放后,线程就进入while (!isOnSyncQueue(node))
循环,并通过LockSupport.park(this)
将自己挂起,进入WAITING
状态,静静等待唤醒信号。
signal()
的执行路径
唤醒过程由另一个持有锁的线程调用signal()
来触发。
public final void signal() {// 1. 先检测当前线程是否已经获取lockif (!isHeldExclusively())throw new IllegalMonitorStateException();// 2. 获取等待队列中第一个节点,之后的操作都是针对这个节点Node first = firstWaiter;if (first != null)doSignal(first);
}
signal()
方法会先检查当前线程是否持有锁,然后获取条件队列的头节点,并调用doSignal()
。
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 1. 将头结点从等待队列中移除first.nextWaiter = null;// 2. while中transferForSignal方法对头结点做真正的处理} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
doSignal
的核心是循环调用transferForSignal
,这个方法负责将等待的线程节点从条件队列转移到同步队列,这是整个唤醒机制的关键。
final boolean transferForSignal(Node node) {// 1. 更新状态为0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 2. 将该节点移入到同步队列中去Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}
transferForSignal
主要做了两件事:一是通过CAS将节点的waitStatus
从CONDITION
改为0;二是调用enq()
方法,将该节点追加到AQS同步队列的队尾。节点入队后,线程就从“等待条件”的状态,转变成了“等待获取锁”的状态,并通过LockSupport.unpark()
来唤醒。
await()
方法的返回
一旦在signal()
中unpark
被调用,原先在await()
中被挂起的线程就会醒来。此时,因为它所在的节点已经被移入了同步队列,while (!isOnSyncQueue(node))
的循环条件不再满足,循环退出。
接下来,线程会执行acquireQueued(node, savedState)
。这会进入AQS标准的获取锁流程,线程会和其他所有正在排队获取锁的线程一样,在同步队列中竞争,直到成功获取之前保存的savedState
数量的锁。
当acquireQueued()
成功返回,await()
方法才算执行完毕,在进行一些清理后,正式返回。
总结来看,Condition
的实现精髓在于它维护了两个队列:一个用于等待条件的条件队列,一个用于竞争锁的同步队列。await()
过程是“加入条件队列 -> 释放锁 -> 等待”,而signal()
过程则是“将节点从条件队列转移到同步队列”。一个线程必须先被signal()
,然后重新成功竞争到锁,才能最终从await()
方法中退出。