iChat:RabbitMQ封装
RabbitMQ封装
- RabbitMQ介绍
- 什么是 RabbitMQ?
- 为什么 iChat 需要 RabbitMQ?
- RabbitMQ 在 iChat 中的具体应用
- 技术选型说明
- 封装流程
RabbitMQ介绍
什么是 RabbitMQ?
RabbitMQ 是一个开源的消息代理和队列服务器,采用 Erlang 语言开发,实现了高级消息队列协议(AMQP)。它提供了可靠的消息传递机制,支持消息持久化、负载均衡、事务处理等企业级特性,是构建分布式系统中异步通信的核心组件。
为什么 iChat 需要 RabbitMQ?
在 iChat 的即时通讯场景中,存在大量的异步处理和消息分发需求:
- 消息广播:一条消息需要同时推送给多个在线用户
- 流量削峰:应对突发的大量消息发送请求
- 服务解耦:各微服务之间通过消息队列进行松耦合通信
- 可靠投递:确保重要消息不丢失,如聊天消息的持久化
如果直接使用同步调用处理这些场景,会导致系统耦合度高、响应延迟大、容错能力差等问题。
RabbitMQ 在 iChat 中的具体应用
iChat 使用 RabbitMQ 主要实现以下功能:
- 消息持久化:将聊天消息持久化到磁盘,防止服务重启导致数据丢失
- 发布订阅:实现消息的广播推送,支持多消费者同时处理
- 流量控制:通过队列缓冲机制平滑处理消息发送高峰
- 服务解耦:转发服务与消息存储服务通过消息队列异步通信
- 延迟消息:支持定时消息推送和重试机制
- 负载均衡:多个消息处理实例可以从同一队列竞争消费
技术选型说明
iChat 使用 amqp-cpp 作为 RabbitMQ 的 C++ 客户端,通过 AMQP 协议与 RabbitMQ 服务器通信,主要基于以下考虑:
- 可靠性:提供消息确认、持久化、事务等保证消息不丢失
- 灵活性:支持多种交换器类型(直连、扇出、主题、头交换)
- 高性能:Erlang 语言的并发特性确保高吞吐量
- 易管理:提供完善的 Web 管理界面,便于监控和运维
- 生态成熟:社区活跃,文档完善,与微服务架构完美契合
RabbitMQ 为 iChat 提供了可靠的消息中间件支撑,实现了各微服务之间的异步通信和解耦,确保了消息的可靠传递和系统的高可用性,是构建大规模即时通讯系统的关键基础设施。
封装流程
class MQClient {public:using Ptr = std::shared_ptr<MQClient>;private:struct ev_loop* _loop;std::unique_ptr<AMQP::LibEvHandler> _handler;std::unique_ptr<AMQP::TcpConnection> _connection;std::unique_ptr<AMQP::TcpChannel> _channel;std::thread _loop_thread;
};
首先是libev库的循环指针_loop,引入该库能够提供高效的网络I/O多路复用与非阻塞的响应网络事件处理。而后续的事件处理器_handler负责将libev事件转换为AMQP事件,_connection建立并维护与RabbitMQ服务器的TCP连接,_channel代表一个逻辑通道(一个TCP连接上可以建立多个逻辑通道,进行多路复用)。
class MQClient {public:MQClient(const std::string& user, const std::string& passwd,const std::string& host) {_loop = EV_DEFAULT;_handler = std::make_unique<AMQP::LibEvHandler>(_loop);auto url = "amqp://" + user + ":" + passwd + "@" + host + "/";AMQP::Address addr(url);_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), addr);_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());_loop_thread = std::thread([this]() { ev_run(_loop); });}
};
构造函数中,首先获取默认事件循环,再创建基于libev的AMQP事件处理器,随后建立TCP连接,并在连接上创建通信通道。最后,在新线程中运行事件循环,异步处理网络I/O(主线程负责消息的发布与消费)。
class MQClient {public:~MQClient() {ev_async watcher;ev_async_init(&watcher, watcher_cb);ev_async_start(_loop, &watcher);ev_async_send(_loop, &watcher);_loop_thread.join();}private:static void watcher_cb(struct ev_loop* loop, ev_async*, int) {ev_break(loop, EVBREAK_ALL);}
};
析构函数中,创建异步监视器向事件循环线程发送停止信号,初始化监视器时需要传入对应的回调函数(用于安全终止事件循环)。发送信号通知线程终止后,再销毁线程对象。
class MQClient {public:void declare(const std::string& exchange, const std::string& queue,const std::string& routing_key = "routing_key",AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct) {_channel->declareExchange(exchange, exchange_type).onError([](const std::string& err) {LOG_ERROR("交换机声明失败: {}", err);exit(0);}).onSuccess([exchange]() { LOG_DEBUG("交换机 {} 声明成功", exchange); });_channel->declareQueue(queue).onError([](const std::string& err) {LOG_ERROR("队列声明失败: {}", err);exit(0);}).onSuccess([queue]() { LOG_DEBUG("队列 {} 声明成功", queue); });_channel->bindQueue(exchange, queue, routing_key).onError([](const std::string& err) {LOG_ERROR("交换机-队列 绑定失败: {}", err);exit(0);}).onSuccess([exchange, queue]() {LOG_DEBUG("交换机 {} - 队列 {} 绑定成功", exchange, queue);});}
};
声明交换机时,默认使用direct直连类型,这种类型性能更优,也更加符合点对点聊天的业务。在声明交换机和队列时,都可以设置对应失败与成功的回调函数,打印日志方便调试。声明完交换机与队列后,再将其绑定在一起,最终可以实现一个信道对应多个交换机,而每个交换机下对应多个队列。而绑定时可以设置对应的路由键(路由时的规则),一个队列可以绑定多个路由键,让消息通过不同的路由规则传递到队列。
class MQClient {public:bool publish(const std::string& exchange, const std::string& msg,const std::string& routing_key = "routing_key") {bool ret = _channel->publish(exchange, routing_key, msg);if (!ret) {LOG_ERROR("交换机 {} 消息发布失败", exchange);return false;}return true;}
};
发布消息时,选择指定交换机,按照指定的路由规则,将指定消息发送到对应的队列中。
class MQClient {public:using MessageCallBack = std::function<void(const char*, uint64_t)>;public:void consume(const std::string& queue, const MessageCallBack& cb,const std::string& tag = "consume_tag") {_channel->consume(queue, tag).onReceived([this, cb](const AMQP::Message& msg, uint64_t deliveryTag, bool) {cb(msg.body(), msg.bodySize());_channel->ack(deliveryTag);}).onError([queue](const std::string& err) {LOG_ERROR("队列 {} 消息订阅失败: {}", queue, err);exit(0);});}
};
消费消息时,选择指定队列,并传入消费标签(用于标识消费者,记录哪个消费者在消费)。消费时设置收到消息的回调函数,具体消费手段通过外部传递的回调函数(回调函数类型由function函数包装器实现)决定,实现一定程度上的解耦。同时,消息处理完成后自动发送ACK,确保可靠消费。
