C++ brpc Channel 管理封装方案
在 C++ 中使用 brpc 时,对 Channel 进行封装管理可以大大提高代码的可维护性和复用性。下面是一个完整的 Channel 管理封装方案:
1. 基础 Channel 管理器
cpp
// ChannelManager.h
#pragma once#include <brpc/channel.h>
#include <mutex>
#include <unordered_map>
#include <memory>
#include <string>class ChannelManager {
public:static ChannelManager& GetInstance();// 禁止拷贝和移动ChannelManager(const ChannelManager&) = delete;ChannelManager& operator=(const ChannelManager&) = delete;// 获取 Channelstd::shared_ptr<brpc::Channel> GetChannel(const std::string& service_name);// 预创建 Channelbool PreCreateChannel(const std::string& service_name, const std::string& load_balancer = "");// 移除 Channelvoid RemoveChannel(const std::string& service_name);// 获取所有 Channel 状态void GetAllChannelStatus(std::unordered_map<std::string, std::string>& status_map);// 健康检查bool HealthCheck(const std::string& service_name);private:ChannelManager() = default;~ChannelManager() = default;std::shared_ptr<brpc::Channel> CreateChannel(const std::string& service_name, const std::string& load_balancer = "");private:std::mutex mutex_;std::unordered_map<std::string, std::shared_ptr<brpc::Channel>> channel_pool_;std::unordered_map<std::string, std::string> service_endpoints_;
};2. Channel 管理器实现
cpp
// ChannelManager.cpp
#include "ChannelManager.h"

#include <brpc/controller.h>
#include <glog/logging.h>

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

/// Returns the function-local static instance.
ChannelManager& ChannelManager::GetInstance() {
    static ChannelManager instance;
    return instance;
}

/// Looks up `service_name` in the pool; on a miss, creates a channel with no
/// load balancer and caches it. Returns nullptr when creation fails (nothing
/// is cached in that case).
std::shared_ptr<brpc::Channel> ChannelManager::GetChannel(const std::string& service_name) {
    std::lock_guard<std::mutex> lock(mutex_);

    auto it = channel_pool_.find(service_name);
    if (it != channel_pool_.end()) {
        return it->second;
    }

    // Create on demand (default: no load balancer).
    auto channel = CreateChannel(service_name, "");
    if (channel) {
        channel_pool_[service_name] = channel;
    }
    return channel;
}

/// Creates and caches a channel for `service_name` unless one already exists.
/// Returns true when a channel is in the pool afterwards, false on failure.
bool ChannelManager::PreCreateChannel(const std::string& service_name,
                                      const std::string& load_balancer) {
    std::lock_guard<std::mutex> lock(mutex_);

    if (channel_pool_.find(service_name) != channel_pool_.end()) {
        LOG(WARNING) << "Channel for service " << service_name << " already exists";
        return true;
    }

    auto channel = CreateChannel(service_name, load_balancer);
    if (!channel) {
        LOG(ERROR) << "Failed to create channel for service " << service_name;
        return false;
    }

    channel_pool_[service_name] = channel;
    LOG(INFO) << "Successfully pre-created channel for service " << service_name;
    return true;
}

/// Builds a channel with the fixed options below and initializes it against
/// `service_name`. Returns nullptr when brpc's Init reports an error.
std::shared_ptr<brpc::Channel> ChannelManager::CreateChannel(const std::string& service_name,
                                                             const std::string& load_balancer) {
    auto channel = std::make_shared<brpc::Channel>();

    brpc::ChannelOptions options;
    options.protocol = brpc::PROTOCOL_BAIDU_STD;
    options.connection_type = "single";
    options.timeout_ms = 1000;  // 1 second
    options.max_retry = 3;

    // The load balancer goes to Init() as its own argument. The address must
    // be passed through unchanged: appending "?lb=<name>" to it (as the
    // previous version did) hands brpc a malformed naming-service URL.
    // With a null load balancer name, brpc treats the first argument as a
    // plain "host:port" server address.
    const char* lb_name = load_balancer.empty() ? nullptr : load_balancer.c_str();
    if (channel->Init(service_name.c_str(), lb_name, &options) != 0) {
        LOG(ERROR) << "Failed to initialize channel for " << service_name;
        return nullptr;
    }
    return channel;
}

/// Removes the pooled channel for `service_name`. Callers that still hold the
/// shared_ptr keep the channel alive.
void ChannelManager::RemoveChannel(const std::string& service_name) {
    std::lock_guard<std::mutex> lock(mutex_);
    // Only report a removal when an entry was actually erased.
    if (channel_pool_.erase(service_name) > 0) {
        LOG(INFO) << "Removed channel for service " << service_name;
    }
}

/// Fills `status_map` with "<errors> errors, <successes> successes" per
/// pooled channel.
void ChannelManager::GetAllChannelStatus(std::unordered_map<std::string, std::string>& status_map) {
    std::lock_guard<std::mutex> lock(mutex_);
    for (const auto& pair : channel_pool_) {
        // NOTE(review): brpc::ChannelStatistics / Channel::GetStatistics are
        // kept from the original text; confirm they exist in the brpc version
        // in use before relying on this function.
        brpc::ChannelStatistics stat;
        pair.second->GetStatistics(&stat);
        status_map[pair.first] = std::to_string(stat.error_count) + " errors, " +
                                 std::to_string(stat.success_count) + " successes";
    }
}

/// Returns true when a channel for `service_name` exists or can be created.
bool ChannelManager::HealthCheck(const std::string& service_name) {
    auto channel = GetChannel(service_name);
    if (!channel) {
        return false;
    }
    // Placeholder: add a real probe here, e.g. send an empty RPC request or
    // use a health check provided by the channel.
    return true;
}
3. 带重试和熔断的增强版 Channel
cpp
// EnhancedChannel.h
#pragma once

#include <brpc/channel.h>
#include <brpc/controller.h>
#include <butil/logging.h>
#include <google/protobuf/descriptor.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <string>

/// Typed convenience wrapper around a shared brpc channel.
///
/// `Request` and `Response` must be generated protobuf message types; the RPC
/// method is selected by name at call time and dispatched through
/// brpc::Channel::CallMethod.
template <typename Request, typename Response>
class EnhancedChannel {
public:
    using RpcCall = std::function<void(brpc::Controller*, const Request*, Response*,
                                       google::protobuf::Closure*)>;

    /// @param service_name  label used in log messages.
    /// @param channel       channel to issue calls on; may be null, in which
    ///                      case every call fails.
    EnhancedChannel(const std::string& service_name, std::shared_ptr<brpc::Channel> channel)
        : service_name_(service_name), channel_(channel) {}

    /// Synchronous call.
    /// @param method_name  fully-qualified "package.Service.Method", or a bare
    ///                     method name that is looked up among the services
    ///                     declared in the same .proto file as `Request`.
    /// @return true when the RPC completed without error; false when the
    ///         channel is unavailable, the method is unknown, or the RPC failed.
    bool Call(const Request& request, Response& response,
              const std::string& method_name,
              int timeout_ms = 1000, int max_retry = 3);

    /// Asynchronous call.
    /// `response` must stay valid until `done` runs. `done` (may be null) is
    /// run after the RPC finishes; the controller is already destroyed at that
    /// point, so failures are only reported through the log.
    /// @return true when the RPC was issued. On false, `done` is NOT run and
    ///         stays owned by the caller.
    bool AsyncCall(const Request& request, Response* response,
                   const std::string& method_name,
                   google::protobuf::Closure* done,
                   int timeout_ms = 1000);

    /// Access to the wrapped channel.
    std::shared_ptr<brpc::Channel> GetRawChannel() { return channel_; }

    /// Error-rate based health heuristic; see the definition below.
    bool IsHealthy() const;

private:
    /// Completion closure for AsyncCall. It owns the controller of the
    /// in-flight RPC and deletes itself when run.
    class AsyncDone : public google::protobuf::Closure {
    public:
        explicit AsyncDone(google::protobuf::Closure* user_done) : user_done_(user_done) {}

        void Run() override {
            if (cntl.Failed()) {
                LOG(ERROR) << "Async RPC call failed: " << cntl.ErrorText();
            }
            // Release the controller before handing over to the user callback,
            // which is the order the original callback used.
            google::protobuf::Closure* user_done = user_done_;
            delete this;
            if (user_done) {
                user_done->Run();
            }
        }

        brpc::Controller cntl;

    private:
        google::protobuf::Closure* user_done_;
    };

    /// Maps `method_name` to its descriptor, or returns nullptr when no
    /// matching method is registered.
    static const google::protobuf::MethodDescriptor* ResolveMethod(const std::string& method_name);

    std::string service_name_;
    std::shared_ptr<brpc::Channel> channel_;
};

template <typename Request, typename Response>
const google::protobuf::MethodDescriptor*
EnhancedChannel<Request, Response>::ResolveMethod(const std::string& method_name) {
    // First try the name as a fully-qualified "package.Service.Method".
    const google::protobuf::MethodDescriptor* method =
        google::protobuf::DescriptorPool::generated_pool()->FindMethodByName(method_name);
    if (method != nullptr) {
        return method;
    }

    // Otherwise treat it as a bare method name and search the services
    // declared next to Request, requiring matching request/response types.
    const google::protobuf::FileDescriptor* file = Request::descriptor()->file();
    for (int i = 0; i < file->service_count(); ++i) {
        method = file->service(i)->FindMethodByName(method_name);
        if (method != nullptr &&
            method->input_type() == Request::descriptor() &&
            method->output_type() == Response::descriptor()) {
            return method;
        }
    }
    return nullptr;
}

template <typename Request, typename Response>
bool EnhancedChannel<Request, Response>::Call(const Request& request, Response& response,
                                              const std::string& method_name,
                                              int timeout_ms, int max_retry) {
    if (!channel_ || !IsHealthy()) {
        LOG(ERROR) << "Channel for " << service_name_ << " is not available";
        return false;
    }

    const google::protobuf::MethodDescriptor* method = ResolveMethod(method_name);
    if (method == nullptr) {
        LOG(ERROR) << "Unknown RPC method " << method_name << " for " << service_name_;
        return false;
    }

    brpc::Controller cntl;
    cntl.set_timeout_ms(timeout_ms);
    cntl.set_max_retry(max_retry);

    // A null closure makes CallMethod block until the RPC has finished.
    channel_->CallMethod(method, &cntl, &request, &response, nullptr);

    if (cntl.Failed()) {
        LOG(ERROR) << "RPC call failed: " << cntl.ErrorText();
        return false;
    }
    return true;
}

template <typename Request, typename Response>
bool EnhancedChannel<Request, Response>::AsyncCall(const Request& request, Response* response,
                                                   const std::string& method_name,
                                                   google::protobuf::Closure* done,
                                                   int timeout_ms) {
    if (!channel_ || !IsHealthy()) {
        LOG(ERROR) << "Channel for " << service_name_ << " is not available";
        return false;
    }

    const google::protobuf::MethodDescriptor* method = ResolveMethod(method_name);
    if (method == nullptr) {
        LOG(ERROR) << "Unknown RPC method " << method_name << " for " << service_name_;
        return false;
    }

    // The closure deletes itself in Run(), which may happen before CallMethod
    // returns; it must not be touched after the call below.
    AsyncDone* closure = new AsyncDone(done);
    closure->cntl.set_timeout_ms(timeout_ms);
    channel_->CallMethod(method, &closure->cntl, &request, response, closure);
    return true;
}

template <typename Request, typename Response>
bool EnhancedChannel<Request, Response>::IsHealthy() const {
    if (!channel_) return false;

    // Simple heuristic that can be extended with richer checks.
    // NOTE(review): brpc::ChannelStatistics / Channel::GetStatistics are kept
    // from the original text; confirm they exist in the brpc version in use.
    brpc::ChannelStatistics stat;
    channel_->GetStatistics(&stat);

    // Unhealthy once more than 10 calls were made and over half of them failed.
    uint64_t total_calls = stat.success_count + stat.error_count;
    if (total_calls > 10 && (static_cast<double>(stat.error_count) / total_calls) > 0.5) {
        return false;
    }
    return true;
}
4. 服务工厂类
cpp
// ServiceFactory.h
#pragma once

#include "ChannelManager.h"
#include "EnhancedChannel.h"

#include <memory>
#include <string>

/// Static helpers that combine ChannelManager's pooled channels with the
/// typed EnhancedChannel wrapper.
///
/// `ServiceStub` is accepted as a template parameter but is not referenced by
/// either helper.
template <typename ServiceStub, typename Request, typename Response>
class ServiceFactory {
public:
    /// Fetches (or lazily creates) the pooled channel for `service_name` and
    /// wraps it in a new EnhancedChannel.
    /// @return the wrapper, or nullptr when no channel could be obtained.
    static std::shared_ptr<EnhancedChannel<Request, Response>> GetServiceChannel(
            const std::string& service_name) {
        auto channel = ChannelManager::GetInstance().GetChannel(service_name);
        if (!channel) {
            return nullptr;
        }
        return std::make_shared<EnhancedChannel<Request, Response>>(service_name, channel);
    }

    /// Forwards to ChannelManager::PreCreateChannel.
    static bool PreCreateServiceChannel(const std::string& service_name,
                                        const std::string& load_balancer = "") {
        return ChannelManager::GetInstance().PreCreateChannel(service_name, load_balancer);
    }
};
5. 使用示例
cpp
// example_usage.cpp
#include "ServiceFactory.h"
#include "your_proto_file.pb.h"  // your generated protobuf header

#include <butil/logging.h>

#include <string>
#include <unordered_map>

namespace {

// A load balancer ("rr") needs a naming-service URL rather than a bare
// "host:port"; "list://" takes a comma-separated server list. The same string
// is the pool key, so it must be used for both pre-creation and lookup.
const char kUserServiceAddress[] = "list://user.service:8000";

}  // namespace

/// Example client for UserService built on ServiceFactory.
class UserServiceClient {
public:
    UserServiceClient() {
        // Pre-create the channel with round-robin load balancing.
        ServiceFactory<UserService_Stub, GetUserRequest, GetUserResponse>::
            PreCreateServiceChannel(kUserServiceAddress, "rr");
    }

    /// Fetches the user identified by `user_id` into `user_info`.
    /// @return true on success; `user_info` is left untouched on failure.
    bool GetUserInfo(int user_id, UserInfo& user_info) {
        auto channel = ServiceFactory<UserService_Stub, GetUserRequest, GetUserResponse>::
            GetServiceChannel(kUserServiceAddress);
        if (!channel) {
            LOG(ERROR) << "Failed to get user service channel";
            return false;
        }

        GetUserRequest request;
        GetUserResponse response;
        request.set_user_id(user_id);

        // Call() takes the method name as a string; a member-function pointer
        // such as &UserService_Stub::GetUserInfo does not convert to it.
        bool success = channel->Call(request, response, "GetUserInfo");
        if (success) {
            user_info = response.user_info();
        }
        return success;
    }
};

/// Logs the status line of every pooled channel.
void MonitorChannels() {
    std::unordered_map<std::string, std::string> status_map;
    ChannelManager::GetInstance().GetAllChannelStatus(status_map);
    for (const auto& pair : status_map) {
        LOG(INFO) << "Service: " << pair.first << " - Status: " << pair.second;
    }
}
6. 配置化管理
cpp
// ChannelConfig.h
#pragma once#include <string>
#include <unordered_map>struct ServiceConfig {std::string endpoint;std::string load_balancer;int timeout_ms;int max_retry;
};class ChannelConfigManager {
public:static ChannelConfigManager& GetInstance();void LoadConfig(const std::string& config_file);ServiceConfig GetServiceConfig(const std::string& service_name);private:std::unordered_map<std::string, ServiceConfig> service_configs_;
};这个封装方案提供了以下特性:
单例管理:全局 Channel 管理
线程安全:使用互斥锁保护共享资源
连接复用:避免重复创建 Channel
健康检查:提供健康检查接口(HealthCheck 目前为占位实现,IsHealthy 基于错误率判断,可按需扩展)
模板化设计:支持不同类型的服务
配置化:支持灵活的配置管理
监控支持:提供状态查询接口
你可以根据具体需求进一步扩展这个基础框架。
