JavaEE初阶第十期:解锁多线程,从 “单车道” 到 “高速公路” 的编程升级(八)
专栏:JavaEE初阶起飞计划
个人主页:手握风云
目录
一、多线程案例
1.1. 阻塞队列
一、多线程案例
1.1. 阻塞队列
- 概念
阻塞队列是一种特殊的队列,也遵守“先进先出”的规则。在Java标准库大部分的类是线程不安全的,阻塞队列是一种线程安全的数据结构。队列满时:插入操作会阻塞,直到队列有空余空间。队列空时:取出操作会阻塞,直到队列有新元素。
- 生产者消费者模型
生产者消费者模型是一种通过阻塞队列来解决生产者和消费者之间强耦合问题的开发模型。在该模型中,生产者和消费者不直接进行通讯,而是通过阻塞队列作为中间容器传递数据:生产者生产完数据后,直接将数据放入阻塞队列,无需等待消费者处理;消费者则从阻塞队列中获取数据进行处理,无需主动向生产者索取数据。
我们拿包饺子来说,基本可以简化为擀饺子皮和包饺子两个过程。一种包法:每个人都进行这两个步骤一起包,相比于一个人包,效率就会快很多,但多个人同时针对擀面杖进行竞争,就会造成阻塞。另一种包法:一个人擀饺子皮,剩下的人包饺子。那么此时针对饺子皮这个资源,擀饺子皮的人就是生产者,包饺子的人就是消费者。针对生产者和消费者的定位,具体还要看资源。
阻塞队列用于协调多个线程之间的工作。如果擀饺子皮的人速度较慢,那么包饺子的人就需要阻塞等待;如果擀饺子皮的人速度较快,盖帘上已经没有位置,也需要阻塞等待。
引入生产者消费者模型的目的是为了减少锁竞争,生产者和消费者的步调不一定完全一致,此时阻塞队列就可以起到协调的效果。
优点:1.平衡处理能力(削峰填谷);2.解耦生产者与消费者;3.支持并发协作。
缺点:1.增加系统复杂性;2.资源开销;3.潜在的性能瓶颈。
- 标准库中的阻塞队列
BlockingQueue是一个接口,真正实现的类是LinkedBlockQueue和ArrayBlockQueue。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(10); // 基于顺序表实现}
}
put 方法用于阻塞式的入队列,take 用于阻塞式的出队列。BlockingQueue 也有 offer, poll, peek 等方法,但是这些方法不带有阻塞特性。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");queue1.take();}
}
由于也会触发阻塞异常,也需要抛出。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");queue1.take();}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");}
}
当放入了3个元素之后,就会阻塞等待。如果阻塞队列为空,出元素也会阻塞。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.take();}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;/*** @author gao* @date 2025/7/20 21:05*/public class Demo2 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);Thread producer = new Thread(() -> {int count = 0;try {while (true) {queue.put("" + count);System.out.println("生产了一个元素:" + count);count++;Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});Thread consumer = new Thread(() -> {try {while (true) {String elem = queue.take();System.out.println("消费了一个元素:" + elem);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});producer.start();consumer.start();producer.join();consumer.join();}
}
可以看到生产一个元素就会消费一个元素,如果我们删除掉consumer线程里的休眠时间,运行结果没什么区别。但如果删除掉producer线程里面的休眠时间,会发现只消费了一个元素,就会产生阻塞,当生产了1000个元素后,才会消费。
- 阻塞队列的实现
// 为了简单,这里不加泛型参数
// 假定初始元素类型为String
class MyBlockingQueue {// 利用数组实现private String[] array = null;private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {array = new String[capacity];}public void put(String elem) {}public String take() {}
}
我们先不考虑阻塞,当向队列里面添加元素时,tail就向后移动,到达队尾时,就重新回到起始位置。同理,获取队列元素时,就让head向后移动。
/**** @param elem 入队列*/
public void put(String elem) {if (size >= array.length) {return;}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;
}/**** @return 获取队列元素*/
public String take() {if (size == 0) {// 队列为空,先不考虑阻塞return null;}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;return ret;
}
接下来需要考虑线程安全:两个线程操作同一个队列时,不会出现bug。我们观察上面的代码,两个线程都是针对同一个变量(size和array中的同一下标)进行修改,修改操作不是原子的,整体代码都进行加锁是比较安全的。并且还需要注意,必须使用同一个锁对象。
private Object locker1 = new Object();
public MyBlockingQueue(int capacity) {array = new String[capacity];
}/**** @param elem 入队列*/
public void put(String elem) {synchronized (locker1) {if (size >= array.length) {return;}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;}
}/**** @return 获取队列元素*/
public String take() {synchronized (locker1) {if (size == 0) {// 队列为空,先不考虑阻塞return null;}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;return ret;}
}
接下来需要考虑阻塞:当队列满的时候,主动进入阻塞,并且还需要放进锁里面,当有元素出队列时,就需要唤醒;同理,当队列为空时,阻塞,有元素进队列时,唤醒。
/**** @param elem 入队列*/
public void put(String elem) throws InterruptedException {synchronized (locker1) {if (size >= array.length) {locker1.wait();}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;locker1.notify();}
}/**** @return 获取队列元素*/
public String take() throws InterruptedException {synchronized (locker1) {if (size == 0) {locker1.wait();}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;// 唤醒put方法里的阻塞locker1.notify();return ret;}
}
完整代码实现:
// 为了简单,这里不加泛型参数
// 假定初始元素类型为String
class MyBlockingQueue {// 利用数组实现private String[] array = null;private int head = 0;private int tail = 0;private int size = 0;private Object locker1 = new Object();public MyBlockingQueue(int capacity) {array = new String[capacity];}/**** @param elem 入队列*/public void put(String elem) throws InterruptedException {synchronized (locker1) {if (size >= array.length) {locker1.wait();}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;locker1.notify();}}/**** @return 获取队列元素*/public String take() throws InterruptedException {synchronized (locker1) {if (size == 0) {locker1.wait();}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;// 唤醒put方法里的阻塞locker1.notify();return ret;}}
}public class Demo3 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread producer = new Thread(() -> {int count = 0;while (true) {try {queue.put("" + count);System.out.println("生产了一个元素:" + count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread consumer = new Thread(() -> {while (true) {try {String elem = queue.take();System.out.println("消费了一个元素:" + elem);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();consumer.start();}
}
上面是wait()方法的源码,官方文档建议使用while()循环来判断条件。正常来说,必须等待条件被打破了,才能被唤醒,如果是其他代码,不排除唤醒之后还会出现,条件仍然成立的可能性,那么后面的代码执行就会出现问题。while相当于“二次确认”来保证条件确实不成立。