RocketMQ 消息幂等性实战经验分享
RocketMQ 消息幂等性实战经验分享
在分布式微服务架构中,消息队列已经成为异步解耦、削峰填谷、流量削峰的重要组件。然而,网络重试、消费者宕机、重复投递等原因,都会导致同一条消息被多次消费,造成重复执行业务的风险。因此,消息幂等性设计对于保障系统的数据一致性与稳定性至关重要。
本文基于真实生产环境场景,结合RocketMQ,在Java/Spring Boot生态中,讲解消息幂等性的设计思路、实现方案、落地优化及常见坑与解决方案,帮助开发者快速掌握并落地实践。
目录
- 业务场景描述
- 技术选型过程
- 实现方案详解
- 踩过的坑与解决方案
- 总结与最佳实践
1. 业务场景描述
在电商平台的订单支付模块中,当用户发起支付请求后,订单系统会发送一条支付成功消息到RocketMQ,后续会由对账服务、库存服务、积分服务等多个消费者并发消费。生产环境中,常见消费重复、网络抖动重试导致多次下单、库存扣减等问题,严重时会造成库存混乱、积分多发等业务损失。
业务诉求:
- 确保每条支付成功消息在整个消费者体系中只会被业务处理一次。
- 在高并发情况下,消息处理性能、可扩展性与一致性需要兼顾。
- 支持分布式部署、水平扩容,且方案简单可维护。
2. 技术选型过程
在设计消息幂等方案时,我们主要考虑以下几种思路:
- 消息自带全局唯一ID(messageId),放入数据库主键约束或分布式锁。
- 基于Redis的去重缓存(SET/Hash)+TTL,快速判重。
- 将消息日志持久化后,用日志表做唯一索引防重复。
- 借助RocketMQ事务消息保证消费端二阶段提交。
我们最终选型:Consumer端使用Redis+MySQL双写幂等控制,结合业务执行状态表做精准判重。
原因:
- Redis查询开销小、TTL可控,高并发下可快速判重;
- MySQL业务表做最终保障,若Redis漏判、宕机重试不丢失幂等记录;
- 实现简单,可观测性强(可通过监控Redis命中率、MySQL唯一键错误数)。
3. 实现方案详解
3.1 项目结构
order-payment-service/
├── src/main/java/com/example/payment
│ ├── config/
│ │ └── RocketMQConfig.java
│ ├── consumer/
│ │ └── PaymentConsumer.java
│ ├── service/
│ │ └── PaymentService.java
│ ├── repository/
│ │ └── PaymentRecordRepository.java
│ ├── model/
│ │ └── PaymentRecord.java
│ └── Application.java
├── src/main/resources/
│ └── application.yml
└── pom.xml
3.2 关键配置(application.yml)
spring:application:name: order-payment-service
rocketmq:name-server: 192.168.0.100:9876consumer:group: payment-group
3.3 RocketMQ 消费者配置
@Configuration
public class RocketMQConfig {@Value("${rocketmq.name-server}")private String nameServer;@Beanpublic DefaultLitePullConsumer paymentConsumer(PaymentConsumer paymentConsumer) {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("payment-group");consumer.setNamesrvAddr(nameServer);consumer.subscribe("payment-topic", "*", paymentConsumer::handleMessage);consumer.setMessageModel(MessageModel.CLUSTERING);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);return consumer;}
}
3.4 幂等控制模型
PaymentRecord 实体
@Entity
@Table(name = "payment_record", uniqueConstraints = @UniqueConstraint(columnNames = "message_id"))
public class PaymentRecord {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;@Column(name = "message_id", nullable = false, length = 64)private String messageId;@Column(name = "order_id", nullable = false)private Long orderId;@Column(name = "status", nullable = false)private String status;@Column(name = "created_time")private LocalDateTime createdTime;// getters and setters
}
PaymentConsumer 消费实现
@Component
public class PaymentConsumer {private final RedisTemplate<String, String> redisTemplate;private final PaymentService paymentService;public PaymentConsumer(RedisTemplate<String, String> redisTemplate, PaymentService paymentService) {this.redisTemplate = redisTemplate;this.paymentService = paymentService;}public void handleMessage(MessageExt msg) {String messageId = msg.getMsgId();String redisKey = "payment:msg:lock:" + messageId;// 1. Redis 快速判重Boolean existed = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", 60, TimeUnit.SECONDS);if (!Boolean.TRUE.equals(existed)) {// 重复消息,跳过return;}// 2. 执行业务try {paymentService.processPayment(messageId, msg);} catch (Exception ex) {// 业务失败,删除 Redis 锁,让后续重试redisTemplate.delete(redisKey);throw new RuntimeException(ex);}}
}
PaymentService 核心逻辑
@Service
public class PaymentService {private final PaymentRecordRepository recordRepo;@Transactionalpublic void processPayment(String messageId, MessageExt msg) {// 3. MySQL 唯一索引二次校验PaymentRecord record = new PaymentRecord();record.setMessageId(messageId);record.setOrderId(getOrderIdFromMsg(msg));record.setStatus("PROCESSING");record.setCreatedTime(LocalDateTime.now());recordRepo.save(record);// 4. 执行实际业务逻辑,例如:扣库存、发积分等executeBusiness(record.getOrderId());// 5. 更新状态为 SUCCESSrecord.setStatus("SUCCESS");recordRepo.save(record);}
}
4. 踩过的坑与解决方案
-
Redis Key 过期时间设置不当
- 问题:TTL过短会导致同一消息在MySQL写入前Redis键过期,重新消费时绕过Redis判重。
- 解决:根据最大业务处理时长和重试策略,设置TTL为60s以上,同时结合MySQL唯一索引最终保障。
-
MySQL 唯一索引冲突异常未捕获
- 问题:直接保存唯一约束冲突会抛出异常,导致消息不断重试。
- 解决:在save方法中捕获
DataIntegrityViolationException
,识别重复消息后直接忽略。
-
事务范围不当导致数据不一致
- 问题:业务执行与幂等记录在不同事务,回滚不一致。
- 解决:将幂等记录插入和业务逻辑置于同一
@Transactional
事务中,确保原子性。
-
消息重试导致业务资源被二次锁定
- 问题:长事务中消息多次投递,重复扣减库存。
- 解决:结合Redis判重提前拦截,同时业务侧加重入检查。
5. 总结与最佳实践
- 双重判重,Redis+MySQL 结合使用:一是保证高性能快速判重,二是通过数据库唯一约束防止漏判。
- 合理设置TTL:基于业务最大处理时长和MQ重试策略,保证Redis锁的有效覆盖。
- 统一事务管理:将幂等记录与业务执行业务置于同一事务,保证原子性与数据一致性。
- 异常分类处理:对于幂等冲突做静默忽略,对于业务失败进行日志告警并等待二次消费。
- 可观测性:监控Redis命中率、MySQL唯一索引错误次数,以及消费者消费失败率,及时告警。
以上方案已在日活百万级电商平台中稳定运行半年,成功避免了多次扣库存、重复发积分等问题,为系统稳定性提供了有力保障。希望本文分享的实战经验与思路,能帮助你在生产环境中高效落地RocketMQ消息幂等方案。
完