当前位置: 首页 > news >正文

从源码和设计模式深挖AQS(AbstractQueuedSynchronizer)

AQS 概念

AbstractQueuedSynchronizer(AQS) 是 Java 并发包 (java.util.concurrent.locks) 的核心基础框架,它的实现关键是先进先出 (FIFO) 等待队列和一个用volatile修饰的锁状态status。具体实现有 : ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLock 等常用的同步工具类。

AQS 的核心思想

AQS 的核心思想是 : 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁 实现的,即将暂时获取不到锁的线程加入到队列中。

CLH 队列:Craig, Landin, and Hagersten (CLH) locks,是一种基于链表的可扩展、高性能、公平的自旋锁。AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO)。

AQS 的核心架构

在这里插入图片描述

  • AQS 使用一个 volatile int state 成员变量来表示同步状态
  • 通过内置的 FIFO 队列 来完成资源获取线程的排队工作。
  • Node中的thread变量用来存放进入AQS队列里面的线程,Node节点内部:
    • prev记录当前节点的前驱节点
    • next 记录当前节点的后继节点
  • SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
  • EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的
  • waitStatus 记录当前线程等待状态,可以为
    • CANCELLED (线程被取消了)
    • SIGNAL(线程需要被唤醒)
    • CONDITION(线程在CONDITION条件队列里面等待)
    • PROPAGATE(释放共享资源时需要要通知其他节点);

图中源码解释 :

static final int WAITING   = 1;          // 等待状态,必须为1
static final int CANCELLED = 0x80000000; // 取消状态,必须为负数(最高位为1)
static final int COND      = 2;          // 在条件等待中/** CLH 节点 */
abstract static class Node {volatile Node prev;       // 前驱节点,最初通过 casTail 附加volatile Node next;       // 当可发出信号时 visibly nonnullThread waiter;            // 当入队时 visibly nonnullvolatile int status;      // 由所有者写入,其他线程通过原子位操作读取// 用于 cleanQueue 的比较并交换前驱节点final boolean casPrev(Node c, Node v) {return U.weakCompareAndSetReference(this, PREV, c, v);}// 用于 cleanQueue 的比较并交换后继节点final boolean casNext(Node c, Node v) {return U.weakCompareAndSetReference(this, NEXT, c, v);}// 用于信号传递的获取并清除状态位final int getAndUnsetStatus(int v) {return U.getAndBitwiseAndInt(this, STATUS, ~v);}// 用于离队赋值的宽松设置前驱节点final void setPrevRelaxed(Node p) {U.putReference(this, PREV, p);}// 用于离队赋值的宽松设置状态final void setStatusRelaxed(int s) {U.putInt(this, STATUS, s);}// 用于减少不必要信号的状态清除final void clearStatus() {U.putIntOpaque(this, STATUS, 0);}// 内存偏移量常量private static final long STATUS = U.objectFieldOffset(Node.class, "status");private static final long NEXT = U.objectFieldOffset(Node.class, "next");private static final long PREV = U.objectFieldOffset(Node.class, "prev");
}// 按类型标记的具体节点类
static final class ExclusiveNode extends Node { }  // 独占模式节点
static final class SharedNode extends Node { }     // 共享模式节点// 条件节点,实现 ManagedBlocker 接口用于 ForkJoinPool
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {ConditionNode nextWaiter; // 链接到下一个等待节点/*** 允许条件在 ForkJoinPools 中使用,而不会冒险耗尽固定池。* 这仅适用于非定时条件等待,不适用于定时版本。*/public final boolean isReleasable() {return status <= 1 || Thread.currentThread().isInterrupted();}public final boolean block() {while (!isReleasable()) LockSupport.park();return true;}
}// 等待队列头节点,延迟初始化
private transient volatile Node head;// 等待队列尾节点。初始化后,仅通过 casTail 修改
private transient volatile Node tail;// 同步状态
private volatile int state;/*** 返回同步状态的当前值。* @return 当前状态值*/
protected final int getState() {return state;
}

AQS 底层数据结构

state

state 是 AQS 的核心字段,表示共享资源的状态。不同的同步器对 state 的解释不同:

  • ReentrantLockstate 表示当前线程获取锁的可重入次数(如果发现可以可重入,则state+1; 执行完成则-1; 为0时可以释放)
  • Semaphorestate 表示当前可用信号的个数
  • CountDownLatchstate 表示计数器当前的值

AQS 提供了一系列访问 state 的方法:

// 获取当前同步状态
protected final int getState() {return state;
}// 设置同步状态
protected final void setState(int newState) {state = newState;
}// 使用CAS原子性地设置同步状态
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同步队列:Node 节点

AQS 内部维护了一个 FIFO 双向队列,队列中的每个元素都是一个 Node 对象:

static final class Node {// 节点模式:共享模式static final Node SHARED = new Node();// 节点模式:独占模式static final Node EXCLUSIVE = null;// 等待状态:线程已取消static final int CANCELLED =  1;// 等待状态:后继节点的线程需要被唤醒static final int SIGNAL    = -1;// 等待状态:节点在条件队列中等待static final int CONDITION = -2;// 等待状态:下一次acquireShared应无条件传播static final int PROPAGATE = -3;// 当前节点的等待状态volatile int waitStatus;// 前驱节点volatile Node prev;// 后继节点volatile Node next;// 节点关联的线程volatile Thread thread;// 指向下一个等待条件或共享模式的节点Node nextWaiter;// 判断节点是否在共享模式下等待final boolean isShared() {return nextWaiter == SHARED;}// 获取前驱节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}// 构造方法Node() {    // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}

AQS 的整体结构

AQS 的整体结构如下图所示:

在这里插入图片描述

线程获取锁失败时的 AQS 队列变化

初始状态

当没有线程竞争锁时,AQS 的同步队列处于初始状态,headtail 都为 null

// 初始状态
head = null;
tail = null;

第一个线程获取锁失败

当第一个线程尝试获取锁失败时,AQS 会初始化同步队列:

  1. 创建空节点(dummy node)作为头节点
  2. 将当前线程包装成 Node 节点添加到队列尾部
// addWaiter 方法:将当前线程包装成Node并加入队列
private Node addWaiter(Node mode) {// 将当前线程包装成Node节点Node node = new Node(Thread.currentThread(), mode);// 尝试快速入队Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 快速入队失败,使用enq方法自旋入队enq(node);return node;
}// enq方法:通过自旋CAS确保节点成功入队
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 队列未初始化// 创建空节点(dummy node)作为头节点if (compareAndSetHead(new Node()))tail = head; // 头尾都指向空节点} else {// 将新节点添加到队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

初始化后的队列结构:

head → [dummy node] ← tail↑新加入的节点

后续线程获取锁失败

当后续线程尝试获取锁失败时,它们会被添加到队列的尾部:

// acquireQueued方法:线程在队列中自旋获取资源或阻塞
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取前驱节点final Node p = node.predecessor();// 如果前驱是头节点,尝试获取资源if (p == head && tryAcquire(arg)) {// 获取成功,设置当前节点为头节点setHead(node);p.next = null; // help GC (方便垃圾回收)failed = false;return interrupted;}// 判断是否应该阻塞,并在需要时安全地阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

多个线程竞争后的队列结构 :
在这里插入图片描述

线程被唤醒时的 AQS 队列变化

锁释放与线程唤醒

当持有锁的线程释放资源时,会唤醒队列中的下一个线程:

// release方法:释放独占模式下的资源
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h); // 唤醒后继节点return true;}return false;
}// unparkSuccessor方法:唤醒指定节点的后继节点
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 清除状态// 查找下一个需要唤醒的节点Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 从尾部向前查找,找到队列中最前面且未取消的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 唤醒线程
}

4.2 线程被唤醒后的队列变化

当队列中的线程被唤醒后,它会尝试获取锁,如果获取成功,会将自己设置为新的头节点:

// setHead方法:将节点设置为头节点
private void setHead(Node node) {head = node;node.thread = null; // 清空线程引用node.prev = null;
}

原来的头节点(dummy node)会被垃圾回收,新头节点的 thread 字段被设置为 null,表示它不再关联任何线程。

公平锁与非公平锁

AQS 支持两种锁获取模式:公平锁非公平锁

非公平锁(如 ReentrantLock 的默认实现):

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 不管队列中是否有等待线程,直接尝试获取锁if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入逻辑...
}

公平锁

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 只有队列中没有等待线程或当前线程是队列中的第一个时才获取锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入逻辑...
}

hasQueuedPredecessors() 方法检查队列中是否有比当前线程等待时间更长的线程,这是实现公平性的关键。

AQS 的设计模式

模板方法模式

AQS 使用了模板方法模式,定义了同步器的主要逻辑骨架,而将一些特定操作留给子类实现。以下是需要子类实现的方法:

方法名描述
boolean tryAcquire(int arg)尝试以独占方式获取资源
boolean tryRelease(int arg)尝试释放独占资源
int tryAcquireShared(int arg)尝试以共享方式获取资源
boolean tryReleaseShared(int arg)尝试释放共享资源
boolean isHeldExclusively()判断当前线程是否独占资源

以 ReentrantLock 为例的实现

ReentrantLock 中的同步器 Sync 继承自 AQS,并实现了相关方法:

abstract static class Sync extends AbstractQueuedSynchronizer {// 尝试获取锁abstract void lock();// 非公平尝试获取final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// 尝试释放锁protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}// 判断当前线程是否独占资源protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}
}

优势

  1. 代码复用:AQS 提供了通用的同步框架,多个同步器可以复用相同的队列管理逻辑
  2. 职责分离:AQS 负责线程管理,子类专注于状态管理

装饰器模式

装饰器模式动态地给一个对象添加一些额外的职责,就增加功能来说,装饰器模式相比生成子类更为灵活。

在 AQS 中的实现

AQS 中的 Node 类可以看作是对 Thread 的装饰,它添加了同步所需的额外信息:

static final class Node {// 节点模式:共享或独占volatile Node nextWaiter;// 等待状态volatile int waitStatus;// 前驱和后继指针volatile Node prev;volatile Node next;// 被装饰的线程对象volatile Thread thread;// 构造方法 - 装饰线程Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}
}

优势

  1. 功能扩展:在不修改 Thread 类的情况下为其添加同步功能
  2. 灵活性:可以根据需要为线程添加不同的同步属性

状态模式

状态模式允许一个对象在其内部状态改变时改变它的行为。

在 AQS 中的实现

AQS 中节点的 waitStatus 字段代表了不同的状态,每种状态对应不同的行为:

static final class Node {// 状态常量static final int CANCELLED =  1;  // 取消状态static final int SIGNAL    = -1;  // 需要唤醒后继节点static final int CONDITION = -2;  // 在条件队列中等待static final int PROPAGATE = -3;  // 传播唤醒操作// 状态字段volatile int waitStatus;
}

根据状态的不同,AQS 采取不同的处理策略:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) // 根据状态决定行为return true;if (ws > 0) { // 处理取消状态do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}

AQS 的使用

ReentrantLock 的实现

ReentrantLock 通过内部类 Sync 继承 AQS,实现了可重入的独占锁:

public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer {}// 非公平锁实现static final class NonfairSync extends Sync {final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}// 公平锁实现static final class FairSync extends Sync {final void lock() {acquire(1);}protected final boolean tryAcquire(int acquires) {// 公平获取逻辑}}
}

CountDownLatch 的实现

CountDownLatch 通过内部类 Sync 继承 AQS,实现了共享模式的同步器:

public class CountDownLatch {private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
}

文章转载自:

http://meMU28vi.bndkf.cn
http://bXQtPYlu.bndkf.cn
http://E08QEwUf.bndkf.cn
http://8ZcaExuN.bndkf.cn
http://AOOscDdo.bndkf.cn
http://8J1ZLMRR.bndkf.cn
http://MrWZTlv5.bndkf.cn
http://UkkaEUBy.bndkf.cn
http://zgFx2swX.bndkf.cn
http://BuSfKeE2.bndkf.cn
http://PHdkW5vm.bndkf.cn
http://w8q4hCGU.bndkf.cn
http://pZIpeiRG.bndkf.cn
http://gfjAXVgN.bndkf.cn
http://ztcbfWcN.bndkf.cn
http://MFyt0X5U.bndkf.cn
http://TTGrcjgJ.bndkf.cn
http://Pk7smnhc.bndkf.cn
http://RWiADLEm.bndkf.cn
http://CwG0Lk6p.bndkf.cn
http://ivgDG1xE.bndkf.cn
http://PUh6A7Xr.bndkf.cn
http://Nt5SlzHL.bndkf.cn
http://PNErYf6l.bndkf.cn
http://ugtNHFFZ.bndkf.cn
http://LeJDq3Ns.bndkf.cn
http://OjG4PelJ.bndkf.cn
http://jMBHOtE0.bndkf.cn
http://wWuDaVLq.bndkf.cn
http://VDM92aVe.bndkf.cn
http://www.dtcms.com/a/379459.html

相关文章:

  • 四、计算机网络与分布式系统(中)
  • 半导体学习笔记
  • 深入解析Dart虚拟机运行原理
  • 一文教您解决Ubuntu ModuleNotFoundError: No module named ‘_tkinter‘问题
  • 部署合约常见的问题
  • Python快速入门专业版(二十三):for循环基础:遍历字符串、列表与range()函数(计数案例)
  • MySQL 非空约束(NOT NULL):看似简单,却决定数据质量的关键细节
  • 【笔记】悬架减振器的阻尼带宽
  • C++:迭代器失效问题(vector为例)
  • TDengine 选择函数 TAIL() 用户手册
  • 在Linux系统中清理大文件的方法
  • oracle里的int类型
  • 【开关电源篇】整流及其滤波电路的工作原理和设计指南-超简单解读
  • 第五章 Logstash深入指南
  • 猫狗识别算法在智能喂食器上的应用
  • 数据库事务详解
  • Linux学习:基于环形队列的生产者消费者模型
  • size()和length()的区别
  • Windows系统下安装Dify
  • 企业云环境未授权访问漏洞 - 安全加固笔记
  • sv时钟块中default input output以及@(cb)用法总结
  • 广谱破局!芦康沙妥珠单抗覆罕见突变,一解“少数派”的用药困境
  • Guli Mall 25/08/12(高级上部分)
  • 彩笔运维勇闯机器学习--随机森林
  • Python 面向对象实战:私有属性与公有属性的最佳实践——用线段类举例
  • 使用deboor法计算三次B样条曲线在参数为u处的位置的方法介绍
  • 认识HertzBeat的第一天
  • AUTOSAR进阶图解==>AUTOSAR_EXP_ApplicationLevelErrorHandling
  • 线程同步:条件变量实战指南
  • OpenLayers数据源集成 -- 章节七:高德地图集成详解