C++基于 brpc 的 Channel 管理封装
文章目录
- 基于 brpc 的 Channel 管理封装:实现服务动态发现与负载均衡
- 设计思路
- 代码实现与解析
- 头文件与类型定义
- ServiceChannel 类实现
- ServiceManager 类实现
- 核心功能解析
- 1. 线程安全设计
- 2. 服务动态管理
- 3. 负载均衡策略
- 4. 资源管理
- 使用场景
- 附录
基于 brpc 的 Channel 管理封装:实现服务动态发现与负载均衡
在分布式系统中,高效的服务通信是核心环节之一。百度的 brpc 框架提供了高性能的 RPC 通信能力,但在实际生产环境中,我们还需要解决服务节点动态上下线、负载均衡等问题。本文将介绍如何封装 brpc 的 Channel,实现一个可动态管理服务节点、支持负载均衡的服务通信组件。
设计思路
我们的封装主要包含两个核心类:
ServiceChannel
:管理单个服务的多个实例通道,负责通道的创建、删除和选择ServiceManager
:管理多个服务,负责服务的注册、下线和通道获取
整体架构采用了 “管理器 - 服务通道 - 通道实例” 的三层结构,既保证了单个服务内的负载均衡,又实现了多服务的统一管理。
代码实现与解析
头文件与类型定义
首先定义必要的头文件包含和类型别名,简化后续代码书写:
#pragma once
#include <atomic>
#include <brpc/channel.h>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "LogTool.hpp" // 日志工具namespace brpcChannelTool
{// 定义channel智能指针类型,简化书写并自动管理内存using channelptr = std::shared_ptr<brpc::Channel>;
ServiceChannel 类实现
该类负责管理单个服务的所有通道实例,实现了通道的添加、删除和轮询选择功能。
class ServiceChannel{public:/*** @brief 构造函数* @param service_name 服务名称*/ServiceChannel(std::string service_name): service_name_(service_name), index_(0) // 轮询索引初始化为0{}/*** @brief 添加服务节点通道* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool append(std::string host){try{// 创建新的channel智能指针channelptr ptr = std::make_shared<brpc::Channel>();brpc::ChannelOptions op;setChannelop(op); // 设置channel选项// 初始化channelif(ptr->Init(host.c_str(), &op) != 0){LOG_ERROR("{}-{} init fail!", service_name_, host);return false;}// 加锁保证线程安全std::unique_lock<std::mutex> ulock(mutex_);// 检查是否已存在该host的通道if(hostTochannel_.find(host) != hostTochannel_.end()){LOG_INFO("{}-{} already append!", service_name_, host);return false;}// 添加到通道列表和映射表channels_.push_back(ptr);hostTochannel_[host] = ptr;LOG_TRACE("{}-{} append sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}/*** @brief 移除服务节点通道* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool remove(std::string host){std::unique_lock<std::mutex> ulock(mutex_);try{// 查找host对应的通道auto it = hostTochannel_.find(host);if(it == hostTochannel_.end()){LOG_WARN("{}-{} cannot find to remove!", service_name_, host);return false;}// 从通道列表中移除for(auto cit = channels_.begin(); cit != channels_.end();cit++){if(*cit == it->second){channels_.erase(cit);break;}}// 从映射表中移除hostTochannel_.erase(it);LOG_TRACE("{}-{} remove sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}/*** @brief 选择一个通道(轮询算法)* @return 通道智能指针,失败返回空指针*/channelptr choose(){std::unique_lock<std::mutex> ulock(mutex_);try{// 检查是否有可用通道if(channels_.empty()){LOG_WARN("{}-No available channel!", service_name_);return channelptr();}// 轮询选择:取模运算保证索引在有效范围内index_ %= channels_.size();return channels_[index_++];} catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return channelptr();}~ServiceChannel(){}private:/*** @brief 设置channel选项* @param op 待设置的ChannelOptions对象*/void setChannelop(brpc::ChannelOptions& op){op.timeout_ms = -1; // 不设置超时op.connect_timeout_ms = -1; // 不设置连接超时op.connection_type = "single";// 单连接模式op.protocol = "baidu_std"; // 使用baidu_std协议op.max_retry = 3; // 最大重试次数}private:size_t index_; // 轮询索引std::mutex mutex_; // 互斥锁,保证线程安全std::string service_name_; // 服务名称std::vector<channelptr> channels_; // 通道列表,用于轮询选择std::unordered_map<std::string, channelptr> hostTochannel_; // host到通道的映射,用于快速查找和删除};
ServiceManager 类实现
该类负责管理多个服务,提供服务的上线、下线和通道选择功能,并支持服务白名单机制。
// 定义ServiceChannel智能指针类型using service_channel_ptr = std::shared_ptr<ServiceChannel>;class ServiceManager{public:/*** @brief 默认构造函数* 初始化为跟随模式,只处理已注册的服务*/ServiceManager() : is_follow(true){}/*** @brief 带单个服务参数的构造函数* @param service 要跟随的服务名称* 使用explicit防止隐式类型转换*/explicit ServiceManager(std::string service){services_follow_.insert(service);is_follow = true;}/*** @brief 带多个服务参数的构造函数* @param services 要跟随的服务名称列表*/ServiceManager(std::vector<std::string> services){services_follow_.insert(services.begin(), services.end());is_follow = true;}/*** @brief 设置是否启用跟随模式* @param trueorfalse true:启用(只处理已注册服务) false:禁用(处理所有服务)*/void set_is_follow(bool trueorfalse){is_follow = trueorfalse;}/*** @brief 添加要跟随的服务* @param service 服务名称* @return 成功返回true,失败返回false*/bool follow(std::string service){try{std::unique_lock<std::mutex> ulock(mutex_);services_follow_.insert(service);return true;}catch (const std::exception& e){LOG_ERROR("{}-{}-exception error!", e.what(), service);}catch(...){LOG_ERROR("follow {}-Unknown error!", service);}return false;}/*** @brief 服务节点上线* @param name 服务名称* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool Online(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);// 如果是跟随模式且服务未被跟随,则忽略auto it = services_follow_.find(name);if(it == services_follow_.end() && is_follow){LOG_INFO("{}-{} online, but manager overlooked it!", name, host);return false;}// 如果服务不存在,则创建对应的ServiceChannelif(manager_.find(name) == manager_.end()){manager_[name] = std::make_shared<ServiceChannel>(name);}// 获取ServiceChannel并添加hostauto scptr = manager_[name];ulock.unlock(); // 提前解锁,减少锁竞争scptr->append(host);LOG_TRACE("{}-{} Online sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Online {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Online {}-{}-Unknown error!", name, host);}return false;}/*** @brief 服务节点下线* @param name 服务名称* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool Offline(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);// 查找服务是否存在auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}-{}, manager cannot find it!", name, host);return false;}// 移除服务节点if(!manager_[name]->remove(host)){LOG_TRACE("{}-{} Offline fail!", name, host);return false;}// 如果服务已无节点,移除该服务manager_.erase(it);LOG_TRACE("{}-{} Offline sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Offline {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Offline {}-{}-Unknown error!", name, host);}return false;}/*** @brief 选择服务的一个通道* @param name 服务名称* @return 通道智能指针,失败返回空指针*/channelptr Choose(std::string name){try{std::unique_lock<std::mutex> ulock(mutex_);// 查找服务是否存在auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}, manager cannot find it!", name);return channelptr();}// 选择一个通道service_channel_ptr scptr = manager_[name];ulock.unlock(); // 提前解锁,减少锁竞争channelptr ptr = scptr->choose();if(!ptr){LOG_INFO("{}, prt is nullprt!", name);return ptr;}LOG_TRACE("{}-Chosse sucess!", name);return ptr;}catch (const std::exception& e){LOG_ERROR("Choose {}-{}-exception error!", name, e.what());}catch(...){LOG_ERROR("Choose {}-Unknown error!", name);}return channelptr();}~ServiceManager(){}private:std::mutex mutex_; // 互斥锁,保证线程安全std::atomic<bool> is_follow; // 是否启用跟随模式std::unordered_set<std::string> services_follow_; // 跟随的服务集合(白名单)std::unordered_map<std::string, service_channel_ptr> manager_; // 服务名称到ServiceChannel的映射};
};
核心功能解析
1. 线程安全设计
- 所有涉及共享数据修改的操作都使用
std::mutex
进行保护 - 使用
std::unique_lock
管理锁的生命周期,避免死锁 - 耗时操作前提前解锁,减少锁竞争时间
2. 服务动态管理
- 通过
Online()
和Offline()
方法实现服务节点的动态上下线 - 支持服务白名单机制,可通过
follow()
方法添加需要管理的服务 is_follow
标志控制是否只处理白名单中的服务
3. 负载均衡策略
- 采用轮询 (RR) 算法实现负载均衡,简单高效
- 通过
index_
记录当前选择位置,保证请求均匀分配到各个服务节点
4. 资源管理
- 使用智能指针 (
std::shared_ptr
) 管理 channel 资源,自动释放内存 - 维护
hostTochannel_
映射表,实现快速查找和删除操作
使用场景
该封装适用于需要动态管理服务节点的分布式系统,特别是:
- 服务节点可能动态扩缩容的场景
- 需要在客户端实现负载均衡的场景
- 需要对服务访问进行权限控制的场景
- 对服务可用性有较高要求的场景
附录
#pragma once
#include <atomic>
#include <brpc/channel.h>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "LogTool.hpp"namespace brpcChannelTool
{using channelptr = std::shared_ptr<brpc::Channel>;class ServiceChannel{public:ServiceChannel(std::string service_name): service_name_(service_name), index_(0){}bool append(std::string host){try{channelptr ptr = std::make_shared<brpc::Channel>();brpc::ChannelOptions op;setChannelop(op);if(ptr->Init(host.c_str(), &op) != 0){LOG_ERROR("{}-{} init fail!", service_name_, host);return false;}std::unique_lock<std::mutex> ulock(mutex_);if(hostTochannel_.find(host) != hostTochannel_.end()){LOG_INFO("{}-{} already append!", service_name_, host);return false;}channels_.push_back(ptr);hostTochannel_[host] = ptr;LOG_TRACE("{}-{} append sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}bool remove(std::string host){std::unique_lock<std::mutex> ulock(mutex_);try{auto it = hostTochannel_.find(host);if(it == hostTochannel_.end()){LOG_WARN("{}-{} cannot find to remove!", service_name_, host);return false;}for(auto cit = channels_.begin(); cit != channels_.end();cit++){if(*cit == it->second){channels_.erase(cit);break;}}hostTochannel_.erase(it);LOG_TRACE("{}-{} remove sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}channelptr choose(){std::unique_lock<std::mutex> ulock(mutex_);try{//RR轮转调度if(channels_.empty()){LOG_WARN("{}-No available channel!", service_name_);return channelptr();}index_ %= channels_.size();return channels_[index_++];} catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return channelptr();}~ServiceChannel(){}private:void setChannelop(brpc::ChannelOptions& op){op.timeout_ms = -1;op.connect_timeout_ms = -1;op.connection_type = "single";op.protocol = "baidu_std";op.max_retry = 3;}private:size_t index_;std::mutex mutex_;std::string service_name_;std::vector<channelptr> channels_;std::unordered_map<std::string, channelptr> hostTochannel_;};using service_channel_ptr = std::shared_ptr<ServiceChannel>;class ServiceManager{public:ServiceManager() : is_follow(true){}//防止隐式类型转换explicit ServiceManager(std::string service){services_follow_.insert(service);is_follow = true;}ServiceManager(std::vector<std::string> services){services_follow_.insert(services.begin(), services.end());is_follow = true;}void set_is_follow(bool trueorfalse){is_follow = trueorfalse;}bool follow(std::string service){try{std::unique_lock<std::mutex> ulock(mutex_);services_follow_.insert(service);return true;}catch (const std::exception& e){LOG_ERROR("{}-{}-exception error!", e.what(), service);}catch(...){LOG_ERROR("follow {}-Unknown error!", service);}return false;}bool Online(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);auto it = services_follow_.find(name);if(it == services_follow_.end() && is_follow){LOG_INFO("{}-{} online, but manager overlooked it!", name, host);return false;}if(manager_.find(name) == manager_.end()){manager_[name] = std::make_shared<ServiceChannel>(name);}auto scptr = manager_[name];ulock.unlock();scptr->append(host);LOG_TRACE("{}-{} Online sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Online {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Online {}-{}-Unknown error!", name, host);}return false;}bool Offline(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}-{}, manager cannot find it!", name, host);return false;}if(!manager_[name]->remove(host)){LOG_TRACE("{}-{} Offline fail!", name, host);return false;}manager_.erase(it);LOG_TRACE("{}-{} Offline sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Offline {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Offline {}-{}-Unknown error!", name, host);}return false;}channelptr Choose(std::string name){try{std::unique_lock<std::mutex> ulock(mutex_);auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}, manager cannot find it!", name);return channelptr();}service_channel_ptr scptr = manager_[name];ulock.unlock();channelptr ptr = scptr->choose();if(!ptr){LOG_INFO("{}, prt is nullprt!", name);return ptr;}LOG_TRACE("{}-Chosse sucess!", name);return ptr;}catch (const std::exception& e){LOG_ERROR("Choose {}-{}-exception error!", name, e.what());}catch(...){LOG_ERROR("Choose {}-Unknown error!", name);}return channelptr();;}~ServiceManager(){}private:std::mutex mutex_;std::atomic<bool> is_follow;std::unordered_set<std::string> services_follow_;std::unordered_map<std::string, service_channel_ptr> manager_;};
};
LogTool.hpp
#pragma once
#include <atomic>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>#include <spdlog/spdlog.h>
#include <spdlog/async.h>
#include <spdlog/async_logger.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/basic_file_sink.h>namespace LogModule
{enum OutputMode{CONSOLE_ONLY,FILE_ONLY,BOTH};typedef struct LogInfo{ OutputMode outmode = CONSOLE_ONLY;bool is_debug_ = true;std::string logfile_ = "logfile.txt";bool is_async_ = false;size_t queue_size_ = 1 << 12;size_t thread_num_ = 1;} LogInfo_t;class Log{public:static void Init(const LogInfo_t& loginfo = LogInfo_t()){if(is_init_)return;logconf_ = loginfo;create_logger();is_init_ = true;}static Log& GetInstance(){static Log log;return log;}template<typename... Args>void trace(const char* fmt, const Args&... args){if(logger_)logger_->trace(fmt, args...);}template<typename... Args>void info(const char* fmt, const Args&... args){if(logger_)logger_->info(fmt, args...);}template<typename... Args>void debug(const char* fmt, const Args&... args){if(logger_)logger_->debug(fmt, args...);}template<typename... Args>void warn(const char* fmt, const Args&... args){if(logger_)logger_->warn(fmt, args...);}template<typename... Args>void error(const char* fmt, const Args&... args){if(logger_)logger_->error(fmt, args...);}template<typename... Args>void critical(const char* fmt, const Args&... args){if(logger_)logger_->critical(fmt, args...);}static void shutdown(){spdlog::shutdown();}private:Log() = default;~Log() = default;Log& operator=(const Log&) = delete;Log(const Log&) = delete;static void create_logger(){std::vector<spdlog::sink_ptr> sinks;if(logconf_.outmode == CONSOLE_ONLY || logconf_.outmode == BOTH){auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>(); sinks.push_back(console_sink);}if(logconf_.outmode == FILE_ONLY || logconf_.outmode == BOTH){auto file_sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(logconf_.logfile_);sinks.push_back(file_sink);}spdlog::level::level_enum lenum = logconf_.is_debug_ ? spdlog::level::trace : spdlog::level::info;if(logconf_.is_async_){spdlog::init_thread_pool(logconf_.queue_size_, logconf_.thread_num_);logger_ = std::make_shared<spdlog::async_logger>("mainlog", sinks.begin(), sinks.end(), spdlog::thread_pool(), spdlog::async_overflow_policy::block);}else{logger_ = std::make_shared<spdlog::logger>("mainlog", sinks.begin(), sinks.end());}logger_->set_level(lenum);spdlog::set_default_logger(logger_); // 重要:设置默认日志器logger_->set_pattern("[%Y-%m-%d %H:%M:%S.%e][%t][%-8l]%v");}private:static inline std::shared_ptr<spdlog::logger> logger_;static inline LogInfo_t logconf_;static inline std::atomic<bool> is_init_ = false;};
};// // 使用简化版本的宏定义
// #define LOG_TRACE(...) LogModule::Log::GetInstance().trace(__VA_ARGS__)
// #define LOG_INFO(...) LogModule::Log::GetInstance().info(__VA_ARGS__)
// #define LOG_DEBUG(...) LogModule::Log::GetInstance().debug(__VA_ARGS__)
// #define LOG_WARN(...) LogModule::Log::GetInstance().warn(__VA_ARGS__)
// #define LOG_ERROR(...) LogModule::Log::GetInstance().error(__VA_ARGS__)
// #define LOG_CRITICAL(...) LogModule::Log::GetInstance().critical(__VA_ARGS__)// 修改后宏定义:
#define LOG_TRACE(fmt, ...) LogModule::Log::GetInstance().trace("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...) LogModule::Log::GetInstance().info("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_DEBUG(fmt, ...) LogModule::Log::GetInstance().debug("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_WARN(fmt, ...) LogModule::Log::GetInstance().warn("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) LogModule::Log::GetInstance().error("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_CRITICAL(fmt, ...) LogModule::Log::GetInstance().critical("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)