当前位置: 首页 > news >正文

RabbitMQ 消费幂等性与消息重放实现

一、幂等性实现

1.1 什么是幂等性?

幂等性是指同一条消息无论被消费多少次,业务结果都只生效一次,防止重复扣款、重复发货等问题。

RabbitMQ 的投递模式是“至少一次交付”(at-least-once delivery),如果消费者处理失败或者没有及时确认,消息会被多次投递。如果业务本身不具备幂等性,就可能导致重复扣款、重复发货等严重后果。

1.2 实现思路

RabbitMQ 只负责消息的可靠投递,而不会记录每条消息是否已经被成功消费。因此,需要由消费者端维护消费状态,常见做法是借助 Redis 实现去重逻辑。

消息在生产阶段应携带全局唯一的 message_id(例如订单号:order:10010)。在消费逻辑中,先通过 Redis 的原子命令 SETNX 尝试写入该 message_id:①如果 SETNX返回1,表示第一次消费,可以处理;②如果返回0,表示已消费,直接忽略

 二、消息重放实现

在RabbitMQ中,ack和nack机制是保证可靠投递、实现重放的关键。

2.1 ack和nack

如果你的消费逻辑里既没有调用ack,也没有调用nack,消息状态会一直unacked。只要没确认,就永远不会删除消息。

(1) ack

确认消息已被消费成功。当消费者调用:

ch.basic_ack(delivery_tag=method.delivery_tag)

RabbitMQ就会把消息从队列里永久删除。只要你ack了,这条消息就不可能再来了。

(2) nack

告诉RabbitMQ“我没处理好”。有两种方式:

# 发送nack并重入队列
# RabbitMQ会立刻把消息放回队列,再投递给其他消费者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)# 发送nack不重入队列
# 消息就会被丢弃(或者,如果绑定了死信队列,就转入死信队列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

2.2 实现思路

生产者代码
import pika
import uuidconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)message_id = str(uuid.uuid4())
body = "test message"properties = pika.BasicProperties(delivery_mode=2,message_id=message_id
)channel.basic_publish(exchange='',routing_key='test_queue',body=body,properties=properties
)print(f"[x] Sent '{body}' with message_id {message_id}")connection.close()
 消费者代码
import pika
import redis
import time# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)def callback(ch, method, properties, body):message_id = properties.message_idif not message_id:import hashlibmessage_id = hashlib.md5(body).hexdigest()redis_key = f"msg:{message_id}"if r.setnx(redis_key, 1):r.expire(redis_key, 24*60*60)print(f"[x] Processing message: {body.decode()}")try:# 模拟异常if "fail" in body.decode():raise Exception("Simulated failure")# 成功处理ch.basic_ack(delivery_tag=method.delivery_tag)print("[+] Message processed successfully")except Exception as e:print(f"[!] Error: {e}")# 立即重放ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)else:# 已消费ch.basic_ack(delivery_tag=method.delivery_tag)print(f"[!] Duplicate message detected: {message_id}")channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',on_message_callback=callback,auto_ack=False   # 手动ack
)print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()

相关文章:

  • 车联网网络安全渗透测试:深度解析与实践
  • 机器学习中为什么要用混合精度训练
  • 学习设计模式《十五》——模板方法模式
  • sql server 将nvarchar长度设置成max有什么隐患
  • 2025暑期学习计划​参考
  • 【C语言】知识总结·指针篇
  • 鸿蒙 List 组件解析:从基础列表到高性能界面开发指南
  • TCP/IP协议简要概述
  • 大彩讲堂:基于VisualTFT软件如何调节电容屏触摸灵敏度
  • 【Pandas】pandas DataFrame last_valid_index
  • PHP语法基础篇(六):数组
  • 【Docker管理工具】安装Docker容器自动更新工具Watchtower
  • HTTP协议中Connection: Keep-Alive和Keep-Alive: timeout=60, max=100的作用
  • vue项目中纯前端实现导出pdf文件,不需要后端处理。
  • 探索相机成像的奥秘 - 齐次坐标、径向失真和图像传感器倾斜
  • ROS:录制相机、IMU、GNSS等设备数据
  • 扫地机产品认证--黑名单制裁公司能否拿到美国产品准入许可(FCC认证)
  • 疲劳检测与行为分析:工厂智能化实践
  • gemini-cli 踩坑实录
  • vue a-table自定义表头颜色+合并表格