线程池的一些了解
线程池的一些了解
- 核心组件
- 工作流程
- 关键参数
- 合理的创建标题,有助于目录的生成
- 四种拒绝策略
- 线程状态管理
- Worker工作机制
- 自定义线程池工具
- ctl原子控制变量
- 核心线程数控制逻辑
- Worker类实现
- 线程回收机制
核心组件
- 线程池管理器: 负责创建和管理线程池
- 工作线程集合: 存放活跃的线程
- 任务队列: 存放待处理的任务(BlockingQueue)
- 线程工厂: 创建新线程(ThreadFactory)
- 拒绝策略: 处理无法执行的任务
工作流程
- 任务提交: 任务通过execute()方法提交到线程池
- 检查当前运行线程数是否小于核心线程数(corePoolSize)
- 如果小于,则创建新线程执行任务
- 如果大于等于,则尝试加入任务队列
- 任务队列满时,检查线程数是否小于最大线程数(maximumPoolSize)
- 如果小于,则创建新线程执行任务
- 如果等于,则执行拒绝策略
- 线程回收: 非核心线程超过空闲时间(keepAliveTime)会被回收
关键参数
- corePoolSize: 核心线程数,即使空闲也不会被回收
- maximumPoolSize: 最大线程数,线程池允许的最大线程数量
- keepAliveTime: 非核心线程的空闲超时时间
- workQueue: 工作队列,存放等待执行的任务
- threadFactory: 线程工厂,用于创建新线程
- handler: 拒绝策略,当任务无法执行时的处理方式
合理的创建标题,有助于目录的生成
直接输入1次#,并按下space后,将生成1级标题。
输入2次#,并按下space后,将生成2级标题。
以此类推,我们支持6级标题。有助于使用TOC
语法后生成一个完美的目录。
四种拒绝策略
- AbortPolicy: 直接抛出异常(默认策略)
- CallerRunsPolicy: 由调用线程处理该任务
- DiscardPolicy: 直接丢弃任务
- DiscardOldestPolicy: 丢弃队列中最老的任务
线程状态管理
线程池内部通过ctl变量维护两个重要状态:
- runState: 运行状态(RUNNING、SHUTDOWN、STOP等)
- workerCount: 有效线程数量
- stop:中断所有正在执行的任务
Worker工作机制
每个Worker线程会循环从任务队列中获取任务并执行,实现方式类似于:
while (true) {Runnable task = workQueue.take(); // 阻塞获取任务task.run(); // 执行任务
}
自定义线程池工具
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程池工具类*/
public class CustomThreadPoolExecutor {/*** 创建固定大小线程池* * @param corePoolSize 核心线程数* @param maximumPoolSize 最大线程数* @param keepAliveTime 空闲线程存活时间* @param unit 时间单位* @param queueCapacity 任务队列容量* @param threadNamePrefix 线程名称前缀* @param allowCoreThreadTimeOut 是否允许核心线程超时* @return ThreadPoolExecutor实例*/public static ThreadPoolExecutor createThreadPool(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,int queueCapacity,String threadNamePrefix,boolean allowCoreThreadTimeOut) {// 创建有界阻塞队列BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueCapacity);// 创建自定义线程工厂ThreadFactory threadFactory = new CustomThreadFactory(threadNamePrefix);// 创建拒绝策略RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);// 设置核心线程是否允许超时executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);return executor;}/*** 创建IO密集型线程池* * @param threadNamePrefix 线程名称前缀* @return ThreadPoolExecutor实例*/public static ThreadPoolExecutor createIOThreadPool(String threadNamePrefix) {int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;int maximumPoolSize = corePoolSize * 2;long keepAliveTime = 60L;int queueCapacity = 1000;return createThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,queueCapacity,threadNamePrefix,true);}/*** 创建CPU密集型线程池* * @param threadNamePrefix 线程名称前缀* @return ThreadPoolExecutor实例*/public static ThreadPoolExecutor createCPUThreadPool(String threadNamePrefix) {int corePoolSize = Runtime.getRuntime().availableProcessors();int maximumPoolSize = corePoolSize;long keepAliveTime = 60L;int queueCapacity = 200;return createThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,queueCapacity,threadNamePrefix,false);}/*** 自定义线程工厂*/static class CustomThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;CustomThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());t.setDaemon(false); // 设置为非守护线程t.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级return t;}}/*** 获取线程池状态信息* * @param executor 线程池实例* @return 线程池状态信息*/public static String getThreadPoolStatus(ThreadPoolExecutor executor) {return String.format("核心线程数: %d, 活跃线程数: %d, 最大线程数: %d, " +"任务队列大小: %d, 已完成任务数: %d, 总任务数: %d",executor.getCorePoolSize(),executor.getActiveCount(),executor.getMaximumPoolSize(),executor.getQueue().size(),executor.getCompletedTaskCount(),executor.getTaskCount());}
}
ctl原子控制变量
// 存储工作线程的集合
private final HashSet<Worker> workers = new HashSet<>();
// ctl同时维护运行状态和工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 获取工作线程数
private int workerCountOf(int c) { return c & CAPACITY;
}
核心线程数控制逻辑
// execute方法中的核心线程控制
public void execute(Runnable command) {int c = ctl.get();// 如果当前工作线程数小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 直接创建新线程执行任务if (addWorker(command, true))return;c = ctl.get();}// 其他逻辑...
}// addWorker方法中区分核心线程和非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// core参数决定是否受corePoolSize限制int wc = workerCountOf(c);if (core && wc >= corePoolSize) // 核心线程检查return false;if (!core && wc >= maximumPoolSize) // 非核心线程检查return false;// 创建Worker逻辑...
}
Worker类实现
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread;Runnable firstTask;Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); // 执行任务循环}
}
线程回收机制
// allowCoreThreadTimeOut控制核心线程是否可超时
public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)interruptIdleWorkers();}
}// getTask方法中的超时控制
private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();// 根据allowCoreThreadTimeOut决定是否超时回收boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if (timed) {// 使用poll超时获取任务Runnable r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);if (r != null) return r;timedOut = true;} else {// 阻塞等待任务Runnable r = workQueue.take();if (r != null) return r;}}
}
ps:HashSet用于存储线程,AtomicInteger ctl用于记录当前工作线程数,而corePoolSize和maximumPoolSize作为配置参数使用普通int类型存储。