《AMQP-CPP——轻量级的 RabbitMQ C++ 客户端库》
一、什么是 RabbitMQ?
RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 开发的开源消息中间件,由 Erlang 语言编写(依托 Erlang 天然的高并发、分布式特性,保证服务稳定性)。它的核心作用是在分布式系统中实现 消息的异步传递,解耦上下游服务、削峰填谷、保障消息可靠性,是企业级分布式架构中常用的 “通信桥梁”。
二、为什么需要它?
在传统的分布式系统中,服务间直接调用(如 A 服务同步调用 B 服务)存在诸多问题:
- 耦合度高:A 服务需知晓 B 服务的地址、接口,B 服务修改时会直接影响 A 服务;
- 抗风险差:若 B 服务宕机,A 服务的调用会失败,甚至引发连锁故障;
- 性能瓶颈:若 B 服务处理能力弱,大量请求会堆积在 A 服务,导致 A 服务响应缓慢。
RabbitMQ 的出现正是为了解决这些问题 —— 它作为 “中间代理人”,让上游服务(生产者)只需将消息发送到队列,下游服务(消费者)从队列中获取消息处理,实现 “生产者不直接依赖消费者” 的异步通信模式
通俗易懂,它就是个 “靠谱的中间跑腿的”:帮发消息的和收消息的 “牵线”,但不让他俩直接打交道。这样一来,系统里的各个服务(订单、库存、物流)不用互相依赖,就算某个服务坏了,消息也不会丢,等服务修好还能接着处理,既稳定又灵活。
三、安装 RabbitMQ
sudo apt install rabbitmq-server
1. RabbitMQ 的简单使用
# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service
# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个
administrator 用户,才可以做为远程登录和发表订阅消息:#添加用户
sudo rabbitmqctl add_user root 123456#设置用户 tag
sudo rabbitmqctl set_user_tags root administrator#设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management
查看 rabbitmq-server 的状态.

2. 安装 RabbitMQ 客户端库
如果需要在其他应用程序中使用 RabbitMQ,则需要安装 RabbitMQ 的客户端库。
可以使用以下命令安装 librabbitmq 库:
sudo apt-get install librabbitmq-dev
这将在系统上安装 RabbitMQ 的客户端库,包括头文件和静态库文件。
3. 安装 RabbitMQ 的 C++客户端库
• C 语言库:https://github.com/alanxz/rabbitmq-c
• C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP
我们这里使用 AMQP-CPP 库来编写客户端程序。
安装 AMQP-CPP
sudo apt install libev-dev #libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
make install

至此可以通过 AMQP-CPP 来操作 rabbitmq。
安装报错:
/usr/include/openssl/macros.h:147:4: error: #error
"OPENSSL_API_COMPAT expresses an impossible API compatibility
level"147 | # error "OPENSSL_API_COMPAT expresses an impossible API
compatibility level"| ^~~~~
In file included from /usr/include/openssl/ssl.h:18,from linux_tcp/openssl.h:20,from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor,
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str,
unsigned short *port_ptr))
这种错误,表示 ssl 版本出现问题。
解决方案:卸载当前的 ssl 库,重新进行修复安装
dpkg -l |grep ssl
ii erlang-ssl
ii libevent-openssl-2.1-7:amd64
pi libgnutls-openssl27:amd64
ii libssl-dev:amd64
ii libssl3:amd64
ii libxmlsec1-openssl:amd64
ii libzstd-dev:amd64
ii libzstd1:amd64
ii openssl
ii python3-openssl
ii zstd
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install
修复后,重新进行 make
四、AMQP-CPP 库的简单使用
AMQP-CPP 是一个轻量级的 RabbitMQ C++ 客户端库,它封装了 AMQP 协议的底层细节,让开发者能更便捷地用 C++ 操作 RabbitMQ。下面用通俗易懂的方式讲解其核心用法。
- AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq服务发送来的数据,也可以生成发向 RabbitMq 的数据包。AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。
- 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 io,我们也可以选择 libevent、libev、libuv、asio 等异步通信组件,需要手动安装对应的组件。
- AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
- 注意:它需要 c++17 的支持。
1. 发送者publish.cc
这里用的是AMQP-CPP教程里现有事件循环的教程(EXISTING EVENT LOOPS)
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>int main() {//1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;//2. 实例化libEvHandler句柄 --- 将AMQp框架与事件监控关联起来AMQP::LibEvHandler handler(loop);//2.5 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 实例化信道对象AMQP::TcpChannel channel(&connection);//4. 声明交换机channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {std::cout << "声明交换机失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange 交换机创建成功!" << std::endl;});//5. 声明队列channel.declareQueue("test-queue").onError([](const char *message) {std::cout << "声明队列失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-queue 队列创建成功!" << std::endl;});//6. 针对交换机和队列进行绑定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {std::cout << "test-exchange - test-queue 绑定失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;});//7. 向交换机发布消息for (int i = 0; i < 10; i++) {std::string msg = "Hello Bite-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret = false) {std::cout << "publish 失败!\n";}}//启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}
使用 AMQP-CPP 库结合 libev 事件循环框架,与 RabbitMQ 服务器建立连接,并完成 “交换机声明、队列声明、绑定关系建立、发送消息” 的完整流程,属于 RabbitMQ 消息队列的生产者代码。具体作用分析如下:
1. 初始化底层通信框架
auto *loop = EV_DEFAULT; // 获取 libev 事件循环实例(负责监控 I/O 事件)
AMQP::LibEvHandler handler(loop); // 将 AMQP 框架与 libev 事件循环绑定// (无需手动实现 I/O 操作,由 LibEvHandler 自动处理)
- libev 是一个事件循环库,负责监控网络套接字的读写事件(如 RabbitMQ 服务器的响应)。
- LibEvHandler 是 AMQP-CPP 为 libev 提供的适配类,简化了网络 I/O 与 AMQP 协议的对接。
2. 建立与 RabbitMQ 的连接
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/"); // 定义连接参数
AMQP::TcpConnection connection(&handler, address); // 建立 TCP 连接
- Address 封装了 RabbitMQ 服务器的地址(127.0.0.1)、端口(5672)、用户名(root)、密码(123456)等信息。直接基于这些信息与 RabbitMQ 服务器建立 TCP 连接,无需手动拼接或解析连接参数,简化了连接过程
- TcpConnection 通过上述参数与 RabbitMQ 服务器建立 TCP 连接,并基于 handler 处理底层 I/O 事件。
3. 创建通信信道(Channel)
AMQP::TcpChannel channel(&connection); // 在连接上创建信道
- AMQP 协议中,所有操作(如声明交换机、发送消息)都通过 “信道” 进行,而非直接使用 TCP 连接。
- 一个 TCP 连接可以创建多个信道,信道用于隔离不同的消息流,提高连接利用率。
4. 声明交换机(Exchange)
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) { ... }) // 失败回调:输出错误并退出.onSuccess([](){ ... }); // 成功回调:输出创建成功信息
- 交换机是 RabbitMQ 中接收生产者消息并路由到队列的组件。
- 这里声明了一个名为 test-exchange 的交换机,类型为 direct(直接路由,根据路由键精确匹配)。
- n通过 onError 和 onSuccess 注册回调函数,分别处理声明失败和成功的情况。
5. 声明队列(Queue)
channel.declareQueue("test-queue").onError([](const char *message) { ... }) // 失败回调.onSuccess([](){ ... }); // 成功回调
- 队列是 RabbitMQ 中存储消息的容器,消费者从队列中获取消息。
- 这里声明了一个名为 test-queue 的队列(未指定特殊参数,使用默认配置,如非持久化、非排他)。
6. 绑定交换机与队列
channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) { ... }) // 失败回调.onSuccess([](){ ... }); // 成功回调
- 绑定操作定义了交换机如何将消息路由到队列:通过指定 “路由键(test-queue-key)”,交换机 test-exchange 会将携带该路由键的消息转发到 test-queue 队列。
- 对于 direct 类型的交换机,只有消息的路由键与绑定的路由键完全匹配时,才会被路由到对应队列。
7. 发送消息到交换机
for (int i = 0; i < 10; i++) {std::string msg = "Hello Bite-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret == false) { // 检查发送是否成功(原代码此处有笔误,已修正为 ==)std::cout << "publish 失败!\n";}
}
- 通过 publish 方法向 test-exchange 交换机发送 10 条消息,每条消息的内容为 Hello Bite-数字。
- 发送时指定路由键为 test-queue-key,结合之前的绑定关系,这些消息会被路由到 test-queue 队列。
- publish 方法返回 bool 类型,表示消息是否成功提交到本地缓冲区(注意:不代表已成功发送到服务器,最终发送由事件循环处理)。
8. 启动事件循环
ev_run(loop, 0); // 启动 libev 事件循环,处理网络 I/O 事件
- 事件循环是整个程序的核心,负责监控网络套接字的读写事件(如发送消息到服务器、接收服务器的响应)。
- 所有 AMQP 操作(声明交换机、发送消息等)的实际网络交互都在事件循环中完成。
2. 消费者consume.cc
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>//消息回调处理函数的实现
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {std::string msg;msg.assign(message.body(), message.bodySize());std::cout << message.body() << std::endl;std::cout << msg << std::endl;channel->ack(deliveryTag); //对消息进行确认
}int main() {//1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;//2. 实例化libEvHandler句柄 --- 将AMQp框架与事件监控关联起来AMQP::LibEvHandler handler(loop);//2.5 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 实例化信道对象AMQP::TcpChannel channel(&connection);//4. 声明交换机channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {std::cout << "声明交换机失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange 交换机创建成功!" << std::endl;});//5. 失明队列channel.declareQueue("test-queue").onError([](const char *message) {std::cout << "声明队列失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-queue 队列创建成功!" << std::endl;});//6. 针对交换机和队列进行绑定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {std::cout << "test-exchange - test-queue 绑定失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;});//7. 订阅队列消息 -- 设置消息处理回调函数auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag").onReceived(callback).onError([](const char *message) {std::cout << "订阅 test-queue 队列消息失败! " << message << std::endl;exit(0);});//启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}
这里的 1 - 6 都和发送者一样,所以只解析一下第7步
1. 订阅队列消息
// 绑定消息回调函数(将 MessageCb 与信道关联)
auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);// 订阅队列 test-queue,指定消费者标签为 consume-tag
channel.consume("test-queue", "consume-tag").onReceived(callback) // 收到消息时触发 MessageCb 回调.onError([](const char *message) { // 订阅失败的回调std::cout << "订阅 test-queue 队列消息失败! " << message << std::endl;exit(0);});
- channel.consume 用于订阅队列,参数 consume-tag 是消费者的唯一标识(可自定义)。
- onReceived(callback) 注册消息接收回调,当队列有新消息时,自动调用 MessageCb 处理。
2. 消息回调处理函数 MessageCb
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {std::string msg;msg.assign(message.body(), message.bodySize()); // 将消息体转换为字符串std::cout << message.body() << std::endl; // 直接输出消息体(const char* 类型)std::cout << msg << std::endl; // 输出转换后的字符串消息channel->ack(deliveryTag); // 对消息进行确认(告知 RabbitMQ 已成功处理,可删除该消息)
}
- 作用:当队列中有新消息时,RabbitMQ 会触发此回调函数,用于处理消息。
- 参数说明:
-
- channel:当前通信的信道指针。
-
- message:消息对象,包含消息体(body())、消息大小(bodySize())等信息。
-
- deliveryTag:消息的唯一标识(用于确认消息)。
-
- redelivered:标记消息是否是重新投递的(如消费者未确认导致消息重新入队)。
- 关键操作:channel->ack(deliveryTag) 是消息确认机制,告知 RabbitMQ 该消息已被成功处理,可从队列中删除,避免消息丢失或重复消费。
3. 运行效果
makefile:
all : publish consume
publish : publish.ccg++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
consume : consume.ccg++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags.PHONY:clean
clean:rm -f publish consume


为什么第一个输出会有乱码?
message.body() 返回的是未以 \0 结尾的原始内存缓冲区(仅通过message.bodySize() 确定长度)。
C++ 的 std::cout << const char* 会默认打印到 \0为止,若消息体中恰好包含 \0(如二进制数据、特殊字符),会导致输出截断;若消息体末尾没有 \0,则会越界读取内存中的随机数据,表现为乱码。
