深入解析 CountDownLatch、Semaphore 和CyclicBarrier
一、CountDownLatch(倒计时门闩):底层机制与设计思想
1. 核心实现原理
- 基于 AQS(AbstractQueuedSynchronizer):
CountDownLatch
内部通过 AQS 的共享锁模式实现。 - 计数器不可逆:初始化时设定计数值(
count
),每次调用countDown()
将计数器减 1,减到 0 时唤醒所有等待线程。 - 等待一组线程完成操作:允许一个或多个线程等待其他线程完成某个任务后再继续执行。
- 次性使用:计数器归零后不可重置。
2. 关键源码分析
- 初始化时设定一个计数器(count)。
- 线程调用 countDown() 减少计数器,当计数器归零时,所有等待的线程被唤醒。
public class CountDownLatch {private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) { setState(count); }int getCount() { return getState(); }protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// 自旋减少计数器,直到归零for (;;) {int c = getState();if (c == 0) return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
}
使用案例
public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {int threadCount = 3;CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 完成任务");latch.countDown(); // 计数器减1}).start();}latch.await(); // 主线程等待所有任务完成System.out.println("所有任务已完成,主线程继续执行");}
}
tryAcquireShared
:当计数器为 0 时返回 1(表示成功),否则返回 -1(表示需要阻塞)。tryReleaseShared
:通过 CAS 操作减少计数器,直到归零时返回true
,触发唤醒操作。
3. 典型应用场景
- 服务启动依赖:主线程等待所有微服务(如数据库、缓存)初始化完成。
- 批量任务汇总:多个并行任务完成后触发结果合并。
4. 注意事项
- 一次性使用:计数器归零后无法重置,重复调用
await()
会直接返回。 - 资源泄漏风险:若未正确调用
countDown()
,主线程可能永久阻塞。
二、Semaphore(信号量):并发控制的底层逻辑
1. 核心实现原理
- 基于 AQS 的共享锁:
Semaphore
通过 AQS 管理许可(permits
)的获取和释放。 - 公平性与非公平性:通过构造函数指定是否公平(默认非公平)。
- 控制资源并发访问数量:限制同时访问某个资源的线程数。
- 可重复使用:通过释放许可证(release())可重复利用信号量。
2. 关键源码分析
- 维护一组许可证(permits)。
- 线程调用 acquire() 获取许可证(若无可用许可证则阻塞)。
- 线程完成操作后调用 release() 释放许可证。
public class Semaphore {abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) { setState(permits); }final int getPermits() { return getState(); }final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}// 其他方法省略...}
}
使用案例
public class SemaphoreDemo {public static void main(String[] args) {int maxConcurrent = 2; // 最大并发数Semaphore semaphore = new Semaphore(maxConcurrent);for (int i = 0; i < 5; i++) {new Thread(() -> {try {semaphore.acquire(); // 获取许可System.out.println(Thread.currentThread().getName() + " 占用资源");Thread.sleep(2000); // 模拟资源占用} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // 释放许可System.out.println(Thread.currentThread().getName() + " 释放资源");}}).start();}}
}
- 非公平获取许可:直接尝试减少许可数,若剩余许可不足则返回负数(触发阻塞)。
- 公平性实现:在
tryAcquireShared
中检查是否有前驱节点在等待。
3. 典型应用场景
- 连接池管理:限制同时使用的数据库连接数。
- 限流保护:防止高并发场景下系统过载。
4. 注意事项
- 死锁风险:若线程获取许可后未释放(如忘记调用
release()
),会导致资源耗尽。 - 许可动态调整:可通过
reducePermits()
动态减少许可数(但需谨慎使用)。
三、CyclicBarrier(循环屏障):线程协同的底层设计
1. 核心实现原理
- 基于 ReentrantLock 和 Condition:通过锁和条件变量实现线程等待。
- 可重置性:通过
reset()
方法重置屏障,支持重复使用。 - 多线程协同执行:让一组线程互相等待,直到所有线程都到达某个屏障点,再一起继续执行。
- 可重复使用:屏障重置后可再次使用。
2. 关键源码分析
- 初始化时设定参与线程数(parties)。
- 每个线程调用 await() 等待其他线程到达屏障。
- 当所有线程到达屏障后,可选执行一个回调任务(Runnable),然后重置屏障。
public class CyclicBarrier {private static class Generation { boolean broken = false; }private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();private final int parties;private final Runnable barrierCommand;private Generation generation = new Generation();private int count;// 核心等待逻辑private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken) throw new BrokenBarrierException();// 每到达一个线程,计数器减1int index = --count;if (index == 0) { // 最后一个线程到达Runnable command = barrierCommand;if (command != null) command.run();nextGeneration(); // 重置屏障return 0;}// 非最后一个线程进入等待for (;;) {try {if (!timed) trip.await();else if (nanos > 0L) nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {breakBarrier(); // 中断处理throw ie;}// 唤醒后检查屏障状态if (g.broken) throw new BrokenBarrierException();if (g != generation) return index;}} finally {lock.unlock();}}
}
使用案例
public class CyclicBarrierDemo {public static void main(String[] args) {int threadCount = 3;CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {System.out.println("所有线程已就位,开始执行下一步");});for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 准备就绪");barrier.await(); // 等待其他线程System.out.println(Thread.currentThread().getName() + " 继续执行");} catch (Exception e) {e.printStackTrace();}}).start();}}
}
dowait
方法:处理线程的等待和唤醒逻辑。nextGeneration()
:重置计数器并唤醒所有线程,生成新的屏障代(Generation)。
3. 典型应用场景
- 并行计算分阶段:多个线程分别计算部分结果,在屏障点汇总。
- 多玩家游戏同步:所有玩家准备就绪后同时开始游戏回合。
4. 注意事项
- 屏障断裂(BrokenBarrierException):若某个线程在等待时被中断或超时,屏障会标记为断裂,其他线程将抛出此异常。
- 重置风险:调用
reset()
会中断所有等待线程,需谨慎使用。
四、对比总结与选型指南
特性 | CountDownLatch | Semaphore | CyclicBarrier |
---|---|---|---|
核心目标 | 等待一组操作完成 | 控制资源并发访问 | 多线程协同到达屏障点 |
重用性 | 否(一次性) | 是(许可可释放) | 是(自动或手动重置) |
底层实现 | AQS 共享锁 | AQS 共享锁 | ReentrantLock + Condition |
适用场景 | 主从协作(一等多) | 资源池化/限流(多对多) | 多线程分阶段协作(多对多) |
异常处理 | 无特殊异常 | 需处理 InterruptedException | 需处理 BrokenBarrierException |
性能影响 | 低(仅计数器操作) | 中等(CAS 竞争) | 高(锁竞争 + 条件变量) |