当前位置: 首页 > news >正文

实战:基于 BRPC+Etcd 打造轻量级 RPC 服务 —— 从注册到调用的完整实现

在实际开发中,当多个服务需要跨进程通信时,直接用 HTTP 或自定义协议会面临很多问题:比如服务地址硬编码导致扩容困难、服务下线后客户端还在调用僵尸节点、缺乏统一的日志和监控等。为了解决这些痛点,我基于百度的 BRPC 框架和 Etcd 服务注册中心,搭了一套轻量级 RPC 服务,支持服务自动注册、实时发现和负载均衡,今天就把整个实现过程拆解开讲清楚,代码可直接复用。

一、项目背景:为什么要造这个轮子?

先说说为什么选 BRPC 和 Etcd:

  • BRPC:相比 gRPC,BRPC 更贴合国内场景,支持百度_std、HTTP 等多种协议,性能稳定,而且自带连接池、重试等机制,不用重复造轮子;

  • Etcd:作为分布式键值存储,天生适合做服务注册中心 —— 支持租约(避免僵尸节点)、Watcher(实时感知服务变化),轻量且容易部署;

  • 场景适配:这套方案针对中小型项目设计,没有过度封装,核心逻辑清晰,后续加熔断、监控都很方便。

整个服务的核心目标很简单:让客户端能 “无感” 调用服务端,不用关心服务端在哪、有多少个节点,服务下线后自动剔除

二、整体架构:从注册到调用的闭环

先画个简单的架构图(文字描述更直观),整个流程分四步:

  1. 服务端启动:初始化 BRPC 服务 → 实现 Echo 业务逻辑 → 把自己的地址注册到 Etcd(带 3 秒租约);

  2. 客户端启动:连接 Etcd → 拉取已注册的服务节点 → 初始化信道管理;

  3. 实时发现:Etcd 通过 Watcher 监控服务节点变化,新节点上线时客户端自动加信道,节点下线时自动删信道;

  4. 客户端调用:通过轮询(RR)策略从信道池选一个节点,发起 RPC 调用,拿到响应后打印结果。

核心模块分为 4 个:日志模块(统一日志输出)、Etcd 注册发现模块(连接 Etcd)、BRPC 服务模块(业务逻辑载体)、信道管理模块(负载均衡 + 资源复用)。

三、核心模块实现:一步步拆轮子

1. 先搞定基础:日志模块(logger.hpp)

日志是排查问题的关键,我用了轻量级的 spdlog 库,支持调试 / 发布两种模式,还能自动打印文件名和行号。

1.1 日志模块代码
#pragma once
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <iostream>// 全局日志器,所有模块共用
std::shared_ptr<spdlog::logger> g_default_logger;/*** @brief 初始化日志器* @param mode 运行模式:false-调试(输出控制台),true-发布(输出文件)* @param file 发布模式下的日志文件路径* @param level 日志等级(0-trace,1-debug,2-info,3-warn,4-error,5-critical)*/
void init_logger(bool mode, const std::string &file, int32_t level) {// 调试模式:控制台输出,最低等级(trace),方便开发定位if (!mode) {g_default_logger = spdlog::stdout_color_mt("rpc-debug");g_default_logger->set_level(spdlog::level::trace);g_default_logger->flush_on(spdlog::level::trace);} // 发布模式:文件输出,按参数控制等级,避免控制台刷屏else {if (file.empty()) {std::cerr << "发布模式必须指定日志文件!" << std::endl;exit(1);}g_default_logger = spdlog::basic_logger_mt("rpc-release", file);// 等级范围校验,避免非法值auto log_level = (level < 0 || level > 5) ? spdlog::level::info : (spdlog::level::level_enum)level;g_default_logger->set_level(log_level);g_default_logger->flush_on(log_level);}// 日志格式:[日志器名][时间][线程ID][等级] 内容(文件名:行号)g_default_logger->set_pattern("[%n][%H:%M:%S][%t][%-8l] %v (%s:%#)");
}// 日志宏定义,简化调用
#define LOG_TRACE(fmt, ...) g_default_logger->trace(fmt, ##__VA_ARGS__)
#define LOG_DEBUG(fmt, ...) g_default_logger->debug(fmt, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...)  g_default_logger->info(fmt, ##__VA_ARGS__)
#define LOG_WARN(fmt, ...)  g_default_logger->warn(fmt, ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) g_default_logger->error(fmt, ##__VA_ARGS__)
#define LOG_FATAL(fmt, ...) g_default_logger->critical(fmt, ##__VA_ARGS__)
1.2 关键逻辑解释
  • 全局日志器g_default_logger让所有模块不用重复创建日志器,避免资源浪费;

  • 模式区分:调试时输出控制台,带颜色区分等级(spdlog 的 stdout_color_sink),发布时输出文件,避免线上服务器控制台堆积日志;

  • 格式设计:包含线程 ID 和文件名行号,比如[rpc-debug][15:30:00][1234][debug] 初始化信道成功 (channel.hpp:45),定位问题时一眼就能找到代码位置。

2. 核心中的核心:Etcd 注册发现模块(etcd.hpp)

这是解决 “服务在哪” 的关键模块,分两个类:Registry(服务端用,注册服务)和Discovery(客户端用,发现服务)。

2.1 先明确依赖

需要安装 Etcd 的 C++ 客户端库(etcd-cpp-apiv3),Ubuntu 下可以用源码编译:

git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3 && mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local
make -j4 && sudo make install
2.2 Etcd 模块代码
#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <functional>
#include "logger.hpp"  // 依赖日志模块/*** @brief 服务注册类(服务端用)* 功能:将服务端地址注册到Etcd,带租约自动续期,服务下线后自动删除注册信息*/
class Registry {
public:using ptr = std::shared_ptr<Registry>;  // 智能指针,避免内存泄漏/*** @brief 构造函数:初始化Etcd客户端和租约* @param etcd_host Etcd服务地址(如http://127.0.0.1:2379)* @param lease_ttl 租约时间(秒),默认3秒(太短频繁续期,太长僵尸节点存活久)*/Registry(const std::string &etcd_host, int lease_ttl = 3) {// 初始化Etcd客户端_client = std::make_shared<etcd::Client>(etcd_host);// 创建租约:租约过期前会自动续期,服务端下线后续期失败,Etcd删除节点_keep_alive = _client->leasekeepalive(lease_ttl).get();if (!_keep_alive) {LOG_FATAL("创建Etcd租约失败!Etcd地址:{}", etcd_host);exit(1);}_lease_id = _keep_alive->Lease();LOG_INFO("Etcd租约初始化成功,租约ID:{},TTL:{}秒", _lease_id, lease_ttl);}// 析构函数:取消租约,避免服务端正常退出后租约残留~Registry() {if (_keep_alive) {_keep_alive->Cancel();LOG_INFO("Etcd租约已取消,租约ID:{}", _lease_id);}}/*** @brief 注册服务到Etcd* @param service_key 服务唯一标识(如/service/echo/instance1)* @param service_value 服务访问地址(如127.0.0.1:7070)* @return true-注册成功,false-失败*/bool registry(const std::string &service_key, const std::string &service_value) {if (service_key.empty() || service_value.empty()) {LOG_ERROR("服务Key或Value不能为空!Key:{},Value:{}", service_key, service_value);return false;}// 带租约写入Etcd:Key=服务标识,Value=访问地址auto resp = _client->put(service_key, service_value, _lease_id).get();if (resp.is_ok()) {LOG_INFO("服务注册成功!Etcd Key:{},Value:{}", service_key, service_value);return true;} else {LOG_ERROR("服务注册失败!错误信息:{}", resp.error_message());return false;}}private:std::shared_ptr<etcd::Client> _client;       // Etcd客户端std::shared_ptr<etcd::KeepAlive> _keep_alive;// 租约续期对象uint64_t _lease_id;                          // 租约ID
};/*** @brief 服务发现类(客户端用)* 功能:拉取Etcd中的服务节点,监控节点变化,触发回调更新信道*/
class Discovery {
public:using ptr = std::shared_ptr<Discovery>;// 回调函数类型:参数1-服务Key,参数2-服务Value(访问地址)using NotifyCallback = std::function<void(const std::string &, const std::string &)>;/*** @brief 构造函数:初始化发现并监控服务* @param etcd_host Etcd地址* @param base_key 服务根目录(如/service,所有服务都在这个目录下)* @param put_cb 节点上线回调(新增信道)* @param del_cb 节点下线回调(删除信道)*/Discovery(const std::string &etcd_host, const std::string &base_key,const NotifyCallback &put_cb,const NotifyCallback &del_cb) : _put_cb(put_cb), _del_cb(del_cb) {// 初始化Etcd客户端_client = std::make_shared<etcd::Client>(etcd_host);if (!_client) {LOG_FATAL("创建Etcd客户端失败!地址:{}", etcd_host);exit(1);}// 第一步:先拉取已存在的服务节点(避免客户端启动时漏节点)auto resp = _client->ls(base_key).get();if (resp.is_ok()) {int node_count = resp.keys().size();LOG_INFO("拉取到{}个服务节点,根目录:{}", node_count, base_key);for (int i = 0; i < node_count; ++i) {std::string key = resp.key(i);std::string val = resp.value(i).as_string();if (_put_cb) _put_cb(key, val);  // 触发上线回调,添加信道}} else {LOG_WARN("拉取初始服务节点失败,可能是根目录不存在:{},错误:{}", base_key, resp.error_message());}// 第二步:启动Watcher监控节点变化(实时感知上线/下线)// 第三个参数true表示"递归监控":监控base_key下所有子节点_watcher = std::make_shared<etcd::Watcher>(*_client.get(), base_key, std::bind(&Discovery::on_etcd_event, this, std::placeholders::_1), true);LOG_INFO("Etcd Watcher启动成功,监控根目录:{}", base_key);}// 析构函数:取消Watcher~Discovery() {if (_watcher) {_watcher->Cancel();LOG_INFO("Etcd Watcher已取消");}}private:/*** @brief Etcd事件回调:处理节点上线/下线* @param resp Etcd事件响应*/void on_etcd_event(const etcd::Response &resp) {if (!resp.is_ok()) {LOG_ERROR("Etcd事件回调出错!错误:{}", resp.error_message());return;}// 遍历所有事件(可能同时有多个节点变化)for (const auto &event : resp.events()) {if (event.event_type() == etcd::Event::EventType::PUT) {// PUT事件:节点上线或更新std::string key = event.kv().key();std::string val = event.kv().as_string();LOG_DEBUG("Etcd收到PUT事件:Key={},Value={}", key, val);if (_put_cb) _put_cb(key, val);} else if (event.event_type() == etcd::Event::EventType::DELETE_) {// DELETE事件:节点下线std::string key = event.prev_kv().key();std::string val = event.prev_kv().as_string();LOG_DEBUG("Etcd收到DELETE事件:Key={},Value={}", key, val);if (_del_cb) _del_cb(key, val);}}}private:NotifyCallback _put_cb;                      // 节点上线回调NotifyCallback _del_cb;                      // 节点下线回调std::shared_ptr<etcd::Client> _client;       // Etcd客户端std::shared_ptr<etcd::Watcher> _watcher;     // 事件监控对象
};
2.3 关键逻辑拆解
  • Registry 的租约机制:这是避免僵尸节点的核心 —— 服务端正常运行时,KeepAlive会自动续期;如果服务端崩溃,续期失败,3 秒后 Etcd 会删除该节点的注册信息,客户端不会再调用到死节点;

  • Discovery 的两步初始化:先拉取已有节点(避免客户端启动时漏节点),再启动 Watcher(实时监控新变化),这两步缺一不可;

  • 回调解耦put_cbdel_cb由客户端传入,Discovery 只负责 “发现变化”,不关心 “如何处理变化”,这样后续换信道管理逻辑时不用改 Discovery 代码。

3. 连接客户端和服务端:BRPC 服务定义(main.proto)

RPC 服务需要先定义通信协议,用 Protobuf 来定义 Echo 服务的请求和响应结构,然后生成 C++ 代码。

3.1 Proto 文件定义(main.proto)
syntax = "proto2";  // BRPC推荐用proto2,兼容性更好
package example;    // 命名空间,避免类名冲突// Echo请求:只传一个字符串
message EchoRequest {required string message = 1;  // required表示必填
}// Echo响应:返回处理后的字符串
message EchoResponse {required string message = 1;
}// Echo服务:只有一个Echo方法
service EchoService {// 同步RPC方法:客户端发请求,等待响应rpc Echo(EchoRequest) returns (EchoResponse);
}
3.2 生成 C++ 代码

需要安装 Protobuf 编译器(sudo apt install protobuf-compiler),然后执行:

# 生成main.pb.h和main.pb.cc(C++代码)
protoc --cpp_out=. main.proto
# 生成BRPC服务端/客户端代码(brpc_pb.pl在BRPC安装目录的bin下)
# 注意:如果brpc_pb.pl不在PATH里,需要写全路径(如/usr/local/brpc/bin/brpc_pb.pl)
brpc_pb.pl main.proto

生成的main.pb.h里包含EchoService的基类,服务端需要继承它实现业务逻辑,客户端用它的 Stub 发起调用。

4. 服务端实现:启动 BRPC + 注册到 Etcd(server.cc)

服务端的核心是 “实现 Echo 业务逻辑” 和 “注册服务”,代码里加了详细注释。

4.1 服务端代码
#include "../common/etcd.hpp"    // 自己写的Etcd模块
#include "../common/logger.hpp"  // 自己写的日志模块
#include <gflags/gflags.h>       // 解析命令行参数
#include <thread>
#include <brpc/server.h>         // BRPC服务端头文件
#include <butil/logging.h>       // BRPC自带日志(会被我们的日志覆盖)
#include "main.pb.h"             // Proto生成的代码// 命令行参数定义(gflags库),--help可以查看所有参数
DEFINE_bool(run_mode, false, "运行模式:false-调试(日志输控制台),true-发布(日志输文件)");
DEFINE_string(log_file, "", "发布模式必填:日志文件路径(如./server.log)");
DEFINE_int32(log_level, 2, "日志等级:0-trace,1-debug,2-info,3-warn,4-error,5-critical");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "Etcd服务地址");
DEFINE_string(base_service, "/service", "服务根目录(Etcd中的Key前缀)");
DEFINE_string(instance_name, "/echo/instance1", "当前服务实例名(确保唯一)");
DEFINE_string(access_host, "127.0.0.1:7070", "服务外部访问地址(客户端用这个地址调用)");
DEFINE_int32(listen_port, 7070, "BRPC服务监听端口(要和access_host的端口一致)");/*** @brief Echo服务实现类:继承Proto生成的EchoService*/
class EchoServiceImpl : public example::EchoService {
public:EchoServiceImpl() {}~EchoServiceImpl() {}/*** @brief 实现Echo方法:业务逻辑核心* @param controller BRPC控制器:用于设置错误信息、超时等* @param request 客户端请求* @param response 服务端响应* @param done 回调对象:必须调用Run(),否则会内存泄漏*/void Echo(google::protobuf::RpcController* controller,const example::EchoRequest* request,example::EchoResponse* response,google::protobuf::Closure* done) {// BRPC的ClosureGuard:自动调用done->Run(),避免漏写brpc::ClosureGuard rpc_guard(done);// 转换为BRPC控制器,方便获取更多信息(如客户端IP)brpc::Controller* brpc_cntl = dynamic_cast<brpc::Controller*>(controller);if (!brpc_cntl) {LOG_ERROR("转换BRPC控制器失败!");return;}// 打印客户端请求信息LOG_INFO("收到客户端请求:客户端IP={}:{},消息={}",butil::ip2str(brpc_cntl->remote_side().ip).c_str(),brpc_cntl->remote_side().port,request->message().c_str());// 业务逻辑:简单拼接字符串(实际项目中这里可以调用其他模块)std::string resp_msg = request->message() + " -- 服务端已收到,这是响应!";response->set_message(resp_msg);// (可选)设置调用成功标识(默认就是成功,失败时可以用brpc_cntl->SetFailed())brpc_cntl->SetFailed(0, "success");}
};int main(int argc, char *argv[]) {// 第一步:解析命令行参数(gflags库)google::ParseCommandLineFlags(&argc, &argv, true);// 第二步:初始化日志init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);LOG_INFO("日志模块初始化成功,运行模式:{}", FLAGS_run_mode ? "发布" : "调试");// 第三步:关闭BRPC自带日志(避免和我们的日志冲突)logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 第四步:初始化BRPC服务端brpc::Server server;EchoServiceImpl echo_service;  // 实例化服务实现类// 把服务添加到BRPC Server// 第二个参数:SERVER_DOESNT_OWN_SERVICE表示服务对象由用户管理(不是Server)int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret != 0) {LOG_FATAL("添加Echo服务到BRPC失败!错误码:{}", ret);return -1;}// 配置BRPC Server选项brpc::ServerOptions options;options.idle_timeout_sec = -1;  // 连接空闲不超时(长连接,适合频繁调用)options.num_threads = 4;       // IO线程数:建议设为CPU核数(我的机器是4核)options.max_concurrency = 100; // 最大并发数:避免服务被压垮// 启动BRPC Server,监听指定端口ret = server.Start(FLAGS_listen_port, &options);if (ret != 0) {LOG_FATAL("启动BRPC服务失败!端口:{},错误码:{}", FLAGS_listen_port, ret);return -1;}LOG_INFO("BRPC服务启动成功,监听端口:{}", FLAGS_listen_port);// 第五步:注册服务到Etcd// 服务Key:base_service + instance_name(如/service/echo/instance1)std::string service_key = FLAGS_base_service + FLAGS_instance_name;Registry::ptr registry = std::make_shared<Registry>(FLAGS_etcd_host);if (!registry->registry(service_key, FLAGS_access_host)) {LOG_FATAL("服务注册到Etcd失败!");server.Stop(0);  // 注册失败,停止服务return -1;}// 第六步:阻塞等待服务停止(按Ctrl+C触发)LOG_INFO("服务端已就绪,等待客户端调用...(按Ctrl+C退出)");server.RunUntilAskedToQuit();// 退出前清理:停止服务server.Stop(0);LOG_INFO("服务端已退出");return 0;
}

5. 客户端实现:发现服务 + 发起 RPC 调用(client.cc)

客户端需要 “管理信道”(避免每次调用创建连接)和 “负载均衡”,所以依赖之前写的信道管理模块(channel.hpp)。

5.1 先补信道管理模块(channel.hpp)
#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "logger.hpp"  // 依赖日志模块/*** @brief 单个服务的信道管理类:管理一个服务的所有节点信道,实现轮询负载均衡*/
class ServiceChannel {
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;  // BRPC信道智能指针ServiceChannel(const std::string &service_name) : _service_name(service_name), _index(0) {}/*** @brief 添加节点信道:服务上线时调用* @param host 节点访问地址(如127.0.0.1:7070)*/void append(const std::string &host) {if (host.empty()) {LOG_ERROR("添加信道失败:节点地址为空!服务名:{}", _service_name);return;}// 初始化BRPC信道auto channel = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = 3000;  // 连接超时3秒options.timeout_ms = 5000;          // 调用超时5秒options.max_retry = 2;              // 重试2次(避免临时网络问题)options.protocol = "baidu_std";     // BRPC默认协议int ret = channel->Init(host.c_str(), &options);if (ret != 0) {LOG_ERROR("初始化信道失败!服务名:{},节点:{},错误码:{}",_service_name, host, ret);return;}// 加锁:避免多线程同时修改信道列表std::unique_lock<std::mutex> lock(_mutex);_host_map[host] = channel;  // 地址→信道映射,方便删除_channel_list.push_back(channel);  // 信道列表,用于轮询LOG_DEBUG("添加信道成功!服务名:{},节点:{},当前信道数:{}",_service_name, host, _channel_list.size());}/*** @brief 删除节点信道:服务下线时调用* @param host 节点访问地址*/void remove(const std::string &host) {std::unique_lock<std::mutex> lock(_mutex);auto it = _host_map.find(host);if (it == _host_map.end()) {LOG_WARN("删除信道失败:节点不存在!服务名:{},节点:{}",_service_name, host);return;}// 从信道列表中删除for (auto vit = _channel_list.begin(); vit != _channel_list.end(); ++vit) {if (*vit == it->second) {_channel_list.erase(vit);break;}}_host_map.erase(it);LOG_DEBUG("删除信道成功!服务名:{},节点:{},当前信道数:{}",_service_name, host, _channel_list.size());}/*** @brief 轮询选择一个信道:客户端调用时用* @return 可用的信道(空表示没有可用节点)*/ChannelPtr choose() {std::unique_lock<std::mutex> lock(_mutex);if (_channel_list.empty()) {LOG_ERROR("没有可用的信道!服务名:{}", _service_name);return nullptr;}// 轮询策略:_index自增取模,保证每个节点调用次数均匀int idx = _index++ % _channel_list.size();LOG_DEBUG("轮询选择信道:服务名:{},信道索引:{},当前计数器:{}",_service_name, idx, _index);return _channel_list[idx];}private:std::mutex _mutex;                  // 互斥锁:保护信道列表线程安全int32_t _index;                     // 轮询计数器std::string _service_name;          // 服务名(如/service/echo)std::vector<ChannelPtr> _channel_list;  // 信道列表:用于轮询std::unordered_map<std::string, ChannelPtr> _host_map;  // 地址→信道映射
};/*** @brief 多服务信道管理类:管理多个服务的ServiceChannel,是客户端的核心*/
class ServiceManager {
public:using ptr = std::shared_ptr<ServiceManager>;ServiceManager() {}/*** @brief 声明要关注的服务:避免处理无关服务的节点变化* @param service_name 服务名(如/service/echo)*/void declared(const std::string &service_name) {if (service_name.empty()) {LOG_ERROR("声明服务失败:服务名为空!");return;}std::unique_lock<std::mutex> lock(_mutex);_关注的服务.insert(service_name);LOG_INFO("已声明关注服务:{}", service_name);}/*** @brief 选择服务的一个信道:客户端发起调用前调用* @param service_name 服务名* @return 可用信道(空表示无可用节点)*/ServiceChannel::ChannelPtr choose_channel(const std::string &service_name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _service_channel_map.find(service_name);if (it == _service_channel_map.end()) {LOG_ERROR("选择信道失败:服务未初始化!服务名:{}", service_name);return nullptr;}return it->second->choose();}/*** @brief 服务节点上线回调:由Etcd Discovery触发* @param service_key Etcd中的Key(如/service/echo/instance1)* @param host 节点地址(如127.0.0.1:7070)*/void on_service_online(const std::string &service_key, const std::string &host) {// 从Key中提取服务名(如/service/echo/instance1 → /service/echo)std::string service_name = extract_service_name(service_key);if (service_name.empty()) {LOG_ERROR("服务上线回调失败:Key格式错误!Key:{}", service_key);return;}// 检查是否关注该服务std::unique_lock<std::mutex> lock(_mutex);if (_关注的服务.count(service_name) == 0) {LOG_DEBUG("服务上线但不关注:服务名:{},节点:{}", service_name, host);return;}// 初始化ServiceChannel(不存在则创建)auto it = _service_channel_map.find(service_name);if (it == _service_channel_map.end()) {auto service_channel = std::make_shared<ServiceChannel>(service_name);_service_channel_map[service_name] = service_channel;LOG_INFO("初始化服务信道管理:服务名:{}", service_name);it = _service_channel_map.find(service_name);}// 解锁后调用append(避免append中的锁和当前锁嵌套)lock.unlock();it->second->append(host);}/*** @brief 服务节点下线回调:由Etcd Discovery触发* @param service_key Etcd中的Key* @param host 节点地址*/void on_service_offline(const std::string &service_key, const std::string &host) {std::string service_name = extract_service_name(service_key);if (service_name.empty()) {LOG_ERROR("服务下线回调失败:Key格式错误!Key:{}", service_key);return;}std::unique_lock<std::mutex> lock(_mutex);if (_关注的服务.count(service_name) == 0) {LOG_DEBUG("服务下线但不关注:服务名:{},节点:{}", service_name, host);return;}auto it = _service_channel_map.find(service_name);if (it == _service_channel_map.end()) {LOG_WARN("服务下线但信道管理未初始化:服务名:{}", service_name);return;}// 解锁后调用removelock.unlock();it->second->remove(host);}private:/*** @brief 从Etcd Key中提取服务名* @param service_key Etcd Key(如/service/echo/instance1)* @return 服务名(如/service/echo)*/std::string extract_service_name(const std::string &service_key) {// 找到最后一个'/'的位置(如/service/echo/instance1 → 最后一个/在index=13)size_t last_slash_pos = service_key.find_last_of('/');if (last_slash_pos == std::string::npos || last_slash_pos == 0) {LOG_ERROR("Key格式错误,必须是/xxx/xxx格式!Key:{}", service_key);return "";}// 截取从0到last_slash_pos的部分(不含last_slash_pos)return service_key.substr(0, last_slash_pos);}private:std::mutex _mutex;  // 互斥锁:保护服务列表和信道映射// 关注的服务集合:只处理这些服务的节点变化std::unordered_set<std::string> _关注的服务;// 服务名→ServiceChannel映射:一个服务对应一个信道管理std::unordered_map<std::string, ServiceChannel::ptr> _service_channel_map;
};
5.2 客户端代码(client.cc)
#include "../common/etcd.hpp"    // 自己写的Etcd模块
#include "../common/channel.hpp"  // 自己写的信道管理模块
#include "../common/logger.hpp"  // 自己写的日志模块
#include <gflags/gflags.h>       // 解析命令行参数
#include <thread>
#include "main.pb.h"             // Proto生成的代码// 命令行参数定义
DEFINE_bool(run_mode, false, "运行模式:false-调试,true-发布");
DEFINE_string(log_file, "", "发布模式必填:日志文件路径(如./client.log)");
DEFINE_int32(log_level, 2, "日志等级:0-trace~5-critical");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "Etcd服务地址");
DEFINE_string(base_service, "/service", "服务根目录(Etcd中的Key前缀)");
DEFINE_string(call_service, "/service/echo", "要调用的服务名(如/service/echo)");
DEFINE_int32(call_interval, 1, "调用间隔(秒):默认1秒调用一次");int main(int argc, char *argv[]) {// 第一步:解析命令行参数google::ParseCommandLineFlags(&argc, &argv, true);// 第二步:初始化日志init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);LOG_INFO("客户端日志模块初始化成功,运行模式:{}", FLAGS_run_mode ? "发布" : "调试");// 第三步:初始化信道管理器ServiceManager::ptr service_manager = std::make_shared<ServiceManager>();// 声明要调用的服务(只处理这个服务的节点变化)service_manager->declared(FLAGS_call_service);// 第四步:初始化Etcd Discovery,绑定节点变化回调// 节点上线回调:绑定到service_manager的on_service_onlineauto on_online = std::bind(&ServiceManager::on_service_online, service_manager.get(), std::placeholders::_1,  // 第一个参数:service_keystd::placeholders::_2   // 第二个参数:host);// 节点下线回调:绑定到service_manager的on_service_offlineauto on_offline = std::bind(&ServiceManager::on_service_offline, service_manager.get(), std::placeholders::_1, std::placeholders::_2);// 创建Discovery对象:拉取节点+启动监控Discovery::ptr discovery = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, on_online, on_offline);// 第五步:循环发起RPC调用LOG_INFO("客户端已就绪,开始调用服务:{},间隔:{}秒(按Ctrl+C退出)",FLAGS_call_service, FLAGS_call_interval);while (true) {// 5.1 从信道管理器获取一个可用信道auto channel = service_manager->choose_channel(FLAGS_call_service);if (!channel) {LOG_ERROR("获取信道失败,{}秒后重试...", FLAGS_call_interval);std::this_thread::sleep_for(std::chrono::seconds(FLAGS_call_interval));continue;}// 5.2 创建BRPC Stub(用于发起调用)example::EchoService_Stub stub(channel.get());// 5.3 构造请求example::EchoRequest req;req.set_message("你好,这是客户端的请求!");  // 实际项目中可以传业务参数// 5.4 发起同步RPC调用brpc::Controller cntl;  // BRPC控制器example::EchoResponse rsp;stub.Echo(&cntl, &req, &rsp, nullptr);  // 最后一个参数:异步回调(这里用同步)// 5.5 处理调用结果if (cntl.Failed()) {LOG_ERROR("RPC调用失败!错误信息:{},错误码:{}",cntl.ErrorText(), cntl.ErrorCode());} else {LOG_INFO("RPC调用成功!请求:{},响应:{}",req.message().c_str(), rsp.message().c_str());}// 5.6 间隔指定时间后继续调用std::this_thread::sleep_for(std::chrono::seconds(FLAGS_call_interval));}return 0;
}

四、部署运行:从编译到测试

1. 目录结构

先理清楚目录结构,避免编译时路径错误:

rpc-demo/
├── common/                # 公共模块
│   ├── etcd.hpp           # Etcd注册发现
│   ├── channel.hpp        # 信道管理
│   └── logger.hpp         # 日志模块
├── server/                # 服务端
│   └── server.cc          # 服务端代码
├── client/                # 客户端
│   └── client.cc          # 客户端代码
└── proto/                 # Proto文件和生成的代码├── main.proto         # Proto定义├── main.pb.h          # 生成的头文件└── main.pb.cc         # 生成的源文件

2. 编译命令

需要链接的库:BRPC、Etcd 客户端、Protobuf、gflags、spdlog、pthread(线程库)等。

rpc-demo根目录下创建Makefile(方便编译):

# Makefile for RPC Demo
CC = g++
CFLAGS = -std=c++11 -Wall -g  # -g用于调试,发布时可以去掉
INCLUDES = -I./common -I./proto  # 头文件路径
# 依赖库路径(如果库不在默认路径,需要加-L/usr/local/lib)
LIBS = -lbrpc -letcd-cpp-api -lprotobuf -lgflags -lspdlog -lpthread -lz -lssl -lcrypto# 目标:服务端和客户端
TARGETS = server/client server/serverall: $(TARGETS)# 编译服务端
server/server: server/server.cc proto/main.pb.cc$(CC) $(CFLAGS) $(INCLUDES) $^ -o $@ $(LIBS)# 编译客户端
server/client: client/client.cc proto/main.pb.cc$(CC) $(CFLAGS) $(INCLUDES) $^ -o $@ $(LIBS)# 清理生成的文件
clean:rm -f $(TARGETS)rm -f proto/*.pb.h proto/*.pb.cc  # 可选:清理生成的Proto代码

然后执行编译:

make -j4  # -j4表示用4个线程编译,速度更快

3. 启动步骤

3.1 先启动 Etcd

如果没有 Etcd,先安装(Ubuntu):

sudo apt install etcd
# 启动Etcd(默认监听2379端口)
etcd
3.2 启动服务端
# 调试模式启动:日志输控制台,监听7070端口
./server/server --run_mode=false --listen_port=7070 --instance_name=/echo/instance1# 或者发布模式启动:日志输文件
# ./server/server --run_mode=true --log_file=./server.log --listen_port=7070 --instance_name=/echo/instance1

服务端启动成功后,会输出:

[rpc-debug][16:00:00][1234][info] 日志模块初始化成功,运行模式:调试 (logger.hpp:58)
[rpc-debug][16:00:00][1234][info] BRPC服务启动成功,监听端口:7070 (server.cc:103)
[rpc-debug][16:00:00][1234][info] Etcd租约初始化成功,租约ID:123456,TTL:3秒 (etcd.hpp:45)
[rpc-debug][16:00:00][1234][info] 服务注册成功!Etcd Key:/service/echo/instance1,Value:127.0.0.1:7070 (etcd.hpp:78)
[rpc-debug][16:00:00][1234][info] 服务端已就绪,等待客户端调用...(按Ctrl+C退出) (server.cc:115)
3.3 启动客户端

打开新的终端,启动客户端:

# 调试模式启动:1秒调用一次
./server/client --run_mode=false --call_interval=1# 或者发布模式启动
# ./server/client --run_mode=true --log_file=./client.log --call_interval=1

客户端启动成功后,会输出:

[rpc-debug][16:01:00][5678][info] 客户端日志模块初始化成功,运行模式:调试 (logger.hpp:58)
[rpc-debug][16:01:00][5678][info] 已声明关注服务:/service/echo (channel.hpp:165)
[rpc-debug][16:01:00][5678][info] 拉取到1个服务节点,根目录:/service (etcd.hpp:153)
[rpc-debug][16:01:00][5678][info] Etcd Watcher启动成功,监控根目录:/service (etcd.hpp:172)
[rpc-debug][16:01:00][5678][info] 客户端已就绪,开始调用服务:/service/echo,间隔:1秒(按Ctrl+C退出) (client.cc:75)
[rpc-debug][16:01:00][5678][debug] 轮询选择信道:服务名:/service/echo,信道索引:0,当前计数器:1 (channel.hpp:109)
[rpc-debug][16:01:00][5678][info] RPC调用成功!请求:你好,这是客户端的请求!,响应:你好,这是客户端的请求! -- 服务端已收到,这是响应! (client.cc:98)
3.4 测试负载均衡

可以启动多个服务端(修改instance_namelisten_port),比如:

# 第二个服务端:监听7071端口,实例名instance2
./server/server --run_mode=false --listen_port=7071 --instance_name=/echo/instance2 --access_host=127.0.0.1:7071

此时客户端会自动发现新节点,调用时会轮询两个服务端,日志中会显示 “信道索引 0” 和 “信道索引 1” 交替出现。

五、实战优化:那些容易忽略的细节

  1. Etcd 租约时间:3 秒是比较均衡的选择 —— 太短会导致频繁续期(增加 Etcd 压力),太长会让僵尸节点存活久(客户端可能调用失败);

  2. BRPC 线程数options.num_threads建议设为 CPU 核数(如 4 核设 4),太多会导致线程切换开销,太少会处理不过来;

  3. 日志轮转:发布模式下,日志文件会越来越大,需要用logrotate工具配置轮转(比如每天生成一个新文件,保留 7 天);

  4. 异常处理:代码中加了很多错误检查(如信道初始化失败、Etcd 注册失败),实际项目中还可以加告警(如调用失败次数超过阈值时发邮件);

  5. 服务名规范:建议用/业务/服务名的格式(如/payment/order),避免不同业务的服务名冲突。

http://www.dtcms.com/a/403320.html

相关文章:

  • 企业为什么建设网站万网主机怎么上传网站吗
  • 【C++】Visual Studio+CMake 开发 C++ 入门指南:从环境搭建到项目实战
  • web3hardhat 框架实战-ERC20
  • Linux《线程同步和互斥(上)》
  • Hibernate批量操作详解及最优实践
  • 住房与建设部网站如何修改dns 快速使用境外网站
  • 【复习】计网每日一题--PPP链路
  • cpt和pretrain的差别,IFT和SFT的差别是怎么样的
  • RTX5060 Ti显卡安装cuda版本PyTorch踩坑记录
  • MongoDB数据类型与python操作
  • 脑电模型实战系:脑电模型进阶-构建一个高效的全连接网络
  • 东莞网站建设怎么收费展会布置效果图
  • 滕滕州网站建设住房和建设局官网
  • Vue调用本地EXE程序
  • Vue2 全局事件总线:通俗易懂 + 简单案例
  • Flask模板中使用Vue、ant-design-vue、@ant-design/icons-vue示例模板
  • 石狮建设局网站保定网站建设系统
  • vLLM PD分离推理服务配置指南
  • C++ 学习与 CLion 使用:(十五)多文件编程,和C语言一样的多文件编程
  • BEAT币
  • 淘宝的网站怎么做公司网站如何被收录
  • Ansible实现自动化运维
  • Zabbix7.4.8(三):通过Zabbix agent 2监控Docker相关指标
  • 小型个人网站制作网页打不开的原因及解决方法
  • Ansible 入门到实战:自动化运维的瑞士军刀
  • 嵌入式学习---(linux驱动)
  • k8s集群与gitlab registry连接
  • MySQL笔记---对表的操作
  • 【实战避坑】MySQL修改表字段长度完整指南:从语法、锁表机制到在线DDL详解
  • 乐峰网网站是谁做的海门住房和城乡建设局网站