【Java笔记】消息队列
目录
- 1. 阻塞队列
- 2. 阻塞队列的应用场景 :消息队列
- 2.1 作用一:解耦
- 2.2 作用二:削峰填谷(流量)
- 2.3 作用三:异步操作
- 3. 自定义实现阻塞队列
1. 阻塞队列
阻塞队列是一种线程安全的数据结构,遵守“先进先出”的原则,具有以下特性:
- 当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素,有空位再入队
- 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素,有元素再出队
阻塞队列的一个典型应用场景就是 “生产者消费者模型”,这是一种非常典型的开发模型,生产者是生产资源的,消费者是消费资源的
场景: 包包子
生产者:擀包子皮的
消费者:包包子的
如果案板上没有包子皮就加油擀,如果包子皮太多了就休息一会儿,类比到阻塞队列就是阻塞等待,当面皮消耗差不多了再继续擀;包包子这边如果有面皮就加油包包子,如果没有皮就等一会儿(阻塞等待)。
使用JDK中的类创建阻塞队列:
public class Demo_701 {public static void main(String[] args) throws InterruptedException {// 创建一个阻塞队列BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); // 3表示当前的容量是3,最多可以存放3个元素// 向阻塞队列中添加元素queue.put(1);queue.put(2);queue.put(3);System.out.println("已经添加了3个元素");
// queue.put(4);
// System.out.println("已经添加了4个元素"); // 打印不出来System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println("已经取出3个元素");System.out.println(queue.take());System.out.println("已经取出4个元素");}
}
2. 阻塞队列的应用场景 :消息队列
2.1 作用一:解耦
在这个模型中,服务器之间要时刻感知到彼此,在调用过程中双方都要知道对方需要的参数和调用方式,但是,在整个链路中,如果其中一个出现了问题,就会影响整个业务的执行!
于是引入了消息队列:
如何判断消息是给服务器B还是服务器C?
在服务器A生产消息时,可以为消息打一个“标签”,相当于对消息进行分类,可以根据这个标签来获取
比如买包子时候,给老板说要买什么馅的包子,老板就会给你什么馅的包子
2.2 作用二:削峰填谷(流量)
平时业务程序很难应对流量暴增的情况,正常的流量可以满足,大流量的时候,程序会申请很多线程,各种资源最终服务器资源耗尽,被打爆,数据库也会因此崩溃;
因此引入消息队列:
比如淘宝:以双十一大流量为例
2.3 作用三:异步操作
同步:发出请求之后,必须要等到响应才进行下一步操作
异步:发出请求之后,不需要等待响应,而做其他的事情,等到响应主动通知自己
引入消息队列,服务器A不需要死等服务器B返回处理结果,而是可以去做其他事情,当服务器B处理完数据写入到消息队列中,消息队列会通知服务器A来取处理结果,实现异步操作。
3. 自定义实现阻塞队列
使用Array实现一个队列:
head,tail为头尾下标,size记录数组中元素的数量;
入队时tail++,size++,出队时head++,size–;
如果头尾下标越界,则置为0(数组开头);
size=0时表示队列为空,size=array.length时,表示队列满了;
自定义阻塞队列代码实现:
/*** 自定义阻塞队列*/
public class MyBlockingQueue {// 定义数组存放数据private Integer[] elementData;// 定义头尾下标private volatile int head = 0;private volatile int tail = 0;// 定义数组中元素的个数private volatile int size = 0;// 构造方法public MyBlockingQueue(int capacity) {if (capacity <= 0) {throw new RuntimeException("队列容量必须大于0");}elementData = new Integer[capacity];}// 插入数据public void put(Integer value) throws InterruptedException {synchronized (this) {// 判断队列是否为空while (size >= elementData.length) {this.wait();}// 对尾插入元素elementData[tail] = value;// 移动队头下标tail++;if (tail >= elementData.length) {tail = 0;}// 元素个数+1size++;// 唤醒阻塞线程this.notifyAll();}}// 获取数据public synchronized Integer take() throws InterruptedException {// 判断队列是否为空while (size == 0) {this.wait();}Integer value = elementData[head];head++;if (head >= elementData.length) {head = 0;}size--;// 唤醒阻塞线程this.notifyAll();return value;}
}
根据自定义的阻塞队列实现生产者消费者模型:
/*** 创建生产者消费者模型*/
public class Demo_703 {public static void main(String[] args) {// 定义阻塞队列MyBlockingQueue queue = new MyBlockingQueue(100);// 创建生产者线程Thread producer = new Thread(() -> {int num = 0;// 使用循环不同向队列中添加元素,直到容量占满while (true) {try {queue.put(num);System.out.println("生产了元素:" + num);num++;TimeUnit.MICROSECONDS.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 启动线程producer.start();// 创建消费者线程Thread consumer = new Thread(() -> {while (true) {try {Integer value = queue.take();System.out.println("消费了元素:" + value);TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 启动线程consumer.start();}
}
代码执行: