当前位置: 首页 > news >正文

Java 线程池ThreadPoolExecutor源码解读

目录

1,创建线程的方式

1.1 继承自Thread

1.2 通过FutureTask+Callable实现

1.3 使用线程池实现

2,ThreadExecutorPool概述

2.1 重要的常量

2.2 拒绝策略

3,源码分析

3.1 ThreadPoolExecutor#execute

3.2 ThreadPoolExecutor#addWorker

3.3 ThreadPoolExecutor#getTask

3.4 线程池的线程复用逻辑

4,总结


本文分成以下几个部分:

  1. 创建线程的方式
  2. ThreadPoolExecutor概述
  3. ThreadPoolExecutor#execute ThreadPoolExecutor#addWorker方法解析

1,创建线程的方式

1.1 继承自Thread

写一个类继承自Thread,并且调用start方法即可开启线程。这个是老生常谈的实现方法了,继承Thread的时候需要实现run,如果直接调用run的话是无法开启线程的。调用start最后会调用到native的start()方法,然后如果是Linux的话,底层使用的是Linux的api pthread。

1.2 通过FutureTask+Callable实现

FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {@Overridepublic String call() {return "hello world";}
});
new Thread(futureTask).start();
String res = futureTask.get();
System.out.println(res);

然后将创建好的FutureTask的实例放到Thread中执行。Callable是有返回值的,我们可以通过get获取到。get是一个阻塞的,底层的实现我稍微看了一下是基于LockSupport#park实现的阻塞。

1.3 使用线程池实现

使用线程池主要类似于这样:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4, 10, TimeUnit.MINUTES,                  new LinkedBlockingDeque<>()); executor.execute(() -> {     System.out.println("hello world"); });

其实JDK也提供了一些默认的线程池创建方法,但是一般都不推荐使用,因为这些方法可能不符合我们常规的业务需求。所以一般都使用手动创建的方式实现。

2,ThreadExecutorPool概述

2.1 重要的常量

// 高3位是线程池状态 低29为是线程的最大线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位全是1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits// 111 正常接受任务
private static final int RUNNING    = -1 << COUNT_BITS;// 000 Executor#shutdown
// 不接收新的任务 阻塞队列中正在进行的任务也会正常处理
private static final int SHUTDOWN   =  0 << COUNT_BITS;// 001 Exectuor#shutdownNow
// 不接收新的任务,不去处理阻塞队列中的任务,正在进行的任务也不会处理
private static final int STOP       =  1 << COUNT_BITS;// 010 表示当前的线程池即将结束
private static final int TIDYING    =  2 << COUNT_BITS;// Exectuor#terminated
// 011 线程池结束
private static final int TERMINATED =  3 << COUNT_BITS;

ctl这个AtomicInteger是基于自旋锁+CAS操作实现的自旋锁我在这篇文章也聊过。Executor本身也有生命周期,根据数值大小排序的生命周期状态是:

Running < Shutdown < Stop < Tidying < Terminated

  • Running:当前的线程池中的线程正常运行,而且线程池接受新的Runnable
  • Shutdown:在调用了shutdown方法会走到这个状态,并且此时不接收新的Runnable,但是会将阻塞队列中的Runnable处理完成
  • Stop:在调用shutdownNow之后会走到这个状态,不接收新的Runnable,同时会暂停正在执行的线程
  • Tidying:是一个中间的过渡状态,可能做一个内容的清理工作等等。
  • Terminated:在调用terminated方法之后会到这个状态,线程池结束

        在Java线程池ThreadPoolExecutor的实现中,RUNNING = -1 << COUNT_BITS 是定义线程池运行状态的关键常量,其原理如下:

a,位运算设计

b‌,状态标识意义

  • COUNT_BITS 通常为29(32位整型的低29位用于记录线程数)
  • -1 的二进制补码表示是11111111 11111111 11111111 11111111
  • 左移29位后得到高3位111(十进制值-536870912),低29位全0
  • RUNNING 状态值最小(高3位111),表示线程池能接收新任务并处理队列任务
  • 其他状态(SHUTDOWN/STOP/TIDYING/TERMINATED)的高3位依次递减(000~010

2.2 拒绝策略

AbortPolicy

直接会抛出异常

public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException(&quot;Task &quot; + r.toString() +&quot; rejected from &quot; +e.toString());}
}

CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}

这块直接调用了Runnable实现的run方法。run方法会在Excecutor所在的线程中执行,所以如果是耗时操作也会出现问题。

DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}

直接会放弃,什么都不会做

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}

会尝试获取ThreadPoolExecutor中的队列,然后将队列头,也就是最开始的一个出栈。  

3,源码分析

任务加入线程池是一个这样的过程:

首先会询问核心线程是否有没有分配到的,通常是和核心线程数进行比较。如果核心线程都满了,就会通过阻塞队列进行缓冲。如果阻塞队列都放慢了,就会看非核心线程是否到了最大的线程数,如果达到了最大线程,就会执行拒绝策略。

下面我们会通过看execute+addWorker的源码来还原这个过程。

3.1 ThreadPoolExecutor#execute

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 如果工作线程数小于核心线程线程数if (workerCountOf(c) < corePoolSize) {// 创建核心线程if (addWorker(command, true))return;// 考虑并发的情况的影响c = ctl.get();}// 如果线程池是Running状态,并且阻塞队列可以放入任务if (isRunning(c) && workQueue.offer(command)) {// 考虑并发的情况的影响int recheck = ctl.get();// 如果当前不是Running状态 执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 阻塞队列有任务,但是工作线程数为0,创建非核心线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 创建非核心线程执行任务 创建失败 执行拒绝策略else if (!addWorker(command, false))reject(command);
}

execute的代码中首先通过ctl进行位运算的分解获取当前的工作线程数,优先使用核心线程。然后下面会放入到阻塞队列中,如果阻塞队列中都放不下,再会看工作线程是否达到最大线程数。如果以上的执行都不能放入这个任务,就执行拒绝策略。

上面的代码中可以看到前面两步,也就是询问核心线程和询问阻塞队列是否放满,第三步看工作线程是否达到最大线程数是在addWorker中的

3.2 ThreadPoolExecutor#addWorker

part1 判断部分

addWorker的代码拆分成两部分来看,第一部分是进行一些条件判断:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果当前的线程池的状态是SHUTDOWNif (rs >= SHUTDOWN &&// 下面的判断等于// rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()// 1. 如果当前的状态已经是TIDYING 或者Terminated -&gt; 肯定不能添加Worker// 2. 如果当前的状态是Shutdown,但是传入一个非空的task,已经不接收新的任务了 -&gt; 肯定///    不能添加Worker// 3. 如果当亲的状态是Shutdown,而且传入的task不为空,如果阻塞队列是空,之前的任务// 全部处理完成了 -&gt; 肯定不能添加Worker! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 是否超过线程数(核心线程或者是最大线程)if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 如果CAS操作完成成功if (compareAndIncrementWorkerCount(c))break retry;// 如果CAS操作失败,重新循环进行修改WorkerCountc = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;}}

特别是if中的第二个条件有点复杂,传入firstTask为空的情况是当阻塞队列中有任务,但是工作线程为0时,一般情况下firstTask肯定不为null。具体三种情况为什么需要返回false的原因我写在注释中了~

下面我们来看看执行的逻辑

part2 执行部分

boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个新的Workerw = new Worker(firstTask);// 从Worker中取出线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// 如果现在的线程池是Running状态 或 当前的线程池虽然是Shutdown// 但是还是需要处理阻塞队列中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null))  {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 放入HashSetworkers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 确认加入workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 启动线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

总的来说addWorker除了进行Worker的构建和添加到Workers之外,还进行了Worker中线程的启动,这块是真正执行我们定义的逻辑的地方。

我们再来看看Worker:

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// Worker也是一个Runnable,以当前的Worker作为Runnable构建新的线程this.thread = getThreadFactory().newThread(this);
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &amp;&amp;runStateAtLeast(ctl.get(), STOP))) &amp;&amp; !wt.isInterrupted())wt.interrupt();try {// 空方法// 省略异常处理的逻辑beforeExecute(wt, task);task.run();afterExecute(task, thrown);} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

因为Worker本身也是一个Runnable,所以当调用start的时候会执行Woker的run方法,Worker#run调用了runWorker。在runWorker中,我们重写的run会被执行。同时提供了两个钩子:beforeExecute和afterExecute,这两个方法本身是空实现,我们可以自行定义执行一些操作。

作为判断条件的代码我使用黄色底的字体标记出来了,具体的逻辑就是这样。

3.3 ThreadPoolExecutor#getTask

getTask其实就是从阻塞队列wokerQueue中获取task这样一件事情。

3.4 线程池的线程复用逻辑

这块直接上图,在addWorker中会执行Worker中的Thread#start,我们知道执行完成start之后就不能再次调用start。线程池与其说他是复用Thread,不如说他是不断地向Thread中填充新的Runnable,然后调用run,减少了创建Thread的开销。我们仔细看看addWorker的核心代码:

// ThreadPoolExecutor#runWorker
while (task != null || (task = getTask()) != null) {// ...task.run();// ...
}

不断地从workerQueue中取出新的Task,然后执行run。如果wokerQueue为空,getTask就会阻塞,等到有了新的Task再执行。

4,总结

  1. JDK中提供了一些可以直接启动线程池的方式,但是我们最好自己写一个ThreadPoolExecutor进行调整参数。ThreadPoolExecutor有以下几个核心参数:核心线程数、最大线程数、线程存活时间、阻塞队列
  2. ThreadPoolExecutor是有5中状态的,Running,Shutdown,Stop,Tidying,terminated。
  3. execute比较好理解,我们使用Runnable添加到ThreadPoolExecutor之后,首先会创建核心线程,核心线程其实就是一个标志位为true的Worker。Worker内部有一个Thread,会在addWorker方法中启动(Thread#start)。但是ThreadPoolExecutor其实并不会立刻【放过】Worker中的Thread。如果后续有runnable被放到阻塞队列之后,会从阻塞队列中读取。这点其实也是复用机制的关键。
http://www.dtcms.com/a/340381.html

相关文章:

  • 算法 ----- 链式
  • Day 30 模块和库导入
  • mapbox高阶,结合threejs(threebox)添加建筑glb模型,添加阴影效果,设置阴影颜色和透明度
  • 力扣 30 天 JavaScript 挑战 第36天 第8题笔记 深入了解reduce,this
  • CorrectNav——基于VLM构建带“自我纠正飞轮”的VLN:通过视觉输入和语言指令预测导航动作,且从动作和感知层面生成自我修正数据
  • 【Linux】系统部分——磁盘存储结构与文件系统
  • C++八股 —— 设计模式
  • wpf之ComboBox
  • DRF序列化器
  • DeepSeek V3.1 完整评测分析:2025年AI编程新标杆
  • ⭐CVPR2025 给3D高斯穿 “UV 衣” 框架[特殊字符]
  • 路由器NAT的类型测定
  • KubeBlocks AI:AI时代的云原生数据库运维探索
  • Redux 核心概念详解
  • Flutter开发 json_serializable json数据解析
  • 关联规则挖掘2:FP-growth算法(Frequent Pattern Growth,频繁模式增长)
  • rsync + inotify 数据实时同步
  • Android 入门到实战(三):ViewPager及ViewPager2多页面布局
  • 性能测试报告深度解析:从冰冷数据到火热洞察
  • android kernel代码 common-android13-5.15 下载 编译
  • Linux系统:C语言进程间通信信号(Signal)
  • RK3576赋能无人机巡检:多路视频+AI识别引领智能化变革
  • deque的原理与实现(了解即可)
  • 基于截止至 2025 年 6 月 4 日,在 App Store 上进行交易的设备数据统计,iOS/iPadOS 各版本在所有设备中所占比例详情
  • 比剪映更轻量!SolveigMM 视频无损剪切实战体验
  • shell变量进阶
  • 基于51单片机自动浇花1602液晶显示设计
  • Ubuntu-安装Epics Archiver Appliance教程
  • 玳瑁的嵌入式日记D21-08020(数据结构)
  • 服务器内存条不识别及服务器内存位置图