RabbitMQ C API 实现 RPC 通信实例
RabbitMQ C API 实现 RPC 通信实例
-
- 一、概述
- 二、什么是 RPC?
-
- 1、RPC 的工作原理
- 2、为什么使用 RabbitMQ 实现 RPC?
- 三、环境搭建
-
- 1、 启动 RabbitMQ 服务
- 2.、安装 rabbitmq-c 库
- 四、RPC 实现
-
- 1、服务端
- 2、客户端
- 五、编译和运行
-
- 1、编译命令
- 2、运行测试
- 3、测试结果示例
- 六、性能优化建议
-
- 1、连接复用
- 2.、消息批处理
- 3、 异步处理
- 4、 内存管理
- 七、常见问题排查
-
- 1、连接失败
- 2、消息丢失
- 3、性能问题
一、概述
本文将通过一个完整的示例,介绍如何使用 RabbitMQ 的 C API 实现 RPC(远程过程调用)功能。
二、什么是 RPC?
RPC(Remote Procedure Call) 是一种分布式系统中的通信模式,它允许一个程序调用另一个地址空间(通常是远程服务器)上的函数或方法,就像调用本地函数一样简单。
1、RPC 的工作原理
- 客户端发送一个请求消息到服务器,包含要调用的函数名和参数
- 服务端接收请求,执行相应的函数
- 服务端将执行结果返回给客户端
- 客户端接收结果并继续执行
2、为什么使用 RabbitMQ 实现 RPC?
- 异步通信:客户端不需要等待服务器立即响应
- 解耦合:客户端和服务端可以独立开发和部署
- 负载均衡:多个服务端可以同时处理请求
- 可靠性:RabbitMQ 提供消息持久化和确认机制
三、环境搭建
1、 启动 RabbitMQ 服务
使用 Docker 快速启动 RabbitMQ 服务:
# 启动rabbitmq服务
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 \-p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
端口说明:
5672
:AMQP 协议端口(客户端通信)15672
:Web 管理界面端口25672
:集群通信端口
2.、安装 rabbitmq-c 库
rabbitmq-c 是 RabbitMQ 的 C 语言客户端库:
# 安装 CMake(编译工具)
wget https://github.com/Kitware/CMake/releases/download/v4.1.1/cmake-4.1.1-linux-x86_64.sh
bash cmake-4.1.1-linux-x86_64.sh --skip-license --prefix=/usr/local/# 编译安装 rabbitmq-c
git clone https://github.com/alanxz/rabbitmq-c
cd rabbitmq-c/
/usr/local/bin/cmake .
make
sudo make install
四、RPC 实现
1、服务端
服务端的主要职责是监听请求队列,处理请求并返回响应。
cat > mqtt_srv.cpp << 'EOF'
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include <rabbitmq-c/ssl_socket.h>#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <cstring>
#include <signal.h>#define RPC_QUEUE "rpc_queue"volatile sig_atomic_t running = 1;void signal_handler(int signum) {running = 0;
}void die_on_amqp_error(amqp_rpc_reply_t x, const char* context) {if (x.reply_type != AMQP_RESPONSE_NORMAL) {std::cerr << context << ": " << x.reply_type << std::endl;if (x.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {std::cerr << "Library error: " << x.library_error << std::endl;}exit(1);}
}void die_on_error(int x, const char* context) {if (x < 0) {std::cerr << context << ": " << x << std::endl;exit(1);}
}// 请求处理函数
std::string request_handler(const std::string& message) {std::cout << "Received request: " << message << std::endl;return message; // 原样返回
}int main(int argc, char* argv[]) {if (argc != 2) {std::cerr << "Usage: " << argv[0] << " <hostname>" << std::endl;return 1;}// 注册信号处理signal(SIGINT, signal_handler);signal(SIGTERM, signal_handler);amqp_connection_state_t conn;amqp_socket_t *socket = NULL;amqp_rpc_reply_t reply;// 建立连接conn = amqp_new_connection();socket = amqp_tcp_socket_new(conn);die_on_error(amqp_socket_open(socket, argv[1], 5672), "Opening TCP socket");// 登录amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");amqp_channel_open(conn, 1);die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");// 声明队列amqp_queue_declare(conn, 1, amqp_cstring_bytes(RPC_QUEUE),