当前位置: 首页 > news >正文

MQ防重复消费----去重表结合 Spring AOP 切面编程,抽象封装成通用幂等注解

以下内容包含针对 NoMQDuplicateConsumeAspect 的深度面试问答、消息队列重投递触发场景、AOP 切面编程扩展,以及基于已有实现的关键要点与步骤总结。文中所有论断均引用多源资料,以助于您在面试与实战中全面展示对幂等消费切面及消息重投的理解。


一、深度面试官提问与解答

1. 接口与 AOP 解耦机制

:请解释 NoMQDuplicateConsumeAspect 中,如何在不依赖具体业务类的前提下,通过 AOP 与 Spring 容器自动装配实现幂等性拦截?

  • 切面仅依赖于统一注解 @NoMQDuplicateConsume 和切点定义,不直接持有业务 Bean,引入环绕通知实现拦截 。

  • Spring 在启动时扫描所有被 @Component 标注的切面与 Handler Bean,将它们纳入 AOP 代理与上下文管理,实现业务与切面完全解耦 (Home)。

2. 幂等键设计与全局唯一性

:使用 SpEL 表达式生成的幂等键如何保证全局唯一?当方法参数为复杂对象时,应如何优化?

  • 把关键字段(如消息 ID、业务流水号)拼接到 keyPrefix 后,形成 key = prefix + ":" + id,即可保证同一消息唯一缓存键 (Stack Overflow)。

  • 对于嵌套对象,可使用 Jackson 将其序列化成 JSON 字符串或仅提取必要字段哈希值,避免过长或重复性不足 (Medium)。

3. Lua 脚本原子性与命令语义

:为什么要在 Lua 脚本中同时使用 NXPXGET?能否改为多条 Redis 命令?有什么风险?

  • NX 确保只有当键不存在时才写入;PX 指定过期毫秒;GET 返回旧值,实现原子 “读-写” 操作 (Redis)。

  • 分开执行 GETSET 会遭遇并发竞态:两个消费者都可能先 GET 得到空,再都 SET,失去幂等性保障 (Stack Overflow)。

4. 消息重投递触发条件

:MQ 在什么情况下会触发消息重投递?当消费者不 ACK 或超时时,容器如何处理?

  • 使用 JMS 事务模式时,若消息消费抛出异常导致事务回滚,消息未 ACK,会被立即或延迟重投 (InfoWorld)。

  • RabbitMQ 的 delivery‑acknowledgement‑timeout 机制:消费者在配置超时时间内未 ACK,则会重投或转入死信队列 (RabbitMQ) (Stack Overflow)。

  • 显式 basicNack(..., requeue=true) 也可触发重投;Quorum 队列的 delivery-limit 达到阈值后则死信化 (RabbitMQ) (CloudAMQP)。

5. 异常与补偿策略

:当链路中途抛出异常,Aspect 应如何确保 Redis 键被清理?在分布式事务下如何做补偿?

  • 在环绕通知的 finally 块中调用 redisTemplate.delete(key),保证无论业务成功与否都可清理过期或失败标志 (Home)。

  • 对于跨服务分布式事务,可结合 Seata 等框架,在全局事务回滚时触发消息补偿或二次幂等删除 (Ted Kaminski)。

6. 切面优先级与性能评估

:若系统中有多种切面(如日志、限流、幂等),如何定义执行顺序?如何测量切面带来的 TPS 开销?

  • 切面实现 Ordered 接口或使用 @Order 明确优先级,数值越小越先执行;Advice 类型也影响“入点/出点”顺序 (Home) 。

  • 可在切面中埋点 System.nanoTime() 前后差值,上报至 Micrometer/Prometheus 观察延迟分布,从而量化每个切面对吞吐的影响 。

7. 动态配置与热更新

:如何在不重启服务的前提下动态调整 keyTimeout 或开启/关闭幂等校验?

  • 将配置托管于 Spring Cloud Config,并在切面 Bean 上加 @RefreshScope,通过 /actuator/refresh 拉取最新配置 (Medium)。

  • 或者实现自定义管理接口,在运行时通过调用 ChainContext 提供的更新方法,动态修改超时或开关状态。

8. 跨场景复用与副作用隔离

:当需要在另一个消费场景中复用同一切面,仅改 mark() 标识,如何确保不会引入副作用?

  • 切面 mark() 返回值可基于方法注解或 SpEL 动态解析,不可硬编码单一场景;并在 ChainContext 注册时隔离不同 mark 的键空间 (Medium)。

  • 复用时,单元测试应覆盖多场景同时并行消费,确保不同 mark 间 Redis 键互不干扰。

9. 监控与告警埋点

:在幂等校验失败或超时场景,如何上报监控?可结合哪些工具?

  • 在切面中调用 Micrometer 的 CounterTimer 指标记录幂等跳过次数和处理时长,Prometheus/Grafana 可实时报警 。

  • 异常场景下可额外向 ELK(Elasticsearch + Logstash + Kibana)发送结构化日志,结合 Alertmanager 触发告警 。

10. 测试覆盖策略

:如何编写单元与集成测试,模拟 Redis 键已存在、Lua 脚本报错、MQ 重投递等场景?

  • 单元测试:Mock StringRedisTemplate.execute(...) 返回不同值,验证切面逻辑分支。

  • 集成测试:借助 Testcontainers 启动真实 Redis、RabbitMQ 实例,发送测试消息并断言消费结果与重投次数 (Home) (Nejc Korasa)。


二、MQ 重投递触发场景详解

  1. 事务回滚重投

    • JMS 事务单元失败时,Broker 保留消息并在事务结束后重新投递 (InfoWorld)。

  2. ACK 超时重投

    • RabbitMQ 消费者若超出 consumer_timeout 时间未 ACK,Broker 会将消息重投或 DLQ (RabbitMQ) (Stack Overflow)。

  3. 显式 NACK

    • 通过 channel.basicNack(..., requeue=true) 明确请求重投,或 Camel 的 redeliveryPolicy 控制重试次数 (RabbitMQ)。

  4. 背书阈值与死信

    • IBM MQ 在重投次数超 BOTHRESH 后移至背书队列;RabbitMQ Quorum 队列 delivery-limit 达到阈值后 DLX 处理 (Oracle Docs) (CloudAMQP)。

  5. Prefetch 与并发假重投

    • 过大 prefetch 造成处理缓慢,导致 ACK 超时,产生“假重试”现象 (Medium)。


三、AOP 切面编程扩展

  1. 切点与通知类型

    • 使用 @Pointcut 定义注解匹配,@Around 环绕通知可完全掌控方法执行前后与异常 (Medium)。

  2. Advice 顺序

    • 实现 Ordered@Order,结合 AspectJ 语义控制“入点优先级”与“出点顺序” (Home)。

  3. 代理模式与限制

    • Spring AOP 基于代理,无法拦截 privatestaticfinal 方法;对性能影响可通过窄切点与精确匹配减到最低 。

  4. 性能监控

    • 采用 Micrometer Observation API,在切面中记录 Timer,结合 Spring Boot 3 Observability 提供可视化分析 (Home)。

  5. 动态切面生效

    • 利用 @Profile@ConditionalOnProperty 控制切面 Bean 是否加载;或配合 @RefreshScope 实时切换幂等校验开关 (Stack Overflow) (Medium)。


四、关键要点与实现步骤总结

  1. 注解识别

    • 环绕通知通过 ProceedingJoinPoint 与反射 MethodSignature 获取 @NoMQDuplicateConsume 实例。

  2. SpEL 解析幂等键

    • 调用 SpELUtil.parseKey(...) 结合方法参数动态生成全局唯一的 Redis key (prefix:业务ID)。

  3. 原子脚本执行

    • 单条 Lua 脚本 SET key value NX GET PX expire 保证读写原子性,避免并发竞态。

  4. 结果判断

    • 脚本返回 nil → 首次消费,执行业务;否则检查返回值看是否为错误状态,抛异常或直接跳过。

  5. 后置标记与清理

    • 业务成功后 SET key consumed PX expire;失败或异常则在 finally/catchDEL key,支持 MQ 重投。

  6. 异常补偿

    • 结合分布式事务框架或补偿消息,确保跨服务调用时的一致性。

  7. 监控埋点

    • 利用 Micrometer/Grafana 跟踪幂等跳过率、处理延迟与失败数,确保实时报警与运维可视化。

附完整实现:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {/*** 设置防重令牌 Key 前缀*/String keyPrefix() default "";/*** 通过 SpEL 表达式生成的唯一 Key*/String key();/*** 设置防重令牌 Key 过期时间,单位秒,默认 1 小时*/long keyTimeout() default 3600L;
}@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {private final StringRedisTemplate stringRedisTemplate; // Redis 操作字符串模板// LUA 脚本,使用 Redis 的 SETNX 命令实现分布式锁,并设置过期时间private static final String LUA_SCRIPT = """local key = KEYS[1]local value = ARGV[1]local expire_time_ms = ARGV[2]return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)""";// 尝试用 NX(不存在才设置) + PX(指定毫秒级过期时间)设置Key//如果设置成功,返回 nil//如果Key已经存在,返回旧的Value/*** 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑*/@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)") // 创建 NoMQDuplicateConsumeAspect 切面控制器public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {// 获取自定义防重复消费注解NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);// 获取防重复消费注解 Key 的唯一标识String uniqueKey = noMQDuplicateConsume.keyPrefix() + // 防重令牌key前缀SpELUtil.parseKey(noMQDuplicateConsume.key(), // SpEL表达式动态生成唯一Key((MethodSignature) joinPoint.getSignature()).getMethod(), // 防重令牌key SpEL 表达式joinPoint.getArgs()); // 防重令牌key SpEL 表达式参数// Redis执行Lua脚本尝试加防重复锁// 如果Key不存在,成功设置,继续执行业务。// 如果Key存在,说明这个消息之前消费过或正在消费。String absentAndGet = stringRedisTemplate.execute(RedisScript.of(LUA_SCRIPT, String.class),List.of(uniqueKey),IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout())));// 如果Key存在(重复消费了)if (Objects.nonNull(absentAndGet)) {// 判断是否为错误状态boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");if (errorFlag) {throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));}return null;}// 执行标记了消息队列防重复消费注解的方法原逻辑Object result;try {result = joinPoint.proceed();// 设置防重令牌 Key 过期时间,单位秒stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);} catch (Throwable ex) {// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费stringRedisTemplate.delete(uniqueKey);throw ex;}return result;}/*** @return 返回自定义防重复消费注解*/public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {// getSignature() 拿到的是一个 Signature,一般是方法签名信息。MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();// 获取目标方法实例Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());// 获取方法上的注解return targetMethod.getAnnotation(NoMQDuplicateConsume.class);}
}

相关文章:

  • Maplibgre-gl 学习1 初识
  • Maven构建流程详解:如何正确管理微服务间的依赖关系-当依赖的模块更新后,我应该如何重新构建主项目
  • ET MessageSender类(实体)分析
  • 第二十八节:直方图处理- 直方图计算与绘制
  • 智能化双语LaTeX系统,分阶段系统性开发技术实现路径:目标是实现语义级编译和认知增强写作,推动跨文明知识表达
  • stm32之FLASH
  • 嵌入式学习笔记 D20 :单向链表的基本操作
  • 黑马k8s(六)
  • Spring MVC数据绑定和响应 你了解多少?
  • 如何处理瀚高数据库与PG的冲突
  • nginx报错-[emerg] unknown directive “echo_reset_timer“
  • 【评测】免费体验dify工作流模式下腾讯语音转文字speech2text服务
  • 局部放电在线监测系统的数据传输协议选择研究:Modbus TCP 与 MQTT
  • libmemcached库api接口讲解五
  • 学习黑客NFC技术详解
  • day25 python异常处理
  • Leetcode209做题笔记
  • 常用的Java工具库
  • 【大模型面试每日一题】Day 18:大模型中KV Cache的作用是什么?如何通过Window Attention优化其内存占用?
  • AIX环境ORACLE RAC节点无法加入集群问题分析
  • 中巴续签双边本币互换协议,进一步深化金融战略合作
  • “一码难求”的Manus开放注册但价格不菲,智能体距离“实用”还有多远
  • 迪奥部分客户数据遭泄露,公司称正持续展开调查
  • 马上评丨75万采购300元设备,仅仅终止采购还不够
  • 白玉兰奖征片综述丨综艺市场破局焕新,多元赛道重塑价值坐标
  • 马上评丨摆摊要交芙蓉王?对吃拿卡要必须零容忍