Spring ThreadPoolTaskExecutor 与 CompletableFuture.supplyAsync 常用实践
Spring ThreadPoolTaskExecutor 与 CompletableFuture.supplyAsync 常用实践
1. 基础配置
线程池配置
@Configuration
@EnableAsync
public class AsyncConfig {@Bean("taskExecutor")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100);executor.setThreadNamePrefix("Async-");executor.initialize();return executor;}
}
2. CompletableFuture.supplyAsync 核心用法
2.1 基础异步执行
@Service
public class AsyncService {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*** 最基本的异步执行*/public CompletableFuture<String> basicAsync(String task) {return CompletableFuture.supplyAsync(() -> {// 模拟业务处理try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}return "处理完成: " + task;}, taskExecutor);}
}
3. CompletableFuture 回调方法详解
3.1 thenApply - 结果转换
/*** thenApply: 任务完成后对结果进行转换* 同步执行,使用上一个任务的同一线程*/
public CompletableFuture<String> thenApplyExample(String input) {return CompletableFuture.supplyAsync(() -> {System.out.println("supplyAsync 线程: " + Thread.currentThread().getName());return "Hello " + input;}, taskExecutor).thenApply(result -> {System.out.println("thenApply 线程: " + Thread.currentThread().getName());return result.toUpperCase() + "!";});
}// 使用 thenApplyAsync 在指定线程池中异步执行
public CompletableFuture<String> thenApplyAsyncExample(String input) {return CompletableFuture.supplyAsync(() -> {return "Hello " + input;}, taskExecutor).thenApplyAsync(result -> {// 这个转换会在另一个线程中执行return result.toUpperCase() + "!";}, taskExecutor);
}
3.2 thenAccept - 消费结果
/*** thenAccept: 消费结果,不返回新值*/
public CompletableFuture<Void> thenAcceptExample(String input) {return CompletableFuture.supplyAsync(() -> {return "处理结果: " + input;}, taskExecutor).thenAccept(result -> {// 消费结果,比如保存到数据库、发送消息等System.out.println("消费结果: " + result);// 这里没有返回值});
}
3.3 thenRun - 完成后执行
/*** thenRun: 任务完成后执行操作,不关心结果*/
public CompletableFuture<Void> thenRunExample(String input) {return CompletableFuture.supplyAsync(() -> {return "处理: " + input;}, taskExecutor).thenRun(() -> {// 无论任务成功完成,都会执行这里System.out.println("任务执行完毕,进行清理操作");});
}
3.4 exceptionally - 异常处理
/*** exceptionally: 异常处理回调*/
public CompletableFuture<String> exceptionallyExample(String input) {return CompletableFuture.supplyAsync(() -> {if ("error".equals(input)) {throw new RuntimeException("模拟异常");}return "成功: " + input;}, taskExecutor).exceptionally(throwable -> {// 当任务出现异常时执行System.out.println("捕获异常: " + throwable.getMessage());return "默认返回值";});
}
3.5 whenComplete - 完成时回调
/*** whenComplete: 无论成功失败都会执行*/
public CompletableFuture<String> whenCompleteExample(String input) {return CompletableFuture.supplyAsync(() -> {if ("error".equals(input)) {throw new RuntimeException("模拟异常");}return "成功: " + input;}, taskExecutor).whenComplete((result, throwable) -> {if (throwable != null) {System.out.println("任务失败: " + throwable.getMessage());} else {System.out.println("任务成功: " + result);}});
}
4. 实际应用示例
4.1 完整的业务处理链
@Service
@Slf4j
public class UserService {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;public CompletableFuture<String> processUser(String userId) {return CompletableFuture.supplyAsync(() -> {log.info("开始查询用户信息: {}", userId);// 模拟数据库查询return "用户:" + userId;}, taskExecutor).thenApplyAsync(userInfo -> {log.info("处理用户数据: {}", userInfo);// 数据处理return userInfo + "-已处理";}, taskExecutor).thenApplyAsync(processedData -> {log.info("保存处理结果");// 保存结果return processedData + "-已保存";}, taskExecutor).exceptionally(throwable -> {log.error("处理用户失败: {}", userId, throwable);return "用户处理失败: " + throwable.getMessage();}).whenComplete((result, throwable) -> {if (throwable == null) {log.info("用户处理完成: {}", result);}});}
}
4.2 多个任务组合
/*** 组合多个异步任务*/
public CompletableFuture<String> combineTasks() {CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {return "任务1结果";}, taskExecutor);CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {return "任务2结果";}, taskExecutor);// 等待两个任务都完成后处理return task1.thenCombineAsync(task2, (result1, result2) -> {return result1 + " + " + result2;}, taskExecutor);
}
5. 控制器使用
@RestController
public class AsyncController {@Autowiredprivate UserService userService;@GetMapping("/user/{id}")public CompletableFuture<ResponseEntity<String>> getUser(@PathVariable String id) {return userService.processUser(id).thenApply(ResponseEntity::ok).exceptionally(throwable -> ResponseEntity.status(500).body("处理失败"));}@GetMapping("/user/callback/{id}")public String getUserWithCallback(@PathVariable String id) {userService.processUser(id).thenAccept(result -> {// 异步回调,比如发送消息通知System.out.println("异步通知: " + result);}).exceptionally(throwable -> {System.out.println("异步处理失败: " + throwable.getMessage());return null;});return "请求已接收,处理中...";}
}
6. 常用回调方法总结
| 方法 | 描述 | 是否接收结果 | 是否返回结果 |
|---|---|---|---|
thenApply() | 结果转换 | ✅ | ✅ |
thenAccept() | 结果消费 | ✅ | ❌ |
thenRun() | 后续操作 | ❌ | ❌ |
exceptionally() | 异常处理 | ❌ | ✅ |
whenComplete() | 完成回调 | ✅ | ❌ |
7. 注意事项
- 线程池选择: 始终指定自定义线程池,避免使用默认的 ForkJoinPool
- 异常处理: 每个异步链都要有异常处理
- 资源清理: 及时关闭不再需要的 CompletableFuture
- 避免阻塞: 回调方法中不要执行长时间阻塞操作
这样配置和使用可以满足大部分异步处理场景,代码简洁且易于维护。
