RabbitMQ 消费幂等性与消息重放实现
一、幂等性实现
1.1 什么是幂等性?
幂等性是指同一条消息无论被消费多少次,业务结果都只生效一次,防止重复扣款、重复发货等问题。
RabbitMQ 的投递模式是“至少一次交付”(at-least-once delivery),如果消费者处理失败或者没有及时确认,消息会被多次投递。如果业务本身不具备幂等性,就可能导致重复扣款、重复发货等严重后果。
1.2 实现思路
RabbitMQ 只负责消息的可靠投递,而不会记录每条消息是否已经被成功消费。因此,需要由消费者端维护消费状态,常见做法是借助 Redis 实现去重逻辑。
消息在生产阶段应携带全局唯一的 message_id(例如订单号:order:10010)。在消费逻辑中,先通过 Redis 的原子命令 SETNX 尝试写入该 message_id:①如果 SETNX返回1,表示第一次消费,可以处理;②如果返回0,表示已消费,直接忽略
二、消息重放实现
在RabbitMQ中,ack和nack机制是保证可靠投递、实现重放的关键。
2.1 ack和nack
如果你的消费逻辑里既没有调用ack,也没有调用nack,消息状态会一直unacked。只要没确认,就永远不会删除消息。
(1) ack
确认消息已被消费成功。当消费者调用:
ch.basic_ack(delivery_tag=method.delivery_tag)
RabbitMQ就会把消息从队列里永久删除。只要你ack了,这条消息就不可能再来了。
(2) nack
告诉RabbitMQ“我没处理好”。有两种方式:
# 发送nack并重入队列
# RabbitMQ会立刻把消息放回队列,再投递给其他消费者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)# 发送nack不重入队列
# 消息就会被丢弃(或者,如果绑定了死信队列,就转入死信队列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
2.2 实现思路
生产者代码
import pika
import uuidconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)message_id = str(uuid.uuid4())
body = "test message"properties = pika.BasicProperties(delivery_mode=2,message_id=message_id
)channel.basic_publish(exchange='',routing_key='test_queue',body=body,properties=properties
)print(f"[x] Sent '{body}' with message_id {message_id}")connection.close()
消费者代码
import pika
import redis
import time# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)def callback(ch, method, properties, body):message_id = properties.message_idif not message_id:import hashlibmessage_id = hashlib.md5(body).hexdigest()redis_key = f"msg:{message_id}"if r.setnx(redis_key, 1):r.expire(redis_key, 24*60*60)print(f"[x] Processing message: {body.decode()}")try:# 模拟异常if "fail" in body.decode():raise Exception("Simulated failure")# 成功处理ch.basic_ack(delivery_tag=method.delivery_tag)print("[+] Message processed successfully")except Exception as e:print(f"[!] Error: {e}")# 立即重放ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)else:# 已消费ch.basic_ack(delivery_tag=method.delivery_tag)print(f"[!] Duplicate message detected: {message_id}")channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',on_message_callback=callback,auto_ack=False # 手动ack
)print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()