聊聊 JUC 下的 CountDownLatch、CyclicBarrier、Phaser 和 Semaphore
聊聊 JUC 下的 CountDownLatch、CyclicBarrier、Phaser 和 Semaphore
(一)CountDownLatch
CountDownLatch 核心功能是让一个或多个线程等待其他线程完成操作。它的典型用法是通过一个计数器实现线程间的协调,计数器初始化后不可重置。
CountDownLatch 最经典的两种用法:
-
主线程等待多个子线程执行完任务后再继续执行
public class MainWaitForThreads {public static void main(String[] args) throws InterruptedException {int threadCount = 3;CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep((long) (Math.random() * 1000));latch.countDown(); // 任务完成 计数器减 1} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}latch.await(); // 主线程阻塞 直到计数器归零System.out.println("所有子线程任务完成 主线程继续执行");} }
-
多个子线程等待主线程发令后同时开始执行
public class ThreadsWaitForMain {public static void main(String[] args) throws InterruptedException {int threadCount = 3;CountDownLatch latch = new CountDownLatch(1); // 初始化计数器为 1for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {latch.await(); // 所有子线程在此等待System.out.println("子线程开始执行任务");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}Thread.sleep(1000);latch.countDown(); // 主线程发令 计数器归零 所有子线程同时开始System.out.println("主线程发令 所有子线程开始执行");} }
(二)CyclicBarrier
问题:如何实现多个线程之间互相等待?
-
使用 CountDownLatch 是否可以实现?答案是可以的。
public class ThreadsWaitForEachOther {public static void main(String[] args) {int threadCount = 3;CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep((long) (Math.random() * 1000));latch.countDown();System.out.println("开始等待其他线程");latch.await();System.out.println("继续执行后续操作");} catch (InterruptedException e) {e.printStackTrace();}}).start();}} }
-
使用 CountDownLatch 需要先后调用 countDown 和 await 方法,显得有点麻烦,能不能合并一下?
-
我们可以使用 CyclicBarrier 通过直接调用 await 方法实现多线程互相等待。
public class ThreadsWaitForEachOther {public static void main(String[] args) {int threadCount = 3;CyclicBarrier barrier = new CyclicBarrier(threadCount);for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep((long) (Math.random() * 1000));barrier.await(); // 当所有线程都调用 await 后,所有线程继续执行System.out.println("继续执行后续操作");} catch (Exception e) {e.printStackTrace();}}).start();}} }
-
CyclicBarrier 还支持 重复使用 以及 同步回调 的功能,这里简单演示:
public class ThreadsCyclicWorkTogether {public static void main(String[] args) {int threadCount = 3; // 线程数量int cycles = 2; // 重复使用次数CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {System.out.println("所有线程完成一次同步,执行回调操作..."); });for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {for (int cycle = 0; cycle < cycles; cycle++) { // 模拟多次重复使用System.out.println("开始执行任务...");Thread.sleep((long) (Math.random() * 1000));System.out.println("完成任务,等待其他线程...");barrier.await(); // 等待所有线程完成任务System.out.println("继续执行后续操作...");}} catch (Exception e) {e.printStackTrace();}}).start();}} }
-
留给读者的问题:下面的代码会打印多少个 done ?(与 m 和 n 有关)
// 创建一个 CyclicBarrier 指定参与的线程数量为 n 并设置回调CyclicBarrier barrier = new CyclicBarrier(n, () -> {System.out.println("done");});// 启动 m 个线程for (int i = 0; i < m; i++) {new Thread(() -> {try {barrier.await();} catch (Exception e) {e.printStackTrace();}}).start();}
(三)Phaser
Phaser 在 CyclicBarrier 的基础上对重复使用更进一步区分为一个大任务的多个阶段,同时支持动态调整参与的线程数。
-
阶段(Phase)的概念
Phaser 将任务划分为多个阶段,当所有参与的线程都到达同步点(调用 arriveAndAwaitAdvance 或 arriveAndDeregister)后,当前阶段完成,onAdvance 方法被调用。 -
动态调整参与线程数量
Phaser 支持动态注册和注销线程:- 注册线程:通过 phaser.register() 方法,可以在运行时动态增加参与的线程数量。
- 注销线程:通过 phaser.arriveAndDeregister() 方法,线程可以在完成任务后注销自己,不再参与后续阶段。
- 注销线程后,后续阶段的参与线程数量会减少。
-
运行下面的代码,观察输出结果(我们的可能不一样~~)
Phaser phaser = new Phaser(1) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("Phase " + phase + " completed.");System.out.println("Registered parties " + registeredParties);return super.onAdvance(phase, registeredParties);}};for (int i = 0; i < 5; i++) {phaser.register(); // 注册新参与者new Thread(() -> {doWork("Phase 0");phaser.arriveAndAwaitAdvance(); // 阶段 0 同步doWork("Phase 1");phaser.arriveAndAwaitAdvance(); // 阶段 1 同步if (new Random().nextInt() % 2 == 0) {phaser.arriveAndDeregister(); // 阶段 1 完成后 随机注销} else {doWork("Phase 2");phaser.arriveAndAwaitAdvance(); // 阶段 2 同步doWork("Phase 3");phaser.arriveAndDeregister(); // 阶段 3 完成后注销}}).start();}phaser.arriveAndDeregister(); // 主线程注销自己
我的运行结果:
Phase 0
Phase 0
Phase 0
Phase 0
Phase 0
Phase 0 completed.
Registered parties 5
Phase 1
Phase 1
Phase 1
Phase 1
Phase 1
Phase 1 completed.
Registered parties 5
Phase 2
Phase 2
Phase 2 completed.
Registered parties 2
Phase 3
Phase 3
Phase 3 completed.
Registered parties 0
(四)Semaphore
Semaphore 意思是信号量,代表共享资源可以同时被多少线程占有。
Semaphore 的两个核心方法分别是 acquire 和 release
- acquire 操作,每个线程操作共享资源之前,先从信号量对象中获取许可(非阻塞 使用 tryAcquire 尝试获取失败后就直接放弃);
- release 操作,线程执行完对共享资源的操作后,再释放对应的许可,一般会使用 try 块包围执行操作并在 finally 块中执行 release 确保异常抛出后自动释放许可。
和 ReentrantLock 一样,信号量 Semaphore 也支持公平模式和非公平模式。
下面是一个使用 Semaphore 实现的阻塞队列,用于实现生产-消费模式。
public class SemaphoreBlockingQueue<T> {private final Queue<T> queue; // 内部队列,用于存储元素private final int capacity; // 队列的最大容量private final Semaphore availableSpots; // 控制可用的空位数量private final Semaphore availableItems; // 控制可用的元素数量public SemaphoreBlockingQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException();}this.capacity = capacity;this.queue = new LinkedList<>();this.availableSpots = new Semaphore(capacity); // 初始时,所有空位都可用this.availableItems = new Semaphore(0); // 初始时,没有可用的元素}// 放入元素到队列中public void put(T item) throws InterruptedException {availableSpots.acquire(); // 等待一个空位try {synchronized (queue) {queue.add(item); // 添加元素到队列}} finally {availableItems.release(); // 增加一个可用元素}}// 从队列中取出元素public T take() throws InterruptedException {availableItems.acquire(); // 等待一个可用元素try {synchronized (queue) {T item = queue.poll(); // 从队列中取出元素return item;}} finally {availableSpots.release(); // 增加一个可用空位}}// 获取队列的当前大小public int size() {synchronized (queue) {return queue.size();}}
}