并发编程——09 CountDownLatch源码分析
1 概述
-
CountDownLatch
是 Java 并发包中用于线程同步的工具类,核心逻辑是:-
初始化时指定一个计数
count
; -
线程执行完任务后调用
countDown()
,使计数count
减1; -
其他线程调用
await()
会被阻塞,直到count
减到0时,阻塞的线程才会被唤醒并继续执行;
-
-
流程步骤:
-
初始化:创建
CountDownLatch
时指定count=4
(即图中的state=4
),代表需要等待4个线程完成任务; -
主线程阻塞:
main
线程调用await()
,进入阻塞状态(图中粉色“main 阻塞”块),直到count
减为0; -
子线程执行并计数递减:
Thread1
执行完任务,调用countDown()
,count
从4→3(图中state=3
);Thread2
调用countDown()
,count
从3→2(图中state=2
);Thread3
调用countDown()
,count
从2→1(图中state=1
);Thread4
调用countDown()
,count
从1→0(图中state=0
);
-
-
主线程唤醒并继续:当
count=0
时,main
线程的await()
不再阻塞,继续执行后续逻辑(图中绿色“main Done”块); -
源码方法总览:
2 构造函数
public CountDownLatch(int count) {// 确保传入的计数 count 不能为负数,否则直接抛出异常,保证了 CountDownLatch 初始化的合理性if (count < 0) throw new IllegalArgumentException("count < 0");// 初始化sync属性this.sync = new Sync(count);
}
Sync
是CountDownLatch
内部基于 AQS(抽象队列同步器) 实现的同步组件,它将传入的count
作为 AQS 的同步状态(state
),为后续的countDown()
计数递减和await()
阻塞唤醒逻辑提供了底层支持。
3 Sync-队列同步器
// Sync 是 AQS 的子类,用于实现 CountDownLatch 的同步逻辑
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 构造函数通过 setState(count) 将 CountDownLatch 的初始化计数赋值给 AQS 的 state 属性(state 是 AQS 用于维护同步状态的核心变量)Sync(int count) {setState(count);}// 获取当前 AQS 中 state 的值,即 CountDownLatch 剩余的未完成线程数int getCount() {return getState();}// 判断是否可以“获取共享资源”(即 CountDownLatch 的计数是否已减到 0)// 若 state == 0,返回 1(表示可以获取,await() 方法不会阻塞)// 若 state != 0,返回 -1(表示无法获取,await() 方法会阻塞线程)protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 释放锁protected boolean tryReleaseShared(int releases) {// 自旋for (;;) {int c = getState(); // 获取 AQS 的 stateif (c == 0) // 计数已为0,无需再释放return false;int nextc = c-1; // 计数减1if (compareAndSetState(c, nextc)) // CAS原子更新statereturn nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)}}
}
4 await()
-阻塞等待
-
CountDownLatch#await()
:入口方法public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); }
-
作用:使当前线程阻塞等待,直到
CountDownLatch
的计数减到 0; -
实现:委托给
sync
(基于 AQS 的同步组件)的acquireSharedInterruptibly(1)
方法执行,1
是 AQS 共享式获取的参数(此处无实际数值意义,仅为兼容方法签名);
-
-
AQS#acquireSharedInterruptibly(int arg)
:共享式可中断获取逻辑public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); // 检查线程中断,若已中断则抛异常if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); // 若获取失败,进入阻塞队列逻辑 }
-
中断检查:先判断线程是否被中断,若已中断则立即抛出
InterruptedException
; -
尝试获取资源:调用
tryAcquireShared(arg)
(由CountDownLatch
的Sync
实现),若返回值< 0
(表示计数未到 0),则进入doAcquireSharedInterruptibly(arg)
处理阻塞逻辑;
-
-
CountDownLatch#Sync#tryAcquireShared(int acquires)
:共享式获取的状态判断protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1; }
- 作用:判断
CountDownLatch
的计数是否已减到 0- 若
state == 0
,返回1
(表示可以获取,await()
不阻塞); - 若
state != 0
,返回-1
(表示无法获取,await()
会阻塞)。
- 若
- 作用:判断
-
AQS#doAcquireSharedInterruptibly(int arg)
:共享式阻塞获取(可中断)private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); // 获取前驱节点if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) { // 计数已到0,获取成功setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)p.next = null; // 断开原头节点引用,帮助GCfailed = false;return;}}// 若获取失败,判断是否需要挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 挂起线程并检查中断throw new InterruptedException();}} finally {if (failed)cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求} }
-
加入等待队列:
addWaiter(Node.SHARED)
将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部; -
自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试
tryAcquireShared(arg)
; -
线程挂起与中断:若获取失败,通过
shouldParkAfterFailedAcquire
判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt()
会返回true
,进而抛出InterruptedException
。
-
5 countDown()
-释放锁资源
-
CountDownLatch#countDown()
:入口方法public void countDown() {sync.releaseShared(1); }
-
作用:将
CountDownLatch
的计数减 1,若计数减到 0,则唤醒所有等待的线程; -
实现:委托给
sync
(基于 AQS 的同步组件)的releaseShared(1)
方法执行,1
表示要递减的计数(此处固定为 1,因为countDown()
每次只减 1);
-
-
AQS#releaseShared(int arg)
:共享式释放逻辑public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 尝试递减计数doReleaseShared(); // 唤醒等待队列中的线程return true;}return false; }
-
尝试释放资源:调用
tryReleaseShared(arg)
(由CountDownLatch
的Sync
实现),若递减成功则进入下一步; -
唤醒等待线程:调用
doReleaseShared()
唤醒 AQS 等待队列中阻塞的线程(即调用await()
的线程);
-
-
CountDownLatch#Sync#tryReleaseShared(int releases)
:计数递减的核心逻辑protected boolean tryReleaseShared(int releases) {for (;;) { // 自旋保证原子性int c = getState(); // 获取当前计数(AQS的state)if (c == 0) return false; // 计数已为0,无需再递减int nextc = c - 1; // 计数减1if (compareAndSetState(c, nextc)) { // CAS原子更新计数return nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)}} }
-
自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,保证多线程同时调用
countDown()
时计数递减的原子性; -
唤醒触发条件:当
nextc == 0
时返回true
,AQS 会感知到这个状态变化,进而调用doReleaseShared()
唤醒所有等待的线程;
-
-
AQS#doReleaseShared()
:唤醒等待队列的共享线程private void doReleaseShared() {for (;;) { // 自旋保证唤醒可靠性Node h = head; // 获取等待队列的头节点if (h != null && h != tail) { // 队列非空且有等待线程int ws = h.waitStatus; // 获取AQS等待队列中头节点的等待状态if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // CAS更新状态失败,继续自旋unparkSuccessor(h); // 唤醒头节点的后继线程} // 处理共享模式下的状态传播(保证多个线程依次被唤醒)else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head) // 头节点未变化,说明唤醒操作完成break;} }
-
自旋循环:不断尝试唤醒操作,确保所有需要唤醒的线程都能被处理
-
状态判断与唤醒:若头节点状态为
SIGNAL
,则通过unparkSuccessor(h)
唤醒其后继线程;同时设置状态为PROPAGATE
,保证共享模式下唤醒的“传播性”(多个等待线程依次被唤醒)。
-
6 总结
-
CountDownLatch
基于 AQS(抽象队列同步器) 和 CAS(比较并交换) 实现:-
AQS 提供了同步状态管理(
state
属性)和等待队列机制,是CountDownLatch
实现“线程阻塞/唤醒”的基础; -
CAS 保证了“计数递减”操作的原子性(如
countDown()
中通过 CAS 原子更新state
);
-
-
CountDownLatch
的构造函数必须指定count
(需等待的线程数),并通过内部类Sync
(继承自 AQS)将count
赋值给 AQS 的state
属性。这一步为后续的“计数递减”和“阻塞等待”逻辑奠定了状态基础; -
调用
countDown()
时,本质是将 AQS 的state
减 1(通过 CAS 保证原子性); -
当所有线程执行完毕,
state
会被减到 0,此时countDown()
会触发 AQS 唤醒等待队列中所有挂起的线程(即调用await()
的线程); -
调用
await()
时,本质是判断 AQS 的state
是否为 0:- 若
state > 0
,说明还有线程未执行完毕,await()
会阻塞当前线程,将其加入 AQS 等待队列; - 若
state == 0
(最后一个线程执行countDown()
后),await()
会停止阻塞,当前线程继续执行。
- 若