RPC-Client模块
RegistryClient接口
设计思路
这个是用来实现provider的register_client模块的,当Rpc_server模块启动enableRegistry,就会调用RegistryClient接口。然后实例化了一个register_client对象,就会向register——server注册服务。
这个RegistryClient实现的逻辑就是:向外提供一个服务注册的接口registryMethod,这个接口是对rpc_registry模块的registryMethod进行的封装,它隐藏了底层的网络通信细节和消息处理逻辑。
客户端通过RegistryClient只需提供方法名和主机地址信息就能完成服务注册,而不需要关心:
- 如何创建与注册中心的连接
- 如何构造注册请求消息
- 如何发送请求和接收响应
- 如何处理响应结果
其次就是对于构造函数里面就是绑定两个回调函数,一个是对于消息到来的回调message_cb,一个是对于响应的回调rsp_cb ,
这两个回调函数之间存在层级调用关系:
message_cb 是一级回调:
- 网络层收到任何消息都会触发这个回调
- 它将所有收到的消息转发给 Dispatcher::onMessage 方法
- 这是消息进入系统的统一入口点
rpc_cb 是二级回调:
- 它通过 registerHandler 注册到 Dispatcher 中
- 只处理特定类型的消息(REQ_RPC)
- 在 Dispatcher::onMessage 内部被调用
就是当服务注册发送给注册中心了之后,你肯定是要收到一个响应的,不然你怎么知道你成功注册了没有呢。所以就需要给register_client设置一个消息回调函数,当有消息到来了,然后调用message_cb,把这个消息传递给Dispatcher进行筛选的,Dispatcher发现这个是个RSP_SERVICE就会调用rsp_cb了。
网络层收到消息 -> 调用 message_cb -> Dispatcher::onMessage -> 根据消息类型查找处理函数 -> 找到并调用 rpc_cb (RpcRouter::onRpcRequest)
图解
代码
class RegistryClient {public:using ptr = std::shared_ptr<RegistryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string &ip, int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务注册接口bool registryMethod(const std::string &method, const Address &host) {return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};
DiscoveryClient接口
设计思路
这个DiscoveryClient实现的逻辑就是:向外提供一个服务发现的接口serviceDiscovery,这个接口是对client::Discoverer模块的serviceDiscovery方法进行的封装,它隐藏了底层的网络通信细节和消息处理逻辑。
客户端通过DiscoveryClient只需提供方法名就能获取到提供该服务的主机地址,而不需要关心:
- 如何创建与注册中心的连接
- 如何构造服务发现请求消息
- 如何发送请求和接收响应
- 如何处理响应结果
具体就是对于构造函数而言就是绑定了两个回调函数,一个是对于消息到来的回调message_cb,一个是对于响应的回调rsp_cb,还有一个是对于服务请求的回调req_cb,为什么会多一个req_cb呢,req顾名思义就是请求,也就是别人主动发起请求到discovery_client,那么就是当服务提供者上线或下线时,注册中心会主动向所有DiscoveryClient发送REQ_SERVICE类型的消息,通知服务状态变化。
这三个回调函数之间存在层级调用关系:
message_cb 是一级回调:
- 网络层收到任何消息都会触发这个回调
- 它将所有收到的消息转发给 Dispatcher::onMessage 方法
- 这是消息进入系统的统一入口点
rsp_cb 和 req_cb 是二级回调:
- 它们通过 registerHandler 注册到 Dispatcher 中
- 只处理特定类型的消息(RSP_SERVICE和REQ_SERVICE)
- 在 Dispatcher::onMessage 内部被调用
就是当服务发现发送给注册中心了之后,你肯定是要收到一个响应的,不然你怎么知道你成功发现服务了没有呢。所以就需要给register_client设置一个消息回调函数,当有消息到来了,然后调用message_cb,把这个消息传递给Dispatcher进行筛选的,Dispatcher发现这个是个RSP_SERVICE就会调用rsp_cb了。
网络层收到消息 -> 调用 message_cb -> Dispatcher::onMessage -> 根据消息类型查找处理函数 -> 找到并调用对应的回调(rsp_cb或req_cb)
图解
代码
class DiscoveryClient {public:using ptr = std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb): _requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),_dispatcher(std::make_shared<Dispatcher>()){auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool serviceDiscovery(const std::string &method, Address &host) {return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};
RpcClient接口
设计思路
构造函数而言就是绑定了两个回调函数,一个是对于消息到来的回调message_cb,一个是对于响应的回调rsp_cb,不过这个message_cb跟之前的不一样,它分两种情况,在直连模式下,直接设置message_cb。但是在服务发现模式下,是在DiscoveryClient内部设置自己的message_cb用于与注册中心通信。
需要一个enableDiscovery,进行判断是否启动了服务发现,如果启动了服务发现就创建一个服务下线回调函数,绑定到当前RpcClient的delClient方法,当服务下线时,这个回调会被触发。还需要创建服务发现客户端,将下线回调传递给它,这样DiscoveryClient在接收到服务下线通知时会自动调用RpcClient的delClient方法。
为什么需要这个下线回调函数呢?因为这种设计确保了当服务提供者下线时,RpcClient能够及时从连接池中移除对应连接,防止向不可用服务发送请求,提高系统的容错性和可靠性。
如果没有启用服务发现,就不需要下线通知机制。在直连模式下:如果直连的服务提供者出现问题,RPC调用会直接失败,客户端会立即感知到错误(如连接断开、超时等),而不需要通过通知机制来清理连接池。
向外提供三种call接口:同步call,异步call,回调call,这些接口是对RpcCaller模块的call方法进行的封装,它隐藏了底层的网络通信细节、服务发现过程和消息处理逻辑。
客户端通过RpcClient只需提供方法名、参数和接收结果的方式(同步、Future或回调)就能完成远程服务调用,而不需要关心:
- 如何查找并连接到服务提供者
- 如何构造和序列化RPC请求消息
- 如何发送请求和接收响应
- 如何处理服务提供者动态上下线
- 如何管理和复用客户端连接
其次还需要一个连接池,用于管理已经发现了的服务,在服务发现模式下:先通过服务发现获取提供服务的主机地址,查找连接池中是否已有该地址的连接,没有则创建新连接并加入连接池
对内设置了一些私有方法,包括newClient方法负责创建新连接,getClient(Address)方法用于从连接池查找已有连接,getClient(string)方法是调用链的核心,putClient方法安全地将连接添加到池中,delClient方法处理连接的移除,这些方法都是对连接池的连接进行管理的
图解
代码
class RpcClient {public:using ptr = std::shared_ptr<RpcClient>;//enableDiscovery--是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<bitrpc::client::RpcCaller>(_requestor)) {//针对rpc请求后的响应进行的回调处理auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);//如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client//如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_clientif (_enableDiscovery) {auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);}else {auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string &method, const Json::Value ¶ms, Json::Value &result) {//获取服务提供者:1. 服务发现; 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string &method, const Json::Value ¶ms, RpcCaller::JsonAsyncResponse &result) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string &method, const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, cb);}private:BaseClient::ptr newClient(const Address &host) {auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}BaseClient::ptr getClient(const Address &host) {std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) {return BaseClient::ptr();}return it->second;}BaseClient::ptr getClient(const std::string method) {BaseClient::ptr client;if (_enableDiscovery) {//1. 通过服务发现,获取服务提供者地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false) {ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}//2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建client = getClient(host);if (client.get() == nullptr) {//没有找打已实例化的客户端,则创建client = newClient(host);}}else {client = _rpc_client;}return client;}void putClient(const Address &host, BaseClient::ptr &client) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}void delClient(const Address &host) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash {size_t operator()(const Address &host) const{std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;Requestor::ptr _requestor;RpcCaller::ptr _caller;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现std::mutex _mutex;//<"127.0.0.1:8080", client1>std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;//用于服务发现的客户端连接池};
TopicClient接口
设计思路
这个TopicClient实现的逻辑就是:向外提供一组主题管理接口(create/remove/subscribe/cancel/publish),这些接口是对TopicManager模块相应方法的封装,它隐藏了底层的网络通信细节和消息处理逻辑。
客户端通过TopicClient只需提供主题名称和必要参数就能完成主题的创建、删除、订阅和发布,而不需要关心:
- 如何创建与主题服务器的连接
- 如何构造主题请求消息
- 如何发送请求和接收响应
- 如何处理响应结果和主题消息推送
其次就是对于构造函数里面就是绑定三个回调函数:
- 响应回调rsp_cb:处理服务器对主题操作请求的响应
- 消息推送回调msg_cb:接收发布到订阅主题的消息
- 一级消息回调message_cb:处理所有网络消息
这三个回调函数之间存在层级调用关系:
message_cb是一级回调:
- 网络层收到任何消息都会触发这个回调
- 它将所有收到的消息转发给Dispatcher::onMessage方法
- 这是消息进入系统的统一入口点
rsp_cb和msg_cb是二级回调:
- 它们通过registerHandler注册到Dispatcher中
- 分别只处理特定类型的消息(RSP_TOPIC和REQ_TOPIC)
- 在Dispatcher::onMessage内部被调用
就是当执行主题操作后,你肯定要收到响应确认成功与否,同时订阅主题后需要接收发布的消息。所以TopicClient设置了这些回调函数,当有消息到来,先调用message_cb,把消息传递给Dispatcher进行筛选,Dispatcher根据类型调用对应的处理函数。
网络层收到消息 -> 调用message_cb -> Dispatcher::onMessage -> 根据消息类型查找处理函数 -> 找到并调用对应回调(rsp_cb或msg_cb)
图解
代码
class TopicClient {public:TopicClient(const std::string &ip, int port):_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_topic_manager(std::make_shared<TopicManager>(_requestor)) {auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}bool create(const std::string &key) {return _topic_manager->create(_rpc_client->connection(), key);}bool remove(const std::string &key) {return _topic_manager->remove(_rpc_client->connection(), key);}bool subscribe(const std::string &key, const TopicManager::SubCallback &cb) {return _topic_manager->subscribe(_rpc_client->connection(), key, cb);}bool cancel(const std::string &key) {return _topic_manager->cancel(_rpc_client->connection(), key);}bool publish(const std::string &key, const std::string &msg) {return _topic_manager->publish(_rpc_client->connection(), key, msg);}void shutdown() {_rpc_client->shutdown();}private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现};
整体代码
#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"namespace bitrpc {namespace client {class RegistryClient {public:using ptr = std::shared_ptr<RegistryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string &ip, int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务注册接口bool registryMethod(const std::string &method, const Address &host) {return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};class DiscoveryClient {public:using ptr = std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb): _requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),_dispatcher(std::make_shared<Dispatcher>()){auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool serviceDiscovery(const std::string &method, Address &host) {return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};class RpcClient {public:using ptr = std::shared_ptr<RpcClient>;//enableDiscovery--是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<bitrpc::client::RpcCaller>(_requestor)) {//针对rpc请求后的响应进行的回调处理auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);//如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client//如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_clientif (_enableDiscovery) {auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);}else {auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string &method, const Json::Value ¶ms, Json::Value &result) {//获取服务提供者:1. 服务发现; 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string &method, const Json::Value ¶ms, RpcCaller::JsonAsyncResponse &result) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string &method, const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, cb);}private:BaseClient::ptr newClient(const Address &host) {auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}BaseClient::ptr getClient(const Address &host) {std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) {return BaseClient::ptr();}return it->second;}BaseClient::ptr getClient(const std::string method) {BaseClient::ptr client;if (_enableDiscovery) {//1. 通过服务发现,获取服务提供者地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false) {ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}//2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建client = getClient(host);if (client.get() == nullptr) {//没有找打已实例化的客户端,则创建client = newClient(host);}}else {client = _rpc_client;}return client;}void putClient(const Address &host, BaseClient::ptr &client) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}void delClient(const Address &host) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash {size_t operator()(const Address &host) const{std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;Requestor::ptr _requestor;RpcCaller::ptr _caller;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现std::mutex _mutex;//<"127.0.0.1:8080", client1>std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;//用于服务发现的客户端连接池};class TopicClient {public:TopicClient(const std::string &ip, int port):_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_topic_manager(std::make_shared<TopicManager>(_requestor)) {auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}bool create(const std::string &key) {return _topic_manager->create(_rpc_client->connection(), key);}bool remove(const std::string &key) {return _topic_manager->remove(_rpc_client->connection(), key);}bool subscribe(const std::string &key, const TopicManager::SubCallback &cb) {return _topic_manager->subscribe(_rpc_client->connection(), key, cb);}bool cancel(const std::string &key) {return _topic_manager->cancel(_rpc_client->connection(), key);}bool publish(const std::string &key, const std::string &msg) {return _topic_manager->publish(_rpc_client->connection(), key, msg);}void shutdown() {_rpc_client->shutdown();}private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现};}
}