(二十七)Java-ThreadPoolExecutor
ThreadPoolExecutor 是 Java 中线程池的核心实现类,提供了一种管理和复用线程池中线程的机制。它是 Java java.util.concurrent 包中的一部分,用于提高多线程程序的性能和效率,避免了频繁创建和销毁线程的开销。
一、常用构造函数
// 线程池参数 int corePoolSize = 5; // 核心线程数 int maximumPoolSize = 10; // 最大线程数 long keepAliveTime = 5000; // 空闲线程存活时间(毫秒) TimeUnit unit, // keepAliveTime 的时间单位 int workQueue = 100; // 队列容量 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(workQueue); //队列 ThreadFactory factory = new DefaultThreadFactory("默认工厂"); //工厂 RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); //拒绝策略 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, queue, factory, Policy); |
各个参数的含义:
- corePoolSize: 核心线程池的大小,线程池中保持的最小线程数。如果线程数小于该值,线程池会创建新线程来处理任务,直到核心线程数达到这个值。
- maximumPoolSize: 线程池中允许的最大线程数。当任务量非常大时,线程池会扩展线程数,最多扩展到此值。
- keepAliveTime: 线程池中多余的空闲线程存活的最大时间。当线程池中的线程数超过核心线程数时,多余的线程会在空闲时被销毁。
keepAliveTime
指定了空闲线程的最大存活时间。 - unit:
keepAliveTime
参数的时间单位,常见的单位有TimeUnit.SECONDS
,TimeUnit.MILLISECONDS
等。 - queue: 任务队列,用来存储等待执行的任务。常见的队列实现包括
LinkedBlockingQueue
、ArrayBlockingQueue
、PriorityBlockingQueue
等。 - factory:线程工厂
- policy:拒绝策略
二、线程池的工作原理
ThreadPoolExecutor
的线程池执行流程可以概括为以下几个步骤:
-
提交任务: 当一个任务被提交到线程池时,
ThreadPoolExecutor
会先尝试使用空闲的核心线程执行任务。如果核心线程池未满,线程池会直接分配任务给一个线程来执行。 -
线程池扩展: 当线程池中的核心线程都在忙碌,而任务队列满了,
ThreadPoolExecutor
会创建新线程执行任务,直到线程池的线程数达到最大线程数 (maximumPoolSize
)。 -
任务队列: 如果线程池中的线程数已经达到最大值,而任务队列也满了,那么新的任务将被拒绝执行(除非配置了拒绝策略)。
-
线程回收: 如果线程池中的线程数超过了核心线程数,而这些线程处于空闲状态超过
keepAliveTime
指定的时间,它们会被回收,以节省资源。 -
线程池关闭: 当线程池不再需要执行任务时,可以通过调用
shutdown()
或shutdownNow()
方法来关闭线程池。
三、线程池参数详解
(1) int corePoolSize
核心线程池数量。即使没有任务执行,核心线程也会一直存在,直到调用 shutdown()。
(2)int maximumPoolSize
最大线程数量。当任务数超过 corePoolSize 时,线程池会创建新的线程来处理任务,但总线程数不能超过 maximumPoolSize。
底层逻辑:
线程池接收到新任务,当前工作线程数少于corePoolSize, 即使有空闲的工作线程,也会创建新的线程来处理该请求,直到线程数达到corePoolSize;
当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程;
maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务。
(3)long keepAliveTime
非核心线程空闲时的存活时间。如果线程池中的线程数大于 corePoolSize 且线程空闲时间超过 keepAliveTime,则该线程会被销毁。
(4)TimeUnit unit
keepAliveTime 的时间单位(例如:TimeUnit.SECONDS)。
(5)BlockingQueue<Runnable> workQueue
BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。
常见的几种阻塞队列的实现:
ArrayBlockingQueue:
基于数组的阻塞队列(有界队列),具有固定的容量。队列中的元素按FIFO排序,创建时必须设置大小。
LinkedBlockingQueue:
基于链表实现的线程安全的阻塞队列,具有较高的吞吐量。按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.MAX_VALUE作为容量(无界队列)。
PriorityBlockingQueue:
是具有优先级的无界队列
SynchronousQueue:
每次提交任务时,必须等待线程池中有线程来接收任务。
(6)ThreadFactory threadFactory
线程工厂,用于创建新线程。可以自定义线程的名称、优先级等。
(7)RejectedExecutionHandler handler
线程池的拒绝策略
拒绝策略触发条件:
当线程池中的线程数已达 maximumPoolSize,且队列已满时,如何处理新提交的任务。
常见的拒绝策略有:
AbortPolicy:默认,新任务会被拒绝,抛出RejectedExecutionException。
DiscardPolicy:丢弃任务,且不会有任何异常抛出。
DiscardOldestPolicy:丢弃队列中最旧的任务(最早进入)。
CallerRunsPolicy:由提交任务的线程来执行任务,而不是新创建线程。
四、常用方法
(1)execute、submit
提交任务
public void execute(Runnable command); public Future<?> submit(Runnable task); public <T> Future<T> submit(Callable<T> task); |
注:execute和submit的区别:
execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数;Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果。
(2)invokeAll、invokeAny
invokeAll:等待线程池中的所有任务完成并返回结果
invokeAny:等待线程池中的任意任务完成并返回结果
// 等待线程池中的任务完成并返回结果 public List<Runnable> invokeAll(Collection<? extends Callable<T>> tasks); public T invokeAny(Collection<? extends Callable<T>> tasks); |
(3)shutdown、shutdownNow
shutdown():关闭线程池,不再接受新任务,但会继续执行已经提交的任务。
shutdownNow():尝试停止所有正在执行的任务,停止处理等待队列中的任务,并返回尚未开始执行的任务。
public void shutdown(); public List<Runnable> shutdownNow(); |
(4)isShutdown、isTerminated
isShutdown():检查线程池是否已经关闭。
isTerminated():检查线程池中的所有任务是否已经完成。
五、调度器的钩子方法
protected void beforeExecute(Thread t, Runnable r) { }:
任务执行之前的钩子方法
protected void afterExecute(Runnable r, Throwable t) { }:
任务执行之后的钩子方法
protected void terminated() { }:
线程池终止时的钩子方法
ExecutorService pool=new ThreadPoolExecutor(2, 4, 60,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)){ @Override protected void terminated() { System.out.println("调度器已停止..."); } @Override protected void beforeExecute(Thread t,Runnable target) { System.out.println("前钩执行..."); super.beforeExecute(t, target); } @Override protected void afterExecute(Runnable target,Throwable t) { System.out.println("后钩执行..."); super.afterExecute(target, t); } } |
六、线程池状态
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; |
RUNNING: 线程池接受新的任务并处理已提交的任务。
SHUTDOWN: 不再接受新任务,但会继续执行已提交的任务。
STOP: 不接受新任务,也不处理已提交的任务,正在处理的任务被中断。
TIDYING: 所有任务都已结束,线程池即将关闭。
TERMINATED: 线程池完全关闭。
七、总结
ThreadPoolExecutor
是 Java 中功能强大的线程池类,提供了灵活的配置选项,能够高效地管理线程和任务。通过合理的配置线程池的核心线程数、最大线程数、空闲时间、工作队列等,可以根据不同的应用场景来优化多线程程序的性能。