多线程-定时任务线程池源码
定时任务线程池
ScheduledThreadPoolExecutor,可以执行定时任务的线程池。这里学习它的基本原理。
定时任务线程池,和普通线程池不同的地方在于,它使用一个延迟队列,延迟队列使用最小堆作为它的数据结构,它会按照任务的执行顺序,把最先执行的任务放到第一个,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞完成后,去执行任务。对于周期性执行的任务,执行完成后,会计算下一次启动时间,然后把任务重新提交到延迟队列。
源码分析
定时任务线程池的继承体系
定时任务线程池继承了ThreadPoolExecutor,同时实现了ScheduledExecutorService,这个接口定义了定时调度相关的功能
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
ScheduledExecutorService:定义了定时调度的功能
public interface ScheduledExecutorService extends ExecutorService {
// 定时调度1次的任务
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 定时调度1次的任务,有返回值
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 以固定频率调度的任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 以固定延迟调度的任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
描述定时任务的类
描述定时任务的类:ScheduledThreadPoolExecutor的内部类ScheduledFutureTask
// ScheduledFutureTask:是定时任务线程池的内部类,封装了任务的启动时间、周期时间(隔多长时间执行一次),
// 任务在延迟队列中的索引、任务序号
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 任务的启动时间,单位是纳秒
private long time;
// 任务的执行周期,单位是纳秒
private final long period;
// 任务在队列中的索引
int heapIndex;
// 任务序号,通过原子类生成
private final long sequenceNumber;
// 持有自己的实例
RunnableScheduledFuture<V> outerTask = this;
// 构造方法,参数1 异步任务,参数2 结果,参数3 任务的启动时间,参数4 任务的周期时间
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
// ScheduledFutureTask实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
// 定时任务是否是周期性的
boolean isPeriodic();
}
// RunnableScheduledFuture继承了ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
// ScheduledFuture继承了Delayed,它封装了任务的延迟时间,表示任务延迟多久启动,继承
// Comparable接口,因为在循环队列中排序时需要用到
public interface Delayed extends Comparable<Delayed> {
// 返回对象相关的延迟时长
long getDelay(TimeUnit unit);
}
ScheduledFutureTask的继承体系上:
- 继承了FutureTask,代表一个异步任务。 // FutureTask在之前学习Callable接口的时候已经接触到了。
- 实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果,同时间接实现了Delayed接口,用于排序
定时任务主要有两个参数来描述任务的执行时间:
- time:任务的启动时间,这是一个绝对时间,描述到了某个时间点,任务应该启动执行
- period:任务的周期,描述两个任务之间间隔多长时间
延迟队列
定时任务线程池和普通线程池不一样的地方,在于它使用延迟队列,定时任务中封装好了任务的执行时间,任务的调度工作,是由延迟队列来执行的。
延迟队列的结构:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 队列内部使用的数组
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private int size = 0;
// 等待队列中第一个任务的线程
private Thread leader = null;
}
这里的结构很简单,主要是它的计算比较复杂,任务之间需要排序,组成一个最小堆,最先执行的任务放到前面,以及元素出队的方法、元素入队的方法。
工作机制
这里以scheduleAtFixedRate为例, 固定频率的定时任务,讲解定时任务的执行流程,其它类型的定时任务也类似。
提交定时任务
通过scheduleAtFixedRate方法,创建定时任务:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
// 构建ScheduledFutureTask实例
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit), // 任务的触发时间
unit.toNanos(period)); // 任务的执行周期
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
第一步:创建ScheduledFutureTask的实例
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result); // 调用父类FutureTask的构造方法
this.time = ns; // 任务的触发时间,这是一个绝对时间
this.period = period; // 任务的执行周期,表示两个任务之间间隔多长时间
this.sequenceNumber = sequencer.getAndIncrement(); // 当前任务的序列号,通过原子类生成
}
// 父类FutureTask的构造方法
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
第二步:添加任务到延迟队列,并且新建线程执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task); // 任务添加到延迟队列
if (isShutdown() && // 判断线程池是否关闭,如果关闭,移除任务
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart(); // 向线程池中添加线程,确保有线程执行任务
}
}
// 添加任务到延迟队列的方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; // 异步任务实例
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) { // 如果当前任务是第一个任务,要唤醒在条件变量上阻塞的线程
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// 向线程池中添加线程
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
// 注意,这里Worker实例的第一个参数 firstTask,值为null,表示Worker只可以从队列中获取任务
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
从阻塞队列中获取任务
新线程启动后,会执行Worker类的run方法 (参考ThreadPoolExecutor的执行原理),在run方法中,会从阻塞队列中获取异步任务,定时任务使用的阻塞队列是DelayedWorkQueue。
从阻塞队列中获取任务的方法:
// take方法没有指定超时时长,类似的,还有指定了超时时长的poll方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取锁
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0]; // 队列中的第一个元素,
if (first == null)
available.await(); // 如果队列为空,阻塞
else {
// 获取第一个任务的延迟时间,表示延迟指定时长后,开始执行任务
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first); // 如果延迟时长小于等于0,证明可以开始执行任务了
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; // 设置当前线程为leader
try {
available.awaitNanos(delay); // 如果延迟时长大于0,那么线程进入阻塞状态并且指定时长
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// 获取任务的延迟时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS); // time是绝对时间,它减去now(),就是相对时间,也就是延迟时间
}
阻塞队列负责按照任务的执行时间,对任务进行排序,最先执行的任务放在队列的第一位,这里没有展示排序的逻辑,排序是按照最小堆的逻辑来排序的。线程从阻塞队列中获取任务,会计算第一个任务的延迟时长,然后等待指定时长,在执行任务,这就是定时任务可以在指定时长后启动的逻辑,如果延迟队列中没有任务,线程会一直等待,同时,向延迟队列中添加任务时,如果发现当前任务是第一个任务,会唤醒正在等待的线程。
执行定时任务
从延迟队列中获取到任务后,线程会执行ScheduledFutureTask的run方法,因为ScheduledFutureTask间接继承了Runnable接口
// ScheduledFutureTask的run方法
public void run() {
boolean periodic = isPeriodic(); // 任务是否是周期性的
if (!canRunInCurrentRunState(periodic)) // 判断线程池是否还在运行
cancel(false);
else if (!periodic) // 如果不是周期性的任务,直接执行,这里执行的是FutureTask中的run方法
ScheduledFutureTask.super.run();
// 如果是周期性的任务,执行完之后计算下次执行时间,然后重新提交任务实例到阻塞队列
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 计算下次任务的执行时间,这个方法会更新任务的time属性
reExecutePeriodic(outerTask); // 再次向线程池中提交任务实例
}
}
// 判断任务是否是周期性的
public boolean isPeriodic() {
// 参考之前ScheduledFutureTask的实例的创建过程,period代表任务的执行周期,
// 这个值不为0,证明是周期性的任务
return period != 0;
}
1、为什么执行任务时会执行FutureTask中的run方法?因为在FutureTask的run方法中,会调用用户编写的run方法,也就是异步任务,ScheduledFutureTask中的run方法负责整体流程。
2、如果是周期性的任务,执行FutureTask中的runAndReset方法,它和run方法有什么不同?它执行完任务后,不会设置返回值,同时会把任务设置为初始状态,这个方法是为了执行多次的异步任务而设计的。
3、周期性的任务,执行完任务后,如何计算下次任务的执行时间?
// 计算任务下次执行时间的方法:
private void setNextRunTime() {
long p = period; // 任务的执行周期
if (p > 0)
time += p; // 当前时间加上周期,固定频率(scheduleAtFixedRate)的定时任务走这段逻辑
else
time = triggerTime(-p); // 更新任务的执行时间,固定延迟(scheduleWithFixedDelay)的定时任务走这段逻辑
}
这里需要解释一下,time属性是任务的执行时间,是一个绝对时间,表示到了某个点,例如 2020-01-01 00:00:00 这个固定的点,启动定时任务,period,是两个任务之间的间隔时长,例如,每隔10分钟,执行一次定时任务。对于固定频率的定时任务和固定延迟的定时任务,它们在创建任务实例的过程中稍有不同:
// 创建固定频率的定时任务
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit), // 计算time的值
unit.toNanos(period)); // 计算period的值,注意,period是正数
// 创建固定延迟的定时任务
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit), // 计算time的值
unit.toNanos(-delay)); // 计算period的值,注意,period是负数
一个执行周期是正数,一个执行周期时负数,所以在计算任务下次执行时间的方法中,它们会走向不同的链路,把该方法重新粘贴到下面,重新再看:
// 计算任务下次执行时间的方法:
private void setNextRunTime() {
long p = period; // 任务的执行周期
if (p > 0)
time += p; // 固定频率,上次任务执行时间加上执行周期,就是下次执行时间
else
time = triggerTime(-p); // 固定延迟,当前时间加上执行周期,就是下次执行时间, // triggerTime方法在下面
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
所以,固定频率执行的任务,如果上一次的任务执行超时,直到下一次任务该启动时还没有执行完成,一旦上一次任务执行完成,下一次任务立刻启动,因为上一次任务执行完成后,计算下一次任务的执行时间,发现执行时间在当前时间之前,所以线程获取任务时不会阻塞,会立刻取出任务,然后执行。固定延迟的任务,是根据上次任务结束时间来计算下次任务开始时间的,所以它是固定延迟。
总结
定时任务的执行过程:
- 第一步:向线程池提交定时任务(schedule方法)
- 第二步:创建定时任务实例(ScheduleFutureTask实例)
- 第三步:把定时任务添加到延迟队列,延迟队列会对任务进行排序,最先执行的定时任务放到开头
- 第四步:新建线程,从延迟队列中获取定时任务,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞结束后,执行定时任务
- 第五步:执行完成后,计算下一次任务的执行时间,然后重新向线程池中提交任务实例
Q&A
只执行一次的定时任务和周期性的定时任务,分别是如何执行的?
周期性的定时任务,在执行完一次后,会计算下次任务的启动时间,然后再次向阻塞队列中提交任务实例,只执行一次的定时任务则不会
线程是如何在指定时间启动定时任务的?
阻塞队列会把需要最先执行的定时任务放在队列的开头,线程会获取第一个任务的延迟时间,然后根据延迟时间休眠指定时长,休眠结束后,执行定时任务。
按照固定频率执行的定时任务和按照固定延迟执行的定时任务,分别是如何执行的?
按照固定频率执行的定时任务,下次任务的执行时间 = 上次任务的启动时间 + 周期
按照固定延迟执行的定时任务,下次任务的执行时间 = 上次任务的结束时间 + 周期
依据不同的计算方式,计算出下次任务的执行时间,然后提交任务实例到队列中