当前位置: 首页 > news >正文

实现一个线程池管理器

为什么?

平时我们在开发中,创建了不少线程池,这些线程池都处于游离状态,不方便管理和监控,我们是无法知道目前系统中有哪些线程池、以及每个线程池当前的一个状态,负载情况等,所以我们可以开发一个线程池管理器来解决这个问题。

是什么

统管系统中所有线程池,负责所有线程池的创建、监控等操作。

代码

1.线程池管理类

包含线程池的创建,查询,扩容

/*** 线程池管理器*/
public class ThreadPoolManager {private static final Map<String, ThreadPoolTaskExecutor> threadPoolMap = new ConcurrentHashMap<String, ThreadPoolTaskExecutor>(16);private static final int corePoolSize = 10;private static final int maxPoolSize = Integer.MAX_VALUE;private static final int queueCapacity = Integer.MAX_VALUE;private static final int keepAliveSeconds = 60;/**** @param name 线程池名称*/public static ThreadPoolTaskExecutor newThreadPool(String name) {return newThreadPool(name, corePoolSize, maxPoolSize);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name         线程池名称* @param corePoolSize 核心线程数*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize) {return newThreadPool(name, corePoolSize, corePoolSize);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name         线程池名称* @param corePoolSize 核心线程数* @param maxPoolSize  最大线程数*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize) {return newThreadPool(name, corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds, null, null);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name          线程池名称* @param corePoolSize  核心线程数* @param maxPoolSize   最大线程数* @param queueCapacity 队列大小*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize, int queueCapacity) {return newThreadPool(name, corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds, null, null);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name          线程池名称* @param corePoolSize  核心线程数* @param maxPoolSize   最大线程数* @param queueCapacity 队列大小* @param threadFactory            线程工厂*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize, int queueCapacity,ThreadFactory threadFactory) {return newThreadPool(name, corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds, threadFactory, null);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name                     线程池名称* @param corePoolSize             核心线程数* @param maxPoolSize              最大线程数* @param queueCapacity            队列大小* @param keepAliveSeconds         线程池存活时间(秒)* @param threadFactory            线程工厂* @param rejectedExecutionHandler 拒绝策略* @return ThreadPoolTaskExecutor*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveSeconds, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {return threadPoolMap.computeIfAbsent(name, threadGroupName -> {ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor() {private boolean initialized = false;@Overrideprotected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<>(queueCapacity);} else {return new SynchronousQueue<>();}}@Overridepublic void setQueueCapacity(int queueCapacity) {if (this.initialized && this.getThreadPoolExecutor() != null &&this.getThreadPoolExecutor().getQueue() != null &&this.getThreadPoolExecutor().getQueue() instanceof ArrayBlockingQueue) {((LinkedBlockingQueue) this.getThreadPoolExecutor().getQueue()).setCapacity(queueCapacity);}super.setQueueCapacity(queueCapacity);}@Overridepublic void afterPropertiesSet() {if (initialized) {return;}super.afterPropertiesSet();this.initialized = true;}};threadPoolExecutor.setCorePoolSize(corePoolSize);threadPoolExecutor.setMaxPoolSize(maxPoolSize);threadPoolExecutor.setQueueCapacity(queueCapacity);threadPoolExecutor.setKeepAliveSeconds(keepAliveSeconds);threadPoolExecutor.setThreadGroupName(name);threadPoolExecutor.setThreadNamePrefix(name + "-");if (threadFactory != null) {threadPoolExecutor.setThreadFactory(threadFactory);}if (rejectedExecutionHandler != null) {threadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);}threadPoolExecutor.afterPropertiesSet();return threadPoolExecutor;});}/*** 获取所有线程池信息**/public static List<ThreadPoolInfo> threadPoolInfoList() {return threadPoolMap.entrySet().stream().map(entry -> threadPoolInfo(entry.getKey(), entry.getValue())).collect(Collectors.toList());}/*** 动态变更线程池(如:扩缩容、扩缩队列大小)** @param threadPoolChange 变更线程池信息*/public static void changeThreadPool(ThreadPoolChange threadPoolChange) {ThreadPoolTaskExecutor threadPoolTaskExecutor = threadPoolMap.get(threadPoolChange.getName());if (threadPoolTaskExecutor == null) {throw new IllegalArgumentException();}if (threadPoolChange.getCorePoolSize() > threadPoolChange.getMaxPoolSize()) {throw new IllegalArgumentException();}synchronized (ThreadPoolManager.class) {if (threadPoolChange.getMaxPoolSize() > threadPoolTaskExecutor.getCorePoolSize()) {threadPoolTaskExecutor.setMaxPoolSize(threadPoolChange.getMaxPoolSize());threadPoolTaskExecutor.setCorePoolSize(threadPoolChange.getCorePoolSize());threadPoolTaskExecutor.setQueueCapacity(threadPoolChange.getQueueCapacity());} else {threadPoolTaskExecutor.setCorePoolSize(threadPoolChange.getCorePoolSize());threadPoolTaskExecutor.setMaxPoolSize(threadPoolChange.getMaxPoolSize());threadPoolTaskExecutor.setQueueCapacity(threadPoolChange.getQueueCapacity());}}}/*** 获取所有线程池的信息*/private static ThreadPoolInfo threadPoolInfo(String name, ThreadPoolTaskExecutor threadPool) {ThreadPoolInfo threadPoolInfo = new ThreadPoolInfo();threadPoolInfo.setName(name);threadPoolInfo.setCorePoolSize(threadPool.getCorePoolSize());threadPoolInfo.setMaxPoolSize(threadPool.getMaxPoolSize());threadPoolInfo.setActiveCount(threadPool.getActiveCount());threadPoolInfo.setQueueCapacity(threadPool.getQueueCapacity());threadPoolInfo.setQueueSize(threadPool.getQueueSize());return threadPoolInfo;}}

2.线程池配置

将线程池加载到ioc,自定义线程工程,防止线程池内日志无法写入日志

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {//自定义线程工程private static final ThreadFactory threadFactory=(Runnable r) -> {Thread t = new Thread(r);t.setUncaughtExceptionHandler(GlobalUncaughtExceptionHandler.getInstance());return t;};/*** 项目共用线程池*/public static final String BIZ_EXECUTOR = "bizExecutor";/*** websocket通信线程池*/public static final String WS_EXECUTOR = "websocketExecutor";@Overridepublic Executor getAsyncExecutor() {return bizExecutor();}@Bean(BIZ_EXECUTOR)@Primarypublic ThreadPoolTaskExecutor bizExecutor() {return ThreadPoolManager.newThreadPool(BIZ_EXECUTOR, 10, 20, 1000,threadFactory);}@Bean(WS_EXECUTOR)@Primarypublic ThreadPoolTaskExecutor wsExecutor() {return ThreadPoolManager.newThreadPool(WS_EXECUTOR, 10, 20, 1000,threadFactory);}}

自定义线程异常处理器,大运异常线程

@Slf4j
public class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {private static final GlobalUncaughtExceptionHandler instance = new GlobalUncaughtExceptionHandler();private GlobalUncaughtExceptionHandler() {}@Overridepublic void uncaughtException(Thread t, Throwable e) {log.error("Exception in thread {} ", t.getName(), e);}public static GlobalUncaughtExceptionHandler getInstance() {return instance;}}

3.其它实体队列

重写并发包下 LinkedBlockingQueue 增加setCapacity方法扩容

线程池中会用到Java中的阻塞队列java.util.concurrent.BlockingQueue,目前jdk中自带几个阻塞队列都不支持动态扩容,比如java.util.concurrent.LinkedBlockingQueue,他里面的capacity是final的,不支持修改,为了是队列容量能够支持调整,我们创建了一个可扩容的阻塞队列ResizeLinkedBlockingQueue,代码是从LinkedBlockingQueue中拷贝过来的,然后添加了一个可以修改容量capacity的方法,如下,然后创建线程池的时候,我们使用自定义的这个阻塞队列便可以实现线程池的动态扩容。


public void setCapacity(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() > capacity) {throw new IllegalArgumentException();}this.capacity = capacity;} finally {putLock.unlock();}}

线程实体

@Data
public class ThreadPoolInfo {private String name;//核心线程数private int corePoolSize;//最大线程数private int maxPoolSize;//活动的线程数private int activeCount;//队列的容量private int queueCapacity;//队列中当前任务数量private int queueSize;
}

线程扩容实体

@Data
public class ThreadPoolChange {//线程池名称private String name;//核心线程数private int corePoolSize;//最大线程数private int maxPoolSize;//队列容量private int queueCapacity;}

3.模拟查询

@RestController
@RequestMapping("/threadPool")
public class ThreadPoolManagerController {/*** 获取所有的线程池信息** @return*/@GetMapping("/list")public List<ThreadPoolInfo> threadPoolInfoList() {return ThreadPoolManager.threadPoolInfoList();}/*** 线程池扩缩容** @return*/@PostMapping("/change")public Boolean threadPoolChange(@RequestBody ThreadPoolChange threadPoolChange) {ThreadPoolManager.changeThreadPool(threadPoolChange);return true;}/*** 模拟使用线程池* @param threadPoolChange* @return*/@Qualifier(ThreadPoolConfig.BIZ_EXECUTOR)@Autowiredprivate  ThreadPoolTaskExecutor threadPoolTaskExecutor;@PostMapping("/send")public Boolean send() {for (int i = 0; i < 50; i++){threadPoolTaskExecutor.execute(() -> {ThreadUtil.sleep(3000);System.out.println("线程池执行任务-");});}return true;}}

扩容后:

正在使用

http://www.dtcms.com/a/361806.html

相关文章:

  • 数字后端tap cell:新老工艺tap cell区别
  • 人工智能视频画质增强和修复软件Topaz Video AI v7.1.1最新汉化,自带星光模型
  • 网络编程5-数据库、sqlite3数据库
  • 多级渐远纹理(Mipmap):原理、生成、采样与 OpenGL 实践
  • 2025 金融行业证书怎么选?从能力适配到职业方向的理性梳理
  • 7-ATSAM3X8-DAC输出
  • 网络与信息安全有哪些岗位:(13)安全服务工程师 / 顾问
  • 机器学习——损失函数
  • leetcode-python-1796字符串中第二大的数字
  • LeetCode82删除排序链表中的重复元素 II
  • wpf之样式
  • 嵌入式解谜日志之Linux操作系统—共享内存
  • Python备份实战专栏第5/6篇:Docker + Nginx 生产环境一键部署方案
  • 基于多种分词算法的词频统计的中文分词系统的设计与实现
  • 信创之-麒麟v10服务器安装tengine(已完成)
  • 推荐系统中Redis 数据存储:二进制序列化协议选型与优化
  • linux连接服务器sftp无法输入中文
  • 基于SpringBoot的教务管理系统(源码+文档)
  • C/C++ Linux系统编程:进程通讯完全指南,管道通讯、共享内存以及消息队列
  • 零基础从头教学Linux(Day 25)
  • vue3使用Eslint
  • B样条曲线在节点u处添加节点的操作方法
  • 心率监测系统优化方案全解析
  • 火语言 RPA:轻松生成界面应用,让开发触手可及​
  • 求欧拉回路:Hierholzer算法图解模拟
  • 计算机网络技术(四)完结
  • 算法题-02
  • 大型语言模型监督微调(SFT)
  • GitLab 18.3 正式发布,更新多项 DevOps、CI/CD 功能【二】
  • MiniCPM-V-4.5:重新定义边缘设备多模态AI的下一代视觉语言模型