RPC - 服务注册与发现模块
为什么要服务注册,服务注册是做什么
服务注册主要是实现分布式的系统,让系统更加的健壮,一个节点主机将自己所能提供的服务,在注册中心进行登记
为什么要服务发现,服务发现是要做什么
rpc调用者需要知道哪个节点主机能够为自己提供指定的服务
服务发现其实就是询问注册中心,谁能为自己提供指定的服务,将节点信息给保存起来以待后用
服务下线
当前使用长连接进行服务主机是否在线的判断,一旦服务提供方断开连接,查询这个主机提供了哪些服务,分析哪些调用者进行过这些服务发现,则进行服务下线通知
服务上线
因为服务发现是一锤子买卖(调用方不会进行二次服务发现),因此一旦中途有新的主机可以提供指定的服务,调用方是不知道的
因此,一旦某个服务上线了,则对发现过这个服务的主机进行一次服务上线通知
所以现在要做的事情:
就是将服务注册,发现功能集合到客户端中
将服务信息管理集合到服务端中
服务端
服务端需要如何实现服务信息的管理:
服务端要能够提供服务注册,发现的请求业务处理
1.需要将哪个服务能够由哪个主机提供管理起来,所以必然是需要hash<method,vector<host>>,当有这个method的时候,就把对应的host加进去,如果没有method的时候,就新建一个method和host的对应关系
实现当由caller进行服务发现的时候,告诉caller谁能提供指定的服务
2.需要将哪个主机发现过哪个服务管理起来
当进行服务通知的时候,都是根据谁发现过这个服务,才会给谁通知,所以也需要个hash<method,vector<discoverer>>
3.需要将哪个连接对应哪个服务提供者管理起来,所以也需要hash<conn,provider>
作用:当一个链接断开的时候,能够知道哪个主机的哪些服务下线了,然后才能给发现者通知xxx的xxx服务下线了
4.需要将哪个连接对应哪个服务发现者管理起来hash<conn,discoverver>
当一个连接断开的时候,如果有服务上线下线,就不需要给它通知了
客户端
客户端的功能比较分离,注册端跟发现端根本就不再同一个主机上。因此客户端的注册与发现是完全分离的
1.作为服务提供者,--需要一个能够进行服务注册的接口
连接注册中心,进行服务注册,
2.作为服务发现者 -- 需要一个能够进行服务发现的接口,需要进行服务上线/下线通知请求的处理(需要向dispatcher提供一个请求处理的回调函数)。需要将获取到的能够提供指定服务的主机信息管理起来hash<method,vector<host>> 一次发现,多次使用,没有的话再次进行发现。
连接注册中心,进行服务发现
生活中的例子
想象一个连锁餐厅系统,需要一个中央管理办公室(注册中心)来协调各个分店(服务提供者)和顾客(客户端):
ProviderManager(供应商管理器):
核心功能:管理服务提供者(Provider)及其提供的服务
class ProviderManager {public:using ptr = std::shared_ptr<ProviderManager>;struct Provider {using ptr = std::shared_ptr<Provider>;std::mutex _mutex;BaseConnection::ptr conn;Address host;std::vector<std::string> methods;Provider(const BaseConnection::ptr &c, const Address &h):conn(c), host(h){}void appendMethod(const std::string &method) {std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//当一个新的服务提供者进行服务注册的时候调用void addProvider(const BaseConnection::ptr &c, const Address &h, const std::string &method) {Provider::ptr provider;//查找连接所关联的服务提供者对象,找到则获取,找不到则创建,并建立关联{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {provider = it->second;}else {provider = std::make_shared<Provider>(c, h);_conns.insert(std::make_pair(c, provider));}//method方法的提供主机要多出一个,_providers新增数据auto &providers = _providers[method];providers.insert(provider);}//向服务对象中新增一个所能提供的服务名称provider->appendMethod(method);}//当一个服务提供者断开连接的时候,获取他的信息--用于进行服务下线通知Provider::ptr getProvider(const BaseConnection::ptr &c) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {return it->second;}return Provider::ptr();}//当一个服务提供者断开连接的时候,删除它的关联信息void delProvider(const BaseConnection::ptr &c) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it == _conns.end()) {//当前断开连接的不是一个服务提供者return;}//如果是提供者,看看提供了什么服务,从服务者提供信息中删除当前服务提供者for (auto & method : it->second->methods) {auto &providers = _providers[method];providers.erase(it->second);}//删除连接与服务提供者的关联关系_conns.erase(it);}std::vector<Address> methodHosts(const std::string &method) {std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end()) {return std::vector<Address>();}std::vector<Address> result;for (auto &provider : it->second) {result.push_back(provider->host);}return result;}private:std::mutex _mutex;std::unordered_map<std::string, std::set<Provider::ptr>> _providers;std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns;};
- 相当于总部的"分店登记册"
- 记录了每家分店(Provider)的位置、联系方式(conn),以及提供的菜品(methods)
- 当新分店开业,会调用addProvider登记信息
- 当分店关闭,会调用delProvider注销信息
- 顾客询问特定菜品时,通过methodHosts查询哪些分店提供此菜品
addProvider函数详解
函数目的
注册一个服务提供者
void addProvider(const BaseConnection::ptr &c, const Address &h, const std::string &method) {Provider::ptr provider;//查找连接所关联的服务提供者对象,找到则获取,找不到则创建,并建立关联{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {provider = it->second;}else {provider = std::make_shared<Provider>(c, h);_conns.insert(std::make_pair(c, provider));}//method方法的提供主机要多出一个,_providers新增数据auto &providers = _providers[method];providers.insert(provider);}//向服务对象中新增一个所能提供的服务名称provider->appendMethod(method);}
addProvider函数需要三个关键参数:
- c (BaseConnection::ptr):与服务提供者的网络连接,用于后续通信
- h (Address):服务提供者的网络地址,包含IP和端口信息
- method (std::string):服务提供者要注册的服务方法名
过程
- 先加锁检查连接是否已关联提供者对象,如果不加锁,那在检查到存在之后,突然别人调用删除的函数,把这个提供者删除了,但是原来的不知道以为还在呢,就会造成错误。
- 若已存在,获取已有提供者对象
- 若不存在,创建新的提供者对象并记录在_conns中
- 将提供者加入到对应服务名的提供者集合中(_providers[method])
- 在提供者的方法列表中添加此服务名
_conns映射表和_providers映射表里面存放的关系如下:
-------------------------------------------------------------------------
_conns映射表:
TCP连接#1253 -> {地址: "192.168.1.10:8080", 提供的服务: ["做炒饭", "做汉堡", "做披萨"]}
-------------------------------------------------------------------------
_providers映射表:
"做炒饭" -> {阳光餐厅}
"做汉堡" -> {阳光餐厅}
"做披萨" -> {阳光餐厅}
-------------------------------------------------------------------------
注册的例子
假设有两家披萨店要注册到我们的送餐平台:
_conns和_providers的初始状态:
_conns = {} // 空映射表
_providers = {} // 空映射表
场景1: 多米诺披萨店注册"芝士披萨"服务
addProvider(conn1, {"192.168.1.101", 9001}, "芝士披萨");
执行步骤:
- 创建未初始化的provider变量
Provider::ptr provider; // 空智能指针
- 加锁,查找conn1在_conns中
auto it = _conns.find(conn1); // 找不到,it == _conns.end()
- 创建新的Provider对象并建立关联
provider = std::make_shared<Provider>(conn1, {"192.168.1.101", 9001});// provider现在指向一个新对象: {conn: conn1, host: {"192.168.1.101", 9001}, methods: []}_conns.insert(std::make_pair(conn1, provider));// _conns变为: {conn1 -> provider}
- 处理服务提供者集合
auto &providers = _providers["芝士披萨"];// _providers变为: {"芝士披萨" -> []} (创建空集合)providers.insert(provider);// _providers变为: {"芝士披萨" -> [provider]}
- 解锁,添加方法到提供者
provider->appendMethod("芝士披萨");// provider变为: {conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨"]}
- 此时的映射表
_conns = {conn1 -> Provider{conn: conn1,host: {"192.168.1.101", 9001},methods: ["芝士披萨"]}
}_providers = {"芝士披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨"]}}
}
场景2: 多米诺披萨店注册"夏威夷披萨"服务
addProvider(conn1, {"192.168.1.101", 9001}, "夏威夷披萨");
执行步骤:
- 创建provider变量
Provider::ptr provider; // 空智能指针
- 加锁,查找conn1在_conns中
auto it = _conns.find(conn1); // 找到了,it->second指向多米诺的Provider对象provider = it->second;// provider现在指向已存在对象: {conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨"]}
- 处理服务提供者集合
auto &providers = _providers["夏威夷披萨"];// _providers变为: {"芝士披萨" -> [多米诺provider], "夏威夷披萨" -> []}providers.insert(provider);// _providers变为: {"芝士披萨" -> [多米诺provider], "夏威夷披萨" -> [多米诺provider]}
- 解锁,添加方法到提供者
provider->appendMethod("夏威夷披萨");// provider变为: {conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]}
- 此时的映射表
_conns = {conn1 -> Provider{conn: conn1,host: {"192.168.1.101", 9001},methods: ["芝士披萨", "夏威夷披萨"]}
}_providers = {"芝士披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]}},"夏威夷披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]}}
}
场景3: 必胜客注册"芝士披萨"服务
addProvider(conn2, {"192.168.1.102", 9002}, "芝士披萨");
执行步骤:
- 创建provider变量
Provider::ptr provider; // 空智能指针
- 加锁,查找conn2在_conns中
auto it = _conns.find(conn2); // 找不到,it == _conns.end()
- 创建新的Provider对象并建立关联
provider = std::make_shared<Provider>(conn2, {"192.168.1.102", 9002});// provider现在指向一个新对象: {conn: conn2, host: {"192.168.1.102", 9002}, methods: []}_conns.insert(std::make_pair(conn2, provider));// _conns变为: {conn1 -> 多米诺provider, conn2 -> 必胜客provider}
- 处理服务提供者集合
auto &providers = _providers["芝士披萨"];// 获取已存在的集合: {"芝士披萨" -> [多米诺provider]}providers.insert(provider);// _providers变为: {"芝士披萨" -> [多米诺provider, 必胜客provider], "夏威夷披萨" -> [多米诺provider]}
- 解锁,添加方法到提供者
provider->appendMethod("芝士披萨");// 必胜客provider变为: {conn: conn2, host: {"192.168.1.102", 9002}, methods: ["芝士披萨"]}
- 最终的映射表
_conns = {conn1 -> Provider{conn: conn1,host: {"192.168.1.101", 9001},methods: ["芝士披萨", "夏威夷披萨"]},conn2 -> Provider{conn: conn2,host: {"192.168.1.102", 9002},methods: ["芝士披萨"]}
}_providers = {"芝士披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]},Provider{conn: conn2, host: {"192.168.1.102", 9002}, methods: ["芝士披萨"]}},"夏威夷披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]}}
}
getProvider函数详解:
函数目的
获取与指定连接关联的服务提供者对象。主要用于当提供者连接断开时,获取其提供者信息以便进行服务下线通知。
执行过程示例
假设当前系统状态如下:
场景1: 查找存在的连接
Provider::ptr provider = getProvider(conn1);
执行步骤:
- 加锁保护共享数据
std::unique_lock<std::mutex> lock(_mutex);
- 在_conns映射表中查找conn1
auto it = _conns.find(conn1); // 找到了,it指向{conn1 -> 多米诺Provider对象}的条目
- 找到了,返回对应的Provider对象
return it->second; // 返回多米诺Provider对象的智能指针
- 函数结束,锁自动释放
- 结果:返回多米诺的Provider对象指针。
场景2: 查找不存在的连接
Provider::ptr provider = getProvider(conn3); // conn3是一个不在_conns中的连接
执行步骤:
- 加锁保护共享数据
- 在_conns映射表中查找conn
auto it = _conns.find(conn3); // 找不到,it == _conns.end()
没找到,返回空智能指针
return Provider::ptr(); // 返回一个默认构造的空智能指针
- 函数结束,锁自动释放
结果:返回空智能指针,表示该连接不对应任何服务提供者。
delProvider函数详解:
函数目的
当服务提供者断开连接时,从系统中清理该提供者的所有相关信息,包括:
- 从各个服务的提供者集合中移除该提供者
- 删除连接与提供者对象的关联关系
假设当前系统状态如下:
_conns = {conn1 -> Provider{conn: conn1,host: {"192.168.1.101", 9001},methods: ["芝士披萨", "夏威夷披萨"]},conn2 -> Provider{conn: conn2,host: {"192.168.1.102", 9002},methods: ["芝士披萨"]}
}_providers = {"芝士披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]},Provider{conn: conn2, host: {"192.168.1.102", 9002}, methods: ["芝士披萨"]}},"夏威夷披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]}}
}
场景1: 删除多米诺披萨店(conn1)
delProvider(conn1);
执行步骤:
- 加锁保护共享数据
std::unique_lock<std::mutex> lock(_mutex);
- 在_conns映射表中查找conn1
auto it = _conns.find(conn1); // 找到了,it指向{conn1 -> 多米诺Provider对象}的条目
- 找到了,开始清理
- 遍历提供者的所有服务方法
for (auto & method : it->second->methods) {// 遍历 ["芝士披萨", "夏威夷披萨"]
- 对于"芝士披萨":
auto &providers = _providers["芝士披萨"];// providers是 {多米诺provider, 必胜客provider}providers.erase(it->second);// 从集合中移除多米诺provider// providers变为 {必胜客provider}
- 对于"夏威夷披萨":
auto &providers = _providers["夏威夷披萨"];// providers是 {多米诺provider}providers.erase(it->second);// 从集合中移除多米诺provider// providers变为空集合 {}
- 删除连接与提供者的关联
_conns.erase(it);// _conns变为 {conn2 -> 必胜客provider}
- 函数结束,锁自动释放
- 最终的映射表
_conns = {conn2 -> Provider{conn: conn2,host: {"192.168.1.102", 9002},methods: ["芝士披萨"]}
}_providers = {"芝士披萨" -> {Provider{conn: conn2, host: {"192.168.1.102", 9002}, methods: ["芝士披萨"]}},"夏威夷披萨" -> {} // 空集合
}
场景2: 尝试删除不存在的连接
delProvider(conn3); // conn3是一个不在_conns中的连接
执行步骤:
- 加锁
- 在_conns中查找conn3,找不到
- 直接返回,不做任何操作
- 函数结束,锁自动释放
- 数据结构不变。
methodHosts函数详解:
函数目的
获取能提供特定服务的所有提供者地址列表。这是服务发现的核心功能,当客户端想使用某项服务时,通过此函数获取所有可用的服务提供者地址。
执行过程示例
_providers = {"芝士披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]},Provider{conn: conn2, host: {"192.168.1.102", 9002}, methods: ["芝士披萨"]}},"夏威夷披萨" -> {Provider{conn: conn1, host: {"192.168.1.101", 9001}, methods: ["芝士披萨", "夏威夷披萨"]}},"意式披萨" -> {Provider{conn: conn3, host: {"192.168.1.103", 9003}, methods: ["意式披萨"]}}
}
场景1: 查询有多家提供的服务
std::vector<Address> hosts = methodHosts("芝士披萨");
执行步骤:
加锁保护共享数据
std::unique_lock<std::mutex> lock(_mutex);
在_providers映射表中查找"芝士披萨"
auto it = _providers.find("芝士披萨");// 找到了,it->second是provider集合 {多米诺provider, 必胜客provider}
创建结果向量
std::vector<Address> result; // 空向量
遍历所有提供"芝士披萨"的提供者
// 对于多米诺provider:result.push_back({"192.168.1.101", 9001});// result = [{"192.168.1.101", 9001}]// 对于必胜客provider:result.push_back({"192.168.1.102", 9002});// result = [{"192.168.1.101", 9001}, {"192.168.1.102", 9002}]
返回结果向量,锁自动释放
return result; // [{"192.168.1.101", 9001}, {"192.168.1.102", 9002}]
结果:返回两个地址的向量,表示有两家店提供芝士披萨。
场景2: 查询只有一家提供的服务
std::vector<Address> hosts = methodHosts("夏威夷披萨");
执行后,返回:[{"192.168.1.101", 9001}]
只包含多米诺披萨店的地址。
场景3: 查询不存在的服务
std::vector<Address> hosts = methodHosts("素食披萨");
执行步骤:
- 加锁
- 在_providers中查找"素食披萨",找不到
- 直接返回空向量
return std::vector<Address>(); // []
结果:返回空向量,表示没有店提供素食披萨。
DiscovererManager(发现者管理器):
class DiscovererManager {public:using ptr = std::shared_ptr<DiscovererManager>;struct Discoverer {using ptr = std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn; //发现者关联的客户端连接std::vector<std::string> methods; //发现过的服务名称Discoverer(const BaseConnection::ptr &c) : conn(c){}void appendMethod(const std::string &method) {std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//当每次客户端进行服务发现的时候新增发现者,新增服务名称Discoverer::ptr addDiscoverer(const BaseConnection::ptr &c, const std::string &method) {Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {discoverer = it->second;}else {discoverer = std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c, discoverer));}auto &discoverers = _discoverers[method];discoverers.insert(discoverer);}discoverer->appendMethod(method);return discoverer;}//发现者客户端断开连接时,找到发现者信息,删除关联数据void delDiscoverer(const BaseConnection::ptr &c) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it == _conns.end()) {//没有找到连接对应的发现者信息,代表客户端不是一个服务发现者return;}for (auto &method : it->second->methods) {auto discoverers = _discoverers[method];discoverers.erase(it->second);}_conns.erase(it);}//当有一个新的服务提供者上线,则进行上线通知void onlineNotify(const std::string &method, const Address &host) {return notify(method, host, ServiceOptype::SERVICE_ONLINE);}//当有一个服务提供者断开连接,则进行下线通知void offlineNotify(const std::string &method, const Address &host) {return notify(method, host, ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method, const Address &host, ServiceOptype optype) {std::unique_lock<std::mutex> lock(_mutex);auto it = _discoverers.find(method);if (it == _discoverers.end()) {//这代表这个服务当前没有发现者return;}auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setOptype(optype);for (auto &discoverer : it->second) {discoverer->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers;std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns;};
- 相当于总部的"顾客关注登记册"
- 记录了每位顾客(Discoverer)的联系方式和他们关注的菜品
- 顾客询问某菜品时,调用addDiscoverer记录他们的关注
- 当有新分店开业提供某菜品,通过onlineNotify通知所有关注此菜品的顾客
- 当分店关闭,通过offlineNotify通知顾客该分店不再提供服务
addDiscoverer函数详解:
函数目的
将客户端注册为特定服务的"发现者",以便在服务状态变更时收到通知。这是实现服务状态变更推送通知的基础。
执行过程示例
假设初始系统状态如下:
_conns = {} // 空映射表
_discoverers = {} // 空映射表
场景1: 小王第一次查询"芝士披萨"服务
Discoverer::ptr disc = addDiscoverer(clientConn1, "芝士披萨");
执行步骤:
创建discoverer变量(尚未初始化)
Discoverer::ptr discoverer; // 空智能指针
加锁保护共享数据
std::unique_lock<std::mutex> lock(_mutex);
在_conns映射表中查找clientConn1
auto it = _conns.find(clientConn1); // 找不到,it == _conns.end()
创建新的Discoverer对象
discoverer = std::make_shared<Discoverer>(clientConn1);// discoverer现在指向: {conn: clientConn1, methods: []}_conns.insert(std::make_pair(clientConn1, discoverer));// _conns变为: {clientConn1 -> discoverer}
将发现者添加到对应服务的发现者集合
auto &discoverers = _discoverers["芝士披萨"];// _discoverers变为: {"芝士披萨" -> []} (创建空集合)discoverers.insert(discoverer);// _discoverers变为: {"芝士披萨" -> [小王discoverer]}
解锁,向发现者添加已发现的服务
discoverer->appendMethod("芝士披萨");// discoverer变为: {conn: clientConn1, methods: ["芝士披萨"]}
返回discoverer对象
return discoverer;
此时的映射表
_conns = {clientConn1 -> Discoverer{conn: clientConn1,methods: ["芝士披萨"]}
}_discoverers = {"芝士披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨"]}}
}
场景2: 小王又查询"夏威夷披萨"服务
Discoverer::ptr disc = addDiscoverer(clientConn1, "夏威夷披萨");
执行步骤:
- 加锁
- 在_conns中查找clientConn1 - 已存在
discoverer = it->second; // 获取已存在的发现者对象
将发现者添加到"夏威夷披萨"服务的发现者集合
auto &discoverers = _discoverers["夏威夷披萨"]; // 创建空集合discoverers.insert(discoverer); // 添加小王的发现者对象
解锁,添加方法
discoverer->appendMethod("夏威夷披萨");// discoverer变为: {conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]}
- 返回discoverer对象
此时的映射表
_conns = {clientConn1 -> Discoverer{conn: clientConn1,methods: ["芝士披萨", "夏威夷披萨"]}
}_discoverers = {"芝士披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]}},"夏威夷披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]}}
}
场景3: 小李查询"芝士披萨"服务
Discoverer::ptr disc = addDiscoverer(clientConn2, "芝士披萨");
执行后的映射表
_conns = {clientConn1 -> Discoverer{conn: clientConn1,methods: ["芝士披萨", "夏威夷披萨"]},clientConn2 -> Discoverer{conn: clientConn2,methods: ["芝士披萨"]}
}_discoverers = {"芝士披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]},Discoverer{conn: clientConn2, methods: ["芝士披萨"]}},"夏威夷披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]}}
}
delDiscoverer函数详解:
函数目的
当客户端断开连接时,从系统中清理该客户端的所有发现者相关信息,包括:
- 从各服务的发现者集合中移除该客户端
- 删除连接与发现者对象的关联关系
执行过程示例
假设当前系统状态如下:
_conns = {clientConn1 -> Discoverer{conn: clientConn1,methods: ["芝士披萨", "夏威夷披萨"]},clientConn2 -> Discoverer{conn: clientConn2,methods: ["芝士披萨"]}
}_discoverers = {"芝士披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]},Discoverer{conn: clientConn2, methods: ["芝士披萨"]}},"夏威夷披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]}}
}
场景1: 小王断开连接(clientConn1)
delDiscoverer(clientConn1);
执行步骤:
- 加锁保护共享数据
std::unique_lock<std::mutex> lock(_mutex);
在_conns映射表中查找clientConn1
auto it = _conns.find(clientConn1); // 找到了,it指向clientConn1的条目
遍历发现者的所有关注服务
for (auto &method : it->second->methods) {// 遍历 ["芝士披萨", "夏威夷披萨"]
从每个服务的发现者集合中移除小王
// 对于"芝士披萨":auto discoverers = _discoverers["芝士披萨"];// discoverers是 {小王discoverer, 小李discoverer}discoverers.erase(it->second);// 从集合中移除小王discoverer// discoverers变为 {小李discoverer}// 对于"夏威夷披萨":auto discoverers = _discoverers["夏威夷披萨"];// discoverers是 {小王discoverer}discoverers.erase(it->second);// 从集合中移除小王discoverer// discoverers变为空集合 {}
删除连接与发现者的关联
_conns.erase(it);// _conns变为 {clientConn2 -> 小李discoverer}
函数结束,锁自动释放
执行后的映射表
_conns = {clientConn2 -> Discoverer{conn: clientConn2,methods: ["芝士披萨"]}
}_discoverers = {"芝士披萨" -> {Discoverer{conn: clientConn2, methods: ["芝士披萨"]}},"夏威夷披萨" -> {} // 空集合
}
场景2: 尝试删除不是发现者的连接
delDiscoverer(clientConn3); // clientConn3不在_conns中
执行步骤:
- 加锁
- 在_conns中查找clientConn3,找不到
- 直接返回,不执行任何操作
- 锁自动释放
数据结构不变。
onlineNotify和offlineNotify函数详解:
函数目的
这两个函数是服务状态变更通知的入口:
- onlineNotify: 当新服务提供者注册时,向所有关注该服务的客户端发送上线通知
- offlineNotify: 当服务提供者下线时,向所有关注该服务的客户端发送下线通知
使用场景示例
场景1: 多米诺注册"芝士披萨"服务
当多米诺披萨店注册提供芝士披萨服务时:
// ProviderManager注册服务
providerManager.addProvider(conn1, {"192.168.1.101", 9001}, "芝士披萨");// PDManager通知所有关注芝士披萨服务的客户端
discovererManager.onlineNotify("芝士披萨", {"192.168.1.101", 9001});
这会调用notify函数,向所有已注册为"芝士披萨"服务发现者的客户端发送通知:
"多米诺披萨店(192.168.1.101:9001)现在可以提供芝士披萨服务了"
场景2: 多米诺突然断开连接
当多米诺披萨店由于意外断开连接时:
// 在连接关闭处理中
Provider::ptr provider = providerManager.getProvider(conn1);
if (provider) {// 对每个提供的服务发送下线通知for (auto &method : provider->methods) {discovererManager.offlineNotify(method, provider->host);}// 清理提供者数据providerManager.delProvider(conn1);
}
对于"芝士披萨"服务,这会调用notify函数,向所有关注该服务的客户端发送通知:
"多米诺披萨店(192.168.1.101:9001)已不再提供芝士披萨服务"
同样对于"夏威夷披萨"服务也会发送类似通知。
实际应用价值
这两个函数实现了服务注册中心的核心价值之一:实时服务状态推送。
在传统的服务发现模式中,客户端需要定期轮询注册中心来获取最新的服务可用性信息,这会导致以下问题:
- 增加注册中心负载
- 服务状态变化不能实时感知
- 轮询间隔设置困难(太短增加负载,太长影响实时性)
通过这种主动推送机制,客户端能够:
- 实时获知服务状态变化
- 减少不必要的轮询请求
- 更快速地对服务变更作出反应
比如当一个重要的服务提供者下线时,客户端可以立即切换到其他服务提供者,而不需要等到下一次轮询发现服务不可用。
这种设计使整个分布式系统的可用性和响应速度得到了显著提升。
notify函数详解:
函数目的
向所有关注特定服务的客户端发送服务状态变更通知。这是服务上线/下线通知的实际实现,负责构造通知消息并发送给相关客户端。
执行过程示例
假设当前系统状态如下:
_discoverers = {"芝士披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]},Discoverer{conn: clientConn2, methods: ["芝士披萨"]}},"夏威夷披萨" -> {Discoverer{conn: clientConn1, methods: ["芝士披萨", "夏威夷披萨"]}},"意式披萨" -> {} // 没有发现者关注此服务
}
场景1: 多米诺披萨店上线,提供"芝士披萨"服务
场景2: 尝试通知"意式披萨"服务变更
notify("意式披萨", {"192.168.1.103", 9003}, ServiceOptype::SERVICE_ONLINE);
执行步骤:
- 加锁
在_discoverers中查找"意式披萨"
auto it = _discoverers.find("意式披萨");// 找到了,但it->second是空集合
- 发现没有客户端关注此服务,直接返回
- 锁自动释放
场景3: 尝试通知"素食披萨"服务变更
notify("素食披萨", {"192.168.1.104", 9004}, ServiceOptype::SERVICE_ONLINE);
执行步骤:
- 加锁
- 在_discoverers中查找"素食披萨
auto it = _discoverers.find("素食披萨");// 找不到,it == _discoverers.end()
- 直接返回,不执行通知
- 锁自动释放
PDManager(提供者和发现者管理器):
- 相当于总部的"客服中心"
- 处理所有服务请求(onServiceRequest)
- 当分店要开业注册,处理登记并通知顾客(SERVICE_REGISTRY)
- 当顾客查询菜品,帮他登记关注并返回当前提供此菜品的分店列表(SERVICE_DISCOVERY)
- 当分店突然关闭连接,通知所有关注该分店菜品的顾客(onConnShutdown)
PDManager 类函数详解
PDManager(Provider-Discoverer Manager)是注册中心的核心组件,管理服务提供者和服务发现者。我将详细解析其中的每个函数。
1. 构造函数
PDManager():_providers(std::make_shared<ProviderManager>()),_discoverers(std::make_shared<DiscovererManager>())
{}
功能:初始化 PDManager,创建提供者管理器和发现者管理器。
生活类比:这就像建立一个美食平台的两个核心部门:
- 餐厅管理部门(ProviderManager):负责管理所有注册的餐厅
- 用户服务部门(DiscovererManager):负责处理用户的搜索请求和推送通知
onServiceRequest函数详解:
函数目的
这是PDManager的核心方法,处理所有服务相关请求,包括服务注册和服务发现。它是客户端与服务注册中心交互的主要入口点。
执行流程示例
场景1: 多米诺注册芝士披萨服务
收到消息: {type: SERVICE_REGISTRY,method: "芝士披萨",host: {"192.168.1.101", 9001}
}
执行步骤:
- 获取操作类型:optype = SERVICE_REGISTRY
- 判断为服务注册请求
- 记录日志:"192.168.1.101:9001 注册服务 芝士披萨"
- 添加服务提供者:_providers->addProvider(conn, {"192.168.1.101", 9001}, "芝士披萨")
- 将多米诺关联到"芝士披萨"服务
- 通知所有关注此服务的客户端:_discoverers->onlineNotify("芝士披萨", {"192.168.1.101", 9001})
- 向小王、小李等已关注芝士披萨服务的客户端发送服务上线通知
- 向多米诺发送注册成功响应:registryResponse(conn, msg)
- 返回状态码RCODE_OK,确认注册成功
场景2: 小张查询芝士披萨服务
收到消息: {type: SERVICE_DISCOVERY,method: "芝士披萨"
}
执行步骤:
- 获取操作类型:optype = SERVICE_DISCOVERY
- 判断为服务发现请求
- 记录日志:"客户端要进行 芝士披萨 服务发现!"
- 注册客户端为服务发现者:_discoverers->addDiscoverer(conn, "芝士披萨")
- 将小张添加到"芝士披萨"服务的发现者列表
- 这样当有新店提供芝士披萨时,小张也会收到通知
- 向小张发送发现响应:discoveryResponse(conn, msg)
- 获取所有提供"芝士披萨"的服务地址:hosts = _providers->methodHosts("芝士披萨")
- 如果找到提供者:
- 设置状态码RCODE_OK
- 设置方法名和提供者地址列表
- 如果没找到提供者:
- 设置状态码RCODE_NOT_FOUND_SERVICE
- 发送响应给客户端
场景3: 收到无效操作类型请求
收到消息: {type: UNKNOWN_OPERATION,method: "芝士披萨"
}
执行步骤:
- 获取操作类型:无效类型
- 不匹配任何已知操作类型
- 记录错误日志:"收到服务操作请求,但是操作类型错误!"
- 发送错误响应:errorResponse(conn, msg)
- 设置状态码RCODE_INVALID_OPTYPE
- 设置操作类型SERVICE_UNKNOW
- 发送响应给客户端
设计亮点
- 统一入口点:所有服务相关请求通过一个函数处理,简化了接口
- 职责明确:
- 服务注册 → 添加提供者 + 通知关注者 + 响应注册者
- 服务发现 → 添加发现者 + 返回当前提供者列表
- 双向关联:
- 服务发现不仅返回当前可用服务,还注册客户端为发现者
- 当服务状态变化时,客户端会收到推送通知
- 错误处理:针对无效请求有专门的错误响应机制
- 日志记录:关键操作都有日志记录,便于问题排查
这个函数是整个服务注册中心的核心处理逻辑,它将客户端的请求分派给具体的处理逻辑,并确保响应正确返回给客户端。通过这种方式,它实现了服务注册、服务发现和服务状态通知的完整功能。
处理流程:
- 获取操作类型(注册/发现)
- 服务注册处理:
- 添加服务提供者
- 向所有相关的发现者发送服务上线通知
- 发送注册响应
- 服务发现处理:
- 添加服务发现者
- 发送发现响应,包含提供该服务的主机列表
- 错误操作类型:发送错误响应
生活类比:这就像美食平台的客服中心:
- 餐厅来电:希望在平台上架新菜品(服务注册)
- 记录餐厅信息和菜品
- 通知已经关注这道菜的用户有新餐厅可选
- 向餐厅确认上架成功
- 用户来电:想找提供某道菜的餐厅(服务发现)
- 记录用户的查询偏好
- 回复提供该菜品的所有餐厅列表
- 接到不明来电:回复服务不理解请求