当前位置: 首页 > news >正文

ThreadPoolTaskExecutor 的使用案例

ThreadPoolTaskExecutor 的使用案例

1. 依赖说明

<!-- Spring Retry(用于任务重试) -->
<dependency><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId><version>1.3.1</version>
</dependency>
<dependency><groupId>org.springframework</groupId><artifactId>spring-aspects</artifactId><version>5.3.22</version>
</dependency>
<!-- SLF4J MDC(用于上下文传递) -->
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.36</version>
</dependency>

2. 完整配置代码

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.core.task.TaskDecorator;import java.util.concurrent.*;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.MDC;@Configuration
public class TaskExecutorConfig {/*** 配置线程池 Bean,名称为 "taskExecutor"* 包含上下文传递机制和重试机制*/@Bean(name = "taskExecutor")public ThreadPoolTaskExecutor taskExecutor() {// 创建线程池实例ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();/*** 核心线程数:线程池中始终保留的线程数* - 建议根据任务类型调整:*   - CPU密集型任务:设为 CPU 核心数(如 Runtime.getRuntime().availableProcessors())*   - IO密集型任务:设为 2 * CPU 核心数 或更高*/executor.setCorePoolSize(5);/*** 最大线程数:线程池中允许的最大线程数* - 当队列满时,会创建新线程,直到达到 maxPoolSize* - 超出后触发拒绝策略*/executor.setMaxPoolSize(10);/*** 空闲线程存活时间(单位秒):* - 当线程数超过 corePoolSize 时,多余的线程在空闲 keepAliveSeconds 后会被回收* - 通常设置为与任务平均执行时间相当*/executor.setKeepAliveSeconds(60);/*** 任务队列:用于缓存等待执行的任务* - ArrayBlockingQueue:有界队列,容量为 25(可根据业务需求调整)* - LinkedBlockingQueue:无界队列(不推荐,可能造成内存泄漏)* - SynchronousQueue:无容量队列,适合直接传递任务(如 newCachedThreadPool)*/BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(25);executor.setQueue(queue);/*** 拒绝策略:当队列满且线程数达到最大时,新任务如何处理* - CallerRunsPolicy:由调用线程(提交任务的线程)执行任务,避免丢弃* - AbortPolicy:抛出 RejectedExecutionException(默认)* - DiscardPolicy:直接丢弃任务* - DiscardOldestPolicy:丢弃队列中最旧的任务,尝试重新提交当前任务*/executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());/*** 线程名称前缀:便于日志追踪和线程监控* - 示例输出:TaskExecutor-1, TaskExecutor-2, ...*/executor.setThreadNamePrefix("TaskExecutor-");/*** 上下文传递机制:使用 TaskDecorator 传递 MDC 上下文(如日志追踪 ID)* - MDC(Mapped Diagnostic Context)是 SLF4J 提供的日志上下文工具* - 通过 TaskDecorator,在任务执行前后传递上下文信息*/executor.setTaskDecorator(new MdcTaskDecorator());/*** 初始化线程池* - Spring 容器会自动调用此方法,但显式调用更安全*/executor.initialize();return executor;}/*** 创建重试模板(最大重试3次,间隔1秒)* - 用于包装任务,使其具备重试能力*/public static RetryTemplate createRetryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();/*** 重试策略:定义重试条件和最大重试次数* - SimpleRetryPolicy:固定次数重试* - maxAttempts:最大重试次数(包含首次尝试)*/SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3); // 最多重试3次(含首次执行)retryTemplate.setRetryPolicy(retryPolicy);/*** 重试间隔策略:定义重试之间的等待时间* - FixedBackOffPolicy:固定间隔(如 1 秒)* - ExponentialBackOffPolicy:指数退避(间隔递增)*/FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();backOffPolicy.setBackOffPeriod(1000); // 每次重试间隔 1 秒retryTemplate.setBackOffPolicy(backOffPolicy);return retryTemplate;}/*** 重试任务包装器:将任务包装为支持重试的CallableWrapper* - 通过 RetryTemplate 执行重试逻辑*/public static class RetryCallableWrapper implements Runnable {private final Runnable targetTask;private final RetryTemplate retryTemplate;public RetryCallableWrapper(Runnable targetTask, RetryTemplate retryTemplate) {this.targetTask = targetTask;this.retryTemplate = retryTemplate;}@Overridepublic void run() {retryTemplate.execute(context -> {targetTask.run(); // 执行原始任务return null; // 返回类型无关紧要(Void)});}}/*** 上下文传递装饰器:用于传递 MDC 上下文(如日志追踪 ID)* - 通过 TaskDecorator 在任务执行前后传递上下文* - 确保异步任务中能访问到主线程的上下文*/public static class MdcTaskDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {// 获取当前线程的 MDC 上下文(如 traceId)Map<String, String> contextMap = MDC.getCopyOfContextMap();return () -> {try {// 恢复上下文(如 traceId)MDC.setContextMap(contextMap);// 执行任务runnable.run();} finally {// 清理上下文(可选)MDC.clear();}};}}/*** 使用示例:提交任务到线程池* - 演示上下文传递和重试机制*/public void useTaskExecutor() {ThreadPoolTaskExecutor executor = taskExecutor();// 示例1:提交带重试的任务Runnable retryableTask = new RetryCallableWrapper(() -> {System.out.println(Thread.currentThread().getName() + " 执行任务");throw new RuntimeException("模拟任务失败"); // 模拟失败}, createRetryTemplate());executor.execute(retryableTask); // 自动重试3次// 示例2:提交任务并传递上下文(如 MDC 日志追踪 ID)MDC.put("traceId", "123456"); // 设置日志追踪 IDexecutor.execute(() -> {System.out.println("当前 traceId: " + MDC.get("traceId")); // 输出 123456System.out.println(Thread.currentThread().getName() + " 执行任务");});// 关闭线程池(Spring 容器会自动关闭)// executor.shutdown();}
}

3. 配置项详解

3.1 线程池核心配置
配置项说明推荐值/示例
corePoolSize核心线程数CPU密集型:Runtime.getRuntime().availableProcessors();IO密集型:2 * CPU核心数
maxPoolSize最大线程数根据业务负载调整(如 2 * corePoolSize
keepAliveSeconds空闲线程存活时间与任务平均执行时间匹配(如 60 秒)
queue任务队列ArrayBlockingQueue(25)(有界队列)或 LinkedBlockingQueue(无界队列)
rejectedExecutionHandler拒绝策略CallerRunsPolicy(调用线程执行)、AbortPolicy(抛异常)、DiscardPolicy(丢弃任务)
threadNamePrefix线程名称前缀便于日志追踪(如 "TaskExecutor-"
3.2 上下文传递机制
  • MDC(Mapped Diagnostic Context):SLF4J 提供的日志上下文工具,用于记录日志追踪 ID。
  • TaskDecorator:Spring 提供的装饰器接口,用于在任务执行前后传递上下文。
  • 实现方式
    1. 在主线程中设置 MDC 上下文(如 MDC.put("traceId", "123456"))。
    2. 通过 TaskDecorator 将上下文传递给异步任务。
    3. 任务执行时恢复上下文,确保日志可追踪。
3.3 重试机制
  • RetryTemplate:Spring Retry 提供的重试模板,封装了重试逻辑。
  • SimpleRetryPolicy:定义最大重试次数(如 3 次)。
  • FixedBackOffPolicy:定义重试间隔(如 1 秒)。
  • 使用方式
    1. 将任务包装为 RetryCallableWrapper
    2. 通过 RetryTemplate 执行任务,自动处理重试逻辑。

4. 使用场景建议

  1. 日志追踪:通过 MDC 传递 traceId,确保异步任务日志与主线程关联。
  2. 任务重试:适用于网络请求、数据库操作等需要自动重试的场景。
  3. 资源控制:通过队列和线程数限制,防止系统过载。

5. 注意事项

  1. 重试逻辑与异常处理
    • 默认对所有异常进行重试,可通过 SimpleRetryPolicy 自定义异常类型。
    • 重试后仍失败的任务需通过日志或监控告警处理。
  2. 上下文传递的开销
    • 如果上下文较大(如包含复杂对象),需评估性能影响。
    • 使用 InheritableThreadLocal 替代 MDC 可以更灵活地传递上下文。
  3. 线程池生命周期
    • 确保在应用关闭时正确关闭线程池(调用 executor.shutdown())。
    • Spring 容器会自动关闭线程池,但手动创建时需显式调用。
  4. 队列容量与拒绝策略
    • 有界队列需合理设置容量,防止任务堆积。
    • 拒绝策略应根据业务需求选择(如 CallerRunsPolicy 适合轻量任务)。

6. 扩展功能(可选)

  1. 动态调整线程池参数:通过 ThreadPoolTaskExecutor 提供的方法(如 setCorePoolSize())动态修改配置。
  2. 监控与调优:通过 ThreadPoolTaskExecutor 的统计方法(如 getPoolSize()getQueueSize())监控线程池状态。
  3. 自定义重试策略:结合 Spring Retry 实现更复杂的重试逻辑(如指数退避、重试条件判断)。

7. 代码结构图

TaskExecutorConfig
├── taskExecutor()        // 配置线程池
│   ├── corePoolSize      // 核心线程数
│   ├── maxPoolSize       // 最大线程数
│   ├── keepAliveSeconds  // 空闲线程存活时间
│   ├── queue             // 任务队列
│   ├── rejectedHandler   // 拒绝策略
│   └── setTaskDecorator  // 上下文传递机制
├── createRetryTemplate() // 创建重试模板
├── RetryCallableWrapper // 重试任务包装器
└── useTaskExecutor()     // 使用示例├── execute()         // 提交普通任务└── submitListenable()// 提交带返回值任务

8. 总结

  • 上下文传递:确保异步任务中能访问到主线程的上下文(如日志追踪 ID)。
  • 任务重试:对失败任务自动重试,提升系统鲁棒性。
  • 灵活的线程池管理:根据业务需求调整线程池参数,避免资源浪费或过载。
http://www.dtcms.com/a/269482.html

相关文章:

  • 【PTA数据结构 | C语言版】求单链表list中的元素个数,即表长
  • NumPy-随机数生成详解
  • AI编程的未来是智能体原生开发?
  • JavaSE-继承
  • UI前端与数字孪生结合实践案例:智慧零售的库存管理优化系统
  • 算法学习笔记:10.Prim 算法——从原理到实战,涵盖 LeetCode 与考研 408 例题
  • 【Mac】实现Docker下载安装【正在逐步完善】
  • 【论文阅读】CogVideoX: Text-to-Video Diffusion Models with An Expert Transformer
  • 【计算机基础理论知识】C++篇(一)
  • 暑假读书笔记第四天
  • 【Python-GEE】如何利用Landsat时间序列影像通过调和回归方法提取农作物特征并进行分类
  • python transformers库笔记(BertForTokenClassification类)
  • 【牛客刷题】小红的与运算
  • node.js中yarn、npm、cnpm详解
  • 精益管理与数字化转型的融合:中小制造企业降本增效的双重引擎
  • 算法训练营DAY29 第八章 贪心算法 part02
  • 实战Linux进程状态观察:R、S、D、T、Z状态详解与实验模拟
  • 联通线路物理服务器选择的关键要点
  • No Hack No CTF 2025Web部分个人WP
  • Django双下划线查询
  • 微信小程序控制空调之接收MQTT消息
  • 如何利用AI大模型对已有创意进行评估,打造杀手级的广告创意
  • deepseek实战教程-第九篇开源模型智能体开发框架solon-ai
  • Python爬取知乎评论:多线程与异步爬虫的性能优化
  • React18+TypeScript状态管理最佳实践
  • Jenkins 使用宿主机的Docker
  • 深入解析 structuredClone API:现代JS深拷贝的终极方案
  • Ubuntu 版本号与别名对照表(部分精选)
  • Java使用接口AES进行加密+微信小程序接收解密
  • Linux Ubuntu系统下载