RabbitMQ 入门:基于 AMQP-CPP 的 C++ 实践指南与二次封装
文章目录
- RabbitMQ 入门:基于 AMQP-CPP 的 C++ 实践指南
- 一、AMQP-CPP 的两种通信模式
- 1. TCP 模式:自定义事件循环
- 核心步骤
- 基础代码框架
- 2. 扩展模式:借助第三方异步库
- 以 libev 为例的基础代码
- 二、RabbitMQ 核心操作:基于 Channel 接口
- 1. 关键概念与枚举
- 2. 核心操作实战
- (1)声明交换机
- (2)声明队列
- (3)发布消息
- (4)消费消息
- 三、常见问题与注意事项
- 什么是交换机标志和队列标志?
- 一、核心概念:什么是交换机标志和队列标志?
- 二、交换机标志:控制交换机的核心特性
- 三、队列标志:控制队列的核心特性
- 四、关键区别与使用场景
- 五、交换机的默认标志
- 六、队列的默认标志
- 四、 接口与案例更详细用法
- 一、核心类与接口表格整理
- 1. AMQP 核心类与接口
- 2. libev 核心接口
- 二、由浅入深的案例
- 案例 0:核心流程发布模式和消费模式
- 发布模式
- 消费模式
- 案例 1:基础消息发布与消费(libev 模式)
- 案例 2:使用 fanout 交换机实现广播
- 案例 3:使用 topic 交换机实现模糊路由
- 案例 4:异步二次封装
- 案例补充知识:
- 一、`ev_async` 的核心作用
- 二、`ev_async` 的工作原理
- 三、关键函数与数据结构
- 1. 数据结构
- 2. 核心函数
- 四、使用步骤(示例代码)
- 输出结果:
- 五、`ev_async` 在 `MQClient` 中的应用
- 六、注意事项
- 五、 部分参考源代码
RabbitMQ 入门:基于 AMQP-CPP 的 C++ 实践指南
RabbitMQ 作为主流的消息中间件,凭借高可靠性、灵活的路由策略和跨语言支持,广泛应用于分布式系统解耦、异步通信场景。本文将以 C++ 的 AMQP-CPP 库为核心,从通信模式到核心接口,带你快速上手 RabbitMQ 的开发。
一、AMQP-CPP 的两种通信模式
AMQP-CPP 是 RabbitMQ 的 C++ 客户端库,提供两种网络通信模式,可根据项目的事件循环依赖灵活选择。
1. TCP 模式:自定义事件循环
TCP 模式需手动实现网络层逻辑,核心是继承AMQP::TcpHandler
类并完成关键函数重写,适合需要深度定制事件循环(如用 select/epoll)的场景。
核心步骤
- 定义自定义处理器类,继承
AMQP::TcpHandler
。 - 重写必须实现的
monitor
函数,将文件描述符(fd)注册到自定义事件循环中。 - 事件循环检测到 fd 可读 / 可写时,调用
connection->process(fd, flags)
通知 AMQP-CPP 处理数据。 - 按需重写
onReady
(连接就绪)、onError
(错误处理)、onNegotiate
(心跳协商)等函数,处理连接生命周期事件。
基础代码框架
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>class MyTcpHandler : public AMQP::TcpHandler {
public:// 连接附加到处理器时调用void onAttached(AMQP::TcpConnection *connection) override {// 初始化连接相关资源}// AMQP协议就绪(可开始操作)时调用void onReady(AMQP::TcpConnection *connection) override {// 例如:创建Channel、声明交换机/队列}// 心跳协商:设置心跳间隔(此处接受服务器建议,最小60秒)uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) override {return interval < 60 ? 60 : interval;}// 核心:将fd注册到事件循环void monitor(AMQP::TcpConnection *connection, int fd, int flags) override {// 1. 从事件循环中移除旧的fd监听(若存在)// 2. 根据flags(AMQP::readable/AMQP::writable)添加新监听// 3. 监听触发时调用connection->process(fd, 触发的flags)}
};
2. 扩展模式:借助第三方异步库
若项目已使用libev
、libevent
、libuv
等异步库,可直接使用 AMQP-CPP 提供的封装处理器(如AMQP::LibEvHandler
),无需手动实现monitor
函数,大幅简化开发。
以 libev 为例的基础代码
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>int main() {// 1. 获取libev默认事件循环auto *loop = ev_default_loop(0);// 2. 创建libev专属处理器AMQP::LibEvHandler handler(loop);// 3. 创建TCP连接(参数:处理器、RabbitMQ地址、端口)AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost:5672/"));// 4. 创建Channel(后续操作的核心载体)AMQP::TcpChannel channel(&connection);// 5. 启动事件循环ev_run(loop, 0);return 0;
}
二、RabbitMQ 核心操作:基于 Channel 接口
Channel
是 RabbitMQ 操作的核心类,所有指令(声明交换机、队列、发布 / 消费消息)均通过Channel
执行。由于 AMQP-CPP 是异步库,操作结果需通过Deferred
类的回调函数获取,而非直接返回。
1. 关键概念与枚举
在开始操作前,需先了解核心枚举和标志,确保资源配置符合需求:
枚举 / 标志 | 说明 |
---|---|
ExchangeType | 交换机类型:fanout (广播)、direct (精确匹配)、topic (模糊匹配)等 |
交换机 / 队列 flags | durable (持久化)、autodelete (无绑定后自动删除)、exclusive (独占) |
消费 flags | noack (无需手动确认消息)、exclusive (独占消费) |
2. 核心操作实战
以下操作均基于TcpChannel
,需在onReady
(连接就绪)回调中执行,避免资源未初始化导致错误。
(1)声明交换机
交换机是消息的 “路由枢纽”,需先声明再使用,支持持久化、自动删除等配置。
// 声明一个持久化的direct类型交换机
channel.declareExchange("my_exchange", AMQP::ExchangeType::direct, AMQP::durable).onSuccess([]() {std::cout << "交换机声明成功" << std::endl;}).onError([](const char *msg) {std::cerr << "交换机声明失败:" << msg << std::endl;});
(2)声明队列
队列是消息的 “存储容器”,需与交换机绑定后才能接收消息,支持持久化、独占等配置。
// 声明一个持久化、非独占的队列
channel.declareQueue("my_queue", AMQP::durable).onSuccess([&channel](const std::string &name, uint32_t msgCount, uint32_t consumerCount) {std::cout << "队列" << name << "声明成功,消息数:" << msgCount << std::endl;// 队列声明成功后,绑定到交换机(路由键:my_routing_key)channel.bindQueue("my_exchange", name, "my_routing_key");}).onError([](const char *msg) {std::cerr << "队列声明失败:" << msg << std::endl;});
(3)发布消息
向交换机发布消息,通过路由键(routing key)决定消息流向哪个队列。
// 发布一条持久化消息(通过envelope设置属性)
std::string message = "Hello RabbitMQ!";
AMQP::Envelope envelope(message.data(), message.size());
envelope.setDeliveryMode(2); // 2表示持久化消息(重启后不丢失)// 发布到my_exchange,路由键my_routing_key
bool success = channel.publish("my_exchange", "my_routing_key", envelope);
if (!success) {std::cerr << "消息发布失败(可能连接未就绪)" << std::endl;
}
(4)消费消息
订阅队列并接收消息,支持手动确认(确保消息被处理后再删除)或自动确认。
// 消费my_queue的消息,手动确认(不设置AMQP::noack)
channel.consume("my_queue", "my_consumer_tag").onSuccess([](const std::string_view &tag) {std::cout << "消费者启动,标签:" << tag << std::endl;}).onMessage([&channel](const AMQP::Message &msg, uint64_t deliveryTag, bool redelivered) {// 处理消息内容std::cout << "收到消息:" << std::string(msg.body(), msg.bodySize()) << std::endl;// 手动确认消息(multiple=false:仅确认当前消息)channel.ack(deliveryTag, false);}).onError([](const char *msg) {std::cerr << "消费启动失败:" << msg << std::endl;});
三、常见问题与注意事项
- 异步回调顺序:AMQP-CPP 所有操作异步执行,需确保 “声明交换机→声明队列→绑定→发布 / 消费” 的顺序,可通过回调嵌套或状态标记控制。
- 消息持久化:需同时满足三点:交换机
durable
、队列durable
、消息deliveryMode=2
,否则重启 RabbitMQ 后消息丢失。 - 心跳机制:
onNegotiate
函数需设置合理的心跳间隔(建议≥60 秒),并在事件循环中定时调用connection->heartbeat()
,避免连接被 RabbitMQ 主动断开。 - 错误处理:所有操作需添加
onError
回调,捕获 “交换机不存在”“队列已存在但配置不匹配” 等错误,避免程序崩溃。
什么是交换机标志和队列标志?
一、核心概念:什么是交换机标志和队列标志?
在 RabbitMQ 中,无论是声明交换机(exchange.declare
)还是队列(queue.declare
),都需要传入一组 标志(Flags)。这些标志是预定义的枚举值(如 durable
、exclusive
),用于告诉 RabbitMQ:
- 是否持久化存储(重启后保留)
- 是否仅限当前连接使用
- 是否自动删除(无消费者 / 绑定后删除)
- 是否禁止手动删除 / 修改等
简单说,标志就是给交换机和队列 “打标签”,定义它们的 “生存规则”。
二、交换机标志:控制交换机的核心特性
交换机的核心作用是 “路由消息”,其标志主要围绕 持久化 和 自动删除 设计,常用标志如下:
标志(Flag) | 作用说明 |
---|---|
durable (持久化) | 交换机在 RabbitMQ 重启后不会被删除。⚠️ 注意:仅保证交换机本身存在,不保证消息持久化(需配合队列和消息的持久化)。 |
auto-delete (自动删除) | 当交换机没有任何绑定关系(即无队列 / 交换机绑定到它)时,会被 RabbitMQ 自动删除。 |
internal (内部交换机) | 该交换机只能接收来自其他交换机的消息,不能直接接收生产者发送的消息(用于内部路由转发)。 |
三、队列标志:控制队列的核心特性
队列是 “存储消息” 的载体,其标志更复杂,覆盖 持久化、独占性、自动删除、消息确认 等场景,常用标志如下:
标志(Flag) | 作用说明 |
---|---|
durable (持久化) | 队列在 RabbitMQ 重启后不会被删除,且队列中的持久化消息也会保留。✅ 关键:需配合 “消息持久化”(生产者发送时指定 delivery_mode=2 )才能实现消息不丢失。 |
exclusive (独占) | 1. 队列仅限当前连接使用,其他连接无法访问;2. 当当前连接断开时,队列会被自动删除(无论是否持久化);⚠️ 场景:临时队列(如 RPC 调用的响应队列)。 |
auto-delete (自动删除) | 当队列没有任何消费者(包括消费者断开连接)时,会被 RabbitMQ 自动删除。⚠️ 注意:与交换机的 auto-delete 触发条件不同(交换机是 “无绑定”,队列是 “无消费者”)。 |
passive (被动声明) | 仅 “检查队列是否存在”,不创建新队列。✅ 场景:消费者启动时确认队列已存在,避免误创建(若队列不存在,会抛出错误)。 |
四、关键区别与使用场景
维度 | 交换机标志 | 队列标志 |
---|---|---|
核心关注点 | 路由规则的 “生存性”(持久化、自动删除) | 消息存储的 “安全性”(持久化、独占性) |
触发删除条件 | auto-delete :无绑定关系 | auto-delete :无消费者 |
独占性 | 无 exclusive 标志 | 有 exclusive 标志(连接独占) |
五、交换机的默认标志
当声明交换机时未显式指定任何标志,默认使用以下组合,核心特点是 “临时存在”(服务重启后消失,无绑定也不自动删除)。
默认标志 | 对应行为说明 |
---|---|
non-durable (非持久化) | 交换机不持久化,RabbitMQ 重启后会自动删除。 |
no-auto-delete (不自动删除) | 即使交换机没有任何绑定关系(无队列 / 交换机绑定),也不会被自动删除,需手动删除。 |
non-internal (非内部交换机) | 允许生产者直接向交换机发送消息(不是只能接收其他交换机的消息)。 |
六、队列的默认标志
队列的默认标志同样遵循 “临时存在” 原则,但比交换机多了 “非独占” 特性,核心特点是 “临时、可共享、手动清理”。
默认标志 | 对应行为说明 |
---|---|
non-durable (非持久化) | 队列不持久化,RabbitMQ 重启后会自动删除,队列中的消息也会丢失。 |
no-auto-delete (不自动删除) | 即使队列没有任何消费者,也不会被自动删除,需手动删除。 |
non-exclusive (非独占) | 队列可被多个连接访问(如多个消费者同时监听同一个队列),不绑定到单个连接。 |
no-passive (非被动声明) | 若队列不存在,则自动创建队列(而非抛出错误)。 |
四、 接口与案例更详细用法
一、核心类与接口表格整理
1. AMQP 核心类与接口
类 / 类型 | 函数 / 成员 | 参数说明 | 作用 |
---|---|---|---|
Channel | declareExchange(name, type, flags, arguments) | - name :交换机名称- type :交换机类型(fanout /direct 等)- flags :属性标志(durable /autodelete 等)- arguments :额外参数 | 声明交换机,返回Deferred 对象用于注册回调 |
declareQueue(name, flags, arguments) | - name :队列名称(空则自动分配)- flags :属性标志(durable /exclusive 等)- arguments :额外参数 | 声明队列,返回DeferredQueue 对象用于注册回调 | |
bindQueue(exchange, queue, routingkey, arguments) | - exchange :源交换机- queue :目标队列- routingkey :路由键- arguments :额外参数 | 将队列绑定到交换机,返回Deferred 对象用于注册回调 | |
publish(exchange, routingKey, message, flags) | - exchange :目标交换机- routingKey :路由键- message :消息内容- flags :发布标志(mandatory 等) | 发布消息到交换机,返回bool 表示是否成功入队 | |
consume(queue, tag, flags, arguments) | - queue :要消费的队列- tag :消费者标签(空则自动分配)- flags :消费标志(noack 等)- arguments :额外参数 | 订阅队列消息,返回DeferredConsumer 对象用于注册回调 | |
ack(deliveryTag, flags) | - deliveryTag :消息唯一标识- flags :确认标志(multiple 批量确认) | 手动确认消息已处理,通知 RabbitMQ 删除消息 | |
Deferred | onSuccess(SuccessCallback) | 回调函数:void() | 注册操作成功的回调(如交换机 / 队列声明成功) |
onError(ErrorCallback) | 回调函数:void(const char *message) | 注册操作失败的回调(如交换机不存在) | |
DeferredQueue | onSuccess(QueueCallback) | 回调函数:void(const std::string &name, uint32_t msgCount, uint32_t consumerCount) | 注册队列声明成功的回调(返回队列名称、消息数等) |
DeferredConsumer | onSuccess(ConsumeCallback) | 回调函数:void(const std::string_view &tag) | 注册消费者启动成功的回调(返回消费者标签) |
onMessage(MessageCallback) | 回调函数:void(const Message &msg, uint64_t deliveryTag, bool redelivered) | 注册接收消息的回调(返回消息内容、唯一标识等) | |
Message | exchange() | 无 | 返回消息来源的交换机名称 |
routingkey() | 无 | 返回消息的路由键 | |
Envelope | body() | 无 | 返回消息体的原始数据(const char* ) |
bodySize() | 无 | 返回消息体的长度(字节数) |
2. libev 核心接口
类型 / 函数 | 参数说明 | 作用 |
---|---|---|
ev_default_loop(flags) | - flags :初始化标志(通常为 0) | 获取默认事件循环实例 |
ev_run(loop) | - loop :事件循环实例 | 启动事件循环,处理注册的事件(如 IO、定时器) |
ev_break(loop, break_type) | - loop :事件循环实例- break_type :终止类型(EVBREAK_ONE /EVBREAK_ALL ) | 终止事件循环(EVBREAK_ONE 终止一次循环,EVBREAK_ALL 终止所有循环) |
ev_async_init(w, cb) | - w :异步事件对象- cb :回调函数(void(loop, w, revents) ) | 初始化异步事件,注册触发时的回调 |
ev_async_start(loop, w) | - loop :事件循环实例- w :异步事件对象 | 将异步事件注册到事件循环 |
ev_async_send(loop, w) | - loop :事件循环实例- w :异步事件对象 | 触发异步事件,调用注册的回调 |
二、由浅入深的案例
案例 0:核心流程发布模式和消费模式
以下是完全简化的代码框架,去掉所有额外类和复杂逻辑,仅保留 “连接 RabbitMQ→发布消息” 的核心流程,适合新手理解基础链路。
发布模式
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <iostream>
#include <string>// 全局变量:仅用于在回调中访问事件循环和通道(新手简化用)
static struct ev_loop* g_loop = nullptr;
static AMQP::TcpChannel* g_channel = nullptr;// 连接就绪时的回调(由AMQP-CPP自动触发)
void onConnectionReady(AMQP::TcpConnection* conn) {std::cout << "1. RabbitMQ连接成功,创建通道..." << std::endl;// 创建通道(消息操作的核心载体)g_channel = new AMQP::TcpChannel(conn);// 声明交换机(简化:direct类型,非持久化,新手可先不纠结参数)g_channel->declareExchange("simple_exchange", AMQP::direct).onSuccess([]() {std::cout << "2. 交换机声明成功,声明队列..." << std::endl;// 声明队列(简化:非持久化,新手先用默认配置)g_channel->declareQueue("simple_queue").onSuccess([](const std::string& queueName) {std::cout << "3. 队列" << queueName << "声明成功,绑定交换机与队列..." << std::endl;// 绑定队列与交换机(路由键:simple_key,新手记住“绑定键=发布键”即可)g_channel->bindQueue("simple_exchange", queueName, "simple_key").onSuccess([queueName]() {std::cout << "4. 队列绑定成功,发布消息..." << std::endl;// 发布消息(简化:直接发字符串,不设复杂属性)std::string msg = "新手入门:RabbitMQ连接成功!";g_channel->publish("simple_exchange", "simple_key", msg);std::cout << "5. 消息发布完成!1秒后退出..." << std::endl;// 延迟1秒退出(确保消息发完,新手不用深究定时器细节)ev_timer* timer = new ev_timer;ev_timer_init(timer, [](struct ev_loop*, ev_timer* w, int) {ev_break(g_loop, EVBREAK_ALL); // 退出事件循环delete w;}, 1.0, 0.0);ev_timer_start(g_loop, timer);});});});
}// 连接错误时的回调(新手用于排查问题)
void onConnectionError(AMQP::TcpConnection* conn, const char* errMsg) {std::cerr << "连接错误:" << errMsg << std::endl;ev_break(g_loop, EVBREAK_ALL); // 出错直接退出
}int main() {// 1. 初始化事件循环(libev核心,新手理解为“异步调度中心”即可)g_loop = ev_default_loop(0);if (!g_loop) {std::cerr << "事件循环初始化失败!" << std::endl;return -1;}// 2. 创建Handler(简化:用lambda绑定回调,不写额外类)AMQP::LibEvHandler handler(g_loop);// 绑定“连接就绪”和“连接错误”的回调(核心:告诉AMQP-CPP事件发生时该做什么)handler.onReady(onConnectionReady);handler.onError(onConnectionError);// 3. 建立RabbitMQ连接(新手用默认地址:localhost:5672,账号guest/guest)AMQP::Address addr("amqp://guest:guest@localhost:5672/");AMQP::TcpConnection conn(&handler, addr);// 4. 启动事件循环(阻塞等待事件,新手理解为“启动程序运行”)ev_run(g_loop, 0);// 5. 释放资源(新手简化:直接删通道,避免内存泄漏)if (g_channel) delete g_channel;std::cout << "程序退出!" << std::endl;return 0;
}
新手必看:核心流程拆解(5 步理解)
- 初始化事件循环
ev_default_loop(0)
:相当于给 RabbitMQ 找一个 “工作柜台”,所有异步操作(比如等连接、发消息)都在这处理,新手不用深究细节,记住 “必须先初始化” 即可。 - 创建 Handler 并绑定回调
AMQP::LibEvHandler
:是 AMQP-CPP 和 libev 的 “桥梁”,负责传递事件。onReady
:连接成功后会自动调用这个函数(里面写后续的发消息逻辑)。onError
:连接失败时会调用(比如 RabbitMQ 没启动、地址错了,新手靠这个排查问题)。
- 建立 RabbitMQ 连接
AMQP::Address
:填 RabbitMQ 的地址和账号(默认amqp://guest:guest@localhost:5672/
,不用改)。AMQP::TcpConnection
:真正建立 TCP 连接,关联 Handler 后,连接状态变化会触发 Handler 的回调。
- 启动事件循环
ev_run(g_loop, 0)
:程序开始 “等待事件”—— 比如等连接成功、等消息发完,直到调用ev_break
才退出,新手理解为 “启动程序” 就行。 - **发布消息核心逻辑(在 onReady 里)**连接成功后,按 “声明交换机→声明队列→绑定→发布消息” 的顺序执行,新手记住:
- 交换机和队列要先 “声明” 才能用(相当于 “创建文件夹”)。
- 绑定要指定 “路由键”(相当于 “给文件夹贴标签,方便找到”)。
- 发布消息时的路由键要和绑定的一致(否则消息找不到队列)。
以下是与之前发布代码对应的消息接收(消费)框架,同样简化到极致,适合新手理解 “如何接收 RabbitMQ 消息” 的核心流程。
消费模式
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <iostream>
#include <string>// 全局变量:简化回调中的资源访问
static struct ev_loop* g_loop = nullptr;
static AMQP::TcpChannel* g_channel = nullptr;// 收到消息时的回调(核心:打印消息内容)
void onMessageReceived(const AMQP::Message& msg, uint64_t deliveryTag, bool redelivered) {// 从消息中提取内容(msg.body()是消息体,msg.bodySize()是长度)std::string messageContent(msg.body(), msg.bodySize());std::cout << "\n收到消息:" << messageContent << std::endl;std::cout << "消息路由键:" << msg.routingkey() << std::endl;// 手动确认消息(告诉RabbitMQ:我已经处理完了,可以删了)g_channel->ack(deliveryTag, false); // false表示只确认当前这一条消息
}// 连接就绪后:声明资源并开始消费消息
void onConnectionReady(AMQP::TcpConnection* conn) {std::cout << "1. 连接成功,创建通道..." << std::endl;g_channel = new AMQP::TcpChannel(conn);// 声明交换机(和发布端保持一致:名称、类型)g_channel->declareExchange("simple_exchange", AMQP::direct).onSuccess([]() {std::cout << "2. 交换机声明成功,声明队列..." << std::endl;// 声明队列(和发布端保持一致:名称)g_channel->declareQueue("simple_queue").onSuccess([](const std::string& queueName) {std::cout << "3. 队列" << queueName << "声明成功,绑定..." << std::endl;// 绑定(和发布端保持一致:交换机、队列、路由键)g_channel->bindQueue("simple_exchange", queueName, "simple_key").onSuccess([queueName]() {std::cout << "4. 绑定成功,开始监听消息(按Ctrl+C退出)..." << std::endl;// 开始消费消息(核心:注册收到消息的回调)g_channel->consume(queueName).onMessage(onMessageReceived); // 收到消息时调用onMessageReceived});});});
}// 连接错误时的回调(排查问题用)
void onConnectionError(AMQP::TcpConnection* conn, const char* errMsg) {std::cerr << "连接错误:" << errMsg << std::endl;ev_break(g_loop, EVBREAK_ALL); // 出错退出
}int main() {// 1. 初始化事件循环(和发布端一样)g_loop = ev_default_loop(0);if (!g_loop) {std::cerr << "事件循环初始化失败!" << std::endl;return -1;}// 2. 创建Handler并绑定回调AMQP::LibEvHandler handler(g_loop);handler.onReady(onConnectionReady); // 连接成功回调handler.onError(onConnectionError); // 连接错误回调// 3. 建立连接(和发布端用同一个RabbitMQ地址)AMQP::Address addr("amqp://guest:guest@localhost:5672/");AMQP::TcpConnection conn(&handler, addr);// 4. 启动事件循环(一直运行,等待消息)ev_run(g_loop, 0);// 5. 释放资源(正常退出时执行)if (g_channel) delete g_channel;std::cout << "程序退出!" << std::endl;return 0;
}
新手必看:接收消息核心流程
-
和发布端保持一致的 “前提工作”
- 必须声明相同的交换机(名称、类型:
simple_exchange
,direct
)。 - 必须声明相同的队列(名称:
simple_queue
)。 - 必须用相同的路由键绑定(simple_key)。(否则接收端找不到消息,新手记住 “发布端和接收端的这三个参数必须完全一致”)
- 必须声明相同的交换机(名称、类型:
-
核心:开始消费消息
g_channel->consume(queueName).onMessage(onMessageReceived);
consume(queueName)
:告诉 RabbitMQ “我要监听这个队列的消息”。onMessage(...)
:注册一个回调函数(onMessageReceived
),每当队列有新消息,这个函数就会被自动调用。
-
收到消息后要 “确认”
g_channel->ack(deliveryTag, false);
- 这行代码是告诉 RabbitMQ“我已经收到并处理完这个消息了,你可以从队列里删掉它了”。
- 如果不确认,RabbitMQ 会认为消息没处理完,下次程序重启还会再发一次(新手暂时记住必须加这行)。
案例 1:基础消息发布与消费(libev 模式)
目标:使用 libev 事件循环,声明一个交换机和队列,发布一条消息并消费。
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <iostream>// 1. 自定义Handler,继承LibEvHandler,重写事件回调
class MyHandler : public AMQP::LibEvHandler {
private:// 保存通道指针,用于在回调中操作AMQP::TcpChannel* channel_;
public:// 构造函数:接收libev事件循环MyHandler(struct ev_loop* loop) : AMQP::LibEvHandler(loop), channel_(nullptr) {}// 2. 重写onReady回调:连接就绪时触发void onReady(AMQP::TcpConnection* connection) override {std::cout << "连接已就绪,开始声明资源..." << std::endl;// 创建通道(连接就绪后才能安全创建)channel_ = new AMQP::TcpChannel(connection);// 后续操作:声明交换机、队列等declareResources();}// 3. 重写onError回调:处理连接错误void onError(AMQP::TcpConnection* connection, const char* message) override {std::cerr << "连接错误: " << message << std::endl;}// 声明交换机、队列并绑定void declareResources() {if (!channel_) return;// 声明交换机(direct类型,持久化)channel_->declareExchange("test_exchange", AMQP::direct, AMQP::durable).onSuccess([this]() {std::cout << "交换机声明成功" << std::endl;// 声明队列(持久化)channel_->declareQueue("test_queue", AMQP::durable).onSuccess([this](const std::string& queueName, uint32_t, uint32_t) {std::cout << "队列 " << queueName << " 声明成功" << std::endl;// 绑定队列到交换机(路由键test_key)channel_->bindQueue("test_exchange", queueName, "test_key").onSuccess([this, queueName]() {std::cout << "队列与交换机绑定成功" << std::endl;// 启动消费(手动确认)startConsuming(queueName);// 发布测试消息publishMessage();});});});}// 启动消费void startConsuming(const std::string& queueName) {channel_->consume(queueName, "consumer_tag").onSuccess([](const std::string_view& tag) {std::cout << "消费者启动,标签: " << tag << std::endl;}).onMessage([this](const AMQP::Message& msg, uint64_t deliveryTag, bool) {std::cout << "收到消息: " << std::string(msg.body(), msg.bodySize()) << std::endl;// 手动确认消息channel_->ack(deliveryTag, false);});}// 发布消息void publishMessage() {std::string message = "Hello RabbitMQ!";bool success = channel_->publish("test_exchange", "test_key", message);if (success) {std::cout << "消息发布成功" << std::endl;} else {std::cerr << "消息发布失败" << std::endl;}}
};int main() {// 获取libev默认事件循环auto* loop = ev_default_loop(0);// 创建自定义处理器MyHandler handler(loop);// 创建TCP连接(参数:处理器、RabbitMQ地址)AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost:5672/"));// 启动事件循环ev_run(loop, 0);return 0;
}
案例 2:使用 fanout 交换机实现广播
目标:声明 fanout 类型交换机,绑定两个队列,发布一条消息后两个队列均能收到。
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <iostream>int main() {auto* loop = ev_default_loop(0);AMQP::LibEvHandler handler(loop);AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost:5672/"));AMQP::TcpChannel channel(&connection);connection.onReady([&]() {// 1. 声明fanout交换机(广播模式)channel.declareExchange("broadcast_exchange", AMQP::fanout, AMQP::durable).onSuccess([&]() {// 2. 声明两个队列auto declareQueue = [&](const std::string& queueName) {channel.declareQueue(queueName, AMQP::durable).onSuccess([&, queueName](const std::string& name, uint32_t, uint32_t) {// 3. 绑定队列到fanout交换机(路由键无效,可省略)channel.bindQueue("broadcast_exchange", name, "").onSuccess([&, name]() {std::cout << "队列" << name << "绑定到广播交换机" << std::endl;// 4. 每个队列启动独立消费channel.consume(name).onMessage([name](const AMQP::Message& msg, uint64_t tag, bool) {std::cout << "队列" << name << "收到消息:" << std::string(msg.body(), msg.bodySize()) << std::endl;});});});};// 声明并绑定两个队列declareQueue("queue1");declareQueue("queue2");// 5. 发布消息到fanout交换机(路由键无效)channel.publish("broadcast_exchange", "", "这是一条广播消息!");});});ev_run(loop, 0);return 0;
}
案例 3:使用 topic 交换机实现模糊路由
目标:声明 topic 交换机,按规则绑定队列(如order.#
匹配order.create
、order.pay
),验证路由规则。
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <iostream>
#include <string>class TopicHandler : public AMQP::LibEvHandler {
private:AMQP::TcpConnection* _conn; // 私有成员:连接指针AMQP::TcpChannel* _channel; // 私有成员:通道指针public:// 构造函数TopicHandler(struct ev_loop* loop) : AMQP::LibEvHandler(loop), _conn(nullptr), _channel(nullptr) {}// 公有接口:绑定连接void setConnection(AMQP::TcpConnection* conn) {_conn = conn;}// 公有接口:释放资源(关键修复:外部通过此接口释放私有成员)void cleanup() {if (_channel != nullptr) {delete _channel;_channel = nullptr; // 避免野指针}}// 重写:连接就绪回调void onReady(AMQP::TcpConnection* connection) override {std::cout << "RabbitMQ连接就绪,初始化topic路由资源..." << std::endl;_channel = new AMQP::TcpChannel(_conn);setupTopicRouting();}// 重写:连接错误回调void onError(AMQP::TcpConnection* connection, const char* message) override {std::cerr << "[错误] 连接异常:" << message << std::endl;}// 重写:连接丢失回调void onLost(AMQP::TcpConnection* connection) override {std::cerr << "[提示] RabbitMQ连接已丢失" << std::endl;}private:// 私有方法:初始化topic路由资源void setupTopicRouting() {if (!_channel) return;// 声明topic交换机_channel->declareExchange("topic_exchange", AMQP::topic, AMQP::durable).onSuccess([this]() {std::cout << "topic交换机(topic_exchange)声明成功" << std::endl;// 声明并绑定订单队列(order.#)declareAndBindQueue("order_queue", "order.#", "订单队列");// 声明并绑定错误日志队列(log.error)declareAndBindQueue("error_log_queue", "log.error", "错误日志队列");// 发布测试消息publishTestMessages();}).onError([](const char* msg) {std::cerr << "[错误] topic交换机声明失败:" << msg << std::endl;});}// 私有方法:封装队列声明+绑定逻辑void declareAndBindQueue(const std::string& queueName, const std::string& routingKey, const std::string& queueDesc) {_channel->declareQueue(queueName, AMQP::durable).onSuccess([this, queueName, routingKey, queueDesc](const std::string& name, uint32_t, uint32_t) {std::cout << queueDesc << "(" << name << ")声明成功" << std::endl;// 绑定队列到交换机_channel->bindQueue("topic_exchange", name, routingKey).onSuccess([this, name, queueDesc, routingKey]() {std::cout << queueDesc << "(" << name << ")绑定规则:" << routingKey << std::endl;// 启动消费startConsume(name, queueDesc);}).onError([queueDesc, name](const char* msg) {std::cerr << "[错误] " << queueDesc << "(" << name << ")绑定失败:" << msg << std::endl;});}).onError([queueDesc](const char* msg) {std::cerr << "[错误] " << queueDesc << "声明失败:" << msg << std::endl;});}// 私有方法:启动队列消费void startConsume(const std::string& queueName, const std::string& queueDesc) {_channel->consume(queueName).onMessage([queueDesc](const AMQP::Message& msg, uint64_t, bool) {std::cout << queueDesc << "收到(路由键:" << msg.routingkey() << "):" << std::string(msg.body(), msg.bodySize()) << std::endl;}).onError([queueDesc](const char* msg) {std::cerr << "[错误] " << queueDesc << "消费启动失败:" << msg << std::endl;});}// 私有方法:发布测试消息void publishTestMessages() {_channel->publish("topic_exchange", "order.create", "新订单创建:订单号20240501001");_channel->publish("topic_exchange", "order.pay", "订单支付成功:金额99.00元");_channel->publish("topic_exchange", "log.error", "系统错误:数据库连接超时(错误码503)");_channel->publish("topic_exchange", "log.info", "系统启动完成:当前版本v1.0.0");std::cout << "\n4条测试消息已发布,等待队列接收..." << std::endl;}
};int main() {// 初始化libev事件循环struct ev_loop* loop = ev_default_loop(0);if (!loop) {std::cerr << "[错误] 无法创建libev事件循环" << std::endl;return -1;}// 创建Handler和连接TopicHandler handler(loop);AMQP::Address rabbitAddr("amqp://guest:guest@localhost:5672/");AMQP::TcpConnection conn(&handler, rabbitAddr);// 绑定连接handler.setConnection(&conn);// 启动事件循环std::cout << "等待RabbitMQ连接...(请确保RabbitMQ服务已启动)" << std::endl;ev_run(loop, 0);// 释放资源:调用公有接口cleanup,而非直接访问_channelhandler.cleanup();return 0;
}
案例 4:异步二次封装
#pragma once
#include "LogTool.hpp"
#include <amqpcpp.h>
#include <amqpcpp/address.h>
#include <amqpcpp/channel.h>
#include <amqpcpp/deferred.h>
#include <amqpcpp/deferredconsumer.h>
#include <amqpcpp/exception.h>
#include <amqpcpp/exchangetype.h>
#include <amqpcpp/flags.h>
#include <amqpcpp/libev.h>
#include <amqpcpp/linux_tcp/tcpconnection.h>
#include <amqpcpp/linux_tcp/tcphandler.h>
#include <amqpcpp/message.h>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <ev.h>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>namespace RMQTool
{using EvHandlerptr = std::unique_ptr<AMQP::LibEvHandler>;using TcpConptr = std::unique_ptr<AMQP::TcpConnection>;using Chlptr = std::unique_ptr<AMQP::TcpChannel>;using ErrorCB = std::function<void (const std::string&)>;using OrignalMsgCB = std::function<void (const AMQP::Message&, uint64_t, bool)>;using MsgCB = std::function<void (const char* msg, size_t)>;class RMQClient{public:RMQClient(const std::string& user, const std::string& password, const std::string& hosturl, const int32_t& port = 5672, ErrorCB ecb = nullptr){LogModule::Log::Init();std::unique_lock<std::mutex> ulock(mutex_);//1. 创建独立的事件循环loop_ = ev_loop_new(EVFLAG_AUTO);if (!loop_){LOG_ERROR("创建事件循环失败!");exit(-1);}LOG_INFO("创建事件循环成功!");//2. 创建事件处理器handler_ = std::make_unique<AMQP::LibEvHandler>(loop_);LOG_INFO("创建事件处理器成功!");//3. 解析地址//4. 创建TCP连接TryCatch<AMQP::Exception>([&](){//"amqp://guest:guest@localhost:5672/"std::string endurl = "amqp://" + user + ":" + password + "@" + hosturl + ":" + std::to_string(port) + "/"; AMQP::Address addr(endurl);connection_ = std::make_unique<AMQP::TcpConnection>(handler_.get(), addr);if(connection_ == nullptr){LOG_ERROR("创建Tcp链接失败!");exit(-1);}LOG_INFO("连接创建成功!");});//5. 创建信道 channel_ = std::make_unique<AMQP::TcpChannel>(connection_.get());channel_->onError([this](const char* msg){ErrorHandler(msg);});channel_->onReady([this](){LOG_INFO("信道创建成功!");});//6. 初始化异步任务管理器task_async_.data = this;ev_async_init(&task_async_, &TaskDisapatchHandler);ev_async_start(loop_, &task_async_);//创建结束异步任务监视器ev_async_init(&exit_async_, &ExitAsyncHandler);ulock.unlock();//7. 创建线程执行run_thread_ = std::thread([](struct ev_loop* loop){LOG_INFO("开始执行事件循环....");ev_run(loop,0);LOG_INFO("结束执行事件循环....");}, loop_);}void DeclareComponents(const std::string& exchange_name, const std::string& queue_name, const std::string& routing_key = "routing-key", AMQP::ExchangeType type = AMQP::ExchangeType::direct, int exchange_flags = 0, int queue_flags = 0){if(connection_ && connection_->closed()){ErrorHandler("Tcp 连接关闭, 无法操作!");return;}SubmitTask([=](){AMQP::Deferred& de = channel_->declareExchange(exchange_name, type, exchange_flags);de.onSuccess([=](){LOG_INFO("{}交换机声明成功", exchange_name);});de.onError([=](const char* msg){std::string err_msg = exchange_name + "交换机声明失败" + std::string(msg);ErrorHandler(err_msg);});AMQP::Deferred& dq = channel_->declareQueue(queue_name, queue_flags);dq.onSuccess([=](){LOG_INFO("{}队列声明成功", exchange_name);});dq.onError([=](const char* msg){std::string err_msg = queue_name + "队列声明失败" + std::string(msg);ErrorHandler(err_msg);});AMQP::Deferred& dbq = channel_->bindQueue(exchange_name, queue_name, routing_key);dbq.onSuccess([=](){LOG_INFO("{}-{}绑定成功, routingkey:{}", exchange_name, queue_name, routing_key);});dbq.onError([=](const char* msg){std::string err_msg = exchange_name + "-" + queue_name + "队列绑定失败" + std::string(msg);ErrorHandler(err_msg);});});}void Publisher(const std::string& exchange_name, const std::string& routing_key, const std::string& msg){if(connection_ && connection_->closed()){ErrorHandler("Tcp 连接关闭, 无法操作!");return;}SubmitTask([=](){bool b = channel_->publish(exchange_name, routing_key, msg);if (!b){ErrorHandler("发布消息到" + exchange_name + "失败");}else{LOG_INFO("消息已提交到 {} (routing_key: {}),长度: {}", exchange_name, routing_key, msg.size());}});}void Consumer(const std::string& queue_name, MsgCB cb, bool auto_ack = true, int flags = 0){if(connection_ && connection_->closed()){ErrorHandler("Tcp 连接关闭, 无法操作!");return;}SubmitTask([=](){AMQP::DeferredConsumer& dcu = channel_->consume(queue_name, flags);dcu.onReceived([=](const AMQP::Message& msg, uint64_t dtag, bool redivered){cb(msg.body(), msg.bodySize());if(auto_ack)channel_->ack(dtag);});dcu.onError([this, queue_name](const char* msg) {ErrorHandler("订阅队列 " + queue_name + " 失败: " + std::string(msg));});dcu.onSuccess([queue_name]() {LOG_INFO("开始订阅队列: {}", queue_name);});});}~RMQClient(){TryCatch([this](){// 1. 第一步:手动销毁依赖 loop 的组件(关键!)// 先销毁 TCP 连接(释放底层 IO 句柄)connection_.reset();// 再销毁信道(依赖连接)channel_.reset();// 最后销毁事件处理器(直接依赖 loop)handler_.reset();// 2. 第二步:终止事件循环if (loop_) {// 启动并发送退出信号(触发 ev_break)ev_async_start(loop_, &exit_async_);ev_async_send(loop_, &exit_async_);}// 3. 第三步:等待事件循环线程结束if (run_thread_.joinable()) {run_thread_.join();}// 4. 第四步:销毁 loop(必须在所有依赖组件销毁后!)if (loop_) {ev_loop_destroy(loop_);loop_ = nullptr;}LOG_INFO("线程结束!RMQClient退出!");});}private:template<typename T = std::exception>void TryCatch(std::function<void ()> func){try{func();}catch (const T& e){ErrorHandler(e.what());}}void ErrorHandler(const std::string& emsg){LOG_ERROR("发现了异常错误: {}", emsg);if (errorcb_){LOG_INFO("进入错误回调处理中....");errorcb_(emsg);}}void SubmitTask(std::function<void ()> taskfunc){std::unique_lock<std::mutex> ulock(mutex_);tash_queue_.push(taskfunc);ev_async_send(loop_, &task_async_);LOG_INFO("提交一个任务!");}inline static void TaskDisapatchHandler(struct ev_loop* loop, struct ev_async* watcher, int revents){RMQClient* client = static_cast<RMQClient*>(watcher->data);std::unique_lock<std::mutex> ulock(client->mutex_);while (client->tash_queue_.empty() == false){auto task = client->tash_queue_.front();client->TryCatch([&](){task();});LOG_INFO("执行一个任务!");client->tash_queue_.pop();}}inline static void ExitAsyncHandler(struct ev_loop* loop, struct ev_async* watcher, int revents){ev_break(loop, EVBREAK_ALL);}private:struct ev_loop* loop_;EvHandlerptr handler_;TcpConptr connection_;Chlptr channel_;private:std::mutex mutex_;struct ev_async task_async_;struct ev_async exit_async_;std::queue<std::function<void ()>> tash_queue_;ErrorCB errorcb_;std::thread run_thread_;};
};// LOG.hpp
#pragma once
#include <atomic>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>#include <spdlog/spdlog.h>
#include <spdlog/async.h>
#include <spdlog/async_logger.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/basic_file_sink.h>namespace LogModule
{enum OutputMode{CONSOLE_ONLY,FILE_ONLY,BOTH};typedef struct LogInfo{ OutputMode outmode = CONSOLE_ONLY;bool is_debug_ = true;std::string logfile_ = "logfile.txt";bool is_async_ = false;size_t queue_size_ = 1 << 12;size_t thread_num_ = 1;} LogInfo_t;class Log{public:static void Init(const LogInfo_t& loginfo = LogInfo_t()){if(is_init_)return;logconf_ = loginfo;create_logger();is_init_ = true;logger_->info("日志初始化完成!");}static Log& GetInstance(){static Log log;return log;}template<typename... Args>void trace(const char* fmt, const Args&... args){if(!is_init_){logger_->info("日志初始化未完成!");return;}if(logger_)logger_->trace(fmt, args...);}template<typename... Args>void info(const char* fmt, const Args&... args){if(!is_init_){logger_->info("日志初始化未完成!");return;}if(logger_)logger_->info(fmt, args...);}template<typename... Args>void debug(const char* fmt, const Args&... args){if(!is_init_){logger_->info("日志初始化未完成!");return;}if(logger_)logger_->debug(fmt, args...);}template<typename... Args>void warn(const char* fmt, const Args&... args){if(!is_init_){logger_->info("日志初始化未完成!");return;}if(logger_)logger_->warn(fmt, args...);}template<typename... Args>void error(const char* fmt, const Args&... args){if(!is_init_){logger_->info("日志初始化未完成!");return;}if(logger_)logger_->error(fmt, args...);}template<typename... Args>void critical(const char* fmt, const Args&... args){if(!is_init_){logger_->info("日志初始化未完成!");return;}if(logger_)logger_->critical(fmt, args...);}static void shutdown(){spdlog::shutdown();}private:Log() = default;~Log() = default;Log& operator=(const Log&) = delete;Log(const Log&) = delete;static void create_logger(){try{std::vector<spdlog::sink_ptr> sinks;if(logconf_.outmode == CONSOLE_ONLY || logconf_.outmode == BOTH){auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>(); sinks.push_back(console_sink);}if(logconf_.outmode == FILE_ONLY || logconf_.outmode == BOTH){auto file_sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(logconf_.logfile_);sinks.push_back(file_sink);}spdlog::level::level_enum lenum = logconf_.is_debug_ ? spdlog::level::trace : spdlog::level::info;if(logconf_.is_async_){spdlog::init_thread_pool(logconf_.queue_size_, logconf_.thread_num_);logger_ = std::make_shared<spdlog::async_logger>("mainlog", sinks.begin(), sinks.end(), spdlog::thread_pool(), spdlog::async_overflow_policy::block);}else{logger_ = std::make_shared<spdlog::logger>("mainlog", sinks.begin(), sinks.end());}logger_->set_level(lenum);spdlog::set_default_logger(logger_); // 重要:设置默认日志器logger_->set_pattern("[%Y-%m-%d %H:%M:%S.%e][%t][%-8l]%v");}catch (const std::exception& e){logger_->error("创建日志器失败!-{}", e.what());}}private:static inline std::shared_ptr<spdlog::logger> logger_;static inline LogInfo_t logconf_;static inline std::atomic<bool> is_init_ = false;};
};// // 使用简化版本的宏定义
// #define LOG_TRACE(...) LogModule::Log::GetInstance().trace(__VA_ARGS__)
// #define LOG_INFO(...) LogModule::Log::GetInstance().info(__VA_ARGS__)
// #define LOG_DEBUG(...) LogModule::Log::GetInstance().debug(__VA_ARGS__)
// #define LOG_WARN(...) LogModule::Log::GetInstance().warn(__VA_ARGS__)
// #define LOG_ERROR(...) LogModule::Log::GetInstance().error(__VA_ARGS__)
// #define LOG_CRITICAL(...) LogModule::Log::GetInstance().critical(__VA_ARGS__)// 修改后宏定义:
#define LOG_TRACE(fmt, ...) LogModule::Log::GetInstance().trace("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...) LogModule::Log::GetInstance().info("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_DEBUG(fmt, ...) LogModule::Log::GetInstance().debug("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_WARN(fmt, ...) LogModule::Log::GetInstance().warn("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) LogModule::Log::GetInstance().error("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_CRITICAL(fmt, ...) LogModule::Log::GetInstance().critical("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
案例补充知识:
在 libev
事件驱动库中,ev_async
是一种跨线程异步通知机制,用于在一个线程中向另一个运行 libev
事件循环的线程发送信号,触发指定的回调函数。它是线程间通信的核心工具,尤其适合需要从外部线程唤醒事件循环并执行任务的场景(如前面 MQClient
中跨线程提交任务)。
一、ev_async
的核心作用
libev
的事件循环(ev_run
)通常运行在一个独立线程中,默认会阻塞等待事件(如 IO 事件、定时器事件)。ev_async
的作用是:允许其他线程主动 “唤醒” 事件循环线程,并执行一段预定义的代码(回调函数)。
例如:在主线程中调用 ev_async_send
,事件循环线程会立即从阻塞中唤醒,执行 ev_async
绑定的回调函数,完成后继续等待其他事件。
二、ev_async
的工作原理
- 初始化与注册
- 首先通过
ev_async_init
初始化一个ev_async
结构体(称为 “watcher”),绑定回调函数和事件循环。 - 再通过
ev_async_start
将 watcher 注册到事件循环中,使其开始监听异步通知。
- 首先通过
- 发送通知
- 其他线程通过
ev_async_send
向事件循环发送异步通知,此时libev
会标记事件循环有 “待处理的异步事件”。
- 其他线程通过
- 执行回调
- 事件循环线程会在下次迭代时(或立即从阻塞中唤醒)检测到异步事件,执行
ev_async
绑定的回调函数。
- 事件循环线程会在下次迭代时(或立即从阻塞中唤醒)检测到异步事件,执行
- 线程安全保证
ev_async_send
是线程安全的,可以在任意线程中调用,无需额外加锁。- 多次调用
ev_async_send
可能会被合并(即如果事件循环还没处理前一次通知,多次发送只会触发一次回调),避免冗余处理。
三、关键函数与数据结构
1. 数据结构
struct ev_async {ev_watcher w; // 基础watcher结构(包含事件循环指针、状态等)void (*cb)(struct ev_loop *loop, struct ev_async *w, int revents); // 回调函数// 内部字段(用于线程同步)
};
/**struct ev_loop *loop:当前事件循环的指针
作用:指向触发回调的事件循环实例(即 ev_run 正在运行的那个循环)。*struct ev_async *w:当前异步事件监听器(watcher)的指针*作用:指向当前触发回调的 ev_async 实例本身(即被 ev_async_init 初始化的那个结构体)。*int revents:实际触发的事件类型*作用:表示当前回调被触发的具体事件类型(对于 ev_async 而言,固定为 EV_ASYNC)。
*/
cb
:异步事件触发时执行的回调函数,参数包括事件循环、当前ev_async
结构体、事件类型(固定为EV_ASYNC
)。
2. 核心函数
函数 | 作用 |
---|---|
ev_async_init | 初始化 ev_async 结构体,绑定回调函数。 |
ev_async_start | 将 ev_async 注册到事件循环,开始监听异步通知。 |
ev_async_send | 向事件循环发送异步通知,触发回调(线程安全)。 |
ev_async_stop | 从事件循环中移除 ev_async ,停止监听(通常在析构时调用)。 |
四、使用步骤(示例代码)
#include <ev.h>
#include <pthread.h>
#include <stdio.h>// 全局事件循环
struct ev_loop *loop;
// 定义ev_async结构体
struct ev_async async_watcher;// 1. 回调函数:事件循环线程中执行
static void async_callback(struct ev_loop *loop, struct ev_async *w, int revents) {printf("事件循环线程收到通知,执行回调!\n");
}// 2. 事件循环线程函数
static void *loop_thread(void *arg) {printf("事件循环线程启动\n");// 启动事件循环(阻塞)ev_run(loop, 0);printf("事件循环线程退出\n");return NULL;
}int main() {// 初始化事件循环loop = ev_default_loop(0);// 初始化ev_async:绑定回调函数ev_async_init(&async_watcher, async_callback);// 将async_watcher注册到事件循环ev_async_start(loop, &async_watcher);// 启动事件循环线程pthread_t tid;pthread_create(&tid, NULL, loop_thread, NULL);// 3. 主线程发送异步通知printf("主线程发送通知...\n");ev_async_send(loop, &async_watcher); // 线程安全// 等待1秒,让事件循环线程处理sleep(1);// 退出事件循环ev_break(loop, EVBREAK_ALL);pthread_join(tid, NULL);return 0;
}
输出结果:
事件循环线程启动
主线程发送通知...
事件循环线程收到通知,执行回调!
事件循环线程退出
五、ev_async
在 MQClient
中的应用
在之前优化的 MQClient
中,ev_async
被用于两个场景:
- 跨线程提交任务
- 定义
_task_async
作为任务通知的 watcher,回调函数handleTaskAsync
负责执行任务队列中的任务。 - 主线程调用
publish
、consume
等方法时,通过submitTask
将任务放入队列,再调用ev_async_send(_loop, &_task_async)
通知事件循环线程执行任务。
- 定义
- 优雅退出事件循环
- 定义
_exit_async
作为退出通知的 watcher,回调函数handleExitAsync
中调用ev_break
终止事件循环。 - 析构函数中通过
ev_async_send
发送退出通知,确保事件循环线程安全退出。
- 定义
六、注意事项
- 线程安全
ev_async_send
是唯一线程安全的函数,其他操作(如ev_async_init
、ev_async_start
)必须在事件循环线程中调用。
- 回调执行时机
- 回调函数在事件循环线程中执行,因此可以安全地操作事件循环内的资源(如
libev
的其他 watcher、AMQP 通道等)。
- 回调函数在事件循环线程中执行,因此可以安全地操作事件循环内的资源(如
- 避免阻塞回调
- 回调函数应尽量简短,避免长时间阻塞,否则会影响事件循环处理其他事件(如 IO 读写、定时器等)。
- 资源释放顺序
- 销毁
ev_async
前,需先调用ev_async_stop
从事件循环中移除,再释放相关资源。
- 销毁
五、 部分参考源代码
/*** @file amqp_ev_integration.h* @brief AMQP客户端与libev事件循环集成头文件* @details 该文件提供了AMQP协议客户端功能,并与libev事件循环进行集成*/#ifndef AMQP_EV_INTEGRATION_H
#define AMQP_EV_INTEGRATION_H#include <functional>
#include <string>
#include <string_view>
#include <cstdint>namespace AMQP {// 前向声明
class Connection;
class Message;
class Table;
class Deferred;
class DeferredQueue;
class DeferredConsumer;
class MetaData;
class Envelope;/*** @brief 交换器类型枚举*/
enum ExchangeType {fanout, ///< 广播交换,绑定的队列都能拿到消息direct, ///< 直接交换,只将消息交给routingkey一致的队列topic, ///< 主题交换,将消息交给符合bindingkey规则的队列headers, ///< 头部交换consistent_hash, ///< 一致性哈希交换message_deduplication ///< 消息去重交换
};/*** @brief 通用回调函数定义*/
using SuccessCallback = std::function<void()>; ///< 成功回调
using ErrorCallback = std::function<void(const char *message)>; ///< 错误回调
using FinalizeCallback = std::function<void()>; ///< 完成回调/*** @brief 队列相关回调*/
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>; ///< 队列声明回调
using DeleteCallback = std::function<void(uint32_t deletedmessages)>; ///< 队列删除回调/*** @brief 消息相关回调1. const Message &message含义:消息本体,包含消息的内容、属性和路由信息。2. uint64_t deliveryTag含义:消息的唯一投递标识(相当于消息的 “快递单号”)。3. bool redelivered含义:标记消息是否是 “重发的”(true 表示重发,false 表示首次投递)。*/
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>; ///< 消息接收回调/*** @brief 确认回调* @note 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback*/
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>; ///< 消息确认回调/*** @brief 发布确认回调* @note 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调*/
using PublishAckCallback = std::function<void()>; ///< 发布确认回调
using PublishNackCallback = std::function<void()>; ///< 发布否定确认回调
using PublishLostCallback = std::function<void()>; ///< 发布丢失回调/*** @brief 消费者回调*/
using ConsumeCallback = std::function<void(const std::string &consumertag)>; ///< 消费者启动回调
using CancelCallback = std::function<void(const std::string &tag)>; ///< 消费者取消回调/*** @brief AMQP通道类* @details 提供AMQP协议的各种操作,如声明交换器、队列、发布消息等*/
class Channel {
public:/*** @brief 构造函数* @param connection AMQP连接对象指针*/Channel(Connection *connection);/*** @brief 检查连接状态* @return bool 返回true表示已连接,false表示未连接*/bool connected();/*** @brief 声明交换器* @details 如果提供了一个空名称,则服务器将分配一个名称* * 支持的flags:* - durable 持久化,重启后交换机依然有效* - autodelete 删除所有连接的队列后,自动删除交换* - passive 仅被动检查交换机是否存在* - internal 创建内部交换* * @param name 交换机的名称* @param type 交换类型* @param flags 交换机标志* @param arguments 其他参数* @return Deferred& 返回延迟处理对象,可以安装回调函数*/Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments);/*** @brief 声明队列* @details 如果不提供名称,服务器将分配一个名称* * 支持的flags:* - durable 持久队列在代理重新启动后仍然有效* - autodelete 当所有连接的使用者都离开时,自动删除队列* - passive 仅被动检查队列是否存在* - exclusive 队列仅存在于此连接,并且在连接断开时自动删除* * @param name 队列的名称* @param flags 标志组合* @param arguments 可选参数* @return DeferredQueue& 返回延迟队列处理对象* * @example* channel.declareQueue("myqueue").onSuccess(* [](const std::string &name, * uint32_t messageCount, * uint32_t consumerCount) {* std::cout << "Queue '" << name << "' ";* std::cout << "has been declared with ";* std::cout << messageCount;* std::cout << " messages and ";* std::cout << consumerCount;* std::cout << " consumers" << std::endl;* });*/DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments);/*** @brief 将队列绑定到交换器* @param exchange 源交换器* @param queue 目标队列* @param routingkey 路由密钥* @param arguments 其他绑定参数* @return Deferred& 返回延迟处理对象*/Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments);/*** @brief 发布消息到交换器* @details RabbitMQ将尝试将消息发送到一个或多个队列。* 使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。* 默认情况下,不可更改的消息将被静默地丢弃。* * 如果设置了'mandatory'或'immediate'标志,则无法处理的消息将返回到应用程序。* 在开始发布之前,请确保您已经调用了recall()-方法,并设置了所有适当的处理程序来处理这些返回的消息。* * 支持的flags:* - mandatory 如果设置,服务器将返回未发送到队列的消息* - immediate 如果设置,服务器将返回无法立即转发给使用者的消息。* * @param exchange 要发布到的交换器* @param routingKey 路由密钥* @param message 要发送的消息* @param flags 可选标志* @return bool 返回true表示成功,false表示失败*/bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0);/*** @brief 消费消息* @details 调用此方法后,RabbitMQ开始向客户端应用程序传递消息。* consumer tag是一个字符串标识符,如果您以后想通过channel::cancel()调用停止它,可以使用它来标识使用者。* 如果您没有指定使用者tag,服务器将为您分配一个。* * 支持的flags:* - nolocal 如果设置了,则不会同时消耗在此通道上发布的消息* - noack 如果设置了,则不必对已消费的消息进行确认* - exclusive 请求独占访问,只有此使用者可以访问队列* * @param queue 您要使用的队列* @param tag 将与此消费操作关联的消费者标记* @param flags 其他标记* @param arguments 其他参数* @return DeferredConsumer& 返回延迟消费者处理对象* * @example* channel.consume("myqueue").onSuccess(* [](const std::string_view& tag) {* std::cout << "Started consuming under tag ";* std::cout << tag << std::endl;* });*/DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments);/*** @brief 确认接收到的消息* @details 当在DeferredConsumer::onReceived()方法中接收到消息时,* 必须确认该消息,以便RabbitMQ将其从队列中删除(除非使用noack选项消费)。* * 支持的标志:* - multiple 确认多条消息:之前传递的所有未确认消息也会得到确认* * @param deliveryTag 消息的唯一delivery标签* @param flags 可选标志* @return bool 返回true表示成功,false表示失败*/bool ack(uint64_t deliveryTag, int flags = 0);
};/*** @brief 延迟消费者类* @details 用于处理消费者相关的异步操作和回调*/
class DeferredConsumer {
public:/*** @brief 注册消费者启动成功回调* @param callback 回调函数,格式:void(const std::string &consumertag)* @return DeferredConsumer& 返回自身的引用,支持链式调用*/DeferredConsumer &onSuccess(const ConsumeCallback& callback);/*** @brief 注册消息接收回调* @param callback 回调函数,格式:void(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)* @return DeferredConsumer& 返回自身的引用,支持链式调用*/DeferredConsumer &onReceived(const MessageCallback& callback);/*** @brief onReceived()的别名* @param callback 回调函数* @return DeferredConsumer& 返回自身的引用,支持链式调用*/DeferredConsumer &onMessage(const MessageCallback& callback);/*** @brief 注册消费者取消回调* @param callback 回调函数,格式:void(const std::string &tag)* @return DeferredConsumer& 返回自身的引用,支持链式调用*/DeferredConsumer &onCancelled(const CancelCallback& callback);
};/*** @brief 消息类* @details 继承自Envelope,提供消息相关功能*/
class Message : public Envelope {
public:/*** @brief 获取交换器名称* @return const std::string& 交换器名称*/const std::string &exchange();/*** @brief 获取路由键* @return const std::string& 路由键*/const std::string &routingkey();
};/*** @brief 消息信封类* @details 继承自MetaData,包含消息的基本信息*/
class Envelope : public MetaData {
public:/*** @brief 获取消息体数据指针* @return const char* 消息体数据指针*/const char *body();/*** @brief 获取消息体大小* @return uint64_t 消息体大小(字节)*/uint64_t bodySize();
};} // namespace AMQP/*** @brief libev异步监视器结构体*/
typedef struct ev_async {EV_WATCHER (ev_async) ///< libev监视器基础结构EV_ATOMIC_T sent; ///< 私有字段,用于标记是否已发送
} ev_async;/*** @brief 循环中断类型枚举*/
enum {EVBREAK_CANCEL = 0, ///< 取消未循环EVBREAK_ONE = 1, ///< 单次未循环EVBREAK_ALL = 2 ///< 所有循环未循环
};/*** @brief 获取默认事件循环* @param flags 标志位* @return struct ev_loop* 事件循环指针*/
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP (= 0));/*** @brief 默认事件循环宏定义*/
#define EV_DEFAULT ev_default_loop(0)/*** @brief 运行事件循环* @param loop 事件循环指针* @return int 返回状态码*/
int ev_run(struct ev_loop *loop);/*** @brief 中断事件循环* @param loop 事件循环指针* @param break_type 中断类型*/
void ev_break(struct ev_loop *loop, int32_t break_type);/*** @brief 异步监视器回调函数类型* @param loop 事件循环指针* @param watcher 异步监视器指针* @param revents 事件标志*/
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);/*** @brief 初始化异步监视器* @param w 异步监视器指针* @param cb 回调函数*/
void ev_async_init(ev_async *w, callback cb);/*** @brief 启动异步监视器* @param loop 事件循环指针* @param w 异步监视器指针*/
void ev_async_start(struct ev_loop *loop, ev_async *w);/*** @brief 发送异步信号* @param loop 事件循环指针* @param w 异步监视器指针*/
void ev_async_send(struct ev_loop *loop, ev_async *w);#endif // AMQP_EV_INTEGRATION_H