4.4-中间件之gRPC
一、初识gRPC
gRPC是一个通用RPC框架,通过.proto文件里面规定好的消息格式与函数格式,用protoc工具自动生成代码,得到
- .pb.h(包含消息类型(如请求、响应消息), 服务端和客户端带上
#include "IM.Login.pb.h"
才能使用这些消息类型和接口) - .pb.cc(.pb.h中声明的具体实现,客户端构造请求消息、服务端解析请求和构造响应,都依赖于
.pb.cc
的实现)
比如:IM.Login.proto
文件内容如下
syntax = "proto3";
package IM.Login;// 定义服务
service ImLogin {rpc Regist(IMRegistReq) returns (IMRegistRes) {} rpc Login(IMLoginReq) returns (IMLoginRes) {}
}// 注册账号
message IMRegistReq{string user_name = 1; // 用户名string password = 2; // 密码
}
// 注册返回
message IMRegistRes{string user_name = 1; // 用户名uint32 user_id = 2; uint32 result_code = 3; // 返回0的时候注册注册
}message IMLoginReq{string user_name = 1; // 用户名string password = 2; // 密码
}
message IMLoginRes{uint32 user_id = 1; uint32 result_code = 2; // 返回0的时候注册注册
}
由于里面定义了服务,运行protoc --go_out=. --go-grpc_out=. user_service.proto后:
除了生成.pb.cc .pb.h之外 还会生成.grpc.pb.cc和.grpc.pb.h,里面包含 service(服务端基类)和 stub(客户端代理对象)的声明和实现
1、service:规定了服务器提供哪些远程方法(函数)可供调用。服务器开发者需要继承这个自动生成的类,并重写 里面定义的虚函数方法。这些方法里包含了真正的业务逻辑。
// 这是 protoc 自动生成的 Service 基类
class Service : public ::grpc::Service {public:Service();virtual ~Service();virtual ::grpc::Status Regist(::grpc::ServerContext* context, const ::IM::Login::IMRegistReq* request, ::IM::Login::IMRegistRes* response);virtual ::grpc::Status Login(::grpc::ServerContext* context, const ::IM::Login::IMLoginReq* request, ::IM::Login::IMLoginRes* response);
};
2、stub:客户端只需要持有一个 stub(代理对象),通过它调用远程方法
// 这是 protoc 自动生成的 Stub 类
class Stub final : public StubInterface {public:Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());::grpc::Status Regist(::grpc::ClientContext* context, const ::IM::Login::IMRegistReq& request, ::IM::Login::IMRegistRes* response) override;std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::IM::Login::IMRegistRes>> AsyncRegist(::grpc::ClientContext* context, const ::IM::Login::IMRegistReq& request, ::grpc::CompletionQueue* cq);std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::IM::Login::IMRegistRes>> PrepareAsyncRegist(::grpc::ClientContext* context, const ::IM::Login::IMRegistReq& request, ::grpc::CompletionQueue* cq);::grpc::Status Login(::grpc::ClientContext* context, const ::IM::Login::IMLoginReq& request, ::IM::Login::IMLoginRes* response) override;std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::IM::Login::IMLoginRes>> AsyncLogin(::grpc::ClientContext* context, const ::IM::Login::IMLoginReq& request, ::grpc::CompletionQueue* cq);std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::IM::Login::IMLoginRes>> PrepareAsyncLogin(::grpc::ClientContext* context, const ::IM::Login::IMLoginReq& request, ::grpc::CompletionQueue* cq);// ... 还有 async 相关接口 ...
};
gRPC再基于HTTP2协议传输,将调用请求发送到服务端。
为什么采用HTTP2.0?
因为HTTP2.0支持pipeline发送请求,并且不限制服务端的处理响应顺序,而HTTP1.0只能一次发送一个请求不支持pipeline,HTTP1.1支持pipeline但是响应必须按照请求的顺序。
二、gRPC的四种模式 “流”
一元RPC模式:客户端发送一个请求,服务端回复一个响应
服务器端流RPC模式:客户端发送一个请求,服务端可以回复多个响应
客户端流RPC模式:客户端可以发送多个请求,服务端回复一个响应
双向流RPC模式:客户端可以发送多个请求,服务端可以回复多个响应
rpc ListFeatures(Rectangle)returns (stream Feature){} 服务端返回流 即返回多个响应客户端读取响应的方式:
std::unique ptr<clientReader<Feature> reader( //Feature是返回消息类型
stub_->ListFeatures(&context,rect));持续读取
while(reader->Read(&feature)){std::cout<< "Found feature called "<< feature.name()<<" at "<<feature.location().latitude()/kcoordFactor_ <<","<<feature.location().longitude()/ kcoordFactor_<< std::endl
}
Status status =reader->Finish();
三、gRPC的同步和异步
3.1在客户端调用方面
直观的区分就是客户端发送请求后,是持续等待服务端返回,还是先去干别的活等待返回了再去处理响应。
同步:
// 1. 创建 Channel 和存根
auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());
GreeterClient greeter(channel);// 2. 准备请求
std::string user("world");
HelloReply reply;// 3. 发起同步调用!这一行会阻塞线程,直到收到回复或出错。
Status status = greeter.SayHello(user, &reply);// 4. 线程被唤醒,检查状态并使用回复
if (status.ok()) {std::cout << "Greeter received: " << reply.message() << std::endl;
} else {std::cout << "RPC failed: " << status.error_message() << std::endl;
}
异步:其核心是利用 CompletionQueue(完成队列,CQ) 来解耦请求的发送和响应的接收。
// 1. 创建 Channel、存根 和 最重要的 CompletionQueue
auto channel = ...;
GreeterClient greeter(channel);
CompletionQueue cq;// 2. 准备请求和用于存储响应的对象
HelloRequest request;
request.set_name("world");
HelloReply reply;
Status status;// 3. 创建上下文,并发起异步调用!此调用立即返回,不会阻塞。
ClientContext context;
// 注:这里使用 异步 方法,并传入了 cq 和一个唯一的 tag(指unique_ptr指针,标记这次请求)
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> rpc( stub_->AsyncSayHello(&context, request, &cq));// 4. 告诉 gRPC 当 Finish 操作完成(即收到响应)时,将 tag 为 (void*)1 的事件放入 CQ
rpc->Finish(&reply, &status, (void*)1);// 5. 主线程可以继续做其他事情...
// 同时,另一个工作线程(或主线程在某个时刻)会处理 CQ:void* received_tag;
bool ok = false;
// 6. 等待事件完成。这里会阻塞,直到有响应到达。
cq.Next(&received_tag, &ok);// 7. 通过 tag 识别是哪个请求完成了
if (received_tag == (void*)1) {// 8. 检查操作状态并处理响应if (ok && status.ok()) {// 处理 reply}
}
unique_ptr:
它性能最好,语义最清晰,但是一次只能被一个对象持有。
场景:
gRPC 异步调用这样所有权转移路径非常明确,且需要标记服务端的响应,适合用unique_ptr。比如请求的是(1,2,3),响应的可能是(3,2,1),没有标记会无法正常接收。
shared_ptr:看似很智能,其实消耗较大,在需要用指针的情况下优先考虑unique_ptr。对于
多个持有者,不确定谁先释放的情况,可以考虑shared_ptr。
场景:
在网络编程中,一个连接会话会被多个线程持有,接收线程会接收数据,工作线程会处理连接上的业务逻辑,心跳线程检查连接是否存活。此时各持有者的生命周期不确定,且不能由任意一个持有者放弃就销毁连接,此时shared_ptr的计数可以发挥作用。
3.2在服务端处理方面
不论同步还是异步处理,服务端都会采用多个epoll实例来均衡监听fd的压力。
在服务端的epoll监听到fd创建连接后,客户端发起调用请求如Register(),方法不会阻塞而是直接投递到 完成队列中,并且注册一个事件,服务端主线程不断从完成队列中取出事件(tag),执行对应的proceed函数。整体是一个事件驱动的异步模型,异步不是靠多线程堆积出来的,而是请求遇到等待的时候,工作线程去执行别的请求先,等待完成了,再加入回队列,工作线程会再取出执行。
注:具体的状态切换是在grpc框架的io底层帮助实现,服务端只需要:
service_->RequestRegist(&ctx_, &request_, &responder_, cq_, cq_, this);在这里注册了一个“监听器”,告诉 gRPC:“有新请求时请通知我”。
之后的所有底层操作(如等待新请求、网络I/O、等待客户端响应、响应完成后投递事件到 CompletionQueue)都由 gRPC 框架自动管理。不需要手动处理这些底层状态,gRPC 会在合适的时机把事件(tag)投递到队列,你只需要在 Proceed() 里处理业务逻辑即可。
四、手写rpc流程
4.1初始流程
- 编写proto文件
- 根据proto文件生成对应的pb.cc pb.h grpc.pb.cc grpc.pb.h文件
- server程序继承grpc.pb.cc中的Service类
- client程序继承grpc.pb.cc中的stub类
- 编译server 和 client程序
4.2增加 service 和 rpc 流程
(1)在 .proto 文件中增加 service 和 rpc 方法
例如,添加一个新服务和新方法:
service MyService {rpc MyMethod (MyRequest) returns (MyResponse);
}
或者在已有 service 里增加新 rpc:
service ImLogin {rpc Login (IMLoginReq) returns (IMLoginRes);rpc NewRpc (NewReq) returns (NewRes); // 新增的rpc
}
(2)重新生成代码
使用 protoc 工具(带 gRPC 插件)重新生成对的 .pb.h、.pb.cc、.grpc.pb.h、.grpc.pb.cc 文件。
(3)在服务端实现新 service 或新 rpc 方法
新 service:继承新生成的 Service 基类,实现所有 rpc 方法。
新 rpc:在已有 Service 派生类中实现新方法。
(4)在客户端调用新 service 或新 rpc
新 service:创建对应的 Stub,调用新方法。
新 rpc:用 Stub 调用新方法。
(5)重新编译并部署服务端和客户端
五、客户端和服务端代码编写实例
同步服务端
#include <iostream>
#include <string>// grpc头文件
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>// 包含我们自己proto文件生成的.h
#include "IM.Login.pb.h"
#include "IM.Login.grpc.pb.h"// 命名空间
// grcp
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;class IMLoginServiceImpl : public ImLogin::Service {// 注册virtual Status Regist(ServerContext* context, const IMRegistReq* request, IMRegistRes* response) override {std::cout << "Regist user_name: " << request->user_name() << std::endl;response->set_user_name(request->user_name());response->set_user_id(10);response->set_result_code(0);return Status::OK;}// 登录virtual Status Login(ServerContext* context, const IMLoginReq* request, IMLoginRes* response) override {std::cout << "Login user_name: " << request->user_name() << std::endl;response->set_user_id(10);response->set_result_code(0);return Status::OK;}};void RunServer()
{std::string server_addr("0.0.0.0:50051");// 创建一个服务类IMLoginServiceImpl service;//由gRPC C++ 库提供的类 用于配置和创建一个 gRPC 服务器ServerBuilder builder;//配置服务器参数builder.AddListeningPort(server_addr, grpc::InsecureServerCredentials());builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 5000);builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);//把我们自己的服务注册到服务器上builder.RegisterService(&service);//创建/启动std::unique_ptr<Server> server(builder.BuildAndStart());std::cout << "Server listening on " << server_addr << std::endl;// 进入服务循环server->Wait();
}// 怎么编译?
// 手动编译
// 通过cmake的方式
int main(int argc, const char** argv)
{RunServer();return 0;
}
客户端
#include <iostream>
#include <memory>
#include <string>
// /usr/local/include/grpcpp/grpcpp.h
#include <grpcpp/grpcpp.h>// 包含我们自己proto文件生成的.h
#include "IM.Login.pb.h"
#include "IM.Login.grpc.pb.h"// 命名空间
// grcp
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;//客户端封装类,用来组织和管理 gRPC 客户端的调用逻辑。 无需继承
class ImLoginClient
{
public:ImLoginClient(std::shared_ptr<Channel> channel):stub_(ImLogin::NewStub(channel)) {}void Regist(const std::string &user_name, const std::string &password) {IMRegistReq request;request.set_user_name(user_name);request.set_password(password);IMRegistRes response;ClientContext context;//通过 stub_ 调用远程的 Regist 方法,向服务端发起注册请求Status status = stub_->Regist(&context, request, &response);if(status.ok()) {std::cout << "user_name:" << response.user_name() << ", user_id:" << response.user_id() << std::endl;} else {std::cout << "user_name:" << response.user_name() << "Regist failed: " << response.result_code()<< std::endl;}}void Login(const std::string &user_name, const std::string &password) {IMLoginReq request;request.set_user_name(user_name);request.set_password(password);IMLoginRes response;ClientContext context;Status status = stub_->Login(&context, request, &response);if(status.ok()) {std::cout << "user_id:" << response.user_id() << " login ok" << std::endl;} else {std::cout << "user_name:" << request.user_name() << "Login failed: " << response.result_code()<< std::endl;}}private://IM.Login.grpc.pb.cc 生成的Stub类std::unique_ptr<ImLogin::Stub> stub_;
};int main()
{// 服务器的地址std::string server_addr = "localhost:50051";ImLoginClient im_login_client(grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials()));std::string user_name = "darren";std::string password = "123456";im_login_client.Regist(user_name, password);im_login_client.Login(user_name, password);return 0;
}
异步服务端
#include <iostream>
#include <memory>
#include <string>
#include <thread>#include "IM.Login.grpc.pb.h"
#include "IM.Login.pb.h"#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;// 自己proto文件的命名空间
using IM::Login::ImLogin;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
using IM::Login::IMRegistReq;
using IM::Login::IMRegistRes;//不是继承 Service,而是持有 AsyncService,用于异步注册和管理所有 rpc
class ServerImpl final {public:~ServerImpl() {server_->Shutdown();cq_->Shutdown();}void Run() { // 启动std::string server_address("0.0.0.0:50051");ServerBuilder builder;builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());builder.RegisterService(&service_);cq_ = builder.AddCompletionQueue();server_ = builder.BuildAndStart();std::cout << "Server listening on " << server_address << std::endl;HandleRpcs();}private:/*calldata子类 提供虚函数 Proceed(),作为注册和登录的基类,让他们在里面实现具体的事件处理逻辑,就像同步里面重写的登录和注册接口,这里只是再他们之上再加封装一层calldata封装的方面:1、统一管理请求的生命周期和状态(CREATE、PROCESS、FINISH),便于异步下请求的状态区分。2、封装一次 RPC 请求的所有上下文信息调用:
调用proceed()的过程就是从cq_队列中取出事件,然后通过tag(this指针)找到对应的calldata对象,调用其Proceed()方法处理事件*/class CallData {public:CallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq): service_(service), cq_(cq), status_(CREATE) {Proceed();}virtual ~CallData(){}virtual void Proceed() {// std::cout << "CallData Prceed" << std::endl//;return;}ImLogin::AsyncService* service_;ServerCompletionQueue* cq_;ServerContext ctx_;enum CallStatus { CREATE, PROCESS, FINISH };CallStatus status_; };class RegistCallData : public CallData {public:RegistCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq) :CallData(service, cq), responder_(&ctx_) {Proceed();}~RegistCallData() {}void Proceed() override {// std::cout << "RegistCallData Prceed" << std::endl//;std::cout << "this: " << this<< " RegistCallData Proceed(), status: " << status_<< std::endl; // darren 增加if (status_ == CREATE) { // 0// Make this instance progress to the PROCESS state.status_ = PROCESS;//异步的关键 grpc底层会帮助状态的切换 然后在请求阻塞解除后再加入队列service_->RequestRegist(&ctx_, &request_, &responder_, cq_, cq_, this);} else if (status_ == PROCESS) { // 1new RegistCallData(service_, cq_); // 1. 创建处理逻辑reply_.set_user_name(request_.user_name());reply_.set_user_id(10);reply_.set_result_code(0);status_ = FINISH;responder_.Finish(reply_, Status::OK, this);} else {GPR_ASSERT(status_ == FINISH);delete this;}}private:IMRegistReq request_;IMRegistRes reply_;ServerAsyncResponseWriter<IMRegistRes> responder_;};class LoginCallData : public CallData {public:LoginCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq) :CallData(service, cq), responder_(&ctx_) {Proceed();}~LoginCallData() {}void Proceed() override {// std::cout << "LoginCallData Prceed" << std::endl//;std::cout << "this: " << this<< " LoginCallData Proceed(), status: " << status_<< std::endl; // darren 增加if (status_ == CREATE) { // 0std::cout << "this: " << this << " LoginCallData Proceed(), status: "<< "CREATE" << std::endl;status_ = PROCESS;
//异步的关键 grpc底层会帮助状态的切换 然后在请求阻塞解除后再加入队列service_->RequestLogin(&ctx_, &request_, &responder_, cq_, cq_, this);} else if (status_ == PROCESS) { // 1std::cout << "this: " << this << " LoginCallData Proceed(), status: "<< "PROCESS" << std::endl;new LoginCallData(service_, cq_); // 1. 创建处理逻辑reply_.set_user_id(10);reply_.set_result_code(0);status_ = FINISH;responder_.Finish(reply_, Status::OK, this);} else {std::cout << "this: " << this << " LoginCallData Proceed(), status: "<< "FINISH" << std::endl;GPR_ASSERT(status_ == FINISH);delete this;}}private:IMLoginReq request_;IMLoginRes reply_;ServerAsyncResponseWriter<IMLoginRes> responder_;};//事件主循环void HandleRpcs() { // 可以运行在多线程 这样就可以扩大并发能力 但单线程也已经是异步了//为注册和登录两个rpc方法创建监听对象 类似epoll的注册事件 这样才能监听到客户端的请求 放入队列new RegistCallData(&service_, cq_.get()); new LoginCallData(&service_, cq_.get());void* tag; bool ok;while (true) { //主线程不断从 CompletionQueue 取出事件(指针),并调用对应对象的 Proceed() 方法处理,实现事件驱动。GPR_ASSERT(cq_->Next(&tag, &ok));GPR_ASSERT(ok);static_cast<CallData*>(tag)->Proceed();}}std::unique_ptr<ServerCompletionQueue> cq_; //所有异步事件(如新请求、响应完成)都通过 CompletionQueue 分发。ImLogin::AsyncService service_;//在builder中注册服务 也在calldata中注册请求std::unique_ptr<Server> server_;//负责整个 gRPC 服务器的启动和管理
};int main(int argc, char** argv) {ServerImpl server;server.Run();return 0;
}