当前位置: 首页 > news >正文

基于brpc的轻量级服务注册中心设计与实现

在微服务架构日益普及的今天,服务之间的高效通信与动态发现已成为构建高可用分布式系统的核心挑战。面对成百上千个可能随时上下线的微服务,如何确保消费者能够快速、准确地找到可用的服务提供者?服务注册中心正是解决这一问题的关键所在。

brpc 通过 NamingService 支持 consul、zookeeper 等作为服务注册中心。有时候可能系统并不需要现有服务注册中心功能太过复杂,同时可以对接自己的持久化系统,实现简单的服务注册和服务发现功能。

一、整体架构设计:三大角色协同作业

基于brpc框架实现的轻量级服务注册中心构建了一个完整的服务治理闭环,系统包含三个核心组件:

  • RegistryServer(注册中心):作为系统核心,负责维护和管理服务注册表
  • ServiceProvider(服务提供者):向注册中心注册自身服务信息并提供实际服务能力
  • ServiceConsumer(服务消费者):从注册中心获取服务列表并进行服务调用

image.png

从架构示意图可以看出,这些组件之间的交互关系清晰而高效:

  • ServiceProvider启动时向注册中心注册服务信息,并通过定期心跳保持连接
  • ServiceConsumer从注册中心获取可用服务列表,并定期更新
  • 消费者直接向提供者发起服务请求,注册中心不参与实际的数据传输

二、注册中心核心功能解析

  1. 服务注册表设计
    注册中心通过两个核心数据结构管理服务信息:
// 服务名到服务地址列表的映射
unordered_map<string, vector<registry::ServerAddress>> _service_map;
// 服务唯一标识到最后心跳时间的映射(用于健康检查)
unordered_map<string, time_t> _heartbeat_map;
  1. 四大核心接口:

注册中心通过RPC接口实现服务:

  • Register服务注册服务提供者启动时注册自身
  • Unregister服务注销服务提供者优雅退出时调用
  • Discover服务发现消费者获取可用服务列表
  • Heartbeat心跳检测维护服务实例的存活状态
    以注册接口为例:
void Register(google::protobuf::RpcController* cntl,const registry::RegisterRequest* request,registry::RegisterResponse* response,google::protobuf::Closure* done) override {
}
  1. 健康检查机制:保证服务列表实时准确
    为了保证服务列表的实时性和准确性,注册中心实现了完整的健康检查机制:

定期扫描:后台线程每10秒检查一次所有服务的心跳状态
超时剔除:超过30秒未收到心跳的服务会被自动移除
自动清理:当某个服务的所有实例都被移除后,该服务名也会从注册表中删除

void health_check() {while (!_stop_health_check) {this_thread::sleep_for(chrono::seconds(10));  // 每10秒检查一次// 1. 遍历检查所有服务的心跳状态// 2. 移除超时服务实例// 3. 更新服务注册表状态}
}

三、数据结构设计:简洁而完整

服务注册中心通过Protobuf定义了一套简洁而完整的数据结构:

// 服务地址信息
message ServerAddress {required string ip = 1;      // 服务IP地址required int32 port = 2;     // 服务端口号
}// 注册中心核心服务接口
service RegistryService {rpc Register(RegisterRequest) returns (RegisterResponse);rpc Unregister(UnregisterRequest) returns (UnregisterResponse);rpc Discover(DiscoverRequest) returns (DiscoverResponse);rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

这种设计保证了组件间通信的标准化和可扩展性。

四、完整示例代码

#include <brpc/server.h>
#include <unordered_map>
#include <mutex>
#include <chrono>
#include <thread>
#include <ctime>
#include <iomanip>
#include <iostream>
#include "registry_service.pb.h"using namespace std;
using namespace brpc;// 日志输出宏(带时间戳)
#define LOG_INFO(msg) do { \auto now = chrono::system_clock::now(); \time_t now_time = chrono::system_clock::to_time_t(now); \cout << "[" << put_time(localtime(&now_time), "%Y-%m-%d %H:%M:%S") << "] [INFO] " << msg << endl; \
} while(0)#define LOG_WARN(msg) do { \auto now = chrono::system_clock::now(); \time_t now_time = chrono::system_clock::to_time_t(now); \cerr << "[" << put_time(localtime(&now_time), "%Y-%m-%d %H:%M:%S") << "] [WARN] " << msg << endl; \
} while(0)// 服务注册中心核心类,管理服务列表和健康检查
class RegistryServiceImpl : public registry::RegistryService {
public:RegistryServiceImpl() {// 启动健康检查线程(定期清理超时服务)_health_check_thread = thread(&RegistryServiceImpl::health_check, this);LOG_INFO("Registry service initialized, health check thread started");}~RegistryServiceImpl() {_stop_health_check = true;if (_health_check_thread.joinable()) {_health_check_thread.join();}LOG_INFO("Registry service stopped");}// 服务注册void Register(google::protobuf::RpcController* cntl,const registry::RegisterRequest* request,registry::RegisterResponse* response,google::protobuf::Closure* done) override {brpc::ClosureGuard done_guard(done);brpc::Controller* c = static_cast<brpc::Controller*>(cntl);lock_guard<mutex> lock(_mutex);const string& service_name = request->service_name();const auto& addr = request->address();string addr_str = addr.ip() + ":" + to_string(addr.port());// 验证参数if (service_name.empty() || addr.ip().empty() || addr.port() <= 0) {response->set_status(registry::RegisterResponse::INVALID_PARAM);response->set_message("Invalid service name or address");LOG_WARN("Register failed (invalid param): service=" << service_name << ", address=" << addr_str);return;}// 检查是否已注册auto& servers = _service_map[service_name];for (const auto& s : servers) {if (s.ip() == addr.ip() && s.port() == addr.port()) {response->set_status(registry::RegisterResponse::ALREADY_EXISTS);response->set_message("Service already registered");LOG_WARN("Register failed (already exists): service=" << service_name << ", address=" << addr_str);return;}}// 添加服务并更新心跳时间servers.push_back(addr);_heartbeat_map[get_key(service_name, addr)] = time(nullptr);response->set_status(registry::RegisterResponse::SUCCESS);response->set_message("Register success");LOG_INFO("Service registered: service=" << service_name << ", address=" << addr_str);// 打印当前service_map状态print_service_map();}// 服务注销void Unregister(google::protobuf::RpcController* cntl,const registry::UnregisterRequest* request,registry::UnregisterResponse* response,google::protobuf::Closure* done) override {brpc::ClosureGuard done_guard(done);lock_guard<mutex> lock(_mutex);const string& service_name = request->service_name();const auto& addr = request->address();string addr_str = addr.ip() + ":" + to_string(addr.port());auto it = _service_map.find(service_name);if (it == _service_map.end()) {response->set_status(registry::UnregisterResponse::NOT_FOUND);LOG_WARN("Unregister failed (not found): service=" << service_name << ", address=" << addr_str);return;}// 从服务列表中移除auto& servers = it->second;bool removed = false;for (auto iter = servers.begin(); iter != servers.end(); ++iter) {if (iter->ip() == addr.ip() && iter->port() == addr.port()) {servers.erase(iter);removed = true;break;}}if (removed) {_heartbeat_map.erase(get_key(service_name, addr));response->set_status(registry::UnregisterResponse::SUCCESS);LOG_INFO("Service unregistered: service=" << service_name << ", address=" << addr_str);// 如果服务列表为空,从map中移除该服务名if (servers.empty()) {_service_map.erase(it);LOG_INFO("Service entry removed (no addresses left): service=" << service_name);}// 打印当前service_map状态print_service_map();} else {response->set_status(registry::UnregisterResponse::NOT_FOUND);LOG_WARN("Unregister failed (not found): service=" << service_name << ", address=" << addr_str);}}// 服务发现void Discover(google::protobuf::RpcController* cntl,const registry::DiscoverRequest* request,registry::DiscoverResponse* response,google::protobuf::Closure* done) override {brpc::ClosureGuard done_guard(done);lock_guard<mutex> lock(_mutex);const string& service_name = request->service_name();auto it = _service_map.find(service_name);if (it != _service_map.end()) {// 返回所有可用服务地址for (const auto& addr : it->second) {*response->add_addresses() = addr;}LOG_INFO("Service discovered: service=" << service_name << ", count=" << it->second.size());} else {LOG_WARN("Service not found during discovery: service=" << service_name);}}// 心跳检测(更新服务存活状态)void Heartbeat(google::protobuf::RpcController* cntl,const registry::HeartbeatRequest* request,registry::HeartbeatResponse* response,google::protobuf::Closure* done) override {brpc::ClosureGuard done_guard(done);lock_guard<mutex> lock(_mutex);const string service_name = request->service_name();const auto& addr = request->address();const string key = get_key(service_name, addr);string addr_str = addr.ip() + ":" + to_string(addr.port());if (_heartbeat_map.count(key)) {_heartbeat_map[key] = time(nullptr);  // 更新心跳时间response->set_status(registry::HeartbeatResponse::SUCCESS);LOG_INFO("Heartbeat received: service=" << service_name << ", address=" << addr_str);} else {response->set_status(registry::HeartbeatResponse::NOT_REGISTERED);LOG_WARN("Heartbeat failed (not registered): service=" << service_name << ", address=" << addr_str);}}private:// 生成服务唯一标识(服务名+IP+端口)string get_key(const string& service_name, const registry::ServerAddress& addr) {return service_name + "|" + addr.ip() + ":" + to_string(addr.port());}// 打印当前service_map中的所有服务信息void print_service_map() {LOG_INFO("Current service map status:");if (_service_map.empty()) {LOG_INFO("  No services registered");return;}for (const auto& entry : _service_map) {LOG_INFO("  Service: " << entry.first << " (count=" << entry.second.size() << ")");for (const auto& addr : entry.second) {LOG_INFO("    Address: " << addr.ip() << ":" << addr.port() << ", weight=" << addr.weight());}}}// 健康检查线程:清理超过30秒未心跳的服务void health_check() {while (!_stop_health_check) {this_thread::sleep_for(chrono::seconds(10));  // 每10秒检查一次lock_guard<mutex> lock(_mutex);const time_t now = time(nullptr);vector<string> expired_keys;// 找出超时服务for (const auto& p : _heartbeat_map) {if (now - p.second > 30) {  // 30秒超时expired_keys.push_back(p.first);}}// 移除超时服务if (!expired_keys.empty()) {LOG_INFO("Health check: found " << expired_keys.size() << " expired service(s)");for (const string& key : expired_keys) {_heartbeat_map.erase(key);// 解析key获取服务名和地址size_t pos1 = key.find('|');size_t pos2 = key.find(':', pos1 + 1);if (pos1 == string::npos || pos2 == string::npos) {LOG_WARN("Invalid key format during cleanup: " << key);continue;}string service_name = key.substr(0, pos1);string ip = key.substr(pos1 + 1, pos2 - pos1 - 1);int port = stoi(key.substr(pos2 + 1));string addr_str = ip + ":" + to_string(port);// 从服务列表中删除auto it = _service_map.find(service_name);if (it != _service_map.end()) {auto& servers = it->second;for (auto iter = servers.begin(); iter != servers.end(); ++iter) {if (iter->ip() == ip && iter->port() == port) {servers.erase(iter);LOG_INFO("Expired service removed: service=" << service_name << ", address=" << addr_str);break;}}// 如果服务列表为空,从map中移除该服务名if (servers.empty()) {_service_map.erase(it);LOG_INFO("Service entry removed (no addresses left): service=" << service_name);}}}// 打印清理后的service_map状态print_service_map();} else {LOG_INFO("Health check: no expired services");}}}mutex _mutex;// 服务名 -> 服务地址列表映射unordered_map<string, vector<registry::ServerAddress>> _service_map;// 服务唯一标识 -> 最后心跳时间映射(用于健康检查)unordered_map<string, time_t> _heartbeat_map;thread _health_check_thread;atomic<bool> _stop_health_check{false};
};int main(int argc, char* argv[]) {// 初始化服务器brpc::Server server;RegistryServiceImpl registry_service;// 注册服务if (server.AddService(&registry_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {cerr << "Failed to add registry service" << endl;return -1;}// 设置服务器地址brpc::ServerOptions options;options.idle_timeout_sec = 300;if (server.Start(18888, &options) != 0) {  // 注册中心监听8888端口cerr << "Failed to start registry server" << endl;return -1;}LOG_INFO("Registry server started on port 18888");server.RunUntilAskedToQuit();return 0;
}

五、高可用与扩展性考虑

虽然这个轻量级注册中心已经实现了核心功能,但在生产环境中还需要考虑以下扩展方案:

  1. 注册中心集群化
    使用braft等框架实现注册中心集群,保证数据一致性
    采用主备模式或多活模式,避免单点故障

  2. 数据持久化
    将服务列表持久化到高性能存储系统
    实现定期快照和操作日志记录,支持快速恢复

  3. 性能优化策略
    服务列表本地缓存,减少注册中心访问压力
    批量操作接口,提高大批量服务注册/注销效率

总结

基于brpc实现的轻量级服务注册中心提供了一个简单而高效的解决方案,特别适合那些不需要复杂功能但要求高性能和可靠性的微服务场景。通过核心的注册表管理、健康检查机制和简洁的接口设计,该系统能够有效地解决服务动态发现的问题。同时,通过集群化、持久化和性能优化等扩展方案,这个轻量级系统完全可以满足生产环境的高可用和高性能要求。


文章转载自:

http://kWZrGpua.kmLmf.cn
http://Jg2WNLjM.kmLmf.cn
http://6dmA9yjo.kmLmf.cn
http://FakmULnZ.kmLmf.cn
http://QaenD1uV.kmLmf.cn
http://hEvRu3iv.kmLmf.cn
http://X1XLG1aN.kmLmf.cn
http://lIgBIaIX.kmLmf.cn
http://qCsigZYq.kmLmf.cn
http://OTLE8fD7.kmLmf.cn
http://GOZ2xaWM.kmLmf.cn
http://2dmVuEcK.kmLmf.cn
http://81Y9C7gx.kmLmf.cn
http://02ek6N8C.kmLmf.cn
http://OTm7PJY5.kmLmf.cn
http://ouqOgCHv.kmLmf.cn
http://4l4yzKzq.kmLmf.cn
http://MWXzJ0LM.kmLmf.cn
http://lP0IZNre.kmLmf.cn
http://DwkrHX9D.kmLmf.cn
http://qLhkWKB8.kmLmf.cn
http://6Y0JAKUN.kmLmf.cn
http://P2SMcTS2.kmLmf.cn
http://3cBqEl1f.kmLmf.cn
http://e34M4ktm.kmLmf.cn
http://yfyIrr5o.kmLmf.cn
http://OGtj5wRT.kmLmf.cn
http://j3v7fQOw.kmLmf.cn
http://i7JBWJeB.kmLmf.cn
http://ajiF7c68.kmLmf.cn
http://www.dtcms.com/a/372558.html

相关文章:

  • 作用域報錯
  • 代码随想录学习摘抄day7(二叉树11-21)
  • 固态硬盘——M.2接口技术
  • 数字化浪潮下,传统加工厂如何智能化转型?
  • Miniflux – RSS 订阅
  • Nginx主配置文件
  • 架构进阶——解读121页IT规划咨询项目规划报告【附全文阅读】
  • 大模型显存占用量换算
  • Compose笔记(五十)--stickyHeader
  • WebGIS三维可视化 + 数据驱动:智慧煤仓监控系统如何破解煤炭仓储行业痛点
  • 刷题集(1)
  • 别墅装修的价钱如何估算?
  • Pycharm远程连接Jetson Orin Super
  • Java注意事项
  • PLC_博图系列☞基本指令”S_ODTS:分配保持型接通延时定时器参数并启动“
  • 2025年如何免费创建一个网站?
  • Linux驱动开发(1)概念、环境与代码框架
  • 3种XSS攻击简单案例
  • Windows存储IOPS的预测性扩容
  • 模式组合应用-装饰器模式
  • 【数据结构与算法Trip第1站】基本介绍
  • Dockerfile解析器指令(Parser Directive)指定语法版本,如:# syntax=docker/dockerfile:1
  • Docker命令(全)
  • 【基于yolo和web的垃圾分类系统】
  • Dify工作流节点(二)
  • Hologres自增序列Serial使用简介
  • SpringBoot-Web开发-内容协商——多端内容适配内容协商原理HttpMessageConverter
  • ESWA修改后投稿流程
  • 可能断更说明
  • Batch Normalization:深度学习中的“加速器”与“稳定器”