网站备案和域名备案千峰培训可靠吗?
CountDownLatch结构与核心实现分析
CountDownLatch是一个一次性同步工具,基于AQS(AbstractQueuedSynchronizer)实现。它的核心思想是维护一个计数器,当计数器归零时释放所有等待的线程。
主要组件结构
CountDownLatch
├── Sync (内部类,继承AbstractQueuedSynchronizer)
│ ├── tryAcquireShared() - 尝试获取共享锁
│ └── tryReleaseShared() - 尝试释放共享锁
└── 核心方法├── await() - 等待计数归零├── countDown() - 计数减一└── getCount() - 获取当前计数
内部同步器 (Sync类)
CountDownLatch通过内部类Sync
继承AbstractQueuedSynchronizer
来实现同步控制:
private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count); // 将计数值设置为AQS的state}
}
关键设计:
- 使用AQS的
state
字段存储计数值 - 利用AQS的共享模式实现多线程等待机制
等待机制 (await方法)
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
工作流程:
- 调用AQS的
acquireSharedInterruptibly()
方法 - 内部调用
tryAcquireShared()
检查状态 - 如果计数不为0,线程进入等待队列阻塞
- 当计数归零时,所有等待线程被唤醒
计数递减机制 (countDown方法)
public void countDown() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {signalNext(head);return true;}return false;}
核心实现在tryReleaseShared():
protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0) return false; // 已经为0,无需操作int nextc = c - 1;if (compareAndSetState(c, nextc)) // CAS原子更新return nextc == 0; // 返回是否归零}
}
关键特性:
- 使用CAS循环确保原子性操作
- 只有当计数归零时才返回true,触发线程唤醒
- 计数为0时的countDown()调用不会产生效果
AQS共享模式运用
CountDownLatch采用AQS的共享模式:
tryAcquireShared()
:当state为0时返回1(成功),否则返回-1(失败)tryReleaseShared()
:递减state,当变为0时返回true通知AQS唤醒等待线程
线程同步机制
等待线程管理:
- 等待的线程被加入AQS的等待队列
- 使用
LockSupport.park()
阻塞线程 - 计数归零时,AQS自动唤醒所有等待线程
内存可见性保证:
- 通过AQS的volatile state字段确保内存可见性
countDown()
操作的happens-before关系确保线程安全
一次性特性
CountDownLatch的一次性设计体现在:
- 计数器不能重置,归零后永远保持为0
- 后续的
await()
调用会立即返回 - 这与
CyclicBarrier
的可重复使用形成对比
典型使用场景
- 启动信号:一个线程等待多个线程准备完毕
- 完成信号:主线程等待多个工作线程完成任务
- 分治任务:将大任务拆分,等待所有子任务完成
CyclicBarrier核心实现分析
CyclicBarrier采用ReentrantLock + Condition的传统同步方式,而非直接实现AQS框架:
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
核心组件:
Generation
类:代表栅栏的一个周期,用于标识栅栏状态count
字段:当前等待的线程数,从parties递减到0parties
字段:参与同步的线程总数(固定不变)
Generation周期管理
private static class Generation {boolean broken; // 栅栏是否被破坏
}
每次栅栏被触发或重置时,都会创建新的Generation实例,实现周期性复用。
private void nextGeneration() {trip.signalAll(); // 唤醒所有等待线程count = parties; // 重置计数器generation = new Generation(); // 创建新周期
}
核心等待逻辑 (dowait方法)
工作流程:
- 获取锁:使用
ReentrantLock
确保线程安全 - 计数递减:
int index = --count
- 判断是否最后一个线程:
- 如果
index == 0
:执行barrier action,触发nextGeneration()
- 否则:进入
trip.await()
等待状态
- 如果
int index = --count;
if (index == 0) { // 最后一个线程到达// 执行barrier actionnextGeneration(); // 开启新周期return 0;
}
// 其他线程等待
trip.await();
CountDownLatch vs CyclicBarrier 对比分析
特性 | CountDownLatch | CyclicBarrier |
---|---|---|
底层实现 | AQS共享模式 | ReentrantLock + Condition |
同步方式 | 一次性计数器 | 可重复使用的栅栏 |
线程角色 | 等待者vs触发者分离 | 所有线程平等参与 |
使用模式对比
CountDownLatch:主从模式
// 主线程等待,工作线程触发
CountDownLatch latch = new CountDownLatch(3);
// 工作线程: latch.countDown()
// 主线程: latch.await()
CyclicBarrier:协作模式
// 所有线程相互等待
CyclicBarrier barrier = new CyclicBarrier(3);
// 每个线程: barrier.await()
技术实现细节对比
计数逻辑差异
- CountDownLatch:计数从N递减到0,不可重置
- CyclicBarrier:计数从parties递减到0后自动重置
线程唤醒机制
- CountDownLatch:使用AQS的
releaseShared()
统一唤醒 - CyclicBarrier:最后到达的线程执行
signalAll()
唤醒其他线程
异常处理策略
- CountDownLatch:单纯的计数递减,异常不影响其他线程
- CyclicBarrier:采用all-or-none模式,一个线程异常会导致所有线程收到
BrokenBarrierException
Semaphore 结构与核心实现分析
总述:
- 核心机制:基于 AQS 的共享锁模式,通过
state
管理许可证。 - 策略差异:公平与非公平模式影响线程获取许可证的顺序。
- 灵活性:支持批量操作、许可证调整及清空,适用于多样化并发控制场景。
- 优势:轻量级资源控制,无需显式锁,且释放操作不限于持有线程。
整体结构
- 核心成员:
private final Sync sync
(基于 AQS 的同步器) - 同步器层级:
- 抽象类
Sync
:继承AbstractQueuedSynchronizer
,实现基础的许可证操作(获取/释放/调整)。 - 子类
NonfairSync
:非公平策略(默认),允许线程“插队”。 - 子类
FairSync
:公平策略,遵循 FIFO 顺序。
- 抽象类
核心方法
方法 | 作用 |
---|---|
acquire() | 阻塞获取 1 个许可(响应中断) |
acquireUninterruptibly() | 阻塞获取 1 个许可(不响应中断) |
tryAcquire() | 尝试立即获取 1 个许可(非公平策略) |
tryAcquire(timeout) | 限时获取 1 个许可(支持中断) |
release() | 释放 1 个许可 |
availablePermits() | 返回当前可用许可数 |
drainPermits() | 清空并返回所有可用许可 |
reducePermits(reduction) | 减少许可证数量(不可逆操作) |
底层实现机制
(1) 许可证存储
- AQS 状态
state
:存储当前可用许可证数量。Sync(int permits) {setState(permits); // 初始化许可证数量 }
(2) 许可证获取
-
非公平策略(
NonfairSync
):protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); // 直接尝试获取 }final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining; // 负数表示失败} }
- 特点:允许新线程“插队”获取许可证。
-
公平策略(
FairSync
):protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors()) // 检查是否有等待线程return -1; // 有则拒绝获取int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;} }
- 特点:严格遵循 FIFO 顺序,防止线程饥饿。
(3) 许可证释放
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) throw new Error("Overflow");if (compareAndSetState(current, next))return true; // 成功释放}
}
- 支持跨线程释放:无“持有者”概念,任何线程均可释放。
(4) 特殊操作
-
drainPermits()
:final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current; // 返回被清空的许可数} }
-
reducePermits()
:final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) throw new Error("Underflow");if (compareAndSetState(current, next))return;} }
典型应用场景
// 连接池资源控制(最大 100 个连接)
class ConnectionPool {private static final int MAX_CONNECTIONS = 100;private final Semaphore available = new Semaphore(MAX_CONNECTIONS, true);public Connection getConnection() throws InterruptedException {available.acquire(); // 阻塞直到获取许可return createConnection();}public void releaseConnection(Connection conn) {closeConnection(conn);available.release(); // 释放许可}
}
- 作用:通过许可证数量限制并发资源访问,避免资源耗尽。