实现一个线程池管理器
为什么?
平时我们在开发中,创建了不少线程池,这些线程池都处于游离状态,不方便管理和监控,我们是无法知道目前系统中有哪些线程池、以及每个线程池当前的一个状态,负载情况等,所以我们可以开发一个线程池管理器来解决这个问题。
是什么
统管系统中所有线程池,负责所有线程池的创建、监控等操作。
代码
1.线程池管理类
包含线程池的创建,查询,扩容
/*** 线程池管理器*/
public class ThreadPoolManager {private static final Map<String, ThreadPoolTaskExecutor> threadPoolMap = new ConcurrentHashMap<String, ThreadPoolTaskExecutor>(16);private static final int corePoolSize = 10;private static final int maxPoolSize = Integer.MAX_VALUE;private static final int queueCapacity = Integer.MAX_VALUE;private static final int keepAliveSeconds = 60;/**** @param name 线程池名称*/public static ThreadPoolTaskExecutor newThreadPool(String name) {return newThreadPool(name, corePoolSize, maxPoolSize);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name 线程池名称* @param corePoolSize 核心线程数*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize) {return newThreadPool(name, corePoolSize, corePoolSize);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name 线程池名称* @param corePoolSize 核心线程数* @param maxPoolSize 最大线程数*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize) {return newThreadPool(name, corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds, null, null);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name 线程池名称* @param corePoolSize 核心线程数* @param maxPoolSize 最大线程数* @param queueCapacity 队列大小*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize, int queueCapacity) {return newThreadPool(name, corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds, null, null);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name 线程池名称* @param corePoolSize 核心线程数* @param maxPoolSize 最大线程数* @param queueCapacity 队列大小* @param threadFactory 线程工厂*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize, int queueCapacity,ThreadFactory threadFactory) {return newThreadPool(name, corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds, threadFactory, null);}/*** 创建新的线程池,如果线程池已经创建,返回已经创建的线程池** @param name 线程池名称* @param corePoolSize 核心线程数* @param maxPoolSize 最大线程数* @param queueCapacity 队列大小* @param keepAliveSeconds 线程池存活时间(秒)* @param threadFactory 线程工厂* @param rejectedExecutionHandler 拒绝策略* @return ThreadPoolTaskExecutor*/public static ThreadPoolTaskExecutor newThreadPool(String name, int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveSeconds, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {return threadPoolMap.computeIfAbsent(name, threadGroupName -> {ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor() {private boolean initialized = false;@Overrideprotected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<>(queueCapacity);} else {return new SynchronousQueue<>();}}@Overridepublic void setQueueCapacity(int queueCapacity) {if (this.initialized && this.getThreadPoolExecutor() != null &&this.getThreadPoolExecutor().getQueue() != null &&this.getThreadPoolExecutor().getQueue() instanceof ArrayBlockingQueue) {((LinkedBlockingQueue) this.getThreadPoolExecutor().getQueue()).setCapacity(queueCapacity);}super.setQueueCapacity(queueCapacity);}@Overridepublic void afterPropertiesSet() {if (initialized) {return;}super.afterPropertiesSet();this.initialized = true;}};threadPoolExecutor.setCorePoolSize(corePoolSize);threadPoolExecutor.setMaxPoolSize(maxPoolSize);threadPoolExecutor.setQueueCapacity(queueCapacity);threadPoolExecutor.setKeepAliveSeconds(keepAliveSeconds);threadPoolExecutor.setThreadGroupName(name);threadPoolExecutor.setThreadNamePrefix(name + "-");if (threadFactory != null) {threadPoolExecutor.setThreadFactory(threadFactory);}if (rejectedExecutionHandler != null) {threadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);}threadPoolExecutor.afterPropertiesSet();return threadPoolExecutor;});}/*** 获取所有线程池信息**/public static List<ThreadPoolInfo> threadPoolInfoList() {return threadPoolMap.entrySet().stream().map(entry -> threadPoolInfo(entry.getKey(), entry.getValue())).collect(Collectors.toList());}/*** 动态变更线程池(如:扩缩容、扩缩队列大小)** @param threadPoolChange 变更线程池信息*/public static void changeThreadPool(ThreadPoolChange threadPoolChange) {ThreadPoolTaskExecutor threadPoolTaskExecutor = threadPoolMap.get(threadPoolChange.getName());if (threadPoolTaskExecutor == null) {throw new IllegalArgumentException();}if (threadPoolChange.getCorePoolSize() > threadPoolChange.getMaxPoolSize()) {throw new IllegalArgumentException();}synchronized (ThreadPoolManager.class) {if (threadPoolChange.getMaxPoolSize() > threadPoolTaskExecutor.getCorePoolSize()) {threadPoolTaskExecutor.setMaxPoolSize(threadPoolChange.getMaxPoolSize());threadPoolTaskExecutor.setCorePoolSize(threadPoolChange.getCorePoolSize());threadPoolTaskExecutor.setQueueCapacity(threadPoolChange.getQueueCapacity());} else {threadPoolTaskExecutor.setCorePoolSize(threadPoolChange.getCorePoolSize());threadPoolTaskExecutor.setMaxPoolSize(threadPoolChange.getMaxPoolSize());threadPoolTaskExecutor.setQueueCapacity(threadPoolChange.getQueueCapacity());}}}/*** 获取所有线程池的信息*/private static ThreadPoolInfo threadPoolInfo(String name, ThreadPoolTaskExecutor threadPool) {ThreadPoolInfo threadPoolInfo = new ThreadPoolInfo();threadPoolInfo.setName(name);threadPoolInfo.setCorePoolSize(threadPool.getCorePoolSize());threadPoolInfo.setMaxPoolSize(threadPool.getMaxPoolSize());threadPoolInfo.setActiveCount(threadPool.getActiveCount());threadPoolInfo.setQueueCapacity(threadPool.getQueueCapacity());threadPoolInfo.setQueueSize(threadPool.getQueueSize());return threadPoolInfo;}}
2.线程池配置
将线程池加载到ioc,自定义线程工程,防止线程池内日志无法写入日志
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {//自定义线程工程private static final ThreadFactory threadFactory=(Runnable r) -> {Thread t = new Thread(r);t.setUncaughtExceptionHandler(GlobalUncaughtExceptionHandler.getInstance());return t;};/*** 项目共用线程池*/public static final String BIZ_EXECUTOR = "bizExecutor";/*** websocket通信线程池*/public static final String WS_EXECUTOR = "websocketExecutor";@Overridepublic Executor getAsyncExecutor() {return bizExecutor();}@Bean(BIZ_EXECUTOR)@Primarypublic ThreadPoolTaskExecutor bizExecutor() {return ThreadPoolManager.newThreadPool(BIZ_EXECUTOR, 10, 20, 1000,threadFactory);}@Bean(WS_EXECUTOR)@Primarypublic ThreadPoolTaskExecutor wsExecutor() {return ThreadPoolManager.newThreadPool(WS_EXECUTOR, 10, 20, 1000,threadFactory);}}
自定义线程异常处理器,大运异常线程
@Slf4j
public class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {private static final GlobalUncaughtExceptionHandler instance = new GlobalUncaughtExceptionHandler();private GlobalUncaughtExceptionHandler() {}@Overridepublic void uncaughtException(Thread t, Throwable e) {log.error("Exception in thread {} ", t.getName(), e);}public static GlobalUncaughtExceptionHandler getInstance() {return instance;}}
3.其它实体队列
重写并发包下 LinkedBlockingQueue 增加setCapacity方法扩容
线程池中会用到Java中的阻塞队列java.util.concurrent.BlockingQueue
,目前jdk中自带几个阻塞队列都不支持动态扩容,比如java.util.concurrent.LinkedBlockingQueue
,他里面的capacity是final的,不支持修改,为了是队列容量能够支持调整,我们创建了一个可扩容的阻塞队列ResizeLinkedBlockingQueue
,代码是从LinkedBlockingQueue
中拷贝过来的,然后添加了一个可以修改容量capacity的方法,如下,然后创建线程池的时候,我们使用自定义的这个阻塞队列便可以实现线程池的动态扩容。
public void setCapacity(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() > capacity) {throw new IllegalArgumentException();}this.capacity = capacity;} finally {putLock.unlock();}}
线程实体
@Data
public class ThreadPoolInfo {private String name;//核心线程数private int corePoolSize;//最大线程数private int maxPoolSize;//活动的线程数private int activeCount;//队列的容量private int queueCapacity;//队列中当前任务数量private int queueSize;
}
线程扩容实体
@Data
public class ThreadPoolChange {//线程池名称private String name;//核心线程数private int corePoolSize;//最大线程数private int maxPoolSize;//队列容量private int queueCapacity;}
3.模拟查询
@RestController
@RequestMapping("/threadPool")
public class ThreadPoolManagerController {/*** 获取所有的线程池信息** @return*/@GetMapping("/list")public List<ThreadPoolInfo> threadPoolInfoList() {return ThreadPoolManager.threadPoolInfoList();}/*** 线程池扩缩容** @return*/@PostMapping("/change")public Boolean threadPoolChange(@RequestBody ThreadPoolChange threadPoolChange) {ThreadPoolManager.changeThreadPool(threadPoolChange);return true;}/*** 模拟使用线程池* @param threadPoolChange* @return*/@Qualifier(ThreadPoolConfig.BIZ_EXECUTOR)@Autowiredprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@PostMapping("/send")public Boolean send() {for (int i = 0; i < 50; i++){threadPoolTaskExecutor.execute(() -> {ThreadUtil.sleep(3000);System.out.println("线程池执行任务-");});}return true;}}
扩容后:
正在使用