当前位置: 首页 > news >正文

JUC并发编程08 - 同步模式/异步模式

同步模式

保护性暂停

单任务版

Guarded Suspension,用在一个线程等待另一个线程的执行结果

  • 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式

Guarded Suspension 是一种编程模式,用于让一个线程等待另一个线程的执行结果。简单来说,就是“守卫式暂停”。想象一下,你和朋友约定在某个地方见面,但你先到了,你就得在那里等他。等到他来了,你们就可以一起走了。在这个过程中,你就在“守卫式暂停”。

public static void main(String[] args) {// 创建 GuardedObject 对象,它就像你和朋友约定的见面地点GuardedObject object = new GuardedObjectV2();// 启动了一个新线程(假设是你的朋友),让它去完成某个任务(比如买咖啡)// 任务完成后,它会调用 object.complete() 方法,把结果(咖啡)放到 GuardedObject 中// 并通知你在等待的主线程(你)new Thread(() -> {sleep(1); // 让这个线程睡1秒,模拟耗时操作object.complete(Arrays.asList("a", "b", "c")); // 完成任务,设置结果}).start();// 主线程调用 object.get(2500) 方法,表示最多等待2.5秒来获取结果// 如果在这段时间内结果还没准备好,就返回 null。Object response = object.get(2500);// 果拿到了结果,就打印出来;如果没有拿到结果,就打印“can't get response”if (response != null) {log.debug("get response: [{}] lines", ((List<String>) response).size());} else {log.debug("can't get response");}
}

GuardedObject 类详解

/**获取结果的方法 get()
*/
public Object get(long millis) {synchronized (lock) {long begin = System.currentTimeMillis();long timePassed = 0;while (response == null) {long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}timePassed = System.currentTimeMillis() - begin;log.debug("timePassed: {}, object is null {}", timePassed, response == null);}return response;}
}
  • synchronized (lock): 确保同一时间只有一个线程能访问这个方法。
  • while (response == null): 如果结果还没准备好,就一直等待。
  • lock.wait(waitTime): 让当前线程暂停,等待其他线程的通知。
  • timePassed: 记录已经等待的时间,防止超时。
/**设置结果的方法 complete()
*/
public void complete(Object response) {synchronized (lock) {this.response = response;log.debug("notify...");lock.notifyAll();}
}
  • synchronized (lock): 确保同一时间只有一个线程能访问这个方法。
  • this.response = response: 把结果存起来。
  • lock.notifyAll(): 通知所有等待的线程,结果已经准备好了。

从一个线程到另一个线程可以使用消息队列

想象你在一个快餐店。

  • 有一个厨师(生产者线程)在不停地做汉堡。
  • 有一群顾客(消费者线程)在等着吃汉堡。

如果每个顾客都傻站着等:“我点的汉堡好了没?”——那太低效了,厨师会被烦死。

所以,快餐店有个“取餐台”(这就是消息队列),厨师做好一个汉堡就放上去,顾客来了就从取餐台拿一个走。

这样:

  • 厨师不用等顾客,做好就放那儿。
  • 顾客也不用一直问,来了看看有没有自己的汉堡就行。

回到程序:

  • 如果你有多个结果要从一个线程传给另一个线程(比如:不断收到网络数据、不断处理任务结果),
  • 那就不适合用 Guarded Suspension(它只等一个结果)。
  • 而应该用消息队列,比如 Java 里的 BlockingQueue
  • 一个线程往队列里放数据(生产者),另一个线程从队列里取(消费者)。

join() 是怎么用 Guarded Suspension 的?

Thread t = new Thread(() -> {sleep(3);System.out.println("任务完成");
});
t.start();
t.join(); // 主线程在这里等着 t 执行完
System.out.println("t 线程执行完了,我继续...");
  • t.join() 的意思就是:“我(主线程)要等你(t线程)干完活再继续。”
  • 内部是怎么实现的?—— 就是 Guarded Suspension

你可以想象:

  • 每个线程对象里有个“完成标志”(就像 GuardedObject 里的 response)。
  • join() 的时候,主线程检查这个标志:
    • 如果还没完成,就 wait() 等着。
    • 等 t 线程执行完了,JVM 自动调用 notify() 唤醒等它的线程。

所以,join() 本质上就是一个线程在等另一个线程完成,完全符合 Guarded Suspension 的套路。

Future 是怎么用 Guarded Suspension 的?

ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit(() -> {sleep(2);return "处理结果";
});String result = future.get(); // 等结果,可能阻塞
System.out.println("拿到结果:" + result);
  • future.get() 的意思就是:“我现在要拿结果,但如果你还没算完,我就等着。”
  • 它内部也有一个“结果变量”,一开始是空的。
  • 调用 get() 时:
  • 发现结果还没来 → 就 wait() 等着。
  • 另一个线程计算完,把结果塞进去,然后 notify() 唤醒你。

这不就是 GuardedObject 的 get() 和 complete() 吗?

图解:

  • t1: 主线程,等待结果。
  • t2: 新线程,完成任务并设置结果。
  • GuardedObject: 两者共享的对象,用于传递结果。

通过这种方式,我们可以实现线程间的同步和通信。

多任务版

什么是多任务版保护性暂停?

想象一下,你在一个小区里住,每天都有快递员给你送快递。但是,你不能一直站在门口等快递,因为你还有很多事情要做。所以,你需要一个信箱来帮你暂时存放快递,当你有空的时候再去取。

在这个例子中,People 就是你(收件人),Postman 是快递员,Mailboxes 是小区的信箱系统,而 GuardedObject 就是每个具体的信箱。

程序例子

主程序

public static void main(String[] args) throws InterruptedException {// 创建3个收件人线程for (int i = 0; i < 3; i++) {new People().start();}// 等待1秒,让收件人准备好他们的信箱Thread.sleep(1000);// 创建3个快递员线程,分别给对应的收件人送快递for (Integer id : Mailboxes.getIds()) {new Postman(id, id + "号快递到了").start();}
}

收件人(People)

@Slf4j(topic = "c.People")
class People extends Thread{@Overridepublic void run() {// 收信GuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信id:{}", guardedObject.getId());// 等待快递员送信Object mail = guardedObject.get(5000);log.debug("收到信id:{},内容:{}", guardedObject.getId(), mail);}
}

快递员(Postman)

class Postman extends Thread{private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {// 取出对应的信箱GuardedObject guardedObject = Mailboxes.getGuardedObject(id);log.debug("开始送信id:{},内容:{}", guardedObject.getId(), mail);// 把快递放进信箱guardedObject.complete(mail);}
}

信箱系统(Mailboxes)

class Mailboxes {private static Map<Integer, GuardedObject> boxes = new Hashtable<>();private static int id = 1;// 生成唯一的idprivate static synchronized int generateId() {return id++;}// 获取指定id的信箱public static GuardedObject getGuardedObject(int id) {return boxes.remove(id);}// 创建一个新的信箱public static GuardedObject createGuardedObject() {GuardedObject go = new GuardedObject(generateId());boxes.put(go.getId(), go);return go;}// 获取所有信箱的idpublic static Set<Integer> getIds() {return boxes.keySet();}
}

信箱(GuardedObject)

class GuardedObject {private int id;private Object response;private final Object lock = new Object();public GuardedObject(int id) {this.id = id;}public int getId() {return id;}// 获取结果public Object get(long millis) {synchronized (lock) {long begin = System.currentTimeMillis();long timePassed = 0;while (response == null) {long waitTime = millis - timePassed;if (waitTime <= 0) {break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}timePassed = System.currentTimeMillis() - begin;}return response;}}// 设置结果public void complete(Object response) {synchronized (lock) {this.response = response;lock.notifyAll();}}
}

  • t0, t2, t4: 这些是收件人线程,它们创建自己的信箱并等待快递。
  • t1, t3, t5: 这些是快递员线程,它们找到对应的信箱并把快递放进去。
  • Futures: 这是信箱系统,管理所有的信箱。
  • GO1, GO2, GO3: 这些是具体的信箱,每个信箱都有一个唯一的id。

顺序输出

顺序输出 2 1

package com.cg.jucproject.demo;import java.util.concurrent.locks.LockSupport;public class OrderedPrintWithLockSupport {public static void main(String[] args) throws InterruptedException {// 定义线程 t1:负责打印 "1"Thread t1 = new Thread(() -> {while (true) {/** 【关键点】LockSupport.park()** - 当前线程(t1)会在此处暂停,进入阻塞状态。* - 停止的条件是:没有“许可”(permit)。* - 一旦其他线程调用 LockSupport.unpark(t1),t1 就会收到一个“许可”,*   然后 park() 方法返回,t1 继续向下执行。** 注意:*   1. 如果 unpark(t1) 在 park() 之前调用,许可会提前存在,park() 不会阻塞。*   2. 多次 unpark(t1) 只会保留一个许可(不会叠加)。** 所以:t1 的执行节奏完全由 t2 的 unpark(t1) 控制。*/LockSupport.park();// 只有被 unpark 唤醒后,才会执行下面这行System.out.println("1");}}, "t1"); // 给线程命名,便于调试// 定义线程 t2:负责打印 "2",并唤醒 t1Thread t2 = new Thread(() -> {while (true) {// 1. 先打印 "2"System.out.println("2");/** 2. 调用 unpark(t1) 给 t1 发放一个“许可”** - 这个操作是非阻塞的,立即返回。* - 如果 t1 正在 park() 阻塞,它会被立即唤醒。* - 如果 t1 还没执行到 park(),这个许可会提前存在,*   等 t1 执行 park() 时,发现已有许可,就不会阻塞。** 关键:这一步确保了 t1 只有在 t2 打印完 "2" 后才能打印 "1"。*/LockSupport.unpark(t1);/** 3. t2 自己休息 500 毫秒** - 防止 t2 循环太快,输出过于密集。* - 给 t1 足够时间被唤醒并打印 "1"。* - 同时避免 CPU 空转。*/try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}, "t2");/** 启动两个线程** 执行流程分析(按时间顺序):** Step 1: t2 先运行(不一定,但逻辑上不影响)*   -> 打印 "2"*   -> 调用 unpark(t1):给 t1 发放许可*   -> t2 进入 sleep(500)** Step 2: t1 开始运行*   -> 执行 park():但此时已经有许可(由 t2 提前发放)*      => 所以 t1 不会阻塞,直接通过*   -> 打印 "1"*   -> 进入下一轮循环*   -> 再次执行 park():此时没有许可(上一个已被消耗)*      => t1 阻塞,等待下一个 unpark** Step 3: t2 睡醒后继续*   -> 打印 "2"*   -> unpark(t1):唤醒阻塞的 t1*   -> t2 再次 sleep** Step 4: t1 被唤醒*   -> 打印 "1"*   -> 再次 park() -> 阻塞** 如此循环,形成稳定输出:*     2*     1*     2*     1*     ...** 保证了输出顺序:2 在前,1 在后*/t1.start();t2.start();}/** LockSupport 是 JDK 提供的底层线程阻塞工具* 相比 wait/notify:*   - 不需要 synchronized 块*   - 不会因通知过早而丢失(unpark 可以在 park 前调用)*   - 更简洁、更安全*/
}
  • LockSupport.park():阻塞当前线程,直到获得许可

  • LockSupport.unpark(thread):为指定线程提供许可,唤醒被阻塞的线程

  • 执行顺序控制:t2先打印"2"并唤醒t1,t1被唤醒后打印"1"

  • 循环机制:两线程都使用无限循环,通过LockSupport实现交替执行

交替输出

连续输出 5 次 abc:

package com.cg.jucproject.demo;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 演示:使用 ReentrantLock + Condition 实现三个线程按顺序循环打印 a b c* 输出效果:a b c a b c ... (共5轮)*/
public class day2_14 {public static void main(String[] args) throws InterruptedException {// 创建 AwaitSignal 对象,指定循环次数为 5AwaitSignal awaitSignal = new AwaitSignal(5);// 为每个线程创建独立的 Condition 条件变量Condition a = awaitSignal.newCondition(); // 控制线程A(打印"a")Condition b = awaitSignal.newCondition(); // 控制线程B(打印"b")Condition c = awaitSignal.newCondition(); // 控制线程C(打印"c")// 启动三个线程,每个线程负责打印一个字符,并唤醒下一个new Thread(() -> {awaitSignal.print("a", a, b); // 打印"a",由a控制,唤醒b}, "t1").start();new Thread(() -> {awaitSignal.print("b", b, c); // 打印"b",由b控制,唤醒c}, "t2").start();new Thread(() -> {awaitSignal.print("c", c, a); // 打印"c",由c控制,唤醒a}, "t3").start();// 主线程休眠1秒,确保三个工作线程都已启动并进入 await 状态Thread.sleep(1000);/** 【关键启动步骤】主线程开始第一个信号** 此时三个线程都已执行 lock() 并调用 await(),全部阻塞在各自的 condition 上。** 我们需要手动唤醒第一个线程(打印"a"的线程),触发整个流程。** 注意:必须先获取锁才能调用 signal()*/awaitSignal.lock();try {System.out.print("开始:");a.signal(); // 唤醒等待在 condition a 上的线程(即打印"a"的线程)} finally {awaitSignal.unlock(); // 释放锁,让被唤醒的线程能获取锁继续执行}}
}/*** 自定义锁类,继承 ReentrantLock,用于管理循环打印逻辑*/
class AwaitSignal extends ReentrantLock {private int loopNumber; // 循环打印的次数(每轮每个字母打印一次)public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}/*** 打印方法* @param str 当前线程要打印的内容* @param condition 当前线程等待的条件变量* @param next 下一个要唤醒的条件变量*/public void print(String str, Condition condition, Condition next) {// 每个线程会循环 loopNumber 次(即参与 5 轮打印)for (int i = 0; i < loopNumber; i++) {lock(); // 获取锁(ReentrantLock)try {/** 1. 当前线程等待在其对应的 condition 上*    - 线程A:等待在 a 上*    - 线程B:等待在 b 上*    - 线程C:等待在 c 上** 调用 await() 会:*   - 释放当前持有的锁*   - 进入阻塞状态,直到被 signal() 唤醒*   - 唤醒后重新竞争锁,获取锁后才继续执行*/condition.await();/** 2. 被唤醒后执行打印*    只有被 signal() 唤醒的线程才能执行到这里*/System.out.print(str);/** 3. 唤醒“下一个”线程*    - 打印"a"的线程唤醒 b(即打印"b"的线程)*    - 打印"b"的线程唤醒 c(即打印"c"的线程)*    - 打印"c"的线程唤醒 a(即打印"a"的线程)** 形成闭环唤醒链:a → b → c → a → ...*/next.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {unlock(); // 释放锁,让下一个线程能获取锁}}}
}
阶段事件说明
1三个线程启动每个线程调用 print(),进入 lock()
2线程获取锁,调用 await()释放锁并阻塞在各自的 Condition 上:<br> - t1 阻塞在 a<br> - t2 阻塞在 b<br> - t3 阻塞在 c
3主线程 sleep(1000)确保所有线程都已进入阻塞状态
4主线程 a.signal()唤醒等待在 a 上的线程(t1)
5t1 被唤醒,获取锁打印 "a",然后 b.signal() 唤醒 t2
6t2 被唤醒,获取锁打印 "b",然后 c.signal() 唤醒 t3
7t3 被唤醒,获取锁打印 "c",然后 a.signal() 唤醒 t1
8回到 t1,第二轮开始循环继续,直到每轮打印 5 次
机制作用
ReentrantLock提供可重入锁,保证同一时间只有一个线程执行打印
Condition每个线程有独立的等待条件,避免误唤醒
await()释放锁并阻塞,直到被 signal() 唤醒
signal()唤醒指定 Condition 上的一个线程
next.signal()实现“链式唤醒”,形成顺序执行

异步模式

传统版

异步模式之生产者/消费者:

package com.cg.jucproject.demo;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** 演示:传统版生产者/消费者模式(Producer-Consumer Pattern)* 使用 ReentrantLock + Condition 实现线程间通信* 场景:一个生产者线程生产,一个消费者线程消费,共享一个变量 number*/
class ShareData {private int number = 0;                    // 共享资源:当前产品数量(0表示无产品,1表示有待消费)private Lock lock = new ReentrantLock();   // 可重入锁,保护共享资源private Condition condition = lock.newCondition(); // 条件变量,用于线程等待/唤醒/*** 生产方法:生产一个产品(number 从 0 → 1)*/public void increment() throws Exception {lock.lock(); // 获取锁,确保同一时间只有一个线程能执行此方法try {/** 【1. 判断】是否能生产?** 使用 while 而不是 if 的原因:*   - 防止“虚假唤醒”*   - 当多个消费者线程时,signalAll() 会唤醒所有等待线程,*     但只有一个能抢到锁,其他线程醒来后必须重新判断条件是否成立** 条件:只有当 number == 0 时才能生产* 如果 number != 0(已有产品未消费),则当前生产者必须等待*/while (number != 0) {System.out.println(Thread.currentThread().getName() + " 进入等待(产品未消费)");condition.await(); // 释放锁,进入 condition 的等待队列}/** 【2. 干活】执行生产操作** 此时 number 一定为 0,可以安全生产*/number++;System.out.println(Thread.currentThread().getName() + "\t 生产:+1,当前数量:" + number);/** 【3. 通知】唤醒所有等待在 condition 上的线程** 这里使用 signalAll() 是因为:*   - 可能有多个消费者或生产者线程在等待*   - 我们不知道谁该被唤醒,所以全部唤醒,由 while 判断决定谁继续执行** 注意:signal() 只唤醒一个线程,但在多线程场景下不够安全*/condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock(); // 无论成功与否,必须释放锁}}/*** 消费方法:消费一个产品(number 从 1 → 0)*/public void decrement() throws Exception {lock.lock(); // 获取锁try {/** 【1. 判断】是否能消费?** 条件:只有当 number == 1 时才能消费* 如果 number == 0(无产品),则消费者必须等待*/while (number == 0) {System.out.println(Thread.currentThread().getName() + " 进入等待(无产品可消费)");condition.await(); // 释放锁,进入等待}/** 【2. 干活】执行消费操作*/number--;System.out.println(Thread.currentThread().getName() + "\t 消费:-1,当前数量:" + number);/** 【3. 通知】唤醒所有等待线程* 可能有多个生产者在等待生产*/condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}/*** 主程序:启动一个生产者和一个消费者线程*/
public class TraditionalProducerConsumer {public static void main(String[] args) {ShareData shareData = new ShareData();// 创建生产者线程 t1:生产 5 次new Thread(() -> {for (int i = 0; i < 5; i++) {try {shareData.increment();// 模拟生产耗时// Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}}}, "生产者-t1").start();// 创建消费者线程 t2:消费 5 次new Thread(() -> {for (int i = 0; i < 5; i++) {try {shareData.decrement();// 模拟消费耗时// Thread.sleep(200);} catch (Exception e) {e.printStackTrace();}}}, "消费者-t2").start();/** 【程序执行流程分析】** 初始状态:number = 0** Step 1: 生产者 t1 先运行(不一定,但假设它先抢到锁)*   -> 判断 number == 0 ,可以生产*   -> number++ → 1*   -> 打印:生产 +1*   -> signalAll() 唤醒所有线程(此时消费者可能在等)*   -> 释放锁** Step 2: 消费者 t2 开始运行*   -> 判断 number == 1 ,可以消费*   -> number-- → 0*   -> 打印:消费 -1*   -> signalAll() 唤醒所有线程*   -> 释放锁** Step 3: t1 再次生产(第二轮)*   -> number == 0 ,生产 → 1*   -> ...** Step 4: t2 再次消费*   -> number == 1 ,消费 → 0*   -> ...** 如此交替进行,形成:*     生产者-t1	 生产:+1,当前数量:1*     消费者-t2	 消费:-1,当前数量:0*     生产者-t1	 生产:+1,当前数量:1*     消费者-t2	 消费:-1,当前数量:0*     ...** 实现了“生产一个 → 消费一个”的同步协作*/// 主线程不等待,让生产者和消费者完成// 实际中可使用 CountDownLatch 等待完成}
}
机制说明
ReentrantLock提供可重入锁,保证线程安全
Condition实现线程间精确通信,替代 wait/notify
while 判断防止虚假唤醒,确保条件成立
signalAll()唤醒所有等待线程,避免死锁(尤其在多生产者/消费者时)

改进版

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;/*** 演示:改进版异步模式生产者/消费者模式* 使用阻塞队列实现线程间通信,平衡生产和消费的线程资源。*/
public class DemoD {public static void main(String[] args) {MessageQueue queue = new MessageQueue(2); // 创建一个容量为2的消息队列// 启动3个生产者线程for (int i = 0; i < 3; i++) {int id = i;new Thread(() -> {try {queue.put(new Message(id, "值" + id));System.out.println(Thread.currentThread().getName() + ": 已生产消息 -- " + id);} catch (InterruptedException e) {e.printStackTrace();}}, "生产者" + i).start();}// 启动一个消费者线程,持续从队列中取消息new Thread(() -> {while (true) {try {TimeUnit.SECONDS.sleep(1); // 模拟消费耗时Message message = queue.take();System.out.println(Thread.currentThread().getName() + ": 已消费消息 -- " + message.getValue());} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者").start();}
}// 消息队列类,用于Java线程之间通信
class MessageQueue {private LinkedList<Message> list = new LinkedList<>(); // 消息的队列集合private int capacity; // 队列容量public MessageQueue(int capacity) {this.capacity = capacity;}/*** 获取消息** @return 消息对象* @throws InterruptedException 如果当前线程被中断,则抛出此异常*/public synchronized Message take() throws InterruptedException {// 【1. 判断】检查队列是否为空while (list.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": 队列为空,消费者线程等待");wait(); // 当前线程进入等待状态,释放锁}// 【2. 干活】从队列头部获取消息并返回Message message = list.removeFirst();System.out.println(Thread.currentThread().getName() + ": 已消费消息 -- " + message.getValue());// 【3. 通知】唤醒所有等待在该对象上的线程notifyAll();return message;}/*** 存入消息** @param message 要存入的消息对象* @throws InterruptedException 如果当前线程被中断,则抛出此异常*/public synchronized void put(Message message) throws InterruptedException {// 【1. 判断】检查队列是否已满while (list.size() == capacity) {System.out.println(Thread.currentThread().getName() + ": 队列已满,生产者线程等待");wait(); // 当前线程进入等待状态,释放锁}// 【2. 干活】将消息加入队列尾部list.addLast(message);System.out.println(Thread.currentThread().getName() + ": 已生产消息 -- " + message.getValue());// 【3. 通知】唤醒所有等待在该对象上的线程notifyAll();}
}// 消息类
final class Message {private int id;private String value;public Message(int id, String value) {this.id = id;this.value = value;}public int getId() {return id;}public String getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value='" + value + '\'' +'}';}
}
机制说明
LinkedList实现消息队列,支持先进先出(FIFO)
synchronized确保方法同步,防止并发问题
wait() 和 notifyAll()实现线程间的精确通信
while 循环防止虚假唤醒,确保条件成立

为什么使用 LinkedList

  • LinkedList 是双向链表实现,支持高效的插入和删除操作(O(1) 复杂度)
  • 适合实现队列结构,提供 addLast() 和 removeFirst() 方法

阻塞队列

package com.cg.jucproject.demo;import java.util.concurrent.*;/*** 演示:阻塞队列 BlockingQueue 的使用* 特别聚焦:SynchronousQueue —— 一种“不存储元素”的特殊阻塞队列** 核心特点:*   - 容量为 0!不存储任何元素*   - 生产者线程必须等待消费者线程“手递手”交接数据(hand-off)*   - put() 和 take() 必须同时就绪,才能完成数据传递*   - 类似“交换机”或“管道”,实现线程间直接通信*/
public class BlockingQueueSynchronousDemo {public static void main(String[] args) {// 创建一个单线程的消费者线程池ExecutorService consumer = Executors.newFixedThreadPool(1);// 创建一个单线程的生产者线程池ExecutorService producer = Executors.newFixedThreadPool(1);/** 【关键】使用 SynchronousQueue** 与 ArrayBlockingQueue、LinkedBlockingQueue 的区别:** | 队列类型             | 是否存储元素 | 容量 | 数据传递方式         |* |----------------------|--------------|------|------------------------|* | Array/LinkedBlockingQueue | 是           | >0   | 生产 → 存 → 消费       |* | SynchronousQueue     | 否           | 0    | 生产 ↔ 消费(直接交接) |** SynchronousQueue 的 put() 和 take() 必须“配对”执行:*   - 如果先调用 put(),生产者会阻塞,直到有消费者调用 take()*   - 如果先调用 take(),消费者会阻塞,直到有生产者调用 put()** 这是一种“ rendezvous(会面)”机制,确保生产者和消费者直接见面交接*/BlockingQueue<Integer> queue = new SynchronousQueue<>();// 提交生产者任务producer.submit(() -> {try {System.out.println("【生产者】开始生产数据...");// 模拟生产耗时(例如计算、IO等)Thread.sleep(1000);System.out.println("【生产者】准备 put(10) 到队列...");/** 【阻塞点】queue.put(10)** 此时:*   - 队列是空的(且容量为0)*   - 如果没有消费者在等待 take(),put() 会一直阻塞*   - 直到消费者调用 take(),两者“会面”后数据直接传递** 注意:put() 不会返回,直到交接完成*/queue.put(10);System.out.println("【生产者】数据 10 已成功交接,任务结束。");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("【生产者】被中断!");e.printStackTrace();}});// 提交消费者任务consumer.submit(() -> {try {System.out.println("【消费者】启动,等待消费...");/** 【阻塞点】Integer result = queue.take()** 此时:*   - 队列中没有数据(且无法预先存储)*   - 消费者会一直阻塞,直到生产者调用 put()*   - 一旦 put() 发生,take() 立即返回,获取数据*/Integer result = queue.take();System.out.println("【消费者】成功从队列 take() 数据:" + result);} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("【消费者】被中断!");e.printStackTrace();}});}
}

SynchronousQueue 的核心特性

特性说明
容量为 0不存储任何元素,peek() 永远返回 null
手递手传递(Hand-off)生产者必须等待消费者,直接传递数据
高吞吐、低延迟适合工作窃取、线程池(如 CachedThreadPool
put/take 必须配对任一操作都会阻塞,直到对方就绪
公平性可选支持 FIFO 或非公平(默认非公平)

与其他阻塞队列对比

队列类型存储典型用途
ArrayBlockingQueue有界数组固定线程池,任务队列
LinkedBlockingQueue无界/有界链表Web 服务器请求队列
SynchronousQueue无存储快速交接,Executors.newCachedThreadPool() 底层实现
DelayQueue延迟元素定时任务
PriorityBlockingQueue优先级队列任务优先级调度
http://www.dtcms.com/a/350654.html

相关文章:

  • ROS2 python功能包launch,config文件编译后找不到
  • 链表OJ习题(2)
  • 搭建基于LangChain实现复杂RAG聊天机器人
  • AI在软件研发流程中的提效案例
  • 在vue3后台项目中使用热力图,并给热力图增加点击选中事件
  • Java中删除字符串首字符
  • 【51单片机】【protues仿真】基于51单片机数码管温度报警器系统
  • AR眼镜赋能水利智能巡检的创新实践
  • 算法题打卡力扣第167题:两数之和——输入有序数组(mid)
  • VASP计算层错能(SFE)全攻略2
  • python自学笔记12 NumPy 常见运算
  • QT(1)
  • 独立显卡接口操作指南
  • 小程序开发指南(四)(UI 框架整合)
  • Linux系统网络管理
  • UE5 UI遮罩
  • 人形机器人产业风口下,低延迟音视频传输如何成为核心竞争力
  • Linux笔记9——shell编程基础-3
  • OpenFeign的原理解析
  • FMS回顾和总结
  • C++ 中 `std::map` 的 `insert` 函数
  • 【机器学习项目 心脏病预测】
  • 【广告系列】流量归因模型
  • centos 用 docker 方式安装 dufs
  • 【C++11】auto关键字:自动类型推导
  • Python爬虫实战: 爬虫常用到的技术及方案详解
  • Leetcode top100之链表排序
  • Swift 解法详解 LeetCode 362:敲击计数器,让数据统计更高效
  • 【猿人学】web第一届 第16题 js-逆向 windows蜜罐 / webpack初体验
  • 通过C#上位机串口写入和读取浮点数到stm32实战5(通过串口读取bmp280气压计的数值并在上位机显示)