AQS(AbstractQueuedSynchronizer)解析
文章目录
- 一、AQS简介
- 二、核心设计思想
- 2.1 核心设计思想回顾
- 2.2 CLH锁队列简介
- 2.3 AQS对CLH队列的改动及其原因
- 三、核心组件详解
- 3.1 `state` 状态变量
- 3.2 同步队列 (FIFO双向链表)
- 四、核心方法深度解析
- 4.1 获取同步状态 (独占模式) - `acquire(int arg)`
- 4.2 释放同步状态 (独占模式) - `release(int arg)`
- 五、条件变量 `ConditionObject`
- 六、AQS的设计特点总结
- 七、应用实例:`ReentrantLock`
- 八、总结
推荐移步下面的文章,写的非常详细
美团AQS解析通过reentrantlock反观aqs
博客园
Java全栈
只是因为今天刚复习完操作系统的锁,发现AQS有很多之前不明白的思想突然有点开朗所以整理了一下。
并发笔记-锁(一)
一、AQS简介
AQS是java.util.concurrent.locks
包下的一个抽象类,它是构建锁和同步组件(如ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock, FutureTask等)的基础框架。理解AQS是掌握Java并发包高级用法的关键。
二、核心设计思想
2.1 核心设计思想回顾
AQS的核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁的变体实现的,即将暂时获取不到锁的线程加入到队列中。
它主要通过以下两个核心组件实现:
volatile int state
: 一个原子更新的状态变量,代表同步状态。- FIFO同步队列 (CLH队列的变体): 一个用于管理等待获取同步状态的线程的队列。
AQS的设计模式是模板方法模式。它定义了获取和释放同步状态的主要逻辑(骨架),但将具体的同步状态的判断和修改操作(如tryAcquire
, tryRelease
)留给子类去实现。
2.2 CLH锁队列简介
在深入AQS的队列之前,有必要了解一下原始的CLH锁(Craig, Landin, and Hagersten lock)。CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁。
以下是一个简化的CLH锁实现示例,帮助理解其原理:
public class CLHLock {// CLH队列的节点static class CLHNode {// 当前节点的锁状态,true表示已持有锁或正在等待获取锁volatile boolean locked = false;}// 线程本地存储,每个线程持有自己的节点private ThreadLocal<CLHNode> currentNode = ThreadLocal.withInitial(CLHNode::new);// 线程本地存储,每个线程持有前驱节点的引用private ThreadLocal<CLHNode> predecessorNode = new ThreadLocal<>();// 尾节点的原子引用,用于队列的尾部操作private AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());// 获取锁public void lock() {CLHNode node = currentNode.get();node.locked = true; // 设置当前节点状态为等待或持有锁// 原子操作:将当前节点设置为新的尾节点,并获取原尾节点(前驱)CLHNode predecessor = tail.getAndSet(node);predecessorNode.set(predecessor);// 自旋等待前驱节点释放锁while (predecessor.locked) {// 自旋等待,直到前驱节点的locked变为false}// 前驱节点已释放锁,当前线程成功获取锁}// 释放锁public void unlock() {CLHNode node = currentNode.get();node.locked = false; // 设置当前节点状态为已释放// 为下次使用准备新节点(重用节点可能会有问题,在实际实现中通常需要新建)currentNode.set(predecessorNode.get());}
}
原始CLH锁的核心思想:
-
隐式链表结构: 锁维护一个等待线程队列,这个队列是通过节点之间的指针(通常是每个节点指向其前驱节点)隐式形成的。
// 每个节点持有对前驱节点的引用,形成隐式链表 private ThreadLocal<CLHNode> predecessorNode = new ThreadLocal<>();
-
节点状态: 每个线程请求锁时,会创建一个节点。节点通常包含一个状态位(例如,
locked
或waiting
),表示该线程是否获得了锁或者是否正在等待。static class CLHNode {volatile boolean locked = false; // true表示正在等待或持有锁 }
-
尾指针: 锁本身维护一个指向队列尾部的指针(
tail
)。private AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());
-
获取锁:
- 线程创建一个新节点,将其
locked
状态设为true
(表示它在等待或想获取锁)。 - 线程通过原子操作(如
getAndSet
或compareAndSet
)将自己的节点设置为新的队尾,并获取到之前队尾的节点(即它的前驱节点)。 - 然后,线程开始自旋,不断检查其前驱节点的
locked
状态。 - 当前驱节点的
locked
状态变为false
时,意味着前驱已经释放了锁,当前线程就可以停止自旋,成功获取锁。
public void lock() {CLHNode node = currentNode.get();node.locked = true;// 原子操作:设置新的尾节点,返回旧的尾节点(前驱)CLHNode predecessor = tail.getAndSet(node);predecessorNode.set(predecessor);// 自旋等待前驱节点释放锁while (predecessor.locked) {// 自旋} }
- 线程创建一个新节点,将其
-
释放锁:
- 持有锁的线程将自己节点的
locked
状态设为false
。这将允许其后继节点(如果存在)停止自旋并获取锁。
public void unlock() {CLHNode node = currentNode.get();node.locked = false; // 释放锁,通知后继节点 }
- 持有锁的线程将自己节点的
-
优点:
- 公平性: 线程按照它们到达的顺序获取锁(FIFO)。
- 可扩展性好: 在多处理器系统上,每个线程只在本地缓存的前驱节点状态上自旋,减少了对共享内存的竞争和缓存一致性流量。
// 每个线程都在自己的前驱节点上自旋 while (predecessor.locked) {// 本地自旋,减少全局竞争 }
- 无饥饿: 由于是公平的,不会产生饥饿现象。
-
缺点:
- 自旋消耗CPU: 即使是高效的自旋,在锁被长时间持有时,等待的线程仍然会消耗CPU资源。
- NUMA系统下的性能问题: 虽然本地自旋,但在NUMA(Non-Uniform Memory Access)架构下,如果前驱节点在远程内存,访问开销依然存在。
CLH锁到AQS的演进
理解了原始CLH锁的原理后,就能更好地理解AQS为什么要对其进行改进。AQS借鉴了CLH锁的FIFO公平性和队列结构,但对其进行了重要改进:
- 将纯自旋改为阻塞等待(使用
LockSupport.park
) - 引入
waitStatus
等复杂状态管理 - 使用双向链表结构
- 支持取消、中断等高级特性
2.3 AQS对CLH队列的改动及其原因
AQS的同步队列是CLH锁队列的一个重要变体。它继承了CLH队列的基本思想(FIFO、节点链接),但做了关键的修改以适应更广泛的同步场景,特别是支持线程阻塞而不是纯自旋。
AQS队列与原始CLH队列的主要区别和改动:
-
节点结构 (
Node
):- 双向链表: AQS的
Node
包含prev
和next
指针,构成一个双向链表。原始CLH通常是单向的(每个节点知道其前驱)。- 原因: 双向链表使得节点的插入和移除(尤其是在处理
CANCELLED
节点时)更加方便和高效。例如,unparkSuccessor
方法中从尾部向前遍历寻找下一个有效等待者时,prev
指针非常有用。
- 原因: 双向链表使得节点的插入和移除(尤其是在处理
waitStatus
状态字段: AQS的Node
有一个waitStatus
字段,它比原始CLH中简单的locked
状态更复杂,用于协调线程的阻塞和唤醒。状态包括SIGNAL
,CANCELLED
,CONDITION
,PROPAGATE
。- 原因: AQS需要管理线程的阻塞 (
park
) 和唤醒 (unpark
),而不仅仅是自旋。waitStatus
用于确保前驱节点在释放锁时能够可靠地唤醒后继节点。例如,SIGNAL
状态表示后继节点需要被唤醒。
- 原因: AQS需要管理线程的阻塞 (
- 封装线程: AQS的
Node
直接持有等待的Thread
对象。- 原因: 当需要唤醒一个节点时,可以直接通过
node.thread
获取到线程对象并调用LockSupport.unpark(node.thread)
。
- 原因: 当需要唤醒一个节点时,可以直接通过
- 双向链表: AQS的
-
阻塞代替纯自旋:
- 原始CLH是纯自旋锁。线程会一直循环检查前驱节点的状态。
- AQS中,当一个线程发现获取锁失败且其前驱节点不是
head
时(或者即使是head
的后继,但尝试获取再次失败),它通常不会无限自旋。通过shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
机制,线程会被LockSupport.park()
挂起,进入阻塞状态,从而释放CPU。- 原因: 纯自旋只适用于锁持有时间非常短的场景。对于可能较长的锁持有时间,或者在线程数量较多时,阻塞是更有效的方式,避免了CPU资源的浪费。Java中的锁和同步器需要适应更通用的场景。
-
明确的
head
和tail
指针:- AQS显式维护
head
和tail
指针。head
是一个虚拟节点(dummy node),不代表任何等待线程,它仅仅指向当前持有锁的线程(或者说,当前持有锁的线程"曾经是"head
的后继,现在它自己成为了head
)。- 原因: 虚拟
head
节点简化了队列为空和非空时的边界条件处理。新入队的节点总是添加到tail
之后。当一个节点获取到锁后,它成为新的head
,原head
节点被断开。
- 原因: 虚拟
- AQS显式维护
-
取消操作 (
CANCELLED
状态):- AQS支持线程在等待过程中取消(例如,由于中断或超时)。被取消的节点其
waitStatus
会被标记为CANCELLED
。这些节点会被从有效的等待链中"跳过"。- 原因: 实际应用中,线程等待可能会被中断或超时,需要一种机制来处理这种情况,避免无效的等待和资源泄漏。原始CLH通常不直接处理节点取消。
- AQS支持线程在等待过程中取消(例如,由于中断或超时)。被取消的节点其
-
条件队列 (
ConditionObject
):- AQS通过内部类
ConditionObject
支持条件变量。这意味着同一个Node
对象可能先在条件队列中等待,被signal
后再转移到同步队列中等待获取锁。- 原因: 这是Java并发包中
Lock
接口提供Condition
的需要,允许更灵活的线程间协作,类似于内置锁的wait/notify
。原始CLH锁本身不包含条件变量的概念。
- 原因: 这是Java并发包中
- AQS通过内部类
-
共享模式 (
SHARED
) 支持:- AQS的节点可以标记为
SHARED
模式,允许实现如Semaphore
、CountDownLatch
、读写锁的读锁等共享型同步器。在共享模式下,锁的释放可能会唤醒多个等待线程(通过PROPAGATE
状态和doReleaseShared
的传播机制)。- 原因: 纯粹的CLH锁是独占锁。AQS需要一个更通用的框架来支持不同类型的同步器。
- AQS的节点可以标记为
为什么要进行这些改动?
- 通用性: AQS的设计目标是成为一个通用的同步框架,能够支持多种不同类型的同步器(独占锁、共享锁、可重入锁、信号量、倒计时门闩等)。原始CLH锁主要是一种特定的独占自旋锁实现。
- 效率与资源利用: 对于Java应用,纯自旋通常不是最佳选择,因为它可能在锁竞争激烈或锁持有时间长时浪费大量CPU。通过引入阻塞机制,AQS可以在线程等待时释放CPU资源,提高系统整体吞吐量。
- 功能完备性: 现实世界的并发场景需要处理中断、超时、条件等待等复杂情况。AQS通过
waitStatus
、ConditionObject
等机制提供了这些功能。 - Java并发模型的需求: Java的
Lock
接口定义了可中断的锁获取、尝试获取锁、条件变量等特性,AQS需要支持这些特性。
AQS借鉴了CLH队列FIFO和本地自旋(尽管AQS中自旋非常短暂,主要目的是在park
前尝试获取或设置状态)的思想,但对其进行了大幅度的扩展和改造,使其成为一个既能高效处理无竞争情况(通过CAS),又能有效管理有竞争情况下线程阻塞和唤醒的强大同步基础组件。它将CLH的核心思想从一个特定的自旋锁实现提升到了一个通用的同步器构建框架。
三、核心组件详解
3.1 state
状态变量
private volatile int state;
volatile
关键字: 保证了state
变量在多线程之间的可见性,并且在一定程度上防止指令重排序。但volatile
本身不保证原子性,AQS通过CAS操作来保证对state
修改的原子性。- 含义由子类定义:
- ReentrantLock:
state
表示锁的持有计数。0表示未被锁定,大于0表示已被某个线程锁定,并且值是该线程重入的次数。 - Semaphore:
state
表示当前可用的许可数量。 - CountDownLatch:
state
表示需要等待的计数数量。 - ReentrantReadWriteLock:
state
被拆分为高16位表示读锁的持有数量,低16位表示写锁的持有数量(或重入次数)。
- ReentrantLock:
- 操作方法:
protected final int getState()
: 获取当前state
的值。protected final void setState(int newState)
: 设置state
的值。注意: 这个方法不是原子的,通常在子类确认已经独占访问或者在tryAcquire/tryRelease
逻辑中,能确保线程安全时才直接调用。protected final boolean compareAndSetState(int expect, int update)
: 原子地比较state
的当前值是否等于expect
,如果是,则将其更新为update
。这是实现原子更新的关键,底层依赖Unsafe
类的CAS操作。
3.2 同步队列 (FIFO双向链表)
AQS内部维护了一个用于管理等待线程的FIFO双向链表,通常被称为CLH队列的变体。CLH队列是一种自旋锁,但AQS中的队列节点在大多数情况下会让线程阻塞,而不是纯粹自旋。
static final class Node {// 节点模式: 共享模式 vs 独占模式static final Node SHARED = new Node(); // 标记节点在共享模式下等待static final Node EXCLUSIVE = null; // 标记节点在独占模式下等待 (默认)// 节点的等待状态 (waitStatus)static final int CANCELLED = 1; // 线程的请求已被取消 (超时或中断)static final int SIGNAL = -1; // 后继节点需要被唤醒 (unpark)static final int CONDITION = -2; // 节点在条件队列中等待 (与同步队列不同)static final int PROPAGATE = -3; // (仅共享模式) releaseShared需要向后传播volatile int waitStatus; // 节点当前状态 (重要!)volatile Node prev; // 指向前驱节点volatile Node next; // 指向后继节点volatile Thread thread; // 封装的等待线程Node nextWaiter; // 指向条件队列中的下一个等待者 (用于ConditionObject)// 构造函数Node() { } // Dummy node for SHAREDNode(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// ... 其他辅助方法,如isShared()
}// 同步队列的头尾指针
private transient volatile Node head;
private transient volatile Node tail;
- 队列结构:
- 双向链表: 方便节点插入、移除以及从尾部向前遍历(
unparkSuccessor
中会用到)。 head
节点: 是一个虚拟节点(dummy node),不持有实际的等待线程。它代表当前持有锁(或已成功获取同步状态)的线程。当一个线程成功获取锁后,原来的head
节点出队,该线程对应的节点成为新的head
。tail
节点: 指向队列的尾部。新加入的等待线程会被添加到队尾。volatile
修饰:head
,tail
, 以及Node
中的prev
,next
,waitStatus
,thread
都是volatile
的,确保多线程下的可见性。
- 双向链表: 方便节点插入、移除以及从尾部向前遍历(
Node.waitStatus
详解:- 0 (默认): 节点的初始状态,或在某些情况下表示节点不需要被特别处理。
CANCELLED (1)
: 表示节点中的线程因为超时或中断而放弃了等待。已取消的节点不会再参与锁竞争,会被从队列中移除。SIGNAL (-1)
: 核心状态。表示当前节点的后继节点(或后继节点中的某个线程)已经被(或即将被)阻塞 (park
)。因此,当当前节点释放锁或被取消时,它必须唤醒 (unpark
) 它的后继节点。CONDITION (-2)
: 表示节点当前在条件队列 (Condition Queue) 中等待,而不是在同步队列中。当线程调用Condition.await()
时,它会进入条件队列。PROPAGATE (-3)
: (仅用于共享模式) 当共享模式下的操作完成(如releaseShared
),需要将状态向后传播,以确保其他共享模式的等待者也能被唤醒。
Node.nextWaiter
:- 在同步队列中,它被用来区分节点是
SHARED
还是EXCLUSIVE
模式。 - 在条件队列中,它被用来链接条件队列中的节点(形成单向链表)。
- 在同步队列中,它被用来区分节点是
四、核心方法深度解析
AQS提供了两类主要的获取/释放同步状态的方法:独占模式(如acquire
/release
)和共享模式(如acquireShared
/releaseShared
)。我们以独占模式为例进行深入分析。
4.1 获取同步状态 (独占模式) - acquire(int arg)
public final void acquire(int arg) {// 1. 尝试直接获取同步状态 (由子类实现具体逻辑)if (!tryAcquire(arg) &&// 2. 如果获取失败,则将当前线程加入等待队列,并使其在队列中等待acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 3. 如果线程在等待过程中被中断,则在获取到锁后,进行自我中断selfInterrupt();
}
流程分解:
(a) tryAcquire(int arg)
(由子类实现)
- 这是模板方法模式的核心。子类(如
ReentrantLock.FairSync
或NonfairSync
)必须重写此方法。 - 职责: 尝试以独占方式获取同步状态。
- 如果成功,返回
true
。 - 如果失败,返回
false
。
- 如果成功,返回
- 实现要点:
- 必须是线程安全的,通常需要使用CAS操作来修改
state
。 - 需要判断当前
state
是否允许获取(例如,ReentrantLock中state
为0,或当前线程已持有锁)。 - 如果获取成功,通常需要记录当前持有锁的线程(AQS提供了
setExclusiveOwnerThread
方法)。
- 必须是线程安全的,通常需要使用CAS操作来修改
- 返回值:
acquire
方法的行为完全依赖于tryAcquire
的返回值。
(b) addWaiter(Node mode)
(如果tryAcquire
失败)
- 如果
tryAcquire
返回false
,表示当前线程未能获取到锁,需要将其加入等待队列。 - 职责: 将当前线程封装成一个新的
Node
对象,并将其安全地添加到同步队列的尾部。 - 参数
mode
:Node.EXCLUSIVE
(独占模式) 或Node.SHARED
(共享模式)。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); // 创建新节点// 快速尝试CAS尾部插入 (乐观尝试,大部分情况下能成功)Node pred = tail;if (pred != null) { // 队列已初始化node.prev = pred;if (compareAndSetTail(pred, node)) { // CAS设置新的尾节点pred.next = node; // 原尾节点的next指向新节点return node;}}// 如果CAS失败 (说明tail被其他线程修改了) 或队列未初始化,则进入enq自旋enq(node);return node;
}
© enq(final Node node)
(如果addWaiter
中的快速尝试失败)
- 职责: 通过CAS自旋的方式,安全地将节点加入队列尾部。同时处理队列的初始化(如果
head
和tail
为null
)。
private Node enq(final Node node) {for (;;) { // 无限循环,直到成功入队Node t = tail;if (t == null) { // 队列为空,需要初始化// 创建一个虚拟头节点if (compareAndSetHead(new Node()))tail = head; // 初始化后,tail也指向虚拟头节点} else { // 队列不为空node.prev = t; // 新节点的前驱指向当前尾节点if (compareAndSetTail(t, node)) { // CAS尝试将新节点设置为尾节点t.next = node; // 成功后,原尾节点的next指向新节点return t; // 返回原尾节点 (即新节点的前驱)}}// 如果CAS失败,说明在设置过程中tail被其他线程修改,循环重试}
}
- 初始化: 如果队列为空,
enq
会先创建一个虚拟的head
节点,然后tail
也指向这个head
节点。之后真正的第一个等待节点会挂在tail
后面。
(d) acquireQueued(final Node node, int arg)
(核心等待逻辑)
- 如果线程成功加入队列 (通过
addWaiter
和enq
),此方法负责让该线程在队列中等待,直到轮到它获取锁。 - 职责:
- 检查当前节点是否是
head
的后继节点(即实际的队首)。 - 如果是,则再次尝试调用
tryAcquire
获取锁。 - 如果获取成功,则将当前节点设置为新的
head
,并从队列中断开原head
。 - 如果获取失败,或者当前节点不是队首,则判断是否需要阻塞当前线程 (
park
)。 - 如果线程在等待过程中被中断,记录中断状态。
- 检查当前节点是否是
- 返回值:
true
如果线程在等待时被中断,否则false
。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true; // 标记是否成功获取锁,用于finally块中处理取消try {boolean interrupted = false; // 标记线程是否被中断过for (;;) { // 自旋检查和等待final Node p = node.predecessor(); // 获取当前节点的前驱// 关键判断: 如果前驱是head,说明当前节点是实际的队首// 此时有资格尝试获取锁if (p == head && tryAcquire(arg)) {setHead(node); // 获取成功,将当前节点设为新的headp.next = null; // 原head的next置为null,帮助GCfailed = false; // 标记成功获取return interrupted; // 返回中断状态}// 如果获取锁失败,或者不是队首,则判断是否应该park当前线程// shouldParkAfterFailedAcquire会检查前驱节点的waitStatus// parkAndCheckInterrupt会真正park线程并检查中断if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true; // 如果park后发现被中断,标记为true}} finally {if (failed) // 如果获取锁最终失败 (例如,tryAcquire抛异常或线程被取消)cancelAcquire(node); // 取消当前节点的排队}
}
(e) shouldParkAfterFailedAcquire(Node pred, Node node)
- 在
acquireQueued
中,当一个节点尝试获取锁失败后,调用此方法来决定是否应该阻塞(park)当前线程。 - 职责: 检查前驱节点
pred
的waitStatus
。- 如果
pred.waitStatus == Node.SIGNAL (-1)
: 表示前驱节点承诺在释放锁或被取消时会唤醒当前节点。因此,当前线程可以安全地park。返回true
。 - 如果
pred.waitStatus > 0 (CANCELLED)
: 表示前驱节点已被取消。当前节点需要跳过这个已取消的前驱,向前找到一个未取消的节点作为新的有效前驱。返回false
(表示暂时不park,需要重试循环)。 - 如果
pred.waitStatus
是0或PROPAGATE
: 表示前驱节点当前没有义务唤醒后继。当前节点需要通过CAS将前驱的waitStatus
设置为SIGNAL
,以确保前驱在将来会唤醒自己。返回false
(表示暂时不park,已尝试设置SIGNAL,下一轮循环再检查)。
- 如果
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) // 前驱状态为SIGNAL,可以直接parkreturn true;if (ws > 0) { // 前驱已取消 (CANCELLED)// 跳过已取消的前驱节点,向前寻找do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node; // 将当前节点链接到新的有效前驱} else { // 前驱状态为0或PROPAGATE// CAS将前驱状态设置为SIGNAL,表示当前节点需要前驱来唤醒compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; // 返回false表示当前不park,acquireQueued会再次循环
}
(f) parkAndCheckInterrupt()
- 当
shouldParkAfterFailedAcquire
返回true
时调用此方法。 - 职责: 使用
LockSupport.park(this)
来阻塞当前线程,并返回线程是否被中断。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 阻塞当前线程return Thread.interrupted(); // 清除并返回中断状态
}
(g) selfInterrupt()
-
如果在
acquireQueued
中线程被中断 (interrupted = true
),acquire
方法会在最后调用selfInterrupt()
。(g)selfInterrupt()
-
如果在
acquireQueued
中线程被中断 (interrupted = true
),acquire
方法会在最后调用selfInterrupt()
。 -
职责: 重新设置当前线程的中断状态。这是因为
parkAndCheckInterrupt
中的Thread.interrupted()
会清除中断状态。AQS的设计哲学是,底层的同步机制不应该"吞噬"中断信号,而是应该在完成其主要任务(获取锁)后,将中断状态恢复,以便上层调用者可以响应这个中断。
static void selfInterrupt() {Thread.currentThread().interrupt();
}
(h) cancelAcquire(Node node)
(在acquireQueued
的finally
块中调用)
- 如果线程在获取锁的过程中失败(例如,等待超时或
tryAcquire
抛出未检查异常),或者线程在park
之前就被中断且决定不再等待,则需要取消该节点。 - 职责: 将节点的
waitStatus
设置为CANCELLED
,并尝试将其从队列中移除,同时唤醒可能的后继节点。- 这个过程比较复杂,涉及到处理已取消节点前后节点的链接,以及确保如果当前节点是
tail
,则正确更新tail
。 - 如果当前节点的前驱的
waitStatus
是SIGNAL
,或者成功将其设置为SIGNAL
,并且当前节点有线程,则会唤醒它的后继节点(如果后继节点存在且未取消)。
- 这个过程比较复杂,涉及到处理已取消节点前后节点的链接,以及确保如果当前节点是
4.2 释放同步状态 (独占模式) - release(int arg)
public final boolean release(int arg) {// 1. 尝试释放同步状态 (由子类实现)if (tryRelease(arg)) {Node h = head; // 获取当前头节点// 2. 如果头节点存在且其waitStatus不为0 (通常是SIGNAL)// 说明有后继节点在等待被唤醒if (h != null && h.waitStatus != 0)unparkSuccessor(h); // 唤醒头节点的后继return true;}return false;
}
流程分解:
(a) tryRelease(int arg)
(由子类实现)
- 这是模板方法模式的另一部分。
- 职责: 尝试以独占方式释放同步状态。
- 如果成功释放(例如,
state
变为0,锁完全被释放),返回true
。 - 如果只是部分释放(例如,可重入锁的计数减少但未到0),或者释放失败,返回
false
。
- 如果成功释放(例如,
- 实现要点:
- 必须是线程安全的。
- 通常需要检查当前线程是否是锁的持有者。
- 修改
state
。如果锁完全释放,通常需要清除独占所有者线程(AQS提供了setExclusiveOwnerThread(null)
)。
- 返回值:
release
方法的行为依赖于此。
(b) unparkSuccessor(Node node)
(如果tryRelease
返回true
且队列需要唤醒)
- 当锁被成功释放后,需要唤醒等待队列中的下一个线程。
- 参数
node
此时是队列的head
节点。 - 职责: 找到
head
节点的第一个未被取消的后继节点,并使用LockSupport.unpark()
唤醒其线程。
private void unparkSuccessor(Node node) { // node is the headint ws = node.waitStatus;if (ws < 0) // 如果head的waitStatus是SIGNAL或PROPAGATE// 尝试将其CAS回0 (表示不再需要通知后继,或已完成传播)compareAndSetWaitStatus(node, ws, 0);Node s = node.next; // 获取head的直接后继// 如果后继为null或已取消 (waitStatus > 0)if (s == null || s.waitStatus > 0) {s = null; // 清空s,准备从尾部向前查找// 从队列尾部向前遍历,找到离head最近的、未取消的节点// 为什么要从尾部向前?// 因为在enq操作中,node.prev的设置先于t.next的设置。// 在并发情况下,从head向后遍历时,next指针可能暂时为null。// 而从tail向前遍历prev指针是相对稳定的。for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0) // 找到一个有效的等待节点s = t;}if (s != null) // 如果找到了有效的后继节点LockSupport.unpark(s.thread); // 唤醒其线程
}
五、条件变量 ConditionObject
AQS还提供了一个内部类ConditionObject
,它是java.util.concurrent.locks.Condition
接口的实现。Condition
通常与一个独占锁(如ReentrantLock
)配合使用,提供了类似Object.wait()
, notify()
, notifyAll()
的功能,但更灵活(可以有多个条件队列与一个锁关联)。
public class ConditionObject implements Condition, java.io.Serializable {private transient Node firstWaiter; // 条件队列的头指针 (单向链表)private transient Node lastWaiter; // 条件队列的尾指针public final void await() throws InterruptedException {if (Thread.interrupted()) throw new InterruptedException();// 1. 将当前线程封装成Node加入条件队列Node node = addConditionWaiter();// 2. 完全释放当前线程持有的锁 (state会保存下来)// 这是必须的,否则其他线程无法获取锁来signal当前线程int savedState = fullyRelease(node);int interruptMode = 0;// 3. 循环检查当前节点是否已被转移到同步队列// 如果没有,则park当前线程,等待被signalwhile (!isOnSyncQueue(node)) {LockSupport.park(this);// 检查是否在park期间被中断if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break; // 如果被中断,跳出等待循环}// 4. 当线程被唤醒(signal)并成功转移到同步队列后,// 或者因中断跳出循环后,尝试重新获取锁// acquireQueued会处理中断模式if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 清理条件队列中可能存在的已取消节点if (node.nextWaiter != null)unlinkCancelledWaiters();// 根据中断模式处理中断if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}public final void signal() {if (!isHeldExclusively()) //必须持有锁才能signalthrow new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first); // 唤醒条件队列中的第一个等待者}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 将节点从条件队列转移到同步队列,如果成功,则unpark同步队列中被唤醒的节点} while (!transferForSignal(first) &&(first = firstWaiter) != null);}// transferForSignal 将节点从条件队列移动到AQS同步队列的尾部,// 并可能唤醒它(如果它的前驱是SIGNAL状态或者它成为了新的head)。
}
await()
过程:- 当前线程必须持有与此Condition关联的锁。
- 创建一个新的
Node
(waitStatus
为CONDITION
),加入到ConditionObject
内部的条件队列。 - 完全释放当前线程持有的锁(调用AQS的
fullyRelease
)。这允许其他线程获取锁并最终signal
此条件。 - 线程进入循环等待,直到被
signal
并且节点被转移到AQS的同步队列中,或者线程被中断。在循环中,线程会park
。 - 当线程被唤醒(通过
signal
或中断),它会尝试重新(竞争性地)获取之前释放的锁(调用AQS的acquireQueued
)。 - 获取锁成功后,
await
方法返回。
signal()
过程:- 当前线程必须持有与此Condition关联的锁。
- 从条件队列的头部取出一个节点。
- 调用
transferForSignal
方法,尝试将该节点从条件队列转移到AQS的同步队列尾部。 - 如果转移成功,并且该节点在同步队列中的状态允许被唤醒(例如,它的前驱节点释放了锁),则该节点的线程最终会被
unpark
,并参与锁的竞争。
六、AQS的设计特点总结
-
模板方法设计模式:
- AQS 定义了同步过程的骨架(
acquire
,release
,acquireShared
,releaseShared
等final方法)。 - 子类通过重写受保护的
tryAcquire
,tryRelease
,tryAcquireShared
,tryReleaseShared
,isHeldExclusively
等方法来定制具体的同步逻辑。这使得AQS可以适应多种同步需求。
- AQS 定义了同步过程的骨架(
-
基于
volatile int state
的同步状态管理:- 简单而强大,一个
int
变量足以表达多种同步语义。 volatile
保证可见性,CAS操作保证原子更新。
- 简单而强大,一个
-
CLH队列变体:
- FIFO队列保证了线程获取锁的公平性(如果子类实现支持公平性)。
- 节点通过
waitStatus
和前驱节点的状态来决定是否阻塞,以及何时被唤醒。 head
作为虚拟节点简化了队列操作。
-
CAS操作的大量使用:
- 无论是修改
state
,还是对队列head
、tail
的修改,以及节点waitStatus
的修改,都广泛使用CAS操作来保证无锁或极少锁情况下的线程安全和高效性。
- 无论是修改
-
阻塞与唤醒机制 (
LockSupport
):- AQS使用
LockSupport.park()
来挂起线程,使用LockSupport.unpark(Thread)
来唤醒线程。 LockSupport
提供了更细粒度的线程阻塞和唤醒控制,与线程的Object.wait/notify
不同,它不需要获取对象的监视器锁。park/unpark
可以响应中断。
- AQS使用
-
对中断的良好处理:
- 在等待获取同步状态时,如果线程被中断,AQS会记录中断状态,并在获取成功后通过
selfInterrupt()
恢复中断状态,让上层代码有机会处理中断。 acquireInterruptibly
方法允许在等待时响应中断并抛出InterruptedException
。
- 在等待获取同步状态时,如果线程被中断,AQS会记录中断状态,并在获取成功后通过
-
独占模式与共享模式:
- AQS同时支持独占(只有一个线程能获取)和共享(多个线程能同时获取)两种同步模式,这使得它可以用来实现如
ReentrantLock
(独占)和Semaphore
/CountDownLatch
/ReentrantReadWriteLock
的读锁(共享)等多种同步器。
- AQS同时支持独占(只有一个线程能获取)和共享(多个线程能同时获取)两种同步模式,这使得它可以用来实现如
-
公平性与非公平性支持:
- AQS本身不强制公平性,但提供了判断是否有前驱等待者的方法(如
hasQueuedPredecessors()
),子类可以利用这个方法来实现公平锁。 - 非公平锁: 新请求锁的线程可以尝试"插队",直接获取锁,如果获取失败再入队。吞吐量通常更高,但可能导致饥饿。
- 公平锁: 严格按照线程在队列中的顺序分配锁。保证了公平性,但通常吞吐量较低,因为即使锁可用,新线程也必须入队等待。
- AQS本身不强制公平性,但提供了判断是否有前驱等待者的方法(如
七、应用实例:ReentrantLock
ReentrantLock
通过内部类Sync
(及其子类FairSync
和NonfairSync
)继承AQS来实现。
// NonfairSync (默认)
static final class NonfairSync extends Sync {// ...protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 无锁状态// 非公平:直接尝试CAS获取,不检查队列if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 重入int nextc = c + acquires;if (nextc < 0) throw new Error("Maximum lock count exceeded");setState(nextc); // 重入不需要CAS,因为当前线程已持有锁return true;}return false; // 获取失败}
}// FairSync
static final class FairSync extends Sync {// ...protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 无锁状态// 公平:先检查队列中是否有等待者 (hasQueuedPredecessors())if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 重入int nextc = c + acquires;if (nextc < 0) throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}
// tryRelease 对于公平和非公平锁是一样的
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException(); // 非锁持有者不能释放boolean free = false;if (c == 0) { // 完全释放free = true;setExclusiveOwnerThread(null);}setState(c); // 设置新的statereturn free; // 返回是否完全释放
}
- 非公平锁
tryAcquire
: 如果锁空闲,直接尝试CAS获取,不关心队列中是否有等待者。 - 公平锁
tryAcquire
: 如果锁空闲,会先调用hasQueuedPredecessors()
检查同步队列中是否有比当前线程等待更久的线程。如果没有,才尝试CAS获取。 tryRelease
: 减少state
计数。如果计数变为0,则表示锁完全被释放,清除独占线程,并返回true
,这将触发AQS唤醒等待队列中的下一个线程。
八、总结
AQS 通过 volatile int state
、FIFO同步队列和CAS操作,构建了一个强大且灵活的同步基础框架。它巧妙地运用了模板方法模式,将核心的同步逻辑(线程排队、阻塞、唤醒)封装在自身,而将具体的同步状态判断和修改逻辑交由子类实现。这使得Java并发包中的各种同步器(Lock, Semaphore, CountDownLatch等)能够拥有一致且高效的底层实现。