闸机控制系统从设计到实现全解析:第 5 篇:RabbitMQ 消息队列与闸机通信设计
第 5 篇:RabbitMQ 消息队列与闸机通信设计
RabbitMQ 是一款开源的消息队列中间件(Message Queue,MQ),基于 Erlang 语言开发,遵循 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 标准,主要用于在分布式系统中实现消息的可靠传递,核心价值是解耦系统组件、削峰填谷、异步通信等。
RabbitMQ
一、核心定位:为什么需要 RabbitMQ?
RabbitMQ 作为“中间件”,通过存储转发消息的方式,让发送方(生产者)和接收方(消费者)“解耦”:生产者只需要把消息发给 RabbitMQ,不必关心谁来接收;消费者从 RabbitMQ 取消息,不必关心谁发来的。双方通过消息间接通信,提高系统灵活性和稳定性。
二、核心概念与架构
RabbitMQ 的工作流程基于“生产者-交换机-队列-消费者”的链路,核心组件包括:
- 生产者(Producer):发送消息的应用(如订单系统生成“新订单”消息)。
- 交换机(Exchange):接收生产者的消息,根据“路由规则”将消息转发到对应的队列。交换机是 RabbitMQ 灵活路由的核心,有 4 种类型:
- 直连交换机(Direct Exchange):消息的“路由键”(Routing Key)必须与队列绑定的“绑定键”完全匹配才转发(如“order.pay”只能匹配“order.pay”)。
- 主题交换机(Topic Exchange):支持通配符匹配(
*
匹配一个单词,#
匹配多个单词),适合按“主题”分类消息(如“order.*”可匹配“order.pay”“order.cancel”)。 - 扇形交换机(Fanout Exchange):忽略路由键,将消息广播到所有绑定的队列(适合“一对多”通知,如“系统通知”需要所有模块接收)。
- Headers 交换机:不依赖路由键,根据消息头(Headers)的键值对匹配(较少用)。
- 队列(Queue):存储消息的缓冲区,消息最终会进入队列等待消费者取走。队列是“持久化”的(可配置),即使 RabbitMQ 重启,未消费的消息也能保留。
- 绑定(Binding):连接交换机和队列的“规则”,包含“绑定键”(Binding Key),用于交换机判断消息该转发到哪个队列。
- 消费者(Consumer):从队列中获取并处理消息的应用(如支付系统接收“新订单”消息并处理支付)。
三、工作流程(核心逻辑)
- 生产者发送消息时,会指定消息的“路由键”(Routing Key)和目标交换机。
- 交换机根据自身类型和“绑定规则”(绑定键),将消息转发到匹配的队列。
- 队列存储消息,等待消费者连接并获取。
- 消费者从队列中取走消息并处理,处理完成后向 RabbitMQ 发送“确认信号(Ack)”,RabbitMQ 收到后删除队列中的该消息(避免重复消费)。
一、引入消息队列的作用
闸机控制涉及设备通信,采用同步调用易受网络波动影响。引入 RabbitMQ 可实现:
-
系统解耦:闸机控制服务与设备通信模块分离,便于独立扩展。
-
异步处理:开闸、关闸命令异步发送,不阻塞主线程。
-
可靠性保障:消息持久化,避免命令丢失。
二、RabbitMQ 核心组件设计
- 交换机(Exchange):
-
gate.control.exchange
:处理闸机控制命令(开闸 / 关闸)。 -
access.record.exchange
:处理通行记录存储。
- 队列(Queue):
-
gate.command.queue
:绑定闸机控制交换机,接收命令。 -
record.save.queue
:绑定记录交换机,异步保存通行信息。
- 消息模型:
// 闸机命令消息public class GateCommand{public string GateCode { get; set; } // 目标闸机编码public int CommandType { get; set; } // 1-开闸/0-关闸public DateTime Timestamp { get; set; } = DateTime.Now;}
三、RabbitMQ 集成实现
- 安装依赖:
dotnet add package RabbitMQ.Client
- 封装消息服务:
public class RabbitMQService : IDisposable{private readonly IConnection _connection;private readonly IModel_channel;public RabbitMQService(string connectionString){var factory = new ConnectionFactory { Uri = new Uri(connectionString) };_connection = factory.CreateConnection();_channel = _connection.CreateModel();// 声明交换机和队列_channel.ExchangeDeclare("gate.control.exchange", ExchangeType.Direct, durable: true);_channel.QueueDeclare("gate.command.queue", durable: true, exclusive: false, autoDelete: false);_channel.QueueBind("gate.command.queue", "gate.control.exchange", "gate.command");}// 发送闸机命令public void PublishGateCommand(GateCommand command){var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(command));_channel.BasicPublish(exchange: "gate.control.exchange",routingKey: "gate.command",basicProperties: null,body: body);}// 消费命令(闸机端实现)public void ConsumeGateCommands(Action<GateCommand> handler){var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) => {var body = ea.Body.ToArray();var command = JsonConvert.DeserializeObject\<GateCommand>(Encoding.UTF8.GetString(body));handler(command);_channel.BasicAck(ea.DeliveryTag, multiple: false);};_channel.BasicConsume(queue: "gate.command.queue", autoAck: false, consumer: consumer);}public void Dispose(){_channel?.Close();\connection?.Close();}}
四、闸机命令处理流程
-
验证通过后,系统向 RabbitMQ 发送开闸命令(
CommandType=1
)。 -
闸机端消费者接收命令,执行物理开闸操作。
-
开闸成功后,触发票证状态更新回调。
-
延迟 5 秒后,系统发送关闸命令(
CommandType=0
),完成一次控制流程。