当前位置: 首页 > news >正文

brpc 安装及使用

介绍

brpc(Baidu Remote Procedure Call)是百度开源的一个高性能、通用的 RPC(远程过程调用)框架,其目标是让使用者能轻松构建高并发、分布式的应用程序。以下从多个方面详细介绍brpc:

核心特性

  1. 高性能
    • 多线程并发处理:支持多线程并发处理请求,可充分利用多核 CPU 的性能,提高系统的并发处理能力,轻松应对高并发场景。
    • 异步 I/O:采用异步 I/O 模型,避免了传统同步 I/O 中线程阻塞的问题,使得线程在等待 I/O 操作完成时可以处理其他任务,进一步提升了系统的整体性能。
  2. 易用性
    • 类函数调用的使用方式:使用 brpc 时,调用远程服务就像调用本地函数一样简单,开发者无需关心底层的网络通信细节,降低了开发难度。
    • 自动重试与超时机制:框架内置了自动重试和超时机制,当请求失败时会自动进行重试,同时可以设置合理的超时时间,避免长时间等待,提高系统的健壮性和可靠性。

应用场景

  1. 分布式系统:在大规模的分布式系统中,不同服务之间需要进行远程调用,brpc 可以作为服务间通信的桥梁,提供高效、稳定的通信服务。例如,在微服务架构中,各个微服务之间可以使用 brpc 进行通信,实现服务的解耦和独立部署。
  2. 高性能计算:对于需要处理大量数据和高并发请求的高性能计算场景,brpc 的高性能特性可以充分发挥作用,提高系统的处理能力和响应速度。比如,在大数据处理、人工智能等领域,brpc 可以用于数据的传输和处理。
  3. 云计算:在云计算环境中,不同的云服务之间需要进行交互,brpc 可以提供可靠的远程调用服务,确保云服务的高效运行。例如,在云存储、云数据库等服务中,brpc 可以用于客户端与服务端之间的通信。

安装

以 Ubuntu 系统为例,先安装依赖
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
安装 brpc
git clone https://github.com/apache/brpc.git
cd brpc/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6
make && sudo make install

类与接口介绍

日志输出类与接口

包含头文件: #include <butil/logging.h>
日志输出这里,本质上我们其实用不着 brpc 的日志输出,因此在这里主要介绍如何关闭日志输出。
namespace logging
{enum LoggingDestination{LOG_TO_NONE = 0 //表示不输出日志到任何目标(禁用日志功能)};struct BUTIL_EXPORT LoggingSettings{LoggingSettings();LoggingDestination logging_dest;};bool InitLogging(const LoggingSettings &settings);
}

protobuf 类与接口

namespace google
{namespace protobuf{class PROTOBUF_EXPORT Closure{public:Closure() {}virtual ~Closure();virtual void Run() = 0;};inline Closure *NewCallback(void (*function)());class PROTOBUF_EXPORT RpcController{bool Failed();std::string ErrorText();}}
}

回调机制

  • Closure 是异步 RPC 的完成通知载体,NewCallback 为其提供快速创建的辅助函数。

  • RpcController 通过 Failed() 和 ErrorText() 提供回调中可能需要的错误信息。

1. Closure 类

作用:定义了一个通用的、可扩展的回调操作,用于异步操作完成后的通知。子类必须实现 Run() 方法,定义具体的回调逻辑。

核心方法

方法

说明

virtual void Run() = 0

回调执行入口,由子类实现具体逻辑

virtual ~Closure()

虚析构函数,确保子类对象正确释放

设计意图

  • 解耦异步操作与回调逻辑:允许用户自定义回调行为(如释放资源、触发事件等)。

  • 与 Protobuf RPC 集成:广泛用于 RPC 完成后的异步通知。

示例用法

class MyCallback : public google::protobuf::Closure {
public:void Run() override {std::cout << "Callback executed!" << std::endl;delete this; // 自管理内存}
};
// 使用
google::protobuf::Closure* callback = new MyCallback();
callback->Run(); // 输出 "Callback executed!"

2. NewCallback 函数

作用:将普通函数(无参数、无返回值)包装为 Closure 对象。避免手动子类化,简化一次性回调的创建。

示例

void OnRpcDone() {std::cout << "RPC finished!" << std::endl;
}// 创建回调
google::protobuf::Closure* callback = google::protobuf::NewCallback(&OnRpcDone);
callback->Run(); // 输出 "RPC finished!"
delete callback; // 需手动释放

3. RpcController 类

作用:提供 RPC 执行过程中的错误处理和状态查询。

核心方法

方法

说明

bool Failed()

检查 RPC 是否失败

std::string ErrorText()

获取错误描述信息

典型用途

brpc::Controller cntl;  // brpc 的实现类
MyService_Stub stub(&channel);
MyRequest request;
MyResponse response;// 发起异步 RPC
stub.MyMethod(&cntl, &request, &response, NewCallback(&OnRpcDone));
if (cntl.Failed()) {std::cerr << "RPC failed: " << cntl.ErrorText() << std::endl;
} 

服务端类与接口

这里只介绍主要用到的成员与接口。
namespace brpc
{struct ServerOptions{// 无数据传输,则指定时间后关闭连接int idle_timeout_sec; // Default: -1 (disabled)int num_threads;      // Default: #cpu-cores//....};enum ServiceOwnership {// 添加服务失败时,服务器将负责删除服务对象SERVER_OWNS_SERVICE,// 添加服务失败时,服务器也不会删除服务对象SERVER_DOESNT_OWN_SERVICE};class Server{int AddService(google::protobuf::Service *service,ServiceOwnership ownership);int Start(int port, const ServerOptions *opt);int Stop(int closewait_ms /*not used anymore*/);int Join();// 休眠直到 ctrl+c 按下,或者 stop 和 join 服务器void RunUntilAskedToQuit();};class ClosureGuard{explicit ClosureGuard(google::protobuf::Closure *done);~ClosureGuard(){if (_done)_done->Run();}};class HttpHeader{void set_content_type(const std::string &type)const std::string *GetHeader(const std::string &key) void SetHeader(const std::string &key,const std::string &value);const URI &uri() const { return _uri; }HttpMethod method() const { return _method; }void set_method(const HttpMethod method) int status_code() void set_status_code(int status_code);};class Controller : public google::protobuf::RpcController{void set_timeout_ms(int64_t timeout_ms);void set_max_retry(int max_retry);google::protobuf::Message *response();HttpHeader &http_response();HttpHeader &http_request();bool Failed();std::string ErrorText();using AfterRpcRespFnType = std::function<void(Controller *cntl,const google::protobuf::Message *req,const google::protobuf::Message *res)>;void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn)};
}

1. ServerOptions 结构体

作用:配置 brpc 服务器的核心参数,通过 Server::Start() 控制服务器行为。

关键字段

  • idle_timeout_sec:空闲连接超时时间(秒),默认 -1(不启用),例如设为 30 表示 30 秒无数据则关闭连接。

  • num_threads:工作线程数,默认 CPU核心数-1。高并发场景可适当增加,但需避免线程过多导致上下文切换开销。


2. ServiceOwnership 枚举

作用:控制服务对象的生命周期管理权限。

枚举值

  • SERVER_OWNS_SERVICE:服务器负责删除服务对象(防止内存泄漏)。

  • SERVER_DOESNT_OWN_SERVICE:用户自行管理服务对象生命周期(需手动释放)。

典型场景

MyService *service = new MyService;
server.AddService(service, SERVER_OWNS_SERVICE); // 服务器自动管理
server.AddService(service, SERVER_DOESNT_OWN_SERVICE); // 用户需手动 delete

3. Server 类

核心方法

方法

作用

参数说明

AddService()

注册 Protobuf 服务

service: 服务对象指针;ownership: 生命周期控制

Start()

启动服务器

port: 监听端口;optServerOptions 配置指针

Stop()

停止服务器(非阻塞)

closewait_ms: 已弃用

Join()

等待服务器完全停止

需在 Stop() 后调用

RunUntilAskedToQuit()

阻塞直到收到终止信号

替代手动 Stop()+Join()

工作流程

brpc::Server server;
MyService service;
server.AddService(&service, brpc::SERVER_OWNS_SERVICE);brpc::ServerOptions options;
options.idle_timeout_sec = 30;
server.Start(8000, &options);
server.RunUntilAskedToQuit(); // 阻塞运行

4. ClosureGuard 类

作用:RAII 封装 Protobuf Closure,确保析构时自动调用 done->Run(),避免回调遗漏。

典型用法

void MyRpcMethod(..., google::protobuf::Closure *done)
{brpc::ClosureGuard done_guard(done); // 异常安全if (error){cntl->SetFailed("Error");return; // done_guard 析构时自动调用 done->Run()}// 正常处理...
}

5. HttpHeader 类

作用:管理 HTTP 请求/响应的头部信息,提供便捷的键值操作。

核心方法

方法

作用

set_content_type()

设置 Content-Type(如 "application/json"

GetHeader()

获取指定头部值

SetHeader()

设置自定义头部

uri()

获取请求 URI(路径、查询参数等)

method()

获取 HTTP 方法(GET/POST 等)

示例

brpc::Controller cntl;
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_response().set_content_type("text/plain");

6. Controller 类

继承自 google::protobuf::RpcController,扩展了 brpc 特有功能。

核心功能

方法作用
set_timeout_ms()设置 RPC 超时时间(毫秒)
set_max_retry()设置最大重试次数
http_request()/http_response()访问 HTTP 头部
set_after_rpc_resp_fn()设置 RPC 完成后的回调(用于日志、统计等)

典型流程

brpc::Controller cntl;
cntl.set_timeout_ms(5000); // 5秒超时
MyService_Stub stub(&channel);
MyRequest request;
MyResponse response;stub.MyMethod(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {LOG(ERROR) << "RPC failed: " << cntl.ErrorText();
}
 

客户端类与接口

namespace brpc
{struct ChannelOptions{// 请求连接超时时间int32_t connect_timeout_ms; // Default: 200 (milliseconds)// rpc 请求超时时间int32_t timeout_ms; // Default: 500 (milliseconds)// 最大重试次数int max_retry; // Default: 3// 序列化协议类型 options.protocol = "baidu_std";AdaptiveProtocolType protocol;//....};class Channel : public ChannelBase{// 初始化接口,成功返回 0;int Init(const char *server_addr_and_port,const ChannelOptions *options);};
}

1. ChannelOptions 结构体

作用:用于配置 Channel(RPC 客户端通道)的核心参数,控制连接和请求行为。

核心参数

参数

类型

默认值

说明

connect_timeout_ms

int32_t

200 ms

连接超时时间:建立 TCP 连接的最长等待时间。

timeout_ms

int32_t

500 ms

请求超时时间:RPC 调用的最长等待时间(从发送到接收响应)。

max_retry

int

3

最大重试次数:连接或请求失败时的自动重试次数。

protocol

AdaptiveProtocolType

-

协议类型:如 "baidu_std"(百度标准协议)、"http" 等。


2. Channel 类

作用:管理与服务端的通信通道,支持同步/异步调用。

核心方法

方法

说明

Init()

初始化通道,连接指定的服务端地址。

示例

brpc::Channel channel;
brpc::ChannelOptions options;
options.timeout_ms = 1000;  // 设置请求超时为 1 秒
options.max_retry = 1;      // 禁用重试if (channel.Init("127.0.0.1:8000", &options) != 0) {// 处理初始化失败
}

使用

同步调用是指客户端会阻塞收到 server 端的响应或发生错误。下面我们以 Echo (输出 hello world )方法为例 , 来讲解基础的同步 RPC 请求是如何实现的。

创建 proto 文件 - main.proto

syntax="proto3";
package example;
option cc_generic_services = true;// 定义 Echo 方法请求参数结构
message EchoRequest
{string message=1;
};
// 定义 Echo 方法响应参数结构
message EchoResponse
{string message=1;
};
// 定义 RPC 远端方法
service EchoService
{rpc Echo(EchoRequest) returns (EchoResponse);
};
使用以下命令编译 main.proto 文件生成 main.pb.h 和 main.pb.cc 两个文件。
protoc --cpp_out=./ main.proto

创建服务端源码 - server.cpp

#include <iostream>
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"
// 继承EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService
{
public:void Echo(google::protobuf::RpcController *controller,const example::EchoRequest *request, example::EchoResponse *response, google::protobuf::Closure *done){brpc::ClosureGuard guard(done);std::cout << "收到消息:" << request->message() << std::endl;std::string str = "这是响应--" + request->message();response->set_message(str);// done->Run(); 由于使用了ClosureGuard,异步完成时就不需要手动调用Run函数}
};
int main()
{// 关闭brpc的默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 构造服务器对象brpc::Server server;// 向服务器对象添加EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1){std::cout << "添加Rpc服务失败!\n";return -1;}// 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; // 连接空闲超时时间,超时后连接被关闭,-1为一直连接options.num_threads = 1;       // io线程数量ret = server.Start(8080, &options);if (ret == -1){std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit(); // 修改等待运行结束return 0;
}

创建客户端源码 - client.cpp

#include <iostream>
#include <thread>
#include <brpc/channel.h>
#include "main.pb.h"
#include <memory>
void callback(brpc::Controller *cntl, example::EchoResponse *response)
{std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<example::EchoResponse> rsp_guard(response);if (cntl->Failed() == true){std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << response->message() << std::endl;
}
int main()
{// 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待options.timeout_ms = -1;         // rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;           // 请求重试次数options.protocol = "baidu_std";  // 序列化协议,默认使用baidu_stdbrpc::Channel channel;int ret = channel.Init("127.0.0.1:8080", &options);if (ret == -1){std::cout << "初始化信道失败!\n";return -1;}// 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);// 进行rpc调用example::EchoRequest req;req.set_message("hello world!");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();// 1.同步调用stub.Echo(cntl, &req, rsp, nullptr);if (cntl->Failed() == true){std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return -1;}std::cout << "收到响应: " << rsp->message() << std::endl;delete cntl;delete rsp;// 2.异步调用//  auto closure = google::protobuf::NewCallback(callback,cntl,rsp);//  stub.Echo(cntl, &req, rsp, closure);//  std::cout << "异步调用结束!\n";//  std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}
编写 m akefile
all:server client
server:server.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client:client.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb

二次封装

  • rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此我们只需要封装通信所需的 Channel 管理即可,这样当需要进行什么样的服务调用的时候,只需要通过服务名称获取对应的 channel,然后实例化 Stub 进行调用即可。
  • 封装 Channel 的管理,每个不同的服务可能都会有多个主机提供服务,因此一个服务可能会对应多个 Channel,需要将其管理起来,并提供获取指定服务 channel 的接口。进行 rpc 调用时,获取 channel,目前以 RR 轮转的策略选择 channel。
  • 提供进行服务声明的接口,因为在整个系统中,提供的服务有很多,但是当前可能并不一定会用到所有的服务,因此通过声明来告诉模块哪些服务是自己关心的,需要建立连接管理起来,没有添加声明的服务即使上线也不需要进行连接的建立。
  • 提供服务上线时的处理接口,也就是新增一个指定服务的 channel。
  • 提供服务下线时的处理接口,也就是删除指定服务下的指定 channel。
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "logger.hpp"
#include <brpc/channel.h>// 封装单个服务的信道管理类
class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &name): _service_name(name), _index(0){}// 服务上线了一个节点,则调用append新增信道void append(const std::string &host){// 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待options.timeout_ms = -1;         // rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;           // 请求重试次数options.protocol = "baidu_std";  // 序列化协议,默认使用baidu_stdauto channel = std::make_shared<brpc::Channel>();int ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(channel);_hosts[host] = channel;}// 服务下线了一个节点,则调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it == _hosts.end()){LOG_WARN("删除{}-{}节点信道时,没有找到信道信息!", _service_name, host);return;}for (auto vit = _channels.begin(); vit != _channels.end(); vit++){if (*vit == it->second){_channels.erase(vit);break;}}_hosts.erase(it);}// 通过RR轮转策略,获取一个Channel用于发起对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0){LOG_ERROR("当前没有能够提供{}服务的节点!", _service_name);return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index;                                     // 当前轮转下标的计数器std::string _service_name;                          // 服务名称std::vector<ChannelPtr> _channels;                  // 当前服务对应的信道集合std::unordered_map<std::string, ChannelPtr> _hosts; // 主机地址与信道的映射
};// 总体的服务信道管理类
class ServiceManager
{
public:using ptr = std::shared_ptr<ServiceManager>;// 获取指定服务的节点信道ServiceChannel::ChannelPtr choose(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("当前没有能够提供{}服务的节点!", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 先声明我关心哪些服务的上下线,不关心的则不需要管理void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}// 服务上线时调用的回调接口,将服务节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_DEBUG("{}-{}服务上线了,但是当前并不关心!", service_name, host);return;}// 先获取管理对象,没有则创建,有则添加节点auto sit = _services.find(service_name);if (sit == _services.end()){service = std::make_shared<ServiceChannel>(service_name);_services[service_name] = service;}elseservice = sit->second;}if (!service){LOG_ERROR("新增{}服务管理节点失败!", service_name);return;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}// 服务下线时调用的回调接口,从相应的服务节点删除信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_DEBUG("{}-{}服务下线了,但是当前并不关心!", service_name, host);return;}// 先获取管理对象,有则删除节点信道auto sit = _services.find(service_name);if (sit == _services.end()){LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);return;}elseservice = sit->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}private:std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;elsereturn service_instance.substr(0, pos);}private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;std::unordered_map<std::string, ServiceChannel::ptr> _services;
};

相关文章:

  • 【推荐】智慧矿山矿业信息化智能化资料汇总-共25份
  • 算法刷题篇
  • 前端- ElementPlus入门
  • QT6 源(79):阅读与注释表盘 QDial 的源码,其是基类QAbstractSlider 的子类,及其刻度线的属性举例
  • 代码随想录算法训练营day9:字符串part02
  • 解密下一代AI:大模型技术的突破与挑战
  • 数据库的范围查询
  • brep2seq kaggle安装 micromamba conda环境
  • 进程间通信——管道
  • 计算机体系结构 第九章 (附带移数网络直径证明和取值情况)
  • 超详细BAM/SAM详解:文件格式与具体参数
  • Milvus(14):更改 Collections 字段、Schema 设计实践
  • Leetcode刷题记录33——二叉树的最小深度
  • 谈判模拟器提示词设计 - Gemini 2.5 优化版
  • Agent2Agent(谷歌A2A)协议原理讲解
  • CatBoost算法原理及Python实现
  • 牛客 Wall Builder II 题解
  • DeepSeek-提示词工程
  • 形式化数学——Lean的介绍与安装
  • Python异步编程入门:从同步到异步的思维转变
  • 抗战回望19︱《中国工程师学会四川考察团报告》:“将来重工业所在,以四川为最适宜之地点”
  • 体坛联播|米兰逆转热那亚豪取3连胜,阿诺德官宣离开利物浦
  • 100%关税!特朗普要让美国电影100%美国制造
  • 上海成五一国内最热门的入境游目的地,国际消费明显提升
  • 当一群杜克土木工程毕业生在三四十年后怀念大学的历史课……
  • AI世界的年轻人|研究不止于实验室,更服务于遥远山区