TransmittableThreadLocal(父子线程传递ThreadLocal)
一、背景介绍
在业务开发中,时常需要获取一些用户信息,而这些用户信息,通常是线程级别的缓存,通常使用TreadLocal进行存储,但是对于某些,请求本身使用异步线程处理,在子线程里面再去获取用户信息,会导致获取不到,因为ThreadLocal本身是不支持主子线程之间的传递。
二、解决方案
通常的做法有:
1.主动取出ThreadLocal中的变量,声明为局部变量,通过变量传入,这个相对来说最易于实现,但是不太能够复用(虽然没那么灵活,但是踩坑几率还是比较小),并且如果涉及一些aop的功能来说,改造比较麻烦;
2.使用InheritableThreadLocal(在线程池化之后,容易踩坑);
InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();threadLocal.set(StpUtil.getTokenValue());CompletableFuture.runAsync(() -> {StpUtil.setTokenValue(threadLocal.get()); // 设置初始 token}, executorService).exceptionally(throwable -> {log.error("执行业务操作时发生错误:", throwable);});
3.使用可传输的TransmittableThreadLocal(本次介绍的主角);
mock代码示例:
初始化线程池:
/*** dataTaskExecutor 执行任务*/@Bean(name = "dataTaskExecutor")public Executor mainTaskExecutor() {
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// executor.setCorePoolSize(12);
// executor.setMaxPoolSize(16);
// executor.setQueueCapacity(100);
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// executor.setKeepAliveSeconds(30);
// executor.setThreadNamePrefix("data-task-executor-");
// executor.initialize();
// return executor;
// 模拟线程池复用,我们将核心线程和最大线程都设置1,方便模拟线程复用的情况ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1); // 只留一个核心线程executor.setMaxPoolSize(1); // 最大线程数也限制为1executor.setQueueCapacity(100);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());executor.setKeepAliveSeconds(30);executor.setThreadNamePrefix("data-task-executor-");executor.initialize();return executor;}@Bean(name = "optDataTaskExecutor")public Executor dataTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setQueueCapacity(100);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());executor.setKeepAliveSeconds(30);executor.setThreadNamePrefix("data-task-executor-");executor.initialize();// 用 TTL 包装 Spring 的 Executor 本身return TtlExecutors.getTtlExecutor(executor);}
测试用例代码:
@Slf4j
public class ThreadLocalTest {private static final ThreadLocal<String> normalThreadLocal = new ThreadLocal<>();private static final ThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();private static final TransmittableThreadLocal<String> ttlThreadLocal = new TransmittableThreadLocal<>();private static final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();@Autowiredprivate Executor dataTaskExecutor;@Autowiredprivate Executor optDataTaskExecutor;/*** 测试普通 ThreadLocal 在线程池中的表现*/@Testpublic void testNormalThreadLocal() throws Exception {normalThreadLocal.set("main-thread-value");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.info("普通 ThreadLocal 子线程取值: {}", normalThreadLocal.get());}, dataTaskExecutor);future.get();}/*** 测试 InheritableThreadLocal 在线程池中的表现*/@Testpublic void testInheritableThreadLocal() throws Exception {inheritableThreadLocal.set("main-thread-value");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.info("InheritableThreadLocal 子线程取值: {}", inheritableThreadLocal.get());}, dataTaskExecutor);future.get();}/*** 测试 TransmittableThreadLocal 在线程池中的表现(未包装任务时)*/@Testpublic void testTTLWithoutWrapper() throws Exception {ttlThreadLocal.set("main-thread-value");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.info("TTL (未包装) 子线程取值: {}", ttlThreadLocal.get());}, dataTaskExecutor);future.get();}/*** 测试 TransmittableThreadLocal 在线程池中的表现(包装任务时)*/@Testpublic void testTTLWithWrapper() throws Exception {ttlThreadLocal.set("main-thread-value");Runnable task = () -> log.info("TTL (包装) 子线程取值: {}", ttlThreadLocal.get());// 使用 TTL 提供的包装器CompletableFuture<Void> future = CompletableFuture.runAsync(TtlRunnable.get(task), dataTaskExecutor);future.get();}/*** 测试InheritableThreadLocal线程池复用场景:先提交一次,再切换主线程变量,再提交任务*/@Testpublic void testInheritableThreadLocalReuse() throws Exception {inheritableThreadLocal.set("first-value");Runnable task = () -> log.info("InheritableThreadLocal 子线程取值: {}", inheritableThreadLocal.get());// 第一次任务CompletableFuture<Void> future1 = CompletableFuture.runAsync(task, dataTaskExecutor);future1.get();// 修改主线程变量inheritableThreadLocal.set("second-value");// 第二次任务(线程池线程可能复用)CompletableFuture<Void> future2 = CompletableFuture.runAsync(task, dataTaskExecutor);future2.get();}/*** 测试TransmittableThreadLocal线程池复用场景:先提交一次,再切换主线程变量,再提交任务,不使用包装类get*/@Testpublic void testTTLWithoutWrapperReuse() throws Exception {ttlThreadLocal.set("first-value");Runnable task = () -> log.info("TTL (未包装) 子线程取值: {}", ttlThreadLocal.get());// 第一次任务CompletableFuture<Void> future1 = CompletableFuture.runAsync(task, dataTaskExecutor);future1.get();// 修改主线程变量ttlThreadLocal.set("second-value");// 第二次任务(线程池线程可能复用)CompletableFuture<Void> future2 = CompletableFuture.runAsync(task, dataTaskExecutor);future2.get();}/*** 测试TransmittableThreadLocal线程池复用场景:先提交一次,再切换主线程变量,再提交任务*/@Testpublic void testThreadPoolReuseWithTTL() throws Exception {ttlThreadLocal.set("first-value");Runnable task = () -> log.info("线程池复用 -> TTL 子线程取值: {}", ttlThreadLocal.get());// 第一次任务CompletableFuture<Void> future1 = CompletableFuture.runAsync(TtlRunnable.get(task), dataTaskExecutor);future1.get();// 修改主线程的变量ttlThreadLocal.set("second-value");// 第二次任务(线程池线程可能复用)CompletableFuture<Void> future2 = CompletableFuture.runAsync(TtlRunnable.get(task), dataTaskExecutor);future2.get();}/*** 测试包装线程池*/@Testpublic void testCompletableFutureWithTtlExecutor() throws Exception {try {// 父线程设置上下文context.set("user-akihito");log.info("[父线程] 设置上下文: {}", context.get());// 提交第一个异步任务CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {try {log.info("[子线程1] 读取上下文: {}", context.get());} finally {// 子线程执行完要清理,避免内存泄漏context.remove();log.info("[子线程1] 清理上下文");}}, optDataTaskExecutor);// 提交第二个异步任务CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {try {log.info("[子线程2] 读取上下文: {}", context.get());} finally {// 子线程执行完要清理,避免内存泄漏context.remove();log.info("[子线程2] 清理上下文");}}, optDataTaskExecutor);} finally {// 父线程清理上下文context.remove();log.info("[父线程] 清理上下文");}Thread.sleep(3000L);}/*** 模拟最佳实践*/@Testpublic void testCompletableFutureWithTTL() throws Exception {try {// 父线程设置上下文(相当于请求进入时)context.set("user-akihito");log.info("[父线程] 设置上下文: {}", context.get());// CompletableFuture 异步执行任务CompletableFuture.runAsync(TtlRunnable.get(() -> {try {log.info("[子线程] 读取上下文: {}", context.get());} finally {// 子线程执行完要清理,避免内存泄漏context.remove();log.info("[子线程] 清理上下文");}}, true), dataTaskExecutor);} finally {// 父线程请求结束时清理context.remove();log.info("[父线程] 清理上下文");}Thread.sleep(3000L);}
}
三、TransmittableThreadLocal小结
实现主子线程之间传递,TransmittableThreadLocal提供了两种方式,第一种是对线程进行装饰器增强(TtlRunnable),第二种则是对于线程池提供装饰器增强(TtlExecutors);两种方式均可以使用到TransmittableThreadLocal的增强;
需要注意的点,如果有TransmittableThreadLocal的set操作,务必要进行remove操作,否则会导致内存溢出,如果主线程set value,主线程中一定要remove;如果子线程没有set value,仅使用TransmittableThreadLocal的get方法,可以不进行remove操作;
关于TransmittableThreadLocal,它实现原理,是通过capture(遍历注册过的 ThreadLocal,生成快照),replay(把捕获的值写回子线程的 ThreadLocal),restore(任务结束后把原来的值写回去)三个方法,来实现提交任务时动态捕获值,而对于InheritableThreadLocal,仅会在线程创建时,复制一父线程的值,之后不会再感知父线程的变化;
private static final Transmittee<HashMap<ThreadLocal<Object>, Object>, HashMap<ThreadLocal<Object>, Object>> threadLocalTransmittee =new Transmittee<HashMap<ThreadLocal<Object>, Object>, HashMap<ThreadLocal<Object>, Object>>() {@NonNull@Overridepublic HashMap<ThreadLocal<Object>, Object> capture() {final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = newHashMap(threadLocalHolder.size());for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {final ThreadLocal<Object> threadLocal = entry.getKey();final TtlCopier<Object> copier = entry.getValue();threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));}return threadLocal2Value;}@NonNull@Overridepublic HashMap<ThreadLocal<Object>, Object> replay(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {final HashMap<ThreadLocal<Object>, Object> backup = newHashMap(captured.size());for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {final ThreadLocal<Object> threadLocal = entry.getKey();backup.put(threadLocal, threadLocal.get());final Object value = entry.getValue();if (value == threadLocalClearMark) threadLocal.remove();else threadLocal.set(value);}return backup;}@NonNull@Overridepublic HashMap<ThreadLocal<Object>, Object> clear() {final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = newHashMap(threadLocalHolder.size());for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {final ThreadLocal<Object> threadLocal = entry.getKey();threadLocal2Value.put(threadLocal, threadLocalClearMark);}return replay(threadLocal2Value);}@Overridepublic void restore(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {final ThreadLocal<Object> threadLocal = entry.getKey();threadLocal.set(entry.getValue());}}};
另外:
JVM 在创建子线程时,会把父线程 inheritableThreadLocals 整个 ThreadLocalMap 复制一份。
而这个 Map 里保存的是所有 ITL,而不是“用到的”,所以不是“按需复制”,而是“全量复制”。如果本身依赖的框架中,有使用到InheritableThreadLocal的地方,都会进行复制,复制的对象通常会更多,因为本身是java核心库的实现,会有更多的框架会使用并且依赖于InheritableThreadLocal;
对于TransmittableThreadLocal,仅会对当前holder中的值进行复制,相对于InheritableThreadLocal来说,依赖于TransmittableThreadLocal的三方框架会更少,会有更少的内存占用;