JUC并发编程08 - 同步模式/异步模式
同步模式
保护性暂停
单任务版
Guarded Suspension,用在一个线程等待另一个线程的执行结果
- 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
Guarded Suspension 是一种编程模式,用于让一个线程等待另一个线程的执行结果。简单来说,就是“守卫式暂停”。想象一下,你和朋友约定在某个地方见面,但你先到了,你就得在那里等他。等到他来了,你们就可以一起走了。在这个过程中,你就在“守卫式暂停”。
public static void main(String[] args) {// 创建 GuardedObject 对象,它就像你和朋友约定的见面地点GuardedObject object = new GuardedObjectV2();// 启动了一个新线程(假设是你的朋友),让它去完成某个任务(比如买咖啡)// 任务完成后,它会调用 object.complete() 方法,把结果(咖啡)放到 GuardedObject 中// 并通知你在等待的主线程(你)new Thread(() -> {sleep(1); // 让这个线程睡1秒,模拟耗时操作object.complete(Arrays.asList("a", "b", "c")); // 完成任务,设置结果}).start();// 主线程调用 object.get(2500) 方法,表示最多等待2.5秒来获取结果// 如果在这段时间内结果还没准备好,就返回 null。Object response = object.get(2500);// 果拿到了结果,就打印出来;如果没有拿到结果,就打印“can't get response”if (response != null) {log.debug("get response: [{}] lines", ((List<String>) response).size());} else {log.debug("can't get response");}
}
GuardedObject
类详解
/**获取结果的方法 get()
*/
public Object get(long millis) {synchronized (lock) {long begin = System.currentTimeMillis();long timePassed = 0;while (response == null) {long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}timePassed = System.currentTimeMillis() - begin;log.debug("timePassed: {}, object is null {}", timePassed, response == null);}return response;}
}
- synchronized (lock): 确保同一时间只有一个线程能访问这个方法。
- while (response == null): 如果结果还没准备好,就一直等待。
- lock.wait(waitTime): 让当前线程暂停,等待其他线程的通知。
- timePassed: 记录已经等待的时间,防止超时。
/**设置结果的方法 complete()
*/
public void complete(Object response) {synchronized (lock) {this.response = response;log.debug("notify...");lock.notifyAll();}
}
- synchronized (lock): 确保同一时间只有一个线程能访问这个方法。
- this.response = response: 把结果存起来。
- lock.notifyAll(): 通知所有等待的线程,结果已经准备好了。
从一个线程到另一个线程可以使用消息队列
想象你在一个快餐店。
- 有一个厨师(生产者线程)在不停地做汉堡。
- 有一群顾客(消费者线程)在等着吃汉堡。
如果每个顾客都傻站着等:“我点的汉堡好了没?”——那太低效了,厨师会被烦死。
所以,快餐店有个“取餐台”(这就是消息队列),厨师做好一个汉堡就放上去,顾客来了就从取餐台拿一个走。
这样:
- 厨师不用等顾客,做好就放那儿。
- 顾客也不用一直问,来了看看有没有自己的汉堡就行。
回到程序:
- 如果你有多个结果要从一个线程传给另一个线程(比如:不断收到网络数据、不断处理任务结果),
- 那就不适合用
Guarded Suspension
(它只等一个结果)。 - 而应该用消息队列,比如 Java 里的
BlockingQueue
。 - 一个线程往队列里放数据(生产者),另一个线程从队列里取(消费者)。
join()
是怎么用 Guarded Suspension 的?
Thread t = new Thread(() -> {sleep(3);System.out.println("任务完成");
});
t.start();
t.join(); // 主线程在这里等着 t 执行完
System.out.println("t 线程执行完了,我继续...");
t.join()
的意思就是:“我(主线程)要等你(t线程)干完活再继续。”- 内部是怎么实现的?—— 就是 Guarded Suspension!
你可以想象:
- 每个线程对象里有个“完成标志”(就像
GuardedObject
里的response
)。 join()
的时候,主线程检查这个标志:- 如果还没完成,就
wait()
等着。 - 等
t
线程执行完了,JVM 自动调用notify()
唤醒等它的线程。
- 如果还没完成,就
所以,join()
本质上就是一个线程在等另一个线程完成,完全符合 Guarded Suspension 的套路。
Future
是怎么用 Guarded Suspension 的?
ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit(() -> {sleep(2);return "处理结果";
});String result = future.get(); // 等结果,可能阻塞
System.out.println("拿到结果:" + result);
- future.get() 的意思就是:“我现在要拿结果,但如果你还没算完,我就等着。”
- 它内部也有一个“结果变量”,一开始是空的。
- 调用 get() 时:
- 发现结果还没来 → 就 wait() 等着。
- 另一个线程计算完,把结果塞进去,然后 notify() 唤醒你。
这不就是 GuardedObject 的 get() 和 complete() 吗?
图解:
- t1: 主线程,等待结果。
- t2: 新线程,完成任务并设置结果。
- GuardedObject: 两者共享的对象,用于传递结果。
通过这种方式,我们可以实现线程间的同步和通信。
多任务版
什么是多任务版保护性暂停?
想象一下,你在一个小区里住,每天都有快递员给你送快递。但是,你不能一直站在门口等快递,因为你还有很多事情要做。所以,你需要一个信箱来帮你暂时存放快递,当你有空的时候再去取。
在这个例子中,People
就是你(收件人),Postman
是快递员,Mailboxes
是小区的信箱系统,而 GuardedObject
就是每个具体的信箱。
程序例子
主程序
public static void main(String[] args) throws InterruptedException {// 创建3个收件人线程for (int i = 0; i < 3; i++) {new People().start();}// 等待1秒,让收件人准备好他们的信箱Thread.sleep(1000);// 创建3个快递员线程,分别给对应的收件人送快递for (Integer id : Mailboxes.getIds()) {new Postman(id, id + "号快递到了").start();}
}
收件人(People)
@Slf4j(topic = "c.People")
class People extends Thread{@Overridepublic void run() {// 收信GuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信id:{}", guardedObject.getId());// 等待快递员送信Object mail = guardedObject.get(5000);log.debug("收到信id:{},内容:{}", guardedObject.getId(), mail);}
}
快递员(Postman)
class Postman extends Thread{private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {// 取出对应的信箱GuardedObject guardedObject = Mailboxes.getGuardedObject(id);log.debug("开始送信id:{},内容:{}", guardedObject.getId(), mail);// 把快递放进信箱guardedObject.complete(mail);}
}
信箱系统(Mailboxes)
class Mailboxes {private static Map<Integer, GuardedObject> boxes = new Hashtable<>();private static int id = 1;// 生成唯一的idprivate static synchronized int generateId() {return id++;}// 获取指定id的信箱public static GuardedObject getGuardedObject(int id) {return boxes.remove(id);}// 创建一个新的信箱public static GuardedObject createGuardedObject() {GuardedObject go = new GuardedObject(generateId());boxes.put(go.getId(), go);return go;}// 获取所有信箱的idpublic static Set<Integer> getIds() {return boxes.keySet();}
}
信箱(GuardedObject)
class GuardedObject {private int id;private Object response;private final Object lock = new Object();public GuardedObject(int id) {this.id = id;}public int getId() {return id;}// 获取结果public Object get(long millis) {synchronized (lock) {long begin = System.currentTimeMillis();long timePassed = 0;while (response == null) {long waitTime = millis - timePassed;if (waitTime <= 0) {break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}timePassed = System.currentTimeMillis() - begin;}return response;}}// 设置结果public void complete(Object response) {synchronized (lock) {this.response = response;lock.notifyAll();}}
}
- t0, t2, t4: 这些是收件人线程,它们创建自己的信箱并等待快递。
- t1, t3, t5: 这些是快递员线程,它们找到对应的信箱并把快递放进去。
- Futures: 这是信箱系统,管理所有的信箱。
- GO1, GO2, GO3: 这些是具体的信箱,每个信箱都有一个唯一的id。
顺序输出
顺序输出 2 1
package com.cg.jucproject.demo;import java.util.concurrent.locks.LockSupport;public class OrderedPrintWithLockSupport {public static void main(String[] args) throws InterruptedException {// 定义线程 t1:负责打印 "1"Thread t1 = new Thread(() -> {while (true) {/** 【关键点】LockSupport.park()** - 当前线程(t1)会在此处暂停,进入阻塞状态。* - 停止的条件是:没有“许可”(permit)。* - 一旦其他线程调用 LockSupport.unpark(t1),t1 就会收到一个“许可”,* 然后 park() 方法返回,t1 继续向下执行。** 注意:* 1. 如果 unpark(t1) 在 park() 之前调用,许可会提前存在,park() 不会阻塞。* 2. 多次 unpark(t1) 只会保留一个许可(不会叠加)。** 所以:t1 的执行节奏完全由 t2 的 unpark(t1) 控制。*/LockSupport.park();// 只有被 unpark 唤醒后,才会执行下面这行System.out.println("1");}}, "t1"); // 给线程命名,便于调试// 定义线程 t2:负责打印 "2",并唤醒 t1Thread t2 = new Thread(() -> {while (true) {// 1. 先打印 "2"System.out.println("2");/** 2. 调用 unpark(t1) 给 t1 发放一个“许可”** - 这个操作是非阻塞的,立即返回。* - 如果 t1 正在 park() 阻塞,它会被立即唤醒。* - 如果 t1 还没执行到 park(),这个许可会提前存在,* 等 t1 执行 park() 时,发现已有许可,就不会阻塞。** 关键:这一步确保了 t1 只有在 t2 打印完 "2" 后才能打印 "1"。*/LockSupport.unpark(t1);/** 3. t2 自己休息 500 毫秒** - 防止 t2 循环太快,输出过于密集。* - 给 t1 足够时间被唤醒并打印 "1"。* - 同时避免 CPU 空转。*/try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}, "t2");/** 启动两个线程** 执行流程分析(按时间顺序):** Step 1: t2 先运行(不一定,但逻辑上不影响)* -> 打印 "2"* -> 调用 unpark(t1):给 t1 发放许可* -> t2 进入 sleep(500)** Step 2: t1 开始运行* -> 执行 park():但此时已经有许可(由 t2 提前发放)* => 所以 t1 不会阻塞,直接通过* -> 打印 "1"* -> 进入下一轮循环* -> 再次执行 park():此时没有许可(上一个已被消耗)* => t1 阻塞,等待下一个 unpark** Step 3: t2 睡醒后继续* -> 打印 "2"* -> unpark(t1):唤醒阻塞的 t1* -> t2 再次 sleep** Step 4: t1 被唤醒* -> 打印 "1"* -> 再次 park() -> 阻塞** 如此循环,形成稳定输出:* 2* 1* 2* 1* ...** 保证了输出顺序:2 在前,1 在后*/t1.start();t2.start();}/** LockSupport 是 JDK 提供的底层线程阻塞工具* 相比 wait/notify:* - 不需要 synchronized 块* - 不会因通知过早而丢失(unpark 可以在 park 前调用)* - 更简洁、更安全*/
}
LockSupport.park():阻塞当前线程,直到获得许可
LockSupport.unpark(thread):为指定线程提供许可,唤醒被阻塞的线程
执行顺序控制:t2先打印"2"并唤醒t1,t1被唤醒后打印"1"
循环机制:两线程都使用无限循环,通过LockSupport实现交替执行
交替输出
连续输出 5 次 abc:
package com.cg.jucproject.demo;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 演示:使用 ReentrantLock + Condition 实现三个线程按顺序循环打印 a b c* 输出效果:a b c a b c ... (共5轮)*/
public class day2_14 {public static void main(String[] args) throws InterruptedException {// 创建 AwaitSignal 对象,指定循环次数为 5AwaitSignal awaitSignal = new AwaitSignal(5);// 为每个线程创建独立的 Condition 条件变量Condition a = awaitSignal.newCondition(); // 控制线程A(打印"a")Condition b = awaitSignal.newCondition(); // 控制线程B(打印"b")Condition c = awaitSignal.newCondition(); // 控制线程C(打印"c")// 启动三个线程,每个线程负责打印一个字符,并唤醒下一个new Thread(() -> {awaitSignal.print("a", a, b); // 打印"a",由a控制,唤醒b}, "t1").start();new Thread(() -> {awaitSignal.print("b", b, c); // 打印"b",由b控制,唤醒c}, "t2").start();new Thread(() -> {awaitSignal.print("c", c, a); // 打印"c",由c控制,唤醒a}, "t3").start();// 主线程休眠1秒,确保三个工作线程都已启动并进入 await 状态Thread.sleep(1000);/** 【关键启动步骤】主线程开始第一个信号** 此时三个线程都已执行 lock() 并调用 await(),全部阻塞在各自的 condition 上。** 我们需要手动唤醒第一个线程(打印"a"的线程),触发整个流程。** 注意:必须先获取锁才能调用 signal()*/awaitSignal.lock();try {System.out.print("开始:");a.signal(); // 唤醒等待在 condition a 上的线程(即打印"a"的线程)} finally {awaitSignal.unlock(); // 释放锁,让被唤醒的线程能获取锁继续执行}}
}/*** 自定义锁类,继承 ReentrantLock,用于管理循环打印逻辑*/
class AwaitSignal extends ReentrantLock {private int loopNumber; // 循环打印的次数(每轮每个字母打印一次)public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}/*** 打印方法* @param str 当前线程要打印的内容* @param condition 当前线程等待的条件变量* @param next 下一个要唤醒的条件变量*/public void print(String str, Condition condition, Condition next) {// 每个线程会循环 loopNumber 次(即参与 5 轮打印)for (int i = 0; i < loopNumber; i++) {lock(); // 获取锁(ReentrantLock)try {/** 1. 当前线程等待在其对应的 condition 上* - 线程A:等待在 a 上* - 线程B:等待在 b 上* - 线程C:等待在 c 上** 调用 await() 会:* - 释放当前持有的锁* - 进入阻塞状态,直到被 signal() 唤醒* - 唤醒后重新竞争锁,获取锁后才继续执行*/condition.await();/** 2. 被唤醒后执行打印* 只有被 signal() 唤醒的线程才能执行到这里*/System.out.print(str);/** 3. 唤醒“下一个”线程* - 打印"a"的线程唤醒 b(即打印"b"的线程)* - 打印"b"的线程唤醒 c(即打印"c"的线程)* - 打印"c"的线程唤醒 a(即打印"a"的线程)** 形成闭环唤醒链:a → b → c → a → ...*/next.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {unlock(); // 释放锁,让下一个线程能获取锁}}}
}
阶段 | 事件 | 说明 |
---|---|---|
1 | 三个线程启动 | 每个线程调用 print() ,进入 lock() |
2 | 线程获取锁,调用 await() | 释放锁并阻塞在各自的 Condition 上:<br> - t1 阻塞在 a <br> - t2 阻塞在 b <br> - t3 阻塞在 c |
3 | 主线程 sleep(1000) | 确保所有线程都已进入阻塞状态 |
4 | 主线程 a.signal() | 唤醒等待在 a 上的线程(t1) |
5 | t1 被唤醒,获取锁 | 打印 "a" ,然后 b.signal() 唤醒 t2 |
6 | t2 被唤醒,获取锁 | 打印 "b" ,然后 c.signal() 唤醒 t3 |
7 | t3 被唤醒,获取锁 | 打印 "c" ,然后 a.signal() 唤醒 t1 |
8 | 回到 t1,第二轮开始 | 循环继续,直到每轮打印 5 次 |
机制 | 作用 |
---|---|
ReentrantLock | 提供可重入锁,保证同一时间只有一个线程执行打印 |
Condition | 每个线程有独立的等待条件,避免误唤醒 |
await() | 释放锁并阻塞,直到被 signal() 唤醒 |
signal() | 唤醒指定 Condition 上的一个线程 |
next.signal() | 实现“链式唤醒”,形成顺序执行 |
异步模式
传统版
异步模式之生产者/消费者:
package com.cg.jucproject.demo;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** 演示:传统版生产者/消费者模式(Producer-Consumer Pattern)* 使用 ReentrantLock + Condition 实现线程间通信* 场景:一个生产者线程生产,一个消费者线程消费,共享一个变量 number*/
class ShareData {private int number = 0; // 共享资源:当前产品数量(0表示无产品,1表示有待消费)private Lock lock = new ReentrantLock(); // 可重入锁,保护共享资源private Condition condition = lock.newCondition(); // 条件变量,用于线程等待/唤醒/*** 生产方法:生产一个产品(number 从 0 → 1)*/public void increment() throws Exception {lock.lock(); // 获取锁,确保同一时间只有一个线程能执行此方法try {/** 【1. 判断】是否能生产?** 使用 while 而不是 if 的原因:* - 防止“虚假唤醒”* - 当多个消费者线程时,signalAll() 会唤醒所有等待线程,* 但只有一个能抢到锁,其他线程醒来后必须重新判断条件是否成立** 条件:只有当 number == 0 时才能生产* 如果 number != 0(已有产品未消费),则当前生产者必须等待*/while (number != 0) {System.out.println(Thread.currentThread().getName() + " 进入等待(产品未消费)");condition.await(); // 释放锁,进入 condition 的等待队列}/** 【2. 干活】执行生产操作** 此时 number 一定为 0,可以安全生产*/number++;System.out.println(Thread.currentThread().getName() + "\t 生产:+1,当前数量:" + number);/** 【3. 通知】唤醒所有等待在 condition 上的线程** 这里使用 signalAll() 是因为:* - 可能有多个消费者或生产者线程在等待* - 我们不知道谁该被唤醒,所以全部唤醒,由 while 判断决定谁继续执行** 注意:signal() 只唤醒一个线程,但在多线程场景下不够安全*/condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock(); // 无论成功与否,必须释放锁}}/*** 消费方法:消费一个产品(number 从 1 → 0)*/public void decrement() throws Exception {lock.lock(); // 获取锁try {/** 【1. 判断】是否能消费?** 条件:只有当 number == 1 时才能消费* 如果 number == 0(无产品),则消费者必须等待*/while (number == 0) {System.out.println(Thread.currentThread().getName() + " 进入等待(无产品可消费)");condition.await(); // 释放锁,进入等待}/** 【2. 干活】执行消费操作*/number--;System.out.println(Thread.currentThread().getName() + "\t 消费:-1,当前数量:" + number);/** 【3. 通知】唤醒所有等待线程* 可能有多个生产者在等待生产*/condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}/*** 主程序:启动一个生产者和一个消费者线程*/
public class TraditionalProducerConsumer {public static void main(String[] args) {ShareData shareData = new ShareData();// 创建生产者线程 t1:生产 5 次new Thread(() -> {for (int i = 0; i < 5; i++) {try {shareData.increment();// 模拟生产耗时// Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}}}, "生产者-t1").start();// 创建消费者线程 t2:消费 5 次new Thread(() -> {for (int i = 0; i < 5; i++) {try {shareData.decrement();// 模拟消费耗时// Thread.sleep(200);} catch (Exception e) {e.printStackTrace();}}}, "消费者-t2").start();/** 【程序执行流程分析】** 初始状态:number = 0** Step 1: 生产者 t1 先运行(不一定,但假设它先抢到锁)* -> 判断 number == 0 ,可以生产* -> number++ → 1* -> 打印:生产 +1* -> signalAll() 唤醒所有线程(此时消费者可能在等)* -> 释放锁** Step 2: 消费者 t2 开始运行* -> 判断 number == 1 ,可以消费* -> number-- → 0* -> 打印:消费 -1* -> signalAll() 唤醒所有线程* -> 释放锁** Step 3: t1 再次生产(第二轮)* -> number == 0 ,生产 → 1* -> ...** Step 4: t2 再次消费* -> number == 1 ,消费 → 0* -> ...** 如此交替进行,形成:* 生产者-t1 生产:+1,当前数量:1* 消费者-t2 消费:-1,当前数量:0* 生产者-t1 生产:+1,当前数量:1* 消费者-t2 消费:-1,当前数量:0* ...** 实现了“生产一个 → 消费一个”的同步协作*/// 主线程不等待,让生产者和消费者完成// 实际中可使用 CountDownLatch 等待完成}
}
机制 | 说明 |
---|---|
ReentrantLock | 提供可重入锁,保证线程安全 |
Condition | 实现线程间精确通信,替代 wait/notify |
while 判断 | 防止虚假唤醒,确保条件成立 |
signalAll() | 唤醒所有等待线程,避免死锁(尤其在多生产者/消费者时) |
改进版
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;/*** 演示:改进版异步模式生产者/消费者模式* 使用阻塞队列实现线程间通信,平衡生产和消费的线程资源。*/
public class DemoD {public static void main(String[] args) {MessageQueue queue = new MessageQueue(2); // 创建一个容量为2的消息队列// 启动3个生产者线程for (int i = 0; i < 3; i++) {int id = i;new Thread(() -> {try {queue.put(new Message(id, "值" + id));System.out.println(Thread.currentThread().getName() + ": 已生产消息 -- " + id);} catch (InterruptedException e) {e.printStackTrace();}}, "生产者" + i).start();}// 启动一个消费者线程,持续从队列中取消息new Thread(() -> {while (true) {try {TimeUnit.SECONDS.sleep(1); // 模拟消费耗时Message message = queue.take();System.out.println(Thread.currentThread().getName() + ": 已消费消息 -- " + message.getValue());} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者").start();}
}// 消息队列类,用于Java线程之间通信
class MessageQueue {private LinkedList<Message> list = new LinkedList<>(); // 消息的队列集合private int capacity; // 队列容量public MessageQueue(int capacity) {this.capacity = capacity;}/*** 获取消息** @return 消息对象* @throws InterruptedException 如果当前线程被中断,则抛出此异常*/public synchronized Message take() throws InterruptedException {// 【1. 判断】检查队列是否为空while (list.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": 队列为空,消费者线程等待");wait(); // 当前线程进入等待状态,释放锁}// 【2. 干活】从队列头部获取消息并返回Message message = list.removeFirst();System.out.println(Thread.currentThread().getName() + ": 已消费消息 -- " + message.getValue());// 【3. 通知】唤醒所有等待在该对象上的线程notifyAll();return message;}/*** 存入消息** @param message 要存入的消息对象* @throws InterruptedException 如果当前线程被中断,则抛出此异常*/public synchronized void put(Message message) throws InterruptedException {// 【1. 判断】检查队列是否已满while (list.size() == capacity) {System.out.println(Thread.currentThread().getName() + ": 队列已满,生产者线程等待");wait(); // 当前线程进入等待状态,释放锁}// 【2. 干活】将消息加入队列尾部list.addLast(message);System.out.println(Thread.currentThread().getName() + ": 已生产消息 -- " + message.getValue());// 【3. 通知】唤醒所有等待在该对象上的线程notifyAll();}
}// 消息类
final class Message {private int id;private String value;public Message(int id, String value) {this.id = id;this.value = value;}public int getId() {return id;}public String getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value='" + value + '\'' +'}';}
}
机制 | 说明 |
---|---|
LinkedList | 实现消息队列,支持先进先出(FIFO) |
synchronized | 确保方法同步,防止并发问题 |
wait() 和 notifyAll() | 实现线程间的精确通信 |
while 循环 | 防止虚假唤醒,确保条件成立 |
为什么使用 LinkedList
?
LinkedList
是双向链表实现,支持高效的插入和删除操作(O(1) 复杂度)- 适合实现队列结构,提供
addLast()
和removeFirst()
方法
阻塞队列
package com.cg.jucproject.demo;import java.util.concurrent.*;/*** 演示:阻塞队列 BlockingQueue 的使用* 特别聚焦:SynchronousQueue —— 一种“不存储元素”的特殊阻塞队列** 核心特点:* - 容量为 0!不存储任何元素* - 生产者线程必须等待消费者线程“手递手”交接数据(hand-off)* - put() 和 take() 必须同时就绪,才能完成数据传递* - 类似“交换机”或“管道”,实现线程间直接通信*/
public class BlockingQueueSynchronousDemo {public static void main(String[] args) {// 创建一个单线程的消费者线程池ExecutorService consumer = Executors.newFixedThreadPool(1);// 创建一个单线程的生产者线程池ExecutorService producer = Executors.newFixedThreadPool(1);/** 【关键】使用 SynchronousQueue** 与 ArrayBlockingQueue、LinkedBlockingQueue 的区别:** | 队列类型 | 是否存储元素 | 容量 | 数据传递方式 |* |----------------------|--------------|------|------------------------|* | Array/LinkedBlockingQueue | 是 | >0 | 生产 → 存 → 消费 |* | SynchronousQueue | 否 | 0 | 生产 ↔ 消费(直接交接) |** SynchronousQueue 的 put() 和 take() 必须“配对”执行:* - 如果先调用 put(),生产者会阻塞,直到有消费者调用 take()* - 如果先调用 take(),消费者会阻塞,直到有生产者调用 put()** 这是一种“ rendezvous(会面)”机制,确保生产者和消费者直接见面交接*/BlockingQueue<Integer> queue = new SynchronousQueue<>();// 提交生产者任务producer.submit(() -> {try {System.out.println("【生产者】开始生产数据...");// 模拟生产耗时(例如计算、IO等)Thread.sleep(1000);System.out.println("【生产者】准备 put(10) 到队列...");/** 【阻塞点】queue.put(10)** 此时:* - 队列是空的(且容量为0)* - 如果没有消费者在等待 take(),put() 会一直阻塞* - 直到消费者调用 take(),两者“会面”后数据直接传递** 注意:put() 不会返回,直到交接完成*/queue.put(10);System.out.println("【生产者】数据 10 已成功交接,任务结束。");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("【生产者】被中断!");e.printStackTrace();}});// 提交消费者任务consumer.submit(() -> {try {System.out.println("【消费者】启动,等待消费...");/** 【阻塞点】Integer result = queue.take()** 此时:* - 队列中没有数据(且无法预先存储)* - 消费者会一直阻塞,直到生产者调用 put()* - 一旦 put() 发生,take() 立即返回,获取数据*/Integer result = queue.take();System.out.println("【消费者】成功从队列 take() 数据:" + result);} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("【消费者】被中断!");e.printStackTrace();}});}
}
SynchronousQueue
的核心特性
特性 | 说明 |
---|---|
容量为 0 | 不存储任何元素,peek() 永远返回 null |
手递手传递(Hand-off) | 生产者必须等待消费者,直接传递数据 |
高吞吐、低延迟 | 适合工作窃取、线程池(如 CachedThreadPool ) |
put/take 必须配对 | 任一操作都会阻塞,直到对方就绪 |
公平性可选 | 支持 FIFO 或非公平(默认非公平) |
与其他阻塞队列对比
队列类型 | 存储 | 典型用途 |
---|---|---|
ArrayBlockingQueue | 有界数组 | 固定线程池,任务队列 |
LinkedBlockingQueue | 无界/有界链表 | Web 服务器请求队列 |
SynchronousQueue | 无存储 | 快速交接,Executors.newCachedThreadPool() 底层实现 |
DelayQueue | 延迟元素 | 定时任务 |
PriorityBlockingQueue | 优先级队列 | 任务优先级调度 |