【JavaEE】(2) 多线程1
一、线程的概念
以“吃完 100 只烤鸡”的任务为例:
单进程的效率低下:(一个人吃 100 只)
多进程提升了效率,但是申请资源分配有额外的开销:(2 人吃 100 只烤鸡,但需要定另外的房间和桌子)
多线程不仅提升了效率,且共享同一进程中的资源,额外开销比进程小很多:(2 人在同一房间吃 100 只烤鸡)
多线程虽然提升了执行程序的效率,但并不是线程越多越好。因为 cpu 的逻辑核心数有限,同一时刻只能并发执行固定数目的线程,超出数量的线程只能等待被调度到 cpu。线程数越多,cpu 调度的开销愈加明显。(100 人吃 100 只烤鸡,但桌子和房间大小有限,只有靠近桌子的才能吃到。互相的推挤反而效率下降。)
在多线程编程时,也会存在线程安全问题,比如:多个线程访问内存中的同一变量,容易发生冲突,引发 bug。
因为多线程共享同一进程中的资源,一旦有一个线程发生异常,并且其它的线程没有处理这个异常,那么整个进程都会崩溃。(多进程中,一个进程的崩溃并不会影响其它进程)
二、Java 中使用多线程
多进程、多线程编程都是调用的操作系统提供的 api,而 JVM 对操作系统提供的 api 又进行了封装(对多进程的 api 封装粗糙,很多多进程能力 JVM 都没提供,因为 Java 的设计者不鼓励多进程编程),所以 Java 不关心操作系统的差异,我们只需要调用封装好的 api 即可。
为了更好地顺应 AI 时代,我们可以使用 AI 加持的开发工具 Trae,下载好后安装插件、配置环境变量:
使用 Builder 跟 AI 对话,自动生成代码:
1、第一个多线程程序
Tread 类就是 JVM 封装的操作系统提供的多线程 api,而 Thread 类的 run 方法是执行线程逻辑的入口,但它的实现没有我们想要的逻辑,所以我们需要继承 Thread 类,重写 run 方法。
start 方法用于创建一个新线程。main 方法是程序的主线程(随着进程存在而存在的线程),如果只是在 main 方法里调用 run,虽然也能执行线程逻辑,但这只是在主线程中执行的,并没有在新的线程中并发执行。
main 中调用 run,是在主线程中执行,需要等待主线程的 run 中 while 执行完毕:
public class Demo1 {public static class MyThread extends Thread {@Overridepublic void run() {while (true) {System.out.println("Hello World!");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}public static void main(String[] args) {MyThread myThread = new MyThread();myThread.run();while (true) {System.out.println("Hello Java!");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
}
main 中调用 start,是在新线程中并发执行,主线程和新线程同时执行 while:
public class Demo1 {public static class MyThread extends Thread {@Overridepublic void run() {while (true) {System.out.println("Hello World!");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}public static void main(String[] args) {MyThread myThread = new MyThread();myThread.start();while (true) {System.out.println("Hello Java!");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
}
注意 Tread.sleep 会抛出一个受查异常,如果在 run 里 throws,那么就和父类的 run 方法的标签不一致,不能构成重写,产生报错,所以只能捕获异常。
2、jconsole 工具
Java JDK 提供了一个 jconsole 工具,可用于观察进程中的线程情况:
列出了当前机器上执行的所有 Java 进程:
使用 start:
使用 run:
3、创建线程
有两种:
- 继承 Thread 类,重写 run,调用 Thread 的 start。
- 实现 Runnable 接口,重写 run,调用 Thread 的 start。
推荐使用 Runnable,因为希望我们的程序“高内聚,低耦合”(模块内共同目标明确,模块间互相影响尽量小)。实现 Runnable,重写 run,只描述了一段执行逻辑,并没有涉及到线程,因此还能把实现的子类的实例给进程、协程执行。因此,Runnable 负责描述任务,Tread 负责创建线程,耦合度低。
Runnable 实现多线程:
public class Demo2 {public static class MyRunnable implements Runnable {@Overridepublic void run() {while (true) {System.out.println("Hello World!");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}public static void main(String[] args) {MyRunnable myRunnable = new MyRunnable();Thread thread = new Thread(myRunnable);thread.start();while (true) {System.out.println("Hello Java!");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
}
匿名内部类写法:
lambda 表达式写法(推荐):实现函数式接口
4、Tread 的其它功能
创建 Thread 对象:
- name:给线程起名字。(主要是为了方便调试)
- group:将多个线程放到同一组,以便设置相同的属性。(不常用,了解即可,更多用到线程池)
其他方法:
- 状态、优先级、上下文(Java 拿不到)、记账信息(Java 拿不到)都跟上一节说的线程调度有关。
- isDaemon:后台线程/守护线程为 true(同一进程内,所有前台线程结束了,后台线程就要结束),前台线程为 false(前台线程没结束,进程就不能结束)。主线程、我们创建的线程都是前台线程;JVM 幕后做的工作是后台线程,如日志记录、垃圾回收。在 start 前 setDaemon 可以设置为后台线程。
- isAlive:线程已经启动且尚未终止,true;线程尚未启动(new)或已终止(
terminated
),false。
public class Demo3 {public static void main(String[] args) {Thread t = new Thread(() -> {while (true) {System.out.println("hello thread");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}, "我的线程");// 把线程设为 "后台线程"t.setDaemon(true);System.out.println("是否存活: " + t.isAlive());t.start();System.out.println("是否存活: " + t.isAlive());System.out.println("是否是后台线程: " + t.isDaemon());for (int i = 0; i < 3; i++) {System.out.println("hello main");try {Thread.sleep(1000); } catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
}
前台进程(主线程)执行结束,后台的新线程就会被自动销毁。
5、启动线程
线程只能启动一次。因为 start 后立马新建线程,准备启动。而 start 中有一个线程状态判断,只有状态为0(新建)才能执行后续 start 逻辑,否则会抛出错误。
重复启动,程序崩溃:
不允许重启的原因,是为了避免多个线程执行同一段逻辑,对同样的变量进行修改导致并发冲突。如果想要重启线程,需要重新创建实例,或者使用线程池。
6、线程中断
有时,我们希望线程提前结束。特别是使用 sleep 时,我们希望程序员自行选择是忽略中断、立马中断、执行一些任务后再中断,而不是强制中断(防止一些任务只做了一半,如对数据库进行多次增删改查,出现脏数据输出)。
- Thread.currentThread():哪个线程调用,就返回哪个线程对象。
- .interrupt():中断线程,将中断标志设置为 true。
- .isInterrupted():未被中断为 false,被中断为 true。
如果线程正在 sleep,此时被其它线程中断该线程,那么就会提前唤醒 sleep,触发 InterruptedException 异常(这个异常用于区分是正常结束还是被中断结束),并将中断标志恢复为 false。
此时,程序员可自由发挥。
忽略中断:
立刻终止线程:
执行一段任务后,再终止线程:
如果中断线程中没有 sleep,就是通过中断标志作为循环条件,结束循环了。
7、线程等待
线程调度是随机的,但有时希望线程的结束顺序可以预知,就可以用 .join() (被等待线程 加入 等待线程)。
只要发生阻塞(如 sleep、join),都会被 Interrupt 提前唤醒,所以 join 也会抛出中断异常。
也可以让 myThread 等待 main:
join 是死等,只要被等待线程没结束,等待线程就会一直等下去。有时这样做并不好,比如网络通信中,数据传输“丢包”是常事,过了超过时间就不能再等。
join 有重载的方法,可以指定超时时间:主要是用毫秒精度的,第二个更高精度没多大意义,因为计算机衡量时间时,存在毫秒级误差。
三、线程状态
打印线程的所有状态:
观察线程状态的转换(除了 new 和 terminated,其它状态 isAlive 都是 true):
main 死等 Thread-0:
详细版线程的生命周期:
四、线程安全
1、什么是线程安全问题
单线程中不存在的问题,在多线程中发生。即程序执行结果与预期不一致。观察下面的程序运行结果:
执行结果小于 1w。因为 count++ 不是原子的,它由三条指令组成:
- load:把数据从内存加载到寄存器。
- add:把寄存器中的数据 + 1。
- save:把计算结果从寄存器保存到内存。
而线程的调度是随机的,那么上面的程序就有很多种指令执行顺序:
等等。比如下面的顺序,加了 2 次,最终结果却是 1:
所以程序执行结果是一个小于 1w 的数。
2、线程不安全的原因
- 线程调度是随机的。
- 修改共享数据:多个线程修改同一变量。
- 修改语句不是原子的。
- 内存可见性(见后文)。
- 指令重排序(见后文)。
3、解决线程不安全问题
核心思路:把修改操作变为原子的。
3.1、synchronized
3.1.1、synchronized 是互斥的
锁可以把几条指令包装成一个原子的操作,当一个线程拥有一把锁并且没有释放,另一个线程想竞争同一把锁,就会阻塞等待锁释放(synchronized 是互斥的)。对于锁,有加锁和解锁两个操作。在 Java 中,用 synchronized 一个关键字就能完成两个操作,避免因忘记解锁而导致其它线程一直阻塞。锁对象,可以是任意一个引用类型的对象(不能是内置类型)。
① 修饰代码块:
把锁加在循环外面,虽然也能保证线程安全,但是并发程度大大降低,没有充分利用 cpu 的多核心资源,相当于要等线程1执行完了,线程2才能执行,效率下降。
② 修饰普通方法:
锁对象就是 this, 相当于:
③ 修饰静态方法:
锁对象就是类对象 Demo2.class(通过反射机制获得),相当于:
3.1.2、synchronized 是可重入的
当对同一把锁,连续多次加锁,就会发生死锁(锁无法解开):第一次加锁成功,第二次加锁阻塞。想继续执行,第二次就要加锁成功,那么就要等第一次加的锁解锁,那么就要让线程继续执行,陷入一种死循环。
但 Java 引入了可重入机制,对于上述情况不会产生死锁。
如何实现重复加锁不阻塞?对于每个类对象,不仅保存了成员信息,还有一个隐藏区域,保存 JVM 维护的对象头(加锁状态、被哪个线程加锁)。加锁时,先判断,如果是锁持有者加锁,不阻塞;如果不是锁持有者加锁,阻塞。
连续加锁时,如何知道在什么时候解锁?JVM 维护一个计数器,加锁一次就 + 1,解锁一次就 -1,当计数器为 0,就表示出最外层,需要解锁。
3.1.3、三个死锁场景
第一场景就是上述的,连续加锁的情况。
下面讲述另外两种。
(1)、两个线程两把锁的死锁
可重入锁只能解决一种死锁,但还有其他的死锁。
两个线程竞争两个资源,每个线程都持有不同的锁且不释放,同时尝试获取对方的锁。
public class Demo4 {private static Object locker1 = new Object();private static Object locker2 = new Object();public static void main(String[] args) throws InterruptedException {Thread thread1 = new Thread(() -> {synchronized(locker1) {System.out.println("线程1拿到了锁1");// 睡眠是为了让线程2有充分的时间拿到锁2try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}synchronized(locker2) {System.out.println("线程1拿到了锁2");}}});Thread thread2 = new Thread(() -> {synchronized(locker2) {System.out.println("线程2拿到了锁2");// 睡眠是为了让线程1有充分的时间拿到锁1try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}synchronized(locker1) {System.out.println("线程2拿到了锁1");}}});thread1.start();thread2.start();thread1.join();thread2.join();}
}
(2)、N个线程N把锁的死锁(哲学家就餐问题)
5个哲学家5只筷子,每个哲学家同时拿起左手边的筷子并且不放下,又同时尝试获取右手边的筷子(已经被右边的人拿走)。
死锁是概率性发生的,虽然概率小,但一旦触发危害就很大,导致程序卡死。
3.1.4、如何解决死锁
解决死锁,要从产生死锁的 4 个必要条件入手:
- 锁是互斥的(synchronized 本身的特点,无法从此入手)。
- 锁是不可抢夺的(synchronized 本身的特点,无法从此入手)。
- 保持和请求(持有锁且不释放锁,同时尝试获取其它锁)(有些场景确实需要拿到锁 1 后再拿锁 2)。
- 循环等待(等待锁释放的顺序构成循环)。
日常开发中,通常从 3、4 点入手,如打破循环关系:让线程 1、2 获取锁的顺序一致。
public class Demo4 {private static Object locker1 = new Object();private static Object locker2 = new Object();public static void main(String[] args) throws InterruptedException {Thread thread1 = new Thread(() -> {synchronized(locker1) {System.out.println("线程1拿到了锁1");// 睡眠是为了让线程2有充分的时间拿到锁2try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}synchronized(locker2) {System.out.println("线程1拿到了锁2");}}});Thread thread2 = new Thread(() -> {synchronized(locker1) {System.out.println("线程2拿到了锁1");// 睡眠是为了让线程1有充分的时间拿到锁1try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}synchronized(locker2) {System.out.println("线程2拿到了锁2");}}});thread1.start();thread2.start();thread1.join();thread2.join();}
}
3.1.5、Java 标准库中的线程安全类
线程安全的:
之所以不推荐使用,是因为类里很多方法都加了 sychronized,限制死了。加锁不是越多越好,因为会发生竞争、阻塞,影响程序执行效率。像单线程、多进程没有修改共享数据的情况,就不需要加锁。
String 虽然没有加锁,但是它不可修改,也就不涉及修改共享数据,因此也是线程安全的。
线程不安全的:
这些线程不安全的类,留给程序员更多的发挥空间。
3.2、内存可见性引起的线程安全问题
看下面代码的执行结果:就算输入了1,flag 依旧是 0,陷入了循环中。
import java.util.Scanner;public class Demo5 {private static int flag = 0;public static void main(String[] args) {Thread thread1 = new Thread(() -> {while (flag == 0) {};System.out.println("t1结束");});Thread thread2 = new Thread(() -> {System.out.println("输入数字");Scanner scanner = new Scanner(System.in);flag = scanner.nextInt();scanner.close();System.out.println("t2结束");});thread1.start();thread2.start();}
}
这是因为在很短的时间内,while 执行了很多次 load(从内存读取 flag 放入寄存器) 和 cmp(将寄存器中的值与 0 比较),内存的读写(load)相对于寄存器读写(cmp)来说慢很多;并且在什么时候输入值也是不确定的。所以,编译器就对上面的代码进行了优化,直接从寄存器读取 flag 而不是内存。因此出现上面的情况,这就是内存可见性问题。
Java 官方文档中是这样描述的:每个 Java 进程有自己的主内存(main memory)存储空间,每个 Java 线程又有自己的工作内存(work memory)存储空间,这就是 Java Memory Model(Java 内存模型)。
上面的代码中,线程1先多次从主内存读取 flag 到工作内存进行比较;后编译器进行优化,直接从工作内存读取 flag 进行比较;而线程2修改的是主内存中的 flag。
这里的主内存就是内存,工作内存就是 cpu 的寄存器和 cpu 的缓存(缓存是比寄存器空间大,速度比内存快的存储空间)。Java 之所以这么称呼,是因为设计者希望 Java 是跨平台的,不希望用户去了解操作系统底层和硬件;此外,不同 cpu 的底层结构也是不同的(可能是从寄存器读,可能是从缓存读,以后也可能出现其它的存储结构)。
为了解决上述的内存可见性问题,使用 volatile 关键字修饰 falg 变量,表明该变量是易变的,提醒 JVM 运行时不要进行上述优化。
如果在 while 里加 sleep,编译器也不会对代码进行优化,因为此时主要降低效率的不再是 load,而是 sleep(阻塞、调度触发切换上下文)。因此也不能用 sleep 解决内存可见性问题。
注:synchronized(两个线程修改共享数据) 和 volatile(一个线程读,一个线程修改)解决的是两个不同维度的问题。
3.3、wait 和 notify
线程的调度是随机的,我们有时希望线程按照一定的顺序执行。join 控制线程的结束顺序,我们还希望控制线程中间逻辑的执行顺序。比如让线程1执行完某段逻辑后,线程2再执行:先用 wait 阻塞线程2,线程1执行完某段逻辑后,再通过 notify 唤醒线程2。
wait 和 notify 还能解决线程饿死问题:线程1释放锁后,其它的线程还在等待操作系统唤醒,线程1就近水楼台先得月,多次连续获得锁,其它线程拿到锁的时间延长,导致效率变低。
3.3.1、wait 的使用
wait 和 notify 是 Object 类的方法,所以任何类都能调用这个方法。因为 wait 也属于阻塞,所以会抛出 Interrupted 异常。在线程内调用 wait 后,将当前线程放入阻塞队列和释放锁同时执行,然后等待被唤醒,因此需要在 sychronized 块中调用(先有锁,才能释放锁)。
如果没有 notify 唤醒,那么 wait 就会死等,在网络编程中这是不行的,所以可以设置超时时间。
3.3.2、notify 的使用
同样,notify 需要在 sychronied 块中使用。
wait 的阻塞有两个阶段:等待被唤醒,此时是 WAITING;被唤醒后,因为锁竞争而阻塞,此时是 BLOCKED。
当有多个线程 wait 阻塞后,一次 notify 只会随机唤醒其中一个线程,想唤醒全部就是用 notifyAll。
import java.util.Scanner;public class Demo3 {private static Object locker = new Object();public static void main(String[] args) {Thread t1 = new Thread(() -> {System.out.println("t1 wait 之前");synchronized (locker) {try {locker.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("t1 wait 之后");});Thread t2 = new Thread(() -> {System.out.println("t2 wait 之前");synchronized (locker) {try {locker.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("t2 wait 之后");});Thread t3 = new Thread(() -> {System.out.println("t3 wait 之前");synchronized (locker) {try {locker.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("t3 wait 之后");});t1.start();t2.start();t3.start();Scanner scanner = new Scanner(System.in);synchronized (locker) {scanner.nextLine();locker.notify(); // 随机唤醒一个线程// locker.notifyAll(); // 唤醒所有等待的线程}scanner.close();}
}
随机唤醒一个:
唤醒全部:
如果没有 wait 就 notify,也没有任何影响。
3.3.3、wait 和 sleep 的区别
- wait 可以灵活控制线程的唤醒时机;而 sleep 要等待一定时间后才能被唤醒(虽然使用 interrupt 也能提前唤醒 sleep,但是会触发异常,属于异常情况 )。
- wait 要求在同步块里执行,且会释放锁;sleep 没有要求,如果在同步块里执行不会释放锁。
五、多线程案例
1、单例模式
单例模式保证程序中某个类只能创建一个实例。类内部创建唯一实例,提供静态方法供外部使用,将构造函数私有化。
1.1、饿汉模式
创建实例的时机,是在类加载时,近似程序一启动时。
1.2、懒汉模式(单线程)
创建实例的时机,是在调用获取唯一实例的方法时,用的时候在创建。
计算机里,呆板和懒是褒义词,呆板就不容易出错,懒效率就高。比如用户查看一个文件时,一次性将整个文件文件内容加载到内存效率低下,因为用户不一定能看完;而看到哪就加载到哪,效率就比较高。因此,推荐懒汉模式。
1.3、懒汉模式(多线程)
1.3.1、加锁
在多线程中,饿汉模式是线程安全的,因为线程中只能访问实例;而懒汉模式是线程不安全的,因为线程在访问实例时,可能会创建唯一的实例,就造成多个线程对同一变量进行修改。比如:单例模式的构造函数里,可能会加载很多文件内容到内存,每创建一次实例就需要耗费很多时间。
解决办法,加锁:对创建实例加锁是错误的,因为我们希望的是 判空+创建实例 是一个原子的操作。
1.3.2、加锁外层实例判空
但是这样,每次访问实例,都会加锁,影响效率。我们需要的,只是在第一次访问创建实例时加锁,因此还要在外层判空。
在多线程中,这样做并不奇怪,因为外层 instance 为 null,进入,此时可能线程调度,阻塞,其他线程创建了实例,在被调度回 cpu 后,内层的 instance 就不为空了。
1.3.3、指令重排序
编译器会进行一种指令重排序的优化,对于一系列指令,在不改变逻辑一致的前提下,调整指令的顺序,提高执行效率。
对于创建实例这一条语句,主要有三个步骤:
- 申请内存空间。
- 执行构造方法,在内存空间上进行初始化。
- 将内存地址保存到引用变量。
申请内存空间是首要的步骤,但 2、3 步骤的顺序就不确定了。当为 1、3、2 顺序时,多线程中会存在以下线程不安全问题:
t1 线程将未初始化的实例的内存空间地址放到引用变量,此时 instance 不为 null,t2 线程调用 getInstance 直接得到实例,如果后续读取其成员变量、方法,那么读到的是错误的数据。
给实例的引用变量加上 volatile,说明对于该变量的读写操作不触发优化。
单例模式不止饿汉模式和懒汉模式,还有其他比如针对序列化、反序列化、反射情况下还能确保单例。但实际开发中,懒汉和饿汉就够了。
2、阻塞队列
2.1、什么是阻塞队列
- 同样具有队列 “先进先出” 的特性,额外带有阻塞功能:
① 队列为空,尝试出队列,触发阻塞,直到队列不为空。
② 队列满,尝试入队列,触发阻塞,直到队列不满。
- 是线程安全的。
2.2、生产者消费者模型
生产者(擀皮者)和消费者(包饺子者)不直接通信,而是以容器为中介(案板),生产者将生产的数据放入容器,消费者从容器中取数据消费。这样有三点好处:
- 减少资源竞争,提高效率。(相对于某个线程同时为生产者和消费者,不会因为竞争资源(擀面杖)触发阻塞;生产者、消费者各司其职,效率更高)
- 更好做到模块间解耦。
- 容器作为一个缓冲器,起到了削峰填谷的功能(大雨时,水坝阻挡上流,平缓下流;干旱时,水坝释放上流,给下流正常供水)。
缺点:
- 系统更复杂,维护成本更大。
- 网络开销更多(模块间不直接通信,还需经过容器)。
2.3、标准库中的阻塞队列
- Java 标准库中的阻塞队列是 BlockingQueue 接口,实现类是 LinkedBlockingQueue 和 ArrayBlockingQueue。
- BlockingQueue 的 offer、add、peek、poll 没有阻塞效果,继承自普通队列。
- put:阻塞式入队;take:阻塞式出队。
队满阻塞:
队空阻塞:
实现生产者消费者模型:
public class Demo2 {public static void main(String[] args) {BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1000);Thread thread = new Thread(() -> {int count = 0;while (true) {try {queue.put(count);System.out.println("生产:" + count);count++;// Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});thread.start();while (true) {try {Integer take = queue.take();System.out.println("消费:" + take);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
瞬间生产很多到队满,生产者线程阻塞;消费者消费。
2.4、实现一个阻塞队列
我们希望多个线程对同一个变量(size、array)的修改是原子的操作,避免线程安全问题,而判断队满、队空虽然没有修改,但跟修改操作密切相关,所以把它们都放在 synchronized 块里。队满入队阻塞,需要出队后唤醒;队空出队阻塞,需要入队后唤醒,使用 wait 和 notify 实现。
而在 wait 的官方文档中,有一段描述是最好使用循环条件判断是否使用 wait,这样做可以起到二次判断的效果。比如当碰到,线程被唤醒后,阻塞条件仍然成立,我们需要继续阻塞,而不是继续执行造成后续的逻辑出现问题。
class MyBlockingQueue {private int[] array = null; // 用于存储元素的数组private int size = 0; // 队列的大小private int front = 0; // 队列头的索引private int rear = 0; // 队列尾的索引Object locker = new Object();public MyBlockingQueue(int capacity) {array = new int[capacity];}// 入队操作public void put(int element) throws InterruptedException {synchronized (locker) {// 队满,阻塞while(size >= array.length){locker.wait();}// 入队array[rear] = element;rear = (rear + 1) % array.length; // 环形队列size++;// 唤醒消费者locker.notify();}}// 出队操作public int take() throws InterruptedException {synchronized (locker) {// 队空,阻塞while(size == 0) {locker.wait();}// 出队int element = array[front];front = (front + 1) % array.length; // 环形队列size--;// 唤醒生产者locker.notify();return element;}}
}public class Demo3 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread producer = new Thread(() -> {int count = 0;try {while (true) {queue.put(count);System.out.println("生产:" + count);count++;// Thread.sleep(1000);}} catch(InterruptedException e) {e.printStackTrace();}});Thread consumer = new Thread(() -> {try {while (true) {int take = queue.take();System.out.println("消费:" + take);Thread.sleep(1000);}} catch(InterruptedException e) {e.printStackTrace();}});producer.start();consumer.start();}
}
2.5、线程池
提出线程的目的,是为了解决进程创建销毁 “太重” 的问题。但是当线程的创建销毁达到一定的量,其开销也非常大。之前学过的字符串常量池,是 JVM 将一些常用的字符串放到常量池中,随取随用。类似,把一些线程提前创建好并放到线程池,申请线程就直接从线程池取,用完之后再放回,就节省了许多创建和销毁的开销。
2.5.1、ThreadPoolExecutor
Java 中的线程池 ThreadPoolExecutor 在 java.util.concurrent 包(与并发相关的一些工具)内,我们需要注意的是构造函数参数的使用。官方说明书有4个构造函数,主要看第 4 个,因为包含了所有参数。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
线程池中的线程有两种,核心线程和临时线程。核心线程:线程池创建后一直存在的线程。临时线程:核心线程不够用时创建的线程,核心线程空闲后又被销毁。
- corePoolSize:核心线程数。
- maximumPoolSize:线程总数。
- keepAliveTime:临时线程最大的闲置时间。
- unit:闲置时间的单位(毫秒、秒、分钟)。
- workQueue:线程池要完成的任务队列。
- threadFactory:Thread 的工厂类,自定义线程的初始化(设置线程名字、后台线程、优先级等)。
ps:线程工厂属于工厂设计模式,可以将创建对象的一系列初始化操作包装,这样可以将对象的创建和使用隔离,在不改变现有代码的条件下新增新的产品类型。为什么不直接在产品类的构造函数函数内初始化?例如坐标系有很多种表示,如直角坐标系、极坐标系,如果使用构造函数初始化,那么就需要将构造函数重载,以满足不同的产品初始化。但重载必须满足一定的条件,比如参数列表不能相同,但对于直角、极坐标系就不能构成重载:
- handler:线程池中的任务队列是一个阻塞队列,当队满时会发生阻塞。但是一般情况下我们并不希望阻塞(比如自动驾驶,碰到障碍需要减速反应,如果这个时候触发了阻塞就完了),因此就引入 4种 拒绝策略。
2.5.2、Executors
标准库还提供了一个简化版的线程池 Executors,它对 ThreadPoolExecutor 进行了封装,简化了参数的使用,但同时也降低了自定义可控性。它本质也是一个工厂类,定义了创建不同风格线程的工厂方法。
部分使用示例:
本质是对 ThreadPoolExecutor 进行了封装:
submit 提交任务:
2.5.3、实现一个固定数目的简单线程池
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;class MyFixedThreadPool {private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); // 任务阻塞队列MyFixedThreadPool(int poolSize) {for(int i = 0; i < poolSize; i++) {Thread t = new Thread(() -> {while(true) {try {Runnable task = taskQueue.take(); // 从队列中取出任务task.run(); // 执行任务} catch (InterruptedException e) {e.printStackTrace();}}});t.start();}}// 将任务放入队列public void submit(Runnable task) throws InterruptedException {taskQueue.put(task);}
}public class Demo3 {public static void main(String[] args) throws InterruptedException {MyFixedThreadPool pool = new MyFixedThreadPool(5);for(int i = 0; i < 100; i++) {int id = i;pool.submit(() -> {System.out.println("执行任务 " + id);});}}
}
2.6、定时器
用于在一定时间后再执行任务的场景。
2.6.1、使用
2.6.2、实现一个简单定时器 MyTimer
我们首先需要使用一个数据结构,来组织任务列表。如果使用 ArryList,则没有优先级关系,每次取任务还要去找最先执行的。因此我们使用优先级队列。阻塞队列也有优先级队列的实现,但它只能控制队空的阻塞,不能控制未到时间的阻塞,需要再在阻塞队列外部使用另一个锁来控制,但这样容易发生死锁,比如当生产线程和消费线程交叉拥有锁时(生产者拥有外部锁,但没有内部锁;消费者拥有内部锁,但没有外部锁,形成死锁)。因此,为了能控制未到时间的阻塞,同时也防止死锁发生,我们不用阻塞优先级队列实现。
然后我们要写一个 schedule 方法,用于存放任务;在 MyTimer 构造函数中初始化一个线程,来执行队列里的任务。
为了指定优先级队列中的排序规则,我们需要在 MyTask 内实现 Comparable 接口。而在记录时间时,调用者使用的相对时间(30秒后),在保存时需要的是绝对时间(16:30 时)。
import java.util.PriorityQueue;class MyTimerTask implements Comparable<MyTimerTask> {private Runnable task;private long time;public MyTimerTask(Runnable task, long delay) {this.task = task;this.time = System.currentTimeMillis() + delay;}public Runnable getTask() {return task;}public long getTime() {return time;}@Overridepublic int compareTo(MyTimerTask o) {// 小根堆return (int) (this.time - o.time);}
}class MyTimer {PriorityQueue<MyTimerTask> taskQueue = new PriorityQueue<>();MyTimer() {Thread thread = new Thread(() -> {// 不断地检查任务队列while (true) {// 队列为空,就一直等待if (taskQueue.isEmpty()) {continue; }// 访问队首元素MyTimerTask task = taskQueue.peek();// 如果当前时间小于任务的执行时间,就等待if (System.currentTimeMillis() < task.getTime()) {continue;}// 执行任务task.getTask().run();taskQueue.poll(); // 任务已执行,扔掉}});thread.start();}public void schedule(Runnable runnable, long delay) {taskQueue.offer(new MyTimerTask(runnable, delay));}
}
但这样还存在两个问题:
- 线程安全问题:调用者线程调用 schedule 和 thread 线程执行任务,都是对 taskQueue 进行修改,属于不同线程修改同一变量,易发生线程不安全问题。因此我们需要加上 sychronized 块。
- 线程饿死问题:加了锁后,thread 线程由于 “队空”、“时间未到”,反复进行加锁解锁,长时间占有锁,导致调用者线程饿死。为了解决这个问题,我们需要使用 wait、notify:
import java.util.PriorityQueue;class MyTimerTask implements Comparable<MyTimerTask> {private Runnable task;private long time;public MyTimerTask(Runnable task, long delay) {this.task = task;this.time = System.currentTimeMillis() + delay;}public Runnable getTask() {return task;}public long getTime() {return time;}@Overridepublic int compareTo(MyTimerTask o) {// 小根堆return (int) (this.time - o.time);}
}class MyTimer {PriorityQueue<MyTimerTask> taskQueue = new PriorityQueue<>();Object locker = new Object();MyTimer() {Thread thread = new Thread(() -> {// 不断地检查任务队列while (true) {try {synchronized(locker) {// 队列为空,就一直等待if (taskQueue.isEmpty()) {locker.wait(); // 释放锁,阻塞,等待 schedule 存锁后唤醒}// 访问队首元素MyTimerTask task = taskQueue.peek();// 如果当前时间小于任务的执行时间,就等待Long now = System.currentTimeMillis();if (now < task.getTime()) {// 当存放新任务后,也会被唤醒,因为新任务可能更靠前,需要重新计算比较时间locker.wait(task.getTime() - now); // 阻塞,等到最近的等待时间后,停止等待} else {// 时间到了,就执行任务task.getTask().run();taskQueue.poll(); // 执行完了,就扔掉}}} catch(InterruptedException e){e.printStackTrace();}}});thread.start();}public void schedule(Runnable runnable, long delay) {synchronized(locker) {taskQueue.offer(new MyTimerTask(runnable, delay));locker.notify();}}
}public class Demo5 {public static void main(String[] args) {MyTimer timer = new MyTimer();timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("执行任务 3000");}}, 3000);timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("执行任务 2000");}}, 2000);timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("执行任务 1000");}}, 1000);}
}