并发编程——10 CyclicBarrier的源码分析
1 概述
-
CyclicBarrier
与CountDownLatch
、Semaphore
不同:-
CountDownLatch
、Semaphore
直接基于 AQS(AbstractQueuedSynchronizer) 实现; -
CyclicBarrier
是基于ReentrantLock
+ConditionObject
实现,间接依赖 AQS(因为ReentrantLock
底层基于 AQS);
-
-
内部结构
-
Generation
静态内部类。它是CyclicBarrier
实现“循环复用”和“中断/异常处理”的核心:-
持有布尔属性
broken
,默认false
; -
当调用
reset()
方法、执行出现异常或中断时调用breakBarrier()
,broken
会被设为true
;
-
-
核心属性
-
计数器:用于记录等待“凑齐一组线程”的进度;
-
generation
属性:关联当前的Generation
实例,控制屏障的“代”切换;
-
-
breakBarrier()
方法。当任务执行中断、异常或调用reset()
时触发:-
将当前
Generation
的broken
设为true
; -
把
ConditionObject
的Waiter
队列中等待的线程,转移到 AQS 队列; -
执行
unlock
后,唤醒 AQS 队列中挂起的线程,让它们感知“屏障已破”;
-
-
await()
方法。这是CyclicBarrier
的核心方法:-
调用时会对“计数器”进行递减处理,直到计数器归 0,所有等待的线程才会一起继续执行;
-
过程中会处理线程的等待、唤醒,以及
Generation
的切换逻辑;
-
-
2 构造函数
-
CyclicBarrier
有两个构造函数,最终都调用CyclicBarrier(int parties, Runnable barrierAction)
这个核心构造函数:public CyclicBarrier(int parties) {this(parties, null); }public CyclicBarrier(int parties, Runnable barrierAction) {// 参数合法性校验if (parties <= 0) throw new IllegalArgumentException();// final修饰,所有线程执行完成归为或重置时使用this.parties = parties;// 在await方法中计数值,表示还有多少线程待执行awaitthis.count = parties;// 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程this.barrierCommand = barrierAction; }
-
parties
:表示需要“凑齐”的线程数量,只有当这么多线程都调用await()
后,屏障才会放行。若parties ≤ 0
,会直接抛出IllegalArgumentException
异常; -
count
:是一个计数器,初始值等于parties
。每次有线程调用await()
,count
就减 1,直到count = 0
时触发屏障放行逻辑; -
parties
(实例属性):用final
修饰,是CyclicBarrier
复用(重置)时的基准线程数; -
barrierAction
:是一个Runnable
任务,当所有线程凑齐(count
减到 0)时,会优先执行这个任务,然后再唤醒所有等待的线程。若不需要回调任务,可传null
;
-
-
它的核心作用是初始化“线程凑齐的门槛”和“凑齐后的回调逻辑”:
-
先校验
parties
的合法性,保证必须有正整数个线程参与; -
初始化
count
为parties
,用于后续await()
方法中跟踪线程凑齐的进度; -
初始化
barrierCommand
(即barrierAction
),指定所有线程凑齐后要执行的回调任务。
-
3 await()
-
await()
有两个重载版本,用于支持“无限等待”和“超时等待”两种场景:-
await()
:线程会一直等待,直到指定数量的线程都调用await()
才继续执行;若等待过程中屏障被中断、异常破坏,会抛出InterruptedException
或BrokenBarrierException
;// 执行没有超时时间的await public int await() throws InterruptedException, BrokenBarrierException {try {// 执行dowait()return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);} }
-
await(long timeout, TimeUnit unit)
:线程最多等待timeout
时间,若指定时间内线程未凑齐,会抛出TimeoutException
;同时也会处理中断、屏障破坏的情况;// 执行有超时时间的await public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout)); }
-
-
await()
最终都调用dowait()
方法,其流程可分为以下关键步骤:private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取generation对象(屏障的“代”)final Generation g = generation;// 若 Generation 的 broken 为 true(屏障已破坏),直接抛出 BrokenBarrierException// 屏障已破坏,即线程中在执行过程中是否异常、超时、中断、重置if (g.broken)throw new BrokenBarrierException();// 若当前线程被中断if (Thread.interrupted()) {breakBarrier(); // 调用breakBarrier()标记屏障为“破坏”状态,将等待线程转移到 AQS 队列throw new InterruptedException(); // 并抛出 InterruptedException}// 执行 --count,得到当前线程在“凑齐序列”中的索引 indexint index = --count;// 若所有线程已凑齐if (index == 0) {// 执行结果标识boolean ranAction = false;try {// 执行构造函数中传入的 barrierAction(若不为 null),barrierCommand 即 barrierActionfinal Runnable command = barrierCommand;if (command != null)command.run();// 执行完成,将执行结果设置为trueranAction = true;nextGeneration(); // 调用nextGeneration()重置计数器和Generation,实现屏障的“循环复用”return 0; // 返回 0,表示当前线程是最后一个凑齐的线程} finally {// 执行过程中出现问题if (!ranAction)// 重置标识与计数值,将Waiter队列中的线程转移到AQS队列breakBarrier();}}// 若线程未凑齐(index != 0),进入循环挂起逻辑(自旋):for (;;) {try {// 无超时场景(timed = false)if (!timed)trip.await(); // 调用 Condition.await(),将线程挂起在 Condition 队列// 有超时场景(timed = true)else if (nanos > 0L)nanos = trip.awaitNanos(nanos); // 调用Condition.awaitNanos(nanos),线程最多挂起nanos纳秒,返回剩余等待时间} catch (InterruptedException ie) { // 过程中若被中断、屏障破坏或超时,会触发对应异常if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}// 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常if (g.broken)throw new BrokenBarrierException();// g 是线程进入 dowait() 时获取的 “当前代”(Generation 实例)// generation 是 CyclicBarrier 的全局属性,表示 “最新代”// 若“当前代”与“最新代”不一致,说明在该线程等待期间,屏障已经通过nextGeneration()切换到了新的 “代”(可能是因为之前的线程已凑齐并重置了屏障)// 此时当前线程的等待已经“过期”,无需继续处理,直接返回 index 即可if (g != generation)return index;// 超时,抛出异常TimeoutExceptionif (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 无论等待是否成功,最终都会释放 ReentrantLock,保证锁资源的回收lock.unlock();} }
4 breakBarrier()
-
breakBarrier()
是CyclicBarrier
中用于主动“破坏屏障”的核心方法。当线程执行出现中断、异常或调用reset()
时,会触发该方法,让所有等待的线程退出等待状态;// 结束CyclicBarrier的执行 private void breakBarrier() {// 将当前 Generation 的 broken 属性设为 true,标识该屏障已被破坏。后续线程检查到这个标记时,会抛出 BrokenBarrierExceptiongeneration.broken = true;// 把计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一次屏障复用做准备count = parties;// trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列// 当锁释放(unlock())后,这些线程会被唤醒,从而感知到 “屏障已破坏” 并退出等待trip.signalAll(); }
5 reset()
-
reset()
是CyclicBarrier
用于主动重置屏障状态的方法,让CyclicBarrier
可以“循环复用”,回到初始状态以支持新的一轮线程凑齐逻辑;// 重置CyclicBarrier public void reset() {// 获取 ReentrantLock 并加锁,确保 reset() 操作的原子性,避免多线程并发修改导致状态混乱final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 调用 breakBarrier() 方法,标记当前 Generation 为“破坏”状态,重置计数器,并唤醒所有等待的线程,让它们退出当前屏障的等待逻辑breakBarrier();// 生成新的 Generation 实例,重置计数器 count 为 parties(构造函数中指定的线程总数),让 CyclicBarrier 进入“新的一轮”,可以接收新的线程凑齐请求nextGeneration();} finally {// 无论重置过程是否出现异常,最终都会释放锁,保证锁资源的正确回收lock.unlock();} }
6 nextGeneration()
- 是
CyclicBarrier
实现**“循环复用”**的核心方法,用于在一组线程凑齐后,将屏障状态“归位”,以便支持下一轮线程的凑齐逻辑;
private void nextGeneration() {// trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列。当锁释放后,这些线程会被唤醒,继续执行后续逻辑trip.signalAll();// 将计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一轮凑齐逻辑准备count = parties;// 创建新的 Generation 实例,标记为 “新的一代” 屏障,与上一轮的屏障状态隔离generation = new Generation();
}
7 总结
-
CyclicBarrier
基于ReentrantLock + ConditionObject
实现。构造函数必须指定parties
(初始待执行线程数),内部通过generation
这个带有布尔属性的结构,标记当前屏障执行过程中是否出现超时、异常、中断等情况; -
构造函数会把
parties
赋值给计数值count
,每当有一个线程执行await()
方法,count
就会减 1; -
线程凑齐后的执行流程:
- 当
count
减到 0 时,代表所有线程都准备就绪。此时会先判断是否初始化了barrierCommand
(构造函数中传入的Runnable
任务),如果有则优先执行该任务; - 待
barrierCommand
执行完成后,会把Condition
队列(Waiter 队列)中的线程转移到 AQS 队列,执行unlock
操作后唤醒这些线程;同时将计数值count
和generation
归位,为下一轮屏障复用做准备。
- 当