深入浅出线程池ThreadPoolExecutor
介绍
ThreadPoolExecutor 是 Java 中非常重要的线程池实现类,它实现了 ExecutorService 接口,用于管理线程池中的线程并执行任务。ThreadPoolExecutor 提供了丰富的配置选项和控制方法,适用于高并发、高性能的多线程任务执行。
构造方法
在使用 ThreadPoolExecutor 时,可以通过构造方法来定义线程池的基本参数:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
- corePoolSize: 核心线程数。即使线程池中的线程空闲,线程池也会保持这部分核心线程。
- maximumPoolSize: 最大线程数。线程池中最多可以容纳的线程数。如果线程池的线程数超过这个值,新的任务会被放入 workQueue,当队列也满时,线程池会拒绝任务。
- keepAliveTime: 非核心线程空闲时的存活时间。即使线程数超过 corePoolSize,如果某些线程空闲超过这个时间,它们会被终止回收。
- unit: keepAliveTime 的时间单位(如 TimeUnit.SECONDS)。
- workQueue: 用于存放任务的队列。可以使用多种不同类型的队列,如 LinkedBlockingQueue、SynchronousQueue、ArrayBlockingQueue 等。
- threadFactory:当线程池需要创建新的线程时,ThreadFactory 会负责实例化。
- handler:当线程池无法处理某个任务时(即线程池满了且队列也满了),就会调用 RejectedExecutionHandler 来处理任务拒绝的行为。
常用方法
1. execute(Runnable command)
- 描述:提交一个任务给线程池执行。任务会根据线程池的当前配置执行,具体的执行过程由线程池控制。
- 使用场景:用于提交不需要返回值的任务。
executor.execute(() -> System.out.println("Task executed"));
2. submit(Callable<T> task)
- 描述:提交一个返回值的任务,返回一个
Future<T>
对象,可以通过它来获取任务的执行结果或取消任务。 - 返回值:
Future<T>
,用于获取任务执行的结果或状态。
Future<Integer> future = executor.submit(() -> 42);
Integer result = future.get(); // 获取任务执行结果
3. shutdown()
- 描述:优雅地关闭线程池。调用此方法后,线程池不会再接收新的任务,但会继续执行已经提交的任务,直到所有任务完成。
- 注意:该方法不会立即停止线程池中的线程,而是会等待任务完成后再退出。
executor.shutdown();
4. shutdownNow()
- 描述:立即关闭线程池。会尝试停止所有正在执行的任务,并返回尚未执行的任务列表。并且不会等待正在执行的任务完成。
- 注意:无法保证当前执行的任务会被中断,只会尝试通过
Thread.interrupt()
来中断任务。
List<Runnable> notExecutedTasks = executor.shutdownNow();
5. getCorePoolSize()
- 描述:获取线程池的核心线程数。
int coreSize = executor.getCorePoolSize();
6. setCorePoolSize(int corePoolSize)
- 描述:设置线程池的核心线程数。可以动态调整核心线程数。
executor.setCorePoolSize(10);
7. getMaximumPoolSize()
- 描述:获取线程池的最大线程数。
int maxSize = executor.getMaximumPoolSize();
8. setMaximumPoolSize(int maximumPoolSize)
- 描述:设置线程池的最大线程数。可以动态调整最大线程数。
executor.setMaximumPoolSize(20);
9. getKeepAliveTime(TimeUnit unit)
- 描述:获取线程池中非核心线程的存活时间。即空闲时存活的时间。
long keepAliveTime = executor.getKeepAliveTime(TimeUnit.SECONDS);
10. setKeepAliveTime(long time, TimeUnit unit)
- 描述:设置线程池中非核心线程的存活时间。可以动态调整这个时间。
executor.setKeepAliveTime(30, TimeUnit.SECONDS);
11. getQueue()
- 描述:获取任务队列。
workQueue
中存放的是等待执行的任务,可以通过这个方法访问队列内容。
BlockingQueue<Runnable> queue = executor.getQueue();
12. prestartCoreThread()
- 描述:尝试启动一个核心线程。如果核心线程池未满,调用此方法会启动一个新的核心线程来处理任务。
- 返回值:返回
true
表示启动成功,false
表示无法启动线程。
boolean started = executor.prestartCoreThread();
13. prestartAllCoreThreads()
- 描述:启动所有核心线程。如果核心线程池未满,调用此方法会启动所有核心线程来处理任务。
executor.prestartAllCoreThreads();
14. getActiveCount()
- 描述:获取当前正在执行任务的线程数。
int activeThreads = executor.getActiveCount();
15. getCompletedTaskCount()
- 描述:获取已完成任务的数量。可以用来监控线程池的执行进度。
long completedTasks = executor.getCompletedTaskCount();
16. getTaskCount()
- 描述:获取提交给线程池的任务总数(包括已完成、正在执行和待处理的任务)。
long totalTasks = executor.getTaskCount();
17. getPoolSize()
- 描述:获取线程池中当前活动的线程数和空闲线程数的总和。
int poolSize = executor.getPoolSize();
18. allowCoreThreadTimeOut(boolean value)
- 描述:设置是否允许核心线程超时。默认情况下,核心线程即使空闲也不会被回收,如果设置为
true
,即使是核心线程也会在空闲超过keepAliveTime
后被回收。
executor.allowCoreThreadTimeOut(true);
拒绝策略
在 Java 中,ThreadPoolExecutor
提供了 拒绝策略(RejectedExecutionHandler
),用于在任务提交到线程池时,线程池无法处理任务时的处理逻辑。通常,当线程池的线程都在工作,且任务队列已满时,提交的任务就会被拒绝。拒绝策略定义了当任务被拒绝时如何处理这些任务。
RejectedExecutionHandler
接口
RejectedExecutionHandler
接口有一个方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
ThreadPoolExecutor
提供了 4 种常用的默认拒绝策略,也可以自定义拒绝策略。
1. 常见的拒绝策略
AbortPolicy
(默认策略)
-
行为:抛出
RejectedExecutionException
异常,终止任务提交。 -
特点:
- 这是
ThreadPoolExecutor
默认的拒绝策略。 - 当线程池无法处理新任务时,直接抛出
RejectedExecutionException
,这意味着任务提交者需要通过捕获异常来处理任务被拒绝的情况。 - 适用场景:当任务数达到极限且不能丢弃任务时,使用此策略。
- 这是
-
代码示例:
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), handler
);
- 优点:任务会立刻失败,应用可以知道有任务没有执行,可以做进一步的处理。
- 缺点:任务失败会抛出异常,可能会导致程序崩溃,特别是没有适当的异常处理时。
CallerRunsPolicy
-
行为:由提交任务的线程来执行被拒绝的任务。
-
特点:
- 当线程池的队列满时,任务不会被丢弃,而是由提交任务的线程来执行这个任务(即当前线程会自己执行任务,而不是让线程池来执行)。
- 适用场景:如果希望通过减轻线程池负担来避免任务丢失,可以选择此策略。通过这种方式,任务并不会丢失,但可能会导致提交任务的线程压力增大。
-
代码示例:
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), handler
);
- 优点:任务不会丢失,而且提交任务的线程会执行该任务。避免了任务丢失的风险。
- 缺点:可能会使调用线程的执行时间过长,导致调用线程被阻塞,特别是在提交任务的线程池负载已经很重的情况下,可能会影响整个系统的响应时间。
DiscardPolicy
-
行为:直接丢弃被拒绝的任务,不抛出任何异常。
-
特点:
- 被拒绝的任务会被丢弃,不会给用户反馈。
- 适用场景:当丢弃任务不会影响程序的正确性时,可以使用此策略。比如任务不重要,丢弃也无妨。
-
代码示例:
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), handler
);
- 优点:非常简单,不会抛出异常,性能开销较低。
- 缺点:任务丢失了,用户无法知道丢失了什么任务,这可能会影响业务逻辑。
DiscardOldestPolicy
-
行为:丢弃任务队列中最旧的任务,并尝试提交当前任务。
-
特点:
- 当队列已满时,丢弃队列中最旧的任务,并尝试提交新任务。
- 适用场景:适用于希望优先处理新任务,丢弃老任务的情况。例如,实时性要求很高,丢弃旧任务来保证新任务的及时处理。
-
代码示例:
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), handler
);
- 优点:可以保持任务队列的活跃性,避免因队列满而导致所有任务被拒绝。
- 缺点:丢弃了队列中最老的任务,可能会导致任务执行的顺序错乱或丢失重要任务。
2. 自定义拒绝策略
除了上述四种内置的拒绝策略,ThreadPoolExecutor
也支持自定义拒绝策略。你可以通过实现 RejectedExecutionHandler
接口来定义自己的策略。
示例:自定义拒绝策略
RejectedExecutionHandler customHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("Custom handler: Task rejected, trying to log it");// 你可以选择记录日志或将任务保存到数据库中}
};ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), customHandler
);
ThreadFactory
ThreadFactory 是 Java 中的一个接口,用于创建新的线程。ThreadFactory 提供了一种方式来定制线程的创建过程,允许开发者在创建线程时设置线程的一些特性,比如线程名称、优先级、是否为守护线程等。
1. ThreadFactory
接口
ThreadFactory
接口的定义非常简单,只有一个方法:
public interface ThreadFactory {Thread newThread(Runnable r);
}
2. 默认的 ThreadFactory
如果没有自定义 ThreadFactory
,可以使用 Executors
类提供的默认工厂方法。例如,Executors.defaultThreadFactory()
会返回一个默认的线程工厂。
默认线程工厂行为
- 线程名称为
pool-X-thread-Y
,其中X
是线程池编号,Y
是线程编号。 - 默认创建的线程是 非守护线程。
- 默认线程的优先级是 正常优先级(
Thread.NORM_PRIORITY
)。
3. 自定义 ThreadFactory
通常,我们会根据需求定制线程的创建方式,比如:
- 给线程设置特定的名称。
- 设置线程为守护线程。
- 设置线程的优先级。
- 设置 UncaughtExceptionHandler 来处理线程中的未捕获异常。
示例:自定义 ThreadFactory
假设我们希望为每个线程设置一个特定的名称,并且设置为守护线程。可以这样做:
ThreadFactory customThreadFactory = new ThreadFactory() {private final AtomicInteger threadCount = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("Custom-Thread-" + threadCount.incrementAndGet());thread.setDaemon(true); // 设置线程为守护线程return thread;}
};ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), customThreadFactory
);
在这个例子中:
- 我们使用
AtomicInteger
来确保线程名称是唯一的。 - 我们设置了每个线程为 守护线程,这意味着如果程序中没有其他非守护线程在运行,守护线程会被 JVM 回收。
另一种方式:使用 lambda 表达式
自 Java 8 起,可以使用 lambda 表达式来简化代码。比如:
ThreadFactory customThreadFactory = r -> {Thread thread = new Thread(r);thread.setName("Custom-Thread-" + Thread.currentThread().getId());thread.setDaemon(true); // 设置线程为守护线程return thread;
};
阻塞队列
在 线程池 中,阻塞队列(Blocking Queue)是一个重要的组件,用来存放待执行的任务。当线程池中的线程都忙时,任务会被放入阻塞队列中,直到有线程空闲下来处理队列中的任务。
1. 阻塞队列的基本特性
- 线程安全:阻塞队列内部已经处理了线程安全的问题,多个线程可以同时访问队列而不会出现数据竞争。
- 阻塞操作:如果队列为空,调用
take()
的线程会被阻塞,直到队列中有任务;如果队列已满,调用put()
的线程会被阻塞,直到队列有空间。
2. 常见的阻塞队列实现
Java 中有多种阻塞队列实现,常见的有:
ArrayBlockingQueue
:一个基于数组的有界阻塞队列。LinkedBlockingQueue
:一个基于链表的有界或无界阻塞队列。SynchronousQueue
:每个插入操作必须等待一个移除操作,适用于需要直接交换元素的场景。PriorityBlockingQueue
:一个无界阻塞队列,它的元素是按优先级顺序排列的。DelayQueue
:一个支持延时的无界阻塞队列,适用于需要延时执行任务的场景。
3. 阻塞队列与线程池的结合
在 ThreadPoolExecutor
中,阻塞队列用于存放任务,线程池会从队列中取出任务并执行。不同类型的阻塞队列可以影响线程池的行为,特别是在任务量大于线程池容量时。
阻塞队列在 ThreadPoolExecutor
中的作用非常重要,主要体现在以下几个方面:
- 任务排队:当所有线程都在忙碌时,任务会被存放到队列中等待执行。
- 控制并发数:队列的容量决定了线程池可以排队的任务数量,影响线程池的吞吐量和性能。
- 调节任务负载:通过使用不同类型的队列,线程池可以平衡任务的生产和消费,避免系统过载。