线程池拒绝策略踩坑
线程池拒绝策略
当前项目中一般都会使用到线程池,使用线程池的时候,我一般选择拒绝策略都是ThreadPoolExecutor.CallerRunsPolicy()
是让主线程去执行,这样我的任务不会丢,最多就是主线程执行会慢一些,这样会有哪些问题呢
线程数据传递
当前项目中在线程的ThreadLocal
里面都会放数据,什么时候会用到呢,比如logid,想记录一个请求进来之后都进行了哪些操作,根据logId进行追踪,对于sl4j
来说,可以在MDC
进行设置对应的值,然后在log打印的时候进行配置(这里不展开说),或者别的场景需要存放到线程的私有变量,那么对于用到线程池的怎么办,数据怎么传递进去呢,我这里使用的是手动设置,在开启线程之前获取到,线程里面再设置,线程结束进行清除,那么怎么更方便些呢,就是自定义线程池,来处理这些可以做到无感知传递
自定义线程池处理数据传递
具体怎么做呢,
- 自定义线程池
在任务执行之前,获取对应的数据,然后传递到要开启的任务里面,在任务开始之前进行设置进去,然后结束进行移除
@Slf4j
public class MyExecutor extends ThreadPoolTaskExecutor {@Overridepublic void execute(@NotNull Runnable task) {String traceId = TraceUtil.getTraceId();super.execute(new Runner(task, Thread.currentThread(), traceId));}@NotNull@Overridepublic Future<?> submit(@NotNull Runnable task) {String traceId = TraceUtil.getTraceId();return super.submit(new Runner(task, Thread.currentThread(),traceId));}@NotNull@Overridepublic <T> Future<T> submit(@NotNull Callable<T> task) {String traceId = TraceUtil.getTraceId();return super.submit(new Caller(task, Thread.currentThread(),traceId));}
}
上面需要的自定义的runner,主要执行之前设置,结束之后移除对应的数据
@Slf4j
public class Runner implements Runnable {private final Thread parentThread;private final Runnable runnable;private final String traceId;public Runner(Runnable runnable, Thread parentThread, String traceId) {this.runnable = runnable;this.parentThread = parentThread;this.traceId = traceId;}@Overridepublic void run() {//这里是子线程TraceUtil.setCacheTraceId(traceId);try {runnable.run();} catch (Exception e) {log.error("异步任务异常 {}", e.getMessage());throw e;} finally {TraceUtil.clearCacheTraceId();}}
}
public class TraceUtil {public static String getTraceId() {String traceId = MDC.get(Constant.REQUEST_LOG_KEY);if(StringUtils.isBlank(traceId)){traceId = UUID.randomUUID().toString();MDC.put(Constant.REQUEST_LOG_KEY, traceId);}return traceId;}public static void setCacheTraceId(String traceId) {MDC.put(Constant.REQUEST_LOG_KEY, traceId);}public static void clearCacheTraceId() {MDC.remove(Constant.REQUEST_LOG_KEY);}
}
上面就是我之前使用的手动传递的方法,这样后续就可以直接创建一个配置文件自定义线程池
@Bean(value = "myTaskExecutors")public Executor jobStreetTaskExecutors() {ThreadPoolTaskExecutor executor = new MyExecutor();// 核心线程数:线程池创建时候初始化的线程数executor.setCorePoolSize(3);// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程executor.setMaxPoolSize(10);// 缓冲队列:用来缓冲执行任务的队列executor.setQueueCapacity(0);// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);
// executor.setDaemon(true);// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁executor.setKeepAliveSeconds(60);// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池executor.setThreadNamePrefix("myTaskExecutors-pool-");// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}
比如这样,我需要线程池的时候进行注入使用就可以了,常规下是没问题的,但是,我遇到了奇怪的bug
遇到的问题
直接说结论吧,当任务很多的时候,执行到拒绝策略,这时候由主线程执行,主线程执行完之后,会把线程变量清除,这时候就把主线程自己的数据清了,存的数据,logId这些就没了
处理方案
怎么处理呢,不能清除主线程的数据,那就要判断下当前执行的线程跟开启线程的是不是同一个,可以对runner进行优化,可以在开启线程的时候把当前线程传递进去,然后在清除的时候判断下当前线程跟父线程是不是同一个线程,如果是同一个代表走了拒绝策略,这时候不能清除数据
@Overridepublic void run() {//这里是子线程TraceUtil.setCacheTraceId(traceId);try {runnable.run();} catch (Exception e) {log.error("异步任务异常 {}", e.getMessage());throw e;} finally {if (parentThread != Thread.currentThread()) {TraceUtil.clearCacheTraceId();}}}