当前位置: 首页 > news >正文

多线程-定时任务线程池源码

定时任务线程池

ScheduledThreadPoolExecutor,可以执行定时任务的线程池。这里学习它的基本原理。

定时任务线程池,和普通线程池不同的地方在于,它使用一个延迟队列,延迟队列使用最小堆作为它的数据结构,它会按照任务的执行顺序,把最先执行的任务放到第一个,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞完成后,去执行任务。对于周期性执行的任务,执行完成后,会计算下一次启动时间,然后把任务重新提交到延迟队列。

源码分析

定时任务线程池的继承体系

定时任务线程池继承了ThreadPoolExecutor,同时实现了ScheduledExecutorService,这个接口定义了定时调度相关的功能

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

ScheduledExecutorService:定义了定时调度的功能

public interface ScheduledExecutorService extends ExecutorService {
    // 定时调度1次的任务
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 定时调度1次的任务,有返回值
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    // 以固定频率调度的任务
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    // 以固定延迟调度的任务
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

描述定时任务的类

描述定时任务的类:ScheduledThreadPoolExecutor的内部类ScheduledFutureTask

// ScheduledFutureTask:是定时任务线程池的内部类,封装了任务的启动时间、周期时间(隔多长时间执行一次),
// 任务在延迟队列中的索引、任务序号
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    // 任务的启动时间,单位是纳秒
    private long time;
    // 任务的执行周期,单位是纳秒
    private final long period;

    // 任务在队列中的索引
    int heapIndex;
    // 任务序号,通过原子类生成
    private final long sequenceNumber;

    // 持有自己的实例
    RunnableScheduledFuture<V> outerTask = this;

    // 构造方法,参数1 异步任务,参数2 结果,参数3 任务的启动时间,参数4 任务的周期时间
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
}

// ScheduledFutureTask实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    // 定时任务是否是周期性的
    boolean isPeriodic();
}

// RunnableScheduledFuture继承了ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

// ScheduledFuture继承了Delayed,它封装了任务的延迟时间,表示任务延迟多久启动,继承
// Comparable接口,因为在循环队列中排序时需要用到
public interface Delayed extends Comparable<Delayed> {
    // 返回对象相关的延迟时长
    long getDelay(TimeUnit unit);
}

ScheduledFutureTask的继承体系上:

  • 继承了FutureTask,代表一个异步任务。 // FutureTask在之前学习Callable接口的时候已经接触到了。
  • 实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果,同时间接实现了Delayed接口,用于排序

定时任务主要有两个参数来描述任务的执行时间:

  • time:任务的启动时间,这是一个绝对时间,描述到了某个时间点,任务应该启动执行
  • period:任务的周期,描述两个任务之间间隔多长时间

延迟队列

定时任务线程池和普通线程池不一样的地方,在于它使用延迟队列,定时任务中封装好了任务的执行时间,任务的调度工作,是由延迟队列来执行的。

延迟队列的结构:

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    // 队列内部使用的数组
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private int size = 0;

    // 等待队列中第一个任务的线程
    private Thread leader = null;
}

这里的结构很简单,主要是它的计算比较复杂,任务之间需要排序,组成一个最小堆,最先执行的任务放到前面,以及元素出队的方法、元素入队的方法。

工作机制

这里以scheduleAtFixedRate为例, 固定频率的定时任务,讲解定时任务的执行流程,其它类型的定时任务也类似。

提交定时任务

通过scheduleAtFixedRate方法,创建定时任务:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        // 构建ScheduledFutureTask实例
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),  // 任务的触发时间
                                      unit.toNanos(period));            // 任务的执行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

第一步:创建ScheduledFutureTask的实例

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);  // 调用父类FutureTask的构造方法
    this.time = ns;  // 任务的触发时间,这是一个绝对时间
    this.period = period;  // 任务的执行周期,表示两个任务之间间隔多长时间
    this.sequenceNumber = sequencer.getAndIncrement();  // 当前任务的序列号,通过原子类生成
}

// 父类FutureTask的构造方法
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

第二步:添加任务到延迟队列,并且新建线程执行任务

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);  // 任务添加到延迟队列
        if (isShutdown() &&   // 判断线程池是否关闭,如果关闭,移除任务
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart(); // 向线程池中添加线程,确保有线程执行任务
    }
}

// 添加任务到延迟队列的方法
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  // 异步任务实例
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {  // 如果当前任务是第一个任务,要唤醒在条件变量上阻塞的线程
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

// 向线程池中添加线程
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        // 注意,这里Worker实例的第一个参数 firstTask,值为null,表示Worker只可以从队列中获取任务
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

从阻塞队列中获取任务

新线程启动后,会执行Worker类的run方法 (参考ThreadPoolExecutor的执行原理),在run方法中,会从阻塞队列中获取异步任务,定时任务使用的阻塞队列是DelayedWorkQueue。

从阻塞队列中获取任务的方法:

// take方法没有指定超时时长,类似的,还有指定了超时时长的poll方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  // 获取锁
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];  // 队列中的第一个元素,
            if (first == null)
                available.await();  // 如果队列为空,阻塞
            else {
                // 获取第一个任务的延迟时间,表示延迟指定时长后,开始执行任务
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);  // 如果延迟时长小于等于0,证明可以开始执行任务了
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;  // 设置当前线程为leader
                    try {
                        available.awaitNanos(delay);  // 如果延迟时长大于0,那么线程进入阻塞状态并且指定时长
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

// 获取任务的延迟时间
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);  // time是绝对时间,它减去now(),就是相对时间,也就是延迟时间
}

阻塞队列负责按照任务的执行时间,对任务进行排序,最先执行的任务放在队列的第一位,这里没有展示排序的逻辑,排序是按照最小堆的逻辑来排序的。线程从阻塞队列中获取任务,会计算第一个任务的延迟时长,然后等待指定时长,在执行任务,这就是定时任务可以在指定时长后启动的逻辑,如果延迟队列中没有任务,线程会一直等待,同时,向延迟队列中添加任务时,如果发现当前任务是第一个任务,会唤醒正在等待的线程。

执行定时任务

从延迟队列中获取到任务后,线程会执行ScheduledFutureTask的run方法,因为ScheduledFutureTask间接继承了Runnable接口

// ScheduledFutureTask的run方法
public void run() {
    boolean periodic = isPeriodic();  // 任务是否是周期性的
    if (!canRunInCurrentRunState(periodic))  // 判断线程池是否还在运行
        cancel(false);
    else if (!periodic)  // 如果不是周期性的任务,直接执行,这里执行的是FutureTask中的run方法
        ScheduledFutureTask.super.run();
    // 如果是周期性的任务,执行完之后计算下次执行时间,然后重新提交任务实例到阻塞队列
    else if (ScheduledFutureTask.super.runAndReset()) {  
        setNextRunTime();  // 计算下次任务的执行时间,这个方法会更新任务的time属性
        reExecutePeriodic(outerTask);  // 再次向线程池中提交任务实例
    }
}

// 判断任务是否是周期性的
public boolean isPeriodic() {
    // 参考之前ScheduledFutureTask的实例的创建过程,period代表任务的执行周期,
    // 这个值不为0,证明是周期性的任务
    return period != 0;
}

1、为什么执行任务时会执行FutureTask中的run方法?因为在FutureTask的run方法中,会调用用户编写的run方法,也就是异步任务,ScheduledFutureTask中的run方法负责整体流程。

2、如果是周期性的任务,执行FutureTask中的runAndReset方法,它和run方法有什么不同?它执行完任务后,不会设置返回值,同时会把任务设置为初始状态,这个方法是为了执行多次的异步任务而设计的。

3、周期性的任务,执行完任务后,如何计算下次任务的执行时间?

// 计算任务下次执行时间的方法:
private void setNextRunTime() {
    long p = period;  // 任务的执行周期
    if (p > 0)
        time += p;    // 当前时间加上周期,固定频率(scheduleAtFixedRate)的定时任务走这段逻辑
    else
        time = triggerTime(-p);  // 更新任务的执行时间,固定延迟(scheduleWithFixedDelay)的定时任务走这段逻辑
}

这里需要解释一下,time属性是任务的执行时间,是一个绝对时间,表示到了某个点,例如 2020-01-01 00:00:00 这个固定的点,启动定时任务,period,是两个任务之间的间隔时长,例如,每隔10分钟,执行一次定时任务。对于固定频率的定时任务和固定延迟的定时任务,它们在创建任务实例的过程中稍有不同:

// 创建固定频率的定时任务
new ScheduledFutureTask<Void>(command,
                              null,
                              triggerTime(initialDelay, unit),   // 计算time的值
                              unit.toNanos(period));             // 计算period的值,注意,period是正数
// 创建固定延迟的定时任务
new ScheduledFutureTask<Void>(command,
                              null,
                              triggerTime(initialDelay, unit),   // 计算time的值
                              unit.toNanos(-delay));             // 计算period的值,注意,period是负数

一个执行周期是正数,一个执行周期时负数,所以在计算任务下次执行时间的方法中,它们会走向不同的链路,把该方法重新粘贴到下面,重新再看:

// 计算任务下次执行时间的方法:
private void setNextRunTime() {
    long p = period;  // 任务的执行周期
    if (p > 0)
        time += p;    // 固定频率,上次任务执行时间加上执行周期,就是下次执行时间
    else
        time = triggerTime(-p);  // 固定延迟,当前时间加上执行周期,就是下次执行时间, // triggerTime方法在下面
}

long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

所以,固定频率执行的任务,如果上一次的任务执行超时,直到下一次任务该启动时还没有执行完成,一旦上一次任务执行完成,下一次任务立刻启动,因为上一次任务执行完成后,计算下一次任务的执行时间,发现执行时间在当前时间之前,所以线程获取任务时不会阻塞,会立刻取出任务,然后执行。固定延迟的任务,是根据上次任务结束时间来计算下次任务开始时间的,所以它是固定延迟。

总结

定时任务的执行过程:

  • 第一步:向线程池提交定时任务(schedule方法)
  • 第二步:创建定时任务实例(ScheduleFutureTask实例)
  • 第三步:把定时任务添加到延迟队列,延迟队列会对任务进行排序,最先执行的定时任务放到开头
  • 第四步:新建线程,从延迟队列中获取定时任务,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞结束后,执行定时任务
  • 第五步:执行完成后,计算下一次任务的执行时间,然后重新向线程池中提交任务实例

Q&A

只执行一次的定时任务和周期性的定时任务,分别是如何执行的?

周期性的定时任务,在执行完一次后,会计算下次任务的启动时间,然后再次向阻塞队列中提交任务实例,只执行一次的定时任务则不会

线程是如何在指定时间启动定时任务的?

阻塞队列会把需要最先执行的定时任务放在队列的开头,线程会获取第一个任务的延迟时间,然后根据延迟时间休眠指定时长,休眠结束后,执行定时任务。

按照固定频率执行的定时任务和按照固定延迟执行的定时任务,分别是如何执行的?

按照固定频率执行的定时任务,下次任务的执行时间 = 上次任务的启动时间 + 周期

按照固定延迟执行的定时任务,下次任务的执行时间 = 上次任务的结束时间 + 周期

依据不同的计算方式,计算出下次任务的执行时间,然后提交任务实例到队列中

相关文章:

  • navicat导出postgresql的数据库结构、字段名、备注等等
  • kubectl 运行脚本 kubernetes 部署springcloud微服务 yaml + Dockerfile+shell 脚本
  • 大模型巅峰对决:DeepSeek vs GPT-4/Claude/PaLM-2 全面对比与核心差异揭秘
  • PTA L2一些题目
  • PMP项目管理—沟通管理篇—1.规划沟通管理
  • 深圳区域、人口、地铁线
  • nvm的使用汇总
  • 珈和科技应邀参会农药减施增效研讨会 共探植保未来发展新篇章
  • 程序员学习强化学习之基本概念的数学表达
  • 在华为设备上,VRRP与BFD结合使用可以快速检测链路故障并触发主备切换
  • 【五.LangChain技术与应用】【14.LangChain与MoonShot、通义千问:多模型融合的实战】
  • 【时时三省】(C语言基础)算术表达式和运算符
  • 如何在 Aptos 上铸造 NFT
  • 网络安全可以从事什么工作?
  • Android14 OTA升级
  • 大模型如何协助知识图谱进行实体关系之间的分析
  • 【Azure 架构师学习笔记】- Azure Databricks (16) -- Delta Lake 和 ADLS整合
  • Android14 OTA差分包升级报Package is for source build
  • 计算机毕业设计SpringBoot+Vue.js物流管理系统(源码+文档+PPT+讲解)
  • 推导二项型事件的随机变量标准误差:两种方法
  • 刘小涛任江苏省委副书记
  • 乌克兰官员与法德英美四国官员举行会谈
  • 武大校长:人工智能不存在“过度使用”,武大不会缩减文科
  • 现场丨在胡适施蛰存等手札与文献间,再看百年光华
  • 透视社会组织创新实践中的花开岭现象:与乡村发展的融合共进
  • 泽连斯基启程前往土耳其