当前位置: 首页 > news >正文

brpc的安装与使用介绍以及channel的封装

文章目录

  • 前言
  • 一、 介绍
    • 1. brpc介绍
    • 2. 与其它rpc对比:
  • 二、 ubuntu 24.04安装brpc
  • 三、 使用介绍与简单示例
    • 1. proto文件编写
    • 2. 服务端编写
      • 1. 继承EchoService重写Echo服务
      • 2. 创建rpc服务器类,搭建服务器
      • 3. 向服务器对象中,新增EchoService服务
      • 4. 启动服务器
      • 5. server完整代码
    • 3. 客户端编写
      • 1. 构造Channel信道,连接服务器
      • 2. 构造存根对象,用于rpc调用
      • 3. 进行rpc调用 --同步调用
      • 4. 异步调用
      • 5. client完整代码
    • 4. makefile编写
    • 5. 运行结果
  • 四、 channel的封装
    • 1. ServiceChannel
    • 2. ServiceManager
    • 3. 完整代码


前言

这是一个ChatIM的项目,有兴趣的伙伴可以照着我的博客和gitee进行完成
gitee链接:https://gitee.com/qi-haozhe/chat-im

一、 介绍

1. brpc介绍

brpc是用c++语言编写的工业级RPC框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统。
你可以使用它:

  • 搭建能在一个端口支持多协议的服务, 或访问各种服务
    • restful http/https, h2/gRPC。使用brpc的http实现比libcurl方便多了。从其他语言通过HTTP/h2+json访问基于protobuf的协议.
    • redis和memcached, 线程安全,比官方client更方便。
    • rtmp/flv/hls, 可用于搭建流媒体服务.
    • 支持thrift , 线程安全,比官方client更方便
    • 各种百度内使用的协议: baidu_std, streaming_rpc, hulu_pbrpc, sofa_pbrpc, nova_pbrpc, public_pbrpc, ubrpc和使用nshead的各种协议.
    • 基于工业级的RAFT算法实现搭建高可用分布式系统,已在braft开源。
  • Server能同步或异步处理请求。
  • Client支持同步、异步、半同步,或使用组合channels简化复杂的分库或并发访问。
  • 通过http界面调试服务, 使用cpu, heap, contention profilers.
  • 获得更好的延时和吞吐.
  • 把你组织中使用的协议快速地加入brpc,或定制各类组件, 包括命名服务(dns, zk, etcd), 负载均衡(rr, random, consistent hashing)

2. 与其它rpc对比:

对比维度brpcgRPCThriftDubbo
开发者百度GoogleFacebook(现 Apache)阿里巴巴(现 Apache)
语言支持C++(核心),支持 Java、Python 等(通过包装)C++、Java、Python、Go 等支持多种语言(C++、Java、Python 等)主要支持 Java
核心协议基于 Protocol Buffers(PB),支持 HTTP、gRPC、Thrift、FLV 等 20+ 协议HTTP/2 + PB自定义二进制协议(TBinaryProtocol)Dubbo 协议(基于 TCP 的长连接)
性能高性能,多线程优化出色,QPS 和延迟表现优于其他框架(尤其在长尾请求处理上)高性能(基于 HTTP/2),但长尾请求处理弱于 brpc性能较好,但二进制协议扩展性较差高性能(专为 Java 优化)
序列化PB(高效,支持前向/后向兼容)PB(强制使用)支持 PB、JSON、二进制等(但二进制协议灵活性低)Hessian2(默认)、PB、Java 序列化等
服务发现支持 DNS、ZooKeeper、etcd 等,内置 BNS(百度命名服务)支持 etcd、Consul 等(需集成)需自行实现或集成第三方组件支持 ZooKeeper、Nacos 等
负载均衡内置轮询、随机、一致性哈希等策略,支持本地化感知支持轮询、权重等(需集成)需自行实现或集成第三方组件支持随机、轮询、最少活跃调用等
异步调用通过回调函数支持客户端异步调用,流式传输接口原生支持异步客户端(需手动管理线程)支持异步调用(但接口复杂)支持异步调用(基于 CompletableFuture)
调试与运维内置 HTTP 调试接口,支持 CPU、内存、争用分析器,问题排查方便需集成第三方工具(如 Prometheus)缺乏统一调试接口需集成第三方工具(如 SkyWalking)
多协议支持优势:同一端口支持多协议(如 HTTP + gRPC + Redis)仅支持 HTTP/2(gRPC 协议)需为不同协议实现单独服务端仅支持 Dubbo 协议
扩展性高扩展性,支持自定义协议、命名服务、负载均衡等扩展性较强(通过 HTTP/2 扩展)扩展性较差(二进制协议修改成本高)扩展性较强(通过 SPI 机制)
社区与生态百度内部大规模使用,GitHub 14.5K Star,社区活跃Google 主导,多语言支持,生态完善早期流行,但社区活跃度下降国内广泛使用,社区活跃
适用场景高性能分布式系统(如搜索、推荐、广告等),需处理长尾请求的场景跨语言微服务架构(尤其适合 Go/Python 与 Java 互通)内部服务通信(对性能要求高且语言统一)Java 微服务架构(如电商、金融)

总结:

  1. brpc

    • 优势:高性能(尤其长尾请求处理)、多协议支持、内置调试工具、百度内部大规模验证。
    • 适用场景:C++ 高性能服务、混合协议通信(如同时支持 HTTP 和 RPC)、需要深度定制的分布式系统。
  2. gRPC

    • 优势:跨语言支持、生态完善、基于 HTTP/2 标准。
    • 适用场景:多语言微服务架构(如 Go/Python 与 Java 互通)、云原生环境。
  3. Thrift

    • 优势:早期跨语言 RPC 解决方案,性能较好。
    • 局限:二进制协议扩展性差,社区活跃度下降。
    • 适用场景:遗留系统或对性能要求高且语言统一的服务。
  4. Dubbo

    • 优势:Java 生态成熟,功能丰富(如服务治理、集群容错)。
    • 局限:仅支持 Java,多语言场景需额外适配。
    • 适用场景:Java 微服务架构(如电商、金融领域)。

二、 ubuntu 24.04安装brpc

安装
先安装依赖

//C++ :
dev@dev-host:~/workspace$ sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

安装brpc

//C++:
dev@dev-host:~/workspace$ git clone https://github.com/apache/brpc.git 
dev@dev-host:~/workspace$ cd brpc/ 
dev@dev-host:~/workspace/brpc$ mkdir build && cd build 
dev@dev-host:~/workspace/brpc/build$ cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6 
dev@dev-host:~/workspace/brpc/build$ make && sudo make install

三、 使用介绍与简单示例

1. proto文件编写

使用brpc会用到protobuf,如果不熟悉的话,大家可以先去学一下。
这里我们先实现一个简单的服务,echo服务,你要远程调用什么服务,那就在proto文件里写什么服务。
比如我未来要调用echo服务,那我proto文件里就写了一个

service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);
}

这个定义模板如下:
服务名称随便起个名字,函数名也可以随便起。一个服务下面可以有多个函数,就是多个功能。

service 服务名称 {rpc 函数名(Request) returns (Response);
}

比如说文件服务,我可以上传也可以下载,可以上传多个也可以下载多个,所以一个FileService服务里就可以有多个函数

service FileService {rpc GetSingleFile(GetSingleFileReq) returns (GetSingleFileRsp);rpc GetMultiFile(GetMultiFileReq) returns (GetMultiFileRsp);rpc PutSingleFile(PutSingleFileReq) returns (PutSingleFileRsp);rpc PutMultiFile(PutMultiFileReq) returns (PutMultiFileRsp);
}

而且每个函数只有一个参数和一个返回值,就是你需要定义的request和response。在echo服务中这个request和response就是下面这个:

message EchoRequest {string message = 1;
}message EchoResponse {string message = 1;
}

这样就在proto文件里写好proto文件了

//main.proto文件
syntax="proto3";package example;option cc_generic_services = true;message EchoRequest {string message = 1;
}message EchoResponse {string message = 1;
}service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);
}

之后调用

protoc --cpp_out=. main.proto

会自动生成这俩文件,我们等等写客户端和服务器要用到。
在这里插入图片描述

2. 服务端编写

服务端大体流程:
1.创建rpc服务子类继承pb中的EchoService服务类,并实现内部的业务接口逻辑
2.创建rpc服务器类,搭建服务器
3. 向服务器类中添加 rpc子服务对象 – 告诉服务器收到什么请求用哪个接口处理
4.启动服务器

这是每个rpc服务都适用的一套流程

1. 继承EchoService重写Echo服务

重写的方法就是.proto文件中声明的,这个方法有四个参数,req和resp是从req中获取数据,进行业务处理,并把结果写入到resp中。
其中,第一个参数RpcController,这是一个上下文管理类,主要是用来判断请求是否ok。
而第四个参数Closure,在服务器端当响应处理完毕后,需要显示效用这个类中run方法,告诉brpc响应已经处理完了,结果已经写入到resp中,可以给客户端进行响应了。
而为了防止用户忘记调用run,我们可以使用ClosureGuard来管理这个closure对象,他会帮我们调用run方法。

//1. 继承于EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService {public:EchoServiceImpl(){}~EchoServiceImpl(){}void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);std::cout << "收到消息:" << request->message() << std::endl;std::string str = request->message() + "--这是响应!!";response->set_message(str);//done->Run();}void test() {std::cout << "test" << std::endl;}
};

2. 创建rpc服务器类,搭建服务器

构造一个server

	//2. 构造服务器对象brpc::Server server;

3. 向服务器对象中,新增EchoService服务

把刚才实现的EchoServiceImpl实例化一个对象,然后调用server.AddService方法,把这个对象添加到服务器中,以后有客户端连接过来就可以在服务器中找到这个服务,并调用该服务中的函数。

	//3. 向服务器对象中,新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1) {std::cout << "添加Rpc服务失败!\n";return -1;}

4. 启动服务器

启动服务器相当于在指定的ip,port上开放,别的客户端可以连接这个server进行远程调用。
启动服务器需要填写监听端口和一个ServerOptions对象.
这个对象主要是设置服务器相关选项,多长时未发送连接相关活动就断开连接,io的线程数量等。

//4. 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; //连接空闲超时时间-超时后连接被关闭options.num_threads = 1; // io线程数量ret = server.Start(8080, &options);if (ret == -1) {std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit();//修改等待运行结束

5. server完整代码

#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"//1. 继承于EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService {public:EchoServiceImpl(){}~EchoServiceImpl(){}void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);std::cout << "收到消息:" << request->message() << std::endl;std::string str = request->message() + "--这是响应!!";response->set_message(str);//done->Run();}void test() {std::cout << "test" << std::endl;}
};
int main(int argc, char *argv[])
{//关闭brpc的默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);//2. 构造服务器对象brpc::Server server;//3. 向服务器对象中,新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1) {std::cout << "添加Rpc服务失败!\n";return -1;}//4. 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; //连接空闲超时时间-超时后连接被关闭options.num_threads = 1; // io线程数量ret = server.Start(8080, &options);if (ret == -1) {std::cout << "启动服务器失败!\n";return -1;}server.RunUntilAskedToQuit();//修改等待运行结束return 0;
}

3. 客户端编写

客户端大体流程:
1.创建网络通信信道
2.实例化pb中的EchoService Stub类对象
3.发起rpc请求,获取响应进行处理

1. 构造Channel信道,连接服务器

server端绑定了ip+port了,服务器必须得直到ip+port才能去访问这个server,这件事由channel去做,此外还需要设置一些其它的属性,其他属性需要由option来传入channel:
可以看到的:channel.Init("127.0.0.1:8080", &options),由此可知,channel是用来网络通信的,一个channel对应于一个指定的连接,也就是说可以通过这个channel连接指定ip+port的server,调用指定的服务。

//1. 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待options.timeout_ms = -1; //rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;//请求重试次数options.protocol = "baidu_std"; //序列化协议,默认使用baidu_stdbrpc::Channel channel;int ret = channel.Init("127.0.0.1:8080", &options);if (ret == -1) {std::cout << "初始化信道失败!\n";return -1;}

2. 构造存根对象,用于rpc调用

利用channel创建个存根,我们就可以利用这个stub,像调用本地方法一样调用本地服务了,brpc内部会自动进行网络传输,获取结果。

	//2. 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);

3. 进行rpc调用 --同步调用

同步调用Echo会阻塞等待响应的返回。
同样的,在客户端的调用中也有四个参数。
第一个参数Controller是用于判断请求是否Ok.
第四个参数就不同了,在服务器中第四个参数是为了告知brpc业务已经处理完毕,可以进行返回响应。客户端的第四个参数主要是为了支持异步调用。

    example::EchoRequest req;req.set_message("你好,lkm");example::EchoResponse resp;brpc::Controller cntl;stub.Echo(cntl, &req, rsp, nullptr);if (cntl->Failed() == true) {std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return -1;} std::cout << "收到响应: " << rsp->message() << std::endl;delete cntl;delete rsp;

4. 异步调用

第四个参数是一个Closure对象,它可以设置一个回调函数,当响应返回后调用设置的回调函数进行处理。
需要注意的是,由于Echo现在是异步调用,所有他在调用完后就会立即返回,所以为了防止作用域问题,我们需要把resp和Controller在堆上创建。

	 //3. 进行Rpc调用/example::EchoRequest req;req.set_message("你好 server!");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();auto clusure = google::protobuf::NewCallback(callback, cntl, rsp);stub.Echo(cntl, &req, rsp, clusure); //异步调用std::cout << "异步调用结束!\n";std::this_thread::sleep_for(std::chrono::seconds(3));

5. client完整代码

#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"void callback(brpc::Controller* cntl, ::example::EchoResponse* response) {std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<example::EchoResponse> resp_guard(response);if (cntl->Failed() == true) {std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << response->message() << std::endl;
}int main(int argc, char *argv[])
{//1. 构造Channel信道,连接服务器brpc::ChannelOptions options;options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待options.timeout_ms = -1; //rpc请求等待超时时间,-1表示一直等待options.max_retry = 3;//请求重试次数options.protocol = "baidu_std"; //序列化协议,默认使用baidu_stdbrpc::Channel channel;int ret = channel.Init("127.0.0.1:8080", &options);if (ret == -1) {std::cout << "初始化信道失败!\n";return -1;}//2. 构造EchoService_Stub对象,用于进行rpc调用example::EchoService_Stub stub(&channel);//3. 进行Rpc调用/example::EchoRequest req;req.set_message("你好 server!");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();// stub.Echo(cntl, &req, rsp, nullptr);// if (cntl->Failed() == true) {//     std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;//     return -1;// }// std::cout << "收到响应: " << rsp->message() << std::endl;// delete cntl;// delete rsp;auto clusure = google::protobuf::NewCallback(callback, cntl, rsp);stub.Echo(cntl, &req, rsp, clusure); //异步调用std::cout << "异步调用结束!\n";std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}

4. makefile编写

比较简单不多赘述

all : server client
server : server.cc main.pb.ccg++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb -ldl -pthread
client : client.cc main.pb.ccg++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb -ldl -pthread@PHONY:clean
clean:rm -f client server

5. 运行结果

启动两个窗口,一个运行server,一个运行client即可。
在这里插入图片描述
在这里插入图片描述

四、 channel的封装

1. ServiceChannel

ServiceChannel类的作用
ServiceChannel 是一个封装单个服务信道管理的类,主要用于:

  1. 维护与某个服务所有节点的RPC连接信道
  2. 提供信道的动态添加和移除功能(服务节点上下线)
  3. 实现轮转(Round-Robin)策略选择信道进行RPC调用

成员变量

  1. _mutex

    • 作用: 互斥锁,用于保护多线程环境下对信道集合的并发访问
  2. _index

    • 作用: 记录当前轮转位置,用于实现RR(Round-Robin)信道选择策略
  3. _service_name

    • 作用: 存储对应的服务名称,用于日志记录和标识
  4. _channels

    • 作用: 存储所有可用的信道指针,按添加顺序保存
  5. _hosts

    • 作用: 主机地址到信道指针的映射,用于快速查找特定主机的信道

成员函数

  1. append(const std::string &host)

    • 参数: 新节点的主机地址
    • 作用:
      • 创建新的brpc信道并配置连接选项(超时时间、重试次数、协议等)
      • 初始化信道连接
      • 线程安全地将新信道添加到_channels_hosts
      • 记录服务节点上线日志
  2. remove(const std::string &host)

    • 参数: 要移除的节点主机地址
    • 作用:
      • 线程安全地从_hosts中查找并移除指定主机的信道
      • 同时从_channels中移除对应的信道指针
      • 记录服务节点下线日志或未找到节点的警告
  3. choose()

    • 返回值: 返回一个信道智能指针(ChannelPtr)
    • 作用:
      • 线程安全地选择信道
      • 使用RR策略从_channels中选择一个信道
      • 如果无可用信道,返回空指针并记录错误日志
// 封装单个服务的信道管理类:
class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &service_name): _index(0), _service_name(service_name) {}~ServiceChannel() {}// 服务上线了一个节点,则调用append新增信道void append(const std::string &host){auto channel = std::make_shared<brpc::Channel>();// 设置字段的属性brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接超时时间options.timeout_ms = -1;         // 请求超时时间options.max_retry = 3;           // 最大重试次数options.protocol = "baidu_std";bool ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(channel);_hosts[host] = channel;LOG_DEBUG("服务{}上线了一个节点: {}", _service_name, host);}// 服务下线了一个节点,则调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it != _hosts.end()){auto channel = it->second;_hosts.erase(it);auto channel_it = std::find(_channels.begin(), _channels.end(), channel);if (channel_it != _channels.end()){_channels.erase(channel_it);}LOG_DEBUG("服务{}下线了一个节点: {}", _service_name, host);}else{LOG_WARN("服务{}没有找到节点: {}", _service_name, host);}}// 通过RR轮转策略,获取一个Channel用于发起对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.empty()){LOG_ERROR("服务{}没有可用的信道!", _service_name);return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index;std::string _service_name;;std::vector<ChannelPtr> _channels;std::unordered_map<std::string, ChannelPtr> _hosts;
};

2. ServiceManager

ServiceManager类作用
ServiceManager 是一个服务信道管理类,负责管理多个服务的连接通道。它维护了关注的服务集合,并为每个服务管理其对应的通信信道,处理服务的上线和下线事件。

成员变量

  1. std::unordered_set<std::string> _follow_services

    • 作用:存储被关注的服务名称集合
    • 特点:只有在这个集合中的服务才会被管理
  2. std::unordered_map<std::string, ServiceChannel::ptr> _services

    • 作用:服务名称到服务信道对象的映射
    • 特点:存储每个服务的信道管理对象

成员函数

  1. std::string getServiceName(const std::string &service_instance)

    • 作用:从服务实例名称中提取服务名称
    • 逻辑:查找最后一个’/'字符,取其前面的部分作为服务名称
    • 示例:输入"user_service/v1"返回"user_service"
  2. ServiceChannel::ChannelPtr choose(std::string service_name)

    • 作用:获取指定服务的节点信道
    • 流程:
      1. 加锁保护共享数据
      2. 查找服务是否存在
      3. 如果不存在记录错误日志
      4. 存在则调用服务的choose方法返回信道
    • 返回值:服务信道指针(可能为空)
  3. void declared(const std::string &service_name)

    • 作用:声明关注某个服务
    • 流程:
      1. 加锁
      2. 将服务名称添加到关注集合中
    • 特点:只有声明关注的服务才会被管理
  4. void onServiceOnline(const std::string &service_instance, const std::string &host)

    • 作用:处理服务上线事件
    • 流程:
      1. 提取服务名称
      2. 加锁检查是否关注该服务
      3. 如果未关注则记录警告并返回
      4. 如果服务信道不存在则创建
      5. 解锁后调用信道的append方法添加主机节点
    • 日志:记录服务节点上线信息
  5. void onServiceOffline(const std::string &service_instance, const std::string &host)

    • 作用:处理服务下线事件
    • 流程:
      1. 提取服务名称
      2. 加锁检查是否关注该服务
      3. 如果未关注或服务不存在则记录警告并返回
      4. 解锁后调用信道的remove方法删除主机节点
    • 日志:记录服务节点下线信息
// 整体的服务信道管理类
class ServiceManager
{
private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;// 关注的服务集合(只有在这里的服务才会被管理)std::unordered_map<std::string, ServiceChannel::ptr> _services;// 服务名称到ServiceChannel的映射private:// 辅助函数,获取服务名称std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;return service_instance.substr(0, pos);}public:using ptr = std::shared_ptr<ServiceManager>;ServiceManager() = default;~ServiceManager() = default;// 获取指定服务的节点信道ServiceChannel::ChannelPtr choose(std::string service_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("服务{}没有找到!", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 声明,关注哪些服务的上下线,不关心的就不需要管理了void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}// 服务上线时调用的回调接口,将服务节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略上线通知!", service_name);return;}// 先获取管理对象,没有则创建,有则添加节点auto it = _services.find(service_name);if (it == _services.end()){service = std::make_shared<ServiceChannel>(service_name);_services.insert(std::make_pair(service_name, service));}else{service = it->second;}}if (!service){LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}// 服务下线时调用的回调接口,从服务信道管理中,删除指定节点信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);//先去找有没有这个服务auto fit = _follow_services.find(service_name);if(fit== _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略下线通知!", service_name);return;//没找到直接结束}//存在则找到该服务,删除对应的hostauto it = _services.find(service_name);if (it == _services.end()){LOG_WARN("服务{}没有找到,忽略下线通知!", service_name);return;}service = it->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}
};

3. 完整代码

/*** @file channel.hpp* @brief 对于信道管理的封装* @author qhz (2695432062@qq.com)*/
#pragma once#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
#include "logger.hpp"namespace im
{// 封装单个服务的信道管理类:class ServiceChannel{public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &service_name): _index(0), _service_name(service_name) {}~ServiceChannel() {}// 服务上线了一个节点,则调用append新增信道void append(const std::string &host){auto channel = std::make_shared<brpc::Channel>();// 设置字段的属性brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 连接超时时间options.timeout_ms = -1;         // 请求超时时间options.max_retry = 3;           // 最大重试次数options.protocol = "baidu_std";bool ret = channel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(channel);_hosts[host] = channel;LOG_DEBUG("服务{}上线了一个节点: {}", _service_name, host);}// 服务下线了一个节点,则调用remove释放信道void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it != _hosts.end()){auto channel = it->second;_hosts.erase(it);auto channel_it = std::find(_channels.begin(), _channels.end(), channel);if (channel_it != _channels.end()){_channels.erase(channel_it);}LOG_DEBUG("服务{}下线了一个节点: {}", _service_name, host);}else{LOG_WARN("服务{}没有找到节点: {}", _service_name, host);}}// 通过RR轮转策略,获取一个Channel用于发起对应服务的Rpc调用ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.empty()){LOG_ERROR("服务{}没有可用的信道!", _service_name);return ChannelPtr();}int32_t index = _index++ % _channels.size();return _channels[index];}private:std::mutex _mutex;int32_t _index;std::string _service_name;;std::vector<ChannelPtr> _channels;std::unordered_map<std::string, ChannelPtr> _hosts;};// 整体的服务信道管理类class ServiceManager{private:std::mutex _mutex;std::unordered_set<std::string> _follow_services;// 关注的服务集合(只有在这里的服务才会被管理)std::unordered_map<std::string, ServiceChannel::ptr> _services;// 服务名称到ServiceChannel的映射private:// 辅助函数,获取服务名称std::string getServiceName(const std::string &service_instance){auto pos = service_instance.find_last_of('/');if (pos == std::string::npos)return service_instance;return service_instance.substr(0, pos);}public:using ptr = std::shared_ptr<ServiceManager>;ServiceManager() = default;~ServiceManager() = default;// 获取指定服务的节点信道ServiceChannel::ChannelPtr choose(std::string service_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if (it == _services.end()){LOG_ERROR("服务{}没有找到!", service_name);return ServiceChannel::ChannelPtr();}return it->second->choose();}// 声明,关注哪些服务的上下线,不关心的就不需要管理了void declared(const std::string &service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}// 服务上线时调用的回调接口,将服务节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略上线通知!", service_name);return;}// 先获取管理对象,没有则创建,有则添加节点auto it = _services.find(service_name);if (it == _services.end()){service = std::make_shared<ServiceChannel>(service_name);_services.insert(std::make_pair(service_name, service));}else{service = it->second;}}if (!service){LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}// 服务下线时调用的回调接口,从服务信道管理中,删除指定节点信道void onServiceOffline(const std::string &service_instance, const std::string &host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);//先去找有没有这个服务auto fit = _follow_services.find(service_name);if(fit== _follow_services.end()){LOG_WARN("服务{}没有被关注,忽略下线通知!", service_name);return;//没找到直接结束}//存在则找到该服务,删除对应的hostauto it = _services.find(service_name);if (it == _services.end()){LOG_WARN("服务{}没有找到,忽略下线通知!", service_name);return;}service = it->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}};
}
http://www.dtcms.com/a/289780.html

相关文章:

  • spring-ai-alibaba 迭代字符分割器
  • RPG61.制作敌人攻击波数一
  • 30天打牢数模基础-AdaBoost讲解
  • CICS Application Programming Fundamentals 第8-6章
  • arinc818_icd设计范例
  • LLVM中AST节点类型
  • RGB颜色值如何转到灰度值
  • [每日随题14] 递推 - 滑动窗口 - 数学
  • JavaScript 中Object、Array 和 String的常用方法
  • java抗疫物质管理系统设计和实现
  • 【超分辨率专题】OSEDiff:针对Real-World ISR的单步Diffusion
  • [FDBUS 4.2]fdbus消息发送失败后的流程处理
  • SigLIP和SigLIP2
  • 题单【循环结构】
  • maven构建Could not transfer artifact失败原因
  • 系统思考:整体论
  • 【成品设计】基于STM32的家庭用水检测系统设计
  • 2025《艾诺提亚失落之歌》新手攻略
  • 看板中如何处理跨职能任务协作?
  • 大模型词表设计与作用解析
  • Autosar RTE实现观测量生成-基于ETAS软件
  • [Python] -项目实践2- 用Python快速抓取网页内容(爬虫入门)
  • python网络爬虫小项目(爬取评论)超级简单
  • 阶段1--Linux中的计划任务
  • 调试Claude code的正确姿势
  • 类型混淆、越界写入漏洞
  • 基于单片机出租车计价器设计
  • 重塑优化建模与算法设计:2024上半年大模型(LLM)在优化领域的应用盘点
  • Java入门-【3】变量、字符串、条件、循环、数组的简单总结
  • python 字典中取值