当前位置: 首页 > news >正文

java每日精进 11.03【基于Spring AOP和事件驱动的资源操作消息处理流程(类似于若依框架的@Log注解)】

1.流程需求

在标注了@ResourceOperation注解的方法执行时,记录资源操作(如添加、更新、删除),然后在方法成功执行后,收集这些操作,发布事件,通过异步方式发送到RabbitMQ消息队列中,实现资源操作的解耦通知或后续处理(如审计、通知等)。这个系统支持Controller和服务层方法,确保线程安全(使用ThreadLocal),并处理异常情况;

总体流程定位:从注解标记方法 → AOP拦截记录操作 → 方法成功后发布事件 → 事件监听异步发送消息到RabbitMQ。整个系统强调异步、解耦、线程安全和异常处理,适用于资源管理流程(如资产管理)

注意:

  • 步骤总览:

1) 定义注解(属性建议包含:开关 enabled、校验条件如返回码、业务标签等)。

2) 在线程上下文里准备“记录容器”(ThreadLocal Map/List),并提供工具类方法在业务流程中便捷地“写入记录”。

3) 写切面:

  • Before:清理并启用上下文;
  • AfterReturning:根据“是否有记录+是否成功”判定,触发下一步(事件/直接发MQ/落库等);
  • AfterThrowing:异常即清理,不发。

4) 写事件与监听器(推荐):

  • 切面发布“批量事件”;
  • 监听器解包后,逐条或批量调用“消息发送服务”(或别的后续处理)。
  • 用 @Async 提升解耦与吞吐。

5) MQ 层:

  • 定义交换机、队列、绑定关系;
  • 配置 RabbitTemplate(或沿用 Spring Boot 默认),指定 Jackson2JsonMessageConverter;
  • 根据需要开启 Confirm/Return 回调与 Listener 工厂(并发、ack、重试、prefetch)。

6) 配置文件(Nacos/yml)里补齐 spring.rabbitmq.* 连接参数。

7) 在业务代码中合适位置调用工具类“记录操作”(插入/更新/删除等),并在需要的接口/方法上加注解。

  • 关键要点:
  • “谁来往上下文里写记录”要尽早决定(通常是业务 Service 完成关键操作之后,确保数据已落库、ID 已产生)。
  • “成功”的定义要统一(本项目用 AjaxResult.isSuccess(),也可基于异常、返回码、布尔返回值等)。
  • 批量化处理比逐条在切面里直接发 MQ 更好(解耦并利于聚合查询与统一日志)。
  • 强烈建议异步事件驱动,提高接口时延表现与系统稳定性。
  • 线程池必须配置好并监控指标(核心数、队列长度、拒绝策略)。
  • 注意清理 ThreadLocal,避免“脏数据串线程”(本项目在 Finally 里总是 clear())。

2.类的定位:

  • ResourceOperation(注解类):这是一个自定义注解,用于标记需要监控资源操作的方法。它是流程的入口点,定义了是否启用操作记录和成功状态码(主要针对Controller)。切面(Aspect)会基于这个注解拦截方法执行,从而触发整个流程。定位:作为标记器,标识哪些方法需要资源操作监控。
/*** 资源操作消息注解*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ResourceOperation {/*** 是否启用资源操作消息*/boolean enabled() default true;/*** 成功状态码(仅对Controller方法有效,默认200)*/int successCode() default 200;
}
  • ResourceOperationContext(上下文管理类):这是一个单例组件(@Component),使用ThreadLocal来管理当前线程的资源操作记录和启用状态。它负责临时存储操作记录(如资源ID和操作类型),并提供启用/禁用/清理等操作。定位:作为线程安全的临时存储器,确保多线程环境下每个请求的资源操作记录隔离,不互相干扰。整个流程依赖它来收集和传递操作数据。
ResourceOperationContext(上下文管理类)
这是一个静态方法类,使用ThreadLocal存储数据。addResourceOperation(String resourceId, OperationType operationType):注释 - 添加资源操作记录。传入参数:resourceId (String, 资源ID)、operationType (OperationType, 操作类型,如ADD/UPDATE/DELETE)。返回参数:void。作用:如果启用状态为true,将资源ID和操作类型放入ThreadLocal的Map中。调用其他类:无(纯内部操作)。
getResourceOperations():注释 - 获取所有资源操作记录。传入参数:无。返回参数:Map<String, OperationType>(新HashMap拷贝,防止修改原数据)。作用:返回当前线程的操作记录Map。调用其他类:无。
enableOperation():注释 - 启用资源操作记录。传入参数:无。返回参数:void。作用:设置ThreadLocal的启用标志为true。调用其他类:无。
disableOperation():注释 - 禁用资源操作记录。传入参数:无。返回参数:void。作用:设置启用标志为false。调用其他类:无。
clear():注释 - 彻底清空所有记录。传入参数:无。返回参数:void。作用:清空操作Map并禁用启用标志。调用其他类:无。
hasOperations():注释 - 判断是否有操作记录。传入参数:无。返回参数:boolean(Map是否非空)。作用:检查是否有记录,用于决定是否处理消息。调用其他类:无。/*** 资源操作上下文 - 使用ThreadLocal管理资源操作记录*/
@Component
public class ResourceOperationContext {private static final ThreadLocal<Map<String, OperationType>> RESOURCE_OPERATIONS =ThreadLocal.withInitial(HashMap::new);private static final ThreadLocal<Boolean> OPERATION_ENABLED =ThreadLocal.withInitial(() -> false);/*** 添加资源操作记录*/public static void addResourceOperation(String resourceId, OperationType operationType) {if (!OPERATION_ENABLED.get()) {return;}Map<String, OperationType> operations = RESOURCE_OPERATIONS.get();operations.put(resourceId, operationType);}/*** 获取所有资源操作记录*/public static Map<String, OperationType> getResourceOperations() {return new HashMap<>(RESOURCE_OPERATIONS.get());}/*** 启用资源操作记录*/public static void enableOperation() {OPERATION_ENABLED.set(true);}/*** 禁用资源操作记录*/public static void disableOperation() {OPERATION_ENABLED.set(false);}/*** 彻底清空所有记录*/public static void clear() {RESOURCE_OPERATIONS.get().clear();OPERATION_ENABLED.set(false);}/*** 判断是否有操作记录*/public static boolean hasOperations() {return !RESOURCE_OPERATIONS.get().isEmpty();}
}
  • ResourceOperationAspect(切面类):这是一个AOP切面(@Aspect @Component),负责拦截标注了@ResourceOperation的方法。它在方法执行前启用记录、在成功返回后处理并发布事件、在异常时清理记录。定位:作为流程的核心引擎,监控方法执行生命周期,判断成功与否,并触发事件发布,实现AOP的横切关注点(资源操作监控)与业务逻辑的分离。
1. 切点定义:resourceOperationPointcut
@Pointcut("@annotation(resourceOperation)")
public void resourceOperationPointcut(ResourceOperation resourceOperation) {}
注解 @Pointcut:定义切点(即拦截哪些方法)。这里的表达式 @annotation(resourceOperation) 表示:拦截所有标注了 @ResourceOperation 注解的方法,并将注解对象作为参数传入。
作用:后续的通知(@Before、@AfterReturning 等)会通过该切点关联到目标方法。
2. 前置通知:doBefore
@Before(value = "resourceOperationPointcut(resourceOperation)", argNames = "joinPoint,resourceOperation")
public void doBefore(JoinPoint joinPoint, ResourceOperation resourceOperation) {if (resourceOperation.enabled()) {ResourceOperationContext.clear(); // 清理线程上下文ResourceOperationContext.enableOperation(); // 启用操作记录log.debug("启用资源操作记录 - 方法: {}", getMethodName(joinPoint));}
}
注解 @Before:前置通知,在目标方法执行之前执行。
参数:
joinPoint:连接点对象,包含目标方法的信息(如方法名、参数等)。
resourceOperation:目标方法上的 @ResourceOperation 注解实例。
逻辑:
若注解的 enabled() 为 true(默认启用),则先清理 ResourceOperationContext(避免线程复用的脏数据),再开启资源操作记录功能。
目的:确保当前线程的操作记录从干净状态开始。
3. 后置返回通知:doAfterReturning
@AfterReturning(pointcut = "resourceOperationPointcut(resourceOperation)",returning = "result",argNames = "joinPoint,resourceOperation,result"
)
public void doAfterReturning(JoinPoint joinPoint, ResourceOperation resourceOperation, Object result) {try {if (!resourceOperation.enabled()) {return;}// 检查是否需要发送消息if (shouldSendMessage(joinPoint, resourceOperation, result)) {handleResourceOperations(); // 处理操作记录并发布事件}} catch (Exception e) {log.error("资源操作消息处理异常", e);} finally {ResourceOperationContext.clear(); // 最终清理线程上下文}
}
注解 @AfterReturning:后置返回通知,在目标方法成功执行并返回结果后执行。
参数:
result:目标方法的返回值(通过 returning 属性绑定)。
逻辑:
若注解未启用,直接返回。
调用 shouldSendMessage 判断是否需要处理操作记录(例如:Controller 方法需返回成功状态,Service 方法默认成功)。
若需要,则调用 handleResourceOperations 处理记录并发布事件。
最终通过 finally 块清理 ResourceOperationContext,防止 ThreadLocal 内存泄漏。
4. 异常通知:doAfterThrowing
@AfterThrowing(pointcut = "resourceOperationPointcut(resourceOperation)",throwing = "e"
)
public void doAfterThrowing(JoinPoint joinPoint, ResourceOperation resourceOperation, Exception e) {if (resourceOperation.enabled()) {log.debug("方法执行异常,清空资源操作记录 - 方法: {}, 异常: {}", getMethodName(joinPoint), e.getMessage());ResourceOperationContext.clear();}
}
注解 @AfterThrowing:异常通知,在目标方法抛出异常后执行。
参数:
e:目标方法抛出的异常(通过 throwing 属性绑定)。
逻辑:若注解启用,清空当前线程的操作记录,避免异常情况下的脏数据残留。
5. 辅助方法:shouldSendMessage
private boolean shouldSendMessage(JoinPoint joinPoint, ResourceOperation annotation, Object result) {if (!ResourceOperationContext.hasOperations()) {return false; // 无操作记录,不发送}if (isControllerMethod(joinPoint)) {return isSuccess(result); // Controller方法需检查返回值是否成功} else {return true; // Service方法默认发送}
}
作用:判断是否需要处理资源操作记录并发布事件。
逻辑:
若没有操作记录(ResourceOperationContext.hasOperations() 为 false),直接返回 false。
若目标方法是 Controller 方法(标注了 @RestController 或 @Controller),需通过 isSuccess 检查返回结果是否成功(如 AjaxResult 的 isSuccess())。
若目标方法是 Service 方法,默认返回 true(只要无异常且有记录就发送)。
6. 辅助方法:isControllerMethod
private boolean isControllerMethod(JoinPoint joinPoint) {Class<?> targetClass = joinPoint.getTarget().getClass();return targetClass.isAnnotationPresent(RestController.class) || targetClass.isAnnotationPresent(Controller.class);
}
作用:判断目标方法是否属于 Controller 层(用于区分 Controller 和 Service 方法的处理逻辑)。
逻辑:检查目标方法所在的类是否标注了 @RestController 或 @Controller 注解。
7. 辅助方法:isSuccess
private boolean isSuccess(Object result) {if (result instanceof AjaxResult) {return ((AjaxResult) result).isSuccess(); // 检查AjaxResult的成功状态}log.warn("Controller方法返回值不是AjaxResult类型,默认认为成功");return true;
}
作用:判断 Controller 方法的返回结果是否为 “成功” 状态。
逻辑:若返回值是 AjaxResult(常见的接口返回对象),则通过 isSuccess() 方法判断;否则默认认为成功(并打印警告)。
8. 辅助方法:getMethodName
private String getMethodName(JoinPoint joinPoint) {return joinPoint.getSignature().getName();
}
作用:获取目标方法的名称(用于日志打印)。
9. 核心处理方法:handleResourceOperations
private void handleResourceOperations() {Map<String, OperationType> operations = ResourceOperationContext.getResourceOperations();if (operations.isEmpty()) return;// 1. 收集资源数据:根据资源ID查询数据库,转换为消息数据格式List<AssetResourceData> resourceDataList = operations.keySet().stream().map(resourceId -> {try {return assetResourceMapper.selectResourceByResourceId(Long.valueOf(resourceId));} catch (NumberFormatException e) {log.warn("资源ID格式错误: {}", resourceId);return null;}}).filter(Objects::nonNull).map(dataConverter::convertToData) // 转换为消息所需的DTO.collect(Collectors.toList());if (resourceDataList.isEmpty()) {log.warn("未找到资源数据,跳过发送");return;}// 2. 创建批量操作对象BatchResourceOperation batchOperation = BatchResourceOperation.builder().resourceOperations(operations).resourceDataList(resourceDataList).build();// 3. 发布事件(由其他组件监听处理,如发送消息、记录审计日志等)eventPublisher.publishEvent(new BatchResourceOperationEvent(this, batchOperation));log.info("事件发布成功,涉及资源数量: {}", resourceDataList.size());
}
作用:处理收集到的资源操作记录,最终发布事件。
逻辑:
从 ResourceOperationContext 获取操作记录(资源 ID 与操作类型的映射)。
根据资源 ID 查询数据库,获取资源详情,并转换为消息所需的 AssetResourceData 格式。
封装为 BatchResourceOperation 对象,通过 ApplicationEventPublisher 发布 BatchResourceOperationEvent 事件。
其他监听该事件的组件(如消息发送器、日志处理器)会接收到事件并进行后续处理。
整体流程总结
拦截触发:当标注了 @ResourceOperation 的方法被调用时,切面生效。
前置处理:方法执行前清理线程上下文并开启操作记录。
业务执行:目标方法执行过程中,通过 ResourceOperationContext.addResourceOperation 记录资源操作。
后置处理:
若方法成功执行且满足发送条件,收集操作记录和资源数据,发布事件。
若方法抛出异常,清空操作记录。
资源清理:无论成功与否,最终清理线程上下文,避免内存泄漏。
  • BatchResourceOperationEventListener(事件监听器类):这是一个组件(@Component),监听BatchResourceOperationEvent事件。当事件发布后,它异步处理批量操作,将每个资源的操作转换为消息,并调用ResourceMessageService发送到RabbitMQ。定位:作为事件消费者,负责将收集的操作转化为异步消息发送,实现解耦和异步处理,提高系统性能。
/*** 批量资源操作事件监听器* @author guo* @date 2025-11-03*/
@Component
@Slf4j
public class BatchResourceOperationEventListener {@Autowiredprivate ResourceMessageService resourceMessageService;public BatchResourceOperationEventListener() {log.info("✅ BatchResourceOperationEventListener 已初始化");}/*** 使用 @EventListener非事务依赖*/@EventListener@Async("resourceMessageExecutor")public void handleBatchResourceOperation(BatchResourceOperationEvent event) {log.info("🎯 接收到批量资源操作事件 - 操作数量: {}",event.getBatchOperation().getResourceOperations().size());BatchResourceOperation batchOperation = event.getBatchOperation();Map<String, OperationType> operations = batchOperation.getResourceOperations();List<AssetResourceData> resourceDataList = batchOperation.getResourceDataList();log.info("📋 涉及资源ID: {}", operations.keySet());// 为每个资源单独发送消息for (AssetResourceData resourceData : resourceDataList) {String resourceId = resourceData.getResourceId();OperationType operationType = operations.get(resourceId);if (operationType != null) {log.info("🚀 发送资源操作消息 - 类型: {}, 资源ID: {}", operationType, resourceId);resourceMessageService.sendResourceMessage(operationType,resourceId,resourceData);}}log.info("✅ 批量资源操作消息发送完成");}
}
  • 异步线程池配置(resourceMessageExecutor):这是一个Bean定义,使用ThreadPoolTaskExecutor创建线程池,支持@Async注解。定位:提供异步执行能力,确保事件处理和消息发送不阻塞主线程,适用于高并发场景。
@Configuration
@EnableAsync
@EnableTransactionManagement  // 确保启用事务管理
@Slf4j
public class AsyncConfig {@Bean("resourceMessageExecutor")public TaskExecutor resourceMessageExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100);executor.setThreadNamePrefix("resource-message-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);executor.initialize();log.info("✅ 资源消息线程池初始化完成");return executor;}
}
  • ResourceMessageService(部分代码,发送方):这是一个服务类(假设@Autowired在其他地方),其sendResourceMessage方法负责构建操作对象并通过RabbitTemplate发送到RabbitMQ。定位:作为消息发送器,将资源操作数据序列化为JSON并推送到消息队列,实现与下游消费者的通信。
@Service
@Slf4j
public class ResourceMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Async("resourceMessageExecutor")public void sendResourceMessage(OperationType operationType,String resourceId,AssetResourceData resourceData) {try {AssetResourceOperation operation = AssetResourceOperation.builder().operationType(operationType).resourceId(resourceId).data(resourceData).build();log.info("📨 准备发送消息到RabbitMQ - 交换器: {}, 路由键: {}, 资源ID: {}",RabbitMQConfig.RESOURCE_EXCHANGE,RabbitMQConfig.RESOURCE_ROUTING_KEY,resourceId);rabbitTemplate.convertAndSend(RabbitMQConfig.RESOURCE_EXCHANGE,RabbitMQConfig.RESOURCE_ROUTING_KEY,operation);log.info("✅ RabbitMQ消息发送成功 - 操作类型: {}, 资源ID: {}", operationType, resourceId);} catch (Exception e) {log.error("❌ RabbitMQ消息发送失败 - 操作类型: {}, 资源ID: {}", operationType, resourceId, e);}}
}
  • RabbitMQConfig(配置类):这是一个@Configuration类,定义交换机、队列、路由键、死信队列等RabbitMQ基础设施。同时配置RabbitTemplate和监听器容器工厂。代码中有两个片段,但第二个更完整(包括死信机制),可能是完整的配置。定位:作为RabbitMQ基础设施提供者,确保消息可靠投递、序列化(JSON)和容错(死信队列、TTL)。

3.复刻步骤及注意事项

步骤

  1. 定义注解:创建一个自定义注解(如@MyOperation),添加属性如enabled、successCode。使用@Target(METHOD)、@Retention(RUNTIME)、@Documented。
  2. 创建上下文管理类:使用ThreadLocal存储线程数据(如Map<String, Type>)。提供add、get、enable、disable、clear、has等静态方法,确保线程隔离。
  3. 实现AOP切面:@Aspect @Component,定义Pointcut基于注解。添加@Before启用记录、@AfterReturning处理成功(判断+发布事件)、@AfterThrowing清理异常。注入事件发布器、Mapper、Converter。
  4. 定义事件和事件对象:创建自定义事件类(如MyEvent extends ApplicationEvent),和数据载体(如BatchMyOperation)。
  5. 创建事件监听器:@Component,@EventListener + @Async 处理事件,循环发送消息。注入消息服务。
  6. 配置异步线程池:@EnableAsync,@Bean TaskExecutor配置池大小、拒绝策略。
  7. 实现消息发送服务:创建服务类,@Async方法构建对象,用RabbitTemplate发送。
  8. 配置RabbitMQ:@Configuration,定义交换机、队列、绑定、死信机制、TTL、MessageConverter、RabbitTemplate、ListenerFactory。
  9. 集成到业务:在目标方法上加注解;在业务代码中调用上下文add方法记录操作。
  10. 测试:单元测试AOP拦截、事件发布、异步发送;集成测试RabbitMQ投递。

注意事项

  • 线程安全:ThreadLocal防止多线程污染,但需确保finally清理,避免内存泄漏。
  • 异常处理:AOP需捕获异常不影响业务;RabbitMQ用死信队列处理失败消息。
  • 性能:异步+线程池避免阻塞;操作记录Map大小控制,防止过多资源导致OOM。
  • 成功判断:Controller用返回值检查,Service默认成功;自定义返回值类型(如AjaxResult)。
  • 依赖注入:确保Mapper、Converter等存在;Spring版本兼容(AOP、Event、RabbitMQ)。
  • 日志与监控:添加详细日志;监控队列积压、线程池使用。
  • 可扩展性:支持批量/单个操作;操作类型枚举(OperationType)需定义。
  • 安全性:资源ID转换Long时处理异常;消息序列化确保JSON兼容。
  • 配置管理:队列名、路由键用常量;环境变量配置RabbitMQ连接。
  • 事务性:事件发布非事务,如果需事务,用@TransactionalEventListener。
  • 兼容性:如果无RabbitMQ,可换Kafka或其他;多模块时注意包扫描。

4.一些扩展

4.11. Controller方法(如transcodeCallback)的线程独立性和数据隔离

Controller方法(如transcodeCallback)在Spring Web应用中是线程独立的。让我详细解释其原理、执行机制,以及在多并发调用时的隔离方式。

基本原理
  • Spring的线程模型:Spring Boot(或Spring MVC)基于Servlet容器(如Tomcat、Jetty)运行。每个HTTP请求都会被分配到一个独立的线程(来自容器的线程池)。也就是说,当多个客户端同时调用/transcodeCallback时,每个请求都会在不同的线程中执行。Spring的Controller实例通常是单例的(@Controller默认单例),但方法执行是线程安全的,因为方法内部不共享可变状态(除非显式使用共享资源)。
  • 线程独立性:方法本身是线程独立的,因为:
    • 输入参数(如@RequestBody TranscodeCallbackDTO callbackDTO)是每个请求独立的(从HTTP请求体解析)。
    • 返回值AjaxResult也是每个线程独立构建的。
    • 方法体调用resourceUploadService.transcodeCallback(callbackDTO),假设ResourceUploadService是线程安全的(Spring服务默认单例,但如果内部使用ThreadLocal或不可变对象,则安全)。
  • @ResourceOperation注解的作用:这个注解触发AOP切面(ResourceOperationAspect),在方法执行前后进行拦截。但AOP本身不改变线程独立性;它只是织入代码,在当前线程中执行。
同时被多次调用时的数据隔离

在高并发场景下(如多个用户同时POST请求),数据隔离主要依赖以下机制:

  • 线程池隔离:Servlet容器(如Tomcat)有一个线程池(默认200线程),每个请求独占一个线程直到响应完成。线程之间不共享局部变量,因此方法内的局部变量(如callbackDTO)天然隔离。
  • ThreadLocal的使用(核心隔离机制):在你的系统中,ResourceOperationContext使用ThreadLocal存储操作记录(RESOURCE_OPERATIONS和OPERATION_ENABLED)。ThreadLocal是Java的标准类,它为每个线程提供独立的变量副本:
    • 当线程A调用方法时,切面doBefore调用ResourceOperationContext.enableOperation(),这只影响线程A的ThreadLocal副本。
    • 在方法体中,如果transcodeCallback内部调用addResourceOperation,它会添加到线程A的Map中。
    • 线程B的调用不会看到或修改线程A的记录,因为ThreadLocal是线程隔离的。
    • 方法成功后,切面doAfterReturning从当前线程的ThreadLocal获取记录,处理后调用clear()清理(防止内存泄漏)。
  • 潜在共享风险及隔离
    • 如果服务层(如resourceUploadService)有共享状态(如静态变量或单例字段),可能有线程安全问题。但你的系统使用ThreadLocal避免了这一点。
    • 数据库操作(如Mapper)通常通过连接池隔离,每个线程获取独立连接。
    • RabbitMQ发送是异步的(事件监听器@Async),不阻塞主线程。
模拟多线程场景(文本描述模拟)

假设两个请求同时到来:

  • 请求1(线程T1):POST数据{resourceId="100", operation=UPDATE}。
    • T1进入方法:切面enableOperation() → ThreadLocal(T1): enabled=true, operations={}。
    • 服务层addResourceOperation("100", UPDATE) → ThreadLocal(T1): operations={"100":UPDATE}。
    • 方法返回成功 → 切面handleResourceOperations()处理T1的operations,发布事件 → clear()清理T1的ThreadLocal。
  • 请求2(线程T2,同时):POST数据{resourceId="200", operation=DELETE}。
    • T2进入方法:切面enableOperation() → ThreadLocal(T2): enabled=true, operations={}(独立于T1)。
    • 服务层addResourceOperation("200", DELETE) → ThreadLocal(T2): operations={"200":DELETE}。
    • 方法返回成功 → 切面处理T2的operations,发布事件 → clear()清理T2。

结果:T1和T2的operations互不干扰。即使100个并发,ThreadLocal确保每个线程有自己的"私有存储"。如果没有ThreadLocal,使用共享Map(如静态HashMap)会导致数据混淆(线程不安全)。

如果并发很高,需监控线程池(Tomcat的maxThreads)和数据库连接池,避免资源耗尽。使用工具如JMeter测试并发。

4.2. 线程池(resourceMessageExecutor)的设计目的、业务需求、安全问题解决,以及多线程模拟

线程池resourceMessageExecutor是使用ThreadPoolTaskExecutor创建的自定义线程池,用于@Async注解的方法(如事件监听器的handleBatchResourceOperation和sendResourceMessage)。其设计针对异步消息处理的业务需求,同时解决安全和性能问题。

业务需求
  • 异步解耦:资源操作(如更新、删除)后,需要发送消息到RabbitMQ,但这不应阻塞主线程(HTTP响应)。线程池允许事件处理和消息发送在后台执行,提高响应速度和系统吞吐量。适用于高频资源操作场景,如资产管理系统中批量转码回调或资源同步。
  • 批量处理优化:事件监听器处理批量操作(可能多个资源),每个资源单独发送消息。如果同步执行,会延长HTTP响应时间;异步让主线程立即返回。
  • 可扩展性:核心池5、最大10、队列100,支持中等并发(如每天数千操作)。如果消息突发,队列缓冲,避免立即拒绝。
  • 资源控制:自定义线程名前缀("resource-message-")便于监控(如JMX或日志),AwaitTerminationSeconds(60)确保优雅关闭(应用停止时等待任务完成)。
安全方面的问题解决
  • 线程安全:每个@Async方法在独立线程执行,参数(如event、operationType)是不可变的或线程隔离的。RabbitTemplate是线程安全的(Spring管理)。
  • 拒绝策略(CallerRunsPolicy):当队列满(100)和最大池满(10)时,新任务由调用线程执行(回退到主线程)。这防止任务丢失,但可能导致主线程阻塞(安全折中,比AbortPolicy丢弃任务好)。
  • 内存泄漏防范:WaitForTasksToCompleteOnShutdown(true)和AwaitTerminationSeconds(60)确保关闭时不丢任务,防止线程挂起。
  • 异常处理:sendResourceMessage有try-catch,捕获RabbitMQ异常,日志记录,不影响其他任务。
  • 其他安全:无状态任务(消息发送),无共享变量风险。池大小适中,避免过度线程创建(上下文切换开销)。
模拟多线程情况(文本描述模拟)

假设系统接收3个批量事件(每个事件有2-3个资源操作),并发触发监听器。线程池初始核心5,队列100。

  • 场景1:低并发(3任务)
    • 任务1(事件A:资源100,200)→ 分配线程1:handleBatch... → 循环调用sendResourceMessage(异步,又分配线程2、3发送到MQ)。
    • 任务2(事件B:资源300)→ 线程4处理 → send(线程5)。
    • 任务3(事件C:资源400,500)→ 线程1复用(空闲后) → send(线程2、3复用)。
    • 结果:核心池处理所有,不扩容。主线程不阻塞,响应快。
  • 场景2:高并发(15任务突发)
    • 前5任务 → 核心池满(线程1-5处理)。
    • 后10任务 → 进入队列(容量100,未满)。
    • 处理中,线程空闲复用队列任务。
    • 如果队列满(>100),CallerRunsPolicy让调用线程(事件发布线程)执行,防止丢失,但可能慢。
  • 异常模拟:任务中RabbitMQ连接失败 → catch日志,不崩溃池,其他任务继续。

如果任务耗时长(e.g., MQ延迟),池可能饱和,需监控(Prometheus)。业务需求:如果消息是关键的,可加重试机制(RabbitMQ ACK)。

4.3. ResourceOperationContext中ThreadLocal的具体作用、与线程的关系、模拟数据操作过程的区别和联系,以及变种/替代扩展

ThreadLocal的具体作用和与线程的关系
  • 作用:ThreadLocal提供线程本地存储(Thread Local Storage),允许每个线程有独立的变量副本。代码中,它管理RESOURCE_OPERATIONS(Map<String, OperationType>)和OPERATION_ENABLED(Boolean),确保在多线程环境下,资源操作记录不被共享或覆盖。核心是隔离:线程A的set()只影响A,不影响B。
  • 与线程的关系:每个Thread对象内部有一个ThreadLocalMap(弱引用HashMap),键是ThreadLocal实例,值是用户数据。当线程调用get()/set(),它从当前线程的Map中操作。线程结束时,ThreadLocal数据自动回收(但需手动remove()防泄漏)。在Web应用中,每个HTTP请求是一个线程,ThreadLocal完美隔离请求数据(如你的操作记录)。
模拟数据操作过程的区别和联系(文本描述模拟)

假设两个线程(T1、T2)同时执行操作。区别:无ThreadLocal vs. 有ThreadLocal。

  • 无ThreadLocal(使用共享静态Map)
    • 共享Map: static Map<String, OperationType> operations = new HashMap<>();
    • T1: enableOperation() → enabled=true(共享)。
    • T1: add("100", UPDATE) → operations={"100":UPDATE}。
    • T2(同时): enableOperation() → enabled=true(相同)。
    • T2: add("200", DELETE) → operations={"100":UPDATE, "200":DELETE}(覆盖T1)。
    • T1 get() → 看到{"100":UPDATE, "200":DELETE}(污染)。
    • T2 get() → 同上。
    • 区别:数据混淆,线程不安全。联系:所有线程共享同一数据,适合全局常量,但不适合per-request数据。
  • 有ThreadLocal(你的代码)
    • T1: enableOperation() → T1的Boolean=true,T1的Map={}。
    • T1: add("100", UPDATE) → T1的Map={"100":UPDATE}。
    • T2: enableOperation() → T2的Boolean=true,T2的Map={}(独立)。
    • T2: add("200", DELETE) → T2的Map={"200":DELETE}。
    • T1 get() → {"100":UPDATE}(只自己的)。
    • T2 get() → {"200":DELETE}。
    • T1 clear() → T1的Map清空,enabled=false;T2不受影响。
    • 区别:完美隔离,无竞争。联系:ThreadLocal依赖线程生命周期,数据随线程存活;get/set是线程的"私有属性",但底层是线程的Map持有。

联系:两者都存储数据,但ThreadLocal是"垂直隔离"(per-thread),共享变量是"水平共享"(all-threads)。在操作过程中,add/get是原子性的(ThreadLocal内部同步),但clear需在finally确保。

ThreadLocal的其他变种、替代品及扩展
  • 变种
    • InheritableThreadLocal:ThreadLocal的子类,当父线程创建子线程时,自动继承值。适合线程池场景(如传递上下文到子任务)。你的系统如果是线程池重用,可用它防丢失。但需小心:子线程修改不影响父。
    • TransmittableThreadLocal (TTL):阿里巴巴开源(com.alibaba.ttl),支持线程池(如ExecutorService)中跨线程传递值。标准ThreadLocal在池中可能丢失(线程复用),TTL用代理解决。适用于你的@Async场景。
  • 替代品
    • ** synchronized 或锁(ReentrantLock)**:对共享Map加锁,确保线程安全。但性能差(锁竞争),不适合高并发。区别:阻塞式 vs. ThreadLocal的非阻塞。
    • ConcurrentHashMap:线程安全的Map,但仍共享,不隔离per-thread数据。适合全局缓存。
    • Request Scope Bean(Spring特有):@Scope("request"),每个HTTP请求一个实例。替代ThreadLocal存储请求数据,但更重(创建对象开销)。
    • MDC (Mapped Diagnostic Context):SLF4J/Log4j的ThreadLocal变体,用于日志上下文(如traceId)。类似你的日志。
    • Coroutine/Local in Kotlin/Java 21+:Java 21引入虚拟线程(Project Loom),ThreadLocal兼容,但未来Local变量可能更高效。
    • 缺点与注意:ThreadLocal可能泄漏(长寿线程如池),需remove()。替代时考虑场景:隔离需求高用ThreadLocal,共享需求用锁/并发集合。

扩展:ThreadLocal常用于Spring的RequestContextHolder(存储HttpServletRequest)。在系统中,它是理想选择,因为资源操作是per-request的。如果升级到响应式(WebFlux),需用Reactor的Context替代(非阻塞)。

http://www.dtcms.com/a/565019.html

相关文章:

  • Spring 从 0 → 1 保姆级笔记:IOC、DI、多配置、Bean 生命周期一次讲透
  • SpringBoot 项目基于责任链模式实现复杂接口的解耦和动态编排
  • Java 入门核心知识点分类学习
  • 叫人做网站后不提供源码商机网创业好项目
  • 【2052】范围判断
  • (1)pytest+Selenium自动化测试-序章
  • 用Python来学微积分29-原函数与不定积分完全指南
  • JavaSE---文件(File)、IO流(基础)
  • 论坛类网站备案吗红色专题网站首页模板
  • 网页设计师主要是做什么的呢深圳seo
  • C++多线程之 安全日志系统
  • 哪里有做效果图的网站wordpress文章内模板
  • Nof1:探索大语言模型作为量化交易者的极限(翻译)
  • 做网站整理信息的表格免费有效的推广网站
  • 基于ASM1042A系列芯片的CAN协议扩展方案在汽车座椅控制器中的应用探讨
  • 超越金融:深入解析STC的“绿色算力网络”与参与机制
  • 【大模型 Tokenizer 核心技术解析】从 BPE 到 Byte-Level 的完整指南
  • 黄岛网站建设价格怎么做自动下单网站
  • 关于我遇到的豆包的bug:mermaid图无法加载
  • Milvus:通过Docker安装Milvus向量数据库(一)
  • 第三方软件测试机构:【“Bug预防”比“Bug发现”更有价值:如何建立缺陷根因分析与流转机制?】
  • Milvus:Schema详解(四)
  • maven的jakarta项目直接运用jetty插件运行
  • 建设外贸网站哪家好网页制作流程视频
  • Java-166 Neo4j 安装与最小闭环 | 10 分钟跑通 + 远程访问 Docker neo4j.conf
  • 如何建立小企业网站wordpress图片上传地址修改
  • 【开题答辩过程】以《基于SpringBoot的中国传统文化推广系统的设计与实现》为例,不会开题答辩的可以进来看看
  • QML笔记
  • Android 在屏幕的右下角添加客户Logo
  • linux服务-frp内网穿透工具