如何避免消息重复投递或重复消费
前言
大家好,在消息队列的使用过程中,重复消费是一个比消息丢失更常见的问题。想象一下这样的场景:用户支付了100元,由于消息重复消费,账户被扣了200元;商品库存原本有100件,重复消费后变成了98件... 这种问题如果处理不好,会给业务带来严重的影响。
今天我们就来深入探讨消息重复消费的原因,以及如何通过幂等性设计来彻底解决这个问题。
为什么会产生重复消费?
在深入解决方案之前,我们先了解一下消息重复的常见原因:
1. 消费端重试机制
// 当消费端处理失败时,MQ会自动重试
try {processMessage(message);
} catch (Exception e) {// 处理失败,MQ会重新投递这条消息log.error("消息处理失败,等待重试", e);throw e; // 抛出异常触发重试
}
2. 生产者重复发送
// 网络超时导致生产者重复发送
public void sendOrderMessage(Order order) {try {// 第一次发送,网络超时kafkaTemplate.send("order-topic", order.getId(), order);} catch (TimeoutException e) {// 超时后重试,但第一次可能实际上已经发送成功kafkaTemplate.send("order-topic", order.getId(), order);}
}
3. 位点提交延迟
// 位点提交不及时,消费者重启后重复消费
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {processMessage(record.value()); // 处理消息// 如果在这里宕机,位点没有提交,重启后会重新消费}consumer.commitSync(); // 批量提交位点
}
核心解决方案:实现消息幂等性
幂等性是解决重复消费的银弹。所谓幂等,就是同一个操作执行多次的结果与执行一次的结果相同。
方案一:唯一标识 + 去重表
这是最常用且最可靠的方案,适合所有业务场景。
数据库去重表实现
@Service
public class OrderService {@Autowiredprivate JdbcTemplate jdbcTemplate;public boolean processOrderMessage(OrderMessage message) {// 1. 检查消息是否已经处理过String checkSql = "SELECT count(1) FROM message_duplicate WHERE msg_id = ?";Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, message.getMessageId());if (count != null && count > 0) {log.info("消息已处理,直接返回: {}", message.getMessageId());return true; // 已经处理过,直接返回成功}try {// 2. 处理业务逻辑processBusiness(message);// 3. 记录消息ID到去重表String insertSql = "INSERT INTO message_duplicate(msg_id, create_time) VALUES (?, NOW())";jdbcTemplate.update(insertSql, message.getMessageId());log.info("消息处理完成: {}", message.getMessageId());return true;} catch (DuplicateKeyException e) {// 并发情况下可能出现的重复插入log.warn("消息重复处理: {}", message.getMessageId());return true;}}// 业务处理逻辑private void processBusiness(OrderMessage message) {// 实际的订单处理逻辑orderDao.updateOrderStatus(message.getOrderId(), OrderStatus.PAID);inventoryDao.deductStock(message.getProductId(), message.getQuantity());}
}
Redis去重实现
@Service
public class PaymentService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;private static final String MESSAGE_PREFIX = "msg:";private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时public boolean processPaymentMessage(PaymentMessage message) {String messageKey = MESSAGE_PREFIX + message.getMessageId();// 使用SETNX原子操作实现去重Boolean success = redisTemplate.opsForValue().setIfAbsent(messageKey, "processed", EXPIRE_TIME, TimeUnit.SECONDS);if (Boolean.FALSE.equals(success)) {log.info("支付消息已处理: {}", message.getMessageId());return true;}try {// 处理支付业务processPaymentBusiness(message);log.info("支付处理成功: {}", message.getMessageId());return true;} catch (Exception e) {// 处理失败,删除Redis键,允许重试redisTemplate.delete(messageKey);log.error("支付处理失败: {}", message.getMessageId(), e);throw e;}}
}
方案二:业务逻辑幂等设计
有时候,我们可以通过巧妙的业务设计,让操作本身具备幂等性。
状态机模式
@Service
public class OrderStatusService {public boolean updateOrderStatus(String orderId, OrderStatus newStatus) {// 先查询当前状态Order order = orderDao.findById(orderId);if (order == null) {throw new OrderNotFoundException("订单不存在: " + orderId);}// 检查状态流转是否合法if (!canTransition(order.getStatus(), newStatus)) {log.warn("订单状态流转不合法: {} -> {}", order.getStatus(), newStatus);return false; // 幂等:重复操作直接返回}// 更新状态orderDao.updateStatus(orderId, newStatus);return true;}// 定义合法的状态流转private boolean canTransition(OrderStatus current, OrderStatus target) {Map<OrderStatus, Set<OrderStatus>> transitions = Map.of(OrderStatus.CREATED, Set.of(OrderStatus.PAID, OrderStatus.CANCELLED),OrderStatus.PAID, Set.of(OrderStatus.SHIPPED, OrderStatus.REFUNDING),OrderStatus.SHIPPED, Set.of(OrderStatus.COMPLETED),OrderStatus.REFUNDING, Set.of(OrderStatus.REFUNDED));return transitions.getOrDefault(current, Collections.emptySet()).contains(target);}
}
版本号控制
@Entity
@Table(name = "product_inventory")
public class ProductInventory {@Idprivate Long productId;private Integer stock;@Version // JPA乐观锁版本号private Long version;// 扣减库存的幂等方法public boolean deductStock(Integer quantity) {if (this.stock < quantity) {throw new InsufficientStockException("库存不足");}this.stock -= quantity;return true;}
}@Service
public class InventoryService {@Transactionalpublic boolean deductInventory(Long productId, Integer quantity, Long version) {// 使用乐观锁确保幂等String sql = "UPDATE product_inventory SET stock = stock - ? " +"WHERE product_id = ? AND version = ? AND stock >= ?";int affectedRows = jdbcTemplate.update(sql, quantity, productId, version, quantity);if (affectedRows == 0) {// 版本号不匹配或库存不足,说明可能已经处理过log.warn("库存扣减失败,可能已处理: productId={}, version={}", productId, version);return false;}log.info("库存扣减成功: productId={}, quantity={}", productId, quantity);return true;}
}
方案三:生产者防重复发送
从源头避免重复消息的产生。
@Service
public class ReliableMessageProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@Autowiredprivate RedisTemplate<String, String> redisTemplate;private static final String SEND_RECORD_PREFIX = "send:";public SendResult sendMessage(String topic, String key, Object message) {String sendKey = SEND_RECORD_PREFIX + key;// 检查是否已经发送过if (redisTemplate.hasKey(sendKey)) {log.info("消息已发送过,直接返回: key={}", key);return getCachedResult(key);}try {// 发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, key, message);SendResult result = future.get(5, TimeUnit.SECONDS);// 发送成功,记录发送状态redisTemplate.opsForValue().set(sendKey, "sent", 1, TimeUnit.HOURS);return result;} catch (TimeoutException e) {// 超时情况,需要业务方根据实际情况处理log.error("消息发送超时: key={}", key, e);throw new MessageSendException("消息发送超时", e);} catch (Exception e) {log.error("消息发送失败: key={}", key, e);throw new MessageSendException("消息发送失败", e);}}
}
方案四:消费端位点管理
确保消费进度正确提交,避免因位点回退导致的重复消费。
@Component
public class ReliableMessageConsumer {@KafkaListener(topics = "order-topic")public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {String messageId = record.key();try {// 1. 检查幂等性if (isMessageProcessed(messageId)) {log.info("消息已处理,直接确认: {}", messageId);ack.acknowledge(); // 确认消息return;}// 2. 处理业务processBusiness(record.value());// 3. 记录处理状态markMessageProcessed(messageId);// 4. 手动确认消息ack.acknowledge();log.info("消息处理完成: {}", messageId);} catch (Exception e) {log.error("消息处理失败: {}", messageId, e);// 不确认消息,等待重试// 在实际生产中,可以记录重试次数,超过阈值进入死信队列}}private boolean isMessageProcessed(String messageId) {// 检查Redis或数据库,判断消息是否已处理return redisTemplate.hasKey("processed:" + messageId);}private void markMessageProcessed(String messageId) {redisTemplate.opsForValue().set("processed:" + messageId, "true", 24, TimeUnit.HOURS);}
}
实战建议
1. 根据业务场景选择合适的方案
- 金融支付类:推荐使用"唯一标识+数据库去重表",可靠性最高
- 电商库存类:可以使用"版本号控制"或"状态机模式"
- 日志通知类:简单的Redis去重即可满足需求
2. 设置合理的消息过期时间
// 根据业务特点设置不同的过期时间
private static final Map<MessageType, Long> MESSAGE_TTL_MAP = Map.of(MessageType.PAYMENT, 30L * 24 * 60 * 60, // 支付消息保存30天MessageType.ORDER, 7L * 24 * 60 * 60, // 订单消息保存7天MessageType.NOTIFY, 1L * 24 * 60 * 60 // 通知消息保存1天
);
3. 监控与告警
@Component
public class DuplicateMessageMonitor {public void monitorDuplicateRate() {// 监控重复消息比例long totalMessages = getTotalMessages();long duplicateMessages = getDuplicateMessages();double duplicateRate = (double) duplicateMessages / totalMessages;if (duplicateRate > 0.01) { // 重复率超过1%alertService.sendAlert("消息重复率过高", String.format("当前重复率: %.2f%%", duplicateRate * 100));}}
}