当前位置: 首页 > news >正文

ScheduledThreadPoolExecutor实现原理

文章目录

  • 概要
  • 核心方法
    • schedule(Runnable command, long delay,TimeUnit unit)
      • 参数校验
      • 任务转换
      • 加入延迟队列
      • 运行任务
        • 判断任务类型
    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

概要

ScheduledThreadPoolExecutor 是一个可以在指定一定延迟时间 后或者定时进行任务调度执行的线程池。ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 并实现了ScheduledExecutorService 接 口 。线程池 队 列 是 DelayedWorkQueue , 其和 DelayedQueue 类似 ,是一个延迟队列。

ScheduledFutureTask 是具有返回值的任务 , 继承自 FutureTask 。 FutureTask 的内部有一个变量 state 用来表示任务的状态 , 一开始状态为 NEW , 所有状态为:

private static final int NEW = 0 ; //初始状态
private static final int COMPLETING = 1; //4flA于中才犬态
private static final int NO卧1AL = 2 ; // 正常运行结束状态
private static final int EXCEPTIONAL = 3 ; // 运行中异常
private static final int CANCELLED = 4 ; // 任务被取消
private static final int INTERRUPTING= 5 ; // 任务正在被中断
private static final int INTERRUPTED = 6; // 任务已经被中断

可能的任务状态转换路径为:
NEW ——>COMPLETING——> NORMAL //初始状态——>执行中——>正常结束

NEW——> COMPLETING ——>EXCEPTIONAL //初始状态——>执行中——>执行异常

NEW——> CANCELLED//初始状态→任务取消

NEW——> INTERRUPTING——> INTERRUPTED //初始状态 一〉被 中 断 中 →被中断

ScheduledFutureTask 内部还有一个变量 period 用来表示任务的类型 ,任务类型如下 :

  • period= 0 , 说明当前任务是一次性的 ,执行完毕后就退出了 。
  • period 为负数,说明 当前任务为 fixed-delay 任务,是固定延迟的定时可重复执行任务 。
  • period 为正数,说明 当前任务为 fixed-rate 任务 , 是固定频率的定时可重复执行任务。

ScheduledThreadPoolExecutor 的 一个构造函数如下,由 该 构造函数可知线程池队列是DelayedWorkQueue 。

``

public ScheduledThreadPoolExecutor(int corePoolSize) {
// 调用父类Thre adPoolExecutor的构造函数
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());

}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

``

核心方法

schedule(Runnable command, long delay,TimeUnit unit)

``

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {

// (1) 参数校验
if (command == null || unit == null)
throw new NullPointerException();
// (2) 任务转换
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask(command, null,
triggerTime(delay, unit)));
// (3) 添加任务到延迟队
delayedExecute(t);
return t;
}

``

参数校验

如 上代码 (1) 进行参数校验,如果 command 或者 unit 为 null ,则抛出 NPE 异常 。

任务转换

代码( 2 )装饰任 务 ,把提 交 的 command 任 务转换为 ScheduledFutureTask 。ScheduledFutureTask 是 具体放入延迟 队列里面的 东西。由于是 延迟任 务 ,所以ScheduledFutureTask 实现了 long getDelay(TimeUnit unit)和 int compareTo(Delayed other) 方法。 triggerTime 方法将延迟时间转换为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的 long 型值 。 ScheduledFutureTask 的构造函数如下:

``

ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

``

在构造函数内部首先调用了父类 FutureTask 的构造函数 , 父类 FutureTask 的构造函数
代码如下 :

``

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

``

FutureTask 中的任务被转换为 Callable 类型后 ,被保存到 了变量 this.callable 里面 , 并设置 FutureTask 的任务状态为 NEW 。

然后在 ScheduledFutureTask 构造函数内部设置 time 为上面说的绝对时间。需要注意,这里 period 的值为 0 , 这说明当前任务为一次性的任务,不是定 时反复执行任务。其中 long getDelay(TimeUnit unit) 方法的代码如下(该方法用来计算当 前任务还有多少 时间 就过期了) 。

``

public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}

``

compareTo(Delayed other) 方法的代码如下:

``

public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

``

compareTo 的作用 是加入元素到延迟队列后,在内部建立或者调整堆时会使用该元素的 compareTo 方法与队列里面其他元素进行 比较,让最快要过期的元素放到队首。 所以无论什么时候向队列里面添加元素 ,队首 的元素都是最快要过期的元素。

加入延迟队列

代码(3)将任务添加到延迟队列, delayedExecute 的代码如下 :

``

private void delayedExecute(RunnableScheduledFuture<?> task) {
// (4) 如果线程池关闭了 , 则执行线程池拒绝策略
if (isShutdown())
reject(task);
else {
// (5) 添加任务到延迟队
super.getQueue().add(task);
// (6) 再次检查线程池状态
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// (7) 确保至 少一个线程在处理任务
ensurePrestart();
}
}

``

代码( 4 )首先判 断当前线程池是 否 已经关 闭 了 ,如 果 已经关 闭则 执行线程池的拒绝策略 , 否则执行代码 ( 5 )将任务添加到延迟队列 。添加完毕后还要重新检查线程池是否被关 闭了,如果 已经关 闭则从延迟队列里面删除刚才添加的任务,但是此时有可能线程池中的线程已经从任务队列里面移除 了 该任务 ,也就是该任务 已经在执行 了 ,所以还需要调用任务的 cancle 方法取消任务。

如果代码( 6 ) 判断结果为 false ,则会执行代码( 7 ) 确保至少有一个线程在处理任务 ,即使核心线程数 corePoolSize 被设置为 0 。 ensurePrestart 的代码如下 :

``

void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 增加核心线程数
if (wc < corePoolSize)
addWorker(null, true);
// 如果初始化corePoolSize= = O , 则也添加一个线程。
else if (wc == 0)
addWorker(null, false);
}

``

如上代码首先获取线程池中的线程个数,如果线程个数小于核心线程数则新增一个线程 ,否则如果当前线程数为 0 则新增一个线程。

运行任务

线程池里面的线程如何获取并执行任务?在前面讲解 ThreadPooIExecutor 时 我们说过, 具体执行任务的线程是 Worker 线程, Worker 线程调用具体任务的 run 方法来执行 。 由于这 里 的任务是ScheduledFutureTask,所以我们下面看看 ScheduledFutureTask 的 run 方法。

``

public void run() {
// (8) 是否只执行一次
boolean periodic = isPeriodic();
// (9) 取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// (9) 只执行一次,调用 schedule方法时候
else if (!periodic)
ScheduledFutureTask.super.run();
// (11) 定时执行
else if (ScheduledFutureTask.super.runAndReset()) {
// (11.1) 设置time=time+period
setNextRunTime();
// (11.2) 重新加入该任务到delay队列
reExecutePeriodic(outerTask);
}
}

``

判断任务类型

代码 ( 8 )中 的 isPeriodic 的作用是判断 当前任务是一次性任务还是可重复执行的任务(period=0为一次性任务)。

代码( 9 )判断当前任务是否应该被取消, canRunlnCurrentRunState 的代码如下 :

``

boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}

``

这里传递的 periodic 的 值为 false , 所以isRunningOrShutdown 的参数为 executeExistingDelayedTasksAfterShutdown 。 executeExistingDelayedTasksAfterShutdown 默认为 true , 表示当其他线程调用 了 shutdown 命令关闭了线程池后,当前任务还是要执行,否则如果为false ,则 当 前任务要被取消。

由 于 periodic 的 值为 false ,所以执行代码 (1 0 )调用 父类 FutureTask 的 run 方法具体执行任务。 FutureTask 的 run 方法的代码如下:

``

public void run() {
// (12)
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// (13)
try {
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// (13.1)
setException(ex);
}
// (13.2)
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

``

代码 (12) 判断如果任 务 状态不是 NEW 则 直接返 回,或者如 果当前任务状态为NEW 但是使用 CAS 设置当前任务的持有者为当前线程失败则直接返回。代码 (13) 具体
调用 callable 的 call 方法执行任务。这里在调用前又判断了任务的状态是否为 NEW ,是为
了避免在执行代码( 12 )后其他线程修改了任务的状态 ( 比如取消了 该任务〉。

如果任务执行成功则执行代码 (13.2)修改任务状态。首先使用 CAS 将当前任务的状态从 NEW 转换到 COMPLETING。这里当有多个线程调用时只有一个线程会成功。成功 的线程再通过 UNSAFE.putOrderedlnt 设置任务的状态为正常结束状态,这里没有使用 CAS 是因为对于同一个任务只可能有一个线程运行到这里。在这里使用 putOrderedlnt 比使用 CAS 或者 putLongvolatile 效率要高,并且这里的场景不要求其他线程马上对设置的状态值可见 。

如果任务执行失败,则执行代码 (13.1)。

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

当任务执行完毕后,让其延迟固定时间后再次运行 ( fixed-delay 任务)。其 中 initia!Delay 表示提交任务后延迟多 少时间开始执行任务 command, delay 表示当任务执行完毕后延长多少时间后再次运行 command 任务 , unit 是 initia!Delay 和 delay 的时间单位。任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。 schedul eWithFixedDelay 的代码如下:

``

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
// (14) 参数校验
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// (15) 任务转换 , 注意这里是period= -delay<0
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
// (16) 添加任务到队列
delayedExecute(t);
return t;
}

``

代码 (14 )进行参数校验 , 校验失败则抛出异常,代码 (15 )将 command 任务转换为 ScheduledFutureTask。这里需要注意 的是,传递给 ScheduledFutureTask 的 period 变量的值为 -delay , period<O 说明该任务为可重复执行的任务。然后代码( 16 )添加任务到延迟队列后返回。

将任务添加到延迟队列后线程池线程会从队列里面获取任务,然后调用ScheduledFutureTask 的 run 方法执行 。 由于这里 period<0 ,所以 isPeriodic 返回 true ,所以执行代码 (11)。 runAndReset 的代码如下:

``

protected boolean runAndReset() {

  // (17)
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
   // (18)      
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;   // (19)

}

``

该代码和 FutureTask 的 run 方法类似,只是任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务。这里多了代码 (19 ) ,这段代码判断如果当前任务正常执行完毕并且任务状态为 NEW 则返回 true , 否 则返回 false 。 如果返回了 true 则执行代码( 11.1 )的 setNextRunTime 方法设置该任务下一次的执行时间。setNextRunTime 的代码如下:

``

private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

``

这里 p< 0 说明 当前任务为 fixed-delay 类型任务。然后 设置 time 为当前时间加上 -p 的时间,也就是延迟?时间后再次执行 。

fixed-delay 类型 的任务的执行原理为,当添加一个任务到延迟队列后,等待 initialDelay 时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。需要注意的是,如果一个任务在执行中抛出了异常,那么这个任务就结束了,但是不影响其他任务的执行。

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

该方法相对起始时间点以固定频率调用指定的任务( fixed-rate 任务) 。当把任务提交到线程池并延迟 initialDelay 时间 ( 时间单位为 unit)后开始执行任务 command 。然后从initialDelay+period 时间点再 次执行 ,而 后在 initialDelay + 2 * period 时间点再次执行 ,循环往复,直到抛出异常或者调用了任务的 cancel 方法取消了任务,或者关闭了线程池。scheduleAtFixedRate 的原理与scheduleWithFixedDelay 类似。

``

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

``

在将 fixed-rate 类型的任务 command 转换为ScheduledFutureTask 时设置 period=period,不再是 -period 。

所以当前任务执行完毕后,调用 setNextRunTime 设置任务下次执行的时间时执行的是 time += p 而不再是 time = triggerTime(-p) 。

相对于 fixed-delay 任务来说, fixed-rate 方式执行规则为,时间 为 initdelday +n*period 时 启动任务,但是如果当前任务还没有执行完,下 一 次要执行任务的时间到了 ,则不会并发执行 ,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行

相关文章:

  • 无人机遥控器接口作用详解!
  • 服务器独立IP对于网站的作用
  • GPU和FPGA的区别
  • ath9k(Atheros芯片)开源驱动之wifi连接
  • 基于SpringBoot的城乡商城协作系统【附源码】
  • elf_loader:一个使用Rust编写的ELF加载器
  • 【模型】GRU模型详解
  • 怎么在Github上readme文件里面怎么插入图片?
  • Oracle 连接报错:“ORA-12541:TNS:no listener ”,服务组件中找不到监听服务
  • CNewMenu::QueryContextMenu函数分析之新建菜单项的创建
  • z-score算法
  • expo应用,登录失败,页面自动刷新???----全局状态滥用导致的bug
  • sdut-C语言实验-二分查找
  • 学习笔记01——《深入理解Java虚拟机(第三版)》第二章
  • Android输入事件传递流程系统源码级解析
  • DeepSeek基础之机器学习
  • Spring AutoWired与Resource区别?
  • OpenHarmony构建系统-GN与子系统、部件、模块理论与实践
  • 新学一个JavaScript 的 classList API
  • 如何使用 JavaScript 模拟 Docker 中的 UnionFS 技术:从容器到文件系统的映射
  • 红场阅兵即将开始!中国人民解放军仪仗队亮相
  • 中国国家电影局与俄罗斯文化部签署电影合作文件
  • 紧盯大V、网红带货肉制品,整治制售假劣肉制品专项行动开展
  • 金融监管总局:支持银行有序设立科技金融专门机构,推动研发机器人、低空飞行器等新兴领域的保险产品
  • 线下无理由退货怎样操作?线上线下监管有何不同?市场监管总局回应
  • 谢承祥已任自然资源部总工程师