基于Pika的RabbitMQ 消费者异常消息消费问题分析
Pika 是 Python 中用于与 RabbitMQ 消息代理进行交互的第三方库,它提供了简单易用的 API,允许开发者在 Python 应用程序中实现消息队列功能,支持消息的发布、订阅、接收和处理。
1. 问题描述
1.1 预期行为设计
原程序的功能设计目标为:
- 自动侦听模式:启动 RabbitMQ 队列消费者,持续监听消息到达
- 单消息触发机制:当成功获取到一条有效消息后,立即停止消息侦听
- 业务处理隔离:基于获取的消息执行相关业务逻辑(具体业务不展开)
- 循环重启机制:业务处理完成后,重新启动消息侦听,进入下一轮处理周期
1.2. 实际异常现象
在实际运行过程中,当消息队列中积累多个待处理消息时,观察到以下异常行为:
- 批量消息消费:每次消费周期中,消费者会连续处理多条消息(而非设计的单条)
- 消息丢失风险:在批量消费过程中,可能出现消息处理异常或丢失的情况
- 控制逻辑失效:
stop_consuming()
调用未能及时阻止后续消息的继续消费
2. 技术原理深度分析
2.1. 原代码实现
channel = rabbitmq_connection.channel()
channel.queue_declare(queue=rabbitmq_queue, durable=True)def callback(ch, method, properties, body):try:datas = json.loads(body.decode('utf-8'))logger.info(f"接收到消息:{body.decode('utf-8')}")input_datas = {"ID": datas["ID"], "appname": datas["appname"], "poi": datas["poi"] } target_queue.put(input_datas