当前位置: 首页 > news >正文

聊聊 JUC 下的 CountDownLatch、CyclicBarrier、Phaser 和 Semaphore

聊聊 JUC 下的 CountDownLatch、CyclicBarrier、Phaser 和 Semaphore

(一)CountDownLatch

CountDownLatch 核心功能是让一个或多个线程等待其他线程完成操作。它的典型用法是通过一个计数器实现线程间的协调,计数器初始化后不可重置。

CountDownLatch 最经典的两种用法:

  1. 主线程等待多个子线程执行完任务后再继续执行

    public class MainWaitForThreads {public static void main(String[] args) throws InterruptedException {int threadCount = 3;CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep((long) (Math.random() * 1000));latch.countDown(); // 任务完成 计数器减 1} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}latch.await(); // 主线程阻塞 直到计数器归零System.out.println("所有子线程任务完成 主线程继续执行");}
    }
    
  2. 多个子线程等待主线程发令后同时开始执行

    public class ThreadsWaitForMain {public static void main(String[] args) throws InterruptedException {int threadCount = 3;CountDownLatch latch = new CountDownLatch(1); // 初始化计数器为 1for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {latch.await(); // 所有子线程在此等待System.out.println("子线程开始执行任务");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}Thread.sleep(1000);latch.countDown(); // 主线程发令 计数器归零 所有子线程同时开始System.out.println("主线程发令 所有子线程开始执行");}
    }
    

(二)CyclicBarrier

问题:如何实现多个线程之间互相等待?

  • 使用 CountDownLatch 是否可以实现?答案是可以的。

    public class ThreadsWaitForEachOther {public static void main(String[] args) {int threadCount = 3;CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep((long) (Math.random() * 1000));latch.countDown();System.out.println("开始等待其他线程");latch.await();System.out.println("继续执行后续操作");} catch (InterruptedException e) {e.printStackTrace();}}).start();}}
    }
    
  • 使用 CountDownLatch 需要先后调用 countDown 和 await 方法,显得有点麻烦,能不能合并一下?

  • 我们可以使用 CyclicBarrier 通过直接调用 await 方法实现多线程互相等待。

    public class ThreadsWaitForEachOther {public static void main(String[] args) {int threadCount = 3;CyclicBarrier barrier = new CyclicBarrier(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep((long) (Math.random() * 1000));barrier.await(); // 当所有线程都调用 await 后,所有线程继续执行System.out.println("继续执行后续操作");} catch (Exception e) {e.printStackTrace();}}).start();}}
    }
    
  • CyclicBarrier 还支持 重复使用 以及 同步回调 的功能,这里简单演示:

    public class ThreadsCyclicWorkTogether {public static void main(String[] args) {int threadCount = 3; // 线程数量int cycles = 2; // 重复使用次数CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {System.out.println("所有线程完成一次同步,执行回调操作..."); });for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {for (int cycle = 0; cycle < cycles; cycle++) { // 模拟多次重复使用System.out.println("开始执行任务...");Thread.sleep((long) (Math.random() * 1000));System.out.println("完成任务,等待其他线程...");barrier.await(); // 等待所有线程完成任务System.out.println("继续执行后续操作...");}} catch (Exception e) {e.printStackTrace();}}).start();}}
    }
    
  • 留给读者的问题:下面的代码会打印多少个 done ?(与 m 和 n 有关)

            // 创建一个 CyclicBarrier 指定参与的线程数量为 n 并设置回调CyclicBarrier barrier = new CyclicBarrier(n, () -> {System.out.println("done");});// 启动 m 个线程for (int i = 0; i < m; i++) {new Thread(() -> {try {barrier.await();} catch (Exception e) {e.printStackTrace();}}).start();}
    

(三)Phaser

Phaser 在 CyclicBarrier 的基础上对重复使用更进一步区分为一个大任务的多个阶段,同时支持动态调整参与的线程数。

  • 阶段(Phase)的概念
    Phaser 将任务划分为多个阶段,当所有参与的线程都到达同步点(调用 arriveAndAwaitAdvance 或 arriveAndDeregister)后,当前阶段完成,onAdvance 方法被调用。

  • 动态调整参与线程数量
    Phaser 支持动态注册和注销线程:

    • 注册线程:通过 phaser.register() 方法,可以在运行时动态增加参与的线程数量。
    • 注销线程:通过 phaser.arriveAndDeregister() 方法,线程可以在完成任务后注销自己,不再参与后续阶段。
    • 注销线程后,后续阶段的参与线程数量会减少。
  • 运行下面的代码,观察输出结果(我们的可能不一样~~)

            Phaser phaser = new Phaser(1) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("Phase " + phase + " completed.");System.out.println("Registered parties " + registeredParties);return super.onAdvance(phase, registeredParties);}};for (int i = 0; i < 5; i++) {phaser.register(); // 注册新参与者new Thread(() -> {doWork("Phase 0");phaser.arriveAndAwaitAdvance(); // 阶段 0 同步doWork("Phase 1");phaser.arriveAndAwaitAdvance(); // 阶段 1 同步if (new Random().nextInt() % 2 == 0) {phaser.arriveAndDeregister(); // 阶段 1 完成后 随机注销} else {doWork("Phase 2");phaser.arriveAndAwaitAdvance(); // 阶段 2 同步doWork("Phase 3");phaser.arriveAndDeregister(); // 阶段 3 完成后注销}}).start();}phaser.arriveAndDeregister(); // 主线程注销自己
    

我的运行结果:

Phase 0
Phase 0
Phase 0
Phase 0
Phase 0
Phase 0 completed.
Registered parties 5
Phase 1
Phase 1
Phase 1
Phase 1
Phase 1
Phase 1 completed.
Registered parties 5
Phase 2
Phase 2
Phase 2 completed.
Registered parties 2
Phase 3
Phase 3
Phase 3 completed.
Registered parties 0

(四)Semaphore

Semaphore 意思是信号量,代表共享资源可以同时被多少线程占有。

Semaphore 的两个核心方法分别是 acquire 和 release

  • acquire 操作,每个线程操作共享资源之前,先从信号量对象中获取许可(非阻塞 使用 tryAcquire 尝试获取失败后就直接放弃);
  • release 操作,线程执行完对共享资源的操作后,再释放对应的许可,一般会使用 try 块包围执行操作并在 finally 块中执行 release 确保异常抛出后自动释放许可。

和 ReentrantLock 一样,信号量 Semaphore 也支持公平模式和非公平模式。

下面是一个使用 Semaphore 实现的阻塞队列,用于实现生产-消费模式。

public class SemaphoreBlockingQueue<T> {private final Queue<T> queue; // 内部队列,用于存储元素private final int capacity;  // 队列的最大容量private final Semaphore availableSpots; // 控制可用的空位数量private final Semaphore availableItems; // 控制可用的元素数量public SemaphoreBlockingQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException();}this.capacity = capacity;this.queue = new LinkedList<>();this.availableSpots = new Semaphore(capacity); // 初始时,所有空位都可用this.availableItems = new Semaphore(0);        // 初始时,没有可用的元素}// 放入元素到队列中public void put(T item) throws InterruptedException {availableSpots.acquire(); // 等待一个空位try {synchronized (queue) {queue.add(item); // 添加元素到队列}} finally {availableItems.release(); // 增加一个可用元素}}// 从队列中取出元素public T take() throws InterruptedException {availableItems.acquire(); // 等待一个可用元素try {synchronized (queue) {T item = queue.poll(); // 从队列中取出元素return item;}} finally {availableSpots.release(); // 增加一个可用空位}}// 获取队列的当前大小public int size() {synchronized (queue) {return queue.size();}}
}

相关文章:

  • 一次引入第三方库导致的权限崩溃
  • 【YOLOv8改进- Backbone主干】CVPR2025 MambaOut :为图像分类任务设计的轻量级模型,曼巴永存!
  • std::map gdb调试ok ,直接运行会crash
  • 如何用 esProc 补充数据库 SQL 的缺失能力
  • 湖南(源点咨询)市场调研 商业综合体定位调研分享(中篇)
  • mapstruct使用详解
  • 12.第二阶段x64游戏实战-远程调试
  • 美团一面总结
  • list的一些常用接口
  • 流量统计实例
  • 域AD渗透手法【密码喷洒技术】
  • 【JavaScript】二十三、M端事件 + 轮播图Swiper插件
  • USB(TYPE-C)转串口(TTL)模块设计讲解
  • C++之 动态数组
  • 河南普瑞维升企业案例:日事清SOP流程与目标模块实现客户自主简报功能落地
  • 智能语音处理+1.5使用PocketSphinxshinx实现语音转文本(100%教会)
  • Pinpoint - 大型分布式系统的 APM(应用性能管理)工具
  • 强化学习的数学原理(五) MonteCarlo learning
  • MoogDB数据库日常维护技巧与常见问题解析
  • 未能安装包“Microsoft.VisualStudio.XXXXX
  • wordpress 消息机制/seo优化范畴
  • 网站推广工具工作室/网络广告文案案例
  • 网站抄袭别人的做可以吗/公众号推广合作平台
  • 网站编辑工具/关键字优化用什么系统
  • 网站建设属于哪个经营范围/网络推广图片大全
  • wordpress添加背景图/搜狗seo优化