当前位置: 首页 > news >正文

iChat:RabbitMQ封装

RabbitMQ封装

  • RabbitMQ介绍
    • 什么是 RabbitMQ?
    • 为什么 iChat 需要 RabbitMQ?
    • RabbitMQ 在 iChat 中的具体应用
    • 技术选型说明
  • 封装流程


RabbitMQ介绍

什么是 RabbitMQ?

RabbitMQ 是一个开源的消息代理和队列服务器,采用 Erlang 语言开发,实现了高级消息队列协议(AMQP)。它提供了可靠的消息传递机制,支持消息持久化、负载均衡、事务处理等企业级特性,是构建分布式系统中异步通信的核心组件。

为什么 iChat 需要 RabbitMQ?

在 iChat 的即时通讯场景中,存在大量的异步处理和消息分发需求:

  • 消息广播:一条消息需要同时推送给多个在线用户
  • 流量削峰:应对突发的大量消息发送请求
  • 服务解耦:各微服务之间通过消息队列进行松耦合通信
  • 可靠投递:确保重要消息不丢失,如聊天消息的持久化

如果直接使用同步调用处理这些场景,会导致系统耦合度高、响应延迟大、容错能力差等问题。

RabbitMQ 在 iChat 中的具体应用

iChat 使用 RabbitMQ 主要实现以下功能:

  • 消息持久化:将聊天消息持久化到磁盘,防止服务重启导致数据丢失
  • 发布订阅:实现消息的广播推送,支持多消费者同时处理
  • 流量控制:通过队列缓冲机制平滑处理消息发送高峰
  • 服务解耦:转发服务与消息存储服务通过消息队列异步通信
  • 延迟消息:支持定时消息推送和重试机制
  • 负载均衡:多个消息处理实例可以从同一队列竞争消费

技术选型说明

iChat 使用 amqp-cpp 作为 RabbitMQ 的 C++ 客户端,通过 AMQP 协议与 RabbitMQ 服务器通信,主要基于以下考虑:

  • 可靠性:提供消息确认、持久化、事务等保证消息不丢失
  • 灵活性:支持多种交换器类型(直连、扇出、主题、头交换)
  • 高性能:Erlang 语言的并发特性确保高吞吐量
  • 易管理:提供完善的 Web 管理界面,便于监控和运维
  • 生态成熟:社区活跃,文档完善,与微服务架构完美契合

RabbitMQ 为 iChat 提供了可靠的消息中间件支撑,实现了各微服务之间的异步通信和解耦,确保了消息的可靠传递和系统的高可用性,是构建大规模即时通讯系统的关键基础设施。

封装流程

class MQClient {public:using Ptr = std::shared_ptr<MQClient>;private:struct ev_loop* _loop;std::unique_ptr<AMQP::LibEvHandler> _handler;std::unique_ptr<AMQP::TcpConnection> _connection;std::unique_ptr<AMQP::TcpChannel> _channel;std::thread _loop_thread;
};

首先是libev库的循环指针_loop,引入该库能够提供高效的网络I/O多路复用与非阻塞的响应网络事件处理。而后续的事件处理器_handler负责将libev事件转换为AMQP事件,_connection建立并维护与RabbitMQ服务器的TCP连接,_channel代表一个逻辑通道(一个TCP连接上可以建立多个逻辑通道,进行多路复用)。

class MQClient {public:MQClient(const std::string& user, const std::string& passwd,const std::string& host) {_loop = EV_DEFAULT;_handler = std::make_unique<AMQP::LibEvHandler>(_loop);auto url = "amqp://" + user + ":" + passwd + "@" + host + "/";AMQP::Address addr(url);_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), addr);_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());_loop_thread = std::thread([this]() { ev_run(_loop); });}
};

构造函数中,首先获取默认事件循环,再创建基于libev的AMQP事件处理器,随后建立TCP连接,并在连接上创建通信通道。最后,在新线程中运行事件循环,异步处理网络I/O(主线程负责消息的发布与消费)。

class MQClient {public:~MQClient() {ev_async watcher;ev_async_init(&watcher, watcher_cb);ev_async_start(_loop, &watcher);ev_async_send(_loop, &watcher);_loop_thread.join();}private:static void watcher_cb(struct ev_loop* loop, ev_async*, int) {ev_break(loop, EVBREAK_ALL);}
};

析构函数中,创建异步监视器向事件循环线程发送停止信号,初始化监视器时需要传入对应的回调函数(用于安全终止事件循环)。发送信号通知线程终止后,再销毁线程对象。

class MQClient {public:void declare(const std::string& exchange, const std::string& queue,const std::string& routing_key = "routing_key",AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct) {_channel->declareExchange(exchange, exchange_type).onError([](const std::string& err) {LOG_ERROR("交换机声明失败: {}", err);exit(0);}).onSuccess([exchange]() { LOG_DEBUG("交换机 {} 声明成功", exchange); });_channel->declareQueue(queue).onError([](const std::string& err) {LOG_ERROR("队列声明失败: {}", err);exit(0);}).onSuccess([queue]() { LOG_DEBUG("队列 {} 声明成功", queue); });_channel->bindQueue(exchange, queue, routing_key).onError([](const std::string& err) {LOG_ERROR("交换机-队列 绑定失败: {}", err);exit(0);}).onSuccess([exchange, queue]() {LOG_DEBUG("交换机 {} - 队列 {} 绑定成功", exchange, queue);});}
};

声明交换机时,默认使用direct直连类型,这种类型性能更优,也更加符合点对点聊天的业务。在声明交换机和队列时,都可以设置对应失败与成功的回调函数,打印日志方便调试。声明完交换机与队列后,再将其绑定在一起,最终可以实现一个信道对应多个交换机,而每个交换机下对应多个队列。而绑定时可以设置对应的路由键(路由时的规则),一个队列可以绑定多个路由键,让消息通过不同的路由规则传递到队列。

class MQClient {public:bool publish(const std::string& exchange, const std::string& msg,const std::string& routing_key = "routing_key") {bool ret = _channel->publish(exchange, routing_key, msg);if (!ret) {LOG_ERROR("交换机 {} 消息发布失败", exchange);return false;}return true;}
};

发布消息时,选择指定交换机,按照指定的路由规则,将指定消息发送到对应的队列中。

class MQClient {public:using MessageCallBack = std::function<void(const char*, uint64_t)>;public:void consume(const std::string& queue, const MessageCallBack& cb,const std::string& tag = "consume_tag") {_channel->consume(queue, tag).onReceived([this, cb](const AMQP::Message& msg, uint64_t deliveryTag, bool) {cb(msg.body(), msg.bodySize());_channel->ack(deliveryTag);}).onError([queue](const std::string& err) {LOG_ERROR("队列 {} 消息订阅失败: {}", queue, err);exit(0);});}
};

消费消息时,选择指定队列,并传入消费标签(用于标识消费者,记录哪个消费者在消费)。消费时设置收到消息的回调函数,具体消费手段通过外部传递的回调函数(回调函数类型由function函数包装器实现)决定,实现一定程度上的解耦。同时,消息处理完成后自动发送ACK,确保可靠消费。

http://www.dtcms.com/a/597415.html

相关文章:

  • 悬镜安全CEO子芽荣获“2025年度OSCAR开源人物”
  • 江西省职业院校技能大赛“信创适配及安全管理”赛项
  • 音视频媒体服务领域中三种架构方式的定义与区别(Mesh、MCU、SFU)
  • Navicat17安装
  • 【Arm】Encountered an improper argument
  • Python编程题 | 深入浅出解析常见编程问题,快速提升编程能力
  • protobuf编码原理
  • 港股实时行情API接入全流程
  • 公司网站制作计入什么科目重庆建筑网
  • Next.js第一章(入门)
  • 数据管理战略|数字化改革的四个体系
  • 设备外绝缘强度将随海拔的升高而降低,导致设备允许的最高工作电压下降。
  • crm系统设计东莞百度seo地址
  • 2025年第四期DAMA数据治理CDGA考试练习题
  • 面向对象(上)-package关键字的使用
  • 自己做电影网站违法吗dz网站收款即时到账怎么做的
  • 电子商务网站开发语言wordpress读取相册
  • 全面了解云手机的安全性
  • 数据结构代码练习DAY2
  • 声网SDK让音视频开发效率翻倍
  • 网站图片尺寸如何免费建站
  • 360做网站和推广怎么样网站后端架构如何做
  • 从零到一构建数据驱动的业务落地
  • 测试题-6
  • 那个网站上有做婚礼布场样图的营销型网站有意义吗
  • 安卓和苹果手机通用的备忘录app测评
  • 宸建设计网站哪里能做网页建站
  • VsionMaster筛选机错误情况
  • Spring Boot 面试专题及答案
  • 利用k8s client-go库创建CRD的informer的操作流程