当前位置: 首页 > news >正文

基于脚手架微服务的视频点播系统-脚手架开发部分-brpc中间件介绍与使用及二次封装

基于脚手架微服务的视频点播系统-脚手架开发部分-brpc中间件介绍与使用及二次封装

  • 一.介绍
  • 二.安装
  • 三.使用到的类与接口介绍
    • 3.1日志相关
    • 3.2protobuf相关类与接口
    • 3.3服务端使用到的接口
    • 3.4http相关类与接口
    • 3.5客户端相关接口
    • 3.6定时任务相关
  • 四.使用样例
    • 4.0编写proto文件
    • 4.1同步服务端
    • 4.2同步客户端
    • 4.3异步客户端
    • 4.4异步服务端
    • 4.5http服务端
    • 4.6http客户端
  • 五.二次封装
    • 完整封装如下:
    • 一个简单的使用样例:

一.介绍

RPC(Remote Procedure Call)远程过程调⽤,简单来说就是客⼾端在不知道调⽤细节的情况下,调⽤远程计算机上的某个功能就像调⽤本地功能⼀样,其主要⽬标就是让构建分布式计算(应⽤)更容易,在提供强⼤的远程调⽤能⼒时不损失本地调⽤的语义简洁性。

二.安装

先安装依赖:

dev@dev-host:~/workspace$ sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

再进行brpc的安装:

dev@dev-host:~/workspace$ git clone https://github.com/apache/brpc.git
dev@dev-host:~/workspace$ cd brpc/
dev@dev-host:~/workspace/brpc$ mkdir build && cd build
dev@dev-host:~/workspace/brpc/build$ cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6
dev@dev-host:~/workspace/brpc/build$ make && sudo make install

三.使用到的类与接口介绍

3.1日志相关

因为brpc自带有日志输出模块,与websocket一样,无法更换除非改库内源码中所有的⽇志输出操作后重新编译库进⾏安装。我们这里主要是要禁用掉不需要的⽇志输出,避免大量日志输出占视线:

namespace logging {
enum LoggingDestination {LOG_TO_NONE = 0
};
struct BUTIL_EXPORT LoggingSettings {
LoggingSettings();LoggingDestination logging_dest;
};
bool InitLogging(const LoggingSettings& settings);
}

3.2protobuf相关类与接口

namespace google {
namespace protobuf {class PROTOBUF_EXPORT Closure {public:Closure() {}virtual ~Closure();virtual void Run() = 0;};inline Closure* NewCallback(void (*function)());class PROTOBUF_EXPORT RpcController {bool Failed();std::string ErrorText() ;};}
}

3.3服务端使用到的接口

namespace brpc {
struct ServerOptions {//⽆数据传输,则指定时间后关闭连接int idle_timeout_sec; // Default: -1 (disabled)int num_threads; // Default: #cpu-cores
//....
};
enum ServiceOwnership {//添加服务失败时,服务器将负责删除服务对象SERVER_OWNS_SERVICE,//添加服务失败时,服务器也不会删除服务对象SERVER_DOESNT_OWN_SERVICE
};
class Server {int AddService(google::protobuf::Service* service,ServiceOwnership ownership);int Start(int port, const ServerOptions* opt);int Stop(int closewait_ms/*not used anymore*/);int Join();//休眠直到ctrl+c按下,或者stop和join服务器void RunUntilAskedToQuit();
};
class ClosureGuard {explicit ClosureGuard(google::protobuf::Closure* done);~ClosureGuard() { if (_done) _done->Run(); }
};
} 

3.4http相关类与接口

class URI {typedef butil::FlatMap<std::string, std::string> QueryMap;typedef QueryMap::const_iterator QueryIterator;int SetHttpURL(const std::string& url);// Returns 0 on success, -1 otherwisevoid set_path(const std::string& path)void set_host(const std::string& host)void set_port(int port)void SetHostAndPort(const std::string& host_and_optional_port);size_t RemoveQuery(const char* key);// Returns 1 on removed, 0 otherwiseconst std::string& host() const { return _host; }int port()const std::string& path()const std::string& user_info()const std::string& query() const;const std::string* GetQuery(const std::string& key)void SetQuery(const std::string& key, const std::string& value);QueryIterator QueryBegin()QueryIterator QueryEnd()size_t QueryCount()
};enum HttpMethod {HTTP_METHOD_DELETE = 0,HTTP_METHOD_GET = 1,HTTP_METHOD_HEAD = 2,HTTP_METHOD_POST = 3,HTTP_METHOD_PUT = 4,
};const char *HttpMethod2Str(HttpMethod http_method);
bool Str2HttpMethod(const char* method_str, HttpMethod* method);
static const int HTTP_STATUS_OK = 200;
static const int HTTP_STATUS_BAD_REQUEST = 400;
static const int HTTP_STATUS_UNAUTHORIZED = 401;
static const int HTTP_STATUS_FORBIDDEN = 403;
static const int HTTP_STATUS_NOT_FOUND = 404;
static const int HTTP_STATUS_METHOD_NOT_ALLOWED = 405;
static const int HTTP_STATUS_INTERNAL_SERVER_ERROR = 500;class HttpHeader {const std::string& content_type()void set_content_type(const std::string& type)const std::string* GetHeader(const std::string& key)void SetHeader(const std::string& key,const std::string& value);const URI& uri() const { return _uri; }HttpMethod method() const { return _method; }void set_method(const HttpMethod method)int status_code()void set_status_code(int status_code);
};class Controller : public google::protobuf::RpcController {void set_timeout_ms(int64_t timeout_ms);void set_max_retry(int max_retry);void Reset();google::protobuf::Message* response();HttpHeader& http_response();butil::IOBuf& response_attachment();HttpHeader& http_request();butil::IOBuf& request_attachment();bool Failed();std::string ErrorText();using AfterRpcRespFnType = std::function<void(Controller* cntl,const google::protobuf::Message* req,const google::protobuf::Message* res)>;void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn);
};namespace butil {
class IOBuf {//Returns 0 on success, -1 otherwise.int append(const std::string& s);// Convert all data in this buffer to a std::string.std::string to_string() const;void clear();bool empty() const;size_t length() const;
};

3.5客户端相关接口

namespace brpc {
enum ProtocolType : int {PROTOCOL_UNKNOWN = 0,PROTOCOL_BAIDU_STD = 1,PROTOCOL_STREAMING_RPC = 2,PROTOCOL_HULU_PBRPC = 3,PROTOCOL_SOFA_PBRPC = 4,PROTOCOL_RTMP = 5,PROTOCOL_THRIFT = 6,PROTOCOL_HTTP = 7,PROTOCOL_PUBLIC_PBRPC = 8,PROTOCOL_NOVA_PBRPC = 9,PROTOCOL_REDIS = 10,PROTOCOL_NSHEAD_CLIENT = 11,PROTOCOL_NSHEAD = 12,PROTOCOL_HADOOP_RPC = 13,PROTOCOL_HADOOP_SERVER_RPC = 14,PROTOCOL_MONGO = 15,PROTOCOL_UBRPC_COMPACK = 16,PROTOCOL_DIDX_CLIENT = 17,PROTOCOL_MEMCACHE = 18,PROTOCOL_ITP = 19,PROTOCOL_NSHEAD_MCPACK = 20,PROTOCOL_DISP_IDL = 21,PROTOCOL_ERSDA_CLIENT = 22,PROTOCOL_UBRPC_MCPACK2 = 23,PROTOCOL_CDS_AGENT = 24,PROTOCOL_ESP = 25,PROTOCOL_H2 = 26
};
struct ChannelOptions {//请求连接超时时间int32_t connect_timeout_ms;// Default: 200 (milliseconds)//rpc请求超时时间int32_t timeout_ms;// Default: 500 (milliseconds)//最⼤重试次数int max_retry;// Default: 3//序列化协议类型 options.protocol = "baidu_std";AdaptiveProtocolType protocol;//....
};
class Channel : public ChannelBase {//初始化接⼝,成功返回0;int Init(const char* server_addr_and_port, //192.168.xx.xx:9000const ChannelOptions* options);void CallMethod(const google::protobuf::MethodDescriptor* method,google::protobuf::RpcController* controller,const google::protobuf::Message* request,google::protobuf::Message* response,google::protobuf::Closure* done);
};
inline ::google::protobuf::Closure* NewCallback(void (*function)());

3.6定时任务相关

需要包含如下头文件:
bthread/unstable.h , butil/time.h

// Return 0 on success, errno otherwise.
int bthread_timer_add(bthread_timer_t* id,struct timespec abstime,void (*on_timer)(void*),void* arg);
// Returns: 0 - exist & not-run;
// 1 - still running;
// EINVAL - not exist.
int bthread_timer_del(bthread_timer_t id);
namespace butil {timespec microseconds_from_now(int64_t milliseconds);timespec seconds_from_now(int64_t seconds);
}

四.使用样例

4.0编写proto文件

syntax = "proto3";//声明语法版本
package cal;//定义包名
option cc_generic_services = true;//是否启用rpc服务message AddRequest {//定义请求消息int32 a = 1;//第一个参数int32 b = 2;//第二个参数
}message AddResponse {//定义响应消息int32 c = 1;//结果
}//这是一个http请求,不需要任何字段
message HelloRequest {
}//这是一个http响应,不需要任何字段
message HelloResponse {}service Calculator {//定义服务rpc Add(AddRequest) returns (AddResponse);//定义rpc方法rpc Hello(HelloRequest) returns (HelloResponse);//定义另一个rpc方法
}

编译后会生成两个文件cal.pb.cc与cal.pb.h

4.1同步服务端

在这里插入图片描述
根据上图我们可以得到服务端创建的一个大致流程:
0.重写Calculator类中的服务函数,在我们的例子中应该是Add与Hello这两个业务处理函数
1.定义重写类对象
2.定义服务器配置对象ServerOptions(我们这里只把idle_timeout_sec连接超时时间设置为-1表示不超时即可)
3.创建brpc服务器对象
4.使用brpc::Server提供的AddService注册我们上面重写的服务类
5.启动服务器
6.等待服务器退出
案例代码如下:

#include <butil/logging.h>
#include <brpc/server.h>
#include "cal.pb.h"class CalculatorService : public cal::Calculator{
public:CalculatorService(){};~CalculatorService(){};void Add(google::protobuf::RpcController* controller,const cal::AddRequest* request,cal::AddResponse* response,google::protobuf::Closure* done) override{//当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用brpc::ClosureGuard done_guard(done);int result = request->a() + request->b();response->set_c(result);}
};int main() {//1.定义计算服务CalculatorService service;//2.定义服务器配置对象ServerOptions brpc::ServerOptions options;options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时//3.创建服务器对象-如果是堆上new出来的,则AddService需要设置为brpc::SERVER_OWNS_SERVICE,让服务器对象负责释放servicebrpc::Server server;//4.注册服务int ret = server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE);//因为service是栈对象,所以这里需要设置SERVER_DOESNT_OWN_SERVICE,server会自动释放serviceif(ret != 0){std::cerr << "AddService failed" << std::endl;return -1;}//5.启动服务器if (server.Start(9000, &options)!= 0) {std::cerr << "Start server failed" << std::endl;return -1;}//6.等待服务器退出server.RunUntilAskedToQuit();return 0;
}

记住业务处理函数比如Add的声明周期是哪里调用done->Run()哪里才算结束,如果不调用就会导致业务函数始终不结束业务处理。

4.2同步客户端

客户端的创建流程大致如下:
1.创建服务端与客户端通信的信道channel
2.使用stub对象通过channel发起rpc调用
3.打印结果
示例代码如下:

#include "cal.pb.h"
#include <brpc/channel.h>//简单的同步远程rpc调用示例
int main(){//1.定义并设置channel的配置brpc::ChannelOptions options;options.protocol = "baidu_std"; //使用baidu_std协议//2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路brpc::Channel channel;channel.Init("192.168.30.128:9000", &options);//3.创建stub对象-用于发起rpc调用cal::Calculator_Stub stub(&channel);//4.创建请求对象-用于设置rpc调用参数cal::AddRequest request;request.set_a(10);request.set_b(20);//5.发起rpc调用-同步调用cal::AddResponse response;brpc::Controller controller;stub.Add(&controller, &request, &response, NULL);//NULL表示同步调用//检查调用是否成功if(controller.Failed()){std::cerr << "rpc远程调用失败: "<< controller.ErrorText() << std::endl;return -1;}//6.打印rpc调用的结果std::cout << "a+b=" << response.c() << std::endl;return 0;
}

运行之后客户端会打印出a+b=30的字样,博主这里便不再演示了。

4.3异步客户端

我们上面使用stub.Add发起远端的Add请求时,最后一个参数填NULL表示同步调用,如果想要异步调用,就需要使用brpc为我们提供的NewCallback来创建一个Closure的done对象。然后将收到请求之后的逻辑写到该done的回调函数中:

#include "cal.pb.h"
#include <brpc/channel.h>void callback(brpc::Controller* cntl,cal::AddRequest* request,cal::AddResponse* response) {std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<cal::AddRequest> req_guard(request);std::unique_ptr<cal::AddResponse> res_guard(response);if (cntl_guard->Failed()) {std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl;return;}//打印rpc调用的结果std::cout << "a+b=" << response->c() << std::endl;
}//简单的异步远程rpc调用示例
int main(){//1.定义并设置channel的配置brpc::ChannelOptions options;options.protocol = "baidu_std"; //使用baidu_std协议//2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路brpc::Channel channel;channel.Init("192.168.30.128:9000", &options);//3.创建stub对象-用于发起rpc调用cal::Calculator_Stub stub(&channel);//4.创建请求对象-用于设置rpc调用参数cal::AddRequest* request = new cal::AddRequest();//需要new,否则会有生命周期问题request->set_a(10);request->set_b(20);//5.发起rpc调用-异步调用cal::AddResponse* response = new cal::AddResponse();brpc::Controller* controller = new brpc::Controller();//补充:设置Controller的timeout时间,默认是3秒controller->set_timeout_ms(4000);//6.设置回调函数auto done = brpc::NewCallback(callback, controller, request, response);stub.Add(controller, request, response, done);//设置回调函数表示异步rpc调用std::cout << "rpc调用已发出,继续干其他事情..." << std::endl;//7.等待rpc调用结果-键盘按下回车键退出程序getchar();return 0;
}

需要注意的是,brpc::NewCallback 不允许直接传入一个需要多个参数的 lambda 表达式作为回调。因为brpc::NewCallback 的工作方式类似于 std::bind。你提供一个函数(如你的 callback 函数)和它需要的所有参数(controller, request, response)。NewCallback 会将这些东西“打包”起来。当 RPC 完成后,brpc 框架只会调用这个包的 Run() 方法,然后 Run() 方法内部再用你当初提供的参数去调用你的 callback 函数。
但是实际上我们这样子去调用NewCallback也是不行的,虽然官方给的例子是可行的,但不知道我这里为什么不行:

    auto done = brpc::NewCallback([controller,request,response](){std::unique_ptr<brpc::Controller> cntl_guard(controller);std::unique_ptr<cal::AddRequest> req_guard(request);std::unique_ptr<cal::AddResponse> res_guard(response);if (cntl_guard->Failed()) {std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl;return;}//打印rpc调用的结果std::cout << "a+b=" << response->c() << std::endl;});

所以我们自己构建一个方法来支持这种方式的lamda表达式传入:

    //因为google::protobuf::Closure的回调函数不允许传入多个参数,所以需要定义一个Object类来包装回调函数和参数google::protobuf::Closure* ClosureFactory::create(callback_t &&cb){ClosureFactory::Object::ptr obj = std::make_shared<ClosureFactory::Object>();obj->callback = std::move(cb);return google::protobuf::NewCallback(ClosureFactory::asyncCallback, obj);}//异步回调函数执行void ClosureFactory::asyncCallback(const ClosureFactory::Object::ptr obj){obj->callback();}

这其实是我们后面封装的方法,它放在命名空间limerpc中,这时我们这样子写就可以传入上面的一个lamda表达式了:

    //设置回调函数auto done = limerpc::ClosureFactory::create([controller,request,response](){std::unique_ptr<brpc::Controller> cntl_guard(controller);std::unique_ptr<cal::AddRequest> req_guard(request);std::unique_ptr<cal::AddResponse> res_guard(response);if (cntl_guard->Failed()) {std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl;return;}//打印rpc调用的结果std::cout << "a+b=" << response->c() << std::endl;});

4.4异步服务端

这其实就很简单了,我们上面说过,业务函数的声明周期是由done->Run()决定的,所以我们可以将原来的业务处理放到一个线程中,在线程处理完业务时调用done->Run()即可完成异步处理的逻辑:

#include <butil/logging.h>
#include <brpc/server.h>
#include <thread>
#include "cal.pb.h"//异步简易rpc服务端
class CalculatorService : public cal::Calculator{
public:CalculatorService(){};~CalculatorService(){};void Add(google::protobuf::RpcController* controller,const cal::AddRequest* request,cal::AddResponse* response,google::protobuf::Closure* done) override{//使用多线程进行异步处理std::thread thr([=](){//当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用brpc::ClosureGuard done_guard(done);int result = request->a() + request->b();response->set_c(result);//模拟业务处理时间std::this_thread::sleep_for(std::chrono::seconds(3));});thr.detach();}
};int main() {//1.定义计算服务CalculatorService service;//2.定义服务器配置对象ServerOptions brpc::ServerOptions options;options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时//3.创建服务器对象-如果是堆上new出来的,则AddService需要设置为brpc::SERVER_OWNS_SERVICE,让服务器对象负责释放servicebrpc::Server server;//4.注册服务int ret = server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE);//因为service是栈对象,所以这里需要设置SERVER_DOESNT_OWN_SERVICE,server会自动释放serviceif(ret != 0){std::cerr << "AddService failed" << std::endl;return -1;}//5.启动服务器if (server.Start(9000, &options)!= 0) {std::cerr << "Start server failed" << std::endl;return -1;}//6.等待服务器退出server.RunUntilAskedToQuit();return 0;
}

4.5http服务端

我们上面说过,http请求和响应的相关信息都存储在google::protobuf::RpcController*中,所以我们服务端要想能够响应http请求,还是需要先重写对应的http请求处理函数,然后在该重写函数中实现我们的业务逻辑即可:

#include <butil/logging.h>
#include <brpc/server.h>
#include "cal.pb.h"class CalculatorService : public cal::Calculator{
public:CalculatorService(){};~CalculatorService(){};void Add(google::protobuf::RpcController* controller,const cal::AddRequest* request,cal::AddResponse* response,google::protobuf::Closure* done) override{//当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用brpc::ClosureGuard done_guard(done);int result = request->a() + request->b();response->set_c(result);}void Hello(google::protobuf::RpcController* controller,const cal::HelloRequest* request,cal::HelloResponse* response,google::protobuf::Closure* done) override{brpc::ClosureGuard done_guard(done);//切记不要忘记done->run()brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);//打印请求的http信息std::cout << "请求方法:" << cntl->http_request().method() << std::endl;std::cout << "请求uri:" << cntl->http_request().uri() << std::endl;std::cout << "请求body:" << cntl->request_attachment().to_string() << std::endl;//设置响应的http信息cntl->http_response().set_content_type("text/plain");cntl->http_response().set_status_code(200);//设置响应码-200表示成功cntl->response_attachment().append("回显:" + cntl->request_attachment().to_string());}
};int main() {//1.定义计算服务CalculatorService service;//2.定义服务器配置对象ServerOptions brpc::ServerOptions options;options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时//3.创建服务器对象-如果是堆上new出来的,则AddService需要设置为brpc::SERVER_OWNS_SERVICE,让服务器对象负责释放servicebrpc::Server server;//4.注册服务int ret = server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE);//因为service是栈对象,所以这里需要设置SERVER_DOESNT_OWN_SERVICE,server会自动释放serviceif(ret != 0){std::cerr << "AddService failed" << std::endl;return -1;}//5.启动服务器if (server.Start(9000, &options)!= 0) {std::cerr << "Start server failed" << std::endl;return -1;}//6.等待服务器退出server.RunUntilAskedToQuit();return 0;
}

4.6http客户端

这里相较于上面的远程rpc请求有些不同,它不需要再通过protobuf提供的stub对象发起请求,而是直接通过brpc创建的channel信道中的CallMethod方法发起http请求。请求路由应该与我们在proto中定义的服务类名称一致。比如在我们的示例中如若想要访问服务端的Hello服务,路由应该是这样的:

/Calculator/Hello

还有一个比较特殊的点,我们也可以通过http客户端发起rpc请求,我们示例中的rpc服务为Add,首先需要将路由设置为:

/Calculator/Add

其次我们的请求正文格式必须是json格式,请求的json正文格式必须与Add在proto文件中定义的AddRequest格式相同:

message AddRequest {//定义请求消息int32 a = 1;//第一个参数int32 b = 2;//第二个参数
}

请求的json正文示例:

{"a":10,"b":20}

这样便可以访问远端的Add服务,最后服务端会返回json格式的响应内容写到brpc::Controller的response_attachment()中。客户端示例代码如下:

#include "cal.pb.h"
#include <brpc/channel.h>//基于brpc的简单http客户端示例
int main(){//1.定义并设置channel的配置brpc::ChannelOptions options;options.protocol = brpc::PROTOCOL_HTTP; //使用http协议//2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路brpc::Channel channel;channel.Init("192.168.30.128:9000", &options);//3.使用brpc::Controller来设置http请求的相关信息brpc::Controller cntl;//3-1.设置http请求的url-需要和proto文件中定义的服务名称对应cntl.http_request().uri().set_path("/Calculator/Add");//通过http访问rpc服务,正文内容必须为json格式//cntl.http_request().uri().set_path("/Calculator/Hello");//通过http访问普通的http服务//3-2.设置http请求的method-GET/POST/PUT/DELETE等cntl.http_request().set_method(brpc::HTTP_METHOD_POST);//3-3.设置http请求的header-可选cntl.http_request().SetHeader("Content-Type", "application/json");//cntl.http_request().set_content_type("text/plain");//3-4.设置http请求的body-可选cntl.request_attachment().append(R"({"a":10,"b":20})");//json格式字符串//cntl.request_attachment().append("hello world");//普通字符串//4.直接使用channel发起http请求channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);//NULL表示同步-此处会阻塞等待服务器的响应if(cntl.Failed()){std::cerr << "http请求失败: "<< cntl.ErrorText() << std::endl;return -1;}//5.打印http请求的结果std::cout << "http请求成功,响应码=" << cntl.http_response().status_code() << std::endl;std::cout << "响应内容=" << cntl.response_attachment().to_string() << std::endl;return 0;
}

注释部分为普通http请求,非注释部分为http访问rpc服务的请求,读者可自行去掉或加上注释编译运行查看不同结果。
Makefile如下:

.PHONY: all clean
all: server client async_client async_server http_client http_serverserver: server.cc cal.pb.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17
client:client.cc cal.pb.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17
async_client:async_client.cc cal.pb.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17
async_server:async_server.cc cal.pb.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17
http_client:http_client.cc cal.pb.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17
http_server:http_server.cc cal.pb.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17
%.pb.cc: %.protoprotoc --cpp_out=. $<
clean:rm -f server client async_client async_server http_client http_server

五.二次封装

因为brpc一般是使用在分布式系统上的。一个服务可能在不同的机器上都会提供,但是有的时候部分机器可能会挂掉,所以此时就需要将其移除,如果有新的服务器添加了此项服务,那么就需要将其管理起来,所以我们可以设计如下的类:

    using ChannelPtr = std::shared_ptr<brpc::Channel>;// 服务信道管理类class RpcChannels{public:using ptr = std::shared_ptr<RpcChannels>;RpcChannels();// 获取服务信道ChannelPtr get_channel();// 增加服务信道void add_channel(const std::string &addr);// 删除服务信道void remove_channel(const std::string &addr);private:std::mutex _mtx;                                           // 互斥锁uint32_t _idx;                                             // 服务信道索引-轮询下标std::vector<ChannelPtr> _channels;                         // 服务信道列表std::unordered_map<std::string, ChannelPtr> _channels_map; // 服务信道映射表};

当然我们的服务不可能单单只有一个,所以我们要将不同服务的RpcChannels管理起来。而且有的时候并不是所有的服务器提供的所有的服务我们都需要关心,我们还需要管理哪些服务是需要关心的:

    // 服务管理类class SvcRpcChannels{public:using ptr = std::shared_ptr<SvcRpcChannels>;SvcRpcChannels() = default;// 设置服务关心void set_match(const std::string &service_name);// 新增结点void add_node(const std::string &service_name, const std::string &node_addr);// 删除结点void remove_node(const std::string &service_name, const std::string &node_addr);// 获取服务信道ChannelPtr get_channel(const std::string &service_name);private:std::mutex _mtx;std::unordered_map<std::string, RpcChannels::ptr> _svc_channels_map; // 服务名称-服务信道管理映射表};

也就是说,根据上面的两个类我们可以知道,客户端访问远端rpc服务的流程在我们封装下变成了:
1.告诉SvcRpcChannels我客户端想要访问的服务
2.SvcRpcChannels根据客户端提供的消息,查找对应的RpcChannels
3.SvcRpcChannels找到了对应的RpcChannels,让RpcChannels给一个可用服务信道
4.RpcChannels通过RR轮询的方式,遍历自身的可用信道,保证返回效率的同时负载均衡。最后通过SvcRpcChannels的get_channel返回一个可用信道。
5.客户端收到可用服务信道,使用该信道发起http请求或远程rpc服务请求
这是客户端的相关类,当然我们再封装一个解决我们上面说的客户端异步调用无法直接传lamda表达式的工厂类:

    // 异步回调⼯⼚类class ClosureFactory{public:using callback_t = std::function<void()>;static google::protobuf::Closure *create(callback_t &&cb);private:struct Object{using ptr = std::shared_ptr<Object>;callback_t callback;};static void asyncCallback(const Object::ptr obj);};

最后我们再来封装一个服务端的工厂类:

    // 服务器⼯⼚类class RpcServer{public:// 默认svc是堆上new出来的对象,将管理权移交给rpc服务器进⾏管理static std::shared_ptr<brpc::Server> create(int port,google::protobuf::Service *svc);};

完整封装如下:

//limerpc.h
#pragma once
#include <brpc/channel.h>
#include <brpc/server.h>
#include <mutex>
#include <unordered_map>
#include "limelog.h"namespace limerpc
{using ChannelPtr = std::shared_ptr<brpc::Channel>;// 服务信道管理类class RpcChannels{public:using ptr = std::shared_ptr<RpcChannels>;RpcChannels();// 获取服务信道ChannelPtr get_channel();// 增加服务信道void add_channel(const std::string &addr);// 删除服务信道void remove_channel(const std::string &addr);private:std::mutex _mtx;                                           // 互斥锁uint32_t _idx;                                             // 服务信道索引-轮询下标std::vector<ChannelPtr> _channels;                         // 服务信道列表std::unordered_map<std::string, ChannelPtr> _channels_map; // 服务信道映射表};// 服务管理类class SvcRpcChannels{public:using ptr = std::shared_ptr<SvcRpcChannels>;SvcRpcChannels() = default;// 设置服务关心void set_match(const std::string &service_name);// 新增结点void add_node(const std::string &service_name, const std::string &node_addr);// 删除结点void remove_node(const std::string &service_name, const std::string &node_addr);// 获取服务信道ChannelPtr get_channel(const std::string &service_name);private:std::mutex _mtx;std::unordered_map<std::string, RpcChannels::ptr> _svc_channels_map; // 服务名称-服务信道管理映射表};// 异步回调⼯⼚类class ClosureFactory{public:using callback_t = std::function<void()>;static google::protobuf::Closure *create(callback_t &&cb);private:struct Object{using ptr = std::shared_ptr<Object>;callback_t callback;};static void asyncCallback(const Object::ptr obj);};// 服务器⼯⼚类class RpcServer{public:// 默认svc是堆上new出来的对象,将管理权移交给rpc服务器进⾏管理static std::shared_ptr<brpc::Server> create(int port,google::protobuf::Service *svc);};
} // namespace limerpc
//limerpc.cc
#include "limerpc.h"namespace limerpc{RpcChannels::RpcChannels():_idx(0){}// 轮询获取服务信道ChannelPtr RpcChannels::get_channel(){std::unique_lock<std::mutex> lock(_mtx);size_t index = _idx % _channels.size();_idx++;return _channels[index];}// 增加服务信道void RpcChannels::add_channel(const std::string &addr){std::unique_lock<std::mutex> lock(_mtx);if(_channels_map.find(addr) != _channels_map.end()){DBG("信道:{} 已存在,不再添加", addr);return;}//1.定义并设置channel的配置brpc::ChannelOptions options;options.protocol = "baidu_std"; //使用baidu_std协议//2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路ChannelPtr channel = std::make_shared<brpc::Channel>();channel->Init(addr.c_str(), &options);//3.将channel添加到_channels中_channels.push_back(channel);//4.将channel添加到_channels_map中_channels_map[addr] = _channels.back();}// 删除服务信道void RpcChannels::remove_channel(const std::string &addr){std::unique_lock<std::mutex> lock(_mtx);auto it = _channels_map.find(addr);//找不到直接返回if(it == _channels_map.end()){WRN("信道:{} 不存在,无法删除", addr);return;}//找到后从_channels中删除auto item = std::find(_channels.begin(), _channels.end(), it->second);if (item != _channels.end()) {_channels.erase(item);}//从_channels_map中删除_channels_map.erase(it);}//设置服务关心void SvcRpcChannels::set_match(const std::string &service_name){std::unique_lock<std::mutex> lock(_mtx);_svc_channels_map[service_name] = std::make_shared<RpcChannels>();}//新增结点void SvcRpcChannels::add_node(const std::string &service_name, const std::string &node_addr){//判断是否为关心的服务auto it = _svc_channels_map.find(service_name);if(it == _svc_channels_map.end()){DBG("服务:{} 未设置关注,忽略添加", service_name);return;}//增加服务信道it->second->add_channel(node_addr);}//删除结点void SvcRpcChannels::remove_node(const std::string &service_name, const std::string &node_addr){//判断是否为关心的服务auto it = _svc_channels_map.find(service_name);if(it == _svc_channels_map.end()){DBG("服务:{} 未设置关注,忽略删除", service_name);return;}//删除服务信道it->second->remove_channel(node_addr);}//获取服务信道ChannelPtr SvcRpcChannels::get_channel(const std::string &service_name){//判断是否为关心的服务auto it = _svc_channels_map.find(service_name);if(it == _svc_channels_map.end()){DBG("服务:{} 未设置关注,无法获取信道", service_name);return ChannelPtr();}//获取服务信道return it->second->get_channel();}//因为google::protobuf::Closure的回调函数不允许传入多个参数,所以需要定义一个Object类来包装回调函数和参数google::protobuf::Closure* ClosureFactory::create(callback_t &&cb){ClosureFactory::Object::ptr obj = std::make_shared<ClosureFactory::Object>();obj->callback = std::move(cb);return google::protobuf::NewCallback(ClosureFactory::asyncCallback, obj);}//异步回调函数执行void ClosureFactory::asyncCallback(const ClosureFactory::Object::ptr obj){obj->callback();}//服务端创建std::shared_ptr<brpc::Server> RpcServer::create(int port,google::protobuf::Service *svc){//1.定义服务器配置对象ServerOptions brpc::ServerOptions options;options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时//2.创建服务器对象std::shared_ptr<brpc::Server> server = std::make_shared<brpc::Server>();//3.注册服务if (server->AddService(svc, brpc::SERVER_OWNS_SERVICE) != 0) {ERR("服务注册失败");exit(-1);//直接退出}//4.启动服务器if (server->Start(port, &options) != 0) {ERR("服务启动失败");exit(-1);//直接退出}return server;}
}

一个简单的使用样例:

cal.proto:

syntax = "proto3";//声明语法版本
package cal;//定义包名
option cc_generic_services = true;//是否启用rpc服务message AddRequest {//定义请求消息int32 a = 1;//第一个参数int32 b = 2;//第二个参数
}message AddResponse {//定义响应消息int32 c = 1;//结果
}//这是一个http请求,不需要任何字段
message HelloRequest {
}//这是一个http响应,不需要任何字段
message HelloResponse {}service Calculator {//定义服务rpc Add(AddRequest) returns (AddResponse);//定义rpc方法rpc Hello(HelloRequest) returns (HelloResponse);//定义另一个rpc方法
}

rpc_client.cc

#include "../../source/limerpc.h"
#include "cal.pb.h"int main() {//初始化日志limelog::limelog_init();//创建服务管理类limerpc::SvcRpcChannels svc_rpc_channels;//手动添加服务关心svc_rpc_channels.set_match("Calculator");//模拟服务发现svc_rpc_channels.add_node("Calculator","192.168.30.128:9000");//获取服务信道auto channel = svc_rpc_channels.get_channel("Calculator");//3.创建stub对象-用于发起rpc调用cal::Calculator_Stub stub(channel.get());//4.创建请求对象-用于设置rpc调用参数cal::AddRequest* request = new cal::AddRequest();//需要new,否则会有生命周期问题request->set_a(10);request->set_b(20);//5.发起rpc调用-异步调用cal::AddResponse* response = new cal::AddResponse();brpc::Controller* controller = new brpc::Controller();//补充:设置Controller的timeout时间,默认是3秒controller->set_timeout_ms(4000);//设置回调函数auto done = limerpc::ClosureFactory::create([controller,request,response](){std::unique_ptr<brpc::Controller> cntl_guard(controller);std::unique_ptr<cal::AddRequest> req_guard(request);std::unique_ptr<cal::AddResponse> res_guard(response);if (cntl_guard->Failed()) {std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl;return;}//打印rpc调用的结果std::cout << "a+b=" << response->c() << std::endl;});stub.Add(controller, request, response, done);//设置回调函数表示异步rpc调用std::cout << "rpc调用已发出,继续干其他事情..." << std::endl;//等待rpc调用结果-键盘按下回车键退出程序getchar();return 0;
}

rpc_server.cc

#include "../../source/limerpc.h"
#include "cal.pb.h"
#include <thread>//异步简易rpc服务端
class CalculatorService : public cal::Calculator{
public:CalculatorService(){};~CalculatorService(){};void Add(google::protobuf::RpcController* controller,const cal::AddRequest* request,cal::AddResponse* response,google::protobuf::Closure* done) override{//使用多线程进行异步处理std::thread thr([=](){//当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用brpc::ClosureGuard done_guard(done);int result = request->a() + request->b();response->set_c(result);//模拟业务处理时间std::this_thread::sleep_for(std::chrono::seconds(3));});thr.detach();}
};int main()
{//定义计算服务CalculatorService* service = new CalculatorService();//通过服务器工厂类获取一个服务器实例auto server = limerpc::RpcServer::create(9000, service);//等待服务器退出server->RunUntilAskedToQuit();return 0;
}

makefile:

.PHONY: all clean
all: server clientserver: rpc_server.cc cal.pb.cc ../../source/limerpc.cc ../../source/limelog.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -lspdlog -lfmt -std=c++17
client: rpc_client.cc cal.pb.cc ../../source/limerpc.cc ../../source/limelog.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -lspdlog -lfmt -std=c++17
%.pb.cc: %.protoprotoc --cpp_out=. $<
clean:rm -f server client
http://www.dtcms.com/a/449268.html

相关文章:

  • 网站建设需求登记表 免费下载软件开发公司企业简介
  • 北京城乡建设网站网站建设都是用什么软件
  • C++分布式语音识别服务实践
  • 轻松理解智能体设计模式(1/6):提示链(Prompt Chaining)
  • ARM TrustZone技术如何守护你的隐私
  • 北京恒伟网站建设wordpress 新页面打开空白
  • 从 0 到 1 搭建 Python 语言 Web UI自动化测试学习系列 8--基础知识 4--常用函数 2
  • 在 Python 项目中构建可靠的 CI/CD 流水线:从设计到实战(面向开发与运维的实用指南)
  • Linux基本指令(中)
  • 郑州服装网站建设公司php做电商网站安全性如何
  • 响应式网站 翻译网站关键词排名如何提升
  • python爬虫(二) ---- JS动态渲染数据抓取
  • 国庆回来的css
  • 廊坊做网站的电话公司网站外包
  • 八股文:计算机网络-20250925
  • MySql速成笔记5(多表关系)
  • 如何设计和建立一个公司的网站佛山网站优化有
  • 开源 C++ QT QML 开发(六)自定义控件--波形图
  • excel-mcp-server 安装
  • Axios快速上手
  • 中国建网站报价电子商务网站的建设流程图
  • 某单位固态硬盘站点备份至固态硬盘的站点备份方案
  • 高级经济师资源合集
  • 【数据结构】数据结构考研核心:树形查找算法对比与应用场景全指南
  • 做公司网站需要了解哪些东西体育网站的制作哪里可以做
  • 【Docker】解决Docker中“exec format error”错误:架构不匹配的完整指南
  • 如何自己开个网站平台成立网站建设公司要求
  • [嵌入式embed]RT-ThreadStudio-STM32F103C8T6(江协科技)+移植RT-Thread v4.11模版
  • 元宇宙的科幻预言:影视作品中的元宇宙畅想
  • Day07_刷题niuke20251007