多线程之阻塞队列
目录
1 什么是阻塞队列
2 生产者消费者模型
3 Java标准库中的阻塞队列
4 自己实现一个阻塞队列
1 什么是阻塞队列
阻塞队列是一种特殊的队列,也遵守“先进先出”的规则。
阻塞队列是一种线程安全的数据结构,并且具有下面的两点特性:
- 队列为空时,尝试出队列,出队列操作就会阻塞,等到其他线程为队列添加元素之后阻塞结束;
- 队列为满时,尝试入队列,入队列操作就会进行阻塞,等到其他线程去除队列中的元素之后阻塞结束。
阻塞队列最主要的一个应用场景就是“生产者消费者模型”。
2 生产者消费者模型
所谓“生产者消费者模型”,顾名思义,就是有的服务器负责生产,有的服务器进行消费。在生产者消费者模型中使用阻塞队列有两个很明显的优势:
- 削峰填谷
当服务器A向服务器B发送大量请求时,服务器B可能会因为性能问题一下子处理不了大量的请求而出现故障。这个时候如果在服务器A和服务器B中间加入一个阻塞队列,就能够解决这种发送大量请求的问题。
这个时候阻塞队列就相当于一个缓冲区,当A发送大量请求的时候,这些请求先被存入了阻塞队列中,而B从阻塞队列中拿取这些请求进行处理,这样B就按照自己的节奏进行处理,就不会出现像那种因为请求激增而出现故障的现象了。
- 解耦合
如果是服务器A和服务器B直接进行交互,那么在编写代码的时候,不管是编写服务器A还是服务器B的代码,总会多多少少涉及到对方的一些逻辑。这个时候如果一方有某些改变,那么另外一方也应该做出一些相应的改变来适应。这种情况就使得A和B有着较高的耦合。
如果使用阻塞队列,那么A和B之间就不用进行直接的交互,他们之间的交流通过阻塞队列进行完成,也就是说这个时候不论是A还是B都只需要在意与阻塞队列的交流就好了,至于对方做出了什么改变就对自己没有影响了。
3 Java标准库中的阻塞队列
在Java的标准库中内置了阻塞队列,BlockingQueue是一个接口,有很多实现了BlockingQueue的类,在这里我们选择LinkedBlockingQueue进行演示。

阻塞队列入队列操作使用put方法,出队列操作使用take方法。与普通的队列一样,阻塞队列也有offer和poll方法,只是如果使用这种方法阻塞队列就变成了简单的队列,没有阻塞的属性了。
下面我们使用一个简单的生产者消费者模型进行演示:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo {public static void main(String[] args) {
// 为了演示简单,这里我们让阻塞队列的大小为5BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);Thread producer = new Thread(()->{int n = 0;while(true){try {blockingQueue.put(n);System.out.println("生产元素 " + n);n++;
//此处生产者线程进行了sleep,所以在程序进行运行的时候就是,生产一个元素再消费一个元素
// Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"producer");Thread consumer = new Thread(()->{while(true){try {Integer n = blockingQueue.take();System.out.println("消费元素 " + n);
// 此处,消费者线程进行了sleep,所以在程序进行运行的时候就是,
// 先一次性生产了5个元素之后,消费一个再生产一个
// Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"consumer");producer.start();consumer.start();}
}
运行结果:
生产者线程sleep(生产一个消费一个)

消费者线程sleep(先一次性生产5个,然后消费一个生产一个)

4 自己实现一个阻塞队列
实现一个MyBlockingQueue类,其中包括put和take方法。
class MyBlockingQueue{// 首先 一个阻塞队列中应该包含哪些东西// 存放元素的数组 先不进行初始化private String[] data = null;// 队列的头 其实就是数组的下标,所以是int类型private int head = 0;// 队列的尾 也是数组下标,一开始数组为空,所以头和尾下标都为 0private int tail = 0;// 队列中元素的个数,元素的个数和数组的大小不同,因为数组不一定是满的private int size = 0;// 构造方法 再构造方法中对数组进行初始化public MyBlockingQueue(int capacity){data = new String[capacity];}// put 方法 往队列中放元素public void put(String elem) throws InterruptedException {// 使用锁实现阻塞行为synchronized(this){// 如果队列中的元素大于或者等于数组的大小,那么说明队列满了,需要阻塞// 直到有线程使用take方法取出元素才能继续putwhile(size >= data.length){this.wait();}// 在数组的尾部添加元素data[tail] = elem;// 尾部向后移一位tail++;// 如果 tail 走到了数组的最后,那么就让 tail 为 0if(tail >= data.length){tail = 0;}// 在队列中放入一个元素之后,就让 size++size++;// 此处的notify是为了唤醒因为take阻塞的线程,因为这个时候队列并不为空,所以不需要再阻塞this.notify();}}// take 方法 从队列中取出元素public String take() throws InterruptedException {// 同样使用锁来实现阻塞行为synchronized (this){// 如果队列的大小为0,说明队列中没有元素,需要阻塞// 直到有put方法将元素放入while(size == 0){this.wait();}// 取出元素的时候从头开始取String ret = data[head];head++;// 如果 head 走到最后,那么让 head 重新为 0if(head >= data.length){head = 0;}// 从队列中取出元素之后,让size--size--;// 此处的notify唤醒的是因为put线程阻塞的线程,因为这个时候队列中不是满的,所以可以进行添加元素this.notify();return ret;}}
}
public class Demo {public static void main(String[] args) {MyBlockingQueue blockingQueue = new MyBlockingQueue(5);Thread producer = new Thread(()->{while(true){try {String str = "hello";blockingQueue.put(str);System.out.println("生产元素 " + str);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"producer");Thread consumer = new Thread(()->{while(true){try {String str = blockingQueue.take();System.out.println("消费元素 " + str);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"consumer");producer.start();consumer.start();}
}
此处有两个问题:
- 在put方法中为什么在 tail 到最后的时候让tail = 0?那这样下一次放入元素的时候不会覆盖 tail[0] 中的元素吗?

- 在take方法中为什么在 head 到最后的时候让 head =0?那这样下一次取出元素的时候会不会取不到head[0]中的元素吗?

下面对这两个问题进行解释:
1.
2.
