Java线程池的分析和使用
一、线程池的核心概念
Java 的线程池(Thread Pool)是一种管理和复用线程的机制,它可以有效地控制并发线程的数量,减少线程创建和销毁的开销,提高系统的性能和稳定性。
1.1 线程池的作用
- 复用线程:避免频繁创建和销毁线程,减少系统开销。
- 控制并发:限制并发线程的数量,防止资源耗尽。
- 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 管理任务:提供任务队列,支持任务的提交、执行和取消。
1.2 线程池的组成
- 核心线程(Core Threads):线程池中始终存活的线程,即使它们处于空闲状态。
- 最大线程(Maximum Threads):线程池中允许的最大线程数量。
- 任务队列(Work Queue):用于存放待执行的任务。
- 线程工厂(Thread Factory):用于创建新线程。
- 拒绝策略(Rejected Execution Handler):当任务无法被线程池接受时的处理策略。
二、线程池的实现
Java 线程池的核心实现类是 ThreadPoolExecutor,它提供了灵活的配置选项。以下是 ThreadPoolExecutor 的构造函数:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
2.1 核心参数
- corePoolSize:核心线程数,线程池中始终保持存活的线程数量。
- maximumPoolSize:最大线程数,线程池中允许的最大线程数量。
- keepAliveTime:空闲线程的存活时间,超过该时间的空闲线程会被销毁。
- unit:keepAliveTime 的时间单位。
- workQueue:任务队列,用于存放待执行的任务。
- threadFactory:线程工厂,用于创建新线程。
- handler:拒绝策略,当任务无法被线程池接受时的处理方式。
2.2 任务调度流程
- 当提交一个新任务时,线程池会优先创建核心线程来执行任务。
- 如果核心线程都在忙,且任务队列未满,任务会被放入队列中等待执行。
- 如果任务队列已满,且当前线程数小于 maximumPoolSize,线程池会创建新的非核心线程执行任务。
- 如果线程数已达到 maximumPoolSize,且任务队列已满,线程池会执行拒绝策略。
2.3 任务队列
常见的任务队列实现包括:
- LinkedBlockingQueue:无界队列,任务数量不受限制。
- ArrayBlockingQueue:有界队列,任务数量受队列容量限制。
- SynchronousQueue:不存储任务的队列,任务直接交给线程执行。
- PriorityBlockingQueue:具有优先级得无限阻塞队列,可以自定义规则根据任务的优先级顺序先后执行。
2.4 拒绝策略
常见的拒绝策略包括:
- AbortPolicy:直接抛出 RejectedExecutionException。
- CallerRunsPolicy:由提交任务的线程直接执行任务。
- DiscardPolicy:直接丢弃任务,不抛出异常。
- DiscardOldestPolicy:丢弃队列中最旧的任务,然后重新提交新任务。
三、线程池的使用
3.1 创建线程池
可以通过 ThreadPoolExecutor 构造函数创建自定义线程池,也可以使用 Executors 工厂类创建预定义的线程池。
// 使用 ThreadPoolExecutor 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// 使用 Executors 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5); // 固定大小的线程池
ExecutorService executor = Executors.newCachedThreadPool(); // 可缓存的线程池
ExecutorService executor = Executors.newSingleThreadExecutor(); // 单线程的线程池
ExecutorService executor = Executors.newScheduledThreadPool(5); // 支持定时任务的线程池
3.2 提交任务
可以通过 execute() 或 submit() 方法向线程池提交任务。
// 使用 execute() 提交任务
executor.execute(() -> {
// 任务逻辑
});
// 使用 submit() 提交任务
Future<?> future = executor.submit(() -> {
// 任务逻辑
return result;
});
3.3 关闭线程池
可以通过 shutdown() 或 shutdownNow() 方法关闭线程池。
// 使用 shutdown() 关闭线程池
executor.shutdown(); // 平缓关闭,等待所有任务执行完毕
// 使用 shutdownNow() 关闭线程池
executor.shutdownNow(); // 立即关闭,尝试中断所有任务
四、常见的线程池类型
Executors提供四种常用的线程池,分别为:
4.1 newCachedThreadPool
-
特点:线程数量动态调整,任务队列为空。
-
适用场景:适合执行大量短生命周期的任务。
// 通过Executors.newCachedThreadPool()创建一个可缓存线程池 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // newCachedThreadPool()方法的源码 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
4.2 newFixedThreadPool
- 特点:线程数量固定,任务队列无界。
- 适用场景:适合负载较重的服务器环境。
// 通过Executors.newFixedThreadPool()创建一个定长线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // newFixedThreadPool()方法的源码 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
4.3 newScheduledThreadPool
- 特点:支持定时任务和周期性任务。
- 适用场景:适合需要定时执行任务的场景。
// 通过Executors.newScheduledThreadPool()创建一个可定期或者延时执行任务的定长线程池 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // newScheduledThreadPool()方法的源码 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
4.4 newSingleThreadExecutor
- 特点:只有一个线程,任务队列无界。
- 适用场景:适合需要顺序执行任务的场景。
// 通过Executors.newSingleThreadExecutor()创建一个单线程化的线程池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // newSingleThreadExecutor()方法的源码 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
五、线程池的注意事项
- 合理配置线程池参数:根据任务类型和系统资源设置核心线程数、最大线程数和任务队列容量。
- 避免无界队列:无界队列可能导致内存耗尽,建议使用有界队列。
- 处理拒绝策略:根据业务需求选择合适的拒绝策略。
- 关闭线程池:确保线程池在使用完毕后正确关闭,避免资源泄漏。
六、线程池的核心方法
-
任务提交方法
- execute(Runnable command)
- 作用:提交一个任务到线程池中执行。
- 特点:
- 任务是无返回值的 Runnable 对象。
- 如果线程池已关闭,会抛出 RejectedExecutionException。
- submit(Runnable task)
- 作用:提交一个任务到线程池中执行,并返回一个 Future 对象。
- 特点:
- 任务是无返回值的 Runnable 对象。
- 可以通过 Future 对象判断任务是否完成。
- execute(Runnable command)
-
线程池生命周期管理方法
- shutdown()
- 作用:平缓关闭线程池,等待所有已提交的任务执行完毕。
- 特点:
- 不再接受新任务。
- 已提交的任务会继续执行。
- shutdownNow()
- 作用:立即关闭线程池,尝试中断所有正在执行的任务。
- 特点:
- 不再接受新任务。
- 返回未执行的任务列表。
- isShutdown()
- 作用:判断线程池是否已关闭。
- 返回值:true 表示线程池已关闭,false 表示线程池未关闭。
- isTerminated()
- 作用:判断线程池是否已终止(所有任务执行完毕且线程池已关闭)。
- 返回值:true 表示线程池已终止,false 表示线程池未终止。
- awaitTermination(long timeout, TimeUnit unit)
- 作用:等待线程池终止,直到超时或线程池终止。
- 返回值:true 表示线程池已终止,false 表示超时。
- shutdown()
-
线程池状态查询方法
- getPoolSize()
- 作用:获取线程池中当前的线程数量。
- 返回值:线程池中的线程数量。
- getActiveCount()
- 作用:获取线程池中正在执行任务的线程数量。
- 返回值:正在执行任务的线程数量。
- getCompletedTaskCount()
- 作用:获取线程池中已完成的任务数量。
- 返回值:已完成的任务数量。
- getTaskCount()
- 作用:获取线程池中已提交的任务总数(包括已完成和未完成的任务)。
- 返回值:已提交的任务总数。
- getPoolSize()
-
线程池配置方法
- setCorePoolSize(int corePoolSize)
- 作用:设置线程池的核心线程数。
- 特点:
- 如果新值小于当前核心线程数,多余的线程会被终止。
- 如果新值大于当前核心线程数,新的线程会被创建。
- setMaximumPoolSize(int maximumPoolSize)
- 作用:设置线程池的最大线程数。
- 特点:
- 如果新值小于当前线程数,多余的线程会被终止。
- setKeepAliveTime(long time, TimeUnit unit)
- 作用:设置空闲线程的存活时间。
- 特点:
- 空闲线程在超过存活时间后会被终止。
- setRejectedExecutionHandler(RejectedExecutionHandler handler)
- 作用:设置线程池的拒绝策略。
- 特点:
- 当任务无法被线程池接受时,会调用拒绝策略处理任务。
- setCorePoolSize(int corePoolSize)
七、核心线程的空闲处理
默认情况下,核心线程即使处于空闲状态也不会被销毁。可以通过设置 allowCoreThreadTimeOut(true),使核心线程在空闲超过 keepAliveTime 后被销毁。
根据核心线程的源码分析,在 ThreadPoolExecutor 的 getTask 方法中,核心线程会根据 allowCoreThreadTimeOut 和 keepAliveTime 决定是否阻塞等待任务或超时退出。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断是否允许核心线程超时退出
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}