当前位置: 首页 > news >正文

C++基于 brpc 的 Channel 管理封装

文章目录

  • 基于 brpc 的 Channel 管理封装:实现服务动态发现与负载均衡
    • 设计思路
    • 代码实现与解析
      • 头文件与类型定义
      • ServiceChannel 类实现
      • ServiceManager 类实现
    • 核心功能解析
      • 1. 线程安全设计
      • 2. 服务动态管理
      • 3. 负载均衡策略
      • 4. 资源管理
    • 使用场景
    • 附录

基于 brpc 的 Channel 管理封装:实现服务动态发现与负载均衡

在分布式系统中,高效的服务通信是核心环节之一。百度的 brpc 框架提供了高性能的 RPC 通信能力,但在实际生产环境中,我们还需要解决服务节点动态上下线、负载均衡等问题。本文将介绍如何封装 brpc 的 Channel,实现一个可动态管理服务节点、支持负载均衡的服务通信组件。

设计思路

我们的封装主要包含两个核心类:

  • ServiceChannel:管理单个服务的多个实例通道,负责通道的创建、删除和选择
  • ServiceManager:管理多个服务,负责服务的注册、下线和通道获取

整体架构采用了 “管理器 - 服务通道 - 通道实例” 的三层结构,既保证了单个服务内的负载均衡,又实现了多服务的统一管理。

代码实现与解析

头文件与类型定义

首先定义必要的头文件包含和类型别名,简化后续代码书写:

#pragma once
#include <atomic>
#include <brpc/channel.h>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "LogTool.hpp"  // 日志工具namespace brpcChannelTool
{// 定义channel智能指针类型,简化书写并自动管理内存using channelptr = std::shared_ptr<brpc::Channel>;

ServiceChannel 类实现

该类负责管理单个服务的所有通道实例,实现了通道的添加、删除和轮询选择功能。

    class ServiceChannel{public:/*** @brief 构造函数* @param service_name 服务名称*/ServiceChannel(std::string service_name): service_name_(service_name), index_(0)  // 轮询索引初始化为0{}/*** @brief 添加服务节点通道* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool append(std::string host){try{// 创建新的channel智能指针channelptr ptr = std::make_shared<brpc::Channel>();brpc::ChannelOptions op;setChannelop(op);  // 设置channel选项// 初始化channelif(ptr->Init(host.c_str(), &op) != 0){LOG_ERROR("{}-{} init fail!", service_name_, host);return false;}// 加锁保证线程安全std::unique_lock<std::mutex> ulock(mutex_);// 检查是否已存在该host的通道if(hostTochannel_.find(host) != hostTochannel_.end()){LOG_INFO("{}-{} already append!", service_name_, host);return false;}// 添加到通道列表和映射表channels_.push_back(ptr);hostTochannel_[host] = ptr;LOG_TRACE("{}-{} append sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}/*** @brief 移除服务节点通道* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool remove(std::string host){std::unique_lock<std::mutex> ulock(mutex_);try{// 查找host对应的通道auto it = hostTochannel_.find(host);if(it == hostTochannel_.end()){LOG_WARN("{}-{} cannot find to remove!", service_name_, host);return false;}// 从通道列表中移除for(auto cit = channels_.begin(); cit != channels_.end();cit++){if(*cit == it->second){channels_.erase(cit);break;}}// 从映射表中移除hostTochannel_.erase(it);LOG_TRACE("{}-{} remove sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}/*** @brief 选择一个通道(轮询算法)* @return 通道智能指针,失败返回空指针*/channelptr choose(){std::unique_lock<std::mutex> ulock(mutex_);try{// 检查是否有可用通道if(channels_.empty()){LOG_WARN("{}-No available channel!", service_name_);return channelptr();}// 轮询选择:取模运算保证索引在有效范围内index_ %= channels_.size();return channels_[index_++];}   catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return channelptr();}~ServiceChannel(){}private:/*** @brief 设置channel选项* @param op 待设置的ChannelOptions对象*/void setChannelop(brpc::ChannelOptions& op){op.timeout_ms = -1;           // 不设置超时op.connect_timeout_ms = -1;   // 不设置连接超时op.connection_type = "single";// 单连接模式op.protocol = "baidu_std";    // 使用baidu_std协议op.max_retry = 3;             // 最大重试次数}private:size_t index_;                       // 轮询索引std::mutex mutex_;                   // 互斥锁,保证线程安全std::string service_name_;           // 服务名称std::vector<channelptr> channels_;   // 通道列表,用于轮询选择std::unordered_map<std::string, channelptr> hostTochannel_;  // host到通道的映射,用于快速查找和删除};

ServiceManager 类实现

该类负责管理多个服务,提供服务的上线、下线和通道选择功能,并支持服务白名单机制。

    // 定义ServiceChannel智能指针类型using service_channel_ptr = std::shared_ptr<ServiceChannel>;class ServiceManager{public:/*** @brief 默认构造函数* 初始化为跟随模式,只处理已注册的服务*/ServiceManager() : is_follow(true){}/*** @brief 带单个服务参数的构造函数* @param service 要跟随的服务名称* 使用explicit防止隐式类型转换*/explicit ServiceManager(std::string service){services_follow_.insert(service);is_follow = true;}/*** @brief 带多个服务参数的构造函数* @param services 要跟随的服务名称列表*/ServiceManager(std::vector<std::string> services){services_follow_.insert(services.begin(), services.end());is_follow = true;}/*** @brief 设置是否启用跟随模式* @param trueorfalse true:启用(只处理已注册服务) false:禁用(处理所有服务)*/void set_is_follow(bool trueorfalse){is_follow = trueorfalse;}/*** @brief 添加要跟随的服务* @param service 服务名称* @return 成功返回true,失败返回false*/bool follow(std::string service){try{std::unique_lock<std::mutex> ulock(mutex_);services_follow_.insert(service);return true;}catch (const std::exception& e){LOG_ERROR("{}-{}-exception error!", e.what(), service);}catch(...){LOG_ERROR("follow {}-Unknown error!", service);}return false;}/*** @brief 服务节点上线* @param name 服务名称* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool Online(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);// 如果是跟随模式且服务未被跟随,则忽略auto it = services_follow_.find(name);if(it == services_follow_.end() && is_follow){LOG_INFO("{}-{} online, but manager overlooked it!", name, host);return false;}// 如果服务不存在,则创建对应的ServiceChannelif(manager_.find(name) == manager_.end()){manager_[name] = std::make_shared<ServiceChannel>(name);}// 获取ServiceChannel并添加hostauto scptr = manager_[name];ulock.unlock();  // 提前解锁,减少锁竞争scptr->append(host);LOG_TRACE("{}-{} Online sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Online {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Online {}-{}-Unknown error!", name, host);}return false;}/*** @brief 服务节点下线* @param name 服务名称* @param host 服务节点地址(格式: ip:port)* @return 成功返回true,失败返回false*/bool Offline(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);// 查找服务是否存在auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}-{}, manager cannot find it!", name, host);return false;}// 移除服务节点if(!manager_[name]->remove(host)){LOG_TRACE("{}-{} Offline fail!", name, host);return false;}// 如果服务已无节点,移除该服务manager_.erase(it);LOG_TRACE("{}-{} Offline sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Offline {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Offline {}-{}-Unknown error!", name, host);}return false;}/*** @brief 选择服务的一个通道* @param name 服务名称* @return 通道智能指针,失败返回空指针*/channelptr Choose(std::string name){try{std::unique_lock<std::mutex> ulock(mutex_);// 查找服务是否存在auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}, manager cannot find it!", name);return channelptr();}// 选择一个通道service_channel_ptr scptr = manager_[name];ulock.unlock();  // 提前解锁,减少锁竞争channelptr ptr = scptr->choose();if(!ptr){LOG_INFO("{}, prt is nullprt!", name);return ptr;}LOG_TRACE("{}-Chosse sucess!", name);return ptr;}catch (const std::exception& e){LOG_ERROR("Choose {}-{}-exception error!", name, e.what());}catch(...){LOG_ERROR("Choose {}-Unknown error!", name);}return channelptr();}~ServiceManager(){}private:std::mutex mutex_;                                  // 互斥锁,保证线程安全std::atomic<bool> is_follow;                        // 是否启用跟随模式std::unordered_set<std::string> services_follow_;   // 跟随的服务集合(白名单)std::unordered_map<std::string, service_channel_ptr> manager_;  // 服务名称到ServiceChannel的映射};
};

核心功能解析

1. 线程安全设计

  • 所有涉及共享数据修改的操作都使用std::mutex进行保护
  • 使用std::unique_lock管理锁的生命周期,避免死锁
  • 耗时操作前提前解锁,减少锁竞争时间

2. 服务动态管理

  • 通过Online()Offline()方法实现服务节点的动态上下线
  • 支持服务白名单机制,可通过follow()方法添加需要管理的服务
  • is_follow标志控制是否只处理白名单中的服务

3. 负载均衡策略

  • 采用轮询 (RR) 算法实现负载均衡,简单高效
  • 通过index_记录当前选择位置,保证请求均匀分配到各个服务节点

4. 资源管理

  • 使用智能指针 (std::shared_ptr) 管理 channel 资源,自动释放内存
  • 维护hostTochannel_映射表,实现快速查找和删除操作

使用场景

该封装适用于需要动态管理服务节点的分布式系统,特别是:

  1. 服务节点可能动态扩缩容的场景
  2. 需要在客户端实现负载均衡的场景
  3. 需要对服务访问进行权限控制的场景
  4. 对服务可用性有较高要求的场景

附录

#pragma once
#include <atomic>
#include <brpc/channel.h>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "LogTool.hpp"namespace brpcChannelTool
{using channelptr = std::shared_ptr<brpc::Channel>;class ServiceChannel{public:ServiceChannel(std::string service_name): service_name_(service_name), index_(0){}bool append(std::string host){try{channelptr ptr = std::make_shared<brpc::Channel>();brpc::ChannelOptions op;setChannelop(op);if(ptr->Init(host.c_str(), &op) != 0){LOG_ERROR("{}-{} init fail!", service_name_, host);return false;}std::unique_lock<std::mutex> ulock(mutex_);if(hostTochannel_.find(host) != hostTochannel_.end()){LOG_INFO("{}-{} already append!", service_name_, host);return false;}channels_.push_back(ptr);hostTochannel_[host] = ptr;LOG_TRACE("{}-{} append sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}bool remove(std::string host){std::unique_lock<std::mutex> ulock(mutex_);try{auto it = hostTochannel_.find(host);if(it == hostTochannel_.end()){LOG_WARN("{}-{} cannot find to remove!", service_name_, host);return false;}for(auto cit = channels_.begin(); cit != channels_.end();cit++){if(*cit == it->second){channels_.erase(cit);break;}}hostTochannel_.erase(it);LOG_TRACE("{}-{} remove sucess!", service_name_, host);return true;}catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return false;}channelptr choose(){std::unique_lock<std::mutex> ulock(mutex_);try{//RR轮转调度if(channels_.empty()){LOG_WARN("{}-No available channel!", service_name_);return channelptr();}index_ %= channels_.size();return channels_[index_++];}   catch (const std::exception& e){LOG_ERROR("{}-exception error!", e.what());}catch(...){LOG_ERROR("{}-Unknown error!", service_name_);}return channelptr();}~ServiceChannel(){}private:void setChannelop(brpc::ChannelOptions& op){op.timeout_ms = -1;op.connect_timeout_ms = -1;op.connection_type = "single";op.protocol = "baidu_std";op.max_retry = 3;}private:size_t index_;std::mutex mutex_;std::string service_name_;std::vector<channelptr> channels_;std::unordered_map<std::string, channelptr> hostTochannel_;};using service_channel_ptr = std::shared_ptr<ServiceChannel>;class ServiceManager{public:ServiceManager() : is_follow(true){}//防止隐式类型转换explicit ServiceManager(std::string service){services_follow_.insert(service);is_follow = true;}ServiceManager(std::vector<std::string> services){services_follow_.insert(services.begin(), services.end());is_follow = true;}void set_is_follow(bool trueorfalse){is_follow = trueorfalse;}bool follow(std::string service){try{std::unique_lock<std::mutex> ulock(mutex_);services_follow_.insert(service);return true;}catch (const std::exception& e){LOG_ERROR("{}-{}-exception error!", e.what(), service);}catch(...){LOG_ERROR("follow {}-Unknown error!", service);}return false;}bool Online(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);auto it = services_follow_.find(name);if(it == services_follow_.end() && is_follow){LOG_INFO("{}-{} online, but manager overlooked it!", name, host);return false;}if(manager_.find(name) == manager_.end()){manager_[name] = std::make_shared<ServiceChannel>(name);}auto scptr = manager_[name];ulock.unlock();scptr->append(host);LOG_TRACE("{}-{} Online sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Online {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Online {}-{}-Unknown error!", name, host);}return false;}bool Offline(std::string name, std::string host){try{std::unique_lock<std::mutex> ulock(mutex_);auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}-{}, manager cannot find it!", name, host);return false;}if(!manager_[name]->remove(host)){LOG_TRACE("{}-{} Offline fail!", name, host);return false;}manager_.erase(it);LOG_TRACE("{}-{} Offline sucess!", name, host);return true;}catch (const std::exception& e){LOG_ERROR("Offline {}-{}-{}-exception error!", name, host, e.what());}catch(...){LOG_ERROR("Offline {}-{}-Unknown error!", name, host);}return false;}channelptr Choose(std::string name){try{std::unique_lock<std::mutex> ulock(mutex_);auto it = manager_.find(name);if(it == manager_.end()){LOG_INFO("{}, manager cannot find it!", name);return channelptr();}service_channel_ptr scptr = manager_[name];ulock.unlock();channelptr ptr = scptr->choose();if(!ptr){LOG_INFO("{}, prt is nullprt!", name);return ptr;}LOG_TRACE("{}-Chosse sucess!", name);return ptr;}catch (const std::exception& e){LOG_ERROR("Choose {}-{}-exception error!", name, e.what());}catch(...){LOG_ERROR("Choose {}-Unknown error!", name);}return channelptr();;}~ServiceManager(){}private:std::mutex mutex_;std::atomic<bool> is_follow;std::unordered_set<std::string> services_follow_;std::unordered_map<std::string, service_channel_ptr> manager_;};
};

LogTool.hpp

#pragma once
#include <atomic>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>#include <spdlog/spdlog.h>
#include <spdlog/async.h>
#include <spdlog/async_logger.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/basic_file_sink.h>namespace LogModule 
{enum OutputMode{CONSOLE_ONLY,FILE_ONLY,BOTH};typedef struct LogInfo{   OutputMode outmode = CONSOLE_ONLY;bool is_debug_ = true;std::string logfile_ = "logfile.txt";bool is_async_ = false;size_t queue_size_ = 1 << 12;size_t thread_num_ = 1;} LogInfo_t;class Log{public:static void Init(const LogInfo_t& loginfo = LogInfo_t()){if(is_init_)return;logconf_ = loginfo;create_logger();is_init_ = true;}static Log& GetInstance(){static Log log;return log;}template<typename... Args>void trace(const char* fmt, const Args&... args){if(logger_)logger_->trace(fmt, args...);}template<typename... Args>void info(const char* fmt, const Args&... args){if(logger_)logger_->info(fmt, args...);}template<typename... Args>void debug(const char* fmt, const Args&... args){if(logger_)logger_->debug(fmt, args...);}template<typename... Args>void warn(const char* fmt, const Args&... args){if(logger_)logger_->warn(fmt, args...);}template<typename... Args>void error(const char* fmt, const Args&... args){if(logger_)logger_->error(fmt, args...);}template<typename... Args>void critical(const char* fmt, const Args&... args){if(logger_)logger_->critical(fmt, args...);}static void shutdown(){spdlog::shutdown();}private:Log() = default;~Log() = default;Log& operator=(const Log&) = delete;Log(const Log&) = delete;static void create_logger(){std::vector<spdlog::sink_ptr> sinks;if(logconf_.outmode == CONSOLE_ONLY || logconf_.outmode == BOTH){auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();  sinks.push_back(console_sink);}if(logconf_.outmode == FILE_ONLY || logconf_.outmode == BOTH){auto file_sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(logconf_.logfile_);sinks.push_back(file_sink);}spdlog::level::level_enum lenum = logconf_.is_debug_ ? spdlog::level::trace : spdlog::level::info;if(logconf_.is_async_){spdlog::init_thread_pool(logconf_.queue_size_, logconf_.thread_num_);logger_ = std::make_shared<spdlog::async_logger>("mainlog", sinks.begin(), sinks.end(), spdlog::thread_pool(), spdlog::async_overflow_policy::block);}else{logger_ = std::make_shared<spdlog::logger>("mainlog", sinks.begin(), sinks.end());}logger_->set_level(lenum);spdlog::set_default_logger(logger_);  // 重要:设置默认日志器logger_->set_pattern("[%Y-%m-%d %H:%M:%S.%e][%t][%-8l]%v");}private:static inline std::shared_ptr<spdlog::logger> logger_;static inline LogInfo_t logconf_;static inline std::atomic<bool> is_init_ = false;};
};// // 使用简化版本的宏定义
// #define LOG_TRACE(...) LogModule::Log::GetInstance().trace(__VA_ARGS__)
// #define LOG_INFO(...) LogModule::Log::GetInstance().info(__VA_ARGS__)
// #define LOG_DEBUG(...) LogModule::Log::GetInstance().debug(__VA_ARGS__)
// #define LOG_WARN(...) LogModule::Log::GetInstance().warn(__VA_ARGS__)
// #define LOG_ERROR(...) LogModule::Log::GetInstance().error(__VA_ARGS__)
// #define LOG_CRITICAL(...) LogModule::Log::GetInstance().critical(__VA_ARGS__)// 修改后宏定义:
#define LOG_TRACE(fmt, ...) LogModule::Log::GetInstance().trace("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...) LogModule::Log::GetInstance().info("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_DEBUG(fmt, ...) LogModule::Log::GetInstance().debug("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_WARN(fmt, ...) LogModule::Log::GetInstance().warn("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) LogModule::Log::GetInstance().error("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
#define LOG_CRITICAL(fmt, ...) LogModule::Log::GetInstance().critical("[{}:{}] " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
http://www.dtcms.com/a/446317.html

相关文章:

  • OpenWrt 的 Overlay 文件系统到底是怎么回事?
  • 优选算法-双指针:2.复写零解析
  • Leetcode 3703. Remove K-Balanced Substrings
  • 创意网站设计团队常州金坛网站建设
  • 浅聊一下网页显示过程
  • h 函数的运用场景=== 函数式封装组件 (弹窗调用)
  • 数据结构——排序算法全解析(入门到精通)
  • 建设装饰网站创客贴做网站吗
  • 爆炸特效-Unity-04-shader粒子系统
  • 公司做网站一般用什么域名网店设计师是干什么的
  • 【Redis】RedLock算法讲解
  • 网站专题页功能河北省住宅和城乡建设厅网站
  • stp root secondary 概念及题目
  • 马尔可夫链蒙特卡洛(MCMC):高维迷宫里的 “智能导航仪”—— 从商场找店到 AI 参数模拟
  • 无穿戴动捕大空间交互:如何靠摄像头实现全感官沉浸体验?
  • 求个没封的w站2022高端网站建设的要求
  • 网站经常修改好不好拼多多网店注册
  • 题解:洛谷P14127 [SCCPC 2021] K-skip Permutation
  • FreeBSD14.1 安装中文输入法fcitx
  • C++STL反向迭代器设计
  • 一文学会《C++》进阶系列之C++11
  • 腊肉网站的建设前景网页版微信可以发朋友圈吗
  • 大连凯杰建设有限公司网站wordpress 文章链接失效
  • 百度网站优化升上去国外网站入口
  • BIT*算法
  • Python常用三方模块——psutil
  • 网站开发的优势建设京东物流网站的目标是什么
  • 制作网站详细步骤爱客crm系统登录
  • Linux事件循环——高效处理多任务(高并发)
  • 【Linux】POSIX信号量、环形队列、基于环形队列实现生产者消费者模型