当前位置: 首页 > news >正文

Linux系统C++开发环境搭建工具(三)—— brpc使用指南

文章目录

  • 一、概念简述
  • 二、brpc的安装
  • 三、相关接口的使用
  • 四、测试示例
  • 五、brpc二次封装

一、概念简述

1.什么是RPC?
  RPC 的全称是 Remote Procedure Call,即远程过程调用。简单来说,它是一种技术,允许你像调用本地函数一样,去调用一个位于另一台机器上的服务(函数或方法),而无需关心底层的网络细节。

一个生动的比喻:
想象一下,你想知道某个遥远城市的天气。

  • 没有RPC的情况: 你需要自己查到这个城市气象局的电话号码(确定网络地址),用正确的语言和格式编辑一条短信“查询天气”(组装网络请求报文),然后发送出去。之后,你还要等待回复,并解析回复的短信内容(解析网络响应报文)。
  • 有RPC的情况: 你只需要拿起手机,对着语音助手说:“告诉我XX城市的天气。” 语音助手会自动帮你完成查找号码、发送请求、解析结果等一系列复杂操作,最后直接告诉你“今天晴,25度”。对你而言,你只是“调用”了“查询天气”这个“函数”,并得到了结果。

在技术层面,RPC框架帮你隐藏了以下复杂性:

  • 网络通信:如Socket的建立、连接、数据包的发送和接收。
  • 序列化与反序列化:将函数参数、返回值等内存对象转换成可以在网络中传输的字节流(序列化),以及将接收到的字节流转换回内存对象(反序列化)。
  • 服务发现:如何找到提供服务的远程机器地址。
  • 错误处理:处理网络超时、丢包、服务端宕机等各种异常情况。

也就是有了rpc后我们不用自己去一点点的编写这些可以模板化的操作,提高了开发效率。

2.为什么要有RPC?
RPC的出现是为了解决分布式系统中的通信问题。

  • 解耦与模块化:在大型系统中,不同的业务功能可以被拆分成独立的服务(例如用户服务、订单服务、支付服务)。RPC使得这些服务可以独立开发、部署和扩展,并通过RPC进行通信,实现了系统的高内聚、低耦合。
  • 打破地理与物理限制:服务可以部署在不同的物理机、数据中心甚至不同的地区。RPC使得跨网络的协作变得透明。
  • 复用技术与资源:可以将用不同语言编写的、擅长不同任务的服务整合起来。例如,用C++编写高性能的计算服务,用Java编写复杂的业务逻辑服务,用Python编写数据分析服务,它们之间通过RPC进行通信。
  • 提升开发效率:开发者无需关注底层的网络编程细节,可以更专注于业务逻辑的实现。调用远程服务就像调用本地接口一样简单。

  总结:RPC的核心目标是让分布式系统下的服务间调用变得更简单、更直观,就像在本地一样。本质是一个第三方开发工具,我们可以利用它来开发一个rpc服务器或rpc客户端。

  1. 什么是brpc?
    brpc是百度开源的一款非常优秀的工业级RPC框架。它的名字最初源于 “baidu RPC”,但现在已服务于众多公司,成为了一个通用的高性能网络框架。
  2. brpc的特点
    高性能网络模型:基于epoll(Linux)等IO多路复用技术,实现了高效的异步IO。它内置了多种高性能的线程模型,如bthread(用户态线程/协程),能够在单机轻松支撑数百万并发连接,并极大地减少了线程切换的开销。
    零拷贝:在某些场景下支持零拷贝技术,减少数据在内核态和用户态之间的复制,进一步提升吞吐量。
    协议优化:支持并深度优化了多种协议,其内置的baidu_std协议非常高效。
    多协议支持:brpc不仅仅是一个RPC框架,更是一个通用的网络通信框架。通常只支持一种 “全能选手”,一份代码同时支持RPC、HTTP等多种协议

二、brpc的安装

sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

克隆仓库:

  • git clone https://github.com/apache/brpc.git
  • cd brpc/
  • mkdir build && cd build
  • cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6
  • make && sudo make install

三、相关接口的使用

客户端类

  • brpc::Controller:这是一个 “每次RPC调用”的上下文对象。它用于在客户端和服务器端传递额外的控制和信息。每个 RPC 调用都应该使用一个独立的 Controller 对象,它不能在多次调用间复用。
    常用接口:

    • Failed() const: 判断RPC是否失败。
    • ErrorCode() const: 获取错误码。
    • ErrorText() const: 获取错误文本。
  • brpc::Channel:这是 客户端用于与服务器通信的通道。它代表了一个到服务端集群的连接池。创建 Channel 开销较大,应该被长期复用。
    常用接口:

    • Init(const char* server_addr_and_port, ...): 初始化连接到单个服务器。
    • Init(const char* naming_service_url, ...): 初始化通过命名服务(如Nacos, Consul, DNS)发现的服务集群。
  • YourService_Stub 类:由 protoc 编译器根据 .proto 文件自动生成的客户端存根类。它继承自 YourService,内部封装了一个 brpc::Channel,并提供了与服务器端方法一一对应的、更易用的同步/异步调用接口。

服务端类

  • brpc::Server:RPC 服务器的主体,负责注册服务、启动端口监听、管理内部线程等。
    关键接口:
    • AddService(google::protobuf::Service* service, brpc::ServiceOwnership ownership): 将你的服务实现注册到服务器。ownership 参数指定服务器是否接管服务的生命周期。
    • Start(int port, ...): 在指定端口启动服务。
    • RunUntilAskedToQuit(): 阻塞当前线程,直到收到退出信号(如Ctrl+C)。这是最常用的运行方式。
    • Stop(...)Join(): 优雅地停止服务器。

配置类

  • brpc::ServerOptions:在启动 brpc::Server 时进行各种配置。
    常用配置项:

    • idle_timeout_sec: 连接空闲超时时间。
    • max_concurrency: 服务器最大并发度,用于限制同时处理的请求数。
    • internal_port: 内部端口,用于获取服务器内置状态、监控。
  • brpc::ChannelOptions:初始化 brpc::Channel 时的配置选项。
    常用配置项:

    • timeout_ms: 全局超时时间。
    • connection_type: 连接类型(单连接、连接池、短连接)。
    • protocol: 使用的协议(默认为 baidu_std,还有 h2:grpc, http 等)。
    • load_balancing_name: 负载均衡算法名。
  • brpc::ClosureGuard:是一个 RAII(资源获取即初始化)包装器,它确保在作用域结束时,ClosureRun() 方法会被自动调用一次。这是防止内存泄漏和忘记回调的最佳实践。

创建流程

  1. 定义接口:编写 .proto 文件,定义服务和消息。
  2. 实现服务端:
    • 继承并实现由 .proto生成的 Service 类。
    • main 函数中,创建 brpc::Server
    • 调用 server.AddService 注册你的服务实现。
    • 调用 server.Startserver.RunUntilAskedToQuit
  3. 实现客户端:
    • 创建并初始化(Init)一个长期复用的 brpc::Channel
    • 使用生成的 Stub 类,传入 Channel 来创建存根。
    • 为每次调用创建独立的 Controller, Request, Response
    • 使用 Stub 的同步或异步方法进行调用。

四、测试示例

使用protobuf工具解决数据序列化与反序列化问题,和rpc接口。
编写proto文件

syntax="proto3";//声明版本
package example;//声明命名空间
option cc_generic_services = true;
message EchoRequest//定义请求对象
{string message = 1;//变量编号
}
message EchoResponse
{string message = 1;
}
//定义rpc接口
service EchoService
{rpc Echo(EchoRequest) returns (EchoResponse);//远程接口调用
}

编译文件:protoc --cpp_out=./ main.proto

服务器:

#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"
class EchoServiceImpl : public example::EchoService
{
public:void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) override final{brpc::ClosureGuard rpc_guard(done); //智能指针管理消息发送std::cout<<"收到消息"<<request->message()<<std::endl;std::string str = request->message() + "---响应";response->set_message(str);}
private:
};
int main(int argc, char *argv[])
{//1.构建brpc服务brpc::Server server;//2.向brpc服务对象中新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if(ret == -1){std::cout<<"服务添加失败"<<std::endl;return -1;}//3.启动服务brpc::ServerOptions options;//服务器配置类options.idle_timeout_sec=-1;//连接空闲超时时间,超时后关闭options.num_threads = 1;//io线程数ret = server.Start(7070,&options);if(ret == -1){std::cout<<"服务启动失败"<<std::endl;return -2;}server.RunUntilAskedToQuit();//等待运行结束return 0;
}

客户端:

#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"
void callback(brpc::Controller* cntl,example::EchoResponse* rsp)
{std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<example::EchoResponse> rsp_guard(rsp);if(cntl->Failed()==true){std::cout<<"调用失败"<<std::endl;return;}std::cout<<"收到响应:"<<rsp_guard->message()<<std::endl;return;
}
int main(int argc, char* argv[])
{//1.构建信道连接服务器brpc::ChannelOptions options;//客户端配置类options.connect_timeout_ms = -1;options.timeout_ms = -1;options.max_retry = 3;options.protocol = "baidu_std";brpc::Channel channel;int ret = channel.Init("127.0.0.1:7070",&options);if(ret == -1){std::cout<<"初始化信道失败"<<std::endl;return -1;}example::EchoService_Stub stub(&channel);example::EchoRequest req;req.set_message("hello brpc");brpc::Controller* cntl = new brpc::Controller();example::EchoResponse* rsp = new example::EchoResponse();auto clusure = google::protobuf::NewCallback(callback,cntl,rsp);stub.Echo(cntl,&req,rsp,clusure);std::cout<<"异步调用结束"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}

  在做项目时通常都会用到很多工具,如果这些工具都用各自的日志输出,各式各样就很乱,所以这里关闭brpc的日志系统,统一用一个日志输出。

五、brpc二次封装

  同一个服务的节点可能有多个,服务的种类同样会有多个,为了方便客户端的高效访问,我们把这样服务节点都管理起来,需要利用etcd服务,让所有提供服务的节点都注册到etcd数据库中,key=服务名称,val=服务的节点。

封装:

  1. 指定服务信道的管理类——可以有多个节点提供同一种服务,每个节点都有自己的channel,建立服务与信道的映射关系,并且是一对多,采用RR轮转策略来获取。
  2. 总体服务管理类——将多个服务信道管理对象管理起来。
#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "logger.hpp"
// 封装单个服务的信道类管理
class ServiceChannel
{
public:using ChannelPtr = std::shared_ptr<brpc::Channel>;using ptr = std::shared_ptr<ServiceChannel>;ServiceChannel(const std::string &name): _server_name(name), _index(0){}void append(const std::string &host) // 服务上线{std::unique_lock lock(_mutex);ChannelPtr clptr = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1;options.timeout_ms = -1;options.max_retry = 3;options.protocol = "baidu_std";int ret = clptr->Init(host.c_str(), &options);if (ret == -1){LOG_ERR("初始化{}-{}信道失败", host, _server_name);return;}_channels.push_back(clptr);_hosts.insert(make_pair(host, clptr));}void remove(const std::string &host) // 服务下线{std::unique_lock lock(_mutex);auto ret = _hosts.find(host);if (ret == _hosts.end()){LOG_ERR("{}-{}服务不存在", host, _server_name);return;}for (auto it = _channels.begin(); it != _channels.end(); it++){if (*it == _hosts[host]){_channels.erase(it);break;}}_hosts.erase(host);}ChannelPtr choose() // 服务获取{std::unique_lock lock(_mutex);if (_channels.size() == 0){LOG_INFO("当前没有提供 {} 服务的节点", _server_name);return ChannelPtr();}int32_t index = _index;_index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index;           // 轮询计数std::string _server_name; // 服务名称std::vector<ChannelPtr> _channels;std::unordered_map<std::string, ChannelPtr> _hosts; // 主机地址与信道关系
};
class ServiceManage
{
public:using ptr = std::shared_ptr<ServiceManage>;ServiceChannel::ChannelPtr choose(const std::string &service_name){std::unique_lock lock(_mutex);if (_services.find(service_name) == _services.end()){LOG_INFO("当前没有{}服务的节点", service_name);return ServiceChannel::ChannelPtr();}return _services[service_name]->choose();}void declared(const std::string &service_name){std::unique_lock lock(_mutex);_follow_services.insert(service_name);}// 服务上线处理void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);std::unique_lock lock(_mutex);if (_follow_services.find(service_name) != _follow_services.end()){LOG_INFO("{} 此服务不关心", service_name);return;}if (_services.find(service_name) == _services.end()){ServiceChannel::ptr sc(std::make_shared<ServiceChannel>(service_name));sc->append(host);_services.insert(make_pair(service_name, sc));}else{ServiceChannel::ptr sc = _services[service_name];sc->append(host);}LOG_INFO("{}-{} 服务节点已上线", service_name, host);}// 服务下线处理void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);std::unique_lock lock(_mutex);if (_services.find(service_name) == _services.end()){LOG_WARN("{} 该服务不存在",host);return;}ServiceChannel::ptr sc = _services[service_name];sc->remove(host);LOG_INFO("{}-{} 服务节点已删除",service_name,host);}std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;return service_instance.substr(0, pos);}
private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;std::unordered_map<std::string, ServiceChannel::ptr> _services;
};

关于etcd的使用:Linux系统C++开发环境搭建工具(二)—— etcd 使用指南
spdlog的使用:Linux系统C++开发环境搭建工具(一)—— gflags/gtest/spdlog 使用指南

etcd.hpp:

#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <etcd/SyncClient.hpp>
#include <memory>
#include <functional>
class Registry
{
public:using ptr=std::shared_ptr<Registry>;Registry(const std::string& host):_client(std::make_shared<etcd::Client>(host)),_keep_alive(_client->leasekeepalive(3).get()),_lease_id(_keep_alive->Lease()){}bool registry(const std::string& key,const std::string& val){auto rsp = _client->put(key,val,_lease_id).get();if(rsp.is_ok()==false){//替换为日志输出更为适宜std::cout<<key<<": "<<val<<"注册失败"<<std::endl;return false;}else return true;}~Registry(){_keep_alive->Cancel();}
private:std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::KeepAlive> _keep_alive;uint64_t _lease_id;
};
class Discovery
{
public:using NotifyCallback = std::function<void(std::string, std::string)>;using ptr = std::shared_ptr<Discovery>;Discovery(const std::string& host,const std::string& basedir,const NotifyCallback& put_cb,const NotifyCallback& del_cb):_client(std::make_shared<etcd::Client>(host)),_put_cb(put_cb),_del_cb(del_cb){auto rsp = _client->ls(basedir).get();if(rsp.is_ok()==false){std::cout<<"获取数据失败 "<<rsp.error_message()<<std::endl;}int sz = rsp.keys().size();for(int i=0;i<sz;i++){if(_put_cb)_put_cb(rsp.key(i),rsp.value(i).as_string());}_watcher = std::make_shared<etcd::Watcher>(*_client,basedir,std::bind(&Discovery::callback,this, std::placeholders::_1),true);}void callback(const etcd::Response& rsp){if(rsp.is_ok()==false){std::cout<<"错误事件通知"<<rsp.error_message()<<std::endl;return;}for(auto const& ev: rsp.events()){if (ev.event_type() == etcd::Event::EventType::PUT) {if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());std::cout << "上线服务:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;}else if (ev.event_type() == etcd::Event::EventType::DELETE_) {if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());std::cout << "下线服务:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;}}}~Discovery(){_watcher->Cancel();}
private:NotifyCallback _put_cb;NotifyCallback _del_cb;std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::Watcher> _watcher;
};

测试:
registry.cc(服务器)

#include "etcd.hpp"
#include <gflags/gflags.h>
#include <thread>
#include <logger.hpp>
#include "main.pb.h"
#include <brpc/server.h>DEFINE_bool(run_mode, false, "程序的运行模式,false-调试;true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(instance_name, "/echo/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:7070", "当前实例的外部访问地址");
DEFINE_int32(listen_port,7070,"Rpc服务监听端口");class EchoServiceImpl : public example::EchoService
{
public:void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) override final{brpc::ClosureGuard rpc_guard(done); //智能指针管理消息发送std::cout<<"收到消息"<<request->message()<<std::endl;std::string str = request->message() + "---响应";response->set_message(str);}
private:
};
int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);//关闭brpc的默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);//1.构建服务器brpc::Server server;//2.向服务器对象中新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if(ret == -1){std::cout<<"服务添加失败"<<std::endl;return -1;}//3.启动服务brpc::ServerOptions options;//服务器配置类options.idle_timeout_sec=-1;//连接空闲超时时间,超时后关闭options.num_threads = 1;//io线程数ret = server.Start(7070,&options);if(ret == -1){std::cout<<"服务启动失败"<<std::endl;return -2;}Registry::ptr rclient = std::make_shared<Registry>(FLAGS_etcd_host);rclient->registry(FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);LOG_DEBUG("服务启动: {}", FLAGS_base_service + FLAGS_instance_name);server.RunUntilAskedToQuit();//等待运行结束return 0;
}

discovery.cc(客户端)

#include <gflags/gflags.h>
#include <thread>
#include <logger.hpp>
#include <brpc/server.h>
#include "main.pb.h"
#include "etcd.hpp"
#include "channel.hpp"
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试;true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(call_service, "/service/echo", "当前实例名称");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 构造rpc管理对象ServiceManage::ptr sm(std::make_shared<ServiceManage>());//sm->declared(FLAGS_call_service);auto put_cb = std::bind(&ServiceManage::onServiceOnline, sm.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManage::onServiceOffline, sm.get(), std::placeholders::_1, std::placeholders::_2);// 构建服务发现对象Discovery::ptr dclient = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb);while (1){//Discovery::ptr dclient = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb);// 通过rpc管理对象获取信道std::shared_ptr<brpc::Channel> channel = sm->choose(FLAGS_call_service);if (!channel){std::this_thread::sleep_for(std::chrono::seconds(1));continue;}example::EchoService_Stub stub(channel.get());example::EchoRequest req;req.set_message("hello brpc");// 发起调用brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();stub.Echo(cntl, &req, rsp, nullptr);if (cntl->Failed() == true){delete cntl;delete rsp;LOG_ERR("调用失败");std::this_thread::sleep_for(std::chrono::seconds(1));continue;}std::cout << "收到响应 " << rsp->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));}return 0;
}

非常感谢您能耐心读完这篇文章。倘若您从中有所收获,还望多多支持呀!在这里插入图片描述

http://www.dtcms.com/a/552719.html

相关文章:

  • 《静态库与动态库:从编译原理到实战调用,一篇文章讲透》
  • 标签绑定货物信息误删处理
  • 划时代的AI Agent qwen的回答和思考
  • Rust中泛型函数实现不同类型数据的比较
  • 19. React的高阶组件
  • 中小企业建站服务外贸建站模板免费
  • 网站域名多少钱一年wordpress 发布 工具
  • 个人备案网站可以做淘宝客吗codex.wordpress.org
  • 做网络推广自己建网站建设局局长权力大吗
  • 外贸网站建设需求无锡做网站设计
  • 泰安网站建设策划方案wordpress 评论模版
  • 论坛网站用的虚拟主机做网站需要先学什么
  • 佛山网站推广seo定制网站开发流程图
  • 慈溪市住房和城乡建设局网站营销型网站建设需要懂什么软件
  • 参考网是正规网站吗平面设计大师
  • 网站策划书wordpress群
  • 查看网站开发平台苏州最新情况最新消息今天
  • 攀枝花网站建设兼职wap网站乱码
  • 开封景区网站建设项目方案婚纱摄影网站建站
  • 购物网站 购物车界面如何做访问网站提示输入用户名密码
  • 丹徒网站建设价格深圳网站建设软件定制公司
  • 有域名一定要买空间做网站安阳做网站的公司
  • 网站程序源码手机能制作网站吗
  • 简述电子政务系统网站建设的基本过程网站建设维护协议书
  • 苏州住房和城乡建设厅网站wordpress 定时显示
  • 机械公司网站模板爱网站
  • 全国精品课程建设网站cms源码下载
  • 邢台网站推广公司百度账户登录
  • 建网站如何上传门户网站建设方法
  • 北京住房投资建设中心网站首网站建设教程推荐