当前位置: 首页 > news >正文

ThreadPoolExecutor 源码分析

理解 ThreadPoolExecutor 的源码是掌握 Java 线程池工作原理的关键。以下是其核心源码逻辑的深入分析,结合设计思想和关键代码片段。


一、核心设计思想

  1. 状态与线程数合并存储
    使用 AtomicInteger 类型的 ctl 变量,高 3 位存储线程池状态低 29 位存储工作线程数workerCount),通过位运算高效管理。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 最大线程数 (2^29-1)
    
    // 状态定义(高3位)
    private static final int RUNNING    = -1 << COUNT_BITS; // 111
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
    private static final int STOP       =  1 << COUNT_BITS; // 001
    private static final int TIDYING    =  2 << COUNT_BITS; // 010
    private static final int TERMINATED =  3 << COUNT_BITS; // 011
    
    // 获取状态和线程数
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
  2. 任务队列与工作线程解耦

    • 任务队列 (workQueue):存储待执行的 Runnable 任务,实现生产者-消费者模式。
    • 工作线程 (Worker):封装 Thread 和任务执行逻辑,通过 AQS 实现简单锁。

二、核心源码分析

1. 任务提交:execute() 方法
public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();
    
    // 1. 当前线程数 < corePoolSize,创建新线程(核心线程)
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // true表示使用corePoolSize限制
            return;
        c = ctl.get(); // 创建失败后重新获取状态
    }
    
    // 2. 任务入队(线程池处于RUNNING状态)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查:若线程池已关闭,则回滚任务并拒绝
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 若线程数为0(如corePoolSize=0),则创建非核心线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列已满,尝试创建非核心线程(不超过maximumPoolSize)
    else if (!addWorker(command, false))
        reject(command); // 触发拒绝策略
}
2. 工作线程管理:Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread; // 实际执行任务的线程
    Runnable firstTask;  // 初始任务(可能为null)

    Worker(Runnable firstTask) {
        setState(-1); // 防止线程未启动时被中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建线程
    }

    public void run() {
        runWorker(this); // 核心执行逻辑
    }

    // ... 其他方法(如锁操作)
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // 循环从队列中获取任务(getTask()方法)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查线程池状态,若处于STOP则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 钩子方法(子类可扩展)
                try {
                    task.run(); // 执行任务
                    afterExecute(task, null); // 钩子方法
                } catch (Throwable ex) {
                    afterExecute(task, ex); // 处理异常
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 线程退出处理(回收或替换)
    }
}
3. 任务获取:getTask() 方法
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 1. 线程池已关闭且队列为空,返回null(线程退出)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // 2. 判断是否允许超时回收线程:
        //    - 允许核心线程超时(allowCoreThreadTimeOut为true)
        //    - 当前线程数超过corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 3. 线程数超过maximumPoolSize或需要回收线程时,减少线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 4. 从队列中获取任务(支持超时)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // 超时标记
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

三、关键机制解析

1. 线程池状态转换
  • RUNNING → SHUTDOWN:调用 shutdown(),停止接受新任务,继续处理队列任务。
  • RUNNING/SHUTDOWN → STOP:调用 shutdownNow(),中断所有线程并清空队列。
  • SHUTDOWN/STOP → TIDYING:当所有线程终止且队列为空时自动转换。
  • TIDYING → TERMINATED:调用 terminated() 钩子方法后完成终止。
2. 线程回收逻辑
  • 核心线程:默认不回收(allowCoreThreadTimeOut=false),除非显式设置允许。
  • 非核心线程:空闲超过 keepAliveTime 后被回收。
3. 拒绝策略触发条件
  • 线程池非RUNNING状态。
  • 队列已满且线程数达到 maximumPoolSize

四、设计精髓总结

  1. 状态压缩与原子操作:通过 ctl 变量高效管理状态和线程数,减少锁竞争。
  2. Worker与AQS:利用 AQS 实现简单锁,控制任务执行期间的中断。
  3. 任务获取策略getTask() 方法通过 poll/take 实现灵活的任务获取和超时控制。
  4. 动态参数调整:支持运行时修改 corePoolSizemaximumPoolSize(通过 setCorePoolSize() 等方法)。

五、使用注意事项

  1. 合理配置参数:根据任务类型(CPU密集型 vs IO密集型)调整 corePoolSize 和队列类型。
  2. 避免任务堆积:使用有界队列防止内存溢出,并设置合理的拒绝策略。
  3. 异常处理:通过 afterExecute()UncaughtExceptionHandler 捕获任务异常,避免线程泄漏。

相关文章:

  • 荣耀手机卸载应用商店、快应用中心等系统自带的
  • Linux 命令:按内存使用大小排序查看 PID 的完全指南
  • Swift实战(微调多模态模型Qwen2.5 vl 7B)
  • 基于香橙派 KunpengPro学习CANN(3)——pytorch 模型迁移
  • JavaScript基础-获取元素
  • Shell脚本中的弱治简写
  • 平衡树的模拟实现
  • Golang开发
  • ROS合集(一)ROS常见命令及其用途
  • springboot多种生产打包方式教程
  • 循环神经网络中用到的概率论知识
  • YOLOv8 OBB 旋转目标检测模型详解与实践
  • 59. 螺旋矩阵 II
  • 深度洞察:特种设备作业考试的核心要点与备考策略
  • 蓝桥杯 修剪灌木
  • opencv初步学习——图像处理3
  • LeetCode BFS层序遍历树
  • 工作记录 2017-02-04
  • 【css酷炫效果】纯CSS实现照片堆叠效果
  • 2025年通信安全员考试题库及答案
  • 著名心血管病学专家李国庆教授逝世,享年63岁
  • 征稿启事|澎湃·镜相第三届非虚构写作大赛暨2026第六届七猫现实题材征文大赛
  • 牛市早报|持续推进城市更新行动意见印发,证监会强化上市公司募资监管
  • 南京江宁区市监局通报:盒马一批次猕猴桃检出膨大剂超标
  • 奥运“四朝元老”华天回国参赛,伤势未愈谨慎出战全国锦标赛
  • 著名词作家陈哲逝世,代表作《让世界充满爱》《同一首歌》等