当前位置: 首页 > news >正文

AQS(AbstractQueuedSynchronizer)抽象队列同步器

AQS(AbstractQueuedSynchronizer)抽象队列同步器

是一种用来构建锁和同步器的框架


基本构成

state

  • 维护共享资源的持有状态
  • 通常:
    • 0 表示共享资源可使用
    • 1 表示共享资源正在使用

exclusiveOwnerThread

  • 继承了一个抽象类 AbstractOwnableSynchronizer
  • 记录当前持有共享资源的线程

等待队列

  • 维护所有未获取到锁而陷入等待的线程
存储结构
  • 双向链表
  • 头节点是一个占位的无意义节点
  • 每个节点保存了陷入等待的线程和当前节点的状态

底层支持

UnSafe

  • 支持 CAS 来保证 state 的并发安全

LockSupport

  • 支持线程的等待和唤醒

内置方法

acquire

  • 以 CAS 的方式获取锁
  • 在获取失败时进入等待队列

release

  • 释放掉当前线程持有的锁
  • 并唤醒位于当前节点的后继节点的线程
    • 如果该后继节点状态非正常(被取消),则从后往前找到一个状态正常的

acquireShared

  • 以共享的方式获取锁
  • 在获取失败时进入等待队列

releaseShared

  • 做一次释放(共享资源不一定被完全释放)
  • 当共享资源完全释放时唤醒所有等待线程

核心方法

addWaiter

加入等待队列

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}

acquireQueued

进入排队,在获取锁失败后,使当前线程在队列中等待,并且在此期间响应中断或再次尝试获取锁。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

模版方法(由子类实现)

tryAcquire

  • 以独占的方式获取锁
  • 如果获取成功返回 true,失败返回 false
  • 具体获取方式由子类实现

tryRelease

  • 以独占的方式释放锁
  • 如果成功释放掉锁返回 true,失败返回 false
  • 具体释放方式由子类实现

tryAcquireShared

  • 以共享的方式获取锁
  • 如果返回值为负数表示获取锁失败,陷入等待
  • 大于等于 0 表示获取锁成功

tryReleaseShared

  • 以共享的方式释放锁
  • 如果共享资源完成释放返回 true
  • 处于等待的线程将会被唤醒
  • 具体释放方式由子类决定

Condition

Condition 是 AQS 提供的一个接口,位于包:java.util.concurrent.locks.Condition

它允许一个或多个线程在某个条件不满足时挂起等待,并在其他线程改变状态后被唤醒。这与传统的 Object.wait() / Object.notify() 类似,但它是与 Lock 配合使用的条件变量

await

进入 Condition 的等待队列,当前线程陷入等待

public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

signal

从 Condition 的等待队列移除第一个元素,绑定的线程恢复执行

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}

JDK 中的应用

ReentrantLock

非公平锁实现
lock 方法
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

大致过程

  • 首先使用 CAS 尝试获取共享资源(如果刚好空闲就能成功)
  • 成功则将当前线程设置为资源持有者
  • 否则再次使用 CAS 尝试获取(支持重入)
  • 失败则将当前线程加入等待队列
  • 使用 park 方式使当前线程陷入等待
公平锁实现
lock 方法
final void lock() {acquire(1);
}

基本和非公平锁相同,主要区别有只有一个:

  • 在尝试获取共享资源的时候,获取检查等待队列中是否存在其他等待的线程
  • 如果有就放弃获取锁(也不会一上来就去做 CAS)

释放锁

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

比较简单

  • 判断当前锁的状态,如果持有者是当前线程
  • 将状态变更为可使用
  • 并唤醒后继节点

CountDownLatch

是一种同步辅助类,用于控制一个或多个线程等待其他线程完成操作,然后再继续执行

countdown
public void countDown() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}

state 减 1,如果 state = 0,表示共享资源被完全释放,而后等待队列中的所有线程被唤醒

await
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

state = 0 时才能获取成功,否则失败,如果获取失败陷入等待


Semaphore

信号量是一个非负整数,获取信号量的任务都会将该整数减 1,当为 0 时,所有试图获取该信号量的任务都处于阻塞状态,正值表示有一个或者多个释放信号量操作。

acquire
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

当 Semaphore 中的 permit 大于零时表示共享资源可获取,否则获取时失败,加入等待队列。

PS:实际上 Semaphore 也和 ReentrantLock 一样有公平和非公平的实现,默认也是非公平,使用公平时会检查等待队列中是否由处于等待中的前驱节点

release
public void release() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}

调用 Semaphore 的 release 时将会使 permit 值加 1,表示对共享资源的持有释放,并尝试唤醒一个等待中的线程

http://www.dtcms.com/a/282233.html

相关文章:

  • 第十八节:第二部分:java高级:反射-获取构造器对象并使用
  • AI产品经理面试宝典第23天:AI赋能商业服务相关面试题与解答指导
  • vue的provide和inject
  • Liunx-Lvs配置项目练习
  • APIs案例及知识点串讲(上)
  • LVS-DR的ARP污染问题
  • Promise与Axios:异步编程
  • LiFePO4电池的安全详解
  • 关于redis各种类型在不同场景下的使用
  • 意义作为涌现现象的本质是什么
  • pthread线程的控制
  • ZYNQ Petalinux系统FLASH固化终极指南:创新多分区与双系统切换实战
  • Java数据结构第二十五期:红黑树传奇,当二叉树穿上 “红黑铠甲” 应对失衡挑战
  • 【Mysql协议解析处理流程】
  • React+Next.js+Tailwind CSS 电商 SEO 优化
  • 个人笔记(linux/tr命令)
  • JAVA AI+elasticsearch向量存储版本匹配
  • es 启动中的一些记录
  • ELK日志分析,涉及logstash、elasticsearch、kibana等多方面应用,必看!
  • 深入解析交互设计流程:流程图+线框图+原型制作
  • Node.js基础用法
  • AJAX 技术
  • ubuntu+windows双系统恢复
  • 【官方回复】七牛云开启referer防掉链后小程序访问七牛云图片显示403
  • Golang学习之常见开发陷阱完全手册
  • CocosCreator3.8.x——多语言功能(简单版)
  • STM32CubeIDE生成 .hex 文件
  • Android 15 源码修改:为第三方应用提供截屏接口
  • 设计模式三:观察者模式 (Observer Pattern)
  • ble连接参数分析