brpc 安装及使用
介绍
brpc(Baidu Remote Procedure Call)是百度开源的一个高性能、通用的 RPC(远程过程调用)框架,其目标是让使用者能轻松构建高并发、分布式的应用程序。以下从多个方面详细介绍brpc:
核心特性
- 高性能
- 多线程并发处理:支持多线程并发处理请求,可充分利用多核 CPU 的性能,提高系统的并发处理能力,轻松应对高并发场景。
- 异步 I/O:采用异步 I/O 模型,避免了传统同步 I/O 中线程阻塞的问题,使得线程在等待 I/O 操作完成时可以处理其他任务,进一步提升了系统的整体性能。
- 易用性
- 类函数调用的使用方式:使用 brpc 时,调用远程服务就像调用本地函数一样简单,开发者无需关心底层的网络通信细节,降低了开发难度。
- 自动重试与超时机制:框架内置了自动重试和超时机制,当请求失败时会自动进行重试,同时可以设置合理的超时时间,避免长时间等待,提高系统的健壮性和可靠性。
应用场景
- 分布式系统:在大规模的分布式系统中,不同服务之间需要进行远程调用,brpc 可以作为服务间通信的桥梁,提供高效、稳定的通信服务。例如,在微服务架构中,各个微服务之间可以使用 brpc 进行通信,实现服务的解耦和独立部署。
- 高性能计算:对于需要处理大量数据和高并发请求的高性能计算场景,brpc 的高性能特性可以充分发挥作用,提高系统的处理能力和响应速度。比如,在大数据处理、人工智能等领域,brpc 可以用于数据的传输和处理。
- 云计算:在云计算环境中,不同的云服务之间需要进行交互,brpc 可以提供可靠的远程调用服务,确保云服务的高效运行。例如,在云存储、云数据库等服务中,brpc 可以用于客户端与服务端之间的通信。
安装
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
git clone https://github.com/apache/brpc.git
cd brpc/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6
make && sudo make install
类与接口介绍
日志输出类与接口
namespace logging
{enum LoggingDestination{LOG_TO_NONE = 0 //表示不输出日志到任何目标(禁用日志功能)};struct BUTIL_EXPORT LoggingSettings{LoggingSettings();LoggingDestination logging_dest;};bool InitLogging(const LoggingSettings &settings);
}
protobuf 类与接口
namespace google
{namespace protobuf{class PROTOBUF_EXPORT Closure{public:Closure() {}virtual ~Closure();virtual void Run() = 0;};inline Closure *NewCallback(void (*function)());class PROTOBUF_EXPORT RpcController{bool Failed();std::string ErrorText();}}
}
回调机制
-
Closure
是异步 RPC 的完成通知载体,NewCallback
为其提供快速创建的辅助函数。 -
RpcController
通过Failed()
和ErrorText()
提供回调中可能需要的错误信息。
1. Closure
类
作用:定义了一个通用的、可扩展的回调操作,用于异步操作完成后的通知。子类必须实现 Run()
方法,定义具体的回调逻辑。
核心方法
方法 | 说明 |
---|---|
| 回调执行入口,由子类实现具体逻辑 |
| 虚析构函数,确保子类对象正确释放 |
设计意图
-
解耦异步操作与回调逻辑:允许用户自定义回调行为(如释放资源、触发事件等)。
-
与 Protobuf RPC 集成:广泛用于 RPC 完成后的异步通知。
示例用法
class MyCallback : public google::protobuf::Closure {
public:void Run() override {std::cout << "Callback executed!" << std::endl;delete this; // 自管理内存}
};
// 使用
google::protobuf::Closure* callback = new MyCallback();
callback->Run(); // 输出 "Callback executed!"
2. NewCallback
函数
作用:将普通函数(无参数、无返回值)包装为 Closure
对象。避免手动子类化,简化一次性回调的创建。
示例
void OnRpcDone() {std::cout << "RPC finished!" << std::endl;
}// 创建回调
google::protobuf::Closure* callback = google::protobuf::NewCallback(&OnRpcDone);
callback->Run(); // 输出 "RPC finished!"
delete callback; // 需手动释放
3. RpcController
类
作用:提供 RPC 执行过程中的错误处理和状态查询。
核心方法
方法 | 说明 |
---|---|
| 检查 RPC 是否失败 |
| 获取错误描述信息 |
典型用途
brpc::Controller cntl; // brpc 的实现类
MyService_Stub stub(&channel);
MyRequest request;
MyResponse response;// 发起异步 RPC
stub.MyMethod(&cntl, &request, &response, NewCallback(&OnRpcDone));
if (cntl.Failed()) {std::cerr << "RPC failed: " << cntl.ErrorText() << std::endl;
}
服务端类与接口
namespace brpc
{struct ServerOptions{// 无数据传输,则指定时间后关闭连接int idle_timeout_sec; // Default: -1 (disabled)int num_threads; // Default: #cpu-cores//....};enum ServiceOwnership {// 添加服务失败时,服务器将负责删除服务对象SERVER_OWNS_SERVICE,// 添加服务失败时,服务器也不会删除服务对象SERVER_DOESNT_OWN_SERVICE};class Server{int AddService(google::protobuf::Service *service,ServiceOwnership ownership);int Start(int port, const ServerOptions *opt);int Stop(int closewait_ms /*not used anymore*/);int Join();// 休眠直到 ctrl+c 按下,或者 stop 和 join 服务器void RunUntilAskedToQuit();};class ClosureGuard{explicit ClosureGuard(google::protobuf::Closure *done);~ClosureGuard(){if (_done)_done->Run();}};class HttpHeader{void set_content_type(const std::string &type)const std::string *GetHeader(const std::string &key) void SetHeader(const std::string &key,const std::string &value);const URI &uri() const { return _uri; }HttpMethod method() const { return _method; }void set_method(const HttpMethod method) int status_code() void set_status_code(int status_code);};class Controller : public google::protobuf::RpcController{void set_timeout_ms(int64_t timeout_ms);void set_max_retry(int max_retry);google::protobuf::Message *response();HttpHeader &http_response();HttpHeader &http_request();bool Failed();std::string ErrorText();using AfterRpcRespFnType = std::function<void(Controller *cntl,const google::protobuf::Message *req,const google::protobuf::Message *res)>;void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn)};
}
1. ServerOptions
结构体
作用:配置 brpc 服务器的核心参数,通过 Server::Start() 控制服务器行为。
关键字段
-
idle_timeout_sec:
空闲连接超时时间(秒),默认-1
(不启用),例如设为30
表示 30 秒无数据则关闭连接。 -
num_threads:
工作线程数,默认CPU核心数-1
。高并发场景可适当增加,但需避免线程过多导致上下文切换开销。
2. ServiceOwnership
枚举
作用:控制服务对象的生命周期管理权限。
枚举值
-
SERVER_OWNS_SERVICE:
服务器负责删除服务对象(防止内存泄漏)。 -
SERVER_DOESNT_OWN_SERVICE:
用户自行管理服务对象生命周期(需手动释放)。
典型场景
MyService *service = new MyService;
server.AddService(service, SERVER_OWNS_SERVICE); // 服务器自动管理
server.AddService(service, SERVER_DOESNT_OWN_SERVICE); // 用户需手动 delete
3. Server
类
核心方法
方法 | 作用 | 参数说明 |
---|---|---|
| 注册 Protobuf 服务 |
|
| 启动服务器 |
|
| 停止服务器(非阻塞) |
|
| 等待服务器完全停止 | 需在 |
| 阻塞直到收到终止信号 | 替代手动 |
工作流程
brpc::Server server;
MyService service;
server.AddService(&service, brpc::SERVER_OWNS_SERVICE);brpc::ServerOptions options;
options.idle_timeout_sec = 30;
server.Start(8000, &options);
server.RunUntilAskedToQuit(); // 阻塞运行
4. ClosureGuard
类
作用:RAII 封装 Protobuf Closure
,确保析构时自动调用 done->Run()
,避免回调遗漏。
典型用法
void MyRpcMethod(..., google::protobuf::Closure *done)
{brpc::ClosureGuard done_guard(done); // 异常安全if (error){cntl->SetFailed("Error");return; // done_guard 析构时自动调用 done->Run()}// 正常处理...
}
5. HttpHeader
类
作用:管理 HTTP 请求/响应的头部信息,提供便捷的键值操作。
核心方法
方法 | 作用 |
---|---|
| 设置 |
| 获取指定头部值 |
| 设置自定义头部 |
| 获取请求 URI(路径、查询参数等) |
| 获取 HTTP 方法(GET/POST 等) |
示例
brpc::Controller cntl;
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_response().set_content_type("text/plain");
6. Controller
类
继承自 google::protobuf::RpcController
,扩展了 brpc 特有功能。
核心功能
方法 | 作用 |
---|---|
set_timeout_ms() | 设置 RPC 超时时间(毫秒) |
set_max_retry() | 设置最大重试次数 |
http_request() /http_response() | 访问 HTTP 头部 |
set_after_rpc_resp_fn() | 设置 RPC 完成后的回调(用于日志、统计等) |
典型流程
brpc::Controller cntl;
cntl.set_timeout_ms(5000); // 5秒超时
MyService_Stub stub(&channel);
MyRequest request;
MyResponse response;stub.MyMethod(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {LOG(ERROR) << "RPC failed: " << cntl.ErrorText();
}
客户端类与接口
namespace brpc
{struct ChannelOptions{// 请求连接超时时间int32_t connect_timeout_ms; // Default: 200 (milliseconds)// rpc 请求超时时间int32_t timeout_ms; // Default: 500 (milliseconds)// 最大重试次数int max_retry; // Default: 3// 序列化协议类型 options.protocol = "baidu_std";AdaptiveProtocolType protocol;//....};class Channel : public ChannelBase{// 初始化接口,成功返回 0;int Init(const char *server_addr_and_port,const ChannelOptions *options);};
}
1. ChannelOptions
结构体
作用:用于配置 Channel
(RPC 客户端通道)的核心参数,控制连接和请求行为。
核心参数
参数 | 类型 | 默认值 | 说明 |
---|---|---|---|
|
| 200 ms | 连接超时时间:建立 TCP 连接的最长等待时间。 |
|
| 500 ms | 请求超时时间:RPC 调用的最长等待时间(从发送到接收响应)。 |
|
| 3 | 最大重试次数:连接或请求失败时的自动重试次数。 |
|
| - | 协议类型:如 |
2. Channel
类
作用:管理与服务端的通信通道,支持同步/异步调用。
核心方法
方法 | 说明 |
---|---|
| 初始化通道,连接指定的服务端地址。 |
示例
brpc::Channel channel;
brpc::ChannelOptions options;
options.timeout_ms = 1000; // 设置请求超时为 1 秒
options.max_retry = 1; // 禁用重试if (channel.Init("127.0.0.1:8000", &options) != 0) {// 处理初始化失败
}
使用
创建 proto 文件 - main.proto
syntax="proto3";
package example;
option cc_generic_services = true;// 定义 Echo 方法请求参数结构
message EchoRequest
{string message=1;
};
// 定义 Echo 方法响应参数结构
message EchoResponse
{string message=1;
};
// 定义 RPC 远端方法
service EchoService
{rpc Echo(EchoRequest) returns (EchoResponse);
};
protoc --cpp_out=./ main.proto
创建服务端源码 - server.cpp
#include <iostream>
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"
// 继承EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService
{
public:void Echo(google::protobuf::RpcController *controller,const example::EchoRequest *request, example::EchoResponse *response, google::protobuf::Closure *done){brpc::ClosureGuard guard(done);std::cout << "收到消息:" << request->message() << std::endl;std::string str = "这是响应--" + request->message();response->set_message(str);// done->Run(); 由于使用了ClosureGuard,异步完成时就不需要手动调用Run函数}
};
int main()
{// 关闭brpc的默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 构造服务器对象brpc::Server server;// 向服务器对象添加EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1){std::cout << "添加Rpc服务失败!\n";return -1;}// 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; // 连接空闲超时时间,超时后连接被关闭,-1为一直连接options.num_threads = 1; // io线程数量ret = server.Start(8080, &options);if (ret == -1){std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit(); // 修改等待运行结束return 0;
}
创建客户端源码 - client.cpp
#include <iostream>
#include <thread>
#include <brpc/channel.h>
#include "main.pb.h"
#include <memory>
void callback(brpc::Controller *cntl, example::EchoResponse *response)
{std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<example::EchoResponse> rsp_guard(response);if (cntl->Failed() == true){std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << response->message() << std::endl;
}
int main()
{// 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待options.timeout_ms = -1; // rpc请求等待超时时间,-1表示一直等待options.max_retry = 3; // 请求重试次数options.protocol = "baidu_std"; // 序列化协议,默认使用baidu_stdbrpc::Channel channel;int ret = channel.Init("127.0.0.1:8080", &options);if (ret == -1){std::cout << "初始化信道失败!\n";return -1;}// 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);// 进行rpc调用example::EchoRequest req;req.set_message("hello world!");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();// 1.同步调用stub.Echo(cntl, &req, rsp, nullptr);if (cntl->Failed() == true){std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return -1;}std::cout << "收到响应: " << rsp->message() << std::endl;delete cntl;delete rsp;// 2.异步调用// auto closure = google::protobuf::NewCallback(callback,cntl,rsp);// stub.Echo(cntl, &req, rsp, closure);// std::cout << "异步调用结束!\n";// std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}
all:server client
server:server.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client:client.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
二次封装
- rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此我们只需要封装通信所需的 Channel 管理即可,这样当需要进行什么样的服务调用的时候,只需要通过服务名称获取对应的 channel,然后实例化 Stub 进行调用即可。
- 封装 Channel 的管理,每个不同的服务可能都会有多个主机提供服务,因此一个服务可能会对应多个 Channel,需要将其管理起来,并提供获取指定服务 channel 的接口。进行 rpc 调用时,获取 channel,目前以 RR 轮转的策略选择 channel。
- 提供进行服务声明的接口,因为在整个系统中,提供的服务有很多,但是当前可能并不一定会用到所有的服务,因此通过声明来告诉模块哪些服务是自己关心的,需要建立连接管理起来,没有添加声明的服务即使上线也不需要进行连接的建立。
- 提供服务上线时的处理接口,也就是新增一个指定服务的 channel。
- 提供服务下线时的处理接口,也就是删除指定服务下的指定 channel。
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "logger.hpp"
#include <brpc/channel.h>// 封装单个服务的信道管理类
class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &name): _service_name(name), _index(0){}// 服务上线了一个节点,则调用append新增信道void append(const std::string &host){// 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待options.timeout_ms = -1; // rpc请求等待超时时间,-1表示一直等待options.max_retry = 3; // 请求重试次数options.protocol = "baidu_std"; // 序列化协议,默认使用baidu_stdauto channel = std::make_shared<brpc::Channel>();int ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(channel);_hosts[host] = channel;}// 服务下线了一个节点,则调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it == _hosts.end()){LOG_WARN("删除{}-{}节点信道时,没有找到信道信息!", _service_name, host);return;}for (auto vit = _channels.begin(); vit != _channels.end(); vit++){if (*vit == it->second){_channels.erase(vit);break;}}_hosts.erase(it);}// 通过RR轮转策略,获取一个Channel用于发起对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0){LOG_ERROR("当前没有能够提供{}服务的节点!", _service_name);return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index; // 当前轮转下标的计数器std::string _service_name; // 服务名称std::vector<ChannelPtr> _channels; // 当前服务对应的信道集合std::unordered_map<std::string, ChannelPtr> _hosts; // 主机地址与信道的映射
};// 总体的服务信道管理类
class ServiceManager
{
public:using ptr = std::shared_ptr<ServiceManager>;// 获取指定服务的节点信道ServiceChannel::ChannelPtr choose(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("当前没有能够提供{}服务的节点!", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 先声明我关心哪些服务的上下线,不关心的则不需要管理void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}// 服务上线时调用的回调接口,将服务节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_DEBUG("{}-{}服务上线了,但是当前并不关心!", service_name, host);return;}// 先获取管理对象,没有则创建,有则添加节点auto sit = _services.find(service_name);if (sit == _services.end()){service = std::make_shared<ServiceChannel>(service_name);_services[service_name] = service;}elseservice = sit->second;}if (!service){LOG_ERROR("新增{}服务管理节点失败!", service_name);return;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}// 服务下线时调用的回调接口,从相应的服务节点删除信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_DEBUG("{}-{}服务下线了,但是当前并不关心!", service_name, host);return;}// 先获取管理对象,有则删除节点信道auto sit = _services.find(service_name);if (sit == _services.end()){LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);return;}elseservice = sit->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}private:std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;elsereturn service_instance.substr(0, pos);}private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;std::unordered_map<std::string, ServiceChannel::ptr> _services;
};