多线程经典案例
学习完多线程并发执行的基础知识后,我们可以完成一些多线程经典案例的编写,以此熟悉掌握多线程编程的应用。
单例模式
单例模式是一种设计模式,正如其名,单例模式能确保某个类在程序中只存在唯一一份实例,而不会创建出多个实例。常见实现单例模式的方法有两种:饿汉模式 和 懒汉模式。
1. 饿汉模式
饿汉方式实现的单例模式突出迫切的特点,就是说我们希望类的实例尽快的创建。虽然Java中并没有给我们提供构造唯一实例的直接手段,但是我们可以通过构造方法私有化使得类外部不能够实例对象,而只能交由类内部来实例对象,借而实现构造唯一实例的单例模式。
//java 实现的饿汉模式
//饿汉模式 实现 单例模式 (只有一个实例)
class SingletonHungry {
//饿汉模式要求尽早创建实例
private static SingletonHungry instance = new SingletonHungry();//静态成员的初始化是在类加载阶段完成的
private SingletonHungry() {
//构造方法私有化,使得外部不能完成实例 (确保单例模式的实现)
}
public static SingletonHungry getInstance() {
//多线程并发执行不会涉及线程安全问题 (多线程读)
return instance;
}
}
public class Demo29 {
public static void main(String[] args) {
//SingletonHungry singletonHungry1 = new SingletonHungry(); //Error,构造方法私有化使得外部不能实例对象
SingletonHungry singletonHungry2 = SingletonHungry.getInstance();
SingletonHungry singletonHungry3 = SingletonHungry.getInstance();
System.out.println("使用饿汉模式实现单例模式得到的是否只有一个实例 (singletonHungry2 == singletonHungry3 ?) :" + (singletonHungry2 == singletonHungry3));
}
}
- 饿汉模式要求尽早创建实例 (静态成员的初始化是在类加载阶段完成的)
private static SingletonHungry instance = new SingletonHungry();//静态成员的初始化是在类加载阶段完成的
- 构造唯一实力的关键 (构造方法私有化)
private SingletonHungry() {
//构造方法私有化,使得外部不能完成实例 (确保单例模式的实现)
}
需要注意的是饿汉模式实现的单例模式并不涉及线程安全问题 (多线程读不会导致线程安全问题)。
2. 懒汉模式 [ ★★★(1) -> (4) ]
懒汉模式实现的单例模式突出“懒”的特点,也就是说懒汉模式实现的单例模式要求实例创建的尽可能晚 (只有当线程需要实例的时候,实例才真正创建),这样的“懒”在计算机也称为高效,只有当需要的时候才真正创建,不需要则不创建。
//Java 实现的懒汉模式
//多线程案例(一)
//使用懒汉模式 实现 单例模式
class SingletonLazy {
//private static SingletonLazy instance = null;
private volatile static SingletonLazy instance = null; // (4) 的操作
private static Object locker = new Object();
private SingletonLazy() {
//构造方法私有化
}
public static SingletonLazy getInstance() {
// // (1) 多线程并发执行可能存在线程安全问题 --> 这里的判断和赋值并不是原子操作 (instance == null 和 instance = new SingletonLazy 并不是原子操作)
// // 这样的非原子操作 在多线程并发执行的过程中 可能穿插了 多个线程的判断 和 赋值 ,这就使得单例模式 并不是真正只能得到一个实例,而是得到多个实例后又迅速放弃 迎接 下一个新的实例
// if(instance == null) {
// instance = new SingletonLazy();
// }
// return instance;
// // =======================================
// // (2) 对上述需要打包成原子操作的代码 通过synchronzied 加锁打包成 原子操作 进而 解决线程安全问题
// // 进一步分析:我们发现只有 instance == null 时,我们才真正需要进入加锁 (将 (instance == null 和 instance = new SingletonLazy)打包成原子操作);
// // 当instance != null 时,这时候只需要涉及 return (读操作) ; 频繁的加锁 和 解锁 会进一步 降低我们的效率。
// synchronized(locker) {
// if(instance == null) {
// instance = new SingletonLazy();
// }
// }
// return instance;
// // =======================================
// // (3) 对于(2) 中分析的性能降低的情况,我们可以通过 加多一句判断来 减少加锁/解锁 的开销
// if(instance == null) { // 第一次判断 instance == null, 判断是否需要加锁
// synchronized(locker) {
// if(instance == null) { //第二次判断 instance == null, 判断是否需要new 实例
// instance = new SingletonLazy();
// }
// }
// }
// return instance;
// // =======================================
// (4) 其实 instance = new SingletonLazy(); 这样的代码 对应了 三个CPU指令
//1. 申请内存空间
//2. 在空间上构造对象 (对引用对象进行初始化)
//3. 将引用对象赋值给变量
// 正常情况下,CPU指令会顺序执行 ;但是由于编译器优化 (编译器 & JVM 共同作用下) 可能发生指令重排序 这样的情况
// 当CPU 指令按照 1 -> 3 -> 2 这样的逻辑执行下,由于多线程的并发执行 我们(可能出现 发生在指令3 之后 ,同时又发生在指令 2 之前的 针对instance 的修改操作)
// 这样的修改是发生在初始化instance 内部属性前,这样的操作可能带来 某些危险,volatile 可以指明当前变量不会发生指令重排序这样的现象
if(instance == null) { // 第一次判断 instance == null, 判断是否需要加锁
synchronized(locker) {
if(instance == null) { //第二次判断 instance == null, 判断是否需要new 实例
instance = new SingletonLazy();
}
}
}
return instance;
}
}
public class Demo30 {
public static void main(String[] args) {
SingletonLazy singletonLazy1 = SingletonLazy.getInstance();
SingletonLazy singletonLazy2 = SingletonLazy.getInstance();
System.out.println("使用懒汉模式实现单例模式得到的是否只有一个实例 (singletonLazy1 == singletonLazy2 ?) :" + (singletonLazy1 == singletonLazy2));
}
}
上述展示的是经过修改后最终线程安全的懒汉模式代码,那么在修改 ( (1) -> (4) )的过程中其实我们可以发现懒汉模式涉及到线程安全问题,这个过程也是从线程不安全到线程安全的过程。我们通过展示修改过程中遇到的问题来进一步学习多线程并发编程。
- (1)
我们发现instance == null 和 instance = new SingletonLazy() 这两个操作不是原子的,在多线程并发执行的过程中对非原子操作的修改可能会导致线程安全问题。(多线程并发执行的过程中多个线程的判断和赋值的穿插,可能使得我们可能new 了多个实例)
- (2)
通过使用synchronized 将上述操作打包成原子操作后,我们解决了线程安全问题。随之进一步讨论,我们发现只有 instance == null 时,我们才真正需要进入加锁 (将 (instance == null 和 instance = new SingletonLazy)打包成原子操作); 当instance != null 时,这时候只需要涉及 return (读操作) ; 频繁的加锁 和 解锁 会进一步 降低我们的效率。
- (3)
(3) 对于(2) 中分析的性能降低的情况,我们可以通过 加多一句判断来 减少加锁/解锁 的开销。
第一次判断 instance == null, 判断是否需要加锁。
第二次判断 instance == null, 判断是否需要new 实例 (创建实例)。
- (4)
其实 instance = new SingletonLazy(); 这样的代码 对应了 三个CPU指令
//1. 申请内存空间
//2. 在空间上构造对象 (对引用对象进行初始化)
//3. 将引用对象赋值给变量
// 正常情况下,CPU指令会顺序执行 ;但是由于编译器优化 (编译器 & JVM 共同作用下) 可能发生指令重排序 这样的情况
// 当CPU 指令按照 1 -> 3 -> 2 这样的逻辑执行下,由于多线程的并发执行 我们(可能出现 发生在指令3 之后 ,同时又发生在指令 2 之前的 针对instance 的修改操作)
// 这样的修改是发生在初始化instance 内部属性前,这样的操作可能带来 某些危险,volatile 可以指明当前变量不会发生指令重排序这样的现象.
上述 (1) -> (4) 的修改过程,实际也是分析多线程代码 到写出正确多线程代码的过程 ,最终得到正确展示的 (4)。
生产者消费者模型
这里讨论的生产者消费者模型是基于阻塞队列实现的,通过阻塞队列我们可以保证线程安全。
1. 阻塞队列
阻塞队列是一种能够保证线程安全的数据结构,具有以下特性:
- 当队列满的时候,继续⼊队列就会阻塞等待,直到有其他线程从队列中取⾛元素 (队列不满)。
- 当队列空的时候,继续出队列就会阻塞等待,直到有其他线程往队列中插入元素 (队列不空)。
//阻塞队列的模拟实现
//String
class MyBlockingQueue {
private String[] elem = null;
//队头、队尾信息
private int head = 0;
private int tail = 0;
//记录队列使用情况
private int size = 0;
public MyBlockingQueue(int capacity) {
this.elem = new String[capacity];
}
//实现阻塞队列的关键方法 put 和 take //对队列的生产 和 消费 的动作间必须互斥进行,(同一时间内只能有一个线程操作队列)
//线程安全问题 主要是由于 多线程同时修改同一变量 导致的
public void put(String e) throws InterruptedException {
synchronized(this) {
//队列满
while(this.elem.length <= this.size) {
this.wait();//阻塞等待 直到队列不满 (消费者消费元素时唤醒)
//考虑到可能存在提前唤醒的情况
}
//队列不满
this.elem[tail++] = e;
if(tail >= this.elem.length) {
tail = 0;
}
size++;//生产元素后 有效个数++
this.notify();//唤醒因队列空而阻塞的take 线程, 通知其他线程队列不空
}
}
public String take() throws InterruptedException {
synchronized(this) {
//队列空
while(size == 0) {
this.wait();//阻塞等待 直到队列不空 (生产者生产元素时唤醒)
//考虑到可能存在提前唤醒的情况 因此加入while循环 进行再一次判断是否正确唤醒
}
//队列不空
String e = this.elem[head++];
if(head >= elem.length) {
head = 0;
}
//head %= elem.length;
size--;//消费元素后有效数量--
//通知生产者线程队列不满
this.notify();
return e;
}
}
观察上述代码,当我们判断队列为空或队列为满时,应该执行阻塞等待的逻辑,这是没有问题的,那么为什么这里的判断逻辑不用if 而是使用 while 这样的死循环来判断呢?
这里使用while 其实是根据使用场景来决定的,阅读wait 方法的文档如下:
阅读jdk的相关文档,我们发现wait 不一定能够被正确唤醒,可能因为 interrupt 这样的中断方法提前唤醒 或者被其他线程虚假唤醒。正是因为这样现象出现的可能性,所以jdk 的原生文档中建议我们视情况搭配 while 这样的循环来使用wait, while 这样的循环存在的作用是进行再一次判断 (二次判断),判断wait 是否被错误唤醒,从而出现错误操作的情况。
需要注意的是阻塞队列是线程安全的,所以当我们实现put 和 take 方法时,要针对同一个锁对象加锁,起到互斥的效果,使得多线程不能同时修改队列。
2. 生产者消费者模型
使用上述模拟实现的阻塞队列实现生产者消费者模型。
//阻塞队列的模拟实现
//多线程案例(二)
// 模拟实现阻塞队列(数组版本) 并基于实现的阻塞队列实现生产者消费者模型
//String
class MyBlockingQueue {
private String[] elem = null;
//队头、队尾信息
private int head = 0;
private int tail = 0;
//记录队列使用情况
private int size = 0;
public MyBlockingQueue(int capacity) {
this.elem = new String[capacity];
}
//实现阻塞队列的关键方法 put 和 take //对队列的生产 和 消费 的动作间必须互斥进行,(同一时间内只能有一个线程操作队列)
//线程安全问题 主要是由于 多线程同时修改同一变量 导致的
public void put(String e) throws InterruptedException {
synchronized(this) {
//队列满
while(this.elem.length <= this.size) {
this.wait();//阻塞等待 直到队列不满 (消费者消费元素时唤醒)
//考虑到可能存在提前唤醒的情况
}
//队列不满
this.elem[tail++] = e;
if(tail >= this.elem.length) {
tail = 0;
}
size++;//生产元素后 有效个数++
this.notify();//唤醒因队列空而阻塞的take 线程, 通知其他线程队列不空
}
}
public String take() throws InterruptedException {
synchronized(this) {
//队列空
while(size == 0) {
this.wait();//阻塞等待 直到队列不空 (生产者生产元素时唤醒)
//考虑到可能存在提前唤醒的情况 因此加入while循环 进行再一次判断是否正确唤醒
}
//队列不空
String e = this.elem[head++];
if(head >= elem.length) {
head = 0;
}
//head %= elem.length;
size--;//消费元素后有效数量--
//通知生产者线程队列不满
this.notify();
return e;
}
}
public class Demo33 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue(100);
Thread producer = new Thread(() -> {
int product = 0;
while (true) {
try {
queue.put(product + "");
System.out.println("生产元素: " + product++);
//Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "producer");
Thread consumer = new Thread(() -> {
while (true) {
String e = null;
try {
e = queue.take();
System.out.println("消费元素:" + e);
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}, "consumer");
producer.start();
consumer.start();
}
}
执行程序,运行结果如下:
线程池
为什么使用线程池,而不是创建线程?
线程的创建和销毁是开销较大的操作,涉及系统的资源的分配和回收。线程池通过复用已存在的线程有效避免频繁创建和销毁线程。 (复用线程池中线程是在纯用户态下就能完成的操作,创建线程会涉及用户态->内核态的操作,在内核态中的情况就完全交由操作系统来执行,这个过程是不可控的)
Java标准库里为我们提供了线程池 ThreadPoolExecutor
,我们通过线程池的核心方法 submit(Runnable task)
来提交一段任务放到线程池中,交给线程池里的线程来执行。
1.ThreadPoolExecutor
了解Java标准库中提供的线程池 ThreadPoolExecutor ,我们主要从了解这个线程池的构造方法出发。
ThreadPoolExecutor
的构造方法:
在构造方法中各个参数的含义是什么呢?[★★★]
-
int corePoolSize
: 核心线程数 (描述一个线程池中至少有多少个线程)。当线程池一创建,这些线程也随之创建,直到线程池销毁,这些线程才会销毁。int maximumPoolSize
:最大线程数 (核心线程数 + 非核心线程数) 。Java的线程池中包含多少线程是可以动态调整的。非核心线程就是自适应的,也就是说线程池中动态调整的部分就是非核心线程。
-
long keepAliveTime
:非核心线程允许空闲的最大时间。(一旦超过这个时间非核心线程就会自动销毁)。- TimeUnit unit:允许空闲最大时间的时间单位,是一个枚举类型。
-
BlockingQueue<Runnable> workQueue
:submit
提交任务的场所,线程池的工作队列,线程池里的线程从这个带有阻塞特点的任务队列中取任务执行。
-
- ThreadFactory threadFactory:创建线程的工厂类,参与线程创建的具体工作 (对线程中一些属性的初始化)。
题外话:工厂模式(Factory Pattern)是 Java 中最常用的设计模式之一,它提供了一种创建对象的方式,使得创建对象的过程与使用对象的过程分离。
-
- RejectedExecutionHandler handler [★★★]: 拒绝策略。对于线程池来说,我们使用
submit(Runnable task)
提交任务交由任务队列 (阻塞队列 workQueue) ,当队列满时,我们尝试入队列操作,此时不会真的触发入队列操作,从而阻塞等待,而是转而执行拒绝策略相关的代码。拒绝策略:
- ThreadPoolExecutor.AbortPolicy:超过负荷,线程池直接抛出异常。
- ThreadPoolExecutor.CallerRunsPolicy:让调用submit 的线程自动执行任务 (即调用者线程自行处理)。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃任务队列 (阻塞队列)中最老的任务。
- ThreadPoolExecutor.DiscardPolicy:丢弃最新的任务 (当前submit 的任务)。
- RejectedExecutionHandler handler [★★★]: 拒绝策略。对于线程池来说,我们使用
2. Executors
Java鉴于构造线程池ThreadPoolExecutor
的构造方法中的核心参数太多,也提供了简化线程池使用的线程池工厂类 Executors
。
ExecutorService threadPool = Executors.newFixedThreadPool(4);
:创建线程数量固定的线程池。ExecutorService threadPool = Executors.newCachedThreadPool();
:创建线程数量很大的线程池 (可以认为线程可以无限增加)。
3. 模拟实现线程池
创建指定线程数量的线程池
//多线程案例(三)
//模拟实现线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class MyThreadPool {
//带有阻塞功能的 任务队列(阻塞队列)
private BlockingQueue<Runnable> taskQueue = null;
public MyThreadPool(int maximumPoolSize) {
//初始化任务队列 (任务队列容量为1000)
taskQueue = new ArrayBlockingQueue<>(1000);
//创建指定数量线程 的固定线程池
for(int i = 0; i < maximumPoolSize; i++) {
//线程池中的线程会不断的从任务队列中取任务来执行
Thread t = new Thread(() -> {
while(true) {
Runnable task = null;
try {
task = taskQueue.take();//取任务
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
task.run();//执行任务
}
});
//t.setDaemon(true);
t.start();
}
}
public void submit(Runnable task){
try {
taskQueue.put(task);//带有阻塞方法 线程安全
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public class Demo35 {
public static void main(String[] args) throws InterruptedException {
MyThreadPool threadPool = new MyThreadPool(4);
for(int i = 0; i < 1000; i++) {
int id = i;
threadPool.submit(() -> {
System.out.println("线程池中的" + Thread.currentThread().getName() + "执行了任务 " + id);
});
}
Thread.sleep(1000);
}
}
线程池中线程的任务就是不断从任务队列取任务来执行:
定时器
定时器相当于一个闹钟,能够在指定时间到了之后执行指定的任务。Java标准库中提供的定时器 Timer 实现的实质是使用一个线程来扫描任务队列中到达指定时间的任务并执行的过程。
1. Timer的使用
使用schedule 方法来安排任务和执行时间。schedule 中的TimerTask task 参数可以认为是对Runnable 任务的进一步封装。
//定时器的使用
import java.util.Comparator;
import java.util.Timer;
import java.util.TimerTask;
public class Demo37 {
public static void main(String[] args) {
Timer timer = new Timer();//定时器设计定时任务
//timer 内部也有一个线程,用来实现定时任务的完成
//Lambda 只能实现一个抽象方法的接口(函数式接口) 的匿名内部类
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello 1000");
}
},1000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello 2000");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello 3000");
}
},3000);
System.out.println("main 线程结束");
// Timer 内部的线程也是前台线程
}
}
程序运行结果如下:
2. 模拟实现Timer
通过对定时器的描述,我们发现实现一个定时器的核心如下:
- 创建一个类 (MyTimerTask),用来完成对Timer 要执行的任务的描述。
- 定时器中要能够有组织管理多个任务的数据结构 (优先级队列)。
- 实现schedule 方法,将任务添加到任务队列中。
- 创建一个额外线程执行任务队列中的任务。
//多线程案例(四)
//模拟实现定时器
//创建一个类标识表示定时器要执行的任务
//定时器中创建一个组织任务的数据结构 -> 优先级队列(基于任务时间比较)
//基于抽象类的形式定义 MyTimerTask//abstract class MyTimerTask implements Runnable {
// @Override
// public void run() {
//
// };
//}
import java.util.PriorityQueue;
//类内部持有任务
class MyTimerTask {
private Runnable task = null;
//任务执行的时间
private long time = 0;
public MyTimerTask(Runnable task,long time) {
this.task = task;
this.time = time;
}
public long getTime() {
return time;
}
//向外界提供任务执行的方法
public void run() {
task.run();
}
}
class MyTimer{
private PriorityQueue<MyTimerTask> taskQueue = null;
//定时器内部要有能够执行任务的线程
public MyTimer() {
this.taskQueue = new PriorityQueue<>((o1,o2) -> (int)(o1.getTime() - o2.getTime()));//闯入比较器类
Thread t = new Thread(() -> {
while(true) {
synchronized(this) {
//任务队列为空的情况
while(taskQueue.isEmpty()) {
try {
this.wait();//为了防止提前唤醒情况的出现,需要在循环中
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//非空
MyTimerTask task = taskQueue.peek();//后续可能插入需要更早执行的任务,因此不能直接poll
//还没到执行时间
if(System.currentTimeMillis() < task.getTime()) {
try {
this.wait(task.getTime() - System.currentTimeMillis());//带有超时时间的等待,当新任务来临时 提前唤醒后要重新检查队列情况,因此不用再循环中反复判断
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}else {
task.run();
taskQueue.poll();
}
}
}
});
//MyTimer 内部的线程也是前天线程
t.start();//启动执行逻辑的线程
}
public void schedule(Runnable task,long delay) {
//以调用schedule 方法时间为准
MyTimerTask myTimerTask = new MyTimerTask(task,System.currentTimeMillis() + delay);
synchronized(this) {
taskQueue.offer(myTimerTask);
this.notify();//通知执行流线程有新的任务加入队列
}
}
}
//标准库中提供的Timer 也是使用一个线程来扫描优先级队列中的任务并执行的
public class Demo38 {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(() -> {
System.out.println("Hello 1000");
},1000);
myTimer.schedule(() -> {
System.out.println("Hello 2000");
},2000);
myTimer.schedule(() -> {
System.out.println("Hello 3000");
},3000);
System.out.println("main 线程结束");
}
}
- (1) 描述定时器要执行的任务的核心信息 (执行的任务和执行的时间),所以我们定义的类内部不仅持有任务,同时还必须拥有描述任务执行时间的变量。
MyTimerTask 是对执行任务的进一步封装,使我们具有足够的信息来描述定时器要执行的任务。
- (2)schedule 方法的实现:
以调用schedule 方法的时间为准来计算任务的真正执行时间,当我们将任务提交到任务队列这个过程要加锁,将其打包成原子操作 (队列不能被同时修改)。
- (3) 创建额外的线程执行任务队列中的任务:
需要注意从任务队列中取任务执行的这个过程也要用锁打包起来,避免对任务队列的同时修改。
对于模拟实现的定时器代码,程序运行结果如下: