当前位置: 首页 > news >正文

RocketMQ 消息幂等性实战经验分享

封面

RocketMQ 消息幂等性实战经验分享

在分布式微服务架构中,消息队列已经成为异步解耦、削峰填谷、流量削峰的重要组件。然而,网络重试、消费者宕机、重复投递等原因,都会导致同一条消息被多次消费,造成重复执行业务的风险。因此,消息幂等性设计对于保障系统的数据一致性与稳定性至关重要。

本文基于真实生产环境场景,结合RocketMQ,在Java/Spring Boot生态中,讲解消息幂等性的设计思路、实现方案、落地优化及常见坑与解决方案,帮助开发者快速掌握并落地实践。


目录

  • 业务场景描述
  • 技术选型过程
  • 实现方案详解
  • 踩过的坑与解决方案
  • 总结与最佳实践

1. 业务场景描述

在电商平台的订单支付模块中,当用户发起支付请求后,订单系统会发送一条支付成功消息到RocketMQ,后续会由对账服务、库存服务、积分服务等多个消费者并发消费。生产环境中,常见消费重复、网络抖动重试导致多次下单、库存扣减等问题,严重时会造成库存混乱、积分多发等业务损失。

业务诉求:

  1. 确保每条支付成功消息在整个消费者体系中只会被业务处理一次。
  2. 在高并发情况下,消息处理性能、可扩展性与一致性需要兼顾。
  3. 支持分布式部署、水平扩容,且方案简单可维护。

2. 技术选型过程

在设计消息幂等方案时,我们主要考虑以下几种思路:

  1. 消息自带全局唯一ID(messageId),放入数据库主键约束或分布式锁。
  2. 基于Redis的去重缓存(SET/Hash)+TTL,快速判重。
  3. 将消息日志持久化后,用日志表做唯一索引防重复。
  4. 借助RocketMQ事务消息保证消费端二阶段提交。

我们最终选型:Consumer端使用Redis+MySQL双写幂等控制,结合业务执行状态表做精准判重

原因:

  • Redis查询开销小、TTL可控,高并发下可快速判重;
  • MySQL业务表做最终保障,若Redis漏判、宕机重试不丢失幂等记录;
  • 实现简单,可观测性强(可通过监控Redis命中率、MySQL唯一键错误数)。

3. 实现方案详解

3.1 项目结构

order-payment-service/
├── src/main/java/com/example/payment
│   ├── config/
│   │   └── RocketMQConfig.java
│   ├── consumer/
│   │   └── PaymentConsumer.java
│   ├── service/
│   │   └── PaymentService.java
│   ├── repository/
│   │   └── PaymentRecordRepository.java
│   ├── model/
│   │   └── PaymentRecord.java
│   └── Application.java
├── src/main/resources/
│   └── application.yml
└── pom.xml

3.2 关键配置(application.yml)

spring:application:name: order-payment-service
rocketmq:name-server: 192.168.0.100:9876consumer:group: payment-group

3.3 RocketMQ 消费者配置

@Configuration
public class RocketMQConfig {@Value("${rocketmq.name-server}")private String nameServer;@Beanpublic DefaultLitePullConsumer paymentConsumer(PaymentConsumer paymentConsumer) {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("payment-group");consumer.setNamesrvAddr(nameServer);consumer.subscribe("payment-topic", "*", paymentConsumer::handleMessage);consumer.setMessageModel(MessageModel.CLUSTERING);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);return consumer;}
}

3.4 幂等控制模型

PaymentRecord 实体
@Entity
@Table(name = "payment_record", uniqueConstraints = @UniqueConstraint(columnNames = "message_id"))
public class PaymentRecord {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;@Column(name = "message_id", nullable = false, length = 64)private String messageId;@Column(name = "order_id", nullable = false)private Long orderId;@Column(name = "status", nullable = false)private String status;@Column(name = "created_time")private LocalDateTime createdTime;// getters and setters
}
PaymentConsumer 消费实现
@Component
public class PaymentConsumer {private final RedisTemplate<String, String> redisTemplate;private final PaymentService paymentService;public PaymentConsumer(RedisTemplate<String, String> redisTemplate, PaymentService paymentService) {this.redisTemplate = redisTemplate;this.paymentService = paymentService;}public void handleMessage(MessageExt msg) {String messageId = msg.getMsgId();String redisKey = "payment:msg:lock:" + messageId;// 1. Redis 快速判重Boolean existed = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", 60, TimeUnit.SECONDS);if (!Boolean.TRUE.equals(existed)) {// 重复消息,跳过return;}// 2. 执行业务try {paymentService.processPayment(messageId, msg);} catch (Exception ex) {// 业务失败,删除 Redis 锁,让后续重试redisTemplate.delete(redisKey);throw new RuntimeException(ex);}}
}
PaymentService 核心逻辑
@Service
public class PaymentService {private final PaymentRecordRepository recordRepo;@Transactionalpublic void processPayment(String messageId, MessageExt msg) {// 3. MySQL 唯一索引二次校验PaymentRecord record = new PaymentRecord();record.setMessageId(messageId);record.setOrderId(getOrderIdFromMsg(msg));record.setStatus("PROCESSING");record.setCreatedTime(LocalDateTime.now());recordRepo.save(record);// 4. 执行实际业务逻辑,例如:扣库存、发积分等executeBusiness(record.getOrderId());// 5. 更新状态为 SUCCESSrecord.setStatus("SUCCESS");recordRepo.save(record);}
}

4. 踩过的坑与解决方案

  1. Redis Key 过期时间设置不当

    • 问题:TTL过短会导致同一消息在MySQL写入前Redis键过期,重新消费时绕过Redis判重。
    • 解决:根据最大业务处理时长和重试策略,设置TTL为60s以上,同时结合MySQL唯一索引最终保障。
  2. MySQL 唯一索引冲突异常未捕获

    • 问题:直接保存唯一约束冲突会抛出异常,导致消息不断重试。
    • 解决:在save方法中捕获DataIntegrityViolationException,识别重复消息后直接忽略。
  3. 事务范围不当导致数据不一致

    • 问题:业务执行与幂等记录在不同事务,回滚不一致。
    • 解决:将幂等记录插入和业务逻辑置于同一@Transactional事务中,确保原子性。
  4. 消息重试导致业务资源被二次锁定

    • 问题:长事务中消息多次投递,重复扣减库存。
    • 解决:结合Redis判重提前拦截,同时业务侧加重入检查。

5. 总结与最佳实践

  • 双重判重,Redis+MySQL 结合使用:一是保证高性能快速判重,二是通过数据库唯一约束防止漏判。
  • 合理设置TTL:基于业务最大处理时长和MQ重试策略,保证Redis锁的有效覆盖。
  • 统一事务管理:将幂等记录与业务执行业务置于同一事务,保证原子性与数据一致性。
  • 异常分类处理:对于幂等冲突做静默忽略,对于业务失败进行日志告警并等待二次消费。
  • 可观测性:监控Redis命中率、MySQL唯一索引错误次数,以及消费者消费失败率,及时告警。

以上方案已在日活百万级电商平台中稳定运行半年,成功避免了多次扣库存、重复发积分等问题,为系统稳定性提供了有力保障。希望本文分享的实战经验与思路,能帮助你在生产环境中高效落地RocketMQ消息幂等方案。



文章转载自:

http://ffeKgBfR.ckwrn.cn
http://kJiWNKI5.ckwrn.cn
http://9qAYMBMA.ckwrn.cn
http://gGZr8r1T.ckwrn.cn
http://9D7BNpsq.ckwrn.cn
http://FqK99Z61.ckwrn.cn
http://H7xP27c8.ckwrn.cn
http://jvsKLJxn.ckwrn.cn
http://9xSIj8TQ.ckwrn.cn
http://rSkMFo8l.ckwrn.cn
http://3E6wNxMt.ckwrn.cn
http://8MnnRwQR.ckwrn.cn
http://5TtWKPJ4.ckwrn.cn
http://pXAW7cqt.ckwrn.cn
http://jwZsFIz2.ckwrn.cn
http://KxOOCjyK.ckwrn.cn
http://EkQIIa5A.ckwrn.cn
http://cFeqRNON.ckwrn.cn
http://MrA6ACUk.ckwrn.cn
http://vb2zriV0.ckwrn.cn
http://63IVlbuo.ckwrn.cn
http://68fQ3oCV.ckwrn.cn
http://rxe0203h.ckwrn.cn
http://D9Q6YA6F.ckwrn.cn
http://vKttAUWm.ckwrn.cn
http://RbXMNLJt.ckwrn.cn
http://xfOW4pqB.ckwrn.cn
http://zjyLW7qq.ckwrn.cn
http://pUdJR9fm.ckwrn.cn
http://pHXbCQ80.ckwrn.cn
http://www.dtcms.com/a/385211.html

相关文章:

  • [SC]SystemC中,一个namespace中调用了其他namespace中的函数,需要显示include那个函数所在的.h文件吗?
  • Origin气泡图画相关性系数图
  • 基于SpringBoot+Uniapp的儿童疫苗接种预约小程序(qq邮箱、二维码识别)
  • 基于HugeGraph构建法律知识图谱(一)
  • C语言常用字符串函数
  • 【STM32项目开源】STM32单片机智能饮水机控制系统
  • 新质生产力背景下基于“开源链动2+1模式+AI智能名片+S2B2C商城小程序”的商业机会挖掘研究
  • html隐藏文本利用原理,实现点击隐藏功能
  • Java vs Python Web 开发深度对比:从传统同步到现代异步的全面演进
  • Redis 不只是缓存:深入解析 Redis Stack 与实时 AI 推理
  • IPv4地址类型
  • Deepin 25 系统安装 Docker:完整教程 + 常见问题解决
  • 虚拟机因网络导致域名解析出现问题
  • 群内靶机-Next
  • 【系统分析师】2025年上半年真题:论文及解题思路
  • 绿色出行新选择:圆梦交通联合卡的环保实践
  • 协程+连接池:高并发Python爬虫的底层优化逻辑
  • 深入理解 CAS:并发编程的原子操作基石
  • 矿用本安三电车变频器绝缘监测
  • 如何录制带解说的教学视频?屏幕录制工具推荐ASCOMP Screencapt Pro
  • 多模态视频理解领域 Benchmark 与 Leaderboard 整理
  • 《投资-54》元宇宙
  • OpenLayers数据源集成 -- 章节十四:WKT图层详解:标准几何文本格式的精确解析与渲染方案
  • U8g2 库驱动oled
  • 【NTC热敏电阻】NTC电阻测温电路与ADC换算
  • Gradle深度解析:从构建工具到开发生态系统
  • 本地搭建redis-cluster开发环境
  • 优化浏览体验:4个设置让Google Chrome更好用!
  • V100 部署qwen2.5-vl
  • 企业能源管控联网管理解决方案:为企业节能增效