ThreadPoolTaskExecutor 的使用案例
1. 依赖说明
<dependency><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId><version>1.3.1</version>
</dependency>
<dependency><groupId>org.springframework</groupId><artifactId>spring-aspects</artifactId><version>5.3.22</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.36</version>
</dependency>
2. 完整配置代码
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.core.task.TaskDecorator;import java.util.concurrent.*;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.MDC;@Configuration
public class TaskExecutorConfig {@Bean(name = "taskExecutor")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setKeepAliveSeconds(60);BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(25);executor.setQueue(queue);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setThreadNamePrefix("TaskExecutor-");executor.setTaskDecorator(new MdcTaskDecorator());executor.initialize();return executor;}public static RetryTemplate createRetryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3); retryTemplate.setRetryPolicy(retryPolicy);FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();backOffPolicy.setBackOffPeriod(1000); retryTemplate.setBackOffPolicy(backOffPolicy);return retryTemplate;}public static class RetryCallableWrapper implements Runnable {private final Runnable targetTask;private final RetryTemplate retryTemplate;public RetryCallableWrapper(Runnable targetTask, RetryTemplate retryTemplate) {this.targetTask = targetTask;this.retryTemplate = retryTemplate;}@Overridepublic void run() {retryTemplate.execute(context -> {targetTask.run(); return null; });}}public static class MdcTaskDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {Map<String, String> contextMap = MDC.getCopyOfContextMap();return () -> {try {MDC.setContextMap(contextMap);runnable.run();} finally {MDC.clear();}};}}public void useTaskExecutor() {ThreadPoolTaskExecutor executor = taskExecutor();Runnable retryableTask = new RetryCallableWrapper(() -> {System.out.println(Thread.currentThread().getName() + " 执行任务");throw new RuntimeException("模拟任务失败"); }, createRetryTemplate());executor.execute(retryableTask); MDC.put("traceId", "123456"); executor.execute(() -> {System.out.println("当前 traceId: " + MDC.get("traceId")); System.out.println(Thread.currentThread().getName() + " 执行任务");});}
}
3. 配置项详解
3.1 线程池核心配置
配置项 | 说明 | 推荐值/示例 |
---|
corePoolSize | 核心线程数 | CPU密集型:Runtime.getRuntime().availableProcessors() ;IO密集型:2 * CPU核心数 |
maxPoolSize | 最大线程数 | 根据业务负载调整(如 2 * corePoolSize ) |
keepAliveSeconds | 空闲线程存活时间 | 与任务平均执行时间匹配(如 60 秒) |
queue | 任务队列 | ArrayBlockingQueue(25) (有界队列)或 LinkedBlockingQueue (无界队列) |
rejectedExecutionHandler | 拒绝策略 | CallerRunsPolicy (调用线程执行)、AbortPolicy (抛异常)、DiscardPolicy (丢弃任务) |
threadNamePrefix | 线程名称前缀 | 便于日志追踪(如 "TaskExecutor-" ) |
3.2 上下文传递机制
- MDC(Mapped Diagnostic Context):SLF4J 提供的日志上下文工具,用于记录日志追踪 ID。
- TaskDecorator:Spring 提供的装饰器接口,用于在任务执行前后传递上下文。
- 实现方式:
- 在主线程中设置 MDC 上下文(如
MDC.put("traceId", "123456")
)。 - 通过
TaskDecorator
将上下文传递给异步任务。 - 任务执行时恢复上下文,确保日志可追踪。
3.3 重试机制
- RetryTemplate:Spring Retry 提供的重试模板,封装了重试逻辑。
- SimpleRetryPolicy:定义最大重试次数(如 3 次)。
- FixedBackOffPolicy:定义重试间隔(如 1 秒)。
- 使用方式:
- 将任务包装为
RetryCallableWrapper
。 - 通过
RetryTemplate
执行任务,自动处理重试逻辑。
4. 使用场景建议
- 日志追踪:通过 MDC 传递
traceId
,确保异步任务日志与主线程关联。 - 任务重试:适用于网络请求、数据库操作等需要自动重试的场景。
- 资源控制:通过队列和线程数限制,防止系统过载。
5. 注意事项
- 重试逻辑与异常处理:
- 默认对所有异常进行重试,可通过
SimpleRetryPolicy
自定义异常类型。 - 重试后仍失败的任务需通过日志或监控告警处理。
- 上下文传递的开销:
- 如果上下文较大(如包含复杂对象),需评估性能影响。
- 使用
InheritableThreadLocal
替代 MDC
可以更灵活地传递上下文。
- 线程池生命周期:
- 确保在应用关闭时正确关闭线程池(调用
executor.shutdown()
)。 - Spring 容器会自动关闭线程池,但手动创建时需显式调用。
- 队列容量与拒绝策略:
- 有界队列需合理设置容量,防止任务堆积。
- 拒绝策略应根据业务需求选择(如
CallerRunsPolicy
适合轻量任务)。
6. 扩展功能(可选)
- 动态调整线程池参数:通过
ThreadPoolTaskExecutor
提供的方法(如 setCorePoolSize()
)动态修改配置。 - 监控与调优:通过
ThreadPoolTaskExecutor
的统计方法(如 getPoolSize()
、getQueueSize()
)监控线程池状态。 - 自定义重试策略:结合
Spring Retry
实现更复杂的重试逻辑(如指数退避、重试条件判断)。
7. 代码结构图
TaskExecutorConfig
├── taskExecutor() // 配置线程池
│ ├── corePoolSize // 核心线程数
│ ├── maxPoolSize // 最大线程数
│ ├── keepAliveSeconds // 空闲线程存活时间
│ ├── queue // 任务队列
│ ├── rejectedHandler // 拒绝策略
│ └── setTaskDecorator // 上下文传递机制
├── createRetryTemplate() // 创建重试模板
├── RetryCallableWrapper // 重试任务包装器
└── useTaskExecutor() // 使用示例├── execute() // 提交普通任务└── submitListenable()// 提交带返回值任务
8. 总结
- 上下文传递:确保异步任务中能访问到主线程的上下文(如日志追踪 ID)。
- 任务重试:对失败任务自动重试,提升系统鲁棒性。
- 灵活的线程池管理:根据业务需求调整线程池参数,避免资源浪费或过载。