当前位置: 首页 > news >正文

自定义线程池 3.1

自定义线程池 3.1

1. 简介

上次我们实现了自定义线程池的 3.0 版本,使用了策略模式,为线程池增加了无法执行任务时的拒绝策略,本文我们将对线程池进行如下的优化:

  • 让线程池中线程的创建使用线程工厂。
  • 写一个工具类用来创建不同类型的线程池。

2. 优化一

2.1 引入

之前测试的时候可以发现,线程池中创建的线程的名字一直是 thread-?,对于多线程程序调试很不友好,建议给每个线程一个有意义的名字。比如线程池若是用来执行斐波那契数列中的值,则可以给线程取名 fib-?

这样一来,当程序运行在服务器中,查看日志,发现有一个线程抛出一个异常,就可以 快速定位线程的定义位置。除此之外,通过 htop 可以查看线程运行情况,如果线程有一个有意义的名字,那就可以 快速定位到想观察的线程

接下来想一想怎么实现吧。如果只是给一个线程起一个名字,那么直接使用 public Thread(Runnable target, String name) 这个构造方法即可。但是如果想给一个线程池中的线程起名字,那么就得使用一些别的方法,你能想到吗?


揭晓答案:使用 ThreadFactory 类的 Thread newThread(Runnable r) 方法。让使用者传递一个线程工厂,然后在创建线程时使用即可。

2.2 线程工厂的其他用处

在 Java 中,对线程参数的设置比较少,除了设置名字外,还可以设置如下参数:

  • 设置线程是为 守护线程:在工具类或后台任务中使用守护线程,防止主线程退出后仍有非守护线程运行。
  • 设置线程的 优先级:为核心任务设置更高优先级,确保资源优先分配。

除了设置线程的基础参数,其实还有一个扩展点,就是 给线程写一个异常处理器。我们可以写一个如下的线程工厂(匿名内部类):

new ThreadFactory() {/*** 计数器,用来记录当前创建的是第几个线程,从 0 开始*/private int counter = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(() -> {try {r.run();} catch (Throwable e) {System.err.println(Thread.currentThread().getName() + "抛出异常:" + e);}}, "thread-pool-" + counter++);}
};

使用线程工厂还可以让使用者配置这些东西,而不仅仅只能设置线程的名称,如果你能想出线程工厂的其他作用,也可以在评论讨论。

2.3 实现上的变化

Worker 只有构造方法发生了变化,变化如下:

public Worker(Runnable initialTask, Set<Worker> threadPool) {this.initialTask = initialTask;this.actuallyRunningThread = new Thread(this);threadPool.add(this);
}
->
public Worker(Runnable initialTask, Set<Worker> threadPool, ThreadFactory threadFactory) {this.initialTask = initialTask;this.actuallyRunningThread = threadFactory.newThread(this);threadPool.add(this);
}

3. 优化二

3.0 版本的 ThreadPool3_1 的构造方法有 6 个参数,加上 3.1 版本新加的参数,一共有 7 个参数,显得十分臃肿。如果只是想简单使用一下线程池,或者在特定场景下使用特定参数的线程池,还得写全 7 个参数,很麻烦,所以我们可以写一个工具类 ThreadPoolUtil,它用来创建特定场景下专用的线程池。

首先我们想一想,都有什么特定的场景:

  • 执行 CPU 密集型任务
  • 执行 I/O 密集型任务
  • 执行 处理时间长 的任务。
  • 执行 处理时间短 的任务。
  • 负载稳定
  • 负载波动较大
  • 任务需要 串行执行

接下来,我们讨论这些场景适合的线程池:

  • 执行 CPU 密集型任务:适合线程数量固定的线程池,线程数量可以等于 CPU 核心数。
  • 执行 I/O 密集型任务:适合线程数量不固定的线程池。
  • 执行 处理时间长 的任务:适合线程数量固定的线程池,避免创建太多线程。
  • 执行 处理时间短 的任务:适合线程数量不固定的线程池,可以创建很多线程,因为这些线程短时间内就退出了。
  • 负载稳定:线程数量固定的线程池仿佛就是为这个场景设置的。
  • 负载波动较大:适合线程数量不固定的线程池。
  • 任务需要 串行执行:只能使用一个线程的线程池。

然后,我们将这些场景适合的线程池总结一下:

  • 线程数量固定的线程池。
  • 线程数量不固定的线程池。
  • 只有一个线程的线程池。

那么,ThreadPoolUtil 提供的方法就明确了:

3.1 newFixedThreadPool

newFixedThreadPool():创建一个全是核心线程的线程池。因为线程数量固定,直接让线程永不退出,只创建核心线程就行了。

  • 核心线程数量:由传入的固定线程数量决定。
  • 最大线程数量:由传入的固定线程数量决定。
  • keepAliveTimetimeUnit0TimeUnit.SECONDS
  • 任务队列:使用 LinkedBlockingQueue,因为它是 “无界” 的,不确定使用者到底能在任务队列中囤积多少任务。

3.2 newCachedThreadPool

newCachedThreadPool():创建一个全是临时线程的线程池。因为线程数量不固定,有时多、有时少,直接创建临时线程,让它们在空闲一段时间后退出即可。

  • 核心线程数量0,因为不用创建核心线程。
  • 最大线程数量Integer.MAX_VALUE,因为不确定使用者到底能提交多少任务。
  • keepAliveTimetimeUnit1TimeUnit.SECONDS,因为希望临时线程的空闲时间尽量短一点,只要空闲 1s,就直接退出。
  • 任务队列:使用 SynchronousQueue,因为它 不保存任务,每个插入操作都需要等待取出操作,反之亦然。一般不会使用任务队列,如果要使用,说明到极端场景了,让提交任务的线程等待有线程空闲。

3.3 newSingleThreadPool

newSingleThreadPool():创建只有一个线程的线程池。

  • 核心线程数量1
  • 最大线程数量1
  • keepAliveTimetimeUnit0TimeUnit.SECONDS
  • 任务队列:使用 LinkedBlockingQueue,因为它是 “无界” 的,不确定使用者到底能在任务队列中囤积多少任务。

3.4 多个构造方法

此外,为了方便线程池的创建,我认为有些参数可以调成默认的,核心线程数量、最大线程数量、keepAliveTimetimeUnit、任务队列 这 5 个参数是核心参数,大部分情况下不同线程池都会有不同的选择,使用者一般不关注 拒绝策略线程工厂 这 2 个参数,所以我们可以为这两个参数设置默认值。

说到默认值的设置,还是有点学问的,我们应该避免设置使用者无感的默认参数,因为这样运行失败后很难想到哪里出问题了。在线程池中,如果我们将拒绝策略默认设置成 直接丢弃,那就会出现使用者不知道为什么有一个任务无法执行的情况。但是,并不是说这个策略毫无意义,如果使用者 主动选择 直接丢弃 的拒绝策略,代表他知道可能会造成这样的情况。

这里我们可以把拒绝策略选择成 抛出异常 的拒绝策略,线程工厂直接给线程设置一个默认名字 thread-pool-? 即可。

当我们选择好默认参数后,最好把 默认参数写到构造方法的注释(JavaDoc)上

4. 实现 3.1 版本

4.1 ThreadPool3_1

public class ThreadPool3_1 {/*** 线程池中核心线程的最大数量*/private final int corePoolSize;/*** 线程池中线程的最大数量*/private final int maxPoolSize;/*** 临时线程阻塞的最长时间(单位:ns),超过这个时间还没有领取到任务就直接退出*/private final long keepAliveTime;/*** 任务队列*/private final BlockingQueue<Runnable> taskQueue;/*** 拒绝策略,用于在无法执行任务的时候拒绝任务*/private final RejectPolicy rejectPolicy;/*** 线程工厂*/private final ThreadFactory threadFactory;/*** 默认的线程工厂*/private static final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() {/*** 计数器,用来记录当前创建的是第几个线程,从 0 开始*/private int counter = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "thread-pool-" + counter++);}};/*** 构造一个线程池,默认参数如下:* <ul>*     <li>拒绝策略默认为抛出异常的拒绝策略</li>*     <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION);}/*** 构造一个线程池,默认参数如下:* <ul>*     <li>拒绝策略默认为抛出异常的拒绝策略</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param threadFactory 线程工厂*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, ThreadFactory threadFactory) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION, threadFactory);}/*** 构造一个线程池,默认参数如下:* <ul>*     <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, rejectPolicy, DEFAULT_THREAD_FACTORY);}/*** 构造一个线程池** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略* @param threadFactory 线程工厂*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy, ThreadFactory threadFactory) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = unit.toNanos(keepAliveTime);this.taskQueue = taskQueue;this.rejectPolicy = rejectPolicy;this.threadFactory = threadFactory;}/*** 存放线程的集合,使用 {@link Set} 是因为 {@link Set#remove(Object)} 性能更高*/private final Set<Worker> threadPool = new HashSet<>();/*** 线程池的管程* <p>* 用于保证 <strong>获取线程池大小</strong>、<strong>将线程放入线程池</strong>、<strong>从线程池中移除线程</strong> 的互斥性*/private final Object threadPoolMonitor = new Object();/*** <h3>核心线程执行的任务</h3>* {@link #getTask()} 方法会一直阻塞,直到有新任务*/public final class CoreWorker extends Worker {public CoreWorker(Runnable initialTask, Set<Worker> threadPool, ThreadFactory threadFactory) {super(initialTask, threadPool, threadFactory);}@Overrideprotected Runnable getTask() {try {return taskQueue.take();} catch (InterruptedException e) {throw new RuntimeException(e);}}@Overrideprotected void onWorkerExit() {// 在目前的代码中,发现核心线程并不会退出,所以这个方法先不实现}}/*** <h3>临时线程执行的任务</h3>* {@link #getTask()} 方法会在阻塞一定时间后如果还没有任务,则会返回 {@code null}*/public final class TempWorker extends Worker {public TempWorker(Runnable initialTask, Set<Worker> threadPool, ThreadFactory threadFactory) {super(initialTask, threadPool, threadFactory);}@Overrideprotected Runnable getTask() {try {return taskQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}}@Overrideprotected void onWorkerExit() {removeWorkerFromThreadPool(this);}}/*** 提交任务** @param task 待执行的任务*/public void submit(Runnable task) {// 如果 线程数量 小于 最大核心线程数量,则新建一个 核心线程 执行任务,然后直接返回synchronized (threadPoolMonitor) {if (threadPool.size() < corePoolSize) {CoreWorker coreWorker = new CoreWorker(task, threadPool, threadFactory);coreWorker.start();return;}}// 如果能够放到任务队列中,则直接返回if (taskQueue.offer(task)) {return;}// 如果 线程数量 小于 最大线程数量,则新建一个 临时线程 执行任务synchronized (threadPoolMonitor) {if (threadPool.size() < maxPoolSize) {TempWorker tempWorker = new TempWorker(task, threadPool, threadFactory);tempWorker.start();}}// 线程数量到达最大线程数量,任务队列已满,执行拒绝策略rejectPolicy.reject(this, task);}/*** 获取当前线程池中的线程数量** @return 当前线程池中的线程数量*/public int getCurrPoolSize() {synchronized (threadPoolMonitor) {return threadPool.size();}}/*** 丢弃任务队列 {@link #taskQueue} 中的最旧的任务(队头任务)** @return 任务队列中的最旧的任务(队头任务)*/public Runnable discardOldestTask() {return taskQueue.poll();}/*** 从 {@link #threadPool} 中移除指定的 {@link Worker} 对象** @param worker 待移除的 {@link Worker} 对象*/private void removeWorkerFromThreadPool(Worker worker) {synchronized (threadPoolMonitor) {threadPool.remove(worker);}}
}

4.2 ThreadPoolUtil

public class ThreadPoolUtil {/*** 创建一个全是核心线程的线程池** @param poolSize 核心线程的数量* @return 一个全是核心线程的线程池*/public static ThreadPool3_1 newFixedThreadPool(int poolSize) {return new ThreadPool3_1(poolSize, poolSize, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());}/*** 创建一个全是核心线程的线程池** @param poolSize 核心线程的数量* @param threadFactory 线程工厂* @return 一个全是核心线程的线程池*/public static ThreadPool3_1 newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {return new ThreadPool3_1(poolSize, poolSize, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);}/*** 创建一个全是临时线程的线程池** @return 一个全是临时线程的线程池*/public static ThreadPool3_1 newCachedThreadPool() {return new ThreadPool3_1(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<>());}/*** 创建一个全是临时线程的线程池** @param threadFactory 线程工厂* @return 一个全是临时线程的线程池*/public static ThreadPool3_1 newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPool3_1(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);}/*** 创建只有一个线程的线程池** @return 只有一个线程的线程池*/public static ThreadPool3_1 newSingleThreadPool() {return new ThreadPool3_1(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());}/*** 创建只有一个线程的线程池** @param threadFactory 线程工厂* @return 只有一个线程的线程池*/public static ThreadPool3_1 newSingleThreadPool(ThreadFactory threadFactory) {return new ThreadPool3_1(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);}
}

5. 测试程序

public class ThreadPool3_1Test {/*** 测试线程池 3.1 版本的基本功能*/@Testpublic void test() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = new ThreadPool3_1(1, 2, 3, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), RejectPolicy.THROW_EXCEPTION);LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 3.1 版本的 {@link ThreadPoolUtil} 创建全是核心线程的线程池 的功能*/@Testpublic void testNewFixedThreadPool() throws InterruptedException {final int taskSize = 5;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = ThreadPoolUtil.newFixedThreadPool(2);LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 3.1 版本的 {@link ThreadPoolUtil} 创建全是临时线程的线程池 的功能*/@Testpublic void testNewCachedThreadPool() throws InterruptedException {final int taskSize = 5;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = ThreadPoolUtil.newCachedThreadPool();LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 3.1 版本的 {@link ThreadPoolUtil} 创建只有一个线程的线程池 的功能*/@Testpublic void testNewSingleThreadPool() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = ThreadPoolUtil.newSingleThreadPool();LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}
}

6. 测试结果

6.1 test

新建的线程采用 thread-pool-? 的方式命名,这是默认线程工厂在起作用。

16:16:48 [    main] 提交任务前
16:16:48 [    main] 提交任务后
16:16:49 [thread-pool-0] 正在执行任务0
16:16:50 [thread-pool-0] 正在执行任务1
16:16:51 [thread-pool-0] 正在执行任务2
16:16:51 [    main] 任务执行完毕

6.2 testNewFixedThreadPool

线程只有固定的两个,thread-pool-0thread-pool-1

16:20:20 [    main] 提交任务前
16:20:20 [    main] 提交任务后
16:20:21 [thread-pool-1] 正在执行任务1
16:20:21 [thread-pool-0] 正在执行任务0
16:20:22 [thread-pool-0] 正在执行任务3
16:20:22 [thread-pool-1] 正在执行任务2
16:20:23 [thread-pool-0] 正在执行任务4
16:20:23 [    main] 任务执行完毕

6.3 testNewCachedThreadPool

线程数量不固定,一个任务新建一个线程。

16:21:05 [    main] 提交任务前
16:21:05 [    main] 提交任务后
16:21:06 [thread-pool-4] 正在执行任务4
16:21:06 [thread-pool-3] 正在执行任务3
16:21:06 [thread-pool-0] 正在执行任务0
16:21:06 [thread-pool-2] 正在执行任务2
16:21:06 [thread-pool-1] 正在执行任务1
16:21:06 [    main] 任务执行完毕

6.4 testNewSingleThreadPool

只有一个线程,串行执行所有任务。

16:21:16 [    main] 提交任务前
16:21:16 [    main] 提交任务后
16:21:17 [thread-pool-0] 正在执行任务0
16:21:18 [thread-pool-0] 正在执行任务1
16:21:19 [thread-pool-0] 正在执行任务2
16:21:19 [    main] 任务执行完毕

7. 思考

  • ThreadPool3_1 中,默认的线程工厂中有一个 count 字段,当多线程提交任务时,count 会不会有线程安全问题?如果有,应该怎么修改?如果没有,为什么?
  • 众所周知,资源有创建就有销毁,线程池也一样,需要一个停止线程运行的方法,你能独立设计并实现它吗?

8. 总结

这次我们实现了自定义线程池的 3.1 版本,提供了 线程工厂创建线程 和 工具类创建简单线程池 的功能,使线程池使用起来更加方便。不过得注意一点,在生产中一定不要使用工具类简单创建的线程池,最好通过压测得到线程池的各个参数。除此之外,本文还讲解了不同场景下的线程池选型,可以作为以后使用线程池的参考。

相关文章:

  • YOLOv8改进 | 有效涨点 | 使用TPAMI2025 Hyper-YOLO中的尺度融合方式HyperC2Net改进YOLOv8的Neck
  • 【大模型-写作】LLMxMapReduce-V2 自动修改大纲 生成高质量文章
  • Transformer 与 XGBoost 协同优化的时间序列建模
  • 端侧推理新标杆——MiniCPM 4本地部署教程:5%稀疏度实现128K长文本7倍加速,低成本训练开销匹敌Qwen3-8B
  • Sentinel 流量控制安装与使用
  • 机器人导航中的高程图 vs 高度筛选障碍物点云投影 —— 如何高效处理避障问题?
  • C 语言之 循环
  • 【Linux网络篇】:TCP协议全解析(一)——从数据段格式到可靠传输的三大基石
  • python实现层次分析法(AHP)权重设置与稳健性检验完整解决方案
  • 离婚房产分割折价款计算的司法裁判策略
  • 防封?避坑?青否数字人直播系统如何做到稳定直播?zhibo175
  • 请问黑盒测试和白盒测试有哪些方法?
  • map与set的模拟实现
  • 25.6.12学习总结
  • 深入解析 IPOIB 驱动:从 IP 报文传输到自定义协议族改造
  • Git 首次使用完整设置指南
  • 大模型辅助商业决策
  • Qt 环境搭建全流程详解:从下载到创建首个项目
  • LangGraph--带记忆和工具的聊天机器人
  • 快递鸟电商退换货技术全解析:构建智能化逆向物流管理体系
  • 东胜做网站/个人网页免费域名注册入口
  • 苏州建设网站的公司/微信朋友圈营销文案
  • 怎么给网站做域名重定向/seo技术外包公司
  • 制作公众号的软件/搜索引擎优化规则
  • 网站续费/网站建设公司地址在哪
  • 做招聘网站需要哪些手续/真正免费的网站建站