AQS 基本思想与源码分析
充分了解 AbstractQueuedSynchronizer 对于深入理解并发编程是有益处的,它是用来构建锁或者其他同步组件的基础框架,我们常用的同步工具类如 CountDownLatch、Semaphore、ThreadPoolExecutor、ReentrantLock 和 ReentrantReadWriteLock 内部都用到了它。
以上提到的同步工具类,都是用静态内部类继承了 AQS。因此,平时在使用这些同步工具类时,我们感觉不到 AQS 的存在。
AbstractQueuedSynchronizer 中有一个很重要的变量 state:
/*** The synchronization state.* 当 state 为 0 时,表示锁没有被占用。*/private volatile int state;
不论是 JDK 还是我们自定义的锁/工具类,都是在围绕这个 state 做文章。比如上锁时要将 state 置为 1,而解锁时将其置为 0,只不过并不是由自定义的锁直接操作,而是通过 AQS 去直接操作 state。
可以这样理解锁和 AQS 之间的关系:
- 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节。
- AQS 面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
锁和 AQS 很好地隔离了使用者和实现者所需关注的领域。
1、AQS 的使用方法
1.1 模板方法
AQS 使用了模板方法设计模式,要实现自定义的同步工具类,推荐使用静态内部类继承 AQS,并根据需求重写对应的模板方法:
/*** 尝试以独占模式获取同步状态。该方法应该查询对象的状态是否允许在独占模式下被获取,* 如果允许就去获取它。** 这个方法总是被执行获取的线程调用。如果这个方法返回 false,* 获取方法可能会将尚未排队的线程排队,直到其他线程发出释放信号。* 这个可以用来实现 {@link Lock#tryLock()} 方法。* * @param arg 获取参数。这个值总是传递给获取方法,或者是在进入条件等待时保存。* 否则,该值是未解释的,可以表示你喜欢的任何值。* @return true 表示成功。一旦成功,这个对象就被获得了。* @throws IllegalMonitorStateException 如果获取动作会将此同步器置于非法状态就抛出这个异常。* 必须以一致的方式抛出此异常,才能使同步正常工作。* @throws UnsupportedOperationException 如果不支持独占模式抛出此异常。**/protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}/*** 在独占模式下,尝试设置 state 状态变量来反映释放操作。** 这个方法总是被执行释放操作的线程调用。** @param arg 释放参数. 这个值总是传递给一个释放方法,或者是进入等待条件时的当前状态值。* 否则,该值是未解释的,可以表示您喜欢的任何值。* @return 如果这个对象当前处于完全释放的状态,就返回 true,这样在等待中的线程就可以尝试* 去获取。否则,返回 false。* * @throws IllegalMonitorStateException,UnsupportedOperationException* 同 tryAcquire() 方法*/protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/*** 尝试在共享模式下获取。该方法应该查询对象的状态是否允许在共享模式下被获取,* 如果允许就去获取它。** 参数、返回值、抛出的异常同 tryAcquire(),只不过本方法是在共享模式下。*/protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}/*** 共享模式下的 tryRelease()。*/protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}/*** 如果同步是被当前(调用)线程独占的,就返回 true。* 每次调用 ConditionObject 类中非等待的方法时,都会调用此方法。(等待的方法会调用 release())* 这个方法只在 ConditionObject 的方法内部调用,因此如果没用到 Condition 就不用定义这个方法。*/protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
除此之外,还有一些模板方法可能会在实现自定义锁的过程中被调用到:
这些模板方法负责独占式&共享式获取与释放同步状态,同步状态和查询同步队列中的等待线程情况。
此外,跟 state 相关的还有三个方法 getState()、setState()、compareAndSetState() 分别用来获取同步状态、设置同步状态、使用 CAS 设置同步状态。
1.2 举例
如果要实现一个独占锁,就要重写 tryAcquire()、tryRelease() 和 isHeldExclusively(),而实现共享锁就要重写 tryAcquireShared()、tryReleaseShared() 。
比如我要自定义一个不可重入的显式独占锁,那么就要实现 Lock 接口,定义静态内部类继承 AQS,实现获取锁、释放锁等方法:
public class MyLock implements Lock {/* MyLock 有关锁的操作仅需要将操作代理到 Sync 上即可*/private final Sync sync = new Sync();private final static class Sync extends AbstractQueuedSynchronizer {// 判断处于独占状态@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}// 获得锁@Overrideprotected boolean tryAcquire(int i) {// CAS 操作设置 state 保证同步if (compareAndSetState(0, 1)) {// 设置占有独占锁的线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 释放锁@Overrideprotected boolean tryRelease(int i) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 返回一个Condition,每个condition都包含了一个condition队列public Condition newCondition() {return new ConditionObject();}}@Overridepublic void lock() {System.out.println(Thread.currentThread().getName() + " ready get lock");sync.acquire(1);System.out.println(Thread.currentThread().getName() + " already got lock");}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long timeout, TimeUnit timeUnit) throws InterruptedException {return sync.tryAcquireNanos(1, timeUnit.toNanos(timeout));}@Overridepublic void unlock() {System.out.println(Thread.currentThread().getName() + " ready release lock");sync.release(1);System.out.println(Thread.currentThread().getName() + " already released lock");}@Overridepublic Condition newCondition() {return sync.newCondition();}
}
Lock 接口中的方法实现都是借助于静态内部类 Sync 的实例,调用相应的方法即可。在 lock() 和 unlock() 中分别调用 Sync 对象的 acquire() 和 release() 方法,其内部调用了 Sync 内部重写的 tryAcquire() 和 tryRelease():
AbstractQueuedSynchronizer.java:public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
2、AQS 的基本思想 CLH 队列锁
AQS 以 CLH(Craig、Landin、Hagersten 三人名字首字母)队列锁为基本思想,而 CLH 队列锁是一种基于链表的可扩展、高性能、公平的 FIFO 自旋锁。申请锁的线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
先把申请锁的线程打包成一个 QNode,myPred 指向前驱节点,locked 表示当前线程是否需要锁:
QNode 入队要按照先后顺序排列,当线程 A 需要获取锁时,要把包装该线程的 QNode 节点加入到 CLH 队列尾部,并且把 locked 设为 true:
随后线程 B 也需要获取锁被加入到 CLH 队列尾部:
加入队列的 QNode 节点要自旋检测前一个节点的 locked 是否变为了 false,变成 false 就认为前一个节点已经释放了锁,该轮到当前节点获取锁了:
QNode_A 拿到锁后停止自旋。在线程执行完任务释放锁后,也要把 locked 置为 false。
AQS 在实现时是基于 CLH 算法但是又做了很多改进,例如在 QNode 中加入 next 域实现双向队列,自旋检测锁状态时有次数限制等。
3、AQS 锁流程源码分析
AQS 使用头指针(head)与尾指针(tail)参与管理这个双向同步队列。其一般工作流程为:
- 初始状态下,head 与 tail 都为 null。当有线程获取同步器失败后,需要打包成 Node 节点进入同步队列,此时先通过 new Node() 创建一个空节点作为头节点,并且让尾节点也指向这个初始头节点。然后把入队节点添加到尾节点之后,由于可能会有多个竞争失败的线程所在的节点都要做入队操作,因此添加到队尾的这个动作要使用 CAS 原子操作,CAS 失败的 Node 要自旋直到成功添加到队尾为止。(addWaiter() 和 acquireQueued() 方法)
- 在同步队列中等待获取同步器的节点,也会进行自旋操作,尝试再次获取同步器。规则是只有当前节点的前驱节点是头节点才有获取同步器的资格,如果该节点成功获取到,那么就释放掉原来的头节点,并把当前节点设置为新的头节点(也就是说头节点一般是占有锁且正在执行线程任务的节点)。
- 队列中的线程如果一直无法获得同步器,那么在一定时间后就要先中断,一定时间是通过前驱节点的等待状态变化决定的。所有入队节点的等待状态 waitStatus 初始值都为 0,在上一步的自旋过程中,如果当前节点没有获取到同步器,那么就要给前驱节点的 waitStatus 设置为 SIGNAL 表示将要把后继节点(前驱结点的后继节点就是当前节点)阻塞。待自旋进入第二次循环时,当前节点如果还没拿到同步器,并且检查到它的前置节点已经是 SIGNAL 状态了,就要使用 LockSupport.park() 暂停当前线程,直到它的前置节点执行完任务释放同步器时再唤醒它。
3.1 acquire()
在自定义锁时,需要调用 AQS 的 acquire() 来获取锁:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 其实就是调用 Thread.currentThread().interrupt()selfInterrupt();}
如果 tryAcquire() 返回 true 说明拿到了锁,拿锁过程直接结束,后续代码就不会执行。否则,要先后执行 addWaiter() 和 acquireQueued()。
3.2 addWaiter()
addWaiter() 会在线程获取锁失败后,将该线程封装进 Node 并将其入队:
/*** 等待队列的队尾,延迟初始化。只有在队尾添加新的等待 Node 时* 才由 enq 方法修改。*/private transient volatile Node tail;/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {// 把当前线程包装成一个 NodeNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {// 如果尾节点不为空,就让 node 的 prev 字段指向这个尾节点 pred。node.prev = pred;// CAS 操作把 node 设置为队列尾节点if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果队列没有初始化,则执行初始化操作,否则 CAS 把 node 加入到队列。enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {// 自旋,直到成功把 node 添加为队列尾节点。for (;;) {Node t = tail;if (t == null) { // Must initialize// 原子操作设置头节点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 原子操作设置尾节点if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
addWaiter() 内部在添加一个 Node 到队尾时,会先直接用 compareAndSetTail() 尝试将 Node 加到队尾。如果尝试失败,才会在 enq() 内自旋,直到成功的将 Node 加到队尾。
3.3 acquireQueued()
acquireQueued() 负责在独占且不可中断模式下,为已经在队列中的线程获取同步器。该方法会返回一个布尔类型值 interrupted 表示当前线程是否应该被中断:
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {// 因为 acquire() 的 if 语句中 tryAcquire() 失败了,因此这里的 failed 初始化为 true。boolean failed = true;try {boolean interrupted = false;// 自旋,再次用 tryAcquire() 尝试拿锁for (;;) {final Node p = node.predecessor();// 如果当前节点的前驱结点是头节点,并且当前节点拿锁成功if (p == head && tryAcquire(arg)) {// 把当前节点设为头节点,并把原来的头节点从队列中删除并返回 interrupted。setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 检查前驱节点状态,并执行中断操作if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 如果失败,就取消当前节点获取锁的操作。if (failed)cancelAcquire(node);}}
shouldParkAfterFailedAcquire() 会根据前驱节点的 waitStatus 状态判断是否应该在当前节点获取锁失败之后中断当前线程,返回 true 表示应该中断:
/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 前驱节点状态如果已经是 SIGNAL,就应该中断当前线程int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 前驱节点状态值大于 0 说明其取消获取线程,从队列中移除这些节点if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
pred 是 node 的前驱结点,根据前驱结点的状态做不同的操作:
- 如果是 Node.SIGNAL,那么就返回 true 表示前驱节点的后继节点,也就是当前节点需要被 park;
- 如果状态数值大于 0,说明前驱节点处于取消状态,那么就要向前找,一直找到一个不是取消状态的节点,让这个节点作为 node 的前驱节点;
- 如果是 0 或者 PROPAGATE(-3),就用 CAS 操作把这个前驱节点的状态值设置为 Node.SIGNAL。
这里注意,每个节点的 waitStatus 初始值都为 0,在 acquireQueued() 的 for 循环中,第一次循环假设仍没有拿到锁,那么进入到 shouldParkAfterFailedAcquire(),把前驱结点的 waitStatus 置为 Node.SIGNAL 并返回 false;然后执行第二次 for 循环,假设也没拿到锁,那么 shouldParkAfterFailedAcquire() 就会返回 true,使得 parkAndCheckInterrupt() 得以执行:
/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
其实就是使用 LockSupport.park() 暂停当前线程,并返回线程状态。如果线程已经中断,那么设置 interrupted 为 true。
3.4 release()
释放锁的操作就很简单了:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 传入的 node 是头节点,要找到头节点后第一个不是取消状态的节点,唤醒其中的线程。Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
tryRelease() 为 true 表示成功释放了同步器,那么就要用 LockSupport.unpark() 把当前节点(即头节点)后第一个不是 CANCELLED 状态的节点(如果有的话)中的线程唤醒。
4、总结
AQS 是公平锁(因为新来的节点都是加到队尾)。公平锁与非公平锁的实现几乎一模一样,在 ReentrantLock 中,提供了 FairSync 和 NonfairSync 两个内部类,分别实现公平锁和非公平锁。比较这两个内部类的代码,几乎一模一样,只不过在 tryAcquire() 拿锁时,公平锁要先判断队列中是否有前驱节点已经在等待中(hasQueuedPredecessors())。
不可重入锁在同一线程做递归调用时会发生死锁,需要在自定义锁时对 state 做特殊处理(即实现可重入锁,一般都是要实现可重入锁的)。
AQS 参考资料:
AbstractQueuedSynchronizer的介绍和原理分析
深入理解AbstractQueuedSynchronizer(AQS)
[官方文档AbstractQueuedSynchronizer](