当前位置: 首页 > news >正文

并发编程——10 CyclicBarrier的源码分析

1 概述

  • CyclicBarrierCountDownLatchSemaphore不同:

    • CountDownLatchSemaphore直接基于 AQS(AbstractQueuedSynchronizer) 实现;

    • CyclicBarrier基于ReentrantLock + ConditionObject实现,间接依赖 AQS(因为 ReentrantLock 底层基于 AQS);

  • 内部结构

    • Generation 静态内部类。它是 CyclicBarrier 实现“循环复用”和“中断/异常处理”的核心:

      • 持有布尔属性 broken,默认 false

      • 当调用 reset() 方法、执行出现异常中断时调用 breakBarrier()broken 会被设为 true

    • 核心属性

      • 计数器:用于记录等待“凑齐一组线程”的进度;

      • generation 属性:关联当前的 Generation 实例,控制屏障的“代”切换;

    • breakBarrier() 方法。当任务执行中断异常或调用 reset() 时触发:

      • 将当前 Generationbroken 设为 true

      • ConditionObjectWaiter 队列中等待的线程,转移到 AQS 队列;

      • 执行 unlock 后,唤醒 AQS 队列中挂起的线程,让它们感知“屏障已破”;

    • await() 方法。这是 CyclicBarrier核心方法

      • 调用时会对“计数器”进行递减处理,直到计数器归 0,所有等待的线程才会一起继续执行;

      • 过程中会处理线程的等待、唤醒,以及 Generation 的切换逻辑;

    在这里插入图片描述

2 构造函数

  • CyclicBarrier 有两个构造函数,最终都调用 CyclicBarrier(int parties, Runnable barrierAction) 这个核心构造函数:

    public CyclicBarrier(int parties) {this(parties, null);
    }public CyclicBarrier(int parties, Runnable barrierAction) {// 参数合法性校验if (parties <= 0) throw new IllegalArgumentException();// final修饰,所有线程执行完成归为或重置时使用this.parties = parties;// 在await方法中计数值,表示还有多少线程待执行awaitthis.count = parties;// 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程this.barrierCommand = barrierAction;
    }
    
    • parties:表示需要“凑齐”的线程数量,只有当这么多线程都调用 await() 后,屏障才会放行。若 parties ≤ 0,会直接抛出 IllegalArgumentException 异常;

    • count:是一个计数器,初始值等于 parties。每次有线程调用 await()count 就减 1,直到 count = 0 时触发屏障放行逻辑;

    • parties(实例属性):用 final 修饰,是 CyclicBarrier 复用(重置)时的基准线程数;

    • barrierAction:是一个 Runnable 任务,当所有线程凑齐(count 减到 0)时,会优先执行这个任务,然后再唤醒所有等待的线程。若不需要回调任务,可传 null

  • 它的核心作用是初始化“线程凑齐的门槛”和“凑齐后的回调逻辑”

    • 先校验 parties 的合法性,保证必须有正整数个线程参与;

    • 初始化 countparties,用于后续 await() 方法中跟踪线程凑齐的进度;

    • 初始化 barrierCommand(即 barrierAction),指定所有线程凑齐后要执行的回调任务。

3 await()

  • await() 有两个重载版本,用于支持“无限等待”和“超时等待”两种场景:

    • await():线程会一直等待,直到指定数量的线程都调用 await() 才继续执行;若等待过程中屏障被中断、异常破坏,会抛出 InterruptedExceptionBrokenBarrierException

      // 执行没有超时时间的await
      public int await() throws InterruptedException, BrokenBarrierException {try {// 执行dowait()return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
      }
      
    • await(long timeout, TimeUnit unit):线程最多等待 timeout 时间,若指定时间内线程未凑齐,会抛出 TimeoutException;同时也会处理中断、屏障破坏的情况;

      // 执行有超时时间的await
      public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));
      }
      
  • await() 最终都调用 dowait() 方法,其流程可分为以下关键步骤:

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取generation对象(屏障的“代”)final Generation g = generation;// 若 Generation 的 broken 为 true(屏障已破坏),直接抛出 BrokenBarrierException// 屏障已破坏,即线程中在执行过程中是否异常、超时、中断、重置if (g.broken)throw new BrokenBarrierException();// 若当前线程被中断if (Thread.interrupted()) {breakBarrier(); // 调用breakBarrier()标记屏障为“破坏”状态,将等待线程转移到 AQS 队列throw new InterruptedException(); // 并抛出 InterruptedException}// 执行 --count,得到当前线程在“凑齐序列”中的索引 indexint index = --count;// 若所有线程已凑齐if (index == 0) {// 执行结果标识boolean ranAction = false;try {// 执行构造函数中传入的 barrierAction(若不为 null),barrierCommand 即 barrierActionfinal Runnable command = barrierCommand;if (command != null)command.run();// 执行完成,将执行结果设置为trueranAction = true;nextGeneration(); // 调用nextGeneration()重置计数器和Generation,实现屏障的“循环复用”return 0; // 返回 0,表示当前线程是最后一个凑齐的线程} finally {// 执行过程中出现问题if (!ranAction)// 重置标识与计数值,将Waiter队列中的线程转移到AQS队列breakBarrier();}}// 若线程未凑齐(index != 0),进入循环挂起逻辑(自旋):for (;;) {try {// 无超时场景(timed = false)if (!timed)trip.await(); // 调用 Condition.await(),将线程挂起在 Condition 队列// 有超时场景(timed = true)else if (nanos > 0L)nanos = trip.awaitNanos(nanos); // 调用Condition.awaitNanos(nanos),线程最多挂起nanos纳秒,返回剩余等待时间} catch (InterruptedException ie) { // 过程中若被中断、屏障破坏或超时,会触发对应异常if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}// 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常if (g.broken)throw new BrokenBarrierException();// g 是线程进入 dowait() 时获取的 “当前代”(Generation 实例)// generation 是 CyclicBarrier 的全局属性,表示 “最新代”// 若“当前代”与“最新代”不一致,说明在该线程等待期间,屏障已经通过nextGeneration()切换到了新的 “代”(可能是因为之前的线程已凑齐并重置了屏障)// 此时当前线程的等待已经“过期”,无需继续处理,直接返回 index 即可if (g != generation)return index;// 超时,抛出异常TimeoutExceptionif (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 无论等待是否成功,最终都会释放 ReentrantLock,保证锁资源的回收lock.unlock();}
    }
    

4 breakBarrier()

  • breakBarrier()CyclicBarrier 中用于主动“破坏屏障”的核心方法。当线程执行出现中断异常或调用 reset() 时,会触发该方法,让所有等待的线程退出等待状态;

    // 结束CyclicBarrier的执行
    private void breakBarrier() {// 将当前 Generation 的 broken 属性设为 true,标识该屏障已被破坏。后续线程检查到这个标记时,会抛出 BrokenBarrierExceptiongeneration.broken = true;// 把计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一次屏障复用做准备count = parties;// trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列// 当锁释放(unlock())后,这些线程会被唤醒,从而感知到 “屏障已破坏” 并退出等待trip.signalAll();
    }
    

5 reset()

  • reset()CyclicBarrier 用于主动重置屏障状态的方法,让 CyclicBarrier 可以“循环复用”,回到初始状态以支持新的一轮线程凑齐逻辑;

    // 重置CyclicBarrier
    public void reset() {// 获取 ReentrantLock 并加锁,确保 reset() 操作的原子性,避免多线程并发修改导致状态混乱final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 调用 breakBarrier() 方法,标记当前 Generation 为“破坏”状态,重置计数器,并唤醒所有等待的线程,让它们退出当前屏障的等待逻辑breakBarrier();// 生成新的 Generation 实例,重置计数器 count 为 parties(构造函数中指定的线程总数),让 CyclicBarrier 进入“新的一轮”,可以接收新的线程凑齐请求nextGeneration();} finally {// 无论重置过程是否出现异常,最终都会释放锁,保证锁资源的正确回收lock.unlock();}
    }
    

6 nextGeneration()

  • CyclicBarrier 实现**“循环复用”**的核心方法,用于在一组线程凑齐后,将屏障状态“归位”,以便支持下一轮线程的凑齐逻辑;
private void nextGeneration() {// trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列。当锁释放后,这些线程会被唤醒,继续执行后续逻辑trip.signalAll();// 将计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一轮凑齐逻辑准备count = parties;// 创建新的 Generation 实例,标记为 “新的一代” 屏障,与上一轮的屏障状态隔离generation = new Generation();
}

7 总结

  • CyclicBarrier 基于 ReentrantLock + ConditionObject 实现。构造函数必须指定 parties(初始待执行线程数),内部通过 generation 这个带有布尔属性的结构,标记当前屏障执行过程中是否出现超时、异常、中断等情况;

  • 构造函数会把 parties 赋值给计数值 count,每当有一个线程执行 await() 方法,count 就会减 1;

  • 线程凑齐后的执行流程

    • count 减到 0 时,代表所有线程都准备就绪。此时会先判断是否初始化了 barrierCommand(构造函数中传入的 Runnable 任务),如果有则优先执行该任务
    • barrierCommand 执行完成后,会把 Condition 队列(Waiter 队列)中的线程转移到 AQS 队列,执行 unlock 操作后唤醒这些线程;同时将计数值 countgeneration 归位,为下一轮屏障复用做准备。
http://www.dtcms.com/a/360510.html

相关文章:

  • Selenium 等待机制:编写稳定可靠的自动化脚本
  • spi总线
  • 7.2elementplus的表单布局与模式
  • MCP SDK 学习二
  • 艾体宝案例 | 数据驱动破局:DOMO 如何重塑宠物零售门店的生存法则
  • Python 2025:AI代理、Rust与异步编程的新时代
  • 张柏芝亮相林家谦演唱会 再次演绎《任何天气》
  • Spring MVC 九大组件源码深度剖析(五):HandlerAdapter - 处理器的执行引擎
  • 三、环境搭建之Docker安装mysql
  • 一、计算机系统知识
  • Springcloud-----Nacos
  • 【influxdb】InfluxDB 2.x 线性写入详解
  • 层次分析法
  • Redis实现短信登录
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘SQLModel’问题
  • 37. 解数独
  • 解锁Tensor Core性能:深入探索CUDA Warp矩阵操作
  • Dify构建AI应用
  • FART 主动调用组件深度解析:破解 ART 下函数抽取壳的终极武器
  • #Datawhale 组队学习#8月-工作流自动化n8n入门-3
  • 第七章 使用角色和Asible内容集合简化Playbook
  • 4.4 光照(4) - 高光反射
  • 硬件工程师成长之路:从入门到精通的技术旅程
  • [Plecs基础知识系列]建立自定义模块/子系统(Subsystem)
  • C++ 面试高频考点 力扣 69. x 的平方根 二分查找 题解 每日一题
  • Linux网络socket套接字(中)
  • 切片语法[::-1]及其可用的类型
  • 基于单片机智能鞋柜/智能鞋橱/智能鞋盒
  • Linux - #操作系统概念 #权限
  • 获取某天的零点日期