【PmHub后端篇】PmHub整合TransmittableThreadLocal (TTL)缓存用户数据
1 相关理论知识
1.1 ThreadLocal简介
1.1.1 ThreadLocal是什么
ThreadLocal是Java中lang
包下的一个类,用于解决多线程下共享变量并发问题。它为每个线程维护独立的变量副本,即同一个变量在不同线程下可赋予不同值,避免多个线程同时访问同一个变量时的冲突。
1.1.2 ThreadLocal与synchronized的区别
synchronized基于锁机制,控制对共享资源的访问,确保线程间数据的一致性和安全性,实现线程间的互斥访问。而ThreadLocal采用空间换时间的方式,为每个线程提供变量副本,实现线程隔离。
synchronized是时间换空间让多个线程排队访问,ThreadLocal是空间换时间为每个线程提供了一份变量的副本,从而实现线程隔离。
1.1.3 ThreadLocal使用场景
ThreadLocal 主要用于实现线程间的数据隔离,其使用场景比较多,下面列举几个比较常见的场景:
- 用户会话信息:在web应用中,每个请求在独立线程中处理时,可使用ThreadLocal存储用户会话信息,防止不同请求线程间的数据混淆。
- 数据库连接管理:在线程中保存数据库连接,使每个线程拥有自己的数据库连接实例,避免连接共享问题,提高性能。
- 格式化工具:像SimpleDateFormat这类线程不安全的工具,可使用ThreadLocal为每个线程提供独立实例,避免线程安全问题。
- 日志上下文信息传递:在日志记录中,使用ThreadLocal存储请求ID、用户ID等上下文信息,在不同日志记录中共享这些信息。
1.2 ThreadLocal原理
1.2.1 ThreadLocal的内部结构
ThreadLocal
是一个泛型类,主要作用是提供一个用于存储线程局部变量的容器。每个线程都有一个 ThreadLocalMap
对象,可以用来存储该线程的所有 ThreadLocal
实例及其对应的值。
其内部主要有三个方法:
get()
:获取线程的threadlocalmap
,根据key(当前threadlocal
)获取值并返回,若map为空或获取不到值则返回默认值。set(T value)
:先获取当前线程,获取线程的threadlocalmap
(若获取不到则创建一个map),将当前ThreadLocal
作为key,value作为值进行设置。initialValue
:设置默认初始值,可被继承。
1.2.2 ThreadLocalMap基本结构
ThreadLocalMap包含两个重要部分:一是threadlocal
静态内部类;二是key为threadlocal
对象的弱引用,目的是将threadlocal对象的生命周期和线程的生命周期解绑。
1.3 TransmittableThreadLocal (TTL)介绍
TTL是阿里巴巴开源的工具库,用于解决Java中ThreadLocal在使用线程池或其他多线程框架(如Executors、ForkJoinPool等)时无法传递父线程上下文的问题。其开源地址为https://github.com/alibaba/transmittable-thread-local 。整个TransmittableThreadLocal库核心功能代码量约1000 SLOC,较为精简。
1.3.1 TTL实现原理
在Java多线程编程中,ThreadLocal在使用线程池时,由于线程复用会导致变量在父线程和子线程之间无法正确传递。InheritableThreadLocal虽能传递父线程变量给子线程,但在线程池环境下仍无法解决线程复用问题。
TTL的工作原理主要包括以下三点:
- 上下文拷贝:任务提交时,TTL会拷贝当前线程的上下文到任务中。
- 任务执行前设置上下文:在任务执行前,TTL将拷贝的上下文设置到当前线程中。
- 任务执行后清理上下文:任务执行完毕后,TTL清理线程中的上下文,防止内存泄漏。
以下是结合图片对TransmittableThreadLocal(TTL)工作流程的说明:
-
创建和设置TTL对象
- 步骤1:
createTtl()
:业务代码(Biz Code)首先创建一个TransmittableThreadLocal
对象。这是使用TTL来传递上下文信息的起始点,通过创建该对象,后续可以利用它来存储和传递特定于线程的上下文数据。 - 步骤2:
setTtlValue()
:在创建好TransmittableThreadLocal
对象后,业务代码向该对象中设置具体的值。这个值通常是需要在多线程环境中跨线程传递的上下文信息,比如用户会话信息、请求标识等。
- 步骤1:
-
包装业务任务
- 步骤3:
createBizTaskRunnable()
:业务代码创建实际的业务任务,该任务实现了Runnable
接口,包含了具体的业务逻辑操作。 - 步骤4:
createTtlRunnableWrapper(Runnable)
:将上述创建的业务任务(Runnable
)包装成TtlRunnable
。在这个过程中,会执行captureAllTtlValues()
操作 :- 步骤4.1:
captureAllTtlValues()
:通过Transmitter
工具类来捕获当前线程中所有TransmittableThreadLocal
对象的值。- 步骤4.1.1:
get()
:从TransmittableThreadLocal
对象中获取其存储的值。 - 步骤4.1.2:
copy(value:T)
:对获取到的值进行拷贝操作。这一步很关键,通过拷贝,确保在后续线程执行时,使用的是独立的、与当前线程上下文相关联的值副本,而不会受到其他线程的干扰 。
- 步骤4.1.1:
- 步骤4.1:
- 步骤3:
-
提交任务到线程池
- 步骤5:
submitTtlRunnableToThreadPool()
:将包装好的TtlRunnable
任务提交到线程池(ThreadPool)中。线程池会根据自身的调度策略来安排任务的执行。
- 步骤5:
-
任务执行过程中的上下文处理
- 步骤6:
run()
:当线程池中的线程调度到该任务时,TtlRunnable
的run
方法被调用开始执行任务。- 步骤6.1:
beforeExecute()
:在实际业务任务执行前,执行一些前置操作,这些操作可能包括对线程上下文环境的进一步准备工作等。 - 步骤6.2:
replayCapturedTtlValues()
:通过Transmitter
工具类,将之前捕获并拷贝的TransmittableThreadLocal
的值重新应用到当前执行线程的上下文中。这样,在任务执行过程中,就可以使用到从父线程传递过来的上下文信息。 - 步骤6.3:
run()
:执行实际包装的业务任务(Runnable
)中的业务逻辑,在这个过程中可以使用到已经恢复的TransmittableThreadLocal
中的值。- 步骤6.3.1:
useValueInTTL()
:在业务逻辑执行过程中,使用TransmittableThreadLocal
中存储的上下文值来完成具体的业务操作,例如根据用户会话信息进行权限判断等。
- 步骤6.3.1:
- 步骤6.4:
restoreTtlValuesBeforeReplay()
:在任务执行完成后,恢复TransmittableThreadLocal
的值到之前捕获并应用值之前的状态,以避免对后续任务的上下文产生干扰。 - 步骤6.5:
afterExecute()
:执行一些后置操作,清理相关资源或者进行一些任务执行完成后的记录等工作 。
- 步骤6.1:
- 步骤6:
整个TTL的工作流程围绕着在多线程环境下,尤其是在线程池场景中,如何准确地捕获、传递和恢复线程的上下文信息,从而保证业务逻辑在不同线程中能够正确使用到所需的上下文数据 。
1.3.2 TTL主要使用场景
- 分布式追踪:在分布式系统中传递追踪ID,便于日志的关联和问题排查。
- 事务管理:在分布式事务中传递事务上下文,确保事务的一致性。
- 上下文信息传递:在多线程环境中传递用户会话、请求上下文等信息。
1.3.3 TTL对比ThreadLocal优势
- 上下文传递:ThreadLocal仅在当前线程内存储,无法跨线程传递;TTL能够在线程池和多线程框架中传递上下文信息。
- 线程复用支持:ThreadLocal在线程池复用线程时无法保证变量一致性;TTL支持线程池复用,确保变量在任务间传递和保持一致。
- 无侵入性:ThreadLocal需手动管理变量设置和清除,容易出错;TTL替换ThreadLocal即可自动管理上下文传递和清除。
- 集成方便:ThreadLocal适用于简单线程环境;TTL可与各种线程池和多线程框架无缝集成。
2 项目实战
2.1 具体实现流程
在微服务架构中,需求是将用户登录后的信息保存在上下文变量中,并进行跨线程之间传递。
- 具体流程如下:
用户登录后会获得token
,后续请求携带该token
,且token
中携带有用户信息。所有请求首先经过网关的过滤器AuthFilter
,在AuthFilter
中,将用户信息放到请求头中。请求经过网关后,会到达自定义请求头拦截器HeaderInterceptor
,在HeaderInterceptor
中,取出请求头中的用户信息并放到TTL中,这样链路上的服务就可以直接从TTL中获取用户信息。
TTL(ThreadLocal)的应用主要分为两个层面,以下是具体实现流程:
- 网关层认证(AuthFilter)
// 关键代码流程:
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// 1. 验证token有效性(依赖Redis TTL)boolean islogin = redisService.hasKey(getTokenKey(userkey)); // 第66行if (!islogin) {return unauthorizedResponse(exchange, "登录状态已过期");}// 2. 将认证信息写入请求头addHeader(mutate, SecurityConstants.USER_KEY, userkey); // 第77行addHeader(mutate, SecurityConstants.DETAILS_USER_ID, userid);addHeader(mutate, SecurityConstants.DETAILS_USERNAME, username);
}
- 业务层上下文传递(HeaderInterceptor)
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {// 1. 从请求头提取网关写入的信息SecurityContextHolder.setUserId(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USER_ID));SecurityContextHolder.setUserName(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USERNAME));SecurityContextHolder.setUserKey(ServletUtils.getHeader(request, SecurityConstants.USER_KEY));// 2. 将用户信息存储到ThreadLocal(TTL)if (StringUtils.isNotNull(loginUser)) {SecurityContextHolder.set(SecurityConstants.LOGIN_USER, loginUser); // 第49行}
}
- 架构流程图解:
客户端请求│↓
[网关层 AuthFilter]│ 1. 校验Redis TTL│ 2. 添加认证头信息↓
[业务服务 HeaderInterceptor]│ 1. 从请求头提取信息│ 2. 存入ThreadLocal(TTL)↓
[Controller/Service]│ 使用SecurityContextHolder.getXXX()获取上下文
-
这种设计的优势:
- 双重验证机制:网关层做基础认证,业务层做上下文管理
- 无状态传输:通过请求头传递认证信息,避免会话状态维护
- 线程级隔离:TTL保证每个请求的用户信息独立存储
- 自动清理:afterCompletion方法确保TTL内容及时清除(防止内存泄漏)
2.2 完整代码实现
com.laigeoffer.pmhub.gateway.filter.AuthFilter
/*** 网关鉴权** @author canghe*/
@Component
public class AuthFilter implements GlobalFilter, Ordered {private static final Logger log = LoggerFactory.getLogger(AuthFilter.class);private static final String BEGIN_VISIT_TIME = "begin_visit_time";//开始访问时间// 排除过滤的 uri 地址,nacos自行添加@Autowiredprivate IgnoreWhiteProperties ignoreWhite;@Autowiredprivate RedisService redisService;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();ServerHttpRequest.Builder mutate = request.mutate();String url = request.getURI().getPath();// 跳过不需要验证的路径if (StringUtils.matches(url, ignoreWhite.getWhites())) {return chain.filter(exchange);}String token = getToken(request);if (StringUtils.isEmpty(token)) {return unauthorizedResponse(exchange, "令牌不能为空");}Claims claims = JwtUtils.parseToken(token);if (claims == null) {return unauthorizedResponse(exchange, "令牌已过期或验证不正确!");}String userkey = JwtUtils.getUserKey(claims);boolean islogin = redisService.hasKey(getTokenKey(userkey));if (!islogin) {return unauthorizedResponse(exchange, "登录状态已过期");}String userid = JwtUtils.getUserId(claims);String username = JwtUtils.getUserName(claims);if (StringUtils.isEmpty(userid) || StringUtils.isEmpty(username)) {return unauthorizedResponse(exchange, "令牌验证失败");}// 设置用户信息到请求addHeader(mutate, SecurityConstants.USER_KEY, userkey);addHeader(mutate, SecurityConstants.DETAILS_USER_ID, userid);addHeader(mutate, SecurityConstants.DETAILS_USERNAME, username);// 内部请求来源参数清除(防止网关携带内部请求标识,造成系统安全风险)removeHeader(mutate, SecurityConstants.FROM_SOURCE);//先记录下访问接口的开始时间exchange.getAttributes().put(BEGIN_VISIT_TIME, System.currentTimeMillis());// Mono.fromRunnable 是非阻塞的,适合在 then 中处理后续的日志逻辑。return chain.filter(exchange).then(Mono.fromRunnable(() -> {try {// 记录接口访问日志Long beginVisitTime = exchange.getAttribute(BEGIN_VISIT_TIME);if (beginVisitTime != null) {URI uri = exchange.getRequest().getURI();Map<String, Object> logData = new HashMap<>();logData.put("host", uri.getHost());logData.put("port", uri.getPort());logData.put("path", uri.getPath());logData.put("query", uri.getRawQuery());logData.put("duration", (System.currentTimeMillis() - beginVisitTime) + "ms");log.info("访问接口信息: {}", logData);log.info("我是美丽分割线: ###################################################");}} catch (Exception e) {log.error("记录日志时发生异常: ", e);}}));}private void addHeader(ServerHttpRequest.Builder mutate, String name, Object value) {if (value == null) {return;}String valueStr = value.toString();String valueEncode = ServletUtils.urlEncode(valueStr);mutate.header(name, valueEncode);}private void removeHeader(ServerHttpRequest.Builder mutate, String name) {mutate.headers(httpHeaders -> httpHeaders.remove(name)).build();}private Mono<Void> unauthorizedResponse(ServerWebExchange exchange, String msg) {log.error("[鉴权异常处理]请求路径:{}", exchange.getRequest().getPath());return ServletUtils.webFluxResponseWriter(exchange.getResponse(), msg, HttpStatus.UNAUTHORIZED);}/*** 获取缓存key*/private String getTokenKey(String token) {return CacheConstants.LOGIN_TOKEN_KEY + token;}/*** 获取请求token*/private String getToken(ServerHttpRequest request) {String token = request.getHeaders().getFirst(TokenConstants.AUTHENTICATION);// 如果前端设置了令牌前缀,则裁剪掉前缀if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) {token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);}return token;}@Overridepublic int getOrder() {return -200;}
}
com.laigeoffer.pmhub.base.security.interceptor.HeaderInterceptor
/*** 自定义请求头拦截器,将Header数据封装到线程变量中方便获取* 注意:此拦截器会同时验证当前用户有效期自动刷新有效期** @author canghe*/
public class HeaderInterceptor implements AsyncHandlerInterceptor {// 需要免登录的路径集合private static final Set<String> EXEMPTED_PATHS = new HashSet<>();static {// 在这里添加所有需要免登录默认展示首页的的路径EXEMPTED_PATHS.add("/system/user/getInfo");EXEMPTED_PATHS.add("/project/statistics");EXEMPTED_PATHS.add("/project/doing");EXEMPTED_PATHS.add("/project/queryMyTaskList");EXEMPTED_PATHS.add("/project/select");EXEMPTED_PATHS.add("/system/menu/getRouters");}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {if (!(handler instanceof HandlerMethod)) {return true;}SecurityContextHolder.setUserId(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USER_ID));SecurityContextHolder.setUserName(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USERNAME));SecurityContextHolder.setUserKey(ServletUtils.getHeader(request, SecurityConstants.USER_KEY));String token = SecurityUtils.getToken();if (StringUtils.isNotEmpty(token)) {LoginUser loginUser = AuthUtil.getLoginUser(token);if (StringUtils.isNotNull(loginUser)) {AuthUtil.verifyLoginUserExpire(loginUser);SecurityContextHolder.set(SecurityConstants.LOGIN_USER, loginUser);}} else {// 首页免登场景展示// 检查请求路径是否匹配特定路径String requestURI = request.getRequestURI();if (isExemptedPath(requestURI)) {// 创建一个默认的 LoginUser 对象LoginUser defaultLoginUser = createDefaultLoginUser();SecurityContextHolder.set(SecurityConstants.LOGIN_USER, defaultLoginUser);}}return true;}// 判断请求路径是否匹配特定路径private boolean isExemptedPath(String requestURI) {// 你可以根据需要调整特定路径的匹配逻辑return EXEMPTED_PATHS.stream().anyMatch(requestURI::startsWith);}// 创建一个默认的 LoginUser 对象private LoginUser createDefaultLoginUser() {LoginUser defaultLoginUser = new LoginUser();defaultLoginUser.setUserId(173L); // 设置默认的用户IDdefaultLoginUser.setUsername(Constants.DEMO_ACCOUNT); // 设置默认的用户名SysUser demoSysUser = new SysUser();demoSysUser.setUserId(173L);demoSysUser.setUserName(Constants.DEMO_ACCOUNT);demoSysUser.setDeptId(100L);demoSysUser.setStatus("0");defaultLoginUser.setUser(demoSysUser);// 设置其他必要的默认属性return defaultLoginUser;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)throws Exception {SecurityContextHolder.remove();}
}
3 总结
文章介绍了ThreadLocal,其可解决多线程共享变量并发问题,与synchronized不同。还阐述了TransmittableThreadLocal(TTL),能解决ThreadLocal在多线程框架中上下文传递问题,有分布式追踪等使用场景。项目实战展示了用户信息在微服务中跨线程传递的实现代码。
4 参考链接
- PmHub整合TransmittableThreadLocal (TTL)缓存用户数据
- 项目仓库(GitHub)
- 项目仓库(码云)