当前位置: 首页 > news >正文

线程池的一些了解

线程池的一些了解

  • 核心组件
  • 工作流程
  • 关键参数
    • 合理的创建标题,有助于目录的生成
  • 四种拒绝策略
  • 线程状态管理
  • Worker工作机制
  • 自定义线程池工具
  • ctl原子控制变量
  • 核心线程数控制逻辑
  • Worker类实现
  • 线程回收机制

核心组件

  1. 线程池管理器: 负责创建和管理线程池
  2. 工作线程集合: 存放活跃的线程
  3. 任务队列: 存放待处理的任务(BlockingQueue)
  4. 线程工厂: 创建新线程(ThreadFactory)
  5. 拒绝策略: 处理无法执行的任务

工作流程

  1. 任务提交: 任务通过execute()方法提交到线程池
  2. 检查当前运行线程数是否小于核心线程数(corePoolSize)
    • 如果小于,则创建新线程执行任务
    • 如果大于等于,则尝试加入任务队列
  3. 任务队列满时,检查线程数是否小于最大线程数(maximumPoolSize)
    • 如果小于,则创建新线程执行任务
    • 如果等于,则执行拒绝策略
  4. 线程回收: 非核心线程超过空闲时间(keepAliveTime)会被回收

关键参数

  1. corePoolSize: 核心线程数,即使空闲也不会被回收
  2. maximumPoolSize: 最大线程数,线程池允许的最大线程数量
  3. keepAliveTime: 非核心线程的空闲超时时间
  4. workQueue: 工作队列,存放等待执行的任务
  5. threadFactory: 线程工厂,用于创建新线程
  6. handler: 拒绝策略,当任务无法执行时的处理方式

合理的创建标题,有助于目录的生成

直接输入1次#,并按下space后,将生成1级标题。
输入2次#,并按下space后,将生成2级标题。
以此类推,我们支持6级标题。有助于使用TOC语法后生成一个完美的目录。

四种拒绝策略

  1. AbortPolicy: 直接抛出异常(默认策略)
  2. CallerRunsPolicy: 由调用线程处理该任务
  3. DiscardPolicy: 直接丢弃任务
  4. DiscardOldestPolicy: 丢弃队列中最老的任务

线程状态管理

线程池内部通过ctl变量维护两个重要状态:

  1. runState: 运行状态(RUNNING、SHUTDOWN、STOP等)
  2. workerCount: 有效线程数量
  3. stop:中断所有正在执行的任务

Worker工作机制

每个Worker线程会循环从任务队列中获取任务并执行,实现方式类似于:

while (true) {Runnable task = workQueue.take(); // 阻塞获取任务task.run(); // 执行任务
}

自定义线程池工具

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程池工具类*/
public class CustomThreadPoolExecutor {/*** 创建固定大小线程池* * @param corePoolSize 核心线程数* @param maximumPoolSize 最大线程数* @param keepAliveTime 空闲线程存活时间* @param unit 时间单位* @param queueCapacity 任务队列容量* @param threadNamePrefix 线程名称前缀* @param allowCoreThreadTimeOut 是否允许核心线程超时* @return ThreadPoolExecutor实例*/public static ThreadPoolExecutor createThreadPool(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,int queueCapacity,String threadNamePrefix,boolean allowCoreThreadTimeOut) {// 创建有界阻塞队列BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueCapacity);// 创建自定义线程工厂ThreadFactory threadFactory = new CustomThreadFactory(threadNamePrefix);// 创建拒绝策略RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);// 设置核心线程是否允许超时executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);return executor;}/*** 创建IO密集型线程池* * @param threadNamePrefix 线程名称前缀* @return ThreadPoolExecutor实例*/public static ThreadPoolExecutor createIOThreadPool(String threadNamePrefix) {int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;int maximumPoolSize = corePoolSize * 2;long keepAliveTime = 60L;int queueCapacity = 1000;return createThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,queueCapacity,threadNamePrefix,true);}/*** 创建CPU密集型线程池* * @param threadNamePrefix 线程名称前缀* @return ThreadPoolExecutor实例*/public static ThreadPoolExecutor createCPUThreadPool(String threadNamePrefix) {int corePoolSize = Runtime.getRuntime().availableProcessors();int maximumPoolSize = corePoolSize;long keepAliveTime = 60L;int queueCapacity = 200;return createThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,queueCapacity,threadNamePrefix,false);}/*** 自定义线程工厂*/static class CustomThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;CustomThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());t.setDaemon(false); // 设置为非守护线程t.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级return t;}}/*** 获取线程池状态信息* * @param executor 线程池实例* @return 线程池状态信息*/public static String getThreadPoolStatus(ThreadPoolExecutor executor) {return String.format("核心线程数: %d, 活跃线程数: %d, 最大线程数: %d, " +"任务队列大小: %d, 已完成任务数: %d, 总任务数: %d",executor.getCorePoolSize(),executor.getActiveCount(),executor.getMaximumPoolSize(),executor.getQueue().size(),executor.getCompletedTaskCount(),executor.getTaskCount());}
}

ctl原子控制变量

// 存储工作线程的集合
private final HashSet<Worker> workers = new HashSet<>();
// ctl同时维护运行状态和工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 获取工作线程数
private int workerCountOf(int c) { return c & CAPACITY; 
}

核心线程数控制逻辑

// execute方法中的核心线程控制
public void execute(Runnable command) {int c = ctl.get();// 如果当前工作线程数小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 直接创建新线程执行任务if (addWorker(command, true))return;c = ctl.get();}// 其他逻辑...
}// addWorker方法中区分核心线程和非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// core参数决定是否受corePoolSize限制int wc = workerCountOf(c);if (core && wc >= corePoolSize) // 核心线程检查return false;if (!core && wc >= maximumPoolSize) // 非核心线程检查return false;// 创建Worker逻辑...
}

Worker类实现

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread;Runnable firstTask;Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); // 执行任务循环}
}

线程回收机制

// allowCoreThreadTimeOut控制核心线程是否可超时
public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)interruptIdleWorkers();}
}// getTask方法中的超时控制
private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();// 根据allowCoreThreadTimeOut决定是否超时回收boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if (timed) {// 使用poll超时获取任务Runnable r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);if (r != null) return r;timedOut = true;} else {// 阻塞等待任务Runnable r = workQueue.take();if (r != null) return r;}}
}

ps:HashSet用于存储线程,AtomicInteger ctl用于记录当前工作线程数,而corePoolSize和maximumPoolSize作为配置参数使用普通int类型存储。

http://www.dtcms.com/a/481946.html

相关文章:

  • R语言利用Export包导出pptx格式的文件有错误的原因
  • 金坛建设网站dw个人主页制作模板
  • AsmDude2 ASM汇编语言插件安装方式 基于Visual Studio 2022 直接插件安装无法使用的破解之法
  • 双目三维重建-2双目系统标定
  • 深入理解 PostgreSQL 数据库的 MVCC:原理、优势与实践
  • 基于python智能家居环境质量分析系统的设计与实现
  • 免费公司网站建设烟台网站制作开发
  • 射频前端MMIC:5G时代的技术引擎与市场机遇
  • 25G SFP28 光模块:中高速场景的高适配之选
  • 计算机毕设项目推荐:基于SpringBoot+Vue的非物质文化遗产再创新系统
  • 梦丘操作系统(MOS)
  • 9-机器学习与大模型开发数学教程-第1章 1-1 课程介绍与数学在机器学习中的作用
  • 成品网站管系统戴尔网站建设的特点
  • 【机器学习01】监督学习、无监督学习、线性回归、代价函数
  • 互联网大厂Java面试:缓存技术与监控运维的深度探讨
  • 用dw设计网站模板下载地址安徽工程建设官方网站
  • 【Linux】五种IO模型 + 非阻塞IO
  • threejs(四)层级模型
  • 高级系统架构师笔记——数据库设计基础知识(2)关系数据库基本概念
  • SAP MM采购申请创建接口分享
  • for循环语句练习题
  • [Agent开发平台] 后端的后端 | MySQL | Redis | RQ | idgen | ObjectStorage
  • AI(学习笔记第十二课) 使用langsmith的agents
  • 怎么制作网站教程wordpress用什么建
  • 多态:(附高频面试题)虚函数重写覆盖,基类析构重写,重载重写隐藏对比,多态原理,虚表探究一文大全
  • 《从系统调用到驱动回调:read() 如何映射到 chrdev_read()》
  • 【杂记】AI智能体产品开发中的多种语言混合编程
  • 财务开票的类型、异同点以及蓝字和红字的区别
  • 高阶数据结构-并查集
  • 从零开始的C++学习生活 8:list的入门使用