当前位置: 首页 > news >正文

BRPC基础使用

BRPC(Baidu RPC)是百度开源的高性能 RPC 框架,凭借多协议支持(HTTP、Redis、MySQL、H2 等)、低延迟高并发特性,成为分布式系统开发的优选方案,广泛应用于百度内部及外部企业的核心业务场景。

一、BRPC 核心概念梳理

在开始实践前,需先理解 BRPC 中的 5 个核心组件,它们构成了框架的基础架构:

核心组件作用说明关键特性
Server(服务端)注册 RPC 服务、监听端口、处理客户端请求支持多服务注册、自定义线程数、空闲连接超时控制
Client(客户端)向服务端发起 RPC 调用,支持同步 / 异步模式自动重试、超时控制、多协议适配
Service(服务类)定义 RPC 接口的抽象类,需继承 google::protobuf::Service与 Protobuf 强绑定,通过 .proto 文件自动生成接口代码
Method(方法)Service 中定义的具体 RPC 函数,对应客户端调用的接口输入(Request)/ 输出(Response)均为 Protobuf 消息
Channel(信道)客户端与服务端的连接载体,封装网络通信细节支持负载均衡、连接池管理、协议协商

二、环境搭建:依赖安装与 BRPC 编译

BRPC 依赖 Protobuf、LevelDB 等库,需先完成依赖安装,再编译框架源码。以下操作基于 Ubuntu 20.04 LTS 系统,其他 Linux 发行版可参考调整。

2.1 安装基础依赖

通过 apt-get 安装编译所需的工具链和依赖库:

bash

# 安装 Git、编译器、构建工具
sudo apt-get install -y git g++ make
# 安装 SSL(加密)、Protobuf(序列化)、LevelDB(本地存储)依赖
sudo apt-get install -y libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

2.2 编译并安装 BRPC

BRPC 源码托管于 Apache 仓库,通过 git clone 拉取后,使用 CMake 构建:

bash

# 1. 克隆源码(Apache 官方仓库)
git clone https://github.com/apache/brpc.git
cd brpc/# 2. 创建构建目录(避免污染源码)
mkdir build && cd build# 3. 配置 CMake(指定安装路径为 /usr,便于全局引用)
cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j$(nproc)
# 注:-j$(nproc) 表示使用所有 CPU 核心编译,加速构建# 4. 安装到系统(需 sudo 权限)
sudo make install

2.3 验证安装

安装完成后,可通过以下命令验证 BRPC 是否成功部署:

bash

# 检查头文件是否存在
ls /usr/include/brpc/server.h
# 检查库文件是否存在
ls /usr/lib/libbrpc.so

若文件存在,说明安装成功。

三、BRPC 核心接口详解

BRPC 提供了丰富的类和接口用于服务端配置、客户端通信、HTTP 头管理等,以下是高频使用的核心接口解析。

3.1 服务端相关接口

(1)ServerOptions:服务端配置结构体

用于定义服务端的运行参数,控制连接、线程等核心行为:

cpp

namespace brpc {
struct ServerOptions {int idle_timeout_sec;  // 空闲连接超时时间(秒),-1 表示禁用(默认)int num_threads;       // 处理请求的线程数,默认等于 CPU 核心数// 其他隐含参数:max_concurrency(最大并发数)、protocol(默认协议)等
};
}
(2)ServiceOwnership:服务所有权枚举

控制服务对象的生命周期,避免内存泄漏或重复释放:

cpp

namespace brpc {
enum ServiceOwnership {SERVER_OWNS_SERVICE,        // 服务端拥有所有权:AddService 失败时自动删除服务对象SERVER_DOESNT_OWN_SERVICE   // 服务端不拥有所有权:服务对象由用户自行管理(推荐)
};
}

注意:若服务对象是栈变量(如 EchoServiceImpl echo_service),必须使用 SERVER_DOESNT_OWN_SERVICE,否则服务端会尝试删除栈内存,导致崩溃。

(3)Server:服务端核心类

服务端的 “总控制器”,负责注册服务、启动 / 停止服务:

cpp

namespace brpc {
class Server {
public:// 注册服务:返回 0 表示成功,非 0 表示失败int AddService(google::protobuf::Service* service, ServiceOwnership ownership);// 启动服务:监听指定端口,传入配置(nullptr 表示使用默认配置)int Start(int port, const ServerOptions* opt);// 停止服务:不再接收新请求,参数 closewait_ms 已废弃(传 0 即可)int Stop(int closewait_ms);// 阻塞等待服务完全停止(需在 Stop 后调用)int Join();// 阻塞直到接收 Ctrl+C 信号,自动调用 Stop 和 Join(简化服务退出逻辑)void RunUntilAskedToQuit();
};
}
(4)ClosureGuard:异步回调安全工具

RAII 风格的工具类,确保异步回调(google::protobuf::Closure)在作用域结束时必执行,避免回调丢失:

cpp

namespace brpc {
class ClosureGuard {
public:// 构造:接收回调对象explicit ClosureGuard(google::protobuf::Closure* done);// 析构:自动执行回调(若 done 非空)~ClosureGuard() { if (_done) _done->Run(); }
};
}

3.2 HTTP 头管理:HttpHeader 类

用于封装 HTTP 请求 / 响应的头信息,支持设置 Content-Type、状态码等:

cpp

namespace brpc {
class HttpHeader {
public:// 设置 Content-Type(如 "application/json"、"text/plain")void set_content_type(const std::string& type);// 获取/设置 HTTP 头字段(如 "Authorization"、"Host")const std::string* GetHeader(const std::string& key);void SetHeader(const std::string& key, const std::string& value);// 获取请求 URI(包含路径、查询参数)const URI& uri() const { return _uri; }// 获取/设置 HTTP 请求方法(GET、POST 等,HttpMethod 为枚举)HttpMethod method() const { return _method; }void set_method(const HttpMethod method);// 获取/设置 HTTP 响应状态码(200、404、500 等)int status_code();void set_status_code(int status_code);
};
}

3.3 调用控制:Controller 类

继承自 google::protobuf::RpcController,管理 RPC 调用的超时、重试、错误信息:

cpp

namespace brpc {
class Controller : public google::protobuf::RpcController {
public:// 设置 RPC 调用超时时间(毫秒)void set_timeout_ms(int64_t timeout_ms);// 设置最大重试次数(失败时自动重试)void set_max_retry(int max_retry);// 获取响应消息(Protobuf 对象)google::protobuf::Message* response();// 获取 HTTP 请求/响应头(用于自定义头信息)HttpHeader& http_response();HttpHeader& http_request();// 判断调用是否失败(true 表示失败)bool Failed();// 获取失败详情(如超时、连接失败)std::string ErrorText();// 定义响应后的回调函数类型(支持 lambda)using AfterRpcRespFnType = std::function<void(Controller* cntl, const google::protobuf::Message* req, const google::protobuf::Message* res)>;// 设置回调函数(调用完成后执行)void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn);
};
}

3.4 客户端相关接口

(1)ChannelOptions:客户端信道配置

控制客户端与服务端的连接行为:

cpp

namespace brpc {
struct ChannelOptions {int32_t connect_timeout_ms;  // 连接超时时间(毫秒),-1 表示无限等待int32_t timeout_ms;          // RPC 调用超时时间(毫秒),-1 表示无限等待int max_retry;               // 最大重试次数,默认 0(不重试)AdaptiveProtocolType protocol; // 序列化协议,默认 "baidu_std"(BRPC 私有协议)
};
}
(2)Channel:客户端信道类

客户端与服务端的连接载体,封装网络细节:

cpp

namespace brpc {
class Channel : public ChannelBase {
public:// 初始化信道:连接指定服务端(格式 "ip:port"),返回 0 表示成功int Init(const char* server_addr_and_port, const ChannelOptions* options);
};
}

四、BRPC 实践:服务端与客户端实现

BRPC 基于 Protobuf 定义服务接口,需先编写 .proto 文件生成代码,再实现服务端和客户端。以下以 “回声服务(Echo)” 为例,演示完整流程。

4.1 步骤 1:定义 Protobuf 服务接口

创建 echo.proto 文件,定义请求、响应消息和服务接口:

protobuf

syntax = "proto3";  // 使用 Protobuf 3 语法
package example;     // 生成代码的命名空间// 启用 C++ 服务代码生成(必须开启)
option cc_generic_services = true;// 请求消息:包含一个字符串字段
message EchoRequest {string message = 1;  // 字段编号(不可重复)
}// 响应消息:与请求格式一致(回声服务原样返回)
message EchoResponse {string message = 1;
}// 服务定义:包含一个 Echo 方法
service EchoService {// RPC 方法:输入 EchoRequest,输出 EchoResponserpc Echo(EchoRequest) returns (EchoResponse);
}

4.2 步骤 2:生成 C++ 代码

使用 protoc 编译器将 .proto 文件生成 C++ 头文件和源文件:

bash

protoc --cpp_out=. echo.proto

执行后会生成两个文件:

  • echo.pb.h:包含服务类、消息类的声明;
  • echo.pb.cc:包含上述类的实现。

4.3 步骤 3:实现服务端

服务端需继承 Protobuf 生成的 EchoService 类,实现 Echo 方法,并启动服务。

服务端代码(server.cpp

cpp

#include "echo.pb.h"       // 生成的 Protobuf 头文件
#include <brpc/server.h>   // BRPC 服务端头文件
#include <butil/logging.h> // BRPC 日志头文件(可选)
#include <iostream>// 实现 EchoService 服务
class EchoServiceImpl : public example::EchoService {
public:// 重写 Echo 方法:处理客户端请求void Echo(google::protobuf::RpcController* controller,  // 调用控制器const example::EchoRequest* request,          // 客户端请求example::EchoResponse* response,              // 服务端响应google::protobuf::Closure* done              // 回调(通知框架完成)) override {// 1. 检查调用是否失败(如超时)brpc::Controller* cntl = dynamic_cast<brpc::Controller*>(controller);if (cntl->Failed()) {std::cerr << "调用失败:" << cntl->ErrorText() << std::endl;done->Run(); // 必须调用 done,否则框架会阻塞return;}// 2. 处理业务逻辑:回声服务原样返回请求消息std::cout << "收到客户端请求:" << request->message() << std::endl;response->set_message(request->message()); // 设置响应内容// 3. (可选)自定义 HTTP 响应头cntl->http_response().set_content_type("text/plain; charset=utf-8");cntl->http_response().SetHeader("Server", "BRPC-Echo-Server");// 4. 通知框架调用完成(必须调用)done->Run();}
};int main() {// (可选)关闭 BRPC 内置日志(避免干扰)logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 1. 创建服务对象(栈变量,需用 SERVER_DOESNT_OWN_SERVICE)EchoServiceImpl echo_service;// 2. 创建服务端实例brpc::Server server;// 3. 注册服务:所有权为 SERVER_DOESNT_OWN_SERVICEint ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret != 0) {std::cerr << "注册服务失败!" << std::endl;return -1;}// 4. 配置服务端参数brpc::ServerOptions options;options.idle_timeout_sec = -1;  // 禁用空闲连接超时options.num_threads = 4;       // 4 个线程处理请求(根据 CPU 调整)// 5. 启动服务:监听 8080 端口ret = server.Start(8080, &options);if (ret != 0) {std::cerr << "启动服务失败!" << std::endl;return -1;}// 6. 阻塞等待 Ctrl+C 信号,优雅退出std::cout << "服务端已启动,监听端口 8080,按 Ctrl+C 退出..." << std::endl;server.RunUntilAskedToQuit();return 0;
}

4.4 步骤 4:实现客户端

客户端通过 Channel 连接服务端,使用 Protobuf 生成的 EchoService_Stub 发起调用(支持同步 / 异步)。

客户端代码(client.cpp
#include "echo.pb.h"       // 生成的 Protobuf 头文件
#include <brpc/channel.h>  // BRPC 客户端信道头文件
#include <butil/logging.h> // BRPC 日志头文件(可选)
#include <iostream>// 异步调用回调函数(可选,同步调用无需此函数)
void AsyncEchoCallback(brpc::Controller* cntl,       // 调用控制器example::EchoResponse* rsp    // 响应消息
) {// 处理异步响应if (cntl->Failed()) {std::cerr << "异步调用失败:" << cntl->ErrorText() << std::endl;} else {std::cout << "异步收到响应:" << rsp->message() << std::endl;}// 释放动态分配的内存delete cntl;delete rsp;
}int main() {// (可选)关闭 BRPC 内置日志logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 1. 配置客户端信道参数brpc::ChannelOptions options;options.connect_timeout_ms = 1000;  // 连接超时 1 秒options.timeout_ms = 1000;          // 调用超时 1 秒options.max_retry = 2;              // 失败重试 2 次(共 3 次调用)options.protocol = "baidu_std";     // 使用 BRPC 私有协议// 2. 创建信道实例brpc::Channel channel;// 3. 初始化信道:连接服务端(127.0.0.1:8080)int ret = channel.Init("127.0.0.1:8080", &options);if (ret != 0) {std::cerr << "初始化信道失败!" << std::endl;return -1;}// 4. 创建 Stub(用于发起调用,绑定信道)example::EchoService_Stub stub(&channel);// --------------------------// 方式 1:同步调用(简单场景推荐)// --------------------------example::EchoRequest sync_req;example::EchoResponse sync_r

五、封装

rpc 调用这里的封装,因为不同的服务调用使用的是不同的Stub,这个封装起来的意 义不大,因此我们只需要封装通信所需的Channel管理即可,这样当需要进行什么样 的服务调用的时候,只需要通过服务名称获取对应的channel,然后实例化Stub进行 调用即可。

ServiceChannel 类

一个 ServiceChannel 类要管理多个信道

#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <mutex>
#include "logger.hpp"// 封装单个服务的信道管理类:class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &name) : _service_name(name), _index(0){}// 服务新上线了一个节点,调用append新增信道void append(const std::string host){auto channel = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待options.timeout_ms = -1;         // rpc请求等待超时时间options.max_retry = 3;           // 请求重试次数options.protocol = "baidu_std";  // 序列化协议,默认使用baiduLOG_INFO("设置节点{}",host);int ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);//添加信道_channels.push_back(channel);//添加服务器地址和信道的关系_hosts.insert(std::make_pair(host, channel));}// 服务下线了一个节点,调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it == _hosts.end()){LOG_INFO("没有找到{}-{}", _service_name, host);return;}for (auto vit = _channels.begin(); vit != _channels.end(); vit++){if (*vit == it->second){_channels.erase(vit);break;}}_hosts.erase(it);}// 通过RR轮转策略,获取一个Channel用于对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0){return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index; // 当前轮转下标计数器std::string _service_name; // 服务名称std::vector<ChannelPtr> _channels; // 当前服务对应的信道集合std::unordered_map<std::string, ChannelPtr> _hosts; // hosts和CHannel对应关系
};

ServiceManager 类
class ServiceManager
{public:using ptr = std::shared_ptr<ServiceManager>;// 获取服务的节点信道ServiceChannel::ChannelPtr choose(const std::string &service_name){LOG_INFO("获取服务节点的名称是{}",service_name);std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("没有找到能提供{}服务的节点", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 先声明,我关注哪些服务的上线和下线,不关心的就不需要管理void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_service.insert(service_name);}// 服务上线时调用的接口,将节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name=getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);// 现获取管理对象,如果有就加入,没有就创建auto it = _follow_service.find(service_name);if (it == _follow_service.end()){LOG_INFO("{}-{}节点上线,但不关心", service_name,host);return;}auto sit = _services.find(service_name);if (sit == _services.end()){service = std::make_shared<ServiceChannel>(service_name);LOG_INFO("新增服务节点:{}", service_name);_services.insert(std::make_pair(service_name, service));}else{LOG_INFO("服务节点已经在线:{}-{}",service_name,host);service = sit->second;}}if (!service){LOG_ERROR("新增{}数据管理失败",service_name);return;}//添加service->append(host);LOG_DEBUG("{}-{}服务上线,进行添加管理",service_name,host);}// 服务下线的调用接口,从服务信道管理中,删除指定的节点信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name=getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto it = _follow_service.find(service_name);if (it == _follow_service.end()){LOG_INFO("{}-{} 节点下线,但不关心", service_name,host);return;}auto sit = _services.find(service_name);if (sit == _services.end()){LOG_WARN("删除{} 服务节点时,没有找到管理对象",service_name);return;}else{LOG_INFO("删除{}服务节点",service_name);service = sit->second;}}//移除service->remove(host);LOG_DEBUG("服务下线{}-{},进行删除管理",service_name,host);}private:std::string getServiceName(std::string name){int pos = name.find_last_of('/');if(pos==std::string::npos){return name;}return name.substr(0, pos);}private:std::mutex _mutex;std::unordered_set<std::string> _follow_service;//服务名称和服务信道的对应std::unordered_map<std::string, ServiceChannel::ptr> _services;
};

注意:

std::string getServiceName(std::string name)这个函数是和 etcd 有关的一个函数,后面的 etc 和 brpc 联调会进行讲解

http://www.dtcms.com/a/487782.html

相关文章:

  • 如何用网站模板建设网站南京模板建网站哪家好
  • 称多县公司网站建设网上做家教那个网站好
  • 做家装模型的效果图网站宁德市住房和城乡建设局网站
  • Burp Suite抓包软件使用说明1-Http history
  • 买了两台服务器可以做网站吗不起眼的暴利小生意
  • glibc升级到指定版本
  • 做一个智能体搭建复盘吧
  • 销售网站建设的意义企业网站建设策划书 前言
  • 做房产网站在百度推广推广费前端素材网
  • 家政服务网站建设方案建筑建设网站
  • DirectShow帮助文档
  • No032:休眠的智慧——当DeepSeek学会在静默中更新
  • 注册什么公司给别人做网站成都市房产透明网官网
  • 租车网站 模板提供中山精品网站建设
  • 用于设计和验证自动驾驶系统的场景库
  • 做网站的抬头怎么做wordpress开发ide
  • 数字图像处理绪论
  • UVa 12494 Distinct Substring
  • 【Linux】Linux进程间通信:命名管道(FIFO)的模拟实现重要知识点梳理
  • 做网站时怎么裁切存图最佳建站模板
  • 020网站建设如何保护我做的网站模板
  • Escrcpy 安卓手机投屏软件中文绿色版
  • 大模型实习
  • 如何做网站旅游产品分析网站建设与数据库管理
  • dw不用代码做网站w3school网页制作
  • 网站备案号如何查询密码室内设计需要什么学历
  • Git 用户名与邮箱配置指南
  • Spring 中使用的设计模式
  • SAP MM采购订单审批接口分享
  • 东莞网站制作哪家公司好价格低用英语怎么说