当前位置: 首页 > news >正文

Java并发编程:从源码分析ThreadPoolExecutor的三大核心机制

Java并发编程:从源码分析ThreadPoolExecutor 的三大核心机制

线程池的重要性不言而喻,线程池中有很多我们可以学习的地方,他的内部逻辑是如何的呢,对极端情况如何兜底,如何控制线程安全,如何严谨的控制线程池状态,线程状态和状态变量的一致性。

Java 的 ThreadPoolExecutor 是并发编程的基石。它的设计精妙而复杂,尤其是在状态管理和任务调度上。本文将通过拆解其源码中最关键的三个部分:ctl 状态控制execute() 任务提交流程、以及 Worker 机制,帮助读者彻底掌握线程池的运行原理。

在这里设计非常巧妙,对很多临界场景进行兜底设计都非常值得我们学习。

核心机制一:一石二鸟的 ctl 状态控制变量

ctl巧妙之处

ThreadPoolExecutor 的源码中最巧妙的设计之一,就是使用一个私有的 ctl (Control) 变量,这是一个 32 位整数,它同时承担了两个职责:

位数作用存储信息
高 3 位状态位存储 Run State (运行状态)
低 29 位计数位存储 Worker Count (工作线程数)

这种设计允许线程池通过一次 CAS(Compare-and-Swap)原子操作,同时更新状态和线程数,极大地提高了并发效率。

我们可以看下源码,针对ctl变量做位运算,计算出线程池状态,worker数量

关键常量:CAPACITY

private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;

代码里Integer.Size=32

在分析之前,我们需要理解 CAPACITY 这个常量:它代表了线程池能容纳的最大线程数。

  • 它的二进制表示是:高 3 位为 0低 29 位全部为 1
  • 它的作用是作为 “工作线程数” 的掩码。
  1. runStateOf(int c):提取运行状态
private static int runStateOf(int c) {return c & ~CAPACITY;
}
  • c 完整的 ctl 变量(包含状态和数量)。

  • ~CAPACITY 这是 CAPACITY按位取反。它的 高 3 位为 1低 29 位为 0

  • 逻辑(按位与 &):

    • c状态高 3 位1 进行 & 运算,状态值得以 保留

    • c数量低 29 位0 进行 & 运算,数量值被全部 清零

  • 目的: 该函数通过清除工作线程数的信息,只保留了线程池的 Run State,通过ctl得到线程池状态。

  1. workerCountOf(int c):提取工作线程数
private static int workerCountOf(int c) {return c & CAPACITY;
}
  • c 完整的 ctl 变量。

  • CAPACITY 数量掩码。它的高 3 位为 0,低 29 位为 1。

  • 逻辑(按位与 &):

    • c数量低 29 位1 进行 & 运算,数量值得以 保留
    • c状态高 3 位0 进行 & 运算,状态值被 清零
  • 目的: 该函数通过清除运行状态的信息,只保留了当前的 Worker Count,通过ctl可以得到线程数。

  1. ctlOf(int rs, int wc):合成 ctl 变量

private static int ctlOf(int rs, int wc) {return rs | wc;
}
  • rs (Run State): 状态值。在定义时,状态常量已经被设计为占据 高 3 位
  • wc (Worker Count): 数量值,自然占据 低 29 位
  • 逻辑(按位或 |): 因为状态和数量占据了不同的位域,它们在重叠区域(高 3 位的数量位和低 29 位的状态位)上都为 0。因此,使用按位或操作符 | 可以安全地将状态和数量拼接在一起,形成一个完整的 ctl 变量。
  • 目的: 创建一个完整的 ctl 整数,用于一次原子性的 CAS 更新操作。

通过这三个方法,ThreadPoolExecutor 就能在一个变量中高效地管理和操作线程池的状态与数量。

下面是线程池的状态,这些状态是数字,同时有一定的顺序性,不如running状态比其他状态的值都小,通过数字比较可以很方便的状态判断(线程状态处于某个阶段)

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

核心机制二:execute() 的三步决策流程

下面是ThreadPoolExecutor中的源码,非常的清晰。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}

当一个任务通过 execute(Runnable command) 提交时,线程池会按照严格的 三步决策流程 来处理,这个流程决定了任务是创建线程进入队列还是被拒绝

我们一次拆解

1.第一步:问核心线程

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();
}

线程池首先检查当前工作线程数是否小于 corePoolSize

  • 成立: 即使队列是空的,线程池也会立即创建一个新的核心线程addWorker()来执行任务。
  • 目的: 这是线程池的 **快速响应(Eager Execution)**策略,旨在确保任务在低负载时能够立即执行,并维持线程池的最小并行能力。

2.第二步:问任务队列(排队等待)

如果核心线程已满或者addWorker()失败,线程池会检查状态并尝试将任务放入队列。

if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);
}
  • 成立: 任务成功进入队列,等待被核心线程取出执行。

  • 二次检查的必要性: 任务成功入队后,execute() 必须进行二次检查:

    1. 检查状态: 防止在入队瞬间,线程池被外部线程关闭(竞态条件)。
    2. 检查线程数: 防止在入队瞬间,最后一个工作线程因超时而退出,导致队列有任务但没有线程执行(任务饥饿)。如果worker数量为0,会强制创建至少一个核心线程来“唤醒”线程池。

3.第三步:问最大线程数与拒绝策略

如果核心线程已满,且队列也已满,线程池将进入救火模式

else if (!addWorker(command, false))reject(command);
  • 结果一(创建非核心线程): 如果当前线程数小于 maximumPoolSize,则创建非核心线程来临时处理任务高峰。
  • 结果二(拒绝任务): 如果当前线程数已达到 maximumPoolSize 上限,说明线程池已无力承担,任务将被 RejectedExecutionHandler(拒绝策略)处理。

核心机制三: addWorker()

在addWorker里面又做了什么呢?Worker是实际执行任务的地方,那他怎么获取任务,又是怎么执行的呢?

ThreadPoolExecutor 源码中最复杂、最核心的并发控制代码之一。它不仅负责创建线程,还要在极高并发下确保线程池的状态和数量是准确的。

我们将其分解为三个主要阶段进行分析:原子性预检创建与加锁保护、以及启动与善后

1. 阶段一:原子性预检与计数递增 (The retry Loop)

这个外部 for(;;) 循环(标记为 retry)负责在创建 Worker 之前,通过 CAS (Compare-and-Swap) 操作,原子性地增加工作线程数。

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;
  • 目的: 快速失败检查。如果线程池状态已经是 SHUTDOWN 或更晚,且不满足以下特例条件,则立即返回 false
  • 特例条件 (! (rs == SHUTDOWN ...)): 唯一的例外是当线程池处于 SHUTDOWN 状态时,如果 firstTask 为空(意味着线程是在尝试执行队列任务),且 workQueue 不为空,则允许创建新的 Worker(用于排除最后一个线程死亡,导致任务饥饿的情况)。

第二次容量和 CAS 递增检查 (内层循环):

for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) // 【关键点】CAS 操作break retry; // CAS 成功,跳出所有循环c = ctl.get();  // CAS 失败,重新读取 ctlif (runStateOf(c) != rs)continue retry; // 状态发生变化,回到外层循环重新检查// else CAS 失败是因为 workerCount 变化,继续内层循环重试
}
  • 容量检查: 检查当前线程数是否已经达到全局最大 CAPACITY 或当前请求的限制(corePoolSizemaximumPoolSize)。如果超限,返回 false

  • compareAndIncrementWorkerCount(c) 这是关键的 CAS 操作,尝试原子性地将 ctl 变量中的 workerCount 加一

  • CAS 失败处理: 如果 CAS 失败,它会检查失败的原因:

    • 如果状态发生变化: 回到外层 retry 循环,重新执行第一次状态检查。
    • 如果仅是 workerCount 发生变化: 在内层循环重试,以最新的 c 值重新检查容量并尝试 CAS。

2. 阶段二:Worker 创建与锁保护检查

只有通过了阶段一的原子性检查,我们才会创建 Worker 对象。

try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 【关键点】加锁保护全局状态try {int rs = runStateOf(ctl.get()); // 再次检查状态if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// ... 检查线程是否启动 ...workers.add(w); // 将 Worker 加入全局 Set// ... 更新 largestPoolSize ...workerAdded = true;}} finally {mainLock.unlock();}// ... 启动线程逻辑 ...}
} // ... finally 处理 ...
  • w = new Worker(firstTask) 创建 Worker 对象,其中包含实际的 Thread 对象。
  • mainLock.lock() 线程池使用 mainLock(一个 ReentrantLock)来保护共享的 workers 集合和 largestPoolSize 等全局变量。在操作这些全局数据结构之前,必须加锁。
  • 锁内再次检查: 在获取锁后,代码会再次检查线程池状态。这是为了防止在 CAS 成功递增计数后,但在获取 mainLock 之前,线程池被调用 shutdownNow(),导致状态突变。

3. 阶段三:线程启动与善后处理

    // ... 锁内逻辑之后 ...if (workerAdded) {t.start();workerStarted = true;}
} finally {if (! workerStarted)addWorkerFailed(w); // 如果启动失败,执行清理
}
return workerStarted;
  • t.start() 如果 Worker 被成功创建并加入 workers 集合,则启动底层线程,开始执行 Worker.run()(进而执行 runWorker())。
  • addWorkerFailed(w) 如果线程启动失败或在任何一步发生异常,finally 块会调用 addWorkerFailed(w)回滚之前 CAS 递增的 workerCount,确保计数准确。

4.Worker.run()

真正执行任务的是 ThreadPoolExecutor 的内部类 Worker。它的 run() 方法实现了线程池的持续工作自我回收机制。
下面我们来分析run()执行的runWorker()

线程启动与初始状态 (Setup)

Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
  • task = w.firstTask 获取通过 addWorker() 带来的第一个任务(如果有)。
  • w.unlock()Worker 线程创建时,它内部的 AQS 锁是默认被锁住的。这里首次解锁是为了确保线程在进入主循环后,可以响应外部(例如 shutdownNow() 导致)的中断。
  • completedAbruptly = true 这是一个标志位,默认设置为 true,表示线程是因异常或错误而退出的。只有当线程正常退出主 while 循环(即 getTask() 返回 null)时,这个标志才会被设为 false
  1. 核心任务循环与获取任务
while (task != null || (task = getTask()) != null) {w.lock();// ... 中断处理和任务执行 ...
}
  • while (task != null || (task = getTask()) != null) 这是我们之前讨论的核心循环。它确保只要有任务 (task != null) 或能从队列中获取到新任务(getTask() != null),线程就会持续运行。
  • getTask() 实现了我们分析的 keepAliveTime 超时机制,是线程回收的关键。
  • w.lock() 在执行任务之前,Worker 再次加锁。这使得其他线程(如调用 shutdownNow() 的线程)在当前任务执行期间无法修改或中断 Worker 的内部状态。

这里getTask()便是获取任务的核心

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {//简化部分代码try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

从源码可以看到从workQueue阻塞队列中读取任务

  1. 复杂的中断管理逻辑

这是 runWorker() 最难理解的部分,用于处理线程池的关闭和中断竞态

// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();
  • 目的: 确保只有在线程池进入 STOP(或更晚)状态时,线程才应该被中断。

  • 核心逻辑:

    • runStateAtLeast(ctl.get(), STOP):检查线程池是否处于 STOP 或之后的更晚状态。
    • Thread.interrupted():这是处理 shutdownNow() 竞态的关键。如果线程池处于 SHUTDOWN 状态,并且该线程被中断(shutdownNow() 会中断所有线程),则 Thread.interrupted()清除中断标志,并检查状态是否已达到 STOP
    • 通过这一系列复杂的检查和重查,最终确保只有线程池确实决定要 立即终止 线程时(即 STOP 状态),才会调用 wt.interrupt()
  1. 任务执行与用户钩子
try {beforeExecute(wt, task); // 用户自定义钩子// ... 实际执行 task.run() ...afterExecute(task, thrown); // 用户自定义钩子
} finally {task = null;w.completedTasks++;w.unlock(); // 释放锁,允许下次循环或退出
}
  • beforeExecute() / afterExecute() 这是留给用户继承 ThreadPoolExecutor 后自定义逻辑的钩子方法。例如,可以在这里进行日志记录或资源初始化/清理。
  • task.run() 实际运行任务。它被包裹在三层 try-catch 中,用于捕获各种异常并记录下来。
  • w.completedTasks++ 统计该工作线程完成的任务总数。
  • w.unlock() 任务执行完毕后释放锁。

5.最终退出与善后处理

completedAbruptly = false;
} finally {processWorkerExit(w, completedAbruptly);
}
  • completedAbruptly = false; 如果线程正常退出 while 循环(即 getTask() 返回 null),则将此标志设置为 false
  • processWorkerExit() 这是线程的最终归宿。它根据 completedAbruptly 的值(正常退出或异常退出),执行我们之前讨论的计数递减tryTerminate() 逻辑。

总结

我们现在彻底掌握了 ThreadPoolExecutor 的三大核心机制,从任务提交到执行和退出都了如指掌!

http://www.dtcms.com/a/428124.html

相关文章:

  • DAC芯片---ES8156
  • wordpress正文底部版权声明sem优化公司
  • Java高频笔试、面试题
  • 青岛企业网站制作哪家好seo视频网页入口网站推广
  • pthread_detach:线程世界的“自清洁“革命
  • i.MX6ULL嵌入式Linux应用开发学习计划
  • 网站怎么做更新吗wordpress默认登录地址
  • NVR接入录像回放平台EasyCVR智慧农田可视化视频监控方案
  • 网页脚本 009:Next.js联合window.postMessage实现Dynamic Crawler
  • 装饰网站建设重要性网站项目设计书
  • 建立网站站点的过程中正确的是大数据营销公司
  • 扁平风格企业网站源码招商网站建设服务商
  • Coze源码分析-资源库-编辑插件-后端源码-详细流程
  • Coze源码分析-资源库-编辑插件-后端源码-核心技术与总结
  • 如何安装TraeCN(字节跳动的IDE)
  • 泉州网站的建设医疗器械网
  • 中国数学外国人做视频网站重庆高端设计公司
  • JAVAweb案例之后端的增删改查
  • 建设主管部门网站南宁网站建设报价
  • Union 和 Optional 区别
  • 太原网站建设鸣蝉公司中建官网
  • Redis List 类型全解析
  • 服务器做jsp网站教程视频城市介绍网站模板
  • 做网站一定需要虚拟主机吗自建网站定位
  • CompletableFuture原理与实践----商品信息查询接口优化---信息组装
  • 深圳求职网站哪个好网站对接微信接口
  • Cause: java.sql.SQLException: 无效的列类型: 1111
  • IMU传感器价格与高精度组合惯导市场现状分析
  • (28)ASP.NET Core8.0 SOLID原则
  • API 接口开发与实时实时采集构建京东商品数据通道方案