JavaEE初阶——多线程(6)定时器和线程池
目录
一、定时器
1.1 定时器的概念
1.2 标准库中的定时器
1.3 模拟实现定时器
1.3.1 实现
1.3.2 优化
1.3.3 多线程情况
1.3.4 设置后台线程
二、线程池
2.1 线程池的概念
2.2 标准库中的线程池
2.3 线程池的使用和参数
2.4 线程池工作流程
2.5 拒绝策略
2.5.1 AbortPolicy
2.5.2 CallerRunPolicy
2.5.3 DiscardOldestPolicy
2.5.4 DiscardPolicy
一、定时器
1.1 定时器的概念
定时器是软件开发中的一个重要组件,类似于一个“闹钟”。达到一个设定好的时间之后,就执行某个指定好的代码
在实际场景中
- 比如网络通讯中,如果对方500ms没有返回数据,则断开连接尝试重连
- 比如一个Map,希望里面某个key在3s之后自动删除
这样的场景都需要定时器
1.2 标准库中的定时器
标准库中提供了一个Timer类,Timer类的核心方法是schedule

我们看这个schedule方法,他有两个参数,一个是TimerTask类,用来指定要执行的任务代码,一个是long类型,指定多长时间后执行任务代码,单位是毫秒
我们来具体看一下TimerTask类
![]()
明显的TimerTask是集成于Runnable的,所以我们可以定义自己的任务

再看延迟时间是如何设定的,System.currentTimeMillis()获取了当前时间,再加上我们设定的delay时间,就可以得到要执行任务的具体时间
1.3 模拟实现定时器
1.3.1 实现
- 首先我们需要一个类描述任务和执行任务的时间
描述任务我们可以使用Runnable接口设置具体任务,执行时间我么可以使用一个long类型的dealy表示
// 用一个类来描述任务及任务执行的时间
class MyTask implements Comparable<MyTask> {// 任务private Runnable runnable;// 任务执行的时间private long time;public MyTask(Runnable runnable, long delay) {// 校验任务不能为空if (runnable == null) {throw new IllegalArgumentException("任务不能为空.");}// 时间不能为负数if (delay < 0) {throw new IllegalArgumentException("执行时间不能小于0.");}this.runnable = runnable;// 计算出任务执行的具体时间this.time = delay + System.currentTimeMillis();}public Runnable getRunnable() {return runnable;}public long getTime() {return time;}
}
- 然后我们要组织任务
因为我们需要让时间设定最早的任务最先执行,所以可以使用一个优先级队列
private BlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
- 之后我们需要向用户提供一个方法schedule,用来提交任务
public void schedule(Runnable runnable, long delay) throws InterruptedException {// 根据传处的参数,构造一个MyTaskMyTask task = new MyTask(runnable, delay);// 把任务放入阻塞队列queue.put(task);// System.out.println("添加成功");}
创建任务,将任务放入队列
- 需要一个线程执行任务
public class MyTimer {// 用一个阻塞队列来组织任务private BlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();private Object locker = new Object();public MyTimer() {// 创建扫描线程Thread thread = new Thread(() -> {// 不断地扫描队列中的任务while (true) {try {// 1. 从队列中取出任务MyTask task = this.queue.take();// 2. 判断到没到执行时间long currentTime = System.currentTimeMillis();if (currentTime >= task.getTime()) {// 时间到了,执行任务task.getRunnable().run();} else {// 没有到时间,重新放回队列queue.put(task);}}} catch (InterruptedException e) {e.printStackTrace();}}}, "scanThread");// 启动线程,真正去系统中申请资源thread.start();
}
线程执行任务的逻辑是从队头取出任务,获取该任务的时间,如果时间到了则执行,如果时间没到则再放回队列
设定好一切后,我们来执行代码,结果出现报错

这是因为优先级队列中的类必须接入Comparable接口,指定排序规则,才能使时间早的任务排在队头
@Overridepublic int compareTo(MyTask o) {if (this.getTime() > o.getTime()) {return 1;} else if (this.getTime() < o.getTime()) {return -1;} else {return 0;}
我们接入Comparable接口之后实现compareTo方法,就可以正常运行了
1.3.2 优化
我们现在的定时器代码中的线程是在不断扫描任务判断是否执行,比如最近的一个任务还需要3个小时才执行,线程在这3个小时里面会一直循环,这个现象叫做“忙等”,浪费了计算机的资源
我们之前学过让线程暂时阻塞的wait方法,这个方法也可以指定阻塞时间,我们就可以采用这个方法避免忙等这种情况
private Object locker = new Object();while (true) {try {
// System.out.println("实例化出来的对象 "+ this);// 1. 从队列中取出任务MyTask task = this.queue.take();// 2. 判断到没到执行时间long currentTime = System.currentTimeMillis();if (currentTime >= task.getTime()) {// 时间到了,执行任务task.getRunnable().run();} else {// 当前时间与任务执行时间的差long waitTime = task.getTime() - currentTime;// 没有到时间,重新放回队列queue.put(task);synchronized (locker) {// 加入等时间locker.wait(waitTime);}}} catch (InterruptedException e) {e.printStackTrace();}}
我们用任务的执行时间减去当前时间,获取到时间差,让线程执行wait方法等待。
那如果在等待的时候我们加入了新的任务,这个任务只需要半个小时就要执行了,此时线程还在等待。很明显,我们必须唤醒线程!所以每加入新任务后就要唤醒一次线程
1.3.3 多线程情况
如果在多线程情况下,由于线程随机调度,可能会出现以下情况
现在是8点钟,有两个线程t1,t2,队列中现在只有一个9点的任务,按照线程扫描任务的逻辑代码来分析,t1此时取出了9点的任务,获取了执行时间,准备要等待1小时,就在上锁阻塞等待之前,CPU把线程调度给了t2,此时又入队了一个8点30分新任务,按照添加任务的代码逻辑会执行一次唤醒操作,但是t1在阻塞等待之前就被调度走了,也就是说这次唤醒是一个空操作。八点半的任务入队后,线程又调度会t1了,紧接着就执行阻塞等待1小时。
此时问题就出现了,t1要等待一小时,但是队列中现在有一个8点30分的任务,这个任务就不能及时被执行了,这个现象的原因就是没有保证原子性,导致t1扫描任务的时候被调度走了
synchronized (locker) {// 1. 从队列中取出任务MyTask task = this.queue.take();// 2. 判断到没到执行时间long currentTime = System.currentTimeMillis();if (currentTime >= task.getTime()) {// 时间到了,执行任务task.getRunnable().run();} else {// 当前时间与任务执行时间的差long waitTime = task.getTime() - currentTime;// 没有到时间,重新放回队列queue.put(task);synchronized (locker) {// 加入等时间locker.wait(waitTime);}}
}
所以我们在扫描任务中加上synchronized修饰,保证原子性
此时我们来执行下面的代码来测试
public class Demo_802 {public static void main(String[] args) throws InterruptedException {// 创建一个定时器对象MyTimer timer = new MyTimer();// 添加任务timer.schedule(() -> {System.out.println("马上就执行的任务1...");}, 1000);timer.schedule(() -> {System.out.println("马上就执行的任务2...");}, 2000);timer.schedule(() -> {System.out.println("马上就执行的任务3...");}, 3000);}
}

打印结果是按照我们设定的时间先后执行的,结果正确
特殊情况
如果我们把任务的时间改成如下情况,又会有什么样的结果
timer.schedule(() -> {System.out.println("马上就执行的任务1...");
}, 0);
timer.schedule(() -> {System.out.println("马上就执行的任务2...");
}, 0);
timer.schedule(() -> {System.out.println("马上就执行的任务3...");
}, 0);
我们把执行时间都改成了0,结果打印如下

代码在执行了第一个任务之后就不再打印,这是为什么呢,我们来逐步分析
第一个任务执行完了之后,按照逻辑来说,线程应该紧接着去获取第二个任务执行,但是这里没有执行,问题出现在下面代码,线程没有获取到第二个任务,所以一直等待,直到队列中有可用元素

我们再去分析提交任务的代码,任务2按照逻辑queue.put(task)之后添加到队列之后,线程就可以获取到而不是死等,但这里为什么一直等待呢?
这是因为这里虽然执行了添加代码,但是真正的添加成功是要等这个schedule方法执行完成之后的,也就是说必须要获取锁执行唤醒方法之后才可以添加任务

破解了,线程扫描任务的时候已经获取了锁,一直在等待新任务。添加新任务的时候也要获取锁,这样就形成了“你等我,我等你”的情况,也就是死锁现象,所以不能向下执行。这种现象就是因为我们刚刚在线程扫描外层加了锁导致的
1.3.4 设置后台线程
总结来说就是我们为了避免任务无法及时执行的问题,扩大了锁的范围,却又导致了死锁这个更大的问题
所以我们应该舍弃给线程加锁的方法,我们可以创建一个后台的扫描线程,制作定时唤醒操作,定时1s或10ms唤醒一次,也不会影响前台线程,可以随着进程的结束而结束
// 创建一个后台线程Thread daemonThread = new Thread(() -> {while (true) {// 定时唤醒synchronized (locker) {locker.notifyAll();}// 休眠一会try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}});// 设置成后台线程daemonThread.setDaemon(true);// 启动线程daemonThread.start();
这样就可以可以保证代码的正常运行了!
二、线程池
2.1 线程池的概念
线程池(Thread Pool) 是一种管理和复用线程的机制,它预先创建一定数量的线程,将任务提交给线程池执行,而不是为每个任务单独创建新线程。这种方式可以避免频繁创建和销毁线程带来的性能开销,提高系统响应速度和资源利用率。
2.2 标准库中的线程池
// 1. 用来处理大量短时间工作任务的线程池,如果池中没有可用的线程将创建新的线程,如果线程空闲60秒将收回并移出缓存
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 创建一个操作无界队列且固定大小线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 3. 创建一个操作无界队列且只有一个工作线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 4. 创建一个单线程执行器,可以在给定时间后执行或定期执行。
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
// 5. 创建一个指定大小的线程池,可以在给定时间后执行或定期执行。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
// 6. 创建一个指定大小(不传入参数,为当前机器CPU核心数)的线程池,并行地处理任务,不保证处理顺序
Executors.newWorkStealingPool();
这些线程池都是已经设定好默认参数的
2.3 线程池的使用和参数
查看线程池的源代码,发现是调用了ThreadPoolExecutor构造方法

我们来分析这个构造方法的参数

| 参数 | 含义 |
| int corePoolSize | 核心线程的数量 |
| int maximumPoolSize | 线程池中最大的线程数 |
| long keepAliveTime | 临时线程的存活时间 |
| TimeUnit unit | 临时线程存活的时间单位 |
| BlockingQueue<Runnable> workQueue | 组织保存任务的队列 |
| ThreadFactory threadFactory | 创建线程的工厂(暂时不讲) |
| RejectedExecutionHandler handler | 拒绝策略 |
我们以去吃火锅为例来讲解
- 火锅店里有5张桌子,去的早时店里没人,可以直接上菜服务客人
5张桌子就是核心线程数,此时空闲,线程可以直接执行任务
- 到了饭点人越来越多,5张桌子都坐满了,后来的人需要排队,最多可以排20人
人多了,也就是任务多了,线程都在执行任务,排队就是阻塞队列,20就是阻塞队列的大小
- 排队的人越来越多,排满20号了,在店外面加了10张桌子
排队满20人就是阻塞队列满了,加的10张桌子就是创建10个临时线程
线程池中最大线程数=核心线程数+临时线程数=15
- 时间越来越晚,吃饭的人少了,外面的桌子也空了,老板决定30分钟之后如果没人来就把外面的桌子收掉
30分钟就是对应的临时线程的存活时间和存活时间单位
- 30分钟后没人来外面用餐,收掉外面的桌子,店里的5张桌子已经可以满足顾客需求
此时临时线程销毁,又回归了核心线程数
- 如果中途排号满了20号,10张临时的桌子也满了,老板就决定不接待客人了,后面再来排队的人就会被拒绝用餐
这对应到线程池就是当阻塞队列满后,线程数也达到最大线程数,执行拒绝策略
2.4 线程池工作流程
- 添加任务,核心线程从队列中去任务执行
- 核心线程都在工作时,再添加的任务就会进入阻塞队列
- 阻塞队列满后,就会创建临时线程,一次性会创建最大的线程数(根据机器配置,比如CPU核数)
- 执行拒绝策略
2.5 拒绝策略
我们查看实现了RejectedExecutionHandler接口的类

2.5.1 AbortPolicy
这是直接拒绝策略,比如公司分配给你任务,你告诉公司现在没时间处理这个任务,这个任务就被放弃
public class Demo_905 {public static void main(String[] args) throws InterruptedException {// 定义一个线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.AbortPolicy());// 通过循环向线程池中提交任务for (int i = 0; i < 100; i++) {int taskId = i + 1;threadPool.submit(() -> {System.out.println("执行任务:" + taskId + ", " + Thread.currentThread().getName());});}
}
在代码中使用这个策略,拒绝任务后会抛出异常,这个任务直接丢失

2.5.2 CallerRunPolicy
这个拒绝策略的核心逻辑是:当线程池无法处理新提交的任务时(阻塞队列已满且线程数达到最大值),让提交任务的 “调用者线程” 亲自执行该任务,而非直接抛弃或抛出异常。
就像主管交给你一个任务,你无法做,你把任务再返回给主管
这样保证任务不丢失:被拒绝的任务会由调用者线程执行,保证任务最终被处理。
public class Demo_905 {public static void main(String[] args) throws InterruptedException {// 定义一个线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.CallerRunsPolicy());// 通过循环向线程池中提交任务for (int i = 0; i < 100; i++) {int taskId = i + 1;threadPool.submit(() -> {System.out.println("执行任务:" + taskId + ", " + Thread.currentThread().getName());});}}
}
我们观察打印结果,可以看到线程池无法接受新任务之后执行返回给调用者的拒绝策略,那么这个任务就被交给main调用了

2.5.3 DiscardOldestPolicy
这是放弃目前最早等待的任务拒绝策略,就像老板分配给你任务之后,你没有时间,他告诉你说我之前最先分配给你的任务可以不做了。
当满足以下条件时,DiscardOldestPolicy 会触发:
- 线程池的任务队列已满(无法直接接收新任务)。
- 线程池中的线程数已达到
maximumPoolSize(无法创建新线程处理任务)。
此时,策略会执行两步操作:
- 丢弃任务队列中等待最久的任务(队列头部的任务)。
- 将新提交的任务加入队列尾部,等待后续被线程池处理。
public class Demo_905 {public static void main(String[] args) throws InterruptedException {// 定义一个线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.DiscardOldestPolicy());// 通过循环向线程池中提交任务for (int i = 0; i < 100; i++) {int taskId = i + 1;threadPool.submit(() -> {System.out.println("执行任务:" + taskId + ", " + Thread.currentThread().getName());});}
}

2.5.4 DiscardPolicy
这是放弃新提交的任务,你告诉老板没时间,老板说那就放弃吧,不做了
DiscardPolicy 是一种简单直接的拒绝策略,其核心逻辑是:当线程池无法处理新提交的任务时(任务队列已满已满且线程数达到最大值),直接丢弃该新任务,且不抛出任何异常。
这个策略与直接拒绝策略不同的就是直接拒绝会抛出异常,而DiscardPolicy会把新提交的任务静默丢弃,不执行也不通知调用者,不抛出异常,除非手动实现
public class Demo_905 {public static void main(String[] args) throws InterruptedException {// 定义一个线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.DiscardPolicy());// 通过循环向线程池中提交任务for (int i = 0; i < 100; i++) {int taskId = i + 1;threadPool.submit(() -> {System.out.println("执行任务:" + taskId + ", " + Thread.currentThread().getName());});}
}

2.6 模拟实现线程池
public class MyThreadPool {// 定义阻塞队列来组织任务BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);public MyThreadPool (int threadNum) {if (threadNum <= 0) {throw new IllegalArgumentException("线程数量必须大于0");}// 创建线程for (int i = 0; i < threadNum; i++) {Thread thread = new Thread(() -> {// 不停的扫描队列while (true) {try {// 从队列中取出任务Runnable runnable = queue.take();// 执行任务runnable.run();} catch (InterruptedException e) {e.printStackTrace();}}});// 启动线程thread.start();}}
// 提交任务到线程池public void submit(Runnable runnable) throws InterruptedException {if (runnable == null) {throw new IllegalArgumentException("任务不能为空.");}// 把任务加入了队列中queue.put(runnable);}
}
