【Java并发编程】AQS(AbstractQueuedSynchronizer)抽象同步器核心原理
【Java并发编程】AQS 抽象同步器核心原理
- 0. 整体学习目录
- 0.1 AQS源码
- AbstractQueuedSynchronizer 中文注释翻译
- 简介
- 子类定义建议
- 模式支持
- Condition 支持
- 检查与监控功能
- 序列化说明
- 使用指南
- 注意事项:
- 获取与释放机制
- 独占模式伪代码:
- 插队(Barging)说明
- 吞吐量策略说明
- 可扩展性说明
- 示例 1:不可重入的互斥锁
- 示例 2:一次性触发的信号量(类似 CountDownLatch)
- 作者 & 版本
- 1. 锁与队列的关系
- 1.1 CLH 锁的内部队列
- 1.2 分布式锁的内部队列
- 1.3 AQS 的内部队列
- 2.AQS 的核心成员
0. 整体学习目录
0.1 AQS源码
AbstractQueuedSynchronizer 中文注释翻译
简介
提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(如信号量、事件等)。该类是大多数同步器的基础,这些同步器使用一个原子的 int
状态值来表示同步状态。
子类必须实现受保护的方法来修改状态,并定义状态含义(表示该对象是否被获取或释放)。在此基础上,本类其他方法将完成排队和阻塞机制。
虽然子类可以维护其他状态字段,但只有通过 getState
、setState
和 compareAndSetState
操作的原子整数状态才与同步密切相关。
子类定义建议
- 子类应作为非公开的内部类,用于实现外围类的同步特性。
- 本类不实现任何同步接口。
- 提供如
acquireInterruptibly
等方法,供具体同步器使用。
模式支持
- 支持独占模式(exclusive):一个线程持有锁,其他线程无法获取。
- 支持共享模式(shared):多个线程可能获取成功。
- 本类不语义理解独占与共享,只是从机制上支持。
- 多种模式共享同一个 FIFO 队列。
举例:ReadWriteLock
可同时支持两种模式。
Condition 支持
定义了嵌套类 ConditionObject
:
-
可作为独占模式下的
Condition
实现。 -
要求子类:
- 实现
isHeldExclusively
; release
方法可释放当前状态;acquire
可恢复原状态。
- 实现
检查与监控功能
提供内部队列和条件对象的监控接口,可用于导出状态信息。
序列化说明
仅序列化内部状态变量 state
,反序列化后线程队列为空。需要序列化支持的子类应实现 readObject
方法,在反序列化后恢复为初始状态。
使用指南
重写以下方法,并通过 getState
、setState
、compareAndSetState
操作状态:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
默认实现抛出 UnsupportedOperationException
。
注意事项:
- 这些方法是唯一支持的扩展方式。
- 其他方法均为
final
,不可重写。 - 推荐使用父类
AbstractOwnableSynchronizer
提供的设置持有线程方法。
获取与释放机制
独占模式伪代码:
// 获取
while (!tryAcquire(arg)) {// 加入队列(如果未加入)// 阻塞线程
}// 释放
if (tryRelease(arg)) {// 唤醒队首线程
}
共享模式类似,但可能涉及级联唤醒。
插队(Barging)说明
由于 tryAcquire
在排队前调用,线程可能插队。
- 可通过
hasQueuedPredecessors()
判断是否允许当前线程获取; - 常用于实现公平锁。
吞吐量策略说明
默认策略为插队(barging):
- 又称贪婪(greedy)、放弃(renouncement)、避免同步阻塞(convoy-avoidance);
- 不保证公平或无饥饿;
- 每个重试机会公平;
- 可通过快速路径
hasContended()
、hasQueuedThreads()
预判断是否加锁。
可扩展性说明
本类适用于:
- 状态为
int
类型; - 获取/释放参数为
int
; - 内部使用 FIFO 队列。
不满足这些条件时,可使用更底层的原子类、自定义队列、LockSupport
进行实现。
示例 1:不可重入的互斥锁
class Mutex implements Lock, Serializable {private static class Sync extends AbstractQueuedSynchronizer {protected boolean isHeldExclusively() {return getState() == 1;}public boolean tryAcquire(int acquires) {assert acquires == 1;if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int releases) {assert releases == 1;if (getState() == 0) throw new IllegalMonitorStateException();setExclusiveOwnerThread(null);setState(0);return true;}Condition newCondition() {return new ConditionObject();}private void readObject(ObjectInputStream s)throws IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // 解锁状态}}private final Sync sync = new Sync();public void lock() { sync.acquire(1); }public boolean tryLock() { return sync.tryAcquire(1); }public void unlock() { sync.release(1); }public Condition newCondition() { return sync.newCondition(); }public boolean isLocked() { return sync.isHeldExclusively(); }public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}
}
示例 2:一次性触发的信号量(类似 CountDownLatch)
class BooleanLatch {private static class Sync extends AbstractQueuedSynchronizer {boolean isSignalled() { return getState() != 0; }protected int tryAcquireShared(int ignore) {return isSignalled() ? 1 : -1;}protected boolean tryReleaseShared(int ignore) {setState(1);return true;}}private final Sync sync = new Sync();public boolean isSignalled() { return sync.isSignalled(); }public void signal() { sync.releaseShared(1); }public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
}
作者 & 版本
- 自 JDK 1.5 起引入
- 作者:Doug Lea
核心源码
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {// 等待队列的头节点private transient volatile Node head;// 等待队列的尾节点private transient volatile Node tail;// 同步状态(由子类决定其语义)private volatile int state;protected final int getState() {return state;}protected final void setState(int newState) {state = newState;}protected final boolean compareAndSetState(int expect, int update) {// CAS 操作,确保原子性地更新 statereturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
1. 锁与队列的关系
无论单体服务应用内部的锁,还是分布式环境下多体服务应用所使用的分布式锁,为了减少由于无效争夺导致的资源浪费和性能恶化,一般都基于队列进行排队与削峰。
1.1 CLH 锁的内部队列
CLH 自旋锁使用的 CLH( Craig, Landin, and Hagersten Lock Queue)是一个单向队列, 也是一个 FIFO 队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;队列头部的节点,表示占有锁的节点;新加入的抢锁线程则需要等待,会插入到队列的尾部。
CLH锁的内部结构
1.2 分布式锁的内部队列
在分布式锁的实现中,比较常见的也是基于队列的方式进行不同节点中“等锁线程”的统一调度和管理。以基于 ZooKeeper 的分布式锁为例,其等待队列的结构,大致如图。
1.3 AQS 的内部队列
AQS 是 JUC 提供的一个用于构建锁和同步容器的基础类。JUC 包内许多类都是基于 AQS 构建,例如 ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask 等。AQS 解决了在实现同步容器时设计的大量细节问题。
AQS 是 CLH 队列的一个变种,主要原理和 CLH 队列差不多,这也是前面对 CLH 队列进行长篇大论介绍的原因。AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 AQS 队列中去;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。AQS 的内部结构,具体如图所示。