etcd和brpc联调图解
介绍
Register: 构造服务对象,启动服务,之后通过 etcd 注册服务
Discovery: 创建 brpc 管理对象,之后创建 etcd 监听,监听到服务后,根据服务类型调用相应的回调函数,这里的回调函数会创建响应的 brpc 信道(结合的关键),之 后通过brpc信道管理对象,获取提供Echo服务的信道,进行相应的 rpc 调用
代码逻辑
先画个简单的架构图(文字描述更直观),整个流程分四步:
- 服务端启动:初始化 BRPC 服务 → 实现 Echo 业务逻辑 → 把自己的地址注册到 Etcd(带 3 秒约);
- 客户端启动:连接 Etcd → 拉取已注册的服务节点 → 初始化信道管理;
- 实时发现:Etcd 通过 Watcher 监控服务节点变化,新节点上线时客户端自动加信道,节点下线时自动删信道;
- 客户端调用:通过轮询(RR)策略从信道池选一个节点,发起 RPC 调用,拿到响应后打印结果。
etcd 服务图解
brpc 服务图解
etcd 和 brpc 结合服务图解
etcd 封装代码
进行键值对的注册于存储,查询与发现
// 服务注册客户端类class Registry
{public:using ptr = std::shared_ptr<Registry>;Registry(const std::string &host) : _client(new etcd::Client(host)),_keepalive(_client->leasekeepalive(10).get()),_lease_id(_keepalive->Lease()){}~Registry() {}//这里的key和value分别对应服务目录"/service/user" 服务地址"127.0.0.1:9999"bool registry(const std::string &key, const std::string &val){auto resp = _client->put(key, val, _lease_id).get();}private:std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::KeepAlive> _keepalive;uint64_t _lease_id;
};----------------------------------------------------------------------------------------
class Discovery
{
public:using ptr = std::shared_ptr<Discovery>;using Notify_callback = std::function<void(std::string, std::string)>;Discovery(const std::string &host, const std::string &basedir, const Notify_callback &put_cb,const Notify_callback &del_cb) : _client(new etcd::Client(host)), _put_cb(put_cb), _del_cb(del_cb){_watcher=std::make_shared<etcd::Watcher>(*_client.get(), basedir, std::bind(&Discovery::callback, this, std::placeholders::_1),true);}private://函数的回调,根据不同的请求类型进行函数的回调void callback(const etcd::Response &resp){}private:Notify_callback _put_cb;Notify_callback _del_cb;std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::Watcher> _watcher;
};
brpc 封装代码
// 封装单个服务的信道管理类:class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string &name) : _service_name(name), _index(0){}// 服务新上线了一个节点,调用append新增信道void append(const std::string host){}// 服务下线了一个节点,调用remove释放信道void remove(const std::string &host){}// 通过RR轮转策略,获取一个Channel用于对应服务的Rpc调用ChannelPtr choose(){}private:std::mutex _mutex;int32_t _index; // 当前轮转下标计数器std::string _service_name; // 服务名称std::vector<ChannelPtr> _channels; // 当前服务对应的信道集合std::unordered_map<std::string, ChannelPtr> _hosts; // hosts和CHannel对应关系
};class ServiceManager
{public:using ptr = std::shared_ptr<ServiceManager>;// 获取服务的节点信道ServiceChannel::ChannelPtr choose(const std::string &service_name){}// 先声明,我关注哪些服务的上线和下线,不关心的就不需要管理void declared(const std::string &service_name){}// 服务上线时调用的接口,将节点管理起来void onServiceOnline(const std::string &service_instance, const std::string &host){}// 服务下线的调用接口,从服务信道管理中,删除指定的节点信道void onServiceOffline(const std::string &service_instance, const std::string &host){}private:std::string getServiceName(std::string name){}private:std::mutex _mutex;std::unordered_set<std::string> _follow_service;//服务名称和服务信道的对应std::unordered_map<std::string, ServiceChannel::ptr> _services;
};
注册端
#include <gflags/gflags.h>
#include <brpc/server.h>
#include <butil/logging.h>#include "../common/etcd.hpp"
#include "main.pb.h"DEFINE_string(log_name, "default-logger", "日志名称");
DEFINE_string(file_name, "stdout", "打印文件名称,默认为调试模式");
DEFINE_int32(log_evel, 0, "打印日志等级,默认为trace");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(base_instance, "/echo/instance", "服务监控根目录");
DEFINE_string(access_host, "127.0.0.1:7070", "当前实例的外部访问地址");
DEFINE_int32(listen_port, 7070, "Rpc服务器监听端口");// 创建子类继承于EchoService,实现rpc调用的业务功能class EchoServiceImpl : public example::EchoService
{
public:EchoServiceImpl() {}~EchoServiceImpl() {}void Echo(google::protobuf::RpcController *cntl_base,const example::EchoRequest *req,example::EchoResponse *res,google::protobuf::Closure *done) override{brpc::ClosureGuard done_guard(done);std::cout << "收到消息: " << req->message() + "--这是响应";std::string str = req->message() + "--这是响应";res->set_message(str);}
};int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);logger::init_logger(FLAGS_log_name, FLAGS_file_name, FLAGS_log_evel);// 关闭brpc日志logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 构造服务器对象brpc::Server server;// 向服务器对象中新增加EchoService服务EchoServiceImpl echo_service; // 这是个局部对象, 但是下边用了SERVER_OWNS_SERVICE,转移了控制权//用了SERVER_OWNS_SERVICE就表示brpc退出的时候会free释放这个echo_serviceint ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1){std::cout << "添加服务失败" << std::endl;return -1;}// 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; // 连接空闲超时时间options.num_threads = 1; // 服务器io线程数量ret = server.Start(FLAGS_listen_port, &options);if (ret == -1){std::cout << "启动服务失败" << std::endl;return -1;}// 注册服务Registry::ptr clinet = std::make_shared<Registry>(FLAGS_etcd_host);//注册的服务地址是"/service/echo/instance"clinet->registry(FLAGS_base_service + FLAGS_base_instance, FLAGS_access_host);server.RunUntilAskedToQuit(); // 休眠等待运行结束return 0;
}
发现端
#include <gflags/gflags.h>
#include <brpc/server.h>
#include <butil/logging.h>
#include <thread>#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/Channel.hpp"#include "main.pb.h"DEFINE_string(log_name, "default-logger", "日志名称");
DEFINE_string(file_name, "stdout", "打印文件名称,默认为调试模式");
DEFINE_int32(log_evel, 0, "打印日志等级,默认为trace");DEFINE_string(etcd_host,"http://127.0.0.1:2379","服务注册中心");
DEFINE_string(base_service, "/service", "服务监控根目录");DEFINE_string(call_service, "/service/echo", "服务监控根目录");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);logger::init_logger(FLAGS_log_name, FLAGS_file_name, FLAGS_log_evel);// 先构造Rpc信道管理对象auto sm = std::make_shared<ServiceManager>();//关注"/service/echo"下的服务sm->declared(FLAGS_call_service);auto put_cb = std::bind(&ServiceManager::onServiceOnline, sm.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, sm.get(), std::placeholders::_1, std::placeholders::_2);// 构造发现对象Discovery::ptr discovery = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb);while (1){// 通过prc信道管理对象,获取提供Echo服务的信道auto channel = sm->choose(FLAGS_call_service);if (!channel){LOG_INFO("channel_choose 失败了----");std::this_thread::sleep_for(std::chrono::seconds(1));continue;}LOG_INFO("channel_choose 成功了----");// 发起EchoRpc调用example::EchoService_Stub stub(channel.get());// 进行rpc调用example::EchoRequest req;LOG_INFO("设置服务信息-----");LOG_INFO("设置服务信息-----");LOG_INFO("设置服务信息-----");req.set_message("你好小王");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();stub.Echo(cntl, &req, rsp, nullptr);if (cntl->Failed()){std::cout << "rpc调用失败" << cntl->ErrorText() << std::endl;delete cntl;delete rsp;std::this_thread::sleep_for(std::chrono::seconds(1));continue;}std::cout << "收到响应:" << rsp->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));}// 通过rpcreturn 0;
}