当前位置: 首页 > news >正文

《AMQP-CPP——轻量级的 RabbitMQ C++ 客户端库》

一、什么是 RabbitMQ?

RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 开发的开源消息中间件,由 Erlang 语言编写(依托 Erlang 天然的高并发、分布式特性,保证服务稳定性)。它的核心作用是在分布式系统中实现 消息的异步传递,解耦上下游服务、削峰填谷、保障消息可靠性,是企业级分布式架构中常用的 “通信桥梁”。

二、为什么需要它?

在传统的分布式系统中,服务间直接调用(如 A 服务同步调用 B 服务)存在诸多问题:

  • 耦合度高:A 服务需知晓 B 服务的地址、接口,B 服务修改时会直接影响 A 服务;
  • 抗风险差:若 B 服务宕机,A 服务的调用会失败,甚至引发连锁故障;
  • 性能瓶颈:若 B 服务处理能力弱,大量请求会堆积在 A 服务,导致 A 服务响应缓慢。

RabbitMQ 的出现正是为了解决这些问题 —— 它作为 “中间代理人”,让上游服务(生产者)只需将消息发送到队列,下游服务(消费者)从队列中获取消息处理,实现 “生产者不直接依赖消费者” 的异步通信模式

通俗易懂,它就是个 “靠谱的中间跑腿的”:帮发消息的和收消息的 “牵线”,但不让他俩直接打交道。这样一来,系统里的各个服务(订单、库存、物流)不用互相依赖,就算某个服务坏了,消息也不会丢,等服务修好还能接着处理,既稳定又灵活。

三、安装 RabbitMQ

sudo apt install rabbitmq-server

1. RabbitMQ 的简单使用

# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service
# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个
administrator 用户,才可以做为远程登录和发表订阅消息:#添加用户
sudo rabbitmqctl add_user root 123456#设置用户 tag
sudo rabbitmqctl set_user_tags root administrator#设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management

查看 rabbitmq-server 的状态.
在这里插入图片描述

2. 安装 RabbitMQ 客户端库

如果需要在其他应用程序中使用 RabbitMQ,则需要安装 RabbitMQ 的客户端库。
可以使用以下命令安装 librabbitmq 库:

sudo apt-get install librabbitmq-dev

这将在系统上安装 RabbitMQ 的客户端库,包括头文件和静态库文件。

3. 安装 RabbitMQ 的 C++客户端库

• C 语言库:https://github.com/alanxz/rabbitmq-c
• C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP
我们这里使用 AMQP-CPP 库来编写客户端程序。

安装 AMQP-CPP

sudo apt install libev-dev #libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make 
make install

在这里插入图片描述
至此可以通过 AMQP-CPP 来操作 rabbitmq。
安装报错:

/usr/include/openssl/macros.h:147:4: error: #error 
"OPENSSL_API_COMPAT expresses an impossible API compatibility 
level"147 | # error "OPENSSL_API_COMPAT expresses an impossible API 
compatibility level"| ^~~~~
In file included from /usr/include/openssl/ssl.h:18,from linux_tcp/openssl.h:20,from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor, 
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, 
unsigned short *port_ptr))

这种错误,表示 ssl 版本出现问题。
解决方案:卸载当前的 ssl 库,重新进行修复安装

dpkg -l |grep ssl
ii erlang-ssl 
ii libevent-openssl-2.1-7:amd64 
pi libgnutls-openssl27:amd64 
ii libssl-dev:amd64 
ii libssl3:amd64 
ii libxmlsec1-openssl:amd64 
ii libzstd-dev:amd64 
ii libzstd1:amd64 
ii openssl 
ii python3-openssl 
ii zstd
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev 
sudo apt --fix-broken install

修复后,重新进行 make

四、AMQP-CPP 库的简单使用

AMQP-CPP 是一个轻量级的 RabbitMQ C++ 客户端库,它封装了 AMQP 协议的底层细节,让开发者能更便捷地用 C++ 操作 RabbitMQ。下面用通俗易懂的方式讲解其核心用法。

  • AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq服务发送来的数据,也可以生成发向 RabbitMq 的数据包。AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。
  • 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 io,我们也可以选择 libevent、libev、libuv、asio 等异步通信组件,需要手动安装对应的组件。
  • AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
  • 注意:它需要 c++17 的支持。

1. 发送者publish.cc

这里用的是AMQP-CPP教程里现有事件循环的教程(EXISTING EVENT LOOPS)

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>int main() {//1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;//2. 实例化libEvHandler句柄 --- 将AMQp框架与事件监控关联起来AMQP::LibEvHandler handler(loop);//2.5 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 实例化信道对象AMQP::TcpChannel channel(&connection);//4. 声明交换机channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {std::cout << "声明交换机失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange 交换机创建成功!" << std::endl;});//5. 声明队列channel.declareQueue("test-queue").onError([](const char *message) {std::cout << "声明队列失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-queue 队列创建成功!" << std::endl;});//6. 针对交换机和队列进行绑定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {std::cout << "test-exchange - test-queue 绑定失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;});//7. 向交换机发布消息for (int i = 0; i < 10; i++) {std::string msg = "Hello Bite-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret = false) {std::cout << "publish 失败!\n";}}//启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}

使用 AMQP-CPP 库结合 libev 事件循环框架,与 RabbitMQ 服务器建立连接,并完成 “交换机声明、队列声明、绑定关系建立、发送消息” 的完整流程,属于 RabbitMQ 消息队列的生产者代码。具体作用分析如下:

1. 初始化底层通信框架

auto *loop = EV_DEFAULT;                  // 获取 libev 事件循环实例(负责监控 I/O 事件)
AMQP::LibEvHandler handler(loop);         // 将 AMQP 框架与 libev 事件循环绑定// (无需手动实现 I/O 操作,由 LibEvHandler 自动处理)
  • libev 是一个事件循环库,负责监控网络套接字的读写事件(如 RabbitMQ 服务器的响应)。
  • LibEvHandler 是 AMQP-CPP 为 libev 提供的适配类,简化了网络 I/O 与 AMQP 协议的对接。

2. 建立与 RabbitMQ 的连接

AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");  // 定义连接参数
AMQP::TcpConnection connection(&handler, address);            // 建立 TCP 连接
  • Address 封装了 RabbitMQ 服务器的地址(127.0.0.1)、端口(5672)、用户名(root)、密码(123456)等信息。直接基于这些信息与 RabbitMQ 服务器建立 TCP 连接,无需手动拼接或解析连接参数,简化了连接过程
  • TcpConnection 通过上述参数与 RabbitMQ 服务器建立 TCP 连接,并基于 handler 处理底层 I/O 事件。

3. 创建通信信道(Channel)

AMQP::TcpChannel channel(&connection);  // 在连接上创建信道
  • AMQP 协议中,所有操作(如声明交换机、发送消息)都通过 “信道” 进行,而非直接使用 TCP 连接。
  • 一个 TCP 连接可以创建多个信道,信道用于隔离不同的消息流,提高连接利用率。

4. 声明交换机(Exchange)

channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) { ... })  // 失败回调:输出错误并退出.onSuccess([](){ ... });                   // 成功回调:输出创建成功信息
  • 交换机是 RabbitMQ 中接收生产者消息并路由到队列的组件。
  • 这里声明了一个名为 test-exchange 的交换机,类型为 direct(直接路由,根据路由键精确匹配)。
  • n通过 onError 和 onSuccess 注册回调函数,分别处理声明失败和成功的情况。

5. 声明队列(Queue)

channel.declareQueue("test-queue").onError([](const char *message) { ... })  // 失败回调.onSuccess([](){ ... });                   // 成功回调
  • 队列是 RabbitMQ 中存储消息的容器,消费者从队列中获取消息。
  • 这里声明了一个名为 test-queue 的队列(未指定特殊参数,使用默认配置,如非持久化、非排他)。

6. 绑定交换机与队列

channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) { ... })  // 失败回调.onSuccess([](){ ... });                   // 成功回调
  • 绑定操作定义了交换机如何将消息路由到队列:通过指定 “路由键(test-queue-key)”,交换机 test-exchange 会将携带该路由键的消息转发到 test-queue 队列。
  • 对于 direct 类型的交换机,只有消息的路由键与绑定的路由键完全匹配时,才会被路由到对应队列。

7. 发送消息到交换机

for (int i = 0; i < 10; i++) {std::string msg = "Hello Bite-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret == false) {  // 检查发送是否成功(原代码此处有笔误,已修正为 ==)std::cout << "publish 失败!\n";}
}
  • 通过 publish 方法向 test-exchange 交换机发送 10 条消息,每条消息的内容为 Hello Bite-数字。
  • 发送时指定路由键为 test-queue-key,结合之前的绑定关系,这些消息会被路由到 test-queue 队列。
  • publish 方法返回 bool 类型,表示消息是否成功提交到本地缓冲区(注意:不代表已成功发送到服务器,最终发送由事件循环处理)。

8. 启动事件循环

ev_run(loop, 0);  // 启动 libev 事件循环,处理网络 I/O 事件
  • 事件循环是整个程序的核心,负责监控网络套接字的读写事件(如发送消息到服务器、接收服务器的响应)。
  • 所有 AMQP 操作(声明交换机、发送消息等)的实际网络交互都在事件循环中完成。

2. 消费者consume.cc

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>//消息回调处理函数的实现
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {std::string msg;msg.assign(message.body(), message.bodySize());std::cout << message.body() << std::endl;std::cout << msg << std::endl;channel->ack(deliveryTag); //对消息进行确认
}int main() {//1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;//2. 实例化libEvHandler句柄 --- 将AMQp框架与事件监控关联起来AMQP::LibEvHandler handler(loop);//2.5 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 实例化信道对象AMQP::TcpChannel channel(&connection);//4. 声明交换机channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {std::cout << "声明交换机失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange 交换机创建成功!" << std::endl;});//5. 失明队列channel.declareQueue("test-queue").onError([](const char *message) {std::cout << "声明队列失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-queue 队列创建成功!" << std::endl;});//6. 针对交换机和队列进行绑定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {std::cout << "test-exchange - test-queue 绑定失败: " << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;});//7. 订阅队列消息 -- 设置消息处理回调函数auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag").onReceived(callback).onError([](const char *message) {std::cout << "订阅 test-queue 队列消息失败! " << message << std::endl;exit(0);});//启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}

这里的 1 - 6 都和发送者一样,所以只解析一下第7步

1. 订阅队列消息

// 绑定消息回调函数(将 MessageCb 与信道关联)
auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);// 订阅队列 test-queue,指定消费者标签为 consume-tag
channel.consume("test-queue", "consume-tag").onReceived(callback)  // 收到消息时触发 MessageCb 回调.onError([](const char *message) {  // 订阅失败的回调std::cout << "订阅 test-queue 队列消息失败! " << message << std::endl;exit(0);});
  • channel.consume 用于订阅队列,参数 consume-tag 是消费者的唯一标识(可自定义)。
  • onReceived(callback) 注册消息接收回调,当队列有新消息时,自动调用 MessageCb 处理。

2. 消息回调处理函数 MessageCb

void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {std::string msg;msg.assign(message.body(), message.bodySize());  // 将消息体转换为字符串std::cout << message.body() << std::endl;        // 直接输出消息体(const char* 类型)std::cout << msg << std::endl;                   // 输出转换后的字符串消息channel->ack(deliveryTag);  // 对消息进行确认(告知 RabbitMQ 已成功处理,可删除该消息)
}
  • 作用:当队列中有新消息时,RabbitMQ 会触发此回调函数,用于处理消息。
  • 参数说明:
    • channel:当前通信的信道指针。
    • message:消息对象,包含消息体(body())、消息大小(bodySize())等信息。
    • deliveryTag:消息的唯一标识(用于确认消息)。
    • redelivered:标记消息是否是重新投递的(如消费者未确认导致消息重新入队)。
  • 关键操作:channel->ack(deliveryTag) 是消息确认机制,告知 RabbitMQ 该消息已被成功处理,可从队列中删除,避免消息丢失或重复消费。

3. 运行效果

makefile:

all : publish consume
publish : publish.ccg++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
consume : consume.ccg++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags.PHONY:clean
clean:rm -f publish consume

在这里插入图片描述
在这里插入图片描述

为什么第一个输出会有乱码?
message.body() 返回的是未以 \0 结尾的原始内存缓冲区(仅通过message.bodySize() 确定长度)。
C++ 的 std::cout << const char* 会默认打印到 \0为止,若消息体中恰好包含 \0(如二进制数据、特殊字符),会导致输出截断;若消息体末尾没有 \0,则会越界读取内存中的随机数据,表现为乱码。

http://www.dtcms.com/a/554300.html

相关文章:

  • 《UniApp 页面配置文件pages.json》
  • SQLBot:基于大模型和RAG的智能问数系统
  • STM32程序下载/串口一键下载电路
  • 邯郸做网站的电话网站怎么做优化推广
  • 《Unity渲染工具协同进阶:跳出单一工具的局限》
  • 【SOA仿真】SOA增益饱和特性仿真2
  • COOKIE 数据提交注入测试 sqlilabs less 20
  • EasyExcel 流式处理中实现末尾行过滤的技术方案
  • 免费网址导航网站建设编程培训班学费是多少
  • 内推网站企业网站后台管理软件
  • 广州公司网站电脑全自动挂机赚钱
  • Redis 分布式锁如何保证同一时间只有一个客户端持有锁
  • 做网站编辑工作好不好如何设计好的网页
  • U-Net笔记
  • 海力士DDR差异性对比--H9HCNNNCPMMLXR-NEE H9HCNNNCPMMLXR-NEI
  • bfs|红黑树multiset
  • 伊利集团的网站建设水平评价成都做网站做的好的公司
  • 论文阅读:arxiv 2025 Safety in Large Reasoning Models: A Survey
  • 选择手机网站建设医疗网站织梦
  • 蓝牙体重秤方案:硬件设计需要注意什么
  • 张家港建网站的公司住房和城乡建设部网站
  • 【AIGC】HPS v2:评估人类对文本到图像合成偏好的可靠基准
  • Download from your IP address is not allowed(qt下载教程)
  • 出海东南亚无忧:腾讯云如何凭借本地合作与全球节点,保障游戏和电商业务合规流畅?
  • Jmeter的自动化测试实施方案详解
  • 共享自行车与电动共享自行车使用中建成环境影响的对比研究:基于合肥数据的时空机器学习分析
  • 如何使用Jmeter做接口测试?
  • 网站用哪个软件做企业官网建设费用
  • 重庆网站设计找重庆最佳科技蛋糕网站源码
  • 东莞建设网官方网站小程序怎么赚钱的