【Java并发】揭秘Lock体系 -- condition等待通知机制
系列文章目录
文章目录
- 系列文章目录
- 一、condition简介
- 二、condition实现原理分析
- 1.等待队列数据结构
- 2.await () 方法实现原理
- 3.不响应中断的支持
- 三、signal/signalAll 实现原理
- 1. signal () 方法的实现原理
- 2.signalAll () 方法的实现原理
- 3.await () 与 signal ()/signalAll () 的结合
- 4.使用实例
- 总结
一、condition简介
任何一个 Java 对象都天然继承于 Object 类,线程间的通信往往会应用到 Object 类的几个方法,如 wait ()、wait (long timeout)、wait (long timeout, int nanos)、notify () 及 notifyAll ()。同样,在 Lock 体系下依然会用同样的方法实现等待 / 通知机制。从整体上来看,Object 的 wait () 和 notify ()/notifyAll () 是与对象监视器配合完成线程间的等待 / 通知机制,而 condition 则是与 Lock 配合完成等待 / 通知机制。前者是 Java 底层级别的,而后者是语言级别的,具有更高的可控制性和扩展性。两者除了在使用方式上有所不同外,在功能特性上也有很多的不同:
(1)condition 支持不响应中断,而 Object 方式不支持;
(2)condition 支持多个等待队列(new 多个condition 对象),而 Object 方式只能支持一个;
(3)condition 支持超时时间的设置,而 Object 不支持。
参照 Object 类的 wait () 和 notify ()/notifyAll () 方法,condition 也提供了同样的方法:
1.针对 Object 的 wait () 方法
void await() throws InterruptedException//当前线程进入等待状态,如果其他线程调用了condition的signal()或signalAll()方法,则当前线程获取锁后会用await()方法返回,如果在等待状态中被中断就会抛出被中断异常
long awaitNanos(long nanosTimeout)//当前线程进入等待状态直到被通知中断或超时
boolean await(long time, TimeUnit unit)throws InterruptedException//同第2种,支持自定义时间单位
boolean awaitUntil(Date deadline) throws InterruptedException//当前线程进入等待状态直到被通知中断或到了某个时间
2.针对 Object 的 notify ()、notifyAll () 方法
void signal()//唤醒一个等待在condition上的线程,可以将该线程从等待队列中移动到同步队列中
void signalAll()//能够唤醒所有等待在condition上的线程
二、condition实现原理分析
1.等待队列数据结构
本文通过阅读底层源码的方式,进一步深入讲解 condition 机制的底层实现原理。创建一个 condition 对象可通过 lock.newCondition () 方法,而这个方法实际上是创建一个 ConditionObject 对象,该类是 AQS 的一个内部类。为什么说 ConditionObject 是属于 AQS 的一个内部类呢?
我们知道condition 是要和锁配合使用的,也就是说,condition 和锁是绑定在一起的,而锁的实现原理又依赖于 AQS,自然 ConditionObject 作为 AQS 的内部类是为了让 condition 的底层实现收敛在一起,这是完全合理的,也符合软件设计中高内聚的设计原则。
AQS 内部维护了一个同步队列,如果是独占式锁,所有获取锁失败的线程都会插入到同步队列中。condition 也是使用同样的方式,内部维护了一个等待队列,所有调用 condition.await () 方法的线程都会加入等待队列,并且线程状态转换为等待状态。另外,注意 ConditionObject 中有两个成员变量:
private transient Node firstWaiter;
private transient Node lastWaiter;
通过源码可以看出,ConditionObject 可通过持有等待队列的头尾指针管理等待队列,节点依然是复用了 AQS 中的 Node 类。Node 类有这样一个属性:
// 后继节点
Node nextWaiter;
由于 Node 类仅仅只维护了一个后继节点的属性,这也进一步说明等待队列是一个单向队列,而 AQS 中维护的同步队列是一个双向队列。接下来将通过示例代码验证同步队列是一个单向队列的猜想。示例代码如下:
public static void main(String[] args) {for (int i = 0; i < 10; i++) {Thread thread = new Thread(() -> {lock.lock();try {condition.await();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});thread.start();}
}
上述示例代码中新建了 10 个线程,每个线程都是先获取到锁,然后调用 condition.await () 方法释放锁,并将当前线程加入等待队列中。通过 debug 控制,当走到第 10 个线程时查看 firstWaiter,即等待队列中的头节点,debug视图如下图:
(1)调用 condition.await () 方法后线程依次尾插入等待队列中,图 4.13 中队列的线程引用依次为 Thread-0,Thread-1,Thread-2,…,Thread-8;
(2)等待队列是一个单向队列。
根据上面的分析可以得到condition等待队列的结构示意图:
还有一点需要注意,我们可以多次调用lock.newCondition()方法创建多个 condition对象也就是说一个锁可以持有多个等待队列,而对Object类而言,仅仅只能拥有一个同步队列和一个等待队列,而并发包中的锁拥有一个同步队列和多个等待队列,如下图所示:
2.await () 方法实现原理
调用 condition.await() 方法会使当前获取锁的线程进入等待队列,接下来分析 await() 方法,源码如下:
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 1. 将当前线程包装成 Node,尾插入等待队列中Node node = addConditionWaiter();// 2. 释放当前线程所占用的锁,在释放的过程中会唤醒同步队列中的下一个节点int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 3. 当前线程进入等待状态LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 4. 自旋等待获取到同步状态(即获取到锁)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 5. 处理被中断的情况if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
代码的主要逻辑参见注释,当前线程调用 condition.await() 方法后,会在释放锁后加入等待队列,直至被 signal/signalAll 后才会把当前线程从等待队列移至同步队列。之后获得了锁后才会用 await() 方法返回,或者在等待时被中断,就做中断处理。
针对整个流程,会产生以下几个问题:
(1)当前线程是如何进入等待队列的?入队的过程是怎样的?
(2)释放锁资源后,在等待队列中的线程节点会做出怎样的操作?
(3)线程如何才能从 await() 方法中退出?
在 await() 方法中可以清楚地找到这 3 个问题的答案,第 1 步就调用 addConditionWaiter() 方法将当前线程添加到等待队列中,源码如下:
private Node addConditionWaiter() {Node t = lastWaiter;if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 将当前线程包装成 NodeNode node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;else// 尾插入t.nextWaiter = node;// 更新 lastWaiterlastWaiter = node;return node;}
整体流程是先将当前节点包装成 Node,如果等待队列的 firstWaiter 为 null(等待队列为空队列),就将 firstWaiter 指向当前的 Node;否则,更新 lastWaiter(尾节点)即可。也就是通过尾插入的方式将当前线程封装的 Node 插入等待队列中,同时可以看出等待队列是一个不带头节点的链式队列。AQS 维护的同步队列是一个带头节点的链式队列,而等待队列是不带头节点的,这是两者之间的一个显著区别。将当前节点插入等待队列之后,会使当前线程释放锁(可由 fullyRelease () 方法实现),源码如下:
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {// 成功释放同步状态failed = false;return savedState;} else {// 不成功释放同步状态,抛出异常throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}
整体流程是先调用 AQS 的模板方法 release () 释放 AQS 的同步状态,并且唤醒在同步队列中头节点的后继节点引用的线程,如果释放成功,则正常返回,否则就抛出异常。到目前为止,这两段代码已经解决了前两个问题了,还剩下最后一个问题,怎样从 await () 方法退出?现在回过头来看,await () 方法有以下一段逻辑:
while (!isOnSyncQueue(node)) {// 3. 当前线程进入等待状态LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}
显而易见,当线程第一次调用 condition.await() 方法时,会进入 while 循环中,然后通过 LockSupport.park(this) 方法使当前线程进入等待状态。那么,要想退出 await() 方法,第一个前提条件是要先退出这个 while 循环,出口就只剩下两处:
1.逻辑走到 break,退出 while 循环;
2.while 循环中的逻辑判断为 false。
再看代码,出现第一种情况的条件是当前等待的线程被中断,代码会走到 break 退出。第二种情况是当前节点被移动到了同步队列中(即有另外的线程调用了 condition 的 signal() 或 signalAll() 方法),while 循环中的逻辑判断为 false 时,结束 while 循环。
总结下来,就是当前线程被中断或调用 condition.signal() 及 condition.signalAll() 方法使当前节点移动到同步队列,这是当前线程退出 await() 方法的前提条件。当退出 while 循环后,就会调用 acquireQueued(node, savedState) 方法进入同步队列中,该方法的作用是在自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到锁)。
这样也说明了退出 await() 方法的前提是已经获得了 condition 引用的锁。到目前为止,前面的三个问题都通过源码找到了答案。流程图如下:
调用 condition.await() 方法的线程必须是已经获得了锁,也就是当前线程是同步队列中的头节点。调用该方法后会使当前线程所封装的 Node 尾插入等待队列中。
condition 还额外支持了超时机制,使用者可调用 API:awaitNanos() 以及 awaitUntil() 方法。这两个方法的实现原理与 AQS 中的 tryAcquire() 方法如出一辙。tryAcquire()方法在前面文章关于AQS中有详细介绍。
3.不响应中断的支持
要想不响应中断,可以调用 condition.awaitUninterruptibly() 方法,源码如下:
public final void awaitUninterruptibly() {Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) {LockSupport.park(this);if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();
}
这个方法与上面的 await() 方法基本一致,只不过减少了对中断的处理,并省略了用 reportInterruptAfterWait() 方法抛出被中断的异常。
三、signal/signalAll 实现原理
1. signal () 方法的实现原理
调用 condition 的 signal()/signalAll() 方法,可以将等待队列中等待时间最长的节点移动到同步队列中,使该节点能够有机会获得锁。由于等待队列是先进先出(FIFO)的,所以等待队列的头节点必然是等待时间最长的节点,也就是每次调用 condition 的 signal() 方法将头节点移动到同步队列中。signal() 方法的源码如下:
public final void signal() {//1. 先检测当前线程是否已获取锁if (!isHeldExclusively())throw new IllegalMonitorStateException();//2. 获取等待队列中的第一个节点,之后的操作都是针对这个节点的Node first = firstWaiter;if (first != null)doSignal(first);
}
signal() 方法首先会检测当前线程是否已获取锁,如果没有获取锁,会直接抛出异常;如果获取到锁,就得到了等待队列的头指针引用的节点,之后调用的 doSignal() 方法也是基于该节点的。下面来看看 doSignal() 方法,源码如下:
private void doSignal(Node first) {do {if ((firstWaiter = first.nextWaiter) == null)lastWaiter = null;//1. 将头节点从等待队列中移除first.nextWaiter = null;//2. while循环中transferForSignal()方法对头节点做真正的处理} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
代码的具体逻辑参见注释,真正对头节点做处理的逻辑在 transferForSignal() 方法中,源码如下:
final boolean transferForSignal(Node node) {//1. 更新状态为 0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;//2. 将该节点移入同步队列Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}
关键逻辑请看注释,这段代码主要做了以下两件事情:
(1)将头节点的状态由 CONDITION 置为 0;
(2)调用 enq() 方法,将该节点尾插入同步队列中。
由此可以得出结论,调用 condition 的 signal() 方法的前提条件是当前线程已经获取锁,该方法会使等待队列中的头节点(即等待时间最长的那个节点)移入同步队列,而移入同步队列后才有机会使等待线程被唤醒,即从 await() 方法中的 LockSupport.park(this) 方法中返回,之后才有机会调用 await() 方法的线程成功退出。signal() 方法的执行示意图如下图所示:
2.signalAll () 方法的实现原理
signalAll() 与 signal() 方法的区别主要体现在 doSignalAll() 方法上,在理解 doSignal() 方法的基础上再去理解 doSignalAll() 方法就会容易很多。doSignalAll() 方法的源码如下:
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}
该方法是将等待队列中的每个节点都移入同步队列中,使每个节点引用的线程能够有机会从同步队列中获取到锁资源,能够利用 await() 方法退出。
3.await () 与 signal ()/signalAll () 的结合
本文开始时提到过等待 / 通知机制,使用 condition 提供的 await() 和 signal()/signalAll() 方法就可以实现这种机制,而这种机制能够解决的最经典的问题就是 “生产者 - 消费者问题”,await()、signal()/signalAll() 方法就像一个开关控制着线程 A(等待方)和线程 B(通知方),两者的关系如下图所示:
awaitThread 线程先通过 lock.lock() 方法获取锁,成功后调用 condition.await() 方法进入等待队列;而另一个线程 signalThread 通过 lock.lock() 方法获取锁,成功后调用 condition.signal() 或 signalAll() 方法,使 awaitThread 线程能够有机会移入同步队列中,当其他线程释放锁后使 awaitThread 线程能够有机会获取锁,从而使 awaitThread 线程能够从 await() 方法中退出并执行后续操作。如果 awaitThread 获取锁失败,则会直接进入同步队列。
4.使用实例
关于await()和signal()方法的具体应用,可以通过以下示例代码进行详细了解:
public class AwaitSignal {private static ReentrantLock lock = new ReentrantLock();private static Condition condition = lock.newCondition();private static volatile boolean flag = false;public static void main(String[] args) {Thread waiter = new Thread(new Waiter());waiter.start();Thread signaler = new Thread(new Signaler());signaler.start();}static class Waiter implements Runnable {@Overridepublic void run() {lock.lock();try {while (!flag) {System.out.println(Thread.currentThread().getName() + " 当前条件不满足等待");try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName() + " 接收到通知,条件满足");} finally {lock.unlock();}}}static class Signaler implements Runnable {@Overridepublic void run() {lock.lock();try {flag = true;condition.signalAll();} finally {lock.unlock();}}}
}
输出结果为:
Thread-0当前条件不满足等待
Thread-0接收到通知,条件满足
上述代码开启了两个线程:waiter 和 signaler。waiter 线程开始执行时,由于条件不满足,执行 condition.await() 方法使该线程进入等待状态的同时释放锁;signaler 线程获取到锁之后更新了状态条件,并在通知所有的等待线程后释放锁。这时 waiter 线程获取到锁,并且由于 signaler 线程更改了条件,此时对于 waiter 来说条件满足,可以继续执行。
总结
本文主要通过源码的方式带大家学习condition等待通知相关机制,通过了解其实现的根本方法的源码来理解其原理。
以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!