当前位置: 首页 > news >正文

十三、抽象队列同步器AQS

1、AQS简介

AQS是AbstractQueuedSynchronizer的简称,也即抽象队列同步器,从字面来理解:

  • 抽象:是一个抽象类,仅实现一些主要逻辑,有些方法会交由子类来实现
  • 队列:采取队列(FIFO,先进先出)这种数据结构存储数据
  • 同步:实现了多线程环境下的同步操作

那AQS有什么用呢?AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们提到的ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask等等皆是基于AQS的。

当然,我们自己也能利用AQS来定制符合我们自己需求的同步器,只要实现它的几个protected方法就可以了,在下文会有详细的介绍。

2、AQS的数据结构

AQS内部使用一个volatile关键字修饰的变量state来作为资源的标识符。

/*** The synchronization state.*/
private volatile int state;

同时定义了几个获取和设置state的原子方法:

/*** Returns the current value of synchronization state.* This operation has memory semantics of a {@code volatile} read.* @return current state value*/
protected final int getState() {return state;
}/*** Sets the value of synchronization state.* This operation has memory semantics of a {@code volatile} write.* @param newState the new state value*/
protected final void setState(int newState) {state = newState;
}/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a {@code volatile} read* and write.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that the actual*         value was not equal to the expected value.*/
protected final boolean compareAndSetState(int expect, int update) {return U.compareAndSetInt(this, STATE, expect, update);
}

这三种操作均是原子操作,其中compareAndSetState()方法是依赖于UnSafe类的compareAndSetInt()方法。

AQS内部使用了一个先进先出(FIFO)的双端队列,并使用两个指针headtail分别代表队列的头节点和尾节点。其数据结构如下图所示:

AQS的队列不直接存储线程,而是队列中每一个节点Node来具体存储线程。

AQS源码中关于节点Node类的描述:

3、AQS的Node节点

资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能被一个线程访问
  • 共享模式(Share):资源是共享的,可以被多个线程同时访问。具体的资源个数可以通过参数指定,如CountDownLatchSemaphore

一般情况下,子类只需要根据需求实现其中一种模式,但也有两种模式都实现的同步类,比如ReadWriteLock.

AQS中关于这两种模式的源码全部都在Node这个内部类中,源码如下:

static final class Node {// 标记节点,不包含实际的线程信息,主要用做标识符来区分共享同步模式static final Node SHARED = new Node();// 标记节点,不包含实际的线程信息,主要用做标识符来区分独占同步模式static final Node EXCLUSIVE = null;// waitStatus的值,表示该节点(对应的线程)已经被取消static final int CANCELLED =  1;// waitStatus的值,表示后继结点(对应的线程)需要被唤醒static final int SIGNAL    = -1;// waitStatus的值,表示该节点(对应的线程)在等待某种条件static final int CONDITION = -2;/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/static final int PROPAGATE = -3;// 等待状态,取值范围:-3、-2、-1、0、1volatile int waitStatus;volatile Node prev; // 前驱节点volatile Node next; // 后继结点volatile Thread thread; // 节点对应的线程Node nextWaiter; // 等待队列里下一个等待条件的节点// 判断共享模式的方法final boolean isShared() {return nextWaiter == SHARED;}// 获取后继节点的方法final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}

注意:通过Node我们可以实现两个队列,一是通过prev和next指针实现的CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待队列(单向队列),这个Condition主要用在ReentrantLock类中。

4、AQS源码解析

AQS的设计是基于模板方法设计模式,一些方法不做具体实现,抛出异常,业务逻辑交由子类做具体实现。这些方法主要是:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

父类方法不做具体实现,直接抛出异常:

protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

这里父类使用protected来修饰而不是抽象方法,这样做的目的是避免子类要把所有方法都重写一遍,增加了很多的工作量,子类只需要重写自己需要的方法。

而AQS实现了一系列的主要逻辑,下面AQS的源码剖析获取资源和释放资源的主要逻辑。

4.1、获取资源

acquire(int arg)方法是获取资源的入口,arg参数表示获取资源的个数,在独占模式下,arg始终为1。下面是这个方法的源码:

public final void acquire(int arg) {// 尝试获取资源,成功返回true,失败falseif (!tryAcquire(arg) &&// 走到这里,说明获取资源失败。调用addWaiter方法将当前线程加入到等待队列acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 中断当前线程selfInterrupt();
}

首先调用tryAcquire(arg)方法尝试获取资源,前面也提到了,这个方法的逻辑是交由子类来具体实现。如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE), arg)方法将当前线程加入到等待队列中,采用独占模式。这个方法的具体实现如下:

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 快速入队Node pred = tail;if (pred != null) {node.prev = pred;// CAS操作将当前节点设置为新的尾节点(可能会失败)if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果快速入队失败,再走完整入队enq(node);return node;
}private Node enq(final Node node) {// 这里通过自旋的方式,确保CAS操作一定正确完成for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

addWaiter方法先尝试能否快速入队,如果失败了,再通过完整入队的方式,将当前线程加入到等待队列。这样做的目的是,保证在线程安全的情况下提高性能。

ok,上面方法介绍完了,让我们回到最初的acquire(int arg)方法,当获取资源失败,并且将当前线程添加到等待队列的队尾。然后我们来看看AQS最后要做的事情是什么呢?我们来看看最后一个方法acquireQueued(final Node node, int arg),源码如下:

// node节点是当前获取资源失败的节点
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 当前线程阻塞是否被中断boolean interrupted = false;// 自旋操作for (;;) {// 前驱节点final Node p = node.predecessor();// 前驱节点是头节点,说明当前节点就是等待队列中第一个等待节点,可以尝试获取资源if (p == head && tryAcquire(arg)) {// 如果成功,设置当前节点为新的头节点setHead(node);// 回收旧的头节点p.next = null; // help GC// 失败标记置为falsefailed = false;// 返回是否中断标记return interrupted;}// 如果当前节点不是首个等待节点,判断是否应该阻塞。if (shouldParkAfterFailedAcquire(p, node) &&// 且判断是否应该阻塞当前线程,如果需要阻塞,调用parkAndCheckInterrupt()方法进行阻塞// 线程唤醒后,继续下一次循环parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 如果失败,将当前线程的状态设置为 CANCELLED,等待GC回收cancelAcquire(node);}
}

AQS将获取资源失败的线程成功添加到等待队列后,反复尝试获取锁,如果获取不到就阻塞(挂起),直到获取锁成功或阻塞中断。

上述流程就是独占方式获取资源的全部执行流程了。

这里parkAndCheckInterrupt方法内部使用到了LockSupport.park(this),顺便简单介绍一下park。

LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

  • park(boolean isAbsolute, long time):阻塞当前线程
  • unpark(Thread jthread):唤醒指定的线程

现在用一张流程图总结上述过程:

在这里插入图片描述

4.2、释放资源

释放资源的逻辑比较简单,源码如下:

// 释放资源的主入口
public final boolean release(int arg) {// 尝试释放资源,具体的逻辑由子类实现if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 唤醒后继节点unparkSuccessor(h);return true;}return false;
}private void unparkSuccessor(Node node) {// 头节点的状态如果小于0,尝试设置为0int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 后继节点Node s = node.next;// 如果后继节点不存在或者状态大于0(大于0表示线程已被取消),从尾部向前遍历找到队列中第一个待唤醒的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果待唤醒的后继节点存在,唤醒该节点对应的线程。if (s != null)LockSupport.unpark(s.thread);
}

5、小结

AQS是一个用来构建锁和同步器的框架,使用AQS能够很方便的构造出我们需要定制化的同步器,而且我们耳熟能详的并发包组件ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask等等皆是基于AQS实现的。

下面是一个示例(互斥锁,同一时刻,只允许一个线程获取):

import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class Mutex {// 自定义内部类实现AQSprivate static class Sync extends AbstractQueuedSynchronizer {// 获取资源(独占模式)@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 释放资源(独占模式)@Overrideprotected boolean tryRelease(int arg) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 如果当前线程以独占方式获取资源,返回true@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}}private final Sync sync = new Sync();// 加锁public void lock() {sync.acquire(1);}// 释放锁public void unlock() {sync.release(1);}// 资源是否被占有public boolean isLocked() {return sync.isHeldExclusively();}
}
http://www.dtcms.com/a/321071.html

相关文章:

  • luckfox开发板的usb口作为串口使用
  • 【matlab】采样信号的低通滤波、高通滤波
  • SVN下载及安装(保姆级别)
  • 【网络运维】Linux:MariaDB 数据库介绍及管理
  • 6、图片上方添加波浪效果
  • 深入探索 PDF 数据提取:PyMuPDF 与 pdfplumber 的对比与实战
  • Dubbo应用开发之基于xml的第一个Dubbo程序
  • 第五十五章:AI模型的“专属定制”:LoRA微调原理与高效合并技巧
  • Vue 3 表单数据缓存架构设计:从问题到解决方案
  • 站在Vue的角度,对比鸿蒙开发中的数据渲染二
  • Introducing Visual Perception Token into Multimodal Large Language Model论文解读
  • GitHub 趋势日报 (2025年08月07日)
  • 厂区周界人员闯入识别误报率↓76%:陌讯动态监测算法实战解析
  • 全面解析软件工程形式化说明技术
  • 密码学中间人攻击(Man-in-the-Middle):隐藏在通信链中的“窃听者“
  • Block Styler——浏览文件控件
  • zoho crm 的用户为什么在 api 名称页面不能点进模块查看字段的 api 名称
  • 解析工业机器视觉中的飞拍技术
  • 高效数据隔离方案:SpringBoot + JSqlParser 全解析!
  • Redis五大数据类型
  • Java——类和对象
  • 数据结构(六):树与二叉树
  • 触觉导航新突破:Contactile 触觉传感器推动机器人 “零示教” 实现复杂曲面作业
  • PyQt简介
  • WinForm 工具箱内容剖析
  • Linux常见服务器配置(三):MariaDB数据库管理和WEB服务器
  • hyper-v虚拟机启动失败:Virtual Pci Express Port无法打开电源,因为发生错误,找不到即插即用设备
  • UE5 图片9宫格切割
  • 强遮挡场景误检率↓79%!陌讯多模态融合算法在充电桩占位检测的实战优化
  • 跨域解决方案