当前位置: 首页 > news >正文

C++ brpc Channel 管理封装方案

在 C++ 中使用 brpc 时,对 Channel 进行封装管理可以大大提高代码的可维护性和复用性。下面是一个完整的 Channel 管理封装方案:

1. 基础 Channel 管理器

cpp

// ChannelManager.h
#pragma once#include <brpc/channel.h>
#include <mutex>
#include <unordered_map>
#include <memory>
#include <string>class ChannelManager {
public:static ChannelManager& GetInstance();// 禁止拷贝和移动ChannelManager(const ChannelManager&) = delete;ChannelManager& operator=(const ChannelManager&) = delete;// 获取 Channelstd::shared_ptr<brpc::Channel> GetChannel(const std::string& service_name);// 预创建 Channelbool PreCreateChannel(const std::string& service_name, const std::string& load_balancer = "");// 移除 Channelvoid RemoveChannel(const std::string& service_name);// 获取所有 Channel 状态void GetAllChannelStatus(std::unordered_map<std::string, std::string>& status_map);// 健康检查bool HealthCheck(const std::string& service_name);private:ChannelManager() = default;~ChannelManager() = default;std::shared_ptr<brpc::Channel> CreateChannel(const std::string& service_name, const std::string& load_balancer = "");private:std::mutex mutex_;std::unordered_map<std::string, std::shared_ptr<brpc::Channel>> channel_pool_;std::unordered_map<std::string, std::string> service_endpoints_;
};

2. Channel 管理器实现

cpp

// ChannelManager.cpp
#include "ChannelManager.h"
#include <brpc/controller.h>
#include <glog/logging.h>ChannelManager& ChannelManager::GetInstance() {static ChannelManager instance;return instance;
}std::shared_ptr<brpc::Channel> ChannelManager::GetChannel(const std::string& service_name) {std::lock_guard<std::mutex> lock(mutex_);auto it = channel_pool_.find(service_name);if (it != channel_pool_.end()) {return it->second;}// 自动创建 Channel(使用默认负载均衡)auto channel = CreateChannel(service_name, "");if (channel) {channel_pool_[service_name] = channel;
rd.xjyl.gov.cn/upload/1982077718157033472.html
rd.xjyl.gov.cn/upload/1982077718207365120.html
rd.xjyl.gov.cn/upload/1982077718211559424.html
rd.xjyl.gov.cn/upload/1982077718710681600.html
rd.xjyl.gov.cn/upload/1982077718769401856.html
rd.xjyl.gov.cn/upload/1982077718836510720.html
rd.xjyl.gov.cn/upload/1982077718832316416.html
rd.xjyl.gov.cn/upload/1982077718853287936.html
rd.xjyl.gov.cn/upload/1982077718832316417.html
rd.xjyl.gov.cn/upload/1982077718828122112.html
rd.xjyl.gov.cn/upload/1982077718912008192.html
rd.xjyl.gov.cn/upload/1982077718920396800.html
rd.xjyl.gov.cn/upload/1982077718899425280.html
rd.xjyl.gov.cn/upload/1982077718970728448.html
rd.xjyl.gov.cn/upload/1982077718966534144.html
rd.xjyl.gov.cn/upload/1982077718987505664.html
rd.xjyl.gov.cn/upload/1982077718991699968.html
rd.xjyl.gov.cn/upload/1982077719503405056.html
rd.xjyl.gov.cn/upload/1982077719520182272.html
rd.xjyl.gov.cn/upload/1982077719524376576.html
rd.xjyl.gov.cn/upload/1982077719553736704.html}return channel;
}bool ChannelManager::PreCreateChannel(const std::string& service_name, const std::string& load_balancer) {std::lock_guard<std::mutex> lock(mutex_);if (channel_pool_.find(service_name) != channel_pool_.end()) {LOG(WARNING) << "Channel for service " << service_name << " already exists";return true;}auto channel = CreateChannel(service_name, load_balancer);if (!channel) {LOG(ERROR) << "Failed to create channel for service " << service_name;return false;}channel_pool_[service_name] = channel;LOG(INFO) << "Successfully pre-created channel for service " << service_name;return true;
}std::shared_ptr<brpc::Channel> ChannelManager::CreateChannel(const std::string& service_name, const std::string& load_balancer) {auto channel = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;// 配置 Channel 参数options.protocol = brpc::PROTOCOL_BAIDU_STD;options.connection_type = "single";options.timeout_ms = 1000; // 1秒超时options.max_retry = 3;// 构建服务地址std::string server_address = service_name;if (!load_balancer.empty()) {server_address += "?lb=" + load_balancer;}if (channel->Init(server_address.c_str(), load_balancer.empty() ? nullptr : load_balancer.c_str(), &options) != 0) {LOG(ERROR) << "Failed to initialize channel for " << server_address;return nullptr;}return channel;
}void ChannelManager::RemoveChannel(const std::string& service_name) {std::lock_guard<std::mutex> lock(mutex_);channel_pool_.erase(service_name);LOG(INFO) << "Removed channel for service " << service_name;
}void ChannelManager::GetAllChannelStatus(std::unordered_map<std::string, std::string>& status_map) {std::lock_guard<std::mutex> lock(mutex_);for (const auto& pair : channel_pool_) {brpc::ChannelStatistics stat;pair.second->GetStatistics(&stat);status_map[pair.first] = std::to_string(stat.error_count) + " errors, " +std::to_string(stat.success_count) + " successes";}
}bool ChannelManager::HealthCheck(const std::string& service_name) {auto channel = GetChannel(service_name);if (!channel) {return false;}// 这里可以添加具体的健康检查逻辑// 例如发送一个空的 RPC 请求或使用 Channel 的内置健康检查return true;
}

3. 带重试和熔断的增强版 Channel

cpp

// EnhancedChannel.h
#pragma once#include <brpc/channel.h>
#include <brpc/controller.h>
#include <functional>
#include <memory>template<typename Request, typename Response>
class EnhancedChannel {
public:using RpcCall = std::function<void(brpc::Controller*, const Request*, Response*, google::protobuf::Closure*)>;EnhancedChannel(const std::string& service_name, std::shared_ptr<brpc::Channel> channel): service_name_(service_name), channel_(channel) {}// 同步调用bool Call(const Request& request, Response& response, const std::string& method_name,int timeout_ms = 1000, int max_retry = 3);// 异步调用bool AsyncCall(const Request& request, Response* response,const std::string& method_name,google::protobuf::Closure* done,int timeout_ms = 1000);// 获取底层 Channelstd::shared_ptr<brpc::Channel> GetRawChannel() { return channel_; }// 健康状态bool IsHealthy() const;private:std::string service_name_;std::shared_ptr<brpc::Channel> channel_;
rd.xjyl.gov.cn/upload/1982077719574708224.html
rd.xjyl.gov.cn/upload/1982077719604068352.html
rd.xjyl.gov.cn/upload/1982077719692148736.html
rd.xjyl.gov.cn/upload/1982077719721508864.html
rd.xjyl.gov.cn/upload/1982077719994138624.html
rd.xjyl.gov.cn/upload/1982077720019304448.html
rd.xjyl.gov.cn/upload/1982077720191270912.html
rd.xjyl.gov.cn/upload/1982077720208048128.html
rd.xjyl.gov.cn/upload/1982077720233213952.html
rd.xjyl.gov.cn/upload/1982077720241602560.html
rd.xjyl.gov.cn/upload/1982077720338071552.html
rd.xjyl.gov.cn/upload/1982077720396791808.html
rd.xjyl.gov.cn/upload/1982077720493260800.html
rd.xjyl.gov.cn/upload/1982077720610701312.html
rd.xjyl.gov.cn/upload/1982077720690393088.html
rd.xjyl.gov.cn/upload/1982077720866553856.html
rd.xjyl.gov.cn/upload/1982077720996577280.html
rd.xjyl.gov.cn/upload/1982077720992382976.html
rd.xjyl.gov.cn/upload/1982077721051103232.html
rd.xjyl.gov.cn/upload/1982077721021743104.html
rd.xjyl.gov.cn/upload/1982077721055297536.html
rd.xjyl.gov.cn/upload/1982077721139183616.html
rd.xjyl.gov.cn/upload/1982077721235652608.html
rd.xjyl.gov.cn/upload/1982077721244041216.html
rd.xjyl.gov.cn/upload/1982077721365676032.html
};template<typename Request, typename Response>
bool EnhancedChannel<Request, Response>::Call(const Request& request, Response& response,const std::string& method_name,int timeout_ms, int max_retry) {if (!channel_ || !IsHealthy()) {LOG(ERROR) << "Channel for " << service_name_ << " is not available";return false;}brpc::Controller cntl;cntl.set_timeout_ms(timeout_ms);cntl.set_max_retry(max_retry);// 执行 RPC 调用auto stub = std::make_shared<typename Response::Service_Stub>(channel_.get());(stub.get()->*method_name)(&cntl, &request, &response, nullptr);if (cntl.Failed()) {LOG(ERROR) << "RPC call failed: " << cntl.ErrorText();return false;}return true;
}template<typename Request, typename Response>
bool EnhancedChannel<Request, Response>::AsyncCall(const Request& request, Response* response,const std::string& method_name,google::protobuf::Closure* done,int timeout_ms) {if (!channel_ || !IsHealthy()) {LOG(ERROR) << "Channel for " << service_name_ << " is not available";return false;}brpc::Controller* cntl = new brpc::Controller();cntl->set_timeout_ms(timeout_ms);auto stub = std::make_shared<typename Response::Service_Stub>(channel_.get());(stub.get()->*method_name)(cntl, &request, response, brpc::NewCallback([cntl, done]() {delete cntl;if (done) done->Run();}));return true;
}template<typename Request, typename Response>
bool EnhancedChannel<Request, Response>::IsHealthy() const {if (!channel_) return false;// 简单的健康检查:检查 Channel 是否初始化成功// 可以扩展更复杂的健康检查逻辑brpc::ChannelStatistics stat;channel_->GetStatistics(&stat);// 如果错误率过高,认为不健康uint64_t total_calls = stat.success_count + stat.error_count;if (total_calls > 10 && (static_cast<double>(stat.error_count) / total_calls) > 0.5) {return false;}return true;
}

4. 服务工厂类

cpp

// ServiceFactory.h
#pragma once#include "ChannelManager.h"
#include "EnhancedChannel.h"
#include <memory>
#include <string>template<typename ServiceStub, typename Request, typename Response>
class ServiceFactory {
public:static std::shared_ptr<EnhancedChannel<Request, Response>> GetServiceChannel(const std::string& service_name) {auto channel = ChannelManager::GetInstance().GetChannel(service_name);if (!channel) {return nullptr;}return std::make_shared<EnhancedChannel<Request, Response>>(service_name, channel);}static bool PreCreateServiceChannel(const std::string& service_name, const std::string& load_balancer = "") {return ChannelManager::GetInstance().PreCreateChannel(service_name, load_balancer);}
};

5. 使用示例

cpp

// example_usage.cpp
#include "ServiceFactory.h"
#include "your_proto_file.pb.h"  // 你的 protobuf 头文件class UserServiceClient {
public:UserServiceClient() {// 预创建 ChannelServiceFactory<UserService_Stub, GetUserRequest, GetUserResponse>::PreCreateServiceChannel("user.service:8000", "rr");}bool GetUserInfo(int user_id, UserInfo& user_info) {auto channel = ServiceFactory<UserService_Stub, GetUserRequest, GetUserResponse>::GetServiceChannel("user.service:8000");if (!channel) {LOG(ERROR) << "Failed to get user service channel";return false;}GetUserRequest request;GetUserResponse response;request.set_user_id(user_id);// 使用宏获取方法指针bool success = channel->Call(request, response, &UserService_Stub::GetUserInfo);if (success) {user_info = response.user_info();}return success;}
};// 监控所有 Channel 状态
void MonitorChannels() {std::unordered_map<std::string, std::string> status_map;ChannelManager::GetInstance().GetAllChannelStatus(status_map);for (const auto& pair : status_map) {LOG(INFO) << "Service: " << pair.first << " - Status: " << pair.second;}
}

6. 配置化管理

cpp

// ChannelConfig.h
#pragma once#include <string>
#include <unordered_map>struct ServiceConfig {std::string endpoint;std::string load_balancer;int timeout_ms;int max_retry;
};class ChannelConfigManager {
public:static ChannelConfigManager& GetInstance();void LoadConfig(const std::string& config_file);ServiceConfig GetServiceConfig(const std::string& service_name);private:std::unordered_map<std::string, ServiceConfig> service_configs_;
};

这个封装方案提供了以下特性:

  1. 单例管理:全局 Channel 管理

  2. 线程安全:使用互斥锁保护共享资源

  3. 连接复用:避免重复创建 Channel

  4. 健康检查:自动检测服务可用性

  5. 模板化设计:支持不同类型的服务

  6. 配置化:支持灵活的配置管理

  7. 监控支持:提供状态查询接口

你可以根据具体需求进一步扩展这个基础框架。

http://www.dtcms.com/a/529930.html

相关文章:

  • 合肥网站建设报价做美食教程的网站
  • 企业网站空间买虚拟主机网站做后怎么可以在百度搜索到
  • 网站建设部岗位职责直播软件开发公司
  • app免费制作网站模板北京最新新闻
  • 网站建设物理架构服务管理系统
  • 网站备案 法人变更做品牌网站找谁
  • 使用 C# 流式解析 超大XML:按路径遍历子节点的实用方法
  • 网站建设 成都上海网站建设就q479185700顶上
  • 中小企业网站制作广州网络服务公司找赛合公司点个赞科技 网站制作
  • 网站建设 全是乱码百度一下你就知道官网网页版
  • 网站域名注册证明wordpress页面布局插件
  • Java 项目 — 五种创建方式
  • 可以做推文的网站为何公司做的网站很丑
  • 宜春网站开发公司电话可以用来展示的网站
  • custed谁做的网站商品详情页设计模板
  • 【开题答辩全过程】以 布哩民宿预定系统的设计与实现为例,包含答辩的问题和答案
  • 不到网站是为什么深圳开发微信公众号
  • 网站基础建设一般多少钱自己做网站的服务器
  • 平台网站模板素材图片网页设计与网站开发项目
  • 专业建网站平台电商网站需要哪些备案
  • 网站设计维护内容asp.net 网站管理工具 安全
  • AUTOSAR AP通信管理规范:设计背景与技术实现解析
  • 网上订餐网站模板wordpress如何加html
  • 【开发者导航】高性能跨平台数据整理与分析工具:qsv让CSV处理更高效
  • 佛山市网站建设 乾图信息科技一 网站建设方案
  • 毕业设计做视频网站江苏建设部网站
  • 成都企业网站备案流程dede 网站目录
  • Spring Boot3零基础教程,Mybatis 自动配置解析,笔记53
  • 俄语网站里做外贸shop中国对外建设有限公司网站
  • 南昌网站设计公司哪家好石家庄新闻频道