并发编程——06 JUC并发同步工具类的应用实战
0 常用并发同步工具类的真实应用场景
-
JDK 提供了比
synchronized
更加高级的各种同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以实现更加丰富的多线程操作;
1 ReentrantLock(可重入的占用锁)
1.1 简介
-
ReentrantLock
是可重入的独占锁;- “可重入”是指同一线程能多次获取同一把锁,不会自己阻塞自己;
- “独占”是说同一时间,最多只有一个线程能成功拿到锁,其他线程得等待;
- 和
synchronized
作用类似,都是解决多线程并发访问共享资源时的线程安全问题;
-
相比
synchronized
,ReentrantLock
多了这些灵活特性:-
可中断:获取锁的过程中,线程能响应中断(比如其他地方调用了
interrupt()
),不用死等锁释放,更灵活控制执行流程; -
可设置超时时间:调用
tryLock(long timeout, TimeUnit unit)
时,线程在指定时间内没拿到锁,就会放弃尝试,避免无限阻塞; -
可设置为公平锁:默认
ReentrantLock
是 “非公平锁”(新线程和等待队列里的线程抢锁,可能插队),但它支持通过构造方法ReentrantLock(true)
设为“公平锁”,严格按线程等待顺序分配锁,减少线程饥饿(某些线程一直拿不到锁);
-
-
与
synchronized
一样,都支持可重入:synchronized
靠wait/notify
实现线程通信,只能关联一个等待队列;ReentrantLock
可通过newCondition()
创建多个Condition
,精准控制不同线程的等待 / 唤醒,比如生产者 - 消费者模型里,能区分 “生产条件”“消费条件” 分别处理;
-
应用场景:多线程抢共享资源时,需要独占访问保证数据安全,比如卖票系统(如下两图)、银行账户转账;
- 线程 A、B 抢锁:线程 A、B 同时尝试获取锁,假设线程 A 先拿到(锁的独占性,同一时间只有 A 能持有),此时 A 可以操作共享资源(比如修改车票库存 ),B 因为没抢到,进入 “等待” 状态;
- 线程 A 释放锁:A 操作完共享资源后,会释放锁;接着 B 再次尝试获取锁,这次就能成功拿到,然后 B 开始操作共享资源(修改车票库存)。
1.2 常用API
-
ReentrantLock 实现了Lock接口规范,常见API如下:
方法 方法声明 功能说明 lock
void lock()
获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回 lockInterruptibly
void lockInterruptibly() throws InterruptedException
可中断的获取锁,和 lock()
方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程tryLock
boolean tryLock()
尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到锁返回 true
,否则返回false
tryLock
(带超时)boolean tryLock(long time, TimeUnit unit) throws InterruptedException
超时获取锁,当前线程在以下三种情况下会返回:当前线程在超时时间内获取了锁;当前线程在超时时间内被中断;超时时间结束,返回 false
unlock
void unlock()
释放锁 newCondition
Condition newCondition()
获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的 await()
方法,而调用后,当前线程将释放锁 -
基本用法:
private final Lock lock = new ReentrantLock();public void foo() {// 获取锁lock.lock();try {// 程序执行逻辑} finally {// finally语句块可以确保lock被正确释放lock.unlock();} }// 尝试获取锁,最多等待 100 毫秒 if (lock.tryLock(100, TimeUnit.MILLISECONDS)) { try { // 成功获取到锁,执行需要同步的代码块 // ... 执行一些操作 ... } finally { // 释放锁 lock.unlock(); } } else { // 超时后仍未获取到锁,执行备选逻辑 // ... 执行一些不需要同步的操作 ... }
-
在使用时要注意以下 4 个问题:
- 默认情况下 ReentrantLock 为非公平锁而非公平锁;
- 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
- 加锁操作一定要放在
try
代码之前,这样可以避免未加锁成功又释放锁的异常; - 释放锁一定要放在
finally
中,否则会导致线程阻塞;
-
工作原理:
-
当有线程调用
lock
方法时,会用 CAS(Compare-And-Swap,比较并交换) 操作,尝试把 AQS(AbstractQueuedSynchronizer,抽象队列同步器,Java 并发包的核心基础组件 )内部的state
变量从0
改成1
;state=0
表示“锁没人用”,CAS 成功 → 线程拿到锁,开始执行临界区代码(操作共享资源);state=1
表示“锁被占用”,CAS 失败 → 线程抢锁失败,进入阻塞队列(CLH 队列,按 FIFO 排队 ) 等待;
-
抢锁失败的线程,会被包装成节点(Node),加入队列尾部(tail),队列头部是
head
节点(代表 “即将拿到锁的线程”);- 队列里的线程,都在等锁释放,避免线程忙等(一直重试抢锁,浪费 CPU 资源);
- 队列是 FIFO(先进先出) 顺序,理论上保证线程公平性,但实际还受“公平锁 / 非公平锁”策略影响;
-
当持有锁的线程执行完
unlock()
,会把state
改回0
(释放锁),然后唤醒队列里的线程。这时分两种策略:-
公平锁(
ReentrantLock(true)
):严格按队列顺序唤醒:释放锁后,优先唤醒head
节点的下一个节点(head.next
),让队列里“等最久”的线程拿到锁;- 优点:绝对公平,避免线程 饥饿(某些线程一直抢不到锁);
- 缺点:频繁唤醒 / 切换线程,性能略低(线程上下文切换有开销);
- 优点:绝对公平,避免线程 饥饿(某些线程一直抢不到锁);
-
非公平锁(默认策略,
ReentrantLock()
):释放锁后,不严格按队列顺序,允许新线程和队列里被唤醒的线程重新用 CAS 抢锁:-
新线程(没进队列的)可能直接 CAS 抢锁成功(插队),不用进队列等;
-
队列里的线程也会被唤醒,参与竞争;
-
优点:减少线程切换,吞吐量更高(适合竞争不激烈的场景);
-
缺点:可能让队列里的线程等更久,存在小概率线程饥饿;
-
-
-
1.3 使用
1.3.1 独占锁
-
模拟抢票场景。8张票,10个人抢,如果不加锁,会出现什么问题?
/*** 模拟抢票场景*/ public class ReentrantLockDemo {// 创建 ReentrantLock 实例,默认使用非公平锁策略private final ReentrantLock lock = new ReentrantLock();//默认非公平// 共享资源:总票数,会有多个线程同时操作这个变量private static int tickets = 8;/*** 购买车票的方法* 核心逻辑:通过加锁保证同一时间只有一个线程能执行购票操作*/public void buyTicket() {// 1. 获取锁:调用 lock() 方法,当前线程会尝试获取锁// 如果锁未被占用,则当前线程获得锁并继续执行// 如果锁已被占用,则当前线程会进入阻塞队列等待lock.lock(); // 获取锁// 2. try-finally 结构保证锁一定会被释放// 即使代码执行过程中发生异常,finally 块也会执行解锁操作try {// 3. 临界区:操作共享资源(tickets 变量)if (tickets > 0) { // 检查是否还有剩余车票try {// 休眠 10ms,放大并发问题的可能性// 如果不加锁,这里会出现多个线程同时进入判断并扣减票数的情况Thread.sleep(10); // 模拟出并发效果} catch (InterruptedException e) {e.printStackTrace();}// 打印购票信息,并将票数减 1(原子操作)System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");} else {// 票已售罄时的提示System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");}} finally {// 4. 释放锁:无论是否发生异常,都必须释放锁// 否则会导致其他线程永远无法获取锁,造成死锁lock.unlock(); // 释放锁}}public static void main(String[] args) {// 创建抢票系统实例(共享同一个锁和票数变量)ReentrantLockDemo ticketSystem = new ReentrantLockDemo();// 创建 10 个线程模拟 10 个用户抢票(总票数只有 8 张)for (int i = 1; i <= 10; i++) {Thread thread = new Thread(() -> {// 每个线程执行抢票操作ticketSystem.buyTicket(); // 抢票}, "线程" + i); // 给线程命名,方便观察输出// 启动线程,线程进入就绪状态,等待 CPU 调度thread.start();}try {// 主线程休眠 3000ms,等待所有抢票线程执行完毕// 避免主线程提前打印剩余票数Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 打印最终剩余票数,验证是否正确(应该为 0)System.out.println("剩余票数:" + tickets);} }
-
不加锁:出现超卖问题
-
加锁:正常,两人抢票失败
1.3.2 公平锁和非公平锁
-
ReentrantLock 支持公平锁和非公平锁两种模式:
- 公平锁:线程在获取锁时,按照线程等待的先后顺序获取锁;
- 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock 默认是非公平锁;
ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁 ReentrantLock lock = new ReentrantLock(true); //公平锁
-
比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁,如下图:
1.3.3 可重入锁
-
可重入锁又名递归锁,是指在同一线程里,只要锁对象相同,内层方法(或代码块)能直接复用已获取的锁,不用重新竞争。比如线程执行
方法A
时拿到锁,方法A
里调用方法B
(也需要同一把锁),线程进方法B
时不用等自己释放锁,直接继续用; -
Java 中
ReentrantLock
和synchronized
都是可重入锁synchronized
:隐式的(JVM 自动管加锁 / 释放)、可重入的内置锁,只要是同一线程、同一对象锁,内层同步代码直接重入;ReentrantLock
:显式的(手动lock()
加锁、unlock()
释放)可重入锁,功能更灵活(支持公平 / 非公平、可中断、超时获取等),但得手动配对加锁释放,否则容易死锁;
-
可重入锁的一个优点是可一定程度避免死锁:
- 要是锁不可重入,同一线程内层方法需要锁时,会因为自己占着锁没放,导致自己等自己(阻塞),最后死锁。可重入锁允许同一线程重复拿锁,从设计上就避免了这种自己堵死自己的情况;
- 注意:只是一定程度避免,要是代码逻辑乱(比如忘记释放锁、不同锁交叉嵌套不当),还是可能死锁,只是解决了同一线程重入锁这类场景的死锁风险;
-
应用场景:
- 递归操作:递归函数里加锁,每次递归调用都是内层“方法”,可重入锁让线程不用反复竞争锁,比如计算阶乘时用
ReentrantLock
保护共享变量,递归调用时直接重入; - 调用同一类其他方法:类里多个
synchronized
方法,线程调完一个调另一个,因为是同一对象锁,直接重入,不用额外处理; - 锁嵌套:多层代码块都需要同一把锁,外层加锁后,内层嵌套的加锁逻辑直接复用,不用释放外层锁再重新加;
- 递归操作:递归函数里加锁,每次递归调用都是内层“方法”,可重入锁让线程不用反复竞争锁,比如计算阶乘时用
-
例:
class Counter {// 创建 ReentrantLock 对象,作为可重入锁的实例// ReentrantLock 是显式锁,支持可重入、可中断、公平/非公平等特性private final ReentrantLock lock = new ReentrantLock(); // 递归调用方法,演示可重入锁的核心场景public void recursiveCall(int num) {// 1. 获取锁:同一线程再次调用时,可直接重入,不会阻塞自己// 可重入的关键体现:锁对象识别当前持有线程,允许重复获取lock.lock(); try {// 递归终止条件:num 减到 0 时停止if (num == 0) {return;}// 打印当前递归层级,证明方法执行(锁已成功获取)System.out.println("执行递归,num = " + num);// 2. 递归调用自身:再次进入方法时,会再次执行 lock.lock()// 由于是【同一线程】操作【同一把锁】,可直接重入,不会阻塞recursiveCall(num - 1); } finally {// 3. 释放锁:递归调用多少次,就要释放多少次// 保证锁的获取与释放次数严格匹配,避免死锁lock.unlock(); }}// 主方法:测试可重入锁的递归场景public static void main(String[] args) throws InterruptedException {// 创建 Counter 实例,所有递归调用共享同一把锁Counter counter = new Counter(); // 启动递归测试:从 num=10 开始调用// 预期行为:线程安全执行递归,不会因锁重入导致阻塞counter.recursiveCall(10); } }
1.3.4 Condition 详解
-
Condition
是 Java 并发包里的线程协调工具,依赖Lock
(如ReentrantLock
)使用,比Object
的wait/notify
更灵活,解决线程间按条件等待 / 唤醒的问题。可以把它理解成:给Lock
搭配“专属等待队列”,让线程能按需等待条件、精准唤醒,而不是像wait/notify
只能用 Object 的单一队列; -
核心优势(对比
Object.wait/notify
)-
多条件分离:
Object
里,一个对象只有 1 个等待队列(所有wait()
的线程都挤在一起);Condition
让一个Lock
可以有多个等待队列(比如锁lock
可以创建condition1
、condition2
,不同条件的线程进不同队列),唤醒时能精准选队列,避免唤醒无关线程;
-
更灵活的等待控制:
- 支持超时等待(
await(long time, TimeUnit unit)
),避免线程无限阻塞; - 唤醒时可选单个唤醒(
signal()
)或全部唤醒(signalAll()
),比notify()
(随机唤醒一个)、notifyAll()
(唤醒全部)更精准;
- 支持超时等待(
-
-
核心方法解析
返回值类型 方法 作用说明 void
await()
让当前线程进入等待,直到被 signal()
/signalAll()
唤醒、被中断,或意外唤醒(如假唤醒) 等待前释放当前持有的Lock
,唤醒后重新竞争获取锁,再继续执行boolean
await(long time, TimeUnit unit)
限时等待:等待 time
时间后,若没被唤醒就自动返回false
;被唤醒则返回true
同样会先释放锁,超时 / 唤醒后重新抢锁。void
signal()
唤醒 此 Condition
等待队列中一个线程(选一个唤醒,类似notify()
但更可控) 唤醒后,线程不会直接执行,要重新竞争锁void
signalAll()
唤醒 此 Condition
等待队列中所有线程(类似notifyAll()
) 线程被唤醒后,重新竞争锁,抢到锁的继续执行 -
比如生产者 - 消费者模型中,想区分“队列满了让生产者等”和“队列空了让消费者等”:
-
用
ReentrantLock
加锁,然后创建两个Condition
:notFull
(生产者等)、notEmpty
(消费者等); -
生产者发现队列满了 → 调用
notFull.await()
等待;消费者取走数据后 → 调用notFull.signal()
唤醒生产者; -
消费者发现队列空了 → 调用
notEmpty.await()
等待;生产者放入数据后 → 调用notEmpty.signal()
唤醒消费者; -
这样就能 精准控制不同条件的线程等待 / 唤醒,比
wait/notify
更清晰。
-
1.3.5 结合 Condition 实现生产者消费者模式
案例:基于ReentrantLock
和Condition
实现一个简单队列
public class ReentrantLockDemo3 {public static void main(String[] args) {// 1. 创建容量为 5 的队列,作为生产者和消费者共享的资源Queue queue = new Queue(5);// 2. 启动生产者线程:传入队列,线程执行 Producer 的 run 方法new Thread(new Producer(queue)).start();// 3. 启动消费者线程:传入队列,线程执行 Customer 的 run 方法new Thread(new Customer(queue)).start();}
}/*** 队列封装类:* 用 ReentrantLock + Condition 实现线程安全的生产者-消费者队列* 核心逻辑:* - 队列满时,生产者通过 notFull.await() 等待;* - 队列空时,消费者通过 notEmpty.await() 等待;* - 生产/消费后,用 signal() 唤醒对应等待线程*/
class Queue {private Object[] items; // 存储队列元素的数组int size = 0; // 当前队列中元素数量int takeIndex = 0; // 消费者取元素的索引int putIndex = 0; // 生产者放元素的索引private ReentrantLock lock; // 控制并发的锁public Condition notEmpty; // 消费者等待条件:队列空时阻塞,生产后唤醒public Condition notFull; // 生产者等待条件:队列满时阻塞,消费后唤醒// 初始化队列,指定容量public Queue(int capacity) {this.items = new Object[capacity];lock = new ReentrantLock();// 为同一把锁创建两个 Condition,分别控制“空”和“满”的等待notEmpty = lock.newCondition();notFull = lock.newCondition();}/*** 生产者放入元素的方法* 必须在 lock 保护下调用,保证线程安全*/public void put(Object value) throws Exception {// 加锁:同一时间只有一个线程能操作队列lock.lock();try {// 队列满了(size == 数组长度),让生产者等待// 用 while 而非 if:防止“假唤醒”后直接执行(需再次检查条件)while (size == items.length) notFull.await(); // 释放锁,进入等待队列,直到被唤醒// 队列有空位,放入元素items[putIndex] = value;// 索引循环:如果放到数组末尾,重置为 0if (++putIndex == items.length) putIndex = 0;size++; // 元素数量+1// 生产完成,唤醒等待的消费者(队列非空了)notEmpty.signal(); } finally {// 测试用:打印生产日志(实际可删除或放业务逻辑里)System.out.println("producer生产:" + value);// 必须释放锁:无论是否异常,保证锁能被其他线程获取lock.unlock();}}/*** 消费者取出元素的方法* 必须在 lock 保护下调用,保证线程安全*/public Object take() throws Exception {// 加锁:同一时间只有一个线程能操作队列lock.lock();try {// 队列空了(size == 0),让消费者等待// 用 while 而非 if:防止“假唤醒”后直接执行(需再次检查条件)while (size == 0) notEmpty.await(); // 释放锁,进入等待队列,直到被唤醒// 取出元素Object value = items[takeIndex];items[takeIndex] = null; // 清空位置,避免内存泄漏// 索引循环:如果取到数组末尾,重置为 0if (++takeIndex == items.length) takeIndex = 0;size--; // 元素数量-1// 消费完成,唤醒等待的生产者(队列非满了)notFull.signal(); return value; // 返回取出的元素} finally {// 释放锁:无论是否异常,保证锁能被其他线程获取lock.unlock();}}
}/*** 生产者线程:* 每隔 1 秒生产一个随机数(0~999),放入队列*/
class Producer implements Runnable {private Queue queue; // 共享的队列public Producer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) { // 无限循环生产Thread.sleep(1000); // 每隔 1 秒生产一次// 生产随机数,放入队列queue.put(new Random().nextInt(1000));}} catch (Exception e) {e.printStackTrace(); // 捕获并打印异常}}
}/*** 消费者线程:* 每隔 2 秒从队列取出一个元素,打印消费日志*/
class Customer implements Runnable {private Queue queue; // 共享的队列public Customer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) { // 无限循环消费Thread.sleep(2000); // 每隔 2 秒消费一次// 取出元素并打印消费日志System.out.println("consumer消费:" + queue.take());}} catch (Exception e) {e.printStackTrace(); // 捕获并打印异常}}
}
-
Condition
的作用:notEmpty
:消费者专属等待条件。队列空时,消费者调用notEmpty.await()
释放锁并阻塞;生产者放入元素后,用notEmpty.signal()
唤醒。notFull
:生产者专属等待条件。队列满时,生产者调用notFull.await()
释放锁并阻塞;消费者取出元素后,用notFull.signal()
唤醒;- 注意:
await()
和signal()
都必须被lock.lock()
和lock.unlock()
包裹,即都在 lock 保护范围内;
-
为什么用
while
而非if
检查条件?防止**“假唤醒”**:线程可能在未被signal()
的情况下醒来(如系统调度)。用while
会重新检查条件,确保队列状态符合预期后再继续执行; -
锁的配对使用:
lock.lock()
和lock.unlock()
必须成对出现,且unlock()
放在finally
中,保证无论是否发生异常,锁都会释放,避免死锁; -
生产者-消费者的节奏:生产者 1 秒生产一次,消费者 2 秒消费一次 → 队列会逐渐被填满(生产者更快),但通过
Condition
协调,不会出现“队列满了还生产”或“队列空了还消费”的情况。
1.4 应用场景总结
-
ReentrantLock
最基本的作用是多线程环境下,让共享资源只能被一个线程独占访问,保证操作共享资源时数据不会乱(比如多个线程同时改同一个变量,用锁让它们排队改); -
应用场景总结:
-
解决多线程竞争资源的问题
-
场景描述:多个线程抢同一资源(比如同时写同一个数据库、操作同一个文件、修改同一个内存变量),需要保证同一时间只有一个线程能改,避免数据冲突;
-
例子:多个线程同时往数据库同一表插入/修改数据,用
ReentrantLock
加锁,让线程排队执行写操作,保证数据最终是正确的,不会因为并发写入导致数据混乱(比如库存扣减、订单状态修改);
-
-
实现多线程任务的顺序执行
-
场景描述:希望线程 A 执行完某段逻辑后,线程 B 再执行;或者多个线程严格按特定顺序跑任务;
-
例子:比如线程 1 先初始化配置,线程 2 再加载数据,线程 3 最后处理业务。用
ReentrantLock
配合Condition
(条件变量 ),线程 2 等线程 1 释放锁并发信号后再执行,线程 3 等线程 2 发信号后执行,实现顺序控制;
-
-
实现多线程等待/通知机制
-
场景描述:线程 A 完成某个关键步骤后,需要主动通知线程 B、C 可以继续执行了;或者线程需要等待某个条件满足后再干活(类似生产者 - 消费者模型);
-
例子:生产者线程生产完数据,通过
ReentrantLock
的Condition
发信号,唤醒等待的消费者线程来处理数据;反之,消费者处理完,也能发信号让生产者继续生产。这比Object
的wait/notify
更灵活,能精准控制哪些线程被唤醒。
-
-
2 Semaphore(信号量)
2.1 简介
-
Semaphore
是多线程同步工具,核心解决控制同时访问共享资源的线程数量,让有限的资源(比如数据库连接、文件句柄)在同一时间被合理数量的线程使用,避免因资源耗尽导致系统崩溃; -
工作原理:使用
Semaphore
的过程实际上是多个线程获取访问共享资源许可证的过程-
Semaphore
内部维护一个计数器,可以把它理解成剩余许可证数量。比如设置Semaphore(3)
,就代表最多允许 3 个线程同时用资源,相当于发 3 张“访问许可证”; -
线程获取许可证(
acquire()
):- 线程要访问共享资源时,必须先调用
acquire()
拿许可证; - 如果计数器 > 0(还有许可证):线程拿到许可证,计数器减 1,然后继续执行(访问资源);
- 如果计数器 == 0(没许可证了):线程会被阻塞,直到有其他线程释放许可证;
- 线程要访问共享资源时,必须先调用
-
线程释放许可证(
release()
):线程用完资源后,调用release()
归还许可证,计数器加 1,这样阻塞的线程里就有机会拿到新的许可证,继续执行;
-
-
Semaphore
专门用来限制资源的并发访问数量,典型场景比如:-
数据库连接池:假设连接池只有 10 个连接,用
Semaphore(10)
控制,避免几百个线程同时抢连接,把数据库压垮; -
文件访问:如果一个文件同一时间只能被 3 个线程读写,用
Semaphore(3)
限制,防止文件被疯狂读写导致错误; -
网络请求:控制同时发起的 HTTP 请求数量,避免把服务器或本地网络打满,保证系统稳定。
-
2.2 常用API
2.2.1 构造器
-
构造器是用来创建
Semaphore
对象的,主要决定两件事:- 允许同时访问资源的 最大线程数(许可证数量);
- 线程获取许可证时,是用 公平策略 还是 非公平策略;
-
Semaphore
有两个构造器:public Semaphore(int permits) {sync = new NonfairSync(permits); }
-
permits
:设置最大并发数(比如传3
,就代表最多允许 3 个线程同时拿许可证); -
NonfairSync
:默认用非公平策略。意思是,当许可证释放时,新线程和等待队列里的线程一起抢许可证,新线程可能“插队”,不用严格排队; -
等价写法:
new Semaphore(3)
等价于new Semaphore(3, false)
,因为默认是非公平的;
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
-
permits
:同上,设置最大并发数; -
fair
:布尔值,决定用公平还是非公平策略:fair = true
:用公平策略(FairSync
),线程严格按“等待队列顺序”拿许可证,先等的线程一定先拿到,不会被插队;fair = false
:用非公平策略(NonfairSync
),新线程和等待线程一起抢,可能插队。
-
2.2.2 acquire
方法
-
acquire
是Semaphore
用于获取许可证的核心方法,特点是获取不到许可证时,线程会一直阻塞等待,直到拿到许可证或者被中断。它有两种重载形式,适配不同的许可证获取需求;-
void acquire() throws InterruptedException
-
尝试获取 1 个许可证。如果当前有可用许可证,直接获取(许可证计数减 1)并返回;如果没有可用许可证,当前线程会进入阻塞状态,直到:
- 其他线程释放许可证(使当前线程有可用许可证),此时当前线程会竞争获取许可证;
- 当前线程被其他线程中断(触发
InterruptedException
);
-
类比:类似去银行办事,只有 1 个窗口(许可证 = 1),当前窗口有人(没许可证),你就只能排队等着,直到窗口空出来(有人办完业务,释放许可证);
-
-
void acquire(int permits) throws InterruptedException
-
功能:尝试获取指定数量(
permits
)的许可证。如果Semaphore
中剩余许可证数量 ≥permits
,直接获取(许可证计数减permits
)并返回;否则,线程进入阻塞,直到:- 其他线程释放许可证,使剩余许可证 ≥
permits
,当前线程竞争获取; - 当前线程被中断(触发
InterruptedException
);
- 其他线程释放许可证,使剩余许可证 ≥
-
注意:使用该方法时,要确保最终能释放对应数量的许可证,否则容易导致其他线程长期无法获取足够许可证而阻塞,引发“资源饥饿”问题;
-
类比:如果银行办理大额业务需要占 2 个窗口(
permits = 2
),只有当至少空出 2 个窗口时,你才能开始办理,否则就得等;
-
-
-
例:主线程先占许可证,子线程等待,主线程释放后子线程再获取
// 1. 创建 Semaphore:许可证数量=1,公平策略(true 表示严格按线程等待顺序分配许可证) final Semaphore semaphore = new Semaphore(1, true); // 2. 主线程直接抢许可证:因为初始许可证是 1,主线程能直接拿到(许可证计数变为 0) semaphore.acquire(); // 3. 创建子线程,尝试获取许可证 Thread t = new Thread(() -> { try {// 子线程执行到这,尝试获取许可证:但主线程已占用(许可证计数 0),所以子线程进入阻塞等待System.out.println("子线程等待获取permit"); semaphore.acquire(); // 4. 子线程被唤醒(主线程释放许可证后),执行到这,打印获取成功System.out.println("子线程获取到permit"); } catch (InterruptedException e) { // 子线程等待中被中断时,会走到这e.printStackTrace(); } finally {// 5. 子线程执行完,释放许可证(许可证计数 +1 )semaphore.release(); } }); t.start(); // 启动子线程try {// 6. 主线程休眠 5 秒:模拟做其他事情,期间子线程一直阻塞等待许可证TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {e.printStackTrace(); }// 7. 主线程释放许可证(许可证计数从 0 变为 1 ),子线程此时会被唤醒,竞争获取许可证 System.out.println("主线程释放permit"); semaphore.release();
- 初始化:
Semaphore
许可证数量1
,公平策略; - 主线程抢许可证:
semaphore.acquire()
执行后,许可证计数0
,主线程持有许可证; - 子线程启动:执行
semaphore.acquire()
时,因许可证0
,子线程进入阻塞,打印子线程等待获取permit
; - 主线程休眠:5 秒内,子线程一直阻塞;
- 主线程释放许可证:
semaphore.release()
执行,许可证计数1
;因为是公平策略,阻塞的子线程被唤醒,竞争拿到许可证,执行后续逻辑,打印子线程获取到permit
; - 子线程释放许可证:子线程执行
semaphore.release()
,许可证计数0
(子线程获取时减 1,释放时加 1,整体回到初始逻辑)。
- 初始化:
2.2.3 tryAcquire
方法
-
tryAcquire
是Semaphore
用于尝试获取许可证的方法,特点是:-
非阻塞优先:如果拿不到许可证,不会一直阻塞,而是直接返回
false
(表示没拿到); -
灵活控制:支持获取 1 个许可证、获取指定数量许可证、带超时等待等场景,比
acquire
更灵活;
-
-
tryAcquire
方法有三种重载形式-
boolean tryAcquire()
:尝试获取 1 个许可证。如果当前有可用许可证,直接获取(许可证计数 -1),返回true
;否则,直接返回false
(不阻塞); -
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
:尝试获取 1 个许可证,但增加超时等待机制。如果一开始没许可证,线程会阻塞最多timeout
时间:- 期间有许可证释放,线程拿到许可证,返回
true
; - 超时后还没许可证,返回
false
; - 等待中被中断,抛出
InterruptedException
;
- 期间有许可证释放,线程拿到许可证,返回
-
boolean tryAcquire(int permits)
:尝试获取 指定数量(permits
)的许可证。如果Semaphore
剩余许可证 ≥permits
,直接获取(计数 -permits
),返回true
;否则,返回false
(不阻塞);- 注意:要确保最终释放对应数量的许可证,否则会导致其他线程无法获取足够许可证;
-
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
:结合指定数量和超时等待,尝试获取permits
个许可证,最多等timeout
时间,逻辑和上面类似;
-
-
例:
// 1. 创建 Semaphore:1 个许可证,公平策略 final Semaphore semaphore = new Semaphore(1, true); // 2. 启动线程 t,尝试获取许可证 new Thread(() -> { // 2.1 尝试获取 1 个许可证:返回 true/falseboolean gotPermit = semaphore.tryAcquire(); // 2.2 如果拿到许可证if (gotPermit) { try {System.out.println(Thread.currentThread() + " 拿到许可证");TimeUnit.SECONDS.sleep(5); // 模拟占用 5 秒} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // 释放许可证(必须!)}} }).start();// 3. 主线程休眠 1 秒:确保线程 t 启动并拿到许可证 TimeUnit.SECONDS.sleep(1); // 4. 主线程尝试“带超时的获取”:最多等 3 秒 if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println("主线程拿到许可证"); } else {System.out.println("主线程 3 秒内没拿到许可证,失败"); }
- 线程 t 启动:
tryAcquire()
拿到许可证(gotPermit = true
),打印日志,休眠 5 秒; - 主线程休眠 1 秒:等线程 t 启动并占用许可证;
- 主线程尝试获取:调用
tryAcquire(3, ...)
,但线程 t 会占用许可证 5 秒,所以主线程等 3 秒后超时,进入else
,打印主线程 3 秒内没拿到许可证,失败
;
- 线程 t 启动:
-
例:
// 1. 创建 Semaphore:5 个许可证,公平策略 final Semaphore semaphore = new Semaphore(5, true); // 2. 尝试获取 5 个许可证:成功(因为初始有 5 个) assert semaphore.tryAcquire(5) : "获取 5 个许可证失败"; // 3. 此时许可证已耗尽(5 - 5 = 0 ),尝试获取 1 个许可证:失败 assert !semaphore.tryAcquire() : "获取 1 个许可证失败";
-
tryAcquire(5)
一次性拿 5 个许可证,成功后Semaphore
剩余 0 个; -
后续
tryAcquire()
(默认拿 1 个)返回false
,因为没许可证了。
-
2.2.4 正确使用release
-
Semaphore
的release
是用来 归还许可证 的,让其他线程有机会获取。它有两种形式:-
release()
:归还 1 个许可证,内部计数器 +1; -
release(int permits)
:归还permits
个许可证,内部计数器 +permits
; -
关键点:许可证数量有限,必须谁拿的谁还,否则会导致计数器混乱,破坏
Semaphore
控制并发的逻辑;
-
-
错误用法示例(用
finally
无脑释放)// 1. 创建 1 个许可证的 Semaphore(公平策略) final Semaphore semaphore = new Semaphore(1, true); // 2. 线程 t1:拿到许可证后,霸占 1 小时(休眠) Thread t1 = new Thread(() -> { try {semaphore.acquire(); // 拿许可证System.out.println("t1 拿到许可证");TimeUnit.HOURS.sleep(1); // 霸占 1 小时} catch (InterruptedException e) {System.out.println("t1 被中断");} finally {semaphore.release(); // 不管怎样,finally 里释放} }); t1.start(); TimeUnit.SECONDS.sleep(1); // 等 t1 启动// 3. 线程 t2:尝试拿许可证,但若被中断,会在 finally 里错误释放 Thread t2 = new Thread(() -> { try {semaphore.acquire(); // 尝试拿许可证(但 t1 没释放,所以会阻塞)System.out.println("t2 拿到许可证");} catch (InterruptedException e) {System.out.println("t2 被中断");} finally {semaphore.release(); // 问题:t2 没拿到许可证,却释放了!} }); t2.start(); TimeUnit.SECONDS.sleep(2); // 4. 主线程逻辑:中断 t2,然后自己拿许可证 t2.interrupt(); // 中断 t2(此时 t2 还在阻塞等许可证) semaphore.acquire(); // 主线程尝试拿许可证 System.out.println("主线程拿到许可证");
-
t2
根本没拿到许可证(因为t1
霸占着),但由于release
写在finally
里,t2
被中断时,会错误地归还 1 个许可证(相当于无中生有多了 1 个许可证); -
原本
Semaphore
只有 1 个许可证,被t1
占用后,计数器是 0。但t2
错误释放后,计数器变成 1,导致主线程能直接拿到许可证(而预期中t1
要 1 小时后才释放,主线程不该拿到);
-
-
修改后的
t2
逻辑:Thread t2 = new Thread(() -> { boolean acquired = false; // 标记是否成功拿到许可证try {semaphore.acquire(); // 尝试拿许可证acquired = true; // 拿到了,标记为 trueSystem.out.println("t2 拿到许可证");} catch (InterruptedException e) {System.out.println("t2 被中断");} finally {// 只有成功拿到许可证(acquired=true),才释放if (acquired) { semaphore.release(); }} });
- 用
acquired
标记是否真的拿到许可证,只有拿到许可证的线程,才在finally
里释放,避免没拿到却释放的问题;
- 用
-
Semaphore
的设计里,不强制检查释放许可证的线程是否真的拿过,而是靠开发者自己保证。官方文档说明:“没有要求释放许可证的线程必须是通过acquire
拿到许可证的,正确用法由开发者通过编程规范保证。”
2.3 使用
2.3.1 Semaphore
实现商品服务接口限流
-
Semaphore
可以用于实现限流功能,即限制某个操作或资源在一定时间内的访问次数; -
代码:限制同一时间,最多有
N
个线程能访问接口(比如下面代码中的N=2
),超过的请求要么排队,要么直接拒绝,保证服务稳定;@Slf4j public class SemaphoreDemo {/*** 同一时刻最多只允许有两个并发* 即许可证数量=2 → 同一时间最多允许 2 个线程访问*/private static Semaphore semaphore = new Semaphore(2);// 创建线程池,最多 10 个线程(模拟大量请求)private static Executor executor = Executors.newFixedThreadPool(10);public static void main(String[] args) {// 循环 10 次,模拟 10 个请求for(int i = 0; i < 10; i++){ // 提交任务到线程池,执行 getProductInfo() 或 getProductInfo2()executor.execute(() -> getProductInfo());}}// 阻塞式限流public static String getProductInfo() {// 1. 尝试获取许可证:拿不到就阻塞,直到有许可证try {semaphore.acquire();log.info("请求服务"); // 拿到许可证,执行逻辑Thread.sleep(2000); // 模拟接口执行耗时(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 释放许可证:不管是否异常,必须释放,让其他线程能用semaphore.release();}return "返回商品详情信息";}// 非阻塞式限流public static String getProductInfo2() {// 1. 尝试获取许可证:拿不到直接返回 false,不阻塞if(!semaphore.tryAcquire()){log.error("请求被流控了"); // 没拿到许可证,直接拒绝return "请求被流控了";}try {log.info("请求服务"); // 拿到许可证,执行逻辑Thread.sleep(2000); // 模拟接口执行耗时(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 释放许可证:必须释放semaphore.release();}return "返回商品详情信息";} }
-
假设运行
getProductInfo()
:-
前 2 个线程能拿到许可证,执行
log.info("请求服务")
,然后 sleep 2 秒。 -
第 3~10 个线程调用
acquire()
时,因为许可证被占满,会阻塞等待。 -
2 秒后,前 2 个线程
release()
归还许可证,阻塞的线程开始竞争,每次放 2 个执行,直到所有请求处理完。
-
-
如果运行
getProductInfo2()
:前 2 个线程拿到许可证,执行逻辑;第 3 个线程tryAcquire()
返回false
,直接走限流逻辑(log.error
)。
2.3.2 Semaphore
限制同时在线的用户数量
-
模拟一个登录系统,最多限制给定数量的人员同时在线,如果所能申请的许可证不足,那么将告诉用户无法登录,请稍后重试;
-
主类
SemaphoreDemo7
(模拟多用户登录):public class SemaphoreDemo7 {public static void main(String[] args) {// 最多允许 10 个用户同时在线final int MAX_PERMIT_LOGIN_ACCOUNT = 10; LoginService loginService = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT);// 启动 20 个线程(模拟 20 个用户登录)IntStream.range(0, 20).forEach(i -> new Thread(() -> {// 执行登录boolean login = loginService.login(); if (!login) {// 登录失败(超过并发数)System.out.println(Thread.currentThread() + " 因超过最大在线数被拒绝");return;}try {simulateWork(); // 模拟登录后的业务操作} finally {loginService.logout(); // 退出时释放许可证}}, "User-" + i).start());}// 模拟登录后的业务操作(随机休眠,模拟用户在线时长)private static void simulateWork() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) { /* 忽略中断 */ }} }
-
创建
LoginService
,传入最大在线数10
; -
启动 20 个线程(用户),调用
loginService.login()
尝试登录; -
登录成功 → 执行
simulateWork()
(模拟用户在线操作,随机休眠 )→ 退出时logout()
释放许可证; -
登录失败 → 直接提示并返回;
-
-
LoginService
类(控制登录逻辑)private static class LoginService {private final Semaphore semaphore; public LoginService(int maxPermitLoginAccount) {// 创建 Semaphore:许可证数量=maxPermitLoginAccount,公平策略(true)this.semaphore = new Semaphore(maxPermitLoginAccount, true); }public boolean login() {// 尝试获取许可证:非阻塞,拿不到直接返回 falseboolean success = semaphore.tryAcquire(); if (success) {System.out.println(Thread.currentThread() + " 登录成功");}return success;}public void logout() {// 释放许可证:登录成功的用户退出时,归还许可证semaphore.release(); System.out.println(Thread.currentThread() + " 退出成功");} }
-
Semaphore
许可证数量 = 最大在线用户数(10
),保证同一时间最多 10 个线程(用户)能拿到许可证;login()
用tryAcquire()
尝试拿许可证:- 拿到 → 返回
true
(登录成功); - 拿不到 → 返回
false
(登录失败,超过并发);
- 拿到 → 返回
-
logout()
用release()
释放许可证,让其他用户可登录;
-
-
运行效果:
-
登录成功:前 10 个线程(用户)能拿到许可证,打印
登录成功
,执行simulateWork()
随机休眠; -
登录失败:后 10 个线程调用
tryAcquire()
时,许可证已耗尽,返回false
,打印因超过最大在线数被拒绝
; -
用户退出:休眠结束后,线程执行
logout()
释放许可证,Semaphore
计数器 +1,后续新线程(或之前阻塞的线程)有机会拿到许可证登录;
-
-
如果把
login
里的tryAcquire()
换成acquire()
(阻塞式获取 ):public boolean login() {try {// 阻塞式获取:拿不到就一直等,直到有许可证semaphore.acquire(); System.out.println(Thread.currentThread() + " 登录成功");return true;} catch (InterruptedException e) {// 被中断时返回登录失败return false; } }
- 效果:超过并发数的用户不会直接失败,而是阻塞等待,直到有用户退出释放许可证,再继续登录。
2.4 应用场景总结
-
Semaphore
(信号量)是高并发工具,核心能力是控制同时访问共享资源的线程数量,让有限的资源(比如连接池、文件句柄)在高并发下被合理使用,避免系统被压垮; -
应用场景总结
-
限流(流量控制):系统的某个资源(比如接口、数据库连接)能承受的并发量有限,需要限制同时访问的线程数,防止资源被打满导致系统崩溃;
- 接口限流:比如商品详情接口,最多允许 100 个线程同时访问,用
Semaphore(100)
控制,超过的请求排队或拒绝; - 数据库连接限流:数据库连接池有 20 个连接,用
Semaphore(20)
控制,避免几千个线程同时抢连接,把数据库压垮; - 侧重于控制并发访问量,保护资源不被压垮,比如接口、网关层的流量控制;
- 接口限流:比如商品详情接口,最多允许 100 个线程同时访问,用
-
资源池(维护有限资源):系统有一组有限资源(比如数据库连接、文件句柄、网络端口),需要让线程按需借用、用完归还,保证资源被合理复用;
- 数据库连接池:初始化
Semaphore(连接数)
,线程需要连接时acquire()
拿许可证(同时从池里取连接),用完后release()
释放许可证(同时把连接还回池); - 文件访问池:如果有 5 个文件句柄,用
Semaphore(5)
控制,线程访问文件时拿许可证,访问完归还,保证同一时间最多 5 个线程操作文件; - 侧重于管理有限资源的借用/归还,保证资源复用,比如连接池、句柄池的资源调度;
- 数据库连接池:初始化
-
-
但本质都是用
Semaphore
的许可证数量,限制同时使用资源的线程数。
3 CountDownLatch(闭锁)
3.1 简介
-
CountDownLatch
是多线程同步工具,解决的问题是:让一个或多个线程等待其他多个任务全部完成后,再继续执行。比如:- 主线程要等 10 个子线程都跑完初始化任务,才开始处理业务;
- 或者多个线程要等某个“总开关”任务完成,再一起执行;
-
工作流程:
-
初始化计数器:
CountDownLatch latch = new CountDownLatch(N);
,这里的N
是需要等待的任务数量(比如有 3 个子线程要执行,N=3
); -
等待线程:
latch.await();
,调用await()
的线程(比如主线程)会阻塞等待,直到N
减到 0; -
任务线程计数减 1:每个子任务线程执行完自己的逻辑后,调用
latch.countDown();
,让计数器N-1
; -
计数器归 0,等待线程唤醒:当所有子任务线程都调用
countDown()
,N
变成 0 ,之前阻塞的线程(await()
的线程)会被唤醒,继续执行;
TA
:等待线程、T1/T2/T3
:任务线程cnt = 3
:对应CountDownLatch latch = new CountDownLatch(3);
,表示需要等待 3 个任务完成;过程:
-
线程
TA
调用await()
:TA
执行到latch.await()
时,会检查计数器cnt
:此时cnt=3≠0
,所以TA
进入阻塞状态(awaiting...
),暂停执行; -
任务线程
T1
完成:T1
执行latch.countDown()
→ 计数器cnt
从3→2
。此时cnt≠0
,TA
仍阻塞; -
任务线程
T2
完成:T2
执行latch.countDown()
→ 计数器cnt
从2→1
。此时cnt≠0
,TA
仍阻塞; -
任务线程
T3
完成:T3
执行latch.countDown()
→ 计数器cnt
从1→0
; -
当
cnt=0
时,CountDownLatch
会唤醒所有等待的线程(这里是TA
):TA
从阻塞状态恢复(resumed
),继续执行后续逻辑;
-
-
关键特性:
-
一次性:计数器
N
减到 0 后,就不能再重置或复用,只能用一次; -
多线程等待:可以有多个线程调用
await()
,一起等待N
归 0 后被唤醒; -
任务结束的宽泛性:子任务结束包括正常跑完或者抛异常终止,只要调用
countDown()
,就会让计数器减 1;
-
-
典型场景:
- 并行任务汇总:比如计算一个大数组的和,拆成 10 个子数组并行计算,主线程等 10 个子线程都算完,再汇总结果;
- 系统启动初始化:系统启动时,需要初始化 5 个服务(比如缓存、数据库连接、配置加载),主线程等 5 个服务都初始化完,再对外提供服务;
- 测试多线程并发:测试时,让 100 个线程等信号,信号发出(
countDown()
)后一起执行,模拟高并发场景。
3.2 常用API
3.2.1 构造器
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
CountDownLatch
构造时,count
必须 ≥0,否则抛IllegalArgumentException
;count
减到 0 后,无法重置,CountDownLatch
只能用一次。
3.2.2 常用方法
-
总览:
// 1. await():调用的线程会阻塞,直到 count 减到 0 public void await() throws InterruptedException {}; // 2. await(long timeout, TimeUnit unit):阻塞等待,但最多等 timeout 时间; // 若超时后 count 仍≠0,不再等待,返回 false public boolean await(long timeout, TimeUnit unit) throws InterruptedException {}; // 3. countDown():让 count 减 1,直到 count=0 时唤醒所有等待的线程 public void countDown() {};
-
await()
:调用线程进入阻塞状态,直到countDownLatch
的count
减到 0;- 如果等待中被其他线程中断,会抛出
InterruptedException
; count=0
时,调用await()
会立即返回,不阻塞;
- 如果等待中被其他线程中断,会抛出
-
await(long timeout, TimeUnit unit)
:阻塞等待,但增加了超时退出机制;- 返回值为
true
→ 等待中count
减到 0(正常唤醒); - 返回值为
false
→ 超时后count
仍≠0(放弃等待);
- 返回值为
-
countDown()
:让count
减 1。当count
从1→0
时,会唤醒所有等待的线程(await()
的线程);count
已经是 0 时,调用countDown()
会被忽略(count
最小为 0);- 只有
count
从1→0
时,才会触发唤醒;count
从3→2
这类变化,不会唤醒线程;
-
例:
// 1. 初始化:count=2 → 需要 2 次 countDown() 才会唤醒 await() 的线程 CountDownLatch latch = new CountDownLatch(2); // 2. 第一次 countDown() → count=2→1 latch.countDown(); // 3. 第二次 countDown() → count=1→0 → 唤醒所有 await() 的线程 latch.countDown(); // 4. 第三次 countDown() → count 已经是 0,调用被忽略 latch.countDown(); // 5. count=0,调用 await() 直接返回,不阻塞 latch.await();
3.3 使用
3.3.1 多任务完成后合并汇总
-
开发中常见多个任务并行执行,必须等所有任务完成后,再统一处理结果 的需求:
-
比如“数据详情页”需要同时调用 5 个接口(并行),等所有接口返回数据后,再合并结果展示;
-
或者“多个数据操作完成后,统一做校验(check)”;
-
-
代码:
public class CountDownLatchDemo2 {public static void main(String[] args) throws Exception {// 1. 初始化 CountDownLatch:需要等待 5 个任务完成CountDownLatch countDownLatch = new CountDownLatch(5); // 2. 启动 5 个线程(模拟 5 个并行任务)for (int i = 0; i < 5; i++) { final int index = i;new Thread(() -> {try {// 模拟任务执行耗时(1~3 秒随机)Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000)); System.out.println("任务 " + index + " 执行完成");// 3. 任务完成,计数器减 1(countDownLatch.countDown())countDownLatch.countDown(); } catch (InterruptedException e) {e.printStackTrace();}}).start();}// 4. 主线程阻塞等待:直到 5 个任务都完成(count=0)countDownLatch.await(); // 5. 所有任务完成后,主线程执行汇总逻辑System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); } }
-
初始化
CountDownLatch
:new CountDownLatch(5)
→ 表示需要等待 5 个任务完成(计数器初始值5
); -
启动并行任务:循环创建 5 个线程,模拟 5 个并行任务。每个线程:
-
Thread.sleep(...)
:模拟任务执行耗时(随机 1~3 秒); -
countDownLatch.countDown()
:任务完成后,计数器减 1(5→4→3→2→1→0
);
-
-
主线程等待:
countDownLatch.await()
→ 主线程阻塞,直到计数器减到0
(所有任务完成); -
汇总结果:计数器归
0
后,主线程被唤醒,执行System.out.println(...)
做结果汇总;
-
-
运行效果
-
5 个任务线程会随机顺序完成(因为
sleep
时间随机),比如:任务 2 执行完成 任务 0 执行完成 任务 4 执行完成 任务 1 执行完成 任务 3 执行完成
-
主线程必须等所有任务打印完,才会输出:
主线程:在所有任务运行完成后,进行结果汇总
-
-
核心价值
-
并行效率:5 个任务并行执行,不用等待前一个任务完成再执行下一个,节省时间;
-
同步控制:主线程通过
CountDownLatch
精准等待所有任务完成,保证汇总逻辑在所有数据就绪后执行。
-
3.3.2 电商场景中的应用——等待所有子任务结束
-
需求:根据商品品类 ID,获取 10 个商品,并行计算每个商品的最终价格(需调用 ERP、CRM 等系统,计算复杂、耗时),最后汇总所有价格返回;
ERP系统:ERP是企业资源计划系统,它整合企业内部各部门的核心业务流程,如财务、采购、生产、销售和人力资源等,以实现数据共享和资源优化;
CRM系统:CRM是客户关系管理系统,它专注于管理公司与当前及潜在客户的交互和业务往来,旨在改善客户服务、提升销售效率并维护客户关系;
-
串行问题:如果一个一个计算(串行),总耗时 = 获取商品时间 + 10×单个商品计算时间,商品越多越慢;
-
并行优化:用多线程并行计算商品价格,总耗时 = 获取商品时间 + 最长单个商品计算时间,效率更高;
-
-
代码:工具方法 & 数据类
// 根据品类 ID 获取商品 ID 列表(模拟返回 1~10 号商品) private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray(); }// 商品价格数据类:存储商品 ID 和计算后的价格 private static class ProductPrice {private final int prodID; // 商品 IDprivate double price; // 计算后的价格// 构造方法、get/set、toString 略... }
-
主逻辑:并行计算商品价格
public static void main(String[] args) throws InterruptedException {// 1. 获取商品 ID 列表(1~10)final int[] products = getProductsByCategoryId(); // 2. 转换为 ProductPrice 列表(初始价格未计算)List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new) // 每个商品 ID 对应一个 ProductPrice.collect(Collectors.toList()); // 3. 初始化 CountDownLatch:需要等待 10 个商品计算完成(products.length=10)final CountDownLatch latch = new CountDownLatch(products.length); // 4. 为每个商品启动线程,并行计算价格list.forEach(pp -> {new Thread(() -> {try {System.out.println(pp.getProdID() + " -> 开始计算商品价格.");// 模拟耗时操作(调用外部系统):随机休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 计算商品价格(模拟业务逻辑:奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶数商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇数商品 7.1 折}System.out.println(pp.getProdID() + " -> 价格计算完成.");} catch (InterruptedException e) {e.printStackTrace();} finally {// 6. 任务完成,计数器减 1latch.countDown(); }}).start(); // 启动线程});// 7. 主线程阻塞等待:直到 10 个商品都计算完成(latch.await())latch.await(); // 8. 所有商品计算完成,汇总结果System.out.println("所有价格计算完成.");list.forEach(System.out::println); }
-
准备商品数据:调用
getProductsByCategoryId()
获取 1~10 号商品 ID,转成ProductPrice
列表(初始价格未计算); -
初始化
CountDownLatch
:new CountDownLatch(10)
→ 表示需要等待10 个商品的计算任务完成; -
并行计算价格:为每个商品启动线程:
- 模拟耗时操作(
TimeUnit.SECONDS.sleep(...)
); - 根据商品 ID 奇偶,设置不同折扣价格(模拟业务逻辑);
- 任务完成后,
latch.countDown()
让计数器减 1;
- 模拟耗时操作(
-
主线程等待 & 汇总结果:
latch.await()
阻塞主线程,直到 10 个任务都完成(计数器归 0),最后打印所有商品的计算结果。
-
3.4 应用场景总结
-
并行任务同步:多个任务并行执行(比如 5 个线程同时下载文件),必须等所有任务都完成后,再执行下一步(比如合并文件);
- 用
CountDownLatch
让主线程等待所有并行任务完成,保证后续操作在所有任务就绪后执行;
- 用
-
多任务汇总:需要统计多个线程的执行结果(比如 10 个线程分别计算一部分数据,最后汇总总和);
- 主线程等所有线程计算完,再统一汇总结果,避免部分数据未计算就开始汇总的问题;
-
资源初始化:系统启动时,需要初始化多个资源(比如缓存、数据库连接、配置加载),必须等所有资源初始化完成,再对外提供服务;
- 主线程等待所有资源初始化任务完成,保证系统启动后资源可用。
3.5 不足
-
CountDownLatch
是一次性工具:-
构造时设置的计数器(比如
new CountDownLatch(5)
),一旦减到0
,就无法重置或复用; -
如果业务需要“重复等待多个任务完成”,
CountDownLatch
无法满足,必须重新创建新的实例。
-
4 CyclicBarrier(回环栅栏/循环屏障)
4.1 简介
-
CyclicBarrier
是多线程同步工具,解决的问题是:让一组线程互相等待,直到所有线程都到达同一个“屏障点”,然后再一起继续执行; -
关键特点:可循环使用(屏障可以重置,重复让线程等待、一起执行);
-
工作流程:
-
初始化屏障:
CyclicBarrier barrier = new CyclicBarrier(N);
,N
是“需要等待的线程数量”(比如 5 个线程要一起执行后续逻辑); -
线程到达屏障点:每个线程执行到
barrier.await();
时,会阻塞等待,直到有N
个线程都调用了await()
; -
所有线程到达,一起执行:当第
N
个线程调用await()
后,所有阻塞的线程会被同时唤醒,继续执行后续逻辑; -
循环使用:唤醒后,屏障可以重置(通过
reset()
方法 ),再次让新的一组线程等待、一起执行;
-
-
适合把一个大任务拆成多个子任务并行执行,等所有子任务完成后,再统一做下一步的场景,且需要重复执行该流程。典型场景有:
-
并行计算 + 合并结果:比如计算一个大数组的和,拆成 10 个子数组并行计算,等所有子数组算完,再合并总和。计算完一次后,还能再拆新的数组,重复使用屏障。
-
多阶段任务:系统升级时,先让 5 个节点并行执行数据迁移,全部完成后,再一起执行验证数据,验证完还能继续下一阶段(比如启动服务),屏障可循环用;
-
-
与
CountDownLatch
的核心区别特性 CyclicBarrier
CountDownLatch
是否可循环 可循环(屏障可重置,重复用) 一次性(计数器到 0 后无法重置) 等待的目标 等待“一组线程互相到达屏障点” 等待“其他线程完成任务(计数减到 0)” 典型场景 多阶段并行任务(可重复) 单次多任务同步(不可重复)
4.2 常用API
4.2.1 构造器
-
有两个构造器:
public CyclicBarrier(int parties)
-
parties
:需要等待的线程总数。比如传4
,表示必须有 4 个线程都调用await()
,屏障才会放行; -
作用:初始化一个基础的循环屏障,所有线程到达后一起执行后续逻辑;
public CyclicBarrier(int parties, Runnable barrierAction)
-
parties
:同上,需要等待的线程总数; -
barrierAction
:一个Runnable
任务。当所有线程到达屏障后,会优先执行这个任务,再让所有线程继续执行; -
作用:适合线程到达屏障后,需要先统一处理一些逻辑(比如汇总数据、初始化资源)的场景;
-
-
工作原理:以
CyclicBarrier barrier = new CyclicBarrier(4, new Runnable(){...});
为例-
初始化:
parties=4
→ 需要 4 个线程到达屏障;barrierAction
→ 所有线程到达后执行的任务;
-
线程到达屏障:每个线程执行
barrier.await();
时:- 计数器(
count
)减 1(初始4
→ 线程 1 调用后变成3
→ 线程 2 调用后变成2
→ 线程 3 调用后变成1
→ 线程 4 调用后变成0
); - 前 3 个线程调用
await()
后,会阻塞等待;
- 计数器(
-
屏障放行(所有线程到达):第 4 个线程调用
await()
后,count=0
,执行barrierAction
,然后唤醒所有阻塞的线程,一起继续执行后续逻辑; -
循环复用:屏障放行后,
count
会重置为初始值(4),可以再次让新的一组线程(4 个)等待、触发屏障。
-
4.2.2 常用方法
-
总览:
// 1. await():线程调用后阻塞,直到所有线程都调用 await(),屏障放行 public int await() throws InterruptedException, BrokenBarrierException {}// 2. await(long timeout, TimeUnit unit):带超时的 await(),超时后屏障视为“被破坏” public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {}// 3. reset():重置屏障,让计数器回到初始值,可重复使用 public void reset() {}
-
await()
:线程调用await()
后,会阻塞等待,直到有parties
个线程都调用await()
(parties
是构造器传入的线程数);-
InterruptedException
:等待中的线程被中断; -
BrokenBarrierException
:屏障被破坏(比如其他线程await()
时被中断、超时 ),导致当前线程无法继续等待; -
返回值:返回当前线程在到达屏障的线程组中的索引(比如 4 个线程到达,第一个调用
await()
的线程返回3
,最后一个返回0
,索引从0
开始逆序);
-
-
await(long timeout, TimeUnit unit)
:和await()
类似,但增加超时机制。如果在timeout
时间内,凑不齐parties
个线程调用await()
,则触发超时,屏障被标记为破坏。防止线程因其他线程异常,无限期阻塞;- 除了
InterruptedException
和BrokenBarrierException
,还可能抛出TimeoutException
(超时);
- 除了
-
reset()
:重置CyclicBarrier
,让计数器回到初始值(parties
),屏障状态恢复到未使用;- 注意:重置时,若有线程正在
await()
,会触发BrokenBarrierException
(因为屏障被强制重置,这些线程的等待被打断)。
- 注意:重置时,若有线程正在
4.3 使用
4.3.1 等待所有子任务结束
-
需求:根据品类 ID 获取 10 个商品,并行计算每个商品的最终价格(模拟调用外部系统,耗时随机),等所有商品价格计算完成后,汇总结果返回;
-
工具方法 & 数据类
// 根据品类 ID 获取商品 ID 列表(1~10 号商品) private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray(); }// 商品价格数据类:存储商品 ID 和计算后的价格 private static class ProductPrice {private final int prodID; // 商品 IDprivate double price; // 计算后的价格// 构造方法、get/set、toString 略... }
-
主逻辑:用
CyclicBarrier
同步多线程public static void main(String[] args) throws InterruptedException {// 1. 获取商品 ID 列表,转换为 ProductPrice 列表final int[] products = getProductsByCategoryId();List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList()); // 2. 初始化 CyclicBarrier:需要等待 list.size()(10)个线程到达屏障final CyclicBarrier barrier = new CyclicBarrier(list.size()); // 3. 存储线程的列表(用于后续 join 等待)final List<Thread> threadList = new ArrayList<>(); // 4. 为每个商品启动线程,并行计算价格list.forEach(pp -> {Thread thread = new Thread(() -> {try {System.out.println(pp.getProdID() + " 开始计算商品价格.");// 模拟耗时操作(调用外部系统):随机休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 计算商品价格(奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶数商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇数商品 7.1 折}System.out.println(pp.getProdID() + " -> 价格计算完成.");// 6. 等待其他线程:调用 await(),直到所有线程都到达屏障barrier.await(); } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});threadList.add(thread); // 记录线程thread.start(); // 启动线程});// 7. 等待所有线程执行完成(通过 join 保证主线程等所有子线程跑完)threadList.forEach(t -> {try {t.join(); } catch (InterruptedException e) {e.printStackTrace();}});// 8. 所有商品价格计算完成,汇总结果System.out.println("所有价格计算完成.");list.forEach(System.out::println); }
-
准备商品数据:调用
getProductsByCategoryId()
获取 1~10 号商品 ID,转成ProductPrice
列表(初始价格未计算); -
初始化
CyclicBarrier
:new CyclicBarrier(list.size())
→ 表示需要等待 10 个线程 到达屏障(每个商品对应一个线程); -
并行计算价格:为每个商品启动线程:
- 模拟耗时操作(
TimeUnit.SECONDS.sleep(...)
); - 根据商品 ID 奇偶,设置不同折扣价格;
- 调用
barrier.await()
:线程到达屏障,阻塞等待其他线程;
- 模拟耗时操作(
-
屏障放行:当第 10 个线程调用
await()
后,所有阻塞的线程会被同时唤醒,继续执行后续逻辑; -
主线程等待 & 汇总结果:通过
threadList.forEach(t -> t.join())
让主线程等待所有子线程执行完成,最后打印所有商品的计算结果。
-
4.3.2 CyclicBarrier的循环特性——模拟跟团旅游
-
需求:跟团旅游
-
第一阶段(上车屏障):
- 导游要求:所有游客上车后,大巴才出发(对应
CyclicBarrier
的第一次await()
); - 类比:10 个游客 + 1 个导游(主线程)= 11 个线程,凑齐后屏障放行;
- 导游要求:所有游客上车后,大巴才出发(对应
-
第二阶段(下车屏障):
- 导游要求:所有游客下车后,大巴才去下一个景点(对应
CyclicBarrier
的第二次await()
); - 类比:同一组线程(游客 + 导游)再次凑齐,屏障放行,实现“循环复用”;
- 导游要求:所有游客下车后,大巴才去下一个景点(对应
-
-
游客线程逻辑(
Tourist
类)private static class Tourist implements Runnable {private final int touristID; // 游客编号private final CyclicBarrier barrier; // 循环屏障public Tourist(int touristID, CyclicBarrier barrier) {this.touristID = touristID;this.barrier = barrier;}@Overridepublic void run() {// 1. 模拟上车(第一阶段:上车同步)System.out.printf("游客:%d 乘坐旅游大巴\n", touristID);spendSeveralSeconds(); // 模拟上车耗时waitAndPrint("游客:%d 上车,等别人上车.\n"); // 调用 await(),等待凑齐 11 个线程// 2. 模拟下车(第二阶段:下车同步)System.out.printf("游客:%d 到达目的地\n", touristID);spendSeveralSeconds(); // 模拟下车耗时waitAndPrint("游客:%d 下车,等别人下车.\n"); // 再次调用 await(),等待凑齐 11 个线程}// 调用 barrier.await(),并打印日志private void waitAndPrint(String message) {System.out.printf(message, touristID);try {barrier.await(); // 线程到达屏障,阻塞等待} catch (InterruptedException | BrokenBarrierException e) {// 忽略异常}}// 模拟随机耗时(上车/下车的时间)private void spendSeveralSeconds() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {// 忽略异常}} }
-
主线程逻辑(导游视角)
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {// parties=11 → 需要 11 个线程到达屏障(10 个游客线程 + 1 个主线程)final CyclicBarrier barrier = new CyclicBarrier(11);// 启动 10 个游客线程for (int i = 0; i < 10; i++) {new Thread(new Tourist(i, barrier)).start();}// 4. 主线程(导游)参与第一阶段屏障:等待所有游客上车barrier.await(); System.out.println("导游:所有的游客都上了车.");// 5. 主线程(导游)参与第二阶段屏障:等待所有游客下车barrier.await(); System.out.println("导游:所有的游客都下车了."); }
-
上车同步
-
10 个游客线程 + 1 个主线程(导游),共 11 个线程;
-
每个游客线程执行到
waitAndPrint("游客:%d 上车,等别人上车.\n")
→ 调用barrier.await()
,阻塞等待; -
当 11 个线程都调用
await()
后,屏障放行:-
打印所有“游客上车等待”的日志;
-
主线程继续执行,打印
导游:所有的游客都上了车.
;
-
-
-
下车同步
-
同一组 11 个线程(10 个游客 + 1 个主线程 ),再次执行到
waitAndPrint("游客:%d 下车,等别人下车.\n")
→ 调用barrier.await()
,阻塞等待; -
当 11 个线程都调用
await()
后,屏障放行:-
打印所有“游客下车等待”的日志;
-
主线程继续执行,打印
导游:所有的游客都下车了.
;
-
-
-
CyclicBarrier
的循环特性-
可重复触发:同一
CyclicBarrier
实例,可通过多次await()
实现“多阶段同步”(上车→下车); -
线程组复用:同一组线程(游客 + 导游 )参与多个阶段的屏障,无需重新创建实例。
-
4.4 应用场景总结
-
CyclicBarrier
是多线程同步工具,核心解决让一组线程互相等待,全部到达同一屏障点后,再一起继续执行的问题,且可循环使用(屏障可重置,重复同步多阶段任务); -
应用场景:
-
多线程任务拆分与合并:一个复杂任务(比如计算大数据集的总和)拆成多个子任务(比如 10 个线程各算一部分),必须等所有子任务完成后,再合并结果;
-
多线程数据处理同步:多个线程并行处理不同的数据分片(比如处理 5 个文件),必须等所有线程处理完自己的数据,再统一汇总、校验或持久化。
-
4.5 CyclicBarrier VS CountDownLatch
-
可复用性
-
CountDownLatch
:一次性工具。构造时设置的计数器(比如new CountDownLatch(5)
),一旦减到0
,无法重置或复用; -
CyclicBarrier
:可循环复用。计数器(parties
)可以通过reset()
重置,重复让新的线程组等待、触发屏障;
-
-
等待目标
-
CountDownLatch
:await()
的线程等待其他线程调用countDown()
把计数器减到0
(主线程等子线程完成); -
CyclicBarrier
:await()
的线程等待其他线程也到达屏障点(调用await()
)(线程组互相等待);
-
-
计数器特性
-
CountDownLatch
:计数器只能递减(从N→0
),且无法重置; -
CyclicBarrier
:计数器可以重置(通过reset()
回到初始值parties
),支持多轮同步。
-
5 Exchange(数据交换机)
5.1 简介
-
Exchanger
专门解决两个线程需要互相交换数据的场景,让两个线程在“交换点”(调用exchange
方法时)同步,安全交换数据; -
工作流程
- 线程 1 调用
exchange(object1)
:线程 1 会阻塞等待,直到线程 2 也调用exchange
方法; - 线程 2 调用
exchange(object2)
:此时两个线程都到达“交换点”,Exchanger
会将object1
传递给线程 2,将object2
传递给线程 1; - 交换后继续执行:线程 1 拿到
object2
,线程 2 拿到object1
,继续执行后续逻辑;
- 线程 1 调用
5.2 常用API
-
public V exchange(V x) throws InterruptedException
-
功能:
- 当前线程携带数据
x
,阻塞等待另一个线程到达交换点; - 对方线程到达后,交换数据:当前线程接收对方数据,返回给调用方;
- 当前线程携带数据
-
异常:等待中若线程被中断,抛出
InterruptedException
;
-
-
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
- 同上,但增加超时机制。如果在
timeout
时间内,对方线程未到达交换点,抛出TimeoutException
; - 适用场景:防止线程因对方异常,无限期阻塞。
- 同上,但增加超时机制。如果在
5.3 使用
5.3.1 模拟交易场景
-
模拟买卖双方交易:
-
卖家带“商品”(
goods = "电脑"
),买家带“钱”(money = "$4000"
); -
双方必须都到达“交易点”(调用
exchanger.exchange(...)
),才能交换数据(一手交钱,一手交货);
-
-
代码:
public class ExchangerDemo {private static Exchanger exchanger = new Exchanger(); static String goods = "电脑";static String money = "$4000";public static void main(String[] args) throws InterruptedException {System.out.println("准备交易,一手交钱一手交货...");// 卖家线程:携带 goods,等待买家new Thread(() -> {try {System.out.println("卖家到了,已经准备好货:" + goods);// 交换数据:卖家发送 goods,接收 moneyString receivedMoney = (String) exchanger.exchange(goods); System.out.println("卖家收到钱:" + receivedMoney);} catch (Exception e) { /* 忽略异常 */ }}).start();// 主线程休眠 3 秒,模拟买家延迟到达Thread.sleep(3000); // 买家线程:携带 money,等待卖家new Thread(() -> {try {System.out.println("买家到了,已经准备好钱:" + money);// 交换数据:买家发送 money,接收 goodsString receivedGoods = (String) exchanger.exchange(money); System.out.println("买家收到货:" + receivedGoods);} catch (Exception e) { /* 忽略异常 */ }}).start();} }
-
同步交换:卖家先调用
exchange(goods)
会阻塞,直到买家调用exchange(money)
,双方交换数据; -
数据流向:卖家发送
goods
→ 接收money
;买家发送money
→ 接收goods
。
-
5.3.2 模拟对账场景
-
模拟数据对账:
- 线程 1 生成数据
A
,线程 2 生成数据B
; - 双方交换数据后,线程 2 校验
A
和B
是否一致;
- 线程 1 生成数据
-
代码:
public class ExchangerDemo2 {private static final Exchanger<String> exchanger = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) {// 线程 1:发送数据 AthreadPool.execute(() -> {try {String A = "12379871924sfkhfksdhfks";exchanger.exchange(A); // 发送 A,等待线程 2} catch (InterruptedException e) { /* 忽略 */ }});// 线程 2:发送数据 B,接收数据 A,校验一致性threadPool.execute(() -> {try {String B = "32423423jkmjkfsbfj";String A = exchanger.exchange(B); // 发送 B,接收 ASystem.out.println("A和B数据是否一致:" + A.equals(B));System.out.println("A= " + A);System.out.println("B= " + B);} catch (InterruptedException e) { /* 忽略 */ }});threadPool.shutdown();} }
-
数据校验:线程 2 接收线程 1 的数据
A
后,对比自己的B
,判断是否一致; -
线程池简化:用线程池管理两个线程,避免手动创建
Thread
;
-
5.3.3 模拟队列中交换数据
-
模拟生产者 - 消费者模式,但通过
Exchanger
动态交换“满队列”和“空队列”:-
生产者往
emptyQueue
放数据,满了就和消费者交换队列(拿空队列继续生产); -
消费者从
fullQueue
取数据,空了就和生产者交换队列(拿满队列继续消费);
-
-
代码:
public class ExchangerDemo3 {// 满队列(消费者初始用)、空队列(生产者初始用)private static ArrayBlockingQueue<String> fullQueue = new ArrayBlockingQueue<>(5); private static ArrayBlockingQueue<String> emptyQueue = new ArrayBlockingQueue<>(5); private static Exchanger<ArrayBlockingQueue<String>> exchanger = new Exchanger<>(); public static void main(String[] args) {new Thread(new Producer()).start(); // 启动生产者new Thread(new Consumer()).start(); // 启动消费者}// 生产者:往队列放数据,满了就交换队列static class Producer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = emptyQueue; try {while (current != null) {String str = UUID.randomUUID().toString();try {current.add(str); // 往队列放数据System.out.println("producer:生产了一个序列:" + str + ">>>>>加入到交换区");Thread.sleep(2000);} catch (IllegalStateException e) {// 队列满了,交换队列(拿空队列)System.out.println("producer:队列已满,换一个空的");current = exchanger.exchange(current); }}} catch (Exception e) { /* 忽略 */ }}}// 消费者:从队列取数据,空了就交换队列static class Consumer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = fullQueue; try {while (current != null) {if (!current.isEmpty()) {String str = current.poll(); // 从队列取数据System.out.println("consumer:消耗一个序列:" + str);Thread.sleep(1000);} else {// 队列空了,交换队列(拿满队列)System.out.println("consumer:队列空了,换个满的");current = exchanger.exchange(current); System.out.println("consumer:换满的成功~~~~~~~~~~~~~~~~~~~~~~");}}} catch (Exception e) { /* 忽略 */ }}} }
-
动态队列交换:
- 生产者队列满 → 用
exchanger.exchange(current)
交换出空队列,继续生产; - 消费者队列空 → 用
exchanger.exchange(current)
交换出满队列,继续消费;
- 生产者队列满 → 用
-
解耦生产和消费:通过交换队列,避免生产者/消费者因队列满/空阻塞,灵活控制数据流转。
-
5.4 应用场景总结
- 数据交换:两个线程需要安全交换数据(如交易场景的“钱 - 货”交换);
- 保证“交换原子性”,避免数据不一致;
- 数据采集:采集线程(生产者)和处理线程(消费者)交换数据(如日志采集→日志处理);
- 解耦数据生产和消费,通过交换数据缓冲,提升系统吞吐量。
6 Phaser(阶段协同器)
6.1 简介
-
Phaser
用于协调多个线程的多阶段执行,支持:-
动态调整参与线程的数量(可增、可减);
-
分阶段同步(线程完成当前阶段,再一起进入下一阶段);
-
比
CyclicBarrier
更灵活(支持动态线程数、多阶段),比CountDownLatch
更强大(可循环、可动态调整);
-
-
核心特性
-
多阶段同步:线程可以分多个阶段执行(如
phase-0 → phase-1 → phase-2
),每个阶段都需要线程同步后再继续; -
动态线程管理:
-
可通过
register()
动态增加参与线程; -
可通过
arriveAndDeregister()
动态减少参与线程;
-
-
灵活的阶段控制:每个阶段完成后,可自定义逻辑(重写
onAdvance
方法),决定是否继续下一阶段;
-
-
工作流程
- 阶段 0(
phase-0
):- 多个线程执行“阶段 0”的任务;
- 线程调用
arriveAndAwaitAdvance()
表示“阶段 0 完成”,等待其他线程也完成“阶段 0”;
- 进入阶段 1(
phase-1
):- 所有线程都完成“阶段 0”后,一起进入“阶段 1”;
- 重复“执行任务 → 同步等待”的流程;
- 多阶段循环:支持多个阶段(
phase-0 → phase-1 → phase-2 → ...
),直到手动终止或所有线程退出;
- 阶段 0(
6.2 常用 API
-
构造方法
构造方法 作用 Phaser()
初始化一个“参与任务数为 0”的 Phaser
,后续用register()
动态添加线程Phaser(int parties)
指定初始参与线程数(类似 CyclicBarrier
的parties
)Phaser(Phaser parent)
作为子阶段协同器,依附于父 Phaser
,适合复杂多阶段场景Phaser(Phaser parent, int parties)
结合父 Phaser
和初始线程数,更灵活的初始化 -
增减参与线程
方法 作用 int register()
动态增加一个参与线程,返回当前阶段号 int bulkRegister(int parties)
动态增加多个参与线程(批量注册),返回当前阶段号 int arriveAndDeregister()
线程完成任务后,退出参与(减少一个线程),返回当前阶段号 -
到达、等待方法
方法 作用 int arrive()
标记“当前线程完成阶段任务”,但不等待其他线程,继续执行 int arriveAndAwaitAdvance()
标记“当前线程完成阶段任务”,等待其他线程也完成,再进入下一阶段 int awaitAdvance(int phase)
等待进入指定阶段(需当前阶段匹配) int awaitAdvanceInterruptibly(int phase)
同上,但等待中可被中断 int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
带超时的等待,超时后抛出异常 -
阶段自定义逻辑
protected boolean onAdvance(int phase, int registeredParties)
-
作用:每个阶段完成后,自动调用此方法,决定是否继续下一阶段;
-
返回值:
true
:阶段结束,Phaser
不再继续(可用于终止多阶段流程);false
:继续下一阶段。
-
6.3 使用
-
需求:模拟了公司团建的多阶段活动,团建分4个阶段,参与人数动态变化:
-
阶段0:所有人到公司集合 → 出发去公园
-
阶段1:所有人到公园门口 → 出发去餐厅
-
阶段2:部分人到餐厅(有人提前离开,有人新增加入)→ 开始用餐
-
阶段3:用餐结束 → 活动终止
-
参与人数不固定(有人早退、有人中途加入),每个阶段必须等人齐了再继续
-
-
代码:
public class PhaserDemo {public static void main(String[] args) {final Phaser phaser = new Phaser() {// 每个阶段完成后自动调用下面的 onAdvance 方法,打印阶段总结,并判断是否终止(只剩主线程时终止)@Overrideprotected boolean onAdvance(int phase, int registeredParties) {// registeredParties 是当前注册的线程数(包括主线程),减去 1 得到实际员工数// 主线程:作为协调者,全程参与并动态添加中途加入者int staffs = registeredParties - 1;// 每个阶段完成后的提示信息switch (phase) {case 0:System.out.println("大家都到公司了,出发去公园,人数:" + staffs);break;case 1:System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs);break;case 2:System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);break;}// 终止条件:只剩主线程(registeredParties == 1)return registeredParties == 1;}};// 注册主线程————让主线程全程参与phaser.register();final StaffTask staffTask = new StaffTask();// 全程参与者:3 人(参与所有 4 个阶段)for (int i = 0; i < 3; i++) {// 添加任务数phaser.register();new Thread(() -> {try {staffTask.step1Task();//到达后等待其他任务到达phaser.arriveAndAwaitAdvance();staffTask.step2Task();phaser.arriveAndAwaitAdvance();staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注销离开phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 早退者:2 人(只参与前 2 个阶段,到公园后离开)for (int i = 0; i < 2; i++) {phaser.register();new Thread(() -> {try {staffTask.step1Task();phaser.arriveAndAwaitAdvance();staffTask.step2Task();System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");// 完成了,注销离开phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 中途加入者:4 人(从阶段 2 开始参与,直接到餐厅聚餐)while (!phaser.isTerminated()) {int phase = phaser.arriveAndAwaitAdvance();if (phase == 2) {for (int i = 0; i < 4; i++) {phaser.register();new Thread(() -> {try {staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注销离开phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}}static final Random random = new Random();// 封装了 4 个阶段的具体动作(从家出发→到公司→去公园→去餐厅→用餐),每个阶段用 Thread.sleep 模拟耗时static class StaffTask {public void step1Task() throws InterruptedException {// 第一阶段:来公司集合String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "从家出发了……");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到达公司");}public void step2Task() throws InterruptedException {// 第二阶段:出发去公园String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出发去公园玩");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到达公园门口集合");}public void step3Task() throws InterruptedException {// 第三阶段:去餐厅String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出发去餐厅");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到达餐厅");}public void step4Task() throws InterruptedException {// 第四阶段:就餐String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "开始用餐");Thread.sleep(random.nextInt(5000));System.out.println(staff + "用餐结束,回家");}} }
-
阶段0:到公司集合
- 主线程 + 3个全程者 + 2个早退者 → 共6个线程,调用
step1Task()
(从家到公司) - 完成后调用
phaser.arriveAndAwaitAdvance()
→ 等待所有人到公司 - 阶段结束:
onAdvance
触发,打印“出发去公园,人数:5”(6-1=5)
- 主线程 + 3个全程者 + 2个早退者 → 共6个线程,调用
-
阶段1:到公园门口
- 所有线程调用
step2Task()
(从公司到公园),完成后调用phaser.arriveAndAwaitAdvance()
→ 等待所有人到公园 - 2个早退者完成后调用
phaser.arriveAndDeregister()
→ 退出(注册线程数变为6-2=4) - 阶段结束:
onAdvance
触发,打印“出发去餐厅,人数:3”(4-1=3,只剩3个全程者+主线程)
- 所有线程调用
-
阶段2:到餐厅集合
- 主线程动态添加:检测到阶段2时,新增4个中途加入者 → 注册线程数变为4+4=8
- 3个全程者调用
step3Task()
(从公园到餐厅) - 4个新加入者直接调用
step3Task()
(到餐厅)
- 3个全程者调用
- 所有人调用
phaser.arriveAndAwaitAdvance()
→ 等待到餐厅 - 阶段结束:
onAdvance
触发,打印“开始用餐,人数:7”(8-1=7,3+4=7个员工+主线程)
- 主线程动态添加:检测到阶段2时,新增4个中途加入者 → 注册线程数变为4+4=8
-
阶段3:用餐结束
- 所有7人调用
step4Task()
(用餐),完成后调用phaser.arriveAndDeregister()
→ 所有人退出(注册线程数逐渐减少至1,只剩主线程) - 终止条件:
onAdvance
检测到registeredParties == 1
→ 返回true
,Phaser
终止
- 所有7人调用
-
-
Phaser
核心特性体现-
多阶段同步:通过
arriveAndAwaitAdvance()
实现每个阶段的等待,确保人齐后再进入下一阶段; -
动态线程管理:
-
phaser.register()
:新增参与者(如中途加入的4人); -
phaser.arriveAndDeregister()
:参与者退出(如早退者和用餐结束的人);
-
-
阶段自定义逻辑:
onAdvance
方法实现每个阶段的总结,并控制流程终止条件; -
灵活的协同:相比
CyclicBarrier
(固定线程数),Phaser
能应对“有人早退、有人中途加入”的动态场景。
-
6.4 应用场景总结
-
多线程任务分配:把一个复杂任务拆成多个子任务,分配给不同线程并行执行,且需要协调子任务的进度(比如所有子任务完成后,再合并结果);
-
用
Phaser
分阶段管理:- 阶段 0:子任务分配,线程开始执行
- 阶段 1:所有子任务完成,合并结果
-
支持动态调整线程数(比如某个子任务需要更多线程,用
register()
新增);
-
-
多级任务流程:任务需要分多个层级/阶段执行,必须等当前级所有任务完成,才能触发下一级任务(比如“数据采集→数据清洗→数据汇总→结果输出”);
-
每个层级对应
Phaser
的一个阶段(phase-0
采集→phase-1
清洗→phase-2
汇总); -
通过
arriveAndAwaitAdvance()
确保“当前级完成后,再进入下一级”,流程更清晰;
-
-
模拟并行计算:模拟分布式并行计算(比如科学计算、大数据处理 ),需要协调多个线程的“计算阶段”(比如矩阵计算分块执行,所有分块完成后再合并);
- 用
Phaser
同步“分块计算阶段”和“合并阶段”,确保:- 所有分块计算完成(阶段 0 同步);
- 合并结果后,再进入下一阶段(阶段 1 同步);
- 用
-
阶段性任务:任务天然是阶段性的,每个阶段需要所有线程同步后再继续(比如“团队项目”:需求评审→开发→测试→上线,每个阶段必须全员完成);
-
每个阶段对应
Phaser
的phase
,通过arriveAndAwaitAdvance()
实现“阶段同步”; -
支持动态调整参与线程(比如测试阶段需要新增测试人员,用
register()
加入);
-
-
上面所有场景都需要多阶段同步 + 动态线程协作,每个阶段必须等所有线程完成,再进入下一阶段。
Phaser
优势:-
比
CyclicBarrier
更灵活:支持动态增减线程、多阶段自定义逻辑(onAdvance
); -
比
CountDownLatch
更强大:可循环分阶段,而非一次性同步。
-