线程池 相关知识
1. 线程池的基本概念
1.1. 线程池解决什么问题?
在并发环境下,系统不能够确定,在任意时刻中,有多少任务需要执行,有多少资源需要投入。创建过多的线程,一定会给系统带来额外的性能和资源开销,包括创建销毁线程的开销、调度线程的开销等等,同时也降低了系统的整体性能。
线程池解决的核心问题就是资源管理问题,它通过维护一定数量的线程,通过内部的处理流程,分配或创建线程执行提交给线程池的线程任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程无限制的创建,导致线程数量膨胀,而发生的系统过载问题。
1.2. 什么是线程池?
线程池是一种基于 “池化” 思想设计,管理和复用线程的工具类,它可以通过在运行期间维护若干个线程,避免了频繁创建和销毁线程带来的性能开销,同时可以控制并发处理数量。
所以,如果线程任务会提交给线程池,它创建线程来执行线程任务,或者将线程任务放入队列(工作队列)中等待一个空闲线程来执行。
1.3. 线程池核心作用
- 降低资源消耗:通过重复利用已经创建的线程,减少频繁创建和销毁线程带来的性能开销,提高线程复用。
- 方便线程管理:可统一调配资源,控制最大并发数。线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提高响应速度:线程任务提交给线程池后,无需等线程创建,可以立即执行线程任务。
1.4. 线程池核心思想
线程池的核心思想是线程复用、控制最大并发数,通过将任务提交和任务执行解耦,提高系统的稳定性和可维护性。
2. 线程池的核心接口和类
2.1. 核心接口
- Executor :顶层接口,定义了任务提交方法 execute (Runnable command) 。顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 线程任务对象,将任务的运行逻辑提交到执行器 ( Executor ) 中,由 Executor 框架完成线程的调配和任务的执行部分。
- ExecutorService :继承自 Executor ,扩展线程池管理方法。例如: shutdown () 线程池关闭、 submit () 任务提交、 awaitTermination () 等待线程池关闭。
2.2. 线程池实现类
- ThreadPoolExecutor:通过
Worker
工作线程、BlockingQueue
阻塞工作队列以及拒绝策略实现了一个标准的线程池 。 - ScheduledThreadPoolExecutor :是 ThreadPoolExecutor 类的子类,按照时间周期执行线程任务的线程池实现类,通常用于作业调度相关的业务场景。由于该线程池的工作队列使用 DelayedWorkQueue ,这是一个按照任务执行时间进行排序的优先级工作队列,所以这也是 ScheduledThreadPoolExecutor 线程池能按照时间周期来执行线程任务的主要原因。
- ForkJoinPool :是一个基于分治思想的线程池实现类,通过分叉 (fork) 合并 ( join ) 的方式,将一个大任务拆分成多个小任务,并且为每个工作线程提供一个工作队列,减少竞争,实现并行的线程任务执行方式,所以 ForkJoinPool 适合计算密集型场景,是 ThreadPoolExecutor 线程池的一种补充。
2.3. 工具类
【Executors
工具类】提供快速创建线程池的静态方法:
newFixedThreadPool(int nThreads)
:固定大小线程池 。newCachedThreadPool()
:可缓存的线程池,按需创建 。newSingleThreadExecutor()
:单线程线程池 。newScheduledThreadPool(int corePoolSize)
:定时任务线程池 。
// 线程池基本使用方式
// 创建一个ThreadPoolExecutor类型的对象
ExecutorService pool = Executors.newCachedThreadPool();// 执行线程任务
pool.execute(task1);
pool.execute(task2);
pool.execute(task3);
pool.execute(task4);
pool.execute(task5);// 使用结束后,使用shutdown关闭线程池
pool.shutdown();
3. ThreadPoolExecutor 线程池的配置参数
3.1. 核心配置参数
- 【corePoolSize】线程池核心线程数:也可理解为线程池维护的最小线程数量,核心线程创建后不会被回收。大于核心线程数的线程,在空闲时间超过 keepAliveTime 后会被回收;
- 【maximumPoolSize】线程池最大线程数:线程池允许创建的最大线程数量;(包含核心线程池数量)
- 【keepAliveTime】非核心线程线程存活时间:当一个可被回收的线程的空闲时间大于 keepAliveTime,就会被回收。
- 当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会被回收,直到线程池中的线程数不超过 corePoolSize 。
- 如果设置 allowCoreThreadTimeOut = true ,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0 ;
- 【TimeUnit】时间单位:参数 keepAliveTime 的时间单位;
- 【BlockingQueue】阻塞工作队列:用来存储等待执行的任务;
- 【ThreadFactory】线程工厂:用于创建线程,以及自定义线程名称,需要实现 ThreadFactory 接口;
- 【RejectedExecutionHandler】拒绝策略:当线程池线程内的线程耗尽,并且工作队列达到已满时,新提交的任务,将使用拒绝策略进行处理;
3.2. 任务队列
- ArrayBlockingQueue :基于数组实现的有界队列,读写线程不支持并发;
- LinkedBlockingQueue :基于链表实现的无界队列(有界队列),读写线程支持并发。
- SynchronousQueue :不存储任务,直接提交给线程执行。
- PriorityBlockingQueue :优先级队列。
3.3. 拒绝策略
- AbortPolicy :默认策略,丢弃任务并抛出RejectedExecutionException 异常;
- DiscardPolicy :丢弃任务,但是不抛出异常;
- DiscardOldestPolicy :丢弃工作队列中的队头任务(即最旧的任务,也就是最早进入队列的任务)后,继续将当前任务提交给线程池;
- CallerRunsPolicy :由原调用线程处理该任务(谁调用,谁处理 );
3.4. 线程池参数设置策略
3.4.1. 常规策略
1.核心线程数(corePoolSize )
- CPU 密集型任务:线程数设置为 CPU 核心数 + 1(避免上下文切换,充分利用 CPU 计算资源)。
- IO 密集型任务:线程数设置为 2 * CPU 核心数(IO 操作会导致线程阻塞,需要更多线程处理其他任务)。
2.最大线程数(maximumPoolSize )
- maximumPoolSize 保持与 corePoolSize 相同
3.任务队列(workqueue )
- 无界队列(如 LinkedBlockingQueue ):适用于任务量较小且稳定的场景。
- 备注:可能导致 OOM(内存溢出)。
- 有界队列(如 ArrayBlockingQueue ):必须设置合理的队列容量,结合 maximumPoolSize 控制总并发数。
- 备注:可以避免资源耗尽。
4.空闲线程存活时间(keepAliveTime )
- IO 密集型任务:可设置较长的存活时间(如 60 秒 ),避免频繁创建和销毁线程。
- CPU 密集型任务:可设置较短的存活时间,及时释放资源。
- 通过 allowCoreThreadTimeOut (true) 可使核心线程也受 keepAliveTime 控制。
5.拒绝策略(RejectedExecutionHandler )
- AbortPolicy (默认):抛出 RejectedExecutionException ,适合关键任务。
- CallerRunsPolicy :由提交任务的线程执行,可降低提交速度。
- DiscardPolicy :直接丢弃任务,适合允许丢失的非关键任务。
- DiscardOldestPolicy :丢弃队列中最老的任务,尝试重新提交当前任务。
- 自定义策略:实现 RejectedExecutionHandler 接口,如记录日志或持久化任务。
3.4.2. 常见场景
int cpuCores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = new ThreadPoolExecutor(cpuCores, // 核心线程数 = CPU核心数cpuCores + 1, // 最大线程数 = CPU核心数 + 160L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100), // 有界队列new ThreadPoolExecutor.AbortPolicy() // 关键任务,拒绝时抛出异常
);
int cpuCores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = new ThreadPoolExecutor(2 * cpuCores, // 核心线程数 = 2 * CPU核心数4 * cpuCores, // 最大线程数适当增大120L, TimeUnit.SECONDS, // 较长的空闲存活时间new LinkedBlockingQueue<>(500), // 较大的队列容量new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝时由调用线程执行
);
public class Test02 {public static void main(String[] args) {int cpuCores = Runtime.getRuntime().availableProcessors();//查看CPU核心数System.out.println(cpuCores);}
}
3.4.3. 案列
public class Test01 {static AtomicInteger counter=new AtomicInteger(0);//初始值为0public static void main(String[] args) {//自定义配置参数,创建线程池//核心线程数10个,阻塞工作队列5个,最大线程数30个,当一个可回收线程空闲0秒,就进行回收ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(10,30,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(5));for (int i = 0; i < 50; i++) {poolExecutor.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"==> 执行任务"+counter.incrementAndGet());try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});}poolExecutor.shutdown();//关闭线程池}
}
4. 线程池的执行流程
- 提交一个新线程任务,线程池会在线程池中分配一个空闲线程,用于执行线程任务;
- 如果线程池中不存在空闲线程,则线程池会判断当前 “存活的线程数” 是否小于核心线程数 corePoolSize 。
- 如果小于核心线程数 corePoolSize,线程池会创建一个新线程(核心线程)去处理新线程任务;
- 如果大于核心线程数 corePoolSize,线程池会检查工作队列;
- 如果工作队列未满,则将该线程任务放入工作队列进行等待。线程池中如果出现空闲线程,将从工作队列中按照 FIFO 的规则取出 1 个线程任务并分配执行;
- 如果工作队列已满,则判断线程数是否达到最大线程数maximumPoolSize ;
- 如果当前 “存活线程数” 没有达到最大线程数 maximumPoolSize,则创建一个新线程(非核心线程)执行新线程任务;
- 如果当前 “存活线程数” 已经达到最大线程数 maximumPoolSize,直接采用拒绝策略处理新线程任务;
综上所述,执行顺序为:核心线程、工作队列、非核心线程、拒绝策略。
5. 线程池常见方法
- 执行无返回值的线程任务:
void execute(Runnable command);
- 提交有返回值的线程任务:
Future<T> submit(Callable<T> task);
- 关闭线程池:
void shutdown();
或shutdownNow();
- 等待线程池关闭:
boolean awaitTermination(long timeout, TimeUnit unit);
5.1. 线程池的创建
// 1. 固定大小线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);// 2. 单线程线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();// 3. 缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();// 4. 定时任务线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);// 提交任务
fixedThreadPool.execute(() -> System.out.println("Task executed"));// 关闭线程池
fixedThreadPool.shutdown();
// 创建自定义线程池
ExecutorService executor = new ThreadPoolExecutor(5, // 核心线程数10, // 最大线程数60, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(100), // 任务队列Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);// 提交任务
for (int i = 0; i < 20; i++) {final int taskId = i;executor.execute(() -> {System.out.println("执行任务: " + taskId);try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});
}// 关闭线程池
executor.shutdown();
try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}
} catch (InterruptedException e) {executor.shutdownNow();
}
5.2. 执行线程任务
execute()
只能提交 Runnable
类型任务,无返回值;submit()
既能提交 Runnable
任务,也能提交 Callable
任务,可返回 Future
类型结果获取执行结果 。
execute()
提交任务异常直接抛出,submit()
捕获异常,调用 Future
的 get()
取返回值时才抛异常。
// 创建一个线程池:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, // 核心线程数100, // 最大线程数60L, // 空闲线程存活时间TimeUnit.SECONDS, // 时间单位new LinkedBlockingQueue<Runnable>() // 工作队列(阻塞队列));// 创建集合,用于保存Future执行结果
List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();// 每10w个数字,封装成一个Callable线程任务,并提交给线程池
for (int i = 0; i <= 900000; i += 100000) {Future<Integer> result = poolExecutor.submit(new CalcTask(i+1, i + 100000));futureList.add(result);
}// 处理线程任务执行结果
try {int result = 0;for (Future<Integer> f : futureList) {result += f.get();}System.out.println("最终计算结果" + result);
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}// 关闭线程池
// 省略.....
最终计算结果:1784293664
5.3. 关闭线程池
线程池在程序结束的时候要关闭。使用 shutdown()
方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()
会立刻停止正在执行的任务;
当使用 awaitTermination()
方法时,主线程会处于一种等待的状态,按照指定的 timeout
检查线程池。
第一个参数指定的是时间,第二个参数指定的是时间单位 (当前是秒)。返回值类型为 boolean
型。
- 如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,
awaitTermination()
返回true
。 - 如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,
awaitTermination()
返回false
。 - 如果等待时间没有超过指定时间,则继续等待。
该方法经常与 shutdown()
方法配合使用,用于检测线程池中的任务是否已经执行完毕:
// 线程池已提交或执行若干个任务// 关闭线程池:必须等待任务执行结束后,线程池才会关闭
executorService.shutdown();// 每隔1秒钟,检查一次线程池的任务执行状态
while(!executorService.awaitTermination(1, TimeUnit.SECONDS)) {System.out.println("还没有关闭!");
}System.out.println("已关闭!");
// 线程池已提交或执行若干个任务// 关闭线程池:必须等待任务执行结束后,线程池才会关闭
executorService.shutdown(); // 拒绝新任务,执行队列中剩余任务try {// 等待 60 秒,让线程池有足够时间执行完任务if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// 超时后强制关闭(中断正在执行的任务)executorService.shutdownNow();}
} catch (InterruptedException e) {// 若当前线程被中断,也强制关闭executorService.shutdownNow();
}
6. 线程池分类
Java 标准库提供的几种常用线程池,创建这些线程池的方法都被封装到 Executors 工具类中。
- FixedThreadPool:线程数固定的线程池,使用 Executors.newFixedThreadPool () 创建;
- CachedThreadPool:线程数根据任务动态调整的线程池,使用 Executors.newCachedThreadPool () 创建;
- SingleThreadExecutor:仅提供一个单线程的线程池,使用 Executors.newSingleThreadExecutor () 创建;
- ScheduledThreadPool:能实现定时、周期性任务的线程池,使用 Executors.newScheduledThreadPool () 创建;
6.1. FixedThreadPool 线程池
线程数固定的线程池
以 FixedThreadPool
线程池为例,线程池的执行步骤如下:
public class Main {public static void main(String[] args) {// 创建一个固定大小的线程池:ExecutorService executorService = Executors.newFixedThreadPool(4);for (int i = 0; i < 6; i++) {executorService.execute(new Task("线程"+i));}// 关闭线程池:executorService.shutdown();}
}class Task implements Runnable {private String taskName;public Task(String taskName) {this.taskName = taskName;}@Overridepublic void run() {System.out.println("启动线程 ===> " + this.taskName);try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println("结束线程 <= " + this.taskName);}
}
执行结果:
启动线程 ===> 线程2
启动线程 ===> 线程3
启动线程 ===> 线程0
启动线程 ===> 线程1
结束线程 <= 线程2
结束线程 <= 线程3
结束线程 <= 线程1
结束线程 <= 线程0
启动线程 ===> 线程5
启动线程 ===> 线程4
结束线程 <= 线程4
结束线程 <= 线程5
执行分析:
一次性放入 6 个任务,由于线程池只有固定 4 个线程,因此前4个任务会同时执行,等到有线程空闲后,才会执行后面 2 个任务。
6.2. CachedThreadPool 线程池
线程数根据任务动态调整的线程池
- 把线程池改为
CachedThreadPool
,观察运行结果
ExecutorService executorService = Executors.newCachedThreadPool();
执行结果:
启动线程 => 线程1
启动线程 => 线程5
启动线程 => 线程2
启动线程 => 线程4
启动线程 => 线程0
启动线程 => 线程3
结束线程 <= 线程4
结束线程 <= 线程1
结束线程 <= 线程5
结束线程 <= 线程0
结束线程 <= 线程3
结束线程 <= 线程2
执行分析:
由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6 个任务可一次性全部同时执行。
6.3. ScheduledThreadPool 线程池
能实现定时、周期性任务的线程池
- 例如:每秒刷新证券价格等固定、需反复执行的任务,可用
ScheduledThreadPool
。 - 放入
ScheduledThreadPool
的任务可定期反复执行。
创建 ScheduledThreadPool 定时任务线程池
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
延迟 3 秒钟后执行,任务只执行 1 次
executorService.schedule(new Task("线程A"), 3, TimeUnit.SECONDS);
延迟 2 秒钟后,每隔 3 秒钟执行任务 1 次
// 方式1
executorService.scheduleAtFixedRate(new Task("线程A"), 2,3, TimeUnit.SECONDS);// 方式2
executorService.scheduleWithFixedDelay(new Task("线程A"), 2,3, TimeUnit.SECONDS);
FixedRate 和 FixedDelay 区别:
FixedRate
:任务以固定时间间隔触发,不管任务执行时间多长。FixedDelay
:上一次任务执行完毕后,等固定的时间间隔,再执行下一次任务。
6.4. 分类总结
FixedThreadPool
线程数固定的线程池
- 线程池参数:
- 核心线程数和最大线程数一致
- 非核心线程线程空闲存活时间,即 keepAliveTime 为 0
- 阻塞队列为无界队列 LinkedBlockingQueue
- 工作机制:
a. 提交线程任务
b. 如果线程数少于核心线程,创建核心线程执行任务
c. 如果线程数等于核心线程,把任务添加到 LinkedBlockingQueue 阻塞队列
d. 如果线程执行完任务,去阻塞队列取任务,继续执行 - 使用场景:适用于处理 CPU 密集型的任务,确保 CPU 在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
底层代码:
CachedThreadPool
可缓存线程池,线程数根据任务动态调整的线程池
- 线程池参数:
- 核心线程数为 0
- 最大线程数为 Integer.MAX_VALUE
- 工作队列是 SynchronousQueue 同步队列
- 非核心线程空闲存活时间为 60 秒
- 工作机制:
a. 提交线程任务
b. 因为核心线程数为 0,所以任务直接加到 SynchronousQueue 工作队列
c. 判断是否有空闲线程,如果有,就去取出任务执行
d. 如果没有空闲线程,就新建一个线程执行
e. 执行完任务的线程,还可以存活 60 秒,如果在这期间,接到任务,可以继续存活下去;否则,被销毁。 - 使用场景:用于并发执行大量短期的小任务。
底层代码:
SingleThreadExecutor
单线程化的线程池
- 线程池参数:
- 核心线程数为 1
- 最大线程数也为 1
- 使用场景:适用于串行执行任务的场景,将任务按顺序执行。、
底层代码:
ScheduledThreadPool
能实现定时、周期性任务的线程池
- 线程池参数:
- 最大线程数为 Integer.MAX_VALUE
- 阻塞队列是 DelayedWorkQueue
- keepAliveTime 为 0
- 使用场景:周期性执行任务,并且需要限制线程数量的需求场景。
底层代码:
7. 线程池的状态
线程池的状态分为:RUNNING 、SHUTDOWN 、STOP 、TIDYING 、TERMINATED
- RUNNING :运行状态,线程池一旦被创建,就处于 RUNNING 状态,并且线程池中的任务数为 0 。该状态的线程池会接收新任务,并处理工作队列中的任务。
- 调用线程池的 shutdown () 方法,可以切换到 SHUTDOWN 关闭状态;
- 调用线程池的 shutdownNow () 方法,可以切换到 STOP 停止状态;
- SHUTDOWN :关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;
- 当工作队列为空时,并且线程池中执行的任务也为空时,线程池进入 TIDYING 状态;
- STOP :停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- 线程池中执行的任务为空,进入 TIDYING 状态;
- TIDYING :整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为 0 ;
- terminated () 执行完毕,进入 TERMINATED 状态;
- TERMINATED :终止状态,该状态表示线程池彻底关闭。
8.线程池自定义前缀
public class MyThreadFactory implements ThreadFactory {private String prefix;//自定义一个前缀private AtomicInteger threadNumber=new AtomicInteger(1);//从1开始public MyThreadFactory(String prefix){this.prefix=prefix;}@Overridepublic Thread newThread(Runnable r) {Thread t=new Thread(r,prefix+"-"+threadNumber.getAndIncrement());//前缀+序号自增return t;}
}
public class Test01 {private static AtomicInteger count=new AtomicInteger(0);public static void main(String[] args) {//自定义配置参数,创建线程池//核心线程数10个,阻塞工作队列不限制个数,最大线程数20个,当一个可回收线程空闲20秒,就进行回收ThreadPoolExecutor pool=new ThreadPoolExecutor(10,20,20, TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new MyThreadFactory("自定义线程"));for (int i = 0; i < 30; i++) {pool.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"==> 执行任务"+count.incrementAndGet());try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});}pool.shutdown();//关闭线程池}
}