brpc的安装与使用介绍以及channel的封装
文章目录
- 前言
- 一、 介绍
- 1. brpc介绍
- 2. 与其它rpc对比:
- 二、 ubuntu 24.04安装brpc
- 三、 使用介绍与简单示例
- 1. proto文件编写
- 2. 服务端编写
- 1. 继承EchoService重写Echo服务
- 2. 创建rpc服务器类,搭建服务器
- 3. 向服务器对象中,新增EchoService服务
- 4. 启动服务器
- 5. server完整代码
- 3. 客户端编写
- 1. 构造Channel信道,连接服务器
- 2. 构造存根对象,用于rpc调用
- 3. 进行rpc调用 --同步调用
- 4. 异步调用
- 5. client完整代码
- 4. makefile编写
- 5. 运行结果
- 四、 channel的封装
- 1. ServiceChannel
- 2. ServiceManager
- 3. 完整代码
前言
这是一个ChatIM的项目,有兴趣的伙伴可以照着我的博客和gitee进行完成
gitee链接:https://gitee.com/qi-haozhe/chat-im
一、 介绍
1. brpc介绍
brpc是用c++语言编写的工业级RPC框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统。
你可以使用它:
- 搭建能在一个端口支持多协议的服务, 或访问各种服务
- restful http/https, h2/gRPC。使用brpc的http实现比libcurl方便多了。从其他语言通过HTTP/h2+json访问基于protobuf的协议.
- redis和memcached, 线程安全,比官方client更方便。
- rtmp/flv/hls, 可用于搭建流媒体服务.
- 支持thrift , 线程安全,比官方client更方便
- 各种百度内使用的协议: baidu_std, streaming_rpc, hulu_pbrpc, sofa_pbrpc, nova_pbrpc, public_pbrpc, ubrpc和使用nshead的各种协议.
- 基于工业级的RAFT算法实现搭建高可用分布式系统,已在braft开源。
- Server能同步或异步处理请求。
- Client支持同步、异步、半同步,或使用组合channels简化复杂的分库或并发访问。
- 通过http界面调试服务, 使用cpu, heap, contention profilers.
- 获得更好的延时和吞吐.
- 把你组织中使用的协议快速地加入brpc,或定制各类组件, 包括命名服务(dns, zk, etcd), 负载均衡(rr, random, consistent hashing)
2. 与其它rpc对比:
对比维度 | brpc | gRPC | Thrift | Dubbo |
---|---|---|---|---|
开发者 | 百度 | Facebook(现 Apache) | 阿里巴巴(现 Apache) | |
语言支持 | C++(核心),支持 Java、Python 等(通过包装) | C++、Java、Python、Go 等 | 支持多种语言(C++、Java、Python 等) | 主要支持 Java |
核心协议 | 基于 Protocol Buffers(PB),支持 HTTP、gRPC、Thrift、FLV 等 20+ 协议 | HTTP/2 + PB | 自定义二进制协议(TBinaryProtocol) | Dubbo 协议(基于 TCP 的长连接) |
性能 | 高性能,多线程优化出色,QPS 和延迟表现优于其他框架(尤其在长尾请求处理上) | 高性能(基于 HTTP/2),但长尾请求处理弱于 brpc | 性能较好,但二进制协议扩展性较差 | 高性能(专为 Java 优化) |
序列化 | PB(高效,支持前向/后向兼容) | PB(强制使用) | 支持 PB、JSON、二进制等(但二进制协议灵活性低) | Hessian2(默认)、PB、Java 序列化等 |
服务发现 | 支持 DNS、ZooKeeper、etcd 等,内置 BNS(百度命名服务) | 支持 etcd、Consul 等(需集成) | 需自行实现或集成第三方组件 | 支持 ZooKeeper、Nacos 等 |
负载均衡 | 内置轮询、随机、一致性哈希等策略,支持本地化感知 | 支持轮询、权重等(需集成) | 需自行实现或集成第三方组件 | 支持随机、轮询、最少活跃调用等 |
异步调用 | 通过回调函数支持客户端异步调用,流式传输接口原生 | 支持异步客户端(需手动管理线程) | 支持异步调用(但接口复杂) | 支持异步调用(基于 CompletableFuture) |
调试与运维 | 内置 HTTP 调试接口,支持 CPU、内存、争用分析器,问题排查方便 | 需集成第三方工具(如 Prometheus) | 缺乏统一调试接口 | 需集成第三方工具(如 SkyWalking) |
多协议支持 | 优势:同一端口支持多协议(如 HTTP + gRPC + Redis) | 仅支持 HTTP/2(gRPC 协议) | 需为不同协议实现单独服务端 | 仅支持 Dubbo 协议 |
扩展性 | 高扩展性,支持自定义协议、命名服务、负载均衡等 | 扩展性较强(通过 HTTP/2 扩展) | 扩展性较差(二进制协议修改成本高) | 扩展性较强(通过 SPI 机制) |
社区与生态 | 百度内部大规模使用,GitHub 14.5K Star,社区活跃 | Google 主导,多语言支持,生态完善 | 早期流行,但社区活跃度下降 | 国内广泛使用,社区活跃 |
适用场景 | 高性能分布式系统(如搜索、推荐、广告等),需处理长尾请求的场景 | 跨语言微服务架构(尤其适合 Go/Python 与 Java 互通) | 内部服务通信(对性能要求高且语言统一) | Java 微服务架构(如电商、金融) |
总结:
-
brpc:
- 优势:高性能(尤其长尾请求处理)、多协议支持、内置调试工具、百度内部大规模验证。
- 适用场景:C++ 高性能服务、混合协议通信(如同时支持 HTTP 和 RPC)、需要深度定制的分布式系统。
-
gRPC:
- 优势:跨语言支持、生态完善、基于 HTTP/2 标准。
- 适用场景:多语言微服务架构(如 Go/Python 与 Java 互通)、云原生环境。
-
Thrift:
- 优势:早期跨语言 RPC 解决方案,性能较好。
- 局限:二进制协议扩展性差,社区活跃度下降。
- 适用场景:遗留系统或对性能要求高且语言统一的服务。
-
Dubbo:
- 优势:Java 生态成熟,功能丰富(如服务治理、集群容错)。
- 局限:仅支持 Java,多语言场景需额外适配。
- 适用场景:Java 微服务架构(如电商、金融领域)。
二、 ubuntu 24.04安装brpc
安装
先安装依赖
//C++ :
dev@dev-host:~/workspace$ sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
安装brpc
//C++:
dev@dev-host:~/workspace$ git clone https://github.com/apache/brpc.git
dev@dev-host:~/workspace$ cd brpc/
dev@dev-host:~/workspace/brpc$ mkdir build && cd build
dev@dev-host:~/workspace/brpc/build$ cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6
dev@dev-host:~/workspace/brpc/build$ make && sudo make install
三、 使用介绍与简单示例
1. proto文件编写
使用brpc会用到protobuf,如果不熟悉的话,大家可以先去学一下。
这里我们先实现一个简单的服务,echo服务,你要远程调用什么服务,那就在proto文件里写什么服务。
比如我未来要调用echo服务,那我proto文件里就写了一个
service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);
}
这个定义模板如下:
服务名称随便起个名字,函数名也可以随便起。一个服务下面可以有多个函数,就是多个功能。
service 服务名称 {rpc 函数名(Request) returns (Response);
}
比如说文件服务,我可以上传也可以下载,可以上传多个也可以下载多个,所以一个FileService服务里就可以有多个函数
service FileService {rpc GetSingleFile(GetSingleFileReq) returns (GetSingleFileRsp);rpc GetMultiFile(GetMultiFileReq) returns (GetMultiFileRsp);rpc PutSingleFile(PutSingleFileReq) returns (PutSingleFileRsp);rpc PutMultiFile(PutMultiFileReq) returns (PutMultiFileRsp);
}
而且每个函数只有一个参数和一个返回值,就是你需要定义的request和response。在echo服务中这个request和response就是下面这个:
message EchoRequest {string message = 1;
}message EchoResponse {string message = 1;
}
这样就在proto文件里写好proto文件了
//main.proto文件
syntax="proto3";package example;option cc_generic_services = true;message EchoRequest {string message = 1;
}message EchoResponse {string message = 1;
}service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);
}
之后调用
protoc --cpp_out=. main.proto
会自动生成这俩文件,我们等等写客户端和服务器要用到。
2. 服务端编写
服务端大体流程:
1.创建rpc服务子类继承pb中的EchoService服务类,并实现内部的业务接口逻辑
2.创建rpc服务器类,搭建服务器
3. 向服务器类中添加 rpc子服务对象 – 告诉服务器收到什么请求用哪个接口处理
4.启动服务器
这是每个rpc服务都适用的一套流程
1. 继承EchoService重写Echo服务
重写的方法就是.proto文件中声明的,这个方法有四个参数,req和resp是从req中获取数据,进行业务处理,并把结果写入到resp中。
其中,第一个参数RpcController,这是一个上下文管理类,主要是用来判断请求是否ok。
而第四个参数Closure,在服务器端当响应处理完毕后,需要显示效用这个类中run方法,告诉brpc响应已经处理完了,结果已经写入到resp中,可以给客户端进行响应了。
而为了防止用户忘记调用run,我们可以使用ClosureGuard来管理这个closure对象,他会帮我们调用run方法。
//1. 继承于EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService {public:EchoServiceImpl(){}~EchoServiceImpl(){}void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);std::cout << "收到消息:" << request->message() << std::endl;std::string str = request->message() + "--这是响应!!";response->set_message(str);//done->Run();}void test() {std::cout << "test" << std::endl;}
};
2. 创建rpc服务器类,搭建服务器
构造一个server
//2. 构造服务器对象brpc::Server server;
3. 向服务器对象中,新增EchoService服务
把刚才实现的EchoServiceImpl实例化一个对象,然后调用server.AddService方法,把这个对象添加到服务器中,以后有客户端连接过来就可以在服务器中找到这个服务,并调用该服务中的函数。
//3. 向服务器对象中,新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1) {std::cout << "添加Rpc服务失败!\n";return -1;}
4. 启动服务器
启动服务器相当于在指定的ip,port上开放,别的客户端可以连接这个server进行远程调用。
启动服务器需要填写监听端口和一个ServerOptions对象.
这个对象主要是设置服务器相关选项,多长时未发送连接相关活动就断开连接,io的线程数量等。
//4. 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; //连接空闲超时时间-超时后连接被关闭options.num_threads = 1; // io线程数量ret = server.Start(8080, &options);if (ret == -1) {std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit();//修改等待运行结束
5. server完整代码
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"//1. 继承于EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService {public:EchoServiceImpl(){}~EchoServiceImpl(){}void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);std::cout << "收到消息:" << request->message() << std::endl;std::string str = request->message() + "--这是响应!!";response->set_message(str);//done->Run();}void test() {std::cout << "test" << std::endl;}
};
int main(int argc, char *argv[])
{//关闭brpc的默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);//2. 构造服务器对象brpc::Server server;//3. 向服务器对象中,新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1) {std::cout << "添加Rpc服务失败!\n";return -1;}//4. 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; //连接空闲超时时间-超时后连接被关闭options.num_threads = 1; // io线程数量ret = server.Start(8080, &options);if (ret == -1) {std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit();//修改等待运行结束return 0;
}
3. 客户端编写
客户端大体流程:
1.创建网络通信信道
2.实例化pb中的EchoService Stub类对象
3.发起rpc请求,获取响应进行处理
1. 构造Channel信道,连接服务器
server端绑定了ip+port了,服务器必须得直到ip+port才能去访问这个server,这件事由channel去做,此外还需要设置一些其它的属性,其他属性需要由option来传入channel:
可以看到的:channel.Init("127.0.0.1:8080", &options)
,由此可知,channel是用来网络通信的,一个channel对应于一个指定的连接,也就是说可以通过这个channel连接指定ip+port的server,调用指定的服务。
//1. 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待options.timeout_ms = -1; //rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;//请求重试次数options.protocol = "baidu_std"; //序列化协议,默认使用baidu_stdbrpc::Channel channel;int ret = channel.Init("127.0.0.1:8080", &options);if (ret == -1) {std::cout << "初始化信道失败!\n";return -1;}
2. 构造存根对象,用于rpc调用
利用channel创建个存根,我们就可以利用这个stub,像调用本地方法一样调用本地服务了,brpc内部会自动进行网络传输,获取结果。
//2. 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);
3. 进行rpc调用 --同步调用
同步调用Echo会阻塞等待响应的返回。
同样的,在客户端的调用中也有四个参数。
第一个参数Controller是用于判断请求是否Ok.
第四个参数就不同了,在服务器中第四个参数是为了告知brpc业务已经处理完毕,可以进行返回响应。客户端的第四个参数主要是为了支持异步调用。
example::EchoRequest req;req.set_message("你好,lkm");example::EchoResponse resp;brpc::Controller cntl;stub.Echo(cntl, &req, rsp, nullptr);if (cntl->Failed() == true) {std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return -1;} std::cout << "收到响应: " << rsp->message() << std::endl;delete cntl;delete rsp;
4. 异步调用
第四个参数是一个Closure对象,它可以设置一个回调函数,当响应返回后调用设置的回调函数进行处理。
需要注意的是,由于Echo现在是异步调用,所有他在调用完后就会立即返回,所以为了防止作用域问题,我们需要把resp和Controller在堆上创建。
//3. 进行Rpc调用/example::EchoRequest req;req.set_message("你好 server!");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();auto clusure = google::protobuf::NewCallback(callback, cntl, rsp);stub.Echo(cntl, &req, rsp, clusure); //异步调用std::cout << "异步调用结束!\n";std::this_thread::sleep_for(std::chrono::seconds(3));
5. client完整代码
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"void callback(brpc::Controller* cntl, ::example::EchoResponse* response) {std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<example::EchoResponse> resp_guard(response);if (cntl->Failed() == true) {std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << response->message() << std::endl;
}int main(int argc, char *argv[])
{//1. 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待options.timeout_ms = -1; //rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;//请求重试次数options.protocol = "baidu_std"; //序列化协议,默认使用baidu_stdbrpc::Channel channel;int ret = channel.Init("127.0.0.1:8080", &options);if (ret == -1) {std::cout << "初始化信道失败!\n";return -1;}//2. 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);//3. 进行Rpc调用/example::EchoRequest req;req.set_message("你好 server!");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();// stub.Echo(cntl, &req, rsp, nullptr);// if (cntl->Failed() == true) {// std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;// return -1;// }// std::cout << "收到响应: " << rsp->message() << std::endl;// delete cntl;// delete rsp;auto clusure = google::protobuf::NewCallback(callback, cntl, rsp);stub.Echo(cntl, &req, rsp, clusure); //异步调用std::cout << "异步调用结束!\n";std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}
4. makefile编写
比较简单不多赘述
all : server client
server : server.cc main.pb.ccg++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb -ldl -pthread
client : client.cc main.pb.ccg++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb -ldl -pthread@PHONY:clean
clean:rm -f client server
5. 运行结果
启动两个窗口,一个运行server,一个运行client即可。
四、 channel的封装
1. ServiceChannel
ServiceChannel类的作用
ServiceChannel
是一个封装单个服务信道管理的类,主要用于:
- 维护与某个服务所有节点的RPC连接信道
- 提供信道的动态添加和移除功能(服务节点上下线)
- 实现轮转(Round-Robin)策略选择信道进行RPC调用
成员变量
-
_mutex
- 作用: 互斥锁,用于保护多线程环境下对信道集合的并发访问
-
_index
- 作用: 记录当前轮转位置,用于实现RR(Round-Robin)信道选择策略
-
_service_name
- 作用: 存储对应的服务名称,用于日志记录和标识
-
_channels
- 作用: 存储所有可用的信道指针,按添加顺序保存
-
_hosts
- 作用: 主机地址到信道指针的映射,用于快速查找特定主机的信道
成员函数
-
append(const std::string &host)
- 参数: 新节点的主机地址
- 作用:
- 创建新的brpc信道并配置连接选项(超时时间、重试次数、协议等)
- 初始化信道连接
- 线程安全地将新信道添加到
_channels
和_hosts
中 - 记录服务节点上线日志
-
remove(const std::string &host)
- 参数: 要移除的节点主机地址
- 作用:
- 线程安全地从
_hosts
中查找并移除指定主机的信道 - 同时从
_channels
中移除对应的信道指针 - 记录服务节点下线日志或未找到节点的警告
- 线程安全地从
-
choose()
- 返回值: 返回一个信道智能指针(
ChannelPtr
) - 作用:
- 线程安全地选择信道
- 使用RR策略从
_channels
中选择一个信道 - 如果无可用信道,返回空指针并记录错误日志
- 返回值: 返回一个信道智能指针(
// 封装单个服务的信道管理类:
class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &service_name): _index(0), _service_name(service_name) {}~ServiceChannel() {}// 服务上线了一个节点,则调用append新增信道void append(const std::string &host){auto channel = std::make_shared<brpc::Channel>();// 设置字段的属性brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接超时时间options.timeout_ms = -1; // 请求超时时间options.max_retry = 3; // 最大重试次数options.protocol = "baidu_std";bool ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(channel);_hosts[host] = channel;LOG_DEBUG("服务{}上线了一个节点: {}", _service_name, host);}// 服务下线了一个节点,则调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it != _hosts.end()){auto channel = it->second;_hosts.erase(it);auto channel_it = std::find(_channels.begin(), _channels.end(), channel);if (channel_it != _channels.end()){_channels.erase(channel_it);}LOG_DEBUG("服务{}下线了一个节点: {}", _service_name, host);}else{LOG_WARN("服务{}没有找到节点: {}", _service_name, host);}}// 通过RR轮转策略,获取一个Channel用于发起对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.empty()){LOG_ERROR("服务{}没有可用的信道!", _service_name);return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index;std::string _service_name;;std::vector<ChannelPtr> _channels;std::unordered_map<std::string, ChannelPtr> _hosts;
};
2. ServiceManager
ServiceManager类作用
ServiceManager
是一个服务信道管理类,负责管理多个服务的连接通道。它维护了关注的服务集合,并为每个服务管理其对应的通信信道,处理服务的上线和下线事件。
成员变量
-
std::unordered_set<std::string> _follow_services
- 作用:存储被关注的服务名称集合
- 特点:只有在这个集合中的服务才会被管理
-
std::unordered_map<std::string, ServiceChannel::ptr> _services
- 作用:服务名称到服务信道对象的映射
- 特点:存储每个服务的信道管理对象
成员函数
-
std::string getServiceName(const std::string &service_instance)
- 作用:从服务实例名称中提取服务名称
- 逻辑:查找最后一个’/'字符,取其前面的部分作为服务名称
- 示例:输入"user_service/v1"返回"user_service"
-
ServiceChannel::ChannelPtr choose(std::string service_name)
- 作用:获取指定服务的节点信道
- 流程:
- 加锁保护共享数据
- 查找服务是否存在
- 如果不存在记录错误日志
- 存在则调用服务的choose方法返回信道
- 返回值:服务信道指针(可能为空)
-
void declared(const std::string &service_name)
- 作用:声明关注某个服务
- 流程:
- 加锁
- 将服务名称添加到关注集合中
- 特点:只有声明关注的服务才会被管理
-
void onServiceOnline(const std::string &service_instance, const std::string &host)
- 作用:处理服务上线事件
- 流程:
- 提取服务名称
- 加锁检查是否关注该服务
- 如果未关注则记录警告并返回
- 如果服务信道不存在则创建
- 解锁后调用信道的append方法添加主机节点
- 日志:记录服务节点上线信息
-
void onServiceOffline(const std::string &service_instance, const std::string &host)
- 作用:处理服务下线事件
- 流程:
- 提取服务名称
- 加锁检查是否关注该服务
- 如果未关注或服务不存在则记录警告并返回
- 解锁后调用信道的remove方法删除主机节点
- 日志:记录服务节点下线信息
// 整体的服务信道管理类
class ServiceManager
{
private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;// 关注的服务集合(只有在这里的服务才会被管理)std::unordered_map<std::string, ServiceChannel::ptr> _services;// 服务名称到ServiceChannel的映射private:// 辅助函数,获取服务名称std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;return service_instance.substr(0, pos);}public:using ptr = std::shared_ptr<ServiceManager>;ServiceManager() = default;~ServiceManager() = default;// 获取指定服务的节点信道ServiceChannel::ChannelPtr choose(std::string service_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("服务{}没有找到!", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 声明,关注哪些服务的上下线,不关心的就不需要管理了void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}// 服务上线时调用的回调接口,将服务节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略上线通知!", service_name);return;}// 先获取管理对象,没有则创建,有则添加节点auto it = _services.find(service_name);if (it == _services.end()){service = std::make_shared<ServiceChannel>(service_name);_services.insert(std::make_pair(service_name, service));}else{service = it->second;}}if (!service){LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}// 服务下线时调用的回调接口,从服务信道管理中,删除指定节点信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);//先去找有没有这个服务auto fit = _follow_services.find(service_name);if(fit== _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略下线通知!", service_name);return;//没找到直接结束}//存在则找到该服务,删除对应的hostauto it = _services.find(service_name);if (it == _services.end()){LOG_WARN("服务{}没有找到,忽略下线通知!", service_name);return;}service = it->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}
};
3. 完整代码
/*** @file channel.hpp* @brief 对于信道管理的封装* @author qhz (2695432062@qq.com)*/
#pragma once#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
#include "logger.hpp"namespace im
{// 封装单个服务的信道管理类:class ServiceChannel{public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &service_name): _index(0), _service_name(service_name) {}~ServiceChannel() {}// 服务上线了一个节点,则调用append新增信道void append(const std::string &host){auto channel = std::make_shared<brpc::Channel>();// 设置字段的属性brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接超时时间options.timeout_ms = -1; // 请求超时时间options.max_retry = 3; // 最大重试次数options.protocol = "baidu_std";bool ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(channel);_hosts[host] = channel;LOG_DEBUG("服务{}上线了一个节点: {}", _service_name, host);}// 服务下线了一个节点,则调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it != _hosts.end()){auto channel = it->second;_hosts.erase(it);auto channel_it = std::find(_channels.begin(), _channels.end(), channel);if (channel_it != _channels.end()){_channels.erase(channel_it);}LOG_DEBUG("服务{}下线了一个节点: {}", _service_name, host);}else{LOG_WARN("服务{}没有找到节点: {}", _service_name, host);}}// 通过RR轮转策略,获取一个Channel用于发起对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.empty()){LOG_ERROR("服务{}没有可用的信道!", _service_name);return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index;std::string _service_name;;std::vector<ChannelPtr> _channels;std::unordered_map<std::string, ChannelPtr> _hosts;};// 整体的服务信道管理类class ServiceManager{private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;// 关注的服务集合(只有在这里的服务才会被管理)std::unordered_map<std::string, ServiceChannel::ptr> _services;// 服务名称到ServiceChannel的映射private:// 辅助函数,获取服务名称std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;return service_instance.substr(0, pos);}public:using ptr = std::shared_ptr<ServiceManager>;ServiceManager() = default;~ServiceManager() = default;// 获取指定服务的节点信道ServiceChannel::ChannelPtr choose(std::string service_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("服务{}没有找到!", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 声明,关注哪些服务的上下线,不关心的就不需要管理了void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}// 服务上线时调用的回调接口,将服务节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略上线通知!", service_name);return;}// 先获取管理对象,没有则创建,有则添加节点auto it = _services.find(service_name);if (it == _services.end()){service = std::make_shared<ServiceChannel>(service_name);_services.insert(std::make_pair(service_name, service));}else{service = it->second;}}if (!service){LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}// 服务下线时调用的回调接口,从服务信道管理中,删除指定节点信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);//先去找有没有这个服务auto fit = _follow_services.find(service_name);if(fit== _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略下线通知!", service_name);return;//没找到直接结束}//存在则找到该服务,删除对应的hostauto it = _services.find(service_name);if (it == _services.end()){LOG_WARN("服务{}没有找到,忽略下线通知!", service_name);return;}service = it->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}};
}