当前位置: 首页 > news >正文

RabbitMq如何实现幂等性

目录

为什么会产生重复消息?

实现幂等性的常见方案

1. 业务逻辑天然幂等

2. 唯一键/版本号控制(最常用、最推荐)

3. 状态机控制(适用于有状态流转的业务)

4. 使用 Redis 等缓存中间件

总结与建议


在分布式系统中,网络抖动、客户端或服务端故障都可能导致消息重复传递。RabbitMQ 本身不提供幂等性保证,它提供的是消息投递的保证(如确认机制),但无法阻止重复消息的产生。

因此,消息幂等性必须由消费者来实现

为什么会产生重复消息?

  1. 生产者确认模式

    • 生产者开启了 publisher confirm 模式,但在消息发出后,网络抖动导致确认信号没有及时收到。生产者可能会认为消息发送失败而重试,导致Broker收到两条一样的消息。

  2. 消费者确认模式

    • 消费者处理完消息后,在发送 ack(确认)回Broker之前突然宕机或连接断开。Broker没有收到 ack,会认为该消息处理失败,从而将消息重新投递给另一个消费者(或者等待当前消费者重连后再次投递)。

实现幂等性的常见方案

幂等性的核心思想是:无论同一条消息被消费多少次,其结果都与消费一次相同。以下是几种主流的实现方案:

1. 业务逻辑天然幂等

首先检查你的业务操作本身是否就是幂等的。例如:

  • 查询操作select * from table where id=1,执行多次结果都一样。

  • 更新操作update table set status = 'completed' where id=1,执行多次后状态依然是 completed

  • 删除操作delete from table where id=1,执行多次后结果都是数据被删除。

如果业务逻辑本身是幂等的,那么就无需额外处理。

2. 唯一键/版本号控制(最常用、最推荐)

这是最通用和有效的方法。核心原理是:在数据库中利用唯一约束来防止重复数据

实现步骤:

  1. 在消息体中携带一个全局唯一的ID(例如 message_id),这个ID可以是业务主键,也可以是雪花算法等生成的分布式ID。这个ID需要唯一标识一条消息或一个业务请求

  2. 消费者在处理消息前,先拿这个 message_id 去一张“去重表”中查询。

    • 如果不存在,则进行业务处理,处理成功后将该 message_id 作为唯一键插入到“去重表”中。

    • 如果已存在,则说明该消息已经被成功处理过,直接丢弃或确认消息即可。

举例:订单支付消息
假设消息体为:{ "order_id": 20240907001, "amount": 100.00 }

-- 创建去重表
CREATE TABLE message_id_empower (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL UNIQUE, -- 唯一约束,确保不会重复插入create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);-- 消费者伪代码
public void consume(Message message) {String orderId = message.getBody().getString("order_id");// 1. 尝试插入去重表try {int count = executeSql("INSERT INTO message_id_empower (message_id) VALUES (?)", orderId);if (count > 0) {// 插入成功,说明是第一次处理processPayment(orderId); // 真正的业务处理:更新订单状态为已支付channel.basicAck(deliveryTag); // 确认消息} else {// 插入失败(由于唯一约束冲突),说明是重复消息channel.basicAck(deliveryTag); // 直接确认,不再处理log.warn("Duplicate message received, orderId: {}", orderId);}} catch (DuplicateKeyException e) {// 捕获唯一键冲突异常,同样视为重复消息channel.basicAck(deliveryTag);log.warn("Duplicate message received, orderId: {}", orderId);}
}

优点

  • 简单可靠,通用性强。

  • 基于数据库,实现方便。

缺点

  • 需要引入额外的数据库表和写操作,有性能开销。

  • 去重表需要根据业务周期定期清理旧数据。

3. 状态机控制(适用于有状态流转的业务)

很多业务数据本身就有明确的状态流转(如订单状态:待支付 -> 已支付 -> 已发货)。可以通过判断当前状态来决定是否处理消息。

举例:同样的订单支付消息

public void consume(Message message) {String orderId = message.getBody().getString("order_id");// 1. 先从数据库查询当前订单状态Order order = orderDao.findById(orderId);if (order == null) {// 订单不存在,可能是脏数据,记录日志并确认消息channel.basicAck(deliveryTag);return;}if (OrderStatus.PAID.equals(order.getStatus())) {// 状态已是“已支付”,说明是重复消息,直接确认channel.basicAck(deliveryTag);log.warn("Order already paid, orderId: {}", orderId);return;}if (!OrderStatus.PENDING.equals(order.getStatus())) {// 状态不是“待支付”,说明订单无法支付(可能已取消),记录日志并确认channel.basicAck(deliveryTag);log.error("Order status is invalid for payment, orderId: {}, status: {}", orderId, order.getStatus());return;}// 2. 状态是“待支付”,正常处理业务processPayment(orderId);channel.basicAck(deliveryTag);
}

优点

  • 无需创建额外的去重表,利用业务数据本身实现。

  • 逻辑符合业务语义。

缺点

  • 只适用于有状态流转的业务模型。

  • 需要先进行一次数据库查询。

4. 使用 Redis 等缓存中间件

原理与“唯一键控制”类似,利用 Redis 的 SET key value NX(如果key不存在则设置)命令来实现分布式锁或去重标记。

public void consume(Message message) {String orderId = message.getBody().getString("order_id");String redisKey = "order_paid:" + orderId;// 尝试设置一个过期时间为一天的键,如果设置成功返回true,否则返回falseBoolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", Duration.ofDays(1));if (Boolean.TRUE.equals(success)) {// 设置成功,说明是第一次处理processPayment(orderId);channel.basicAck(deliveryTag);} else {// 设置失败,键已存在,说明是重复消息channel.basicAck(deliveryTag);log.warn("Duplicate message received, orderId: {}", orderId);}
}

优点

  • 性能极高,速度远快于数据库。

缺点

  • 可靠性不如数据库,存在Redis服务宕机或数据丢失的风险(需要持久化配置)。

  • 需要合理设置键的过期时间。

总结与建议

方案适用场景优点缺点
天然幂等查询、特定更新/删除无需任何额外工作适用范围有限
唯一键控制几乎所有场景通用、可靠需要数据库支持,有性能开销
状态机控制订单等有状态流转的业务利用现有业务表,无需额外表需要先查询,只适用于特定业务
Redis 缓存对性能要求极高的场景性能极佳可靠性稍弱,需要维护Redis

最佳实践建议:

  1. 首选方案:对于大部分业务系统,“唯一键/版本号控制” 是最稳健、最通用的选择。结合数据库的唯一约束,可以万无一失。

  2. 组合使用:可以将多种方案结合。例如,先用 Redis 做快速去重过滤大部分请求,同时用数据库做最终兜底。

  3. 消息设计务必在消息体内携带一个全局唯一的业务ID(如 order_idmessage_id),这是实现幂等的基础。

  4. 先查后改:在处理任何消息时,养成“先查询当前状态,再决定是否处理”的习惯。

记住,RabbitMQ 提供了“至少一次”的消息投递保证,而要达成“恰好一次”的语义,必须依靠消费者端的幂等性处理来实现。


文章转载自:

http://2s0epSNc.xqkjp.cn
http://BDO9Md1h.xqkjp.cn
http://IDlJpfXf.xqkjp.cn
http://BaQhdNhY.xqkjp.cn
http://WtlcPv1f.xqkjp.cn
http://ubuurmh4.xqkjp.cn
http://IJVWKP5P.xqkjp.cn
http://25bNqhFt.xqkjp.cn
http://3loXxEtf.xqkjp.cn
http://VkevumSl.xqkjp.cn
http://5X0BGd3x.xqkjp.cn
http://YLr2BEtQ.xqkjp.cn
http://uHAY6Po1.xqkjp.cn
http://oxFHL2C1.xqkjp.cn
http://tCaOJjbI.xqkjp.cn
http://yYgFJEd1.xqkjp.cn
http://wlF2rAvF.xqkjp.cn
http://OhPvMxxo.xqkjp.cn
http://HNh3yddy.xqkjp.cn
http://wAI9V6v8.xqkjp.cn
http://UqNeKc4v.xqkjp.cn
http://ZxOx6ljL.xqkjp.cn
http://Jb2x4pLT.xqkjp.cn
http://Q0tCfcVs.xqkjp.cn
http://dOmmrXit.xqkjp.cn
http://n1HTk5Xq.xqkjp.cn
http://vMakXSf2.xqkjp.cn
http://ql8eZRIi.xqkjp.cn
http://8IyeOvEV.xqkjp.cn
http://GPLAkbJe.xqkjp.cn
http://www.dtcms.com/a/367345.html

相关文章:

  • 【JAVA】创建一个建单的TCP服务端和客户端
  • AI智汇社区凭什么半年估值破亿?这家公司让普通人也能玩转AI开发
  • WebSocket简述与网络知识回顾
  • 揭秘23种设计模式的艺术与技巧之行为型
  • 【LeetCode每日一题】94. 二叉树的中序遍历 104. 二叉树的最大深度
  • 渗透测试与网络安全审计的关系
  • Qwen2.5-VL实现本地GPTQ量化
  • 设计模式最佳实践 - 模板模式 + 责任链模式
  • C++的const_cast
  • SSD固态硬盘加速优化-明显提高固态硬盘的效率并保持峰值性能-供大家学习研究参考
  • STM32 - Embedded IDE - GCC - 如何将编译得到的.bin固件添加CRC32校验码
  • VSCode中的扩展Extension说明
  • 《IC验证必看|semaphore与mailbox的核心区别》
  • Web与Nginx
  • JS 可迭代对象详解:从概念到实践的全方位剖析
  • 同城酒水推广算法怎么做?
  • (自用)PowerShell常用命令自查文档
  • 当公司在你电脑上安装了IP-guard,你必须知道的事
  • 【已更新文章+代码】2025数学建模国赛B题思路代码文章高教社杯全国大学生数学建模-碳化硅外延层厚度的确定
  • 空车不空,英魂长在(记9.3大阅兵)
  • MySQL并发问题解析
  • linux——自定义协议
  • 基于联邦学习的政务大数据平台应用研究
  • Jenkins调用ansible部署lnmp平台
  • 迈威通信从送快递角度教你分清网络二层和三层
  • 热计量表通过M-Bus接口实现无线集抄系统的几种解决方
  • 从KV Cache竞争到多卡优化:vLLM加载AWQ模型的显存优化全攻略
  • 8.7 通过时间反向传播
  • 基于扣子平台构造AutoGen框架的多智能体使用-----封装成FastAPI接口供调用
  • 谈谈你对ThreadLocal的理解