线程池 ThreadPoolExecutor ForkJoinPool
线程池
线程池是 Java 并发编程中非常重要的组件,它通过重用线程来减少线程创建和销毁的开销,提高系统性能和资源利用率。
一、线程池核心参数
Java 线程池的核心参数在 ThreadPoolExecutor 中定义:
(1) corePoolSize:核心线程数,线程池长期保持的线程数量
(2) maximumPoolSize:最大线程数,线程池允许的最大线程数量
(3) keepAliveTime:非核心线程空闲超时时间
(4) unit:超时时间单位
(5) workQueue:任务等待队列
(6) threadFactory:线程创建工厂
(7) handler:任务拒绝策略
二、常用线程池及区别对比
Java 通过 Executors 提供了四种常用线程池:
线程池类型:
FixedThreadPool
固定线程数量,核心线程数 = 最大线程数
负载较重的服务器,为了资源管理
核心线程数 = 最大线程数,队列容量无限
CachedThreadPool
可缓存线程,核心线程数 = 0,最大线程数很大
执行大量短期异步任务
核心线程 = 0,最大线程数无限,同步队列
SingleThreadExecutor
单线程,所有任务顺序执行
需要保证任务顺序执行的场景
核心线程 = 1,最大线程 = 1,无界队列
ScheduledThreadPool
支持定时及周期性任务执行
需要定时任务或周期性任务的场景
核心线程固定,最大线程无限
ForkJoinPool
工作窃取算法,并行处理
大型可分解任务(如计算)
并行级别通常为 CPU 核心数
三、使用示例代码
public class ThreadPoolExamples {public static void main(String[] args) {// 1. 固定大小线程池示例fixedThreadPoolExample();// 2. 缓存线程池示例cachedThreadPoolExample();// 3. 单线程线程池示例singleThreadExecutorExample();// 4. 定时任务线程池示例scheduledThreadPoolExample();// 5. ForkJoinPool 示例testForkJoinPool();}/*** 固定大小线程池示例* 核心线程数和最大线程数相等*/private static void fixedThreadPoolExample() {System.out.println("=== FixedThreadPool 示例 ===");// 创建固定大小为3的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);for (int i = 0; i < 5; i++) {final int taskNum = i;fixedThreadPool.execute(() -> {System.out.println("FixedThreadPool任务 " + taskNum + " 由线程 " + Thread.currentThread().getName() + " 执行");try {// 模拟任务执行时间Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}// 关闭线程池fixedThreadPool.shutdown();}/*** 缓存线程池示例* 线程数量根据任务动态调整*/private static void cachedThreadPoolExample() {System.out.println("\n=== CachedThreadPool 示例 ===");// 创建缓存线程池ExecutorService cachedThreadPool = Executors.newCachedThreadPool();for (int i = 0; i < 5; i++) {final int taskNum = i;// 延迟提交任务,观察线程复用情况try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}cachedThreadPool.execute(() -> {System.out.println("CachedThreadPool任务 " + taskNum + " 由线程 " + Thread.currentThread().getName() + " 执行");try {// 模拟任务执行时间Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}// 关闭线程池cachedThreadPool.shutdown();} /*** 单线程线程池示例* 所有任务由一个线程按顺序执行*/private static void singleThreadExecutorExample() {System.out.println("\n=== SingleThreadExecutor 示例 ===");// 创建单线程线程池ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();for (int i = 0; i < 5; i++) {final int taskNum = i;singleThreadExecutor.execute(() -> {System.out.println("SingleThreadExecutor任务 " + taskNum + " 由线程 " + Thread.currentThread().getName() + " 执行");try {// 模拟任务执行时间Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}});}// 关闭线程池singleThreadExecutor.shutdown();} /*** 定时任务线程池示例* 支持定时执行和周期性执行任务*/private static void scheduledThreadPoolExample() {System.out.println("\n=== ScheduledThreadPool 示例 ===");// 创建定时任务线程池,核心线程数为2ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);// 1. 延迟执行任务:延迟1秒后执行scheduledThreadPool.schedule(() -> {System.out.println("延迟任务执行: " + Thread.currentThread().getName());}, 1, TimeUnit.SECONDS);// 2. 周期性执行任务:延迟2秒后开始,每3秒执行一次scheduledThreadPool.scheduleAtFixedRate(() -> {System.out.println("周期性任务执行: " + Thread.currentThread().getName() + ",时间: " + System.currentTimeMillis());}, 2, 3, TimeUnit.SECONDS);// 运行一段时间后关闭try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}scheduledThreadPool.shutdown();} // 5. ForkJoinPool 示例(并行计算)private static void testForkJoinPool() throws ExecutionException, InterruptedException {System.out.println("\n=== ForkJoinPool 示例 ===");// 创建ForkJoinPool,默认并行度为CPU核心数ForkJoinPool forkJoinPool = new ForkJoinPool();// 计算1到10_000_000的和long start = System.currentTimeMillis();ForkJoinTask<Long> task = new SumTask(1, 10_000_000);Long result = forkJoinPool.submit(task).get();long end = System.currentTimeMillis();System.out.println("计算结果: " + result);System.out.println("耗时: " + (end - start) + "ms");forkJoinPool.shutdown();}// ForkJoin任务:计算指定范围内的整数和static class SumTask extends RecursiveTask<Long> {// 任务拆分阈值private static final long THRESHOLD = 10000;private long start;private long end;public SumTask(long start, long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {// 如果任务足够小,直接计算if (end - start <= THRESHOLD) {long sum = 0;for (long i = start; i <= end; i++) {sum += i;}return sum;} else {// 任务太大,拆分long mid = (start + end) / 2;SumTask leftTask = new SumTask(start, mid);SumTask rightTask = new SumTask(mid + 1, end);// 并行执行子任务leftTask.fork();rightTask.fork();// 合并结果return leftTask.join() + rightTask.join();}}}
}
四、线程池使用注意事项
1.避免使用 Executors 工具类创建线程池:
在生产环境中,建议直接使用 ThreadPoolExecutor 构造函数,以便明确线程池参数,避免资源耗尽风险。
2.合理设置线程池参数:
(1) CPU 密集型任务:线程数 = CPU 核心数 + 1
(2) IO 密集型任务:线程数 = CPU 核心数 * 2
3.正确关闭线程池:
shutdown():平缓关闭,等待已提交任务完成
shutdownNow():立即关闭,尝试中断正在执行的任务
4.处理异常:
在线程池中执行的任务异常不会直接抛出到主线程,需要在任务内部捕获或通过 Future 获取。
5.ForkJoinPool 与普通线程池的本质区别:
工作方式:
普通线程池是 "生产者 - 消费者" 模式,而 ForkJoinPool 采用 "工作窃取"(Work-Stealing)算法,空闲线程会主动获取其他线程的任务执行
任务类型:
ForkJoinPool 适合可递归分解的任务(RecursiveTask/RecursiveAction),普通线程池适合独立的原子任务
并行效率:
对于计算密集型的大型任务,ForkJoinPool 能更好地利用多核 CPU
计算密集型任务优先考虑 ForkJoinPool,其并行处理能力优于普通线程池
6.线程管理策略差异:
FixedThreadPool 保持固定线程数,避免频繁创建线程的开销
CachedThreadPool 线程数弹性变化,适合短任务但可能创建大量线程
ScheduledThreadPool 有专门的调度机制,支持延迟和周期性执行
通过合理使用线程池,可以有效提升 Java 应用的并发处理能力和资源利用率。
ThreadPoolExecutor 创建线程池
在 Java 中,ThreadPoolExecutor 是创建和管理线程池的核心类,位于 java.util.concurrent 包下。它提供了高度可配置的线程池实现,
允许你根据需求定制线程池的各种参数。
ThreadPoolExecutor 的示例代码:
public class ThreadPoolExample {public static void main(String[] args) {// 线程池核心参数设置int corePoolSize = 3; // 核心线程数int maximumPoolSize = 5; // 最大线程数long keepAliveTime = 60; // 非核心线程空闲超时时间TimeUnit unit = TimeUnit.SECONDS; // 超时时间单位// 任务队列,用于存储等待执行的任务ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);// 提交任务到线程池for (int i = 0; i < 8; i++) {final int taskId = i;executor.submit(() -> {try {System.out.println("任务 " + taskId + " 开始执行,线程名:" + Thread.currentThread().getName());// 模拟任务执行时间Thread.sleep(1000);System.out.println("任务 " + taskId + " 执行完毕");} catch (InterruptedException e) {Thread.currentThread().interrupt();e.printStackTrace();}});}// 关闭线程池executor.shutdown();try {// 等待所有任务完成或超时if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {// 超时后强制关闭executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();}System.out.println("所有任务处理完毕,线程池已关闭");}
}
线程池核心参数说明:
(1) corePoolSize:核心线程数,线程池始终保持的线程数量
(2) maximumPoolSize:最大线程数,线程池允许创建的最大线程数量
(3) keepAliveTime:当线程数超过核心线程数时,多余的空闲线程的存活时间
(4) unit:keepAliveTime 的时间单位
(5) workQueue:任务队列,用于存储等待执行的任务
线程池工作流程:
1.当提交任务时,若当前线程数小于核心线程数,会创建新线程执行任务
2.若当前线程数等于核心线程数,新任务会被加入任务队列等待
3.若任务队列已满,且当前线程数小于最大线程数,会创建新线程执行任务
4.若任务队列已满且线程数已达最大值,会触发拒绝策略(默认抛出异常)
常用任务队列:
1.ArrayBlockingQueue:基于数组的有界阻塞队列
2.LinkedBlockingQueue:基于链表的阻塞队列,可设置边界
3.SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待对应的删除操作
在实际开发中,也可以使用 Executors 工具类快速创建线程池,但推荐手动创建 ThreadPoolExecutor 以明确配置参数,避免资源耗尽风险。
异步线程执行实现
在 Java 中实现异步线程执行,最常用的方式是结合线程池(ThreadPoolExecutor)和 CompletableFuture(Java 8+ 引入),
它们提供了强大的异步编程能力,支持链式调用、结果组合等高级特性。
典型的 Java 异步线程执行实现方式:
public class JavaAsyncDemo {// 定义线程池(推荐复用,避免频繁创建)private static final ExecutorService executor = new ThreadPoolExecutor(4, // 核心线程数8, // 最大线程数60, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(100), // 任务队列new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);public static void main(String[] args) throws Exception {System.out.println("主线程开始: " + Thread.currentThread().getName());// 方式1: 基础异步执行(无返回值)executor.execute(() -> {try {Thread.sleep(1000);System.out.println("异步任务1完成(无返回值)");} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 方式2: 带返回值的异步执行(Future)Future<String> future = executor.submit(() -> {Thread.sleep(1500);return "异步任务2的结果";});// 方式3: 高级异步编程(CompletableFuture)CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);return "原始数据";} catch (InterruptedException e) {throw new RuntimeException(e);}}, executor)// 异步处理结果(链式调用).thenApplyAsync(data -> {System.out.println("处理中: " + data);return data + " -> 处理后的数据";}, executor)// 异步消费结果.thenAcceptAsync(result -> {System.out.println("最终结果: " + result);}, executor);// 主线程继续执行其他操作(非阻塞)System.out.println("主线程继续执行...");Thread.sleep(500);// 获取方式2的结果(会阻塞直到结果返回)System.out.println(future.get());// 等待方式3完成cf.get();// 关闭线程池executor.shutdown();executor.awaitTermination(1, TimeUnit.MINUTES);System.out.println("主线程结束");}
}
核心实现说明
1.线程池(ExecutorService):
(1)是异步执行的基础,负责管理线程资源
(2)推荐手动创建 ThreadPoolExecutor 而非使用 Executors 工具类,便于控制资源
(3)心参数(核心线程数、最大线程数、队列等)需根据业务场景配置
2.CompletableFuture 核心优势:
(1)链式调用:通过 thenApply、thenAccept 等方法实现结果的流水线处理
(2)异步组合:支持多个异步任务的并行 / 串行组合(thenCombine、allOf 等)
(3)异常处理:提供 exceptionally、handle 等方法处理异步任务中的异常
(4)非阻塞获取结果:可通过 whenComplete 在结果就绪时自动处理,避免阻塞
3.常见使用场景:
(1)I/O 密集型操作(数据库查询、网络请求、文件读写)
(2)需要并行处理的任务(如批量数据处理)
(3)需避免主线程阻塞的场景(如 GUI 程序、服务器请求处理)
使用时需注意:线程池需要正确关闭(shutdown()),避免资源泄漏;
对于长时间运行的异步任务,要做好超时控制(get(long timeout, TimeUnit unit))。
public class JavaAsyncDemo {// 定义线程池(推荐复用,避免频繁创建)private static final ExecutorService executor = new ThreadPoolExecutor(4, // 核心线程数8, // 最大线程数60, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(100), // 任务队列new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);public static void main(String[] args) throws Exception {System.out.println("主线程开始: " + Thread.currentThread().getName());// 方式1: 基础异步执行(无返回值)executor.execute(() -> {try {Thread.sleep(1000);System.out.println("异步任务1完成(无返回值)");} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 方式2: 带返回值的异步执行(Future)Future<String> future = executor.submit(() -> {Thread.sleep(1500);return "异步任务2的结果";});// 方式3: 高级异步编程(CompletableFuture)CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);return "原始数据";} catch (InterruptedException e) {throw new RuntimeException(e);}}, executor)// 异步处理结果(链式调用).thenApplyAsync(data -> {System.out.println("处理中: " + data);return data + " -> 处理后的数据";}, executor)// 异步消费结果.thenAcceptAsync(result -> {System.out.println("最终结果: " + result);}, executor);// 主线程继续执行其他操作(非阻塞)System.out.println("主线程继续执行...");Thread.sleep(500);// 获取方式2的结果(会阻塞直到结果返回)System.out.println(future.get());// 等待方式3完成cf.get();// 关闭线程池executor.shutdown();executor.awaitTermination(1, TimeUnit.MINUTES);System.out.println("主线程结束");//默认使用forkJoinThreadPoolCompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);return "原始数据,默认使用forkJoinThreadPool";} catch (InterruptedException e) {throw new RuntimeException(e);}});// 方式1: get() - 阻塞等待结果(无超时)try {String result1 = futureTask.get();System.out.println("方式1结果: " + result1);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 方式2: get(long timeout, TimeUnit unit) - 带超时的阻塞等待try {// 超时设置为3秒String result2 = futureTask.get(3, TimeUnit.SECONDS);System.out.println("方式2结果: " + result2);} catch (InterruptedException | ExecutionException | TimeoutException e) {e.printStackTrace();}// 方式3: join() - 阻塞等待(无超时,只抛出未检查异常)try {String result3 = futureTask.join();System.out.println("方式3结果: " + result3);} catch (RuntimeException e) { // supplyAsync中抛出的RuntimeException会被捕获e.printStackTrace();}// 方式4: 非阻塞回调处理(推荐)futureTask.whenComplete((result, ex) -> {if (ex == null) {System.out.println("方式4结果: " + result);} else {System.out.println("方式4出错: " + ex.getMessage());}});// 等待回调执行完成(实际业务中无需此步骤,这里仅为演示)try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}
}