当前位置: 首页 > news >正文

JavaEE初阶第十期:解锁多线程,从 “单车道” 到 “高速公路” 的编程升级(八)

专栏:JavaEE初阶起飞计划

个人主页:手握风云

目录

一、多线程案例

1.1. 阻塞队列


一、多线程案例

1.1. 阻塞队列

  • 概念

        阻塞队列是一种特殊的队列,也遵守“先进先出”的规则。在Java标准库大部分的类是线程不安全的,阻塞队列是一种线程安全的数据结构。队列满时:插入操作会阻塞,直到队列有空余空间。队列空时:取出操作会阻塞,直到队列有新元素。

  • 生产者消费者模型

        生产者消费者模型是一种通过阻塞队列来解决生产者和消费者之间强耦合问题的开发模型。在该模型中,生产者和消费者不直接进行通讯,而是通过阻塞队列作为中间容器传递数据:生产者生产完数据后,直接将数据放入阻塞队列,无需等待消费者处理;消费者则从阻塞队列中获取数据进行处理,无需主动向生产者索取数据。

        我们拿包饺子来说,基本可以简化为擀饺子皮和包饺子两个过程。一种包法:每个人都进行这两个步骤一起包,相比于一个人包,效率就会快很多,但多个人同时针对擀面杖进行竞争,就会造成阻塞。另一种包法:一个人擀饺子皮,剩下的人包饺子。那么此时针对饺子皮这个资源,擀饺子皮的人就是生产者,包饺子的人就是消费者。针对生产者和消费者的定位,具体还要看资源。

        阻塞队列用于协调多个线程之间的工作。如果擀饺子皮的人速度较慢,那么包饺子的人就需要阻塞等待;如果擀饺子皮的人速度较快,盖帘上已经没有位置,也需要阻塞等待。

        引入生产者消费者模型的目的是为了减少锁竞争,生产者和消费者的步调不一定完全一致,此时阻塞队列就可以起到协调的效果。

        优点:1.平衡处理能力(削峰填谷);2.解耦生产者与消费者;3.支持并发协作。

        缺点:1.增加系统复杂性;2.资源开销;3.潜在的性能瓶颈。

  • 标准库中的阻塞队列

        BlockingQueue是一个接口,真正实现的类是LinkedBlockQueue和ArrayBlockQueue。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(10); // 基于顺序表实现}
}

        put 方法用于阻塞式的入队列,take 用于阻塞式的出队列。BlockingQueue 也有 offer, poll, peek 等方法,但是这些方法不带有阻塞特性。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");queue1.take();}
}

        由于也会触发阻塞异常,也需要抛出。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");queue1.take();}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");}
}

        当放入了3个元素之后,就会阻塞等待。如果阻塞队列为空,出元素也会阻塞。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.take();}
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;/*** @author gao* @date 2025/7/20 21:05*/public class Demo2 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);Thread producer = new Thread(() -> {int count = 0;try {while (true) {queue.put("" + count);System.out.println("生产了一个元素:" + count);count++;Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});Thread consumer = new Thread(() -> {try {while (true) {String elem = queue.take();System.out.println("消费了一个元素:" + elem);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});producer.start();consumer.start();producer.join();consumer.join();}
}

        可以看到生产一个元素就会消费一个元素,如果我们删除掉consumer线程里的休眠时间,运行结果没什么区别。但如果删除掉producer线程里面的休眠时间,会发现只消费了一个元素,就会产生阻塞,当生产了1000个元素后,才会消费。

  • 阻塞队列的实现
// 为了简单,这里不加泛型参数
// 假定初始元素类型为String
class MyBlockingQueue {// 利用数组实现private String[] array = null;private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {array = new String[capacity];}public void put(String elem) {}public String take() {}
}

        我们先不考虑阻塞,当向队列里面添加元素时,tail就向后移动,到达队尾时,就重新回到起始位置。同理,获取队列元素时,就让head向后移动。


/**** @param elem 入队列*/
public void put(String elem) {if (size >= array.length) {return;}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;
}/**** @return 获取队列元素*/
public String take() {if (size == 0) {// 队列为空,先不考虑阻塞return null;}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;return ret;
}

        接下来需要考虑线程安全:两个线程操作同一个队列时,不会出现bug。我们观察上面的代码,两个线程都是针对同一个变量(size和array中的同一下标)进行修改,修改操作不是原子的,整体代码都进行加锁是比较安全的。并且还需要注意,必须使用同一个锁对象。

private Object locker1 = new Object();
public MyBlockingQueue(int capacity) {array = new String[capacity];
}/**** @param elem 入队列*/
public void put(String elem) {synchronized (locker1) {if (size >= array.length) {return;}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;}
}/**** @return 获取队列元素*/
public String take() {synchronized (locker1) {if (size == 0) {// 队列为空,先不考虑阻塞return null;}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;return ret;}
}

        接下来需要考虑阻塞:当队列满的时候,主动进入阻塞,并且还需要放进锁里面,当有元素出队列时,就需要唤醒;同理,当队列为空时,阻塞,有元素进队列时,唤醒。

/**** @param elem 入队列*/
public void put(String elem) throws InterruptedException {synchronized (locker1) {if (size >= array.length) {locker1.wait();}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;locker1.notify();}
}/**** @return 获取队列元素*/
public String take() throws InterruptedException {synchronized (locker1) {if (size == 0) {locker1.wait();}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;// 唤醒put方法里的阻塞locker1.notify();return ret;}
}

        完整代码实现:

// 为了简单,这里不加泛型参数
// 假定初始元素类型为String
class MyBlockingQueue {// 利用数组实现private String[] array = null;private int head = 0;private int tail = 0;private int size = 0;private Object locker1 = new Object();public MyBlockingQueue(int capacity) {array = new String[capacity];}/**** @param elem 入队列*/public void put(String elem) throws InterruptedException {synchronized (locker1) {if (size >= array.length) {locker1.wait();}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;locker1.notify();}}/**** @return 获取队列元素*/public String take() throws InterruptedException {synchronized (locker1) {if (size == 0) {locker1.wait();}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;// 唤醒put方法里的阻塞locker1.notify();return ret;}}
}public class Demo3 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread producer = new Thread(() -> {int count = 0;while (true) {try {queue.put("" + count);System.out.println("生产了一个元素:" + count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread consumer = new Thread(() -> {while (true) {try {String elem = queue.take();System.out.println("消费了一个元素:" + elem);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();consumer.start();}
}

        上面是wait()方法的源码,官方文档建议使用while()循环来判断条件。正常来说,必须等待条件被打破了,才能被唤醒,如果是其他代码,不排除唤醒之后还会出现,条件仍然成立的可能性,那么后面的代码执行就会出现问题。while相当于“二次确认”来保证条件确实不成立。

http://www.dtcms.com/a/290556.html

相关文章:

  • 用 Three.js 实现 PlayCanvas 风格 PBR 材质教程(第二篇):核心参数与光照模型
  • CS课程项目设计4:支持AI人机对战的五子棋游戏
  • RustDesk自建服务器完整部署指南:从零开始到成功连接。成功解决rustdesk报错:未就绪,请检查网络连接
  • Linux的系统调用机制总结
  • [Python] -项目实战10- 用 Python 自动化批量重命名文件
  • 重学前端008 --- CSS 无障碍 Quiz
  • 《高并发优化方案一》:本地锁 + 分布式锁实战详解
  • Excel函数 —— TEXTJOIN 文本连接
  • 支持不限制大小,大文件分段批量上传功能(不受nginx /apache 上传大小限制)
  • Apache Ignite Closure 和 Thread Pool
  • Ubuntu安装k8s集群入门实践-v1.31
  • WinForm-免费,可商用的WinForm UI框架推荐
  • 类似腾讯会议的私有化音视频会议软件,BeeWorks Meet
  • Go语言进阶书籍:Go语言高级编程(第2版)
  • 开源 Arkts 鸿蒙应用 开发(八)多媒体--相册和相机
  • 45.sentinel自定义异常
  • RIQ模型时间管理方法详解
  • Idea或Pycharm上.idea的忽略提交的问题总结
  • go语言八股
  • MySQL(149)如何进行数据清洗?
  • 09_Spring Boot 整合 Freemarker 模板引擎的坑
  • 【C++】stack和queue拓展学习
  • 库卡气体保护焊机器人省气的方法
  • Mac上安装Homebrew的详细步骤
  • 【CNN】卷积神经网络池化- part2
  • Pytorch01:深度学习中的专业名词及基本介绍
  • 有关Maven的个人笔记总结
  • Zetane:让深度学习不再抽象,一键3D可视化
  • SpringSecurity 详细介绍(认证和授权)
  • 直播专用域名租用全解析:开启直播新境界