多线程 -- 初阶(4) [单例模式 阻塞队列]
点点关注,拜托一键三连🙏🏻
9.多线程案例
设计模式 : 软件设计中针对高频问题的通用可复用的解决方案 , Java 常用的 23 种设计模式通常分为 创建型(单例模式 , 工厂方法模式 , 抽象工厂模式 , 原型模式 , 建造者模式) , 结构型(适配器模式 , 装饰器模式 , 代理模式 , 组合模式 , 外观模式 , 桥接模式 , 享元模式) , 行为型(观察者模式 , 策略模式 , 迭代器模式 , 模板方法模式 , 命令模式 , 状态模式 , 责任链模式 , 备忘录模式 , 中介者模式 , 访问者模式 , 解释器模式) 3 类
此处讲一下单例模式
9.1 单例模式
核心逻辑 : 保证类仅有一个实例 , 而不会创建出多个实例 , 并提供全局访问点
这一点在 JDBC 中的 DataSource 实例就只需要一个
单例模式具体实现方式有很多 , 最常见的是"饿汉"和"懒汉"两种
① 饿汉模式
核心逻辑 : 在类加载时创建实例(用 static尽早创建实例) , 并将构造方法私有化
饿汉模式 只是涉及到读操作 , 因此不会有线程不安全问题
class Singleton{private static Singleton instance = new Singleton();public static Singleton getInstance(){return instance;}private Singleton(){}
}
public class demo26 {public static void main(String[] args) {Singleton t1 = Singleton.getInstance();Singleton t2 = Singleton.getInstance();System.out.println(t1==t2);//Singleton t3 = new Singleton();//报错}
}


此处只是简化代码 , 也可以加入带有参数的构造方法 , 在 new 对象时调用带有参数的构造方法
② 懒汉模式 - 单线程版
核心逻辑 : 类加载时不创建对象 , 第一次使用的时候才创建实例 (延迟创建实例) , 并将构造方法私有化
class Singletonlazy1{private static Singletonlazy1 instance = null;public static Singletonlazy1 getInstance(){if(instance == null){instance = new Singletonlazy1();}return instance;}private Singletonlazy1(){}
}
public class demo27 {public static void main(String[] args) {Singletonlazy1 s1 = Singletonlazy1.getInstance();Singletonlazy1 s2 = Singletonlazy1.getInstance();System.out.println(s1 == s2);//true//Singletonlazy1 s3 = new Singletonlazy1();}
}

③ 懒汉模式 - 多线程版
问题 1 : 发生在创建实例时 , 在多线程中如果多个线程同时调用 getInstance() 方法 , 就可能导致创建出多个实例 ; 虽然只是实例之间相互覆盖 , 但是如果每个实例创建时需要一定的时间 , 那么多次覆盖操作就会严重拖慢时间
解决方法 : 加锁操作 (显然是将条件判断和赋值操作都加上锁) ; 或直接对方法加锁
]class Singletonlazy1{private static Singletonlazy1 instance = null;private static Object locker1 = new Object();//锁对象public static Singletonlazy1 getInstance(){synchronized (locker1){if(instance == null){instance = new Singletonlazy1();}}return instance; }private Singletonlazy1(){}
}
问题 2 : 加锁操作引入的新的问题 , 上述代码当实例创建好之后 , 每次调用都需要执行加锁操作 , 才能执行 ruturn ; 在多线程中 , 加锁就相当于阻塞 , 会影响执行效率
解决方法 : 按需加锁 , 真正涉及到加锁操作再加锁 , 引入 if(instance == null)
class Singletonlazy1{private static Singletonlazy1 instance = null;private static Object locker1 = new Object();public static Singletonlazy1 getInstance(){if(instance == null){synchronized (locker1) {if (instance == null) {instance = new Singletonlazy1();}}}return instance;}private Singletonlazy1(){}
}
问题 3 : 是否会出现指令重排序问题 , 即 编译器会优化执行顺序 ; 可能会是双重 if 导致的 , 这个问题不好说 , 这个问题不好直观体现出来 , 保险起见 加入 volatile
解决方法 : 引入 volatile 关键字 ; 此处 volatile 的作用有两方面 : 1) 确保每次读取操作 , 都是读内存 , 2)关于该变量的读取和修改操作 , 不会触发指令重排序
class Singletonlazy1{private volatile static Singletonlazy1 instance = null;private static Object locker1 = new Object();public static Singletonlazy1 getInstance(){if(instance == null){synchronized (locker1) {if (instance == null) {instance = new Singletonlazy1();}}}return instance;}private Singletonlazy1(){}
}在面试中问到上述问题 , 可以按步骤修改 , 最好不要一次写完整
9.2 阻塞队列
阻塞队列 (Blocking Queue) 是 Java 并发编程中常用的数据结构 ,
① 核心特征 :
- 队列为空时 阻塞获取元素的线程 , 直到其他线程添加元素为止
- 队列为满时 阻塞添加元素的线程 , 直到其他线程取走元素为止
- 线程安全 , 内部通过锁机制 (ReentrantLock) 保证多线程操作的安全性 , 无需额外同步
阻塞队列的一个典型应用场景就是 : "生产者-消费者"
生产者-消费者模型 : 是多线程并发编程中的经典设计模式 , 用于解决生产者线程(生成数据) 和 消费者线程(处理数据) 之间的协作问题
其核心就是 : 通过一个共享缓冲区(阻塞队列) 隔离生产者和消费者 , 实现解耦 , 削峰填谷 , 并发控制
- 生产者:负责生成数据,将数据放入共享缓冲区
- 消费者:从共享缓冲区中获取数据并处理
- 缓冲区:存储数据的中间容器(通常用阻塞队列实现),平衡生产者和消费者的速度差异
② 标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列 , 如果我们要在一些程序中使用阻塞队列 , 直接使用标准库中的即可
- BlockingQueue 是一个接口 , 真正实现的类是LinkedBlockingQueue 等
- put()方法用于阻塞式的入队列 , take()用于阻塞式的出队列
- BlockingQueue 也有 offer,poll,peek 等方法 , 但这些方法都不带有阻塞特性
③ 常用实现类 (java.util.concurrent 包中)
实现类 | 特点 | 适用场景 |
| 基于数组的有界队列,容量固定(创建时指定大小) | 已知最大任务量,需要固定容量 |
| 基于链表的可选有界队列(默认容量为 | 任务量不确定,需高效插入删除 |
| 无缓冲队列,添加元素后必须等待另一个线程取走才能继续添加(容量为 0) | 线程间直接传递任务(如线程池) |
| 支持优先级的无界队列(元素需实现 | 按优先级处理任务 |
| 延迟队列,元素需实现 | 定时任务(如缓存过期清理) |
public class demo28 {public static void main(String[] args) throws InterruptedException {BlockingDeque<String> queue = new LinkedBlockingDeque<>();// 入队列// queue.put("abc");// System.out.println("执行入队列操作");//出队列String tmp = queue.take();System.out.println("执行出队列操作");}
}

④ 创建一个简单的生产者 - 消费者模型
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;public class demo29 {public static void main(String[] args) {BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();Thread producer = new Thread(()->{Random random = new Random();while(true) {try {int n = random.nextInt(1000);System.out.println("生产元素" + n);blockingDeque.put(n);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}},"生产者");Thread consumer = new Thread(()->{while(true){try {int value = blockingDeque.take();System.out.println("消费元素" + value);} catch (InterruptedException e) {e.printStackTrace();}}},"消费者");consumer.start();producer.start();}}

从结果看出 , 隔一秒生产一个元素后立马被消费 , 原因是消费者线程执行速度远远大于生产者的执行速度(sleep 的引入) , 在阻塞队列为空时 , 由于消费者线程是 take()操作 , 会陷入阻塞 ; 由于没有给阻塞队列的构造方法传参 , 此时这个队列的大小会很大 , 大概为二十亿
⑤ 阻塞队列的实现
package myThread;class MyBlockingQueue{private String[] data = null;//队首private int head = 0;//队尾private int tail = 0;//元素个数private int size = 0;public MyBlockingQueue(int capacity){data = new String[capacity];}public void put(String elem) throws InterruptedException {synchronized (this){while (size>=data.length){//满了,阻塞this.wait();}data[tail++] = elem;if(tail>=data.length){tail = 0;}size++;this.notify();//通知消费者线程,此时线程不空,可以继续消费了}}public String take() throws InterruptedException {synchronized (this){while (size == 0){//为空,阻塞this.wait();}String ret = data[head++];if(head>=data.length){head = 0;}size--;this.notify();//通知生产者线程,线程此时不为满,可以生产元素return ret;}}
}public class demo30 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread prodcer = new Thread(()->{int n = 0;while(true){try {queue.put(n+"");System.out.println("生产元素" +n);n++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread consumer = new Thread(()->{while(true){try {String tmp = queue.take();System.out.println("消费元素"+tmp);//Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});prodcer.start();consumer.start();}
}注意 :
- 阻塞队列中的 put()和 take()操作必须是原子的 , 若两个线程同时执行 put(),可能会导致元素被覆盖
- synchronized(this)会将 put()和 tank()方法中的代码块变为临界区 , 同一时间只有一个线程能进入 , 从而避免上述问题 ; 此处的 this 是锁对象 , 所有线程必须基于同一锁对象进行等待 /通知 , 才能真正实现线程间的协作 ; this 代表当前线程 MyBlockingQueue 实例本身 , 对与一个队列来说 , 它是唯一的
- 此处判断为空或满 用的是 while()循环而不是 if() , 是为了避免 wait()被意外唤醒(例如 Interrupt , 如果 此处的InterruptedException 是用 try-catch 来处理的 , 则可能会继续往下执行) , 从而发生操作风险 , while()循环是为了二次确认 , 防止发生意外唤醒
