RPC - 客户端注册和发现模块
registryMethod 函数详解:
函数目的
registryMethod 是 Provider 类的核心方法,用于向服务注册中心注册服务。注册成功后,服务注册中心会更新内部的服务映射表,建立服务名称到提供者地址的映射关系。
执行流程示例
场景: 多米诺注册芝士披萨服务
// 多米诺披萨店注册芝士披萨服务
Provider::ptr provider = std::make_shared<Provider>(requestor);
BaseConnection::ptr conn = ConnectionFactory::createConnection("127.0.0.1", 8000);
Address pizzaHost("192.168.1.101", 9001);
bool success = provider->registryMethod(conn, "芝士披萨", pizzaHost);
- 创建服务注册请求消息:
auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod("芝士披萨");msg_req->setHost({"192.168.1.101", 9001});msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);
发送请求并等待响应:
BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);
服务注册中心处理请求(内部过程,不在当前函数中):
// 服务注册中心内部的处理逻辑void PDManager::onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {if (msg->optype() == ServiceOptype::SERVICE_REGISTRY) {std::string method = msg->method();Address host = msg->host();// 更新映射表:将服务方法与提供者关联_providers->addProvider(conn, host, method);// 映射表变化:// 之前: _method_providers["芝士披萨"] = [...]// 之后: _method_providers["芝士披萨"] = [..., {"192.168.1.101", 9001}]// 通知所有关注此服务的客户端_discoverers->onlineNotify(method, host);// 发送响应ServiceResponse::ptr rsp = MessageFactory::create<ServiceResponse>();rsp->setRcode(RCode::RCODE_OK);conn->sendMessage(rsp);}// ...其他操作类型处理...}
处理响应:
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("响应类型向下转换失败!");return false;}
检查响应状态
if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务注册失败,原因:%s", errReason(service_rsp->rcode()).c_str());return false;}
注册成功,映射表更新完成:
return true;
映射表变化详解
- 注册前的映射表状态:
_method_providers = {"夏威夷披萨": [{"192.168.1.102", 9001}],"素食披萨": [{"192.168.1.103", 9001}]}
注册后的映射表状态:
_method_providers = {"夏威夷披萨": [{"192.168.1.102", 9001}],"素食披萨": [{"192.168.1.103", 9001}],"芝士披萨": [{"192.168.1.101", 9001}] // 新增映射}
如果是已有服务新增提供者:
// 如果另一家披萨店也注册提供芝士披萨_method_providers = {"夏威夷披萨": [{"192.168.1.102", 9001}],"素食披萨": [{"192.168.1.103", 9001}],"芝士披萨": [{"192.168.1.101", 9001}, {"192.168.1.104", 9002}] // 添加新提供者}
注册成功后,服务注册中心还会通过 onlineNotify 通知所有关注"芝士披萨"服务的客户端,有新的服务提供者上线,客户端的 Discoverer 会更新本地缓存的服务提供者列表。
生活中的类比
新餐厅在美食外卖平台注册
想象一下,registryMethod 就像一家新开的餐厅(服务提供者)在美团或饿了么这样的外卖平台(服务注册中心)上注册自己的特色菜品(服务)。
场景:小王的川菜馆注册麻婆豆腐服务
现实生活中的流程:
小王开了一家川菜馆,想在外卖平台上注册提供麻婆豆腐。
小王登录外卖平台商家端,填写注册表单:
- 店铺名称:小王川菜馆
- 地址:北京市海淀区中关村大街128号
- 特色菜品:麻婆豆腐
- 联系电话:010-12345678
小王提交注册申请,等待平台审核。
外卖平台处理注册请求:
- 验证小王提供的信息
- 将"麻婆豆腐"与"小王川菜馆"关联起来
- 更新平台的菜品目录
注册成功,平台向小王发送确认消息。
平台更新用户端,现在用户搜索"麻婆豆腐"时,可以看到小王川菜馆。
代码中对应的流程
// 小王川菜馆注册麻婆豆腐服务
Provider::ptr provider = std::make_shared<Provider>(requestor);
BaseConnection::ptr conn = ConnectionFactory::createConnection("delivery-platform.com", 8000);
Address restaurantAddress("北京市海淀区中关村大街128号", 12345678); // 地址和电话
bool success = provider->registryMethod(conn, "麻婆豆腐", restaurantAddress);
映射表变化:
注册前的外卖平台菜品目录:
菜品目录 = {"宫保鸡丁": ["老张川菜馆", "蜀香小馆"],"水煮鱼": ["蜀香小馆", "渝味轩"]}
注册后的外卖平台菜品目录
菜品目录 = {"宫保鸡丁": ["老张川菜馆", "蜀香小馆"],"水煮鱼": ["蜀香小馆", "渝味轩"],"麻婆豆腐": ["小王川菜馆"] // 新增映射}
用户体验变化:
- 之前用户搜索"麻婆豆腐"时,找不到提供这道菜的餐厅
- 注册成功后,用户搜索"麻婆豆腐"时,会看到"小王川菜馆"
- 平台会向已经收藏"麻婆豆腐"的用户推送通知:"小王川菜馆现在提供麻婆豆腐啦!"
MethodHost 类详解:
类目的
MethodHost 类是一个服务地址管理器,负责存储和管理能够提供特定服务的所有主机地址,并提供负载均衡功能。它是服务发现机制中的核心组件,用于维护服务提供者列表并支持动态更新。
方法详解与执行流程
场景1: 麻辣烫服务新增提供者
当"老王麻辣烫"店铺上线,开始提供麻辣烫服务:
// 服务注册中心收到上线通知后
MethodHost::ptr methodHost = _method_hosts["麻辣烫"];
methodHost->appendHost({"192.168.1.105", 8080}); // 老王麻辣烫的地址
执行步骤:
- 获取互斥锁保护共享数据:std::unique_lock<std::mutex> lock(_mutex)
- 添加新主机到列表:_hosts.push_back({"192.168.1.105", 8080})
映射表变化
之前: _hosts = [{"192.168.1.101", 8080}] // 只有小李麻辣烫之后: _hosts = [{"192.168.1.101", 8080}, {"192.168.1.105", 8080}] // 增加了老王麻辣烫
场景2: 麻辣烫服务提供者下线
当"小李麻辣烫"暂停营业,需要从服务列表中移除:
// 服务注册中心检测到连接断开后
MethodHost::ptr methodHost = _method_hosts["麻辣烫"];
methodHost->removeHost({"192.168.1.101", 8080}); // 小李麻辣烫的地址
执行步骤:
- 获取互斥锁保护共享数据:std::unique_lock<std::mutex> lock(_mutex)
- 遍历主机列表查找匹配地址:
for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {if (*it == {"192.168.1.101", 8080}) {_hosts.erase(it);break;}}
映射表变化:
之前: _hosts = [{"192.168.1.101", 8080}, {"192.168.1.105", 8080}]之后: _hosts = [{"192.168.1.105", 8080}] // 只剩下老王麻辣烫
- 现在客户端调用chooseHost()时,只会返回老王麻辣烫的地址
生活中的例子
想象一个外卖平台的"麻辣烫"分类页面:
初始状态:
- 页面显示3家麻辣烫店铺:小李、老王、小张
- 平台后台维护一个麻辣烫店铺列表:[小李, 老王, 小张]
新店开业(appendHost):
- 小赵开了一家新麻辣烫店并在平台注册
- 平台将小赵的店添加到列表:[小李, 老王, 小张, 小赵]
- 用户刷新页面,现在能看到4家麻辣烫店铺
店铺暂停营业(removeHost):
- 小张的店因装修暂停营业
- 平台将小张的店从列表移除:[小李, 老王, 小赵]
- 用户刷新页面,现在只能看到3家麻辣烫店铺
负载均衡(chooseHost):
- 当用户点击"随机推荐一家"按钮时
- 平台会轮流推荐列表中的店铺,确保每家店获得平等的展示机会
MethodHost类就像这个平台的后台管理系统,动态维护服务提供者列表,并在客户端请求时提供负载均衡的选择。
Discoverer 类详解:
类目的
Discoverer 类是RPC框架中的服务发现组件,负责查找和管理可用的服务提供者。它的主要职责是:
- 发现并获取能提供特定服务的主机地址
- 缓存服务提供者信息,提高查询效率
- 处理服务上线和下线通知,动态更新服务提供者列表
- 提供负载均衡功能,在多个服务提供者之间分配请求
核心功能
服务发现(serviceDiscovery方法):
- 查找能提供特定服务的主机
- 先检查本地缓存,如无则向服务注册中心查询
- 将查询结果缓存起来供后续使用
服务状态变更处理(onServiceRequest方法):
- 处理服务上线通知,添加新的服务提供者
- 处理服务下线通知,移除不可用的服务提供者
- 调用下线回调函数处理相关资源清理
生活中的例子
Discoverer类就像一个智能外卖APP:
查找服务(serviceDiscovery):
- 用户想点一份特定菜品(如"麻辣香锅")
- APP先检查缓存,看是否有最近浏览过的提供该菜品的餐厅
- 如果没有,则向服务器查询提供"麻辣香锅"的餐厅列表
- 获取列表后,选择一家餐厅(负载均衡)并缓存整个列表
处理餐厅状态变更(onServiceRequest):
- 新餐厅上线:APP收到通知"新餐厅'川香阁'开始提供麻辣香锅",更新列表
- 餐厅下线:APP收到通知"'香辣坊'暂停营业",从列表中移除并取消相关订单
关键组件
_method_hosts:服务方法到主机列表的映射表
- 键:服务方法名(如"用户认证"、"支付处理")
- 值:提供该服务的主机列表(MethodHost对象)
_requestor:负责发送请求和接收响应的组件
_offline_callback:服务下线时的回调函数,用于处理资源清理
_mutex:互斥锁,保护在多线程环境下对共享数据的访问
总结
Discoverer类是分布式系统中服务发现的核心组件,它使客户端能够:
- 动态发现可用的服务提供者
- 在多个提供者之间实现负载均衡
- 实时感知服务提供者的变化
- 通过本地缓存提高查询效率
这使得系统具有高可用性、可扩展性和弹性,服务提供者可以动态加入或离开系统,而客户端能够自动适应这些变化。
serviceDiscovery 方法详解:
方法目的
serviceDiscovery 是 Discoverer 类的核心方法,用于发现和获取能提供特定服务的主机地址。它首先检查本地缓存,如果没有找到则向服务注册中心发起查询,并将结果缓存起来以供后续使用。
执行流程示例
场景: 用户查找附近的奶茶店
想象一个用户想要点一杯奶茶,使用APP查找附近提供奶茶服务的店铺:
// 用户APP初始化
Discoverer::ptr discoverer = std::make_shared<Discoverer>(requestor, offlineCallback);
BaseConnection::ptr conn = ConnectionFactory::createConnection("service-center.com", 8000);
Address milkTeaShop;
bool found = discoverer->serviceDiscovery(conn, "奶茶", milkTeaShop);
执行步骤:
- 检查本地缓存:
{std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find("奶茶");if (it != _method_hosts.end()) {if (it->second->empty() == false) {milkTeaShop = it->second->chooseHost();return true; // 成功找到缓存的奶茶店地址}}}
- 如果APP之前已经查询过奶茶店,会直接从缓存返回一个地址
- 类似用户打开APP时,显示上次浏览过的奶茶店
缓存未命中,创建服务发现请求:
auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod("奶茶");msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);
- 类似用户在APP搜索框输入"奶茶"并点击搜索
- APP向服务器发送查询请求
发送请求并等待响应:
BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("服务发现失败!");return false;}
- APP向服务中心发送请求
- 如果网络问题导致请求失败,显示"搜索失败,请检查网络"
处理响应:
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (!service_rsp) {ELOG("服务发现失败!响应类型转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());return false;}
- 检查服务器返回的响应是否有效
- 类似APP收到服务器响应后检查数据格式是否正确
更新本地缓存并返回结果:
std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());if (method_host->empty()) {ELOG("%s 服务发现失败!没有能够提供服务的主机!", method.c_str());return false;}milkTeaShop = method_host->chooseHost();_method_hosts["奶茶"] = method_host;return true;
- 将服务器返回的奶茶店列表保存到本地缓存
- 从列表中选择一家奶茶店返回给用户
- 下次用户再查询奶茶店时可以直接从缓存获取
生活中的例子
想象小明想喝奶茶,但不知道附近有哪些奶茶店:
小明先检查记忆(检查本地缓存):
- 小明回忆上次是在"一点点"买的奶茶
- 如果记得路线,他可以直接去那里(缓存命中)
- 如果忘记了或想尝试新店,需要查询(缓存未命中)
小明问路人(发送服务发现请求):
- "请问附近有奶茶店吗?"
- 等待路人回答(等待响应)
路人回答(处理响应):
- "有啊,附近有'一点点'、'CoCo'和'喜茶'"
- 小明记下这些店的位置(更新缓存)
选择一家店(chooseHost):
- 小明决定去"CoCo"(负载均衡选择)
- 下次想喝奶茶时,他已经知道三家店的位置(缓存有效)
这个方法通过本地缓存优化了服务发现的效率,避免了每次都需要向服务中心查询,同时通过动态更新保证了服务信息的及时性。
onServiceRequest 方法详解:
方法目的
onServiceRequest 是 Discoverer 类的方法,用于处理服务状态变更通知,包括服务上线和下线。它是服务动态发现机制的核心,确保客户端能够及时感知服务提供者的变化。
执行流程示例
场景1: 新的披萨店开始提供服务
想象一个新的披萨店"意式风情"开业,开始提供披萨服务:
// 服务注册中心向所有关注披萨服务的客户端发送通知
ServiceRequest::ptr msg = MessageFactory::create<ServiceRequest>();
msg->setMethod("披萨");
msg->setHost({"192.168.1.110", 8080}); // 意式风情披萨店地址
msg->setOptype(ServiceOptype::SERVICE_ONLINE);// 对每个关注披萨服务的客户端
discoverer->onServiceRequest(conn, msg);
执行步骤:
判断操作类型:
auto optype = msg->optype(); // SERVICE_ONLINEstd::string method = msg->method(); // "披萨"
- 确认这是一个服务上线通知
- 获取服务方法名称为"披萨"
加锁保护共享数据:
std::unique_lock<std::mutex> lock(_mutex);
- 确保在多线程环境下安全访问和修改数据
处理上线通知:
if (optype == ServiceOptype::SERVICE_ONLINE) {auto it = _method_hosts.find("披萨");if (it == _method_hosts.end()) {// 如果这是第一家提供披萨服务的店auto method_host = std::make_shared<MethodHost>();method_host->appendHost({"192.168.1.110", 8080});_method_hosts["披萨"] = method_host;} else {// 如果已有其他披萨店,将新店添加到列表it->second->appendHost({"192.168.1.110", 8080});}}
- 检查是否已有披萨服务的提供者列表
- 如果没有,创建新列表并添加"意式风情"披萨店
- 如果已有,将"意式风情"披萨店添加到现有列表
更新后的映射表:
_method_hosts = {"汉堡": [...],"披萨": [..., {"192.168.1.110", 8080}] // 添加了新的披萨店}
场景2: 披萨店暂停营业
想象"意式风情"披萨店因设备故障暂停营业:
// 服务注册中心向所有关注披萨服务的客户端发送通知
ServiceRequest::ptr msg = MessageFactory::create<ServiceRequest>();
msg->setMethod("披萨");
msg->setHost({"192.168.1.110", 8080});
msg->setOptype(ServiceOptype::SERVICE_OFFLINE);// 对每个关注披萨服务的客户端
discoverer->onServiceRequest(conn, msg);
执行步骤:
判断操作类型:
auto optype = msg->optype(); // SERVICE_OFFLINEstd::string method = msg->method(); // "披萨"
- 确认这是一个服务下线通知
- 获取服务方法名称为"披萨"
加锁保护共享数据:
std::unique_lock<std::mutex> lock(_mutex);
处理下线通知:
if (optype == ServiceOptype::SERVICE_OFFLINE) {auto it = _method_hosts.find("披萨");if (it == _method_hosts.end()) {return; // 没有找到披萨服务提供者列表,直接返回}it->second->removeHost({"192.168.1.110", 8080}); // 从列表中移除_offline_callback({"192.168.1.110", 8080}); // 调用下线回调}
- 查找披萨服务的提供者列表
- 从列表中移除"意式风情"披萨店
- 调用下线回调函数,可能用于清理与该店相关的连接或缓存
_method_hosts = {"汉堡": [...],"披萨": [...] // 移除了"意式风情"披萨店}
生活中的例子
想象一个使用外卖APP的用户:
新餐厅上线通知(SERVICE_ONLINE):
- 用户订阅了披萨类别的推送
- 一家新的披萨店"意式风情"在APP上线
- APP推送通知:"新店上线!意式风情披萨店开业啦"
- APP更新披萨分类页面,添加新店信息
- 用户下次浏览披萨分类时,能看到这家新店
餐厅下线通知(SERVICE_OFFLINE):
- "意式风情"披萨店因设备故障暂停营业
- APP收到下线通知
- APP更新披萨分类页面,移除该店或标记为"暂停营业"
- 如果用户之前有未完成的订单,APP可能会通知用户:"您的订单餐厅暂停营业,建议取消订单"(类似_offline_callback的功能)
这个方法确保了客户端能够实时感知服务提供者的变化,提供了动态服务发现的能力,使系统更加灵活和可靠。