Spring 线程
以下是 Spring 中线程高级用法的详细说明,涵盖复杂场景的线程池管理、任务编排、线程上下文传递及性能优化:
1. 动态线程池配置
通过 ThreadPoolTaskExecutor
实现运行时动态调整参数(需配合监控系统如 Prometheus)。
@Bean("dynamicExecutor")
public ThreadPoolTaskExecutor dynamicExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Dynamic-");
executor.setAllowCoreThreadTimeOut(true); // 允许核心线程超时回收
executor.setKeepAliveSeconds(60); // 非核心线程空闲存活时间
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
// 动态调整线程池参数
@Autowired
private ThreadPoolTaskExecutor dynamicExecutor;
public void adjustPoolSize(int coreSize, int maxSize) {
dynamicExecutor.setCorePoolSize(coreSize);
dynamicExecutor.setMaxPoolSize(maxSize);
dynamicExecutor.setKeepAliveSeconds(30);
}
2. 复杂任务编排
使用 CompletableFuture
实现多任务并行执行和结果聚合。
@Async("dynamicExecutor")
public CompletableFuture<String> fetchUserData(Long userId) {
// 模拟远程调用
return CompletableFuture.completedFuture("User-" + userId);
}
@Async("dynamicExecutor")
public CompletableFuture<String> fetchOrderData(Long userId) {
// 模拟远程调用
return CompletableFuture.completedFuture("Order-" + userId);
}
// 合并两个异步任务结果
public CompletableFuture<Map<String, String>> getUserInfo(Long userId) {
CompletableFuture<String> userFuture = fetchUserData(userId);
CompletableFuture<String> orderFuture = fetchOrderData(userId);
return userFuture.thenCombine(orderFuture, (user, order) -> {
Map<String, String> result = new HashMap<>();
result.put("user", user);
result.put("order", order);
return result;
}).exceptionally(ex -> {
// 统一异常处理
return Collections.singletonMap("error", ex.getMessage());
});
}
3. 线程上下文传递
处理跨线程的上下文信息(如安全上下文、Trace ID)。
方案一:使用 TaskDecorator
@Bean("contextAwareExecutor")
public ThreadPoolTaskExecutor contextAwareExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setTaskDecorator(new ContextCopyingDecorator());
executor.initialize();
return executor;
}
// 自定义装饰器复制上下文
public static class ContextCopyingDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 获取父线程上下文(如 SecurityContext、MDC)
SecurityContext context = SecurityContextHolder.getContext();
Map<String, String> mdc = MDC.getCopyOfContextMap();
return () -> {
try {
// 将上下文设置到子线程
SecurityContextHolder.setContext(context);
if (mdc != null) MDC.setContextMap(mdc);
runnable.run();
} finally {
// 清理子线程上下文
SecurityContextHolder.clearContext();
MDC.clear();
}
};
}
}
方案二:使用 ThreadLocal
+ AOP
// 定义线程局部变量
private static final ThreadLocal<User> currentUser = new ThreadLocal<>();
// 通过切面自动传递
@Aspect
@Component
public class ThreadLocalAspect {
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object propagateThreadLocal(ProceedingJoinPoint pjp) throws Throwable {
User user = currentUser.get();
return CompletableFuture.runAsync(() -> {
try {
currentUser.set(user);
pjp.proceed();
} finally {
currentUser.remove();
}
});
}
}
4. 响应式线程池整合
与 WebFlux 或 Reactor 配合使用 Schedulers。
// 配置 Reactor 调度器
@Bean
public Scheduler boundedElasticScheduler() {
return Schedulers.newBoundedElastic(
50, // 线程数上限
100, // 任务队列容量
"reactor-scheduler"
);
}
// 在 Service 中使用
public Mono<String> reactiveTask() {
return Mono.fromCallable(() -> {
// 阻塞操作(自动切换到指定调度器)
return "Result from " + Thread.currentThread().getName();
}).subscribeOn(boundedElasticScheduler());
}
5. 线程池监控与管理
通过 Actuator 或自定义监控暴露线程池指标。
注册线程池指标
@Bean
public MeterBinder taskExecutorMetrics(ThreadPoolTaskExecutor executor) {
return (registry) -> {
ThreadPoolExecutor pool = executor.getThreadPoolExecutor();
Gauge.builder("thread.pool.active", pool::getActiveCount)
.register(registry);
Gauge.builder("thread.pool.queue.size", () -> pool.getQueue().size())
.register(registry);
};
}
通过 JMX 管理
@Bean
public ThreadPoolTaskExecutor jmxExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setThreadNamePrefix("JMX-");
executor.setBeanName("jmxThreadPool"); // 暴露为 MBean
executor.initialize();
return executor;
}
6. 高级异常处理
结合全局异常处理器和熔断机制。
// 自定义 Async 异常处理器
@Bean
public AsyncUncaughtExceptionHandler asyncExceptionHandler() {
return (ex, method, params) -> {
// 发送报警或记录详细日志
log.error("Async method '{}' failed: {}", method.getName(), ex.getMessage());
// 结合熔断器(如 Resilience4j)
CircuitBreakerRegistry.ofDefaults().circuitBreaker(method.getName())
.onError(ex.getCause());
};
}
// 在配置类中注册
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return asyncExceptionHandler();
}
}
7. 性能优化技巧
-
避免线程饥饿:为不同任务类型分配独立线程池(如 IO 密集型 vs CPU 密集型)
-
合理设置队列容量:根据任务特点选择
LinkedBlockingQueue
或SynchronousQueue
-
使用
@Async
的限定符:为不同场景指定不同线程池
@Async("ioTaskExecutor")
public void ioIntensiveTask() { ... }
@Async("cpuTaskExecutor")
public void cpuIntensiveTask() { ... }
关闭线程池:在应用关闭时主动回收资源
@PreDestroy
public void destroy() {
dynamicExecutor.shutdown();
}
总结
Spring 线程高级用法需要关注以下核心问题:
1、线程池精细化控制:动态调整、资源隔离、队列策略
2、上下文一致性:安全、日志跟踪、事务的跨线程传递
3、系统可观测性:指标监控、异常追踪、熔断降级
4、性能与稳定性平衡:避免阻塞、死锁、资源泄漏