【brpc】安装与使用
brpc安装与使用
- 1. brpc是什么
- 2. 安装
- 3. 类与接口介绍
- 3.1 日志输出类与接口
- 3.2 protobuf 类与接口
- 3.3 服务端类与接口
- 3.4 客户端类与接口
- 4. 使用
- 4.1 同步调用
- 4.2 异步调用
1. brpc是什么
brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统。
brpc具体来说它是一个RPC框架(RPC是一个远程调用的框架)。当客户端想完成某个功能,但是本质来说这个功能并不在客户端完成,而是将这个算力交给服务器来进行。通过网络通信的方式将所需要的参数/数据/输入参数发送给服务器,服务器在针对参数/数据进行处理,然后给客户端返回一个结果。
对客户端来说它的上层并不关心内部网络通信的过程,感觉就是调用了一个接口,就完成了数据的处理拿到了结果。其实就是将数据处理过程交给服务器完成。
就比如下面,服务器内部实现了一个具体的Add功能,而本地也有一个Add接口,但是这个接口内部实现的并不是具体的处理过程,而是在内部通过网络通信向服务器发布一个请求,将参数num1、num2发送给服务器,服务器在调用它内部实现的Add方法去完成具体的处理,处理之后在将结果返回给本地,本地拿到结果,然后它的Add接口在返回最终结果。
可以使用brpc:
- 搭建能在一个端口支持多协议的服务, 或访问各种服务
-
- restful http/https, h2/gRPC。使用 brpc 的 http 实现比 libcurl 方便多了。从其他语言通过 HTTP/h2+json 访问基于 protobuf 的协议.
-
- redis 和 memcached, 线程安全,比官方 client 更方便。
-
- rtmp/flv/hls, 可用于搭建流媒体服务.
-
- 支持 thrift , 线程安全,比官方 client 更方便
-
- 各种百度内使用的协议: baidu_std, streaming_rpc, hulu_pbrpc, sofa_pbrpc, nova_pbrpc, public_pbrpc, ubrpc 和使用 nshead 的各种协议.
-
- 基于工业级的 RAFT 算法实现搭建高可用分布式系统,已在 braft 开源。
- Server 能同步或异步处理请求。
- Client 支持同步、异步、半同步,或使用组合 channels 简化复杂的分库或并发访问。
- 通过 http 界面调试服务, 使用 cpu, heap, contention profilers.
- 获得更好的延时和吞吐.
- 把你组织中使用的协议快速地加入 brpc,或定制各类组件, 包括命名服务 (dns, zk, etcd), 负载均衡 (rr, random, consistent hashing)
2. 安装
先安装依赖
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
安装 brpc
git clone https://github.com/apache/brpc.git
cd brpc
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
make
sudo make install
3. 类与接口介绍
3.1 日志输出类与接口
包含头文件: #include <butil/logging.h>
日志输出这里,本质上我们其实用不着 brpc 的日志输出,因此在这里主要介绍如何关闭日志输出。
日志初始化接口InitLogging,里面参数是LoggingSettings类里面包含一个 LoggingDestination(日志要输出到那里的设定),我们给它设置为 LOG_TO_NONE,也就是不进行日志输出。
namespace logging
{enum LoggingDestination{LOG_TO_NONE = 0};struct BUTIL_EXPORT LoggingSettings{LoggingSettings();LoggingDestination logging_dest;};bool InitLogging(const LoggingSettings &settings);
}
3.2 protobuf 类与接口
protobuf最主要就是替我们进行某种结构的序列化以及反序列化,除此之外,它还可以生成rpc通信接口来供我们进行rpc调用。就是替我们生成一系列接口,我们只需要创建子类去继承基类,然后重写基类内部虚函数,然后就可以实现client和server远程调用了。用起来还是比较方便的。
关于下面所有接口介绍,我们编写一个简单的proto文件,通过这个简单的proto文件进行一个框架代码生成后,在对这些结构进行介绍。
下面我们以 Echo(输出 hello world)方法为例, 来讲解基础的同步 RPC 请求是如何实现的
创建 proto 文件 - main.proto
//声明proto版本
syntax="proto3";//声明命名空间,protobuf会针对proto编译,使用C++生成一系列框架,框架里面包含很多类,为了避免这些类与其他冲突,所以让我们设置一个命名空间
package example;//生成相关rpc接口
option cc_generic_services = true;//定义 Echo 方法请求对象,请求对象内部有那些字段
message EchoRequest {string message = 1;//protobuf会针对我们所写的内容进行编译,会将各个字段名称和消息都记录起来,因此需要我们给每个字段一个编号
};//定义 Echo 方法响应对象,响应对象内部有那些字段
message EchoResponse {string message = 1;
};//定义 RPC 远端调用接口,(EchoService是接口基类名称)
service EchoService {//到时候会生成一个远程的Echo调用接口,请求的时候需要传EchoRequest进行请求,最终会返回一个EchoResponserpc Echo(EchoRequest) returns (EchoResponse);
};
执行下面指令,就会生成两个文件,mian.pb.h 是头文件,main.pb.cc是实现
protoc --cpp_out=./ main.proto
这是刚才的命名空间,里面有EchoRequset类继承protobuf中的Message
这里我们只有message一个字段,有获取message和设置message,这里都给我们生成了。
当前这里还有生成的EchoResponse类,里面也有个message字段
还有EchoService继承protobuf中的Service类,这个就是远程服务器使用的一个类,这里面包含了我们设置的Echo接口,不过它是一个虚函数。到时候我们编写的时候创建一个子类继承EchoService去实现内部的Echo虚函数,针对requset请求做出什么处理放到response里。最终发送给客户端。
EchoService_Stub是客户端使用的一个类,内部也有一个Echo接口让用户去使用,用户自己定义一个request请求,最终得到一个response响应。
不管是服务端还是客户端里面都会涉及到protobuf类的一些接口
protobuf相关类:
RpcController:上下文管理类–目前主要用于判断rpc请求是否是OK的
Closure:客户端-主要用于设置异步处理回调;服务端:通过Run()接口宣告请求处理完毕
namespace google
{namespace protobuf{class PROTOBUF_EXPORT Closure{public:Closure() {}virtual ~Closure();virtual void Run() = 0;};inline Closure *NewCallback(void (*function)());class PROTOBUF_EXPORT RpcController{bool Failed();std::string ErrorText();}}
}
3.3 服务端类与接口
这里只介绍主要用到的成员与接口。
服务端相关类:
Server类:服务器类;
ServerOptions:服务器参数配置类
ClosureGuard:Closure对象的智能管理类
namespace brpc
{//服务器参数设置struct ServerOptions{// 无数据传输,则指定时间后关闭连接int idle_timeout_sec; // Default: -1 (disabled)int num_threads; // Default: #cpu-cores//....} enum ServiceOwnership {// 添加服务失败时,服务器将负责删除服务对象SERVER_OWNS_SERVICE,// 添加服务失败时,服务器也不会删除服务对象SERVER_DOESNT_OWN_SERVICE};class Server{//添加什么服务,才会处理什么请求int AddService(google::protobuf::Service *service,ServiceOwnership ownership);//启动服务器int Start(int port, const ServerOptions *opt);int Stop(int closewait_ms /*not used anymore*/);int Join();// 休眠直到 ctrl+c 按下,或者 stop 和 join 服务器void RunUntilAskedToQuit();} class ClosureGuard{explicit ClosureGuard(google::protobuf::Closure *done);~ClosureGuard(){if (_done)_done->Run();}} class HttpHeader{void set_content_type(const std::string &type)const std::string *GetHeader(const std::string &key) void SetHeader(const std::string &key,const std::string &value);const URI &uri() const { return _uri; }HttpMethod method() const { return _method; }void set_method(const HttpMethod method) int status_code() void set_status_code(int status_code);} class Controller : public google::protobuf::RpcController{void set_timeout_ms(int64_t timeout_ms);void set_max_retry(int max_retry);google::protobuf::Message *response();HttpHeader &http_response();HttpHeader &http_request();//判断请求是否成功bool Failed();//获取错误信息std::string ErrorText();//可以设置对应回调函数,当响应被发送完毕进行调用using AfterRpcRespFnType = std::function<void(Controller *cntl,const google::protobuf::Message *req,const google::protobuf::Message *res)>;void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn)}
}
3.4 客户端类与接口
客户端相关类:
Channel:客户端与服务器网络通信信道类;
ChannelOptions:信道配置类
namespace brpc
{struct ChannelOptions{// 请求连接超时时间int32_t connect_timeout_ms; // Default: 200 (milliseconds)// rpc 请求超时时间int32_t timeout_ms; // Default: 500 (milliseconds)// 最大重试次数int max_retry; // Default: 3// 序列化协议类型 options.protocol = "baidu_std";AdaptiveProtocolType protocol;//....} //以信道方式与服务器进行网络通信class Channel : public ChannelBase{// 初始化接口,成功返回 0;int Init(const char *server_addr_and_port,const ChannelOptions *options);}
}
4. 使用
Rpc调用实现样例:
服务端:
- 创建rpc服务子类继承pb中的EchoService服务类,并实现内部的业务接口逻辑
- 创建rpc服务器类,搭建服务器
- 向服务器类中添加rpc子服务对象–告诉服务器收到什么请求用哪个接口处理
- 启动服务器
客户端:
- 创建网络通信信道
- 实例化pb中的EchoService_Stub类对象
- 发起rpc请求,获取响应进行处理
4.1 同步调用
同步调用是指客户端会阻塞收到 server 端的响应或发生错误。
服务器:
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"//1. 继承与EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServerImpl : public example::EchoService
{public:EchoServerImpl(){}virtual ~EchoServerImpl(){}// controller:包含除了 request 和 response 之外的参数集合// request: 请求,只读的,来自 client 端的数据包// response: 回复。需要用户填充,如果存在 required 字段没有被设置,该次调用会失败。/* done: done 由框架创建,递给服务回调,包含了调用服务回调后的后续动作,包括检查 response 正确性,序列化,打包,发送等逻辑。不管成功失败,done->Run()必须在请求处理完成后被用户调用一次*/void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* don){//将don管理起来,析构的时候自动调用don->run();brpc::ClosureGuard rpc_guard(don);std::cout<<"收到消息: "<<request->message()<<std::endl;std::string str = request->message() + "--这是响应";response->set_message(str);}
};int main(int argc,char* argv[])
{//关闭brpc的默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);//2. 构造服务器对象brpc::Server server;//3. 向服务器对象新增EchoService服务EchoServerImpl echo_service;if(server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE) == -1){std::cout << "添加Rpc服务失败!\n";return -1;}//4. 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1;//连接空闲超时时间-超时后连接被关闭options.num_threads = 1;// IO线程数量if(server.Start(8080,&options) == -1){std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit();//修改等待运行结束
}
客户端:
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"int main(int argc,char* argv[])
{//1. 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待options.timeout_ms = -1;//rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;//请求重试次数options.protocol = "baidu_std";//序列化协议,默认使用baidu_stdbrpc::Channel channel;if(channel.Init("127.0.0.1:8080",&options) == -1){std::cout << "初始化信道失败!\n";return -1;}//2. 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);//3. 进行Rpc调用,发起Rpc请求,获取响应进行处理example::EchoRequest req;req.set_message("你好,吃饭了吗");brpc::Controller cntl;example::EchoResponse rsp;stub.Echo(&cntl,&req,&rsp,nullptr);//同步调用,阻塞等待响应if(cntl->Failed()){std::cout<<"Rpc调用失败: "<<cntl->ErrorText()<<std::endl;return -1;}std::cout<<"收到响应: "<<rsp->message()<<std::endl;delete cntl;delete rsp;return 0;
}
all : server client
server : server.cc main.pb.ccg++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client : client.cc main.pb.ccg++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
4.2 异步调用
异步调用是指客户端注册一个响应处理回调函数, 当调用一个 RPC 接口时立即返回,不会阻塞等待响应, 当 server 端返回响应时会调用传入的回调函数处理响应。
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"void callback(brpc::Controller* cntl, ::example::EchoResponse* response)
{//使用智能指针管理不然调用失败,两个指针指向对象没释放造成内存泄漏std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<example::EchoResponse> resp_guard(response);if (cntl->Failed()) {std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << response->message() << std::endl;
}int main(int argc,char* argv[])
{//1. 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待options.timeout_ms = -1;//rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;//请求重试次数options.protocol = "baidu_std";//序列化协议,默认使用baidu_stdbrpc::Channel channel;if(channel.Init("127.0.0.1:8080",&options) == -1){std::cout << "初始化信道失败!\n";return -1;}//2. 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);//3. 进行Rpc调用,发起Rpc请求,获取响应进行处理example::EchoRequest req;req.set_message("你好,吃饭了吗");//异步调用,并不会阻塞在Echo,如果下面两个用的是局部变量就有可能出了作用域就销毁了//但是异步调用接口还要用,因此在堆上申请brpc::Controller* cntl = new brpc::Controller;example::EchoResponse* rsp = new example::EchoResponse;// stub.Echo(cntl,&req,rsp,nullptr);//同步调用,阻塞等待响应// if(cntl->Failed())// {// std::cout<<"Rpc调用失败: "<<cntl->ErrorText()<<std::endl;// return -1;// }// std::cout<<"收到响应: "<<rsp->message()<<std::endl;// delete cntl;// delete rsp;auto clusure = google::protobuf::NewCallback(callback,cntl,rsp);stub.Echo(cntl, &req, rsp, clusure); //异步调用std::cout << "异步调用结束!\n";std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}