【项目】分布式Json-RPC框架 - 应用层实现
目录
RPC远程调用
RPC功能实现
服务端的RpcRoute模块
客户端的Requestor模块
客户端RpcCaller模块
服务端测试
客户端测试
服务注册与发现功能实现
服务注册与发现功能设计
服务端实现
客户端实现
业务客户端封装
注册中心服务端实现
RPC服务端实现
简单RPC功能测试
基于服务注册与发现的RPC功能测试
主题发布与订阅
服务端实现
客户端实现
发布订阅测试
之前实现的主要是通信功能和消息分发功能,接下来要实现业务功能了。之前实现的模块对于服务端和客户端都是会使用到的。现在实现的就是针对于服务端或客户端了。
RPC远程调用
RPC功能实现
服务端的RpcRoute模块
RpcRouter模块存在的意义:提供rpc请求的处理回调函数,内部所要实现的功能,分辨出客户端请求的服务进行处理得到结果进行响应。
在RPC请求中,可能会有大量不同的RPC请求:比如加法,翻译...。作为服务端,首先需要对自己所能提供的服务进行管理,以便于收到请求以后,能够明确判断自身能否提供客户端所请求的服务
能提供服务,则调用接口进行处理,返回结果;不能提供服务,则响应客户端请求的服务不存在。
RpcRoute模块中会有一个哈希表,用于保存方法名称与方法描述的映射,这个哈希表的内容就是这个服务端所能提供的RPC服务。方法描述中就有这个方法的回调函数、方法名称、参数字段及格式描述、参数校验接口。并且向外提供一个函数onRpcRequest,用于对RPC请求进行处理,注册到Dispatcher模块中。服务端接收到一个请求时,Dispatcher模块就会判断出这是一个什么类型的请求,如果是一个RPC请求,就会调用onRpcRequest函数进行处理。
在RpcRoute模块,哈希表中保存的是方法名称与方法描述的映射关系,所以需要有一个方法描述,我们先来实现这个方法描述。
// 对于Json中的数据类型的枚举
enum class VType
{BOOL = 0, // 布尔类型INTEGRAL, // 整数类型NUMERIC, // 浮点数类型STRING, // 字符串类型ARRAY, // 数组类型OBJECT // 对象类型
};
// 方法描述
class ServiceDescribe
{
public:// 回调函数类型,根据传入的请求,获取响应using ptr = std::shared_ptr<ServiceDescribe>;using ServiceCallback = std::function<void(const Json::Value&, Json::Value&)>;using ParamsDescribe = std::pair<std::string, VType>;// 针对接收到的请求中的参数进行校验bool paramCheck(const Json::Value& params){// 需要判断请求中的参数描述是否一致,以及参数类型是否一致for (auto& desc : _params_desc){if (params.isMember(desc.first) == false){ELOG("参数字段完整性校验失败! %s 字段缺失!", desc.first.c_str());return false;}if (check(desc.second, params[desc.first]) == false){ELOG("%s 参数类型校验失败!", desc.first.c_str());return false;}}return true;}// 调用回调函数bool call(const Json::Value& params, Json::Value& result){_callback(params, result);if (rtypeCheck(result) == false){ELOG("回调函数中的相应信息校验失败!");return false;}return true;}const std::string& method() { return _method_name; }
private:// 校验类型,判断这个Json::Value的对象具体是什么类型bool check(VType vtype, const Json::Value& val){switch (vtype){case VType::BOOL: return val.isBool();case VType::INTEGRAL: return val.isIntegral();case VType::NUMERIC: return val.isNumeric();case VType::STRING: return val.isString();case VType::ARRAY: return val.isArray();case VType::OBJECT: return val.isObject();}return false;}// 对返回值类型进行校验bool rtypeCheck(const Json::Value& val) { return check(_return_type, val); }
private:std::string _method_name; // 方法名称ServiceCallback _callback; // 实际的业务处理回调函数std::vector<ParamsDescribe> _params_desc; // 参数字段格式描述VType _return_type; // 返回值类型描述
};
用于校验的数据,也就是方法描述中的成员变量从哪里来呢?我们可以提供一些设置的接口,但是这样可能会校验一半被设置了,需要加锁,保证访问描述字段的线程安全。我们不这样设计。我们采用建造者模式。
// 对于Json中的数据类型的枚举
enum class VType
{BOOL = 0, // 布尔类型INTEGRAL, // 整数类型NUMERIC, // 浮点数类型STRING, // 字符串类型ARRAY, // 数组类型OBJECT // 对象类型
};
// 方法描述
class ServiceDescribe
{
public:// 回调函数类型,根据传入的请求,获取响应using ptr = std::shared_ptr<ServiceDescribe>;using ServiceCallback = std::function<void(const Json::Value&, Json::Value&)>;using ParamsDescribe = std::pair<std::string, VType>;ServiceDescribe(std::string&& mname, std::vector<ParamsDescribe>&& desc,VType vtype, const ServiceCallback&& handler) :_method_name(std::move(mname)),_callback(std::move(handler)),_params_desc(std::move(desc)),_return_type(vtype){}// 针对接收到的请求中的参数进行校验bool paramCheck(const Json::Value& params){// 需要判断请求中的参数描述是否一致,以及参数类型是否一致for (auto& desc : _params_desc){if (params.isMember(desc.first) == false){ELOG("参数字段完整性校验失败! %s 字段缺失!", desc.first.c_str());return false;}if (check(desc.second, params[desc.first]) == false){ELOG("%s 参数类型校验失败!", desc.first.c_str());return false;}}return true;}// 调用回调函数bool call(const Json::Value& params, Json::Value& result){_callback(params, result);if (rtypeCheck(result) == false){ELOG("回调函数中的相应信息校验失败!");return false;}return true;}const std::string& method() { return _method_name; }
private:// 校验类型,判断这个Json::Value的对象具体是什么类型bool check(VType vtype, const Json::Value& val){switch (vtype){case VType::BOOL: return val.isBool();case VType::INTEGRAL: return val.isIntegral();case VType::NUMERIC: return val.isNumeric();case VType::STRING: return val.isString();case VType::ARRAY: return val.isArray();case VType::OBJECT: return val.isObject();}return false;}// 对返回值类型进行校验bool rtypeCheck(const Json::Value& val) { return check(_return_type, val); }
private:std::string _method_name; // 方法名称ServiceCallback _callback; // 实际的业务处理回调函数std::vector<ParamsDescribe> _params_desc; // 参数字段格式描述VType _return_type; // 返回值类型描述
};// 建造者模式的工厂,用于生产方法描述对象,也可以说是建造一个方法描述对象
class SDescribeFactory
{
public:void setMethodName(const std::string& name) { _method_name = name; }void setReturnType(VType vtype) { _return_type = vtype; }void setParamDesc(const std::string& pname, VType vtype){_params_desc.push_back(ServiceDescribe::ParamsDescribe(pname, vtype));}void setCallback(const ServiceDescribe::ServiceCallback& cb){_callback = cb;}// 建造ServiceDescribe::ptr build(){return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));}
private:std::string _method_name; // 方法名称ServiceDescribe::ServiceCallback _callback; // 实际的业务处理回调函数std::vector<ServiceDescribe::ParamsDescribe> _params_desc; // 参数字段格式描述VType _return_type; // 返回值类型描述
};
工厂模式与建造者模式的区别:工厂模式直接使用参数构造对象,建造者模式是将各个零件准备好,再建造对象。本质都是构造对象,都是将生产与使用分离的一种方式。
RpcRoute类需要向外提供两个接口,一个是注册RPC服务的接口,一个是提供给Dispatcher模块,用于服务端接收到RPC请求时的调用接口。它里面还需要管理好方法名称与方法描述的映射关系。这样做的话,这个类需要实现的功能太多了,我们将其拆分成两个类。RpcRoute类向外提供两个接口,ServiceManger类对注册的服务进行管理。
// 对注册的服务进行管理的类
class ServiceManager
{
public:using ptr = std::shared_ptr<ServiceManager>;// 新增方法void insert(const ServiceDescribe::ptr& desc){std::unique_lock<std::mutex> lock(_mutex);_services.insert(std::make_pair(desc->method(), desc));}// 查找方法ServiceDescribe::ptr select(const std::string& method_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(method_name);if (it == _services.end()) { return ServiceDescribe::ptr(); }return it->second;}// 删除方法void remove(const std::string& method_name){std::unique_lock<std::mutex> lock(_mutex);_services.erase(method_name);}
private:std::mutex _mutex;std::unordered_map<std::string, ServiceDescribe::ptr> _services; // 方法名称与方法描述的映射
};class RpcRouter
{
public:using ptr = std::shared_ptr<RpcRouter>;RpcRouter() : _service_manager(std::make_shared<ServiceManager>()) {}// 注册RPC服务void registerMethod(const ServiceDescribe::ptr& service){_service_manager->insert(service);}// 下面这个函数是注册到Dispatcher模块中的函数,用于服务端接收到RPC请求后的回调函数// 传入一个连接,业务要发送应答,再传入一个请求,业务要根据请求构建应答void onRpcRequest(const BaseConnection::ptr& conn, RpcRequest::ptr& request){// 1. 根据请求中的方法名称,判断当前服务端能否提供对应服务auto service = _service_manager->select(request->method());if (service.get() == nullptr){ELOG("%s 服务未找到!", request->method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_NOT_FOUND_SERVICE);}// 2. 进行参数校验,确定能否提供服务if (service->paramCheck(request->params()) == false){ELOG("%s 服务参数校验失败!", request->method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_INVALID_PARAMS);}// 3. 调用业务回调接口进行业务处理Json::Value result;bool ret = service->call(request->params(), result);if (ret == false){ELOG("%s 服务参数校验失败!", request->method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_INTERNAL_ERROR);}// 4. 处理完毕得到结果,组织响应,向客户端发送return response(conn, request, result, RCode::RCODE_OK);}
private:// 组织响应// 传入连接,才知道从那个连接发送响应void response(const BaseConnection::ptr& conn,const RpcRequest::ptr& req,const Json::Value& res, RCode rcode){auto msg = MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(cxfrpc::MType::RSP_RPC);msg->setRCode(rcode);msg->setResult(res);conn->send(msg);}
private:ServiceManager::ptr _service_manager;
};
客户端的Requestor模块
在上一个模块中,我们已经将服务端对于RPC请求的处理完成了。接下来就需要实现客户端对于RPC请求的处理了。在这之前,我们需要先完成客户端的Requestor模块。因为必须让请求和响应对应,所以客户端要对发送出去的请求进行管理。
Requestor模块存在的意义:针对客户端的每一条请求进行管理,以便于对请求对应的响应做出合适的操作。
可以看到,为了对请求进行管理,我们是需要对请求进行描述的。
class Requestor
{
public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;// 请求描述struct RequestDescribe{using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request; // 请求信息RType rtype; // 请求类型std::promise<BaseMessage::ptr> response; // 异步存储响应信息的字段RequestCallback callback; // 回调函数};// 注册到Dispatcher模块的接收到响应时的回调函数// 第一个参数是连接,第二个参数是响应void onResponse(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){// 1. 根据响应中的ID找到对应的请求描述std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr){ELOG("接收到响应 - %s, 但是未找到对应的请求描述!", rid.c_str());return;}// 根据请求类型,进行相应的处理if (rdp->rtype == RType::REQ_ASYNC) { rdp->response.set_value(msg); } // 异步else if (rdp->rtype == RType::REQ_CALLBACK) // 回调函数{if (rdp->callback) rdp->callback(msg);}else { ELOG("请求类型未知!"); }delDescribe(rid);}// 同步// 传入一个连接,才知道从哪一个连接发送数据bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req, BaseMessage::ptr& rsp){AsyncResponse rsp_future;// 转换为异步调用bool ret = send(conn, req, rsp_future);if (ret == false) return false;rsp = rsp_future.get();return true;}// 异步bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req, AsyncResponse& async_rsp){RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr){ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}// 回调函数bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req, const RequestCallback& cb){RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr){ELOG("构造请求描述对象失败!");return false;}conn->send(req);return true;}
private:// 新增请求描述RequestDescribe::ptr newDescribe(const BaseMessage::ptr& req, RType rtype,const RequestCallback& cb = RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb){rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}// 根据请求ID获取请求描述RequestDescribe::ptr getDescribe(const std::string& rid){std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()){return RequestDescribe::ptr();}return it->second;}// 删除请求描述void delDescribe(const std::string& rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}
private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc; // 请求ID与全球描述的映射
};
我们这里并没有对同步的请求进行处理,因为我们只需要使用异步的方式,拿到一个future,直接get就是同步。
客户端RpcCaller模块
RpcCaller模块存在的意义:向用户提供进行rpc调用的模块。
给用户提供3个接口,让用户能够进行发送请求,并获得结果即可,内部会向服务端发送请求,不是直接发送的,而是通过Requestor模块发送的。
class RpcCaller
{
public:using ptr = std::shared_ptr<RpcCaller>;using JsonAsyncResponse = std::future<Json::Value>;using JsonResponseCallback = std::function<void(const Json::Value&)>;RpcCaller(const Requestor::ptr& requestor) : _requestor(requestor) {}// 同步调用接口bool call(const BaseConnection::ptr& conn, const std::string& method,const Json::Value& params, Json::Value& result){}// 异步调用接口bool call(const BaseConnection::ptr& conn, const std::string& method,const Json::Value& params, JsonAsyncResponse& result){}// 回调函数调用接口bool call(const BaseConnection::ptr& conn, const std::string& method,const Json::Value& params, const JsonResponseCallback& result){}
private:Requestor::ptr _requestor;
};
因为RpcCaller需要通过Requestor发送消息,所以RpcCaller里面是需要有一个Requestor类型的对象的。未来这3个call会调用Requestor里面相应的send发送数据。此时我们会发现,Requestor里面的处理是针对BaseMessage进行处理的,而RpcCaller中是针对RpcResponse里的result进行处理的。所以需要额外嵌套一层。因为我们需要拿到结果之后,提取到消息里面的result才能进行下一步。Requestor模块之所以要设计为针对BaseMessage处理,是因为这个模块不仅仅为RPC服务,未来主题响应、服务响应都需要使用。而主题响应和服务响应里面是没有result的。所以,我们就是在底层先完成对响应的处理,上层再对其进行封装,完成自己所需的功能。
class RpcCaller
{
public:using ptr = std::shared_ptr<RpcCaller>;using JsonAsyncResponse = std::future<Json::Value>;using JsonResponseCallback = std::function<void(const Json::Value&)>;RpcCaller(const Requestor::ptr& requestor) : _requestor(requestor) {}// 同步调用接口// 连接、方法名称、参数、结果bool call(const BaseConnection::ptr& conn, const std::string& method,const Json::Value& params, Json::Value& result){// 1. 构建请求auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;// 2. 发送请求,通过Requestor模块发送// 这里要进行类型转换,因为Requestor模块的send是函数重载,要求参数类型完全匹配bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);if (ret == false){ELOG("同步RPC请求失败!");return false;}// 3. 等待响应auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if (!rpc_rsp_msg){ELOG("RPC响应向下类型转换失败!");return false;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("RPC请求出错: %s", errReason(rpc_rsp_msg->rcode()).c_str());return false;}result = rpc_rsp_msg->result();return true;}// 异步调用接口bool call(const BaseConnection::ptr& conn, const std::string& method,const Json::Value& params, JsonAsyncResponse& result){// 向服务端发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中对promise设置数据auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// json_promise一定要使用智能指针,不能是临时对象,临时对象被释放掉之后,未来在获取结果时就会出错auto json_promise = std::make_shared<std::promise<Json::Value>>();result = json_promise->get_future();Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback1, this, json_promise, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);if (ret == false) {ELOG("异步Rpc请求失败!");return false;}return true;}// 回调函数调用接口bool call(const BaseConnection::ptr& conn, const std::string& method,const Json::Value& params, const JsonResponseCallback& cb){auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback2, this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("回调Rpc请求失败!");return false;}return true;}
private:void Callback1(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr& msg){auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg){ELOG("rpc响应, 向下类型转换失败!");return;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc异步请求出错: %s", errReason(rpc_rsp_msg->rcode()).c_str());return;}result->set_value(rpc_rsp_msg->result());}void Callback2(const JsonResponseCallback& cb, const BaseMessage::ptr& msg){auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应, 向下类型转换失败!");return;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc回调请求出错: %s", errReason(rpc_rsp_msg->rcode()).c_str());return;}cb(rpc_rsp_msg->result());}
private:Requestor::ptr _requestor;
};
在异步的call中一定要使用智能指针,不能使用临时对象,临时对象出来作用域就没有,智能指针在绑定时会让引用计数++,出了作用域也仍然存在。
服务端测试
void Add(const Json::Value &req, Json::Value &rsp)
{int num1 = req["num1"].asInt();int num2 = req["num2"].asInt();rsp = num1 + num2;
}int main()
{// 注册映射关系auto dispatcher = std::make_shared<cxfrpc::Dispatcher>();auto router = std::make_shared<cxfrpc::server::RpcRouter>();// 构建出工厂对象,设置号信息之后,通过工厂生产出一个对象,再注册进去std::unique_ptr<cxfrpc::server::SDescribeFactory> desc_factory(new cxfrpc::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamDesc("num1", cxfrpc::server::VType::INTEGRAL);desc_factory->setParamDesc("num2", cxfrpc::server::VType::INTEGRAL);desc_factory->setReturnType(cxfrpc::server::VType::INTEGRAL);desc_factory->setCallback(Add);router->registerMethod(desc_factory->build());auto cb = std::bind(&cxfrpc::server::RpcRouter::onRpcRequest, router.get(),std::placeholders::_1, std::placeholders::_2);dispatcher->registerHandler<cxfrpc::RpcRequest>(cxfrpc::MType::REQ_RPC, cb);auto server = cxfrpc::ServerFactory::create(8080);// 这是类的成员函数,所以要进行绑定auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, dispatcher.get(),std::placeholders::_1, std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}
这样,我们就像Dispatcher模块注册了一个服务,当服务端接收到一个RPC请求后,就会调用RpcRoute模块的onRpcRequest,这个函数就会调用根据RPC请求中的方法名称去调用具体的回调函数。所以,回调是一环套一环的。
客户端测试
void callback(const Json::Value &result)
{ILOG("callback result: %d", result.asInt());
}int main()
{auto requestor = std::make_shared<cxfrpc::client::Requestor>();auto caller = std::make_shared<cxfrpc::client::RpcCaller>(requestor);// 注册映射关系auto dispatcher = std::make_shared<cxfrpc::Dispatcher>();auto rsp_cb = std::bind(&cxfrpc::client::Requestor::onResponse, requestor.get(),std::placeholders::_1, std::placeholders::_2);// 在客户端这里,不能使用特定的消息类型,因为dispatcher模块与Requestor模块会对应不上,// 使用了特定的消息类型后,dispatcher模块是特定消息类型,而Requestor模块是BaseMessagedispatcher->registerHandler<cxfrpc::BaseMessage>(cxfrpc::MType::RSP_RPC, rsp_cb);auto client = cxfrpc::ClientFactory::create("127.0.0.1", 8080);// 这是类的成员函数,所以要进行绑定auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();// 同步测试auto conn = client->connection();Json::Value params, result;params["num1"] = 33;params["num2"] = 44;bool ret = caller->call(conn, "Add", params, result);if(ret != false){ILOG("同步result: %d", result.asInt());}std::cout << "--------------------------------------------------------------" << std::endl;// 异步测试cxfrpc::client::RpcCaller::JsonAsyncResponse res_future;params["num1"] = 11;params["num2"] = 44;ret = caller->call(conn, "Add", params, res_future);if(ret != false){result = res_future.get();ILOG("异步result: %d", result.asInt());}std::cout << "--------------------------------------------------------------" << std::endl;// 回调函数测试params["num1"] = 55;params["num2"] = 66;ret = caller->call(conn, "Add", params, callback);std::cout << "--------------------------------------------------------------" << std::endl;// 等待客户端收到响应再关闭连接std::this_thread::sleep_for(std::chrono::seconds(10));client->shutdown();return 0;
}
客户端在发送请求时,是通过RpcCaller的接口发送的,RpcCaller又会调用Requestor模块的接口进行发送。客户端在发送请求之后是需要等待响应的,当客户端接收到响应时,就会调用messageCallback函数,其实就是调用Dispatcher模块的onMessage函数,onMessage就会判断消息类型,发现是RPC响应,就会调用注册进来的RPC响应的回调函数进行处理,其实调用的就是Requestor模块的onResponse,这个函数内部就会根据响应里的请求ID找到请求描述,然后将结果设置进去,或者调用内部的回调函数。
现在,已经能够正常地进行RPC调用了。
服务注册与发现功能实现
服务注册与发现功能设计
1. 为什么要服务注册?服务注册是要做什么?
服务注册主要是实现分布式的系统,让系统更加健壮。一个服务提供者,将自己所能提供的服务,在注册中心进行登记,以便于为服务调用者提供服务。
2.为什么要服务发现?服务发现是要做什么?
服务调用者需要直到哪些主机能够为自己提供指定的服务,服务发现其实就是询问注册中心,谁能为自己提供指定的服务,获取到服务提供者的信息后,将服务提供者的信息给保存起来以待后用。当服务发现者获取到多个能提供服务的主机后,就可以以负载均衡的方式去请求这些主机提供服务。
3.服务下线。
当前使用长连接进行服务主机是否在线的判断,一旦服务提供者断开连接,查询这个主机提供了哪些服务,分析哪些调用者进行过这些服务发现,则进行服务下线通知。
4. 服务上线。
当某个服务提供者新增了某项服务,就通知曾经发现过该项服务的服务调用者。
现在要做的事情:
- 将服务注册,发现功能集合到客户端中
- 将服务信息管理集合到服务端中
这样,服务端既可以作为RPC远程调用的服务端,也可以作为服务注册与发现的客户端,每一个服务提供者都可以作为一个注册中心。
服务端的功能:服务端需要实现对服务信息的管理,并且服务端要提供对于服务注册、服务发现请求的处理函数。服务端需要管理的信息:
- hash<method, set<provide>>。保存那个服务能由那些主机提供。当服务调用者在进行服务发现请求时,就可以查询这个哈希表,从而进行响应。
- hash<method, set<discoverer>>。保存发现过某个方法的主机信息。当进行服务上线/下线通知时,就可以查询这个哈希表,从而获取需要通知的主机信息。
- hash<conn, provide>。保存连接与服务提供者的映射关系。当一个连接断开时,才能够知道是哪一个服务提供者,才能进行服务下线通知。
- hash<conn, discoverer>。保存连接与服务调用者的映射关系。当一个连接断开时,才能知道是哪一个服务调用者,往后再有服务上线/下线通知时,就不需要再通知它了。
客户端的功能:对于客户端,我们要实现两种客户端,一个是给服务提供者的客户端,一个是给服务调用者的客户端:
- 对于服务提供者的客户端,需要有一个能够进行服务注册的接口,功能就是连接注册中心,进行服务注册。
- 对于服务调用者的客户端,需要有一个能够进行服务发现的接口,功能就是连接注册中心,进行服务发现。另外,还需要对服务发现获得到的结果进行保存,hash<method, vector<host>>。还需要给Dispatcher模块提供一个对于服务上线/下线请求的处理函数。
服务端实现
我们先对服务端要管理的信息进行描述。
// 对服务提供者的管理
class ProvideManager
{
public:using ptr = std::shared_ptr<ProvideManager>;// 对服务提供者的描述struct Provider{using ptr = std::shared_ptr<Provider>;std::mutex _mutex;BaseConnection::ptr conn; // 服务提供者对应的连接Address host; // 服务提供者的主机信息std::vector<std::string> methods; // 服务提供者所能提供的服务Provider(const BaseConnection::ptr& c, const Address& h): conn(c), host(h) {}void appendMethod(const std::string& method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};// 新增服务提供者void addProvider(const BaseConnection::ptr& c, const Address& h, const std::string& method){Provider::ptr provider;// 查找连接所关联的服务提供者对象,找到则获取,找不到则创建,并建立关联{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()){provider = it->second;}else{provider = std::make_shared<Provider>(c, h);_conns.insert(std::make_pair(c, provider));}auto& providers = _providers[method];providers.insert(provider);}// 向服务提供者对象中新增一个能提供的服务名称provider->appendMethod(method);}// 根据连接,获取服务提供者的信息Provider::ptr getProvider(const BaseConnection::ptr& c){std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) return it->second;return Provider::ptr();}// 删除服务提供者void delProvider(const BaseConnection::ptr& c){std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it == _conns.end()){// 当前断开连接的不是一个服务提供者return;}// 当前断开连接的是一个服务提供者// 将这个服务提供者与它所能提供服务的映射删除for (auto& method : it->second->methods){auto& providers = _providers[method];providers.erase(it->second);}// 删除连接与服务提供者的关联关系_conns.erase(it);}// 获取能提供指定方法的所有主机信息std::vector<Address> methodHosts(const std::string& method){std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end()){return std::vector<Address>();}std::vector<Address> result;for (auto& provider : it->second){result.push_back(provider->host);}return result;}
private:std::mutex _mutex;std::unordered_map<std::string, std::set<Provider::ptr>> _providers; // 方法名称与能提供方法的服务提供者的映射std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns; // 连接与服务提供者的映射
};// 对服务调用者的管理
class DiscovererManager
{
public:using ptr = std::shared_ptr<DiscovererManager>;// 对服务调用者的描述struct Discoverer{using ptr = std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn; // 服务发现者对应的连接std::vector<std::string> methods; // 发现过的服务名称Discoverer(const BaseConnection::ptr& c) : conn(c) {}void appendMethod(const std::string& method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};// 新增服务调用者Discoverer::ptr addDiscoverer(const BaseConnection::ptr& c, const std::string& method){Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()){discoverer = it->second;}else{discoverer = std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c, discoverer));}auto& discoverers = _discoverers[method];discoverers.insert(discoverer);}discoverer->appendMethod(method);return discoverer;}// 删除服务调用者void delDiscoverer(const BaseConnection::ptr& c){std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it == _conns.end()){// 当前断开连接的不是一个服务调用者return;}for (auto& method : it->second->methods){auto discoverers = _discoverers[method];discoverers.erase(it->second);}_conns.erase(it);}// 上线通知void onlineNotify(const std::string& method, const Address& host){notify(method, host, ServiceOptype::SERVICE_ONLINE);}// 下线通知void offlineNotify(const std::string& method, const Address& host){notify(method, host, ServiceOptype::SERVICE_OFFLINE);}
private:// 根据方法名称,对发现过该方法的服务调用者进行服务上线/下线的通知void notify(const std::string& method, const Address& host, ServiceOptype optype){std::unique_lock<std::mutex> lock(_mutex);auto it = _discoverers.find(method);if (it == _discoverers.end()){// 这个服务没有发现者return;}// 发送服务上线/下线请求auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setOptype(optype);// 给发现过这个方法的所有服务调用者发送for (auto& discoverer : it->second){discoverer->conn->send(msg_req);}}
private:std::mutex _mutex;std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers; // 方法名称与发现过该方法的服务调用者的映射std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns; // 连接与服务调用者的映射
};
这两个类都是服务端对服务提供者和服务调用者进行管理的类。我们还需要实现一个提供服务的类
class PDManager
{
public:using ptr = std::shared_ptr<PDManager>;PDManager() {}// 接收到服务注册/发现请求时的回调函数void onServiceRequest(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg);// 连接关闭时需要调用的函数void onConnShutdown(const BaseConnection::ptr& conn);
private:ProvideManager::ptr _providers; // 对于服务提供者的管理DiscovererManager::ptr _discoverers; // 对于服务调用者的管理
};
- 第一个函数是需要注册到Dispatcher模块的,是服务端接收到的服务注册、服务发现请求的回调函数
- 第二个函数是在连接关闭时的回调函数
class PDManager
{
public:using ptr = std::shared_ptr<PDManager>;PDManager() :_providers(std::make_shared<ProvideManager>()),_discoverers(std::make_shared<DiscovererManager>()){}// 接收到服务注册/发现请求时的回调函数void onServiceRequest(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg){// 服务操作请求:服务注册/服务发现ServiceOptype optype = msg->optype();if (optype == ServiceOptype::SERVICE_REGISTRY){// 服务注册// 1. 新增服务提供者_providers->addProvider(conn, msg->host(), msg->method());// 2. 进行服务上线通知_discoverers->onlineNotify(msg->method(), msg->host());// 3. 进行响应。这里一定要return,否则会向后走return registryResponse(conn, msg);}else if (optype == ServiceOptype::SERVICE_DISCOVERY){// 服务发现// 1. 新增服务调用者_discoverers->addDiscoverer(conn, msg->method());// 2. 进行响应return discoveryResponse(conn, msg);}else{ELOG("收到服务操作请求,但是操作类型错误!");return errorResponse(conn, msg);}}// 连接关闭时需要调用的函数void onConnShutdown(const BaseConnection::ptr& conn) {auto provider = _providers->getProvider(conn);if (provider.get() != nullptr) {ILOG("%s:%d 服务下线", provider->host.first.c_str(), provider->host.second);for (auto& method : provider->methods) {_discoverers->offlineNotify(method, provider->host);}_providers->delProvider(conn);}_discoverers->delDiscoverer(conn);}
private:// 组织服务注册的响应void registryResponse(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg) {auto msg_rsp = MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_SERVICE);msg_rsp->setRCode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);conn->send(msg_rsp);}// 组织服务发现的响应void discoveryResponse(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg) {auto msg_rsp = MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_SERVICE);msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);std::vector<Address> hosts = _providers->methodHosts(msg->method());if (hosts.empty()) {// 没有主机提供该服务msg_rsp->setRCode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);}msg_rsp->setRCode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}// 组织错误响应void errorResponse(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg) {auto msg_rsp = MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_SERVICE);msg_rsp->setRCode(RCode::RCODE_INVALID_OPTYPE);msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);conn->send(msg_rsp);}
private:ProvideManager::ptr _providers; // 对于服务提供者的管理DiscovererManager::ptr _discoverers; // 对于服务调用者的管理
};
客户端实现
在客户端这里,我们需要实现两个客户端,一个是给服务提供者的客户端,一个是给服务调用者的客户端。
class Provider
{
public:using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr& requestor) : _requestor(requestor) {}bool registryMethod(const BaseConnection::ptr& conn, const std::string& method, const Address& host) {auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("%s 服务注册失败!", method.c_str());return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("响应类型向下转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务注册失败,原因:%s", errReason(service_rsp->rcode()).c_str());return false;}return true;}
private:// 发送请求需要通过Requestor模块Requestor::ptr _requestor;
};
在服务调用者的客户端,一次服务发现可能会获取到多个能提供服务的主机信息,到底向那一台主机请求服务更合理呢?此时我们需要有一种负载均衡的思想,在这里,我们采用轮询的方式。就是每一次服务发现之后,将能提供某种服务的主机信息全部保存下来,然后每一次调用都调用不同的主机。为了能够进行轮询,我们对能提供某一种服务的全部主机进行封装。
// 对能提供某种服务的全部主机的封装
class MethodHost
{
public:using ptr = std::shared_ptr<MethodHost>;MethodHost() : _idx(0) {}MethodHost(const std::vector<Address>& hosts) :_hosts(hosts.begin(), hosts.end()), _idx(0) {}void appendHost(const Address& host) {//中途收到了服务上线请求后被调用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address& host) {//中途收到了服务下线请求后被调用std::unique_lock<std::mutex> lock(_mutex);for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {if (*it == host) {_hosts.erase(it);break;}}}// 从这些主机中选择一台主机,是采用轮询的方式Address chooseHost() {std::unique_lock<std::mutex> lock(_mutex);size_t pos = _idx++ % _hosts.size();return _hosts[pos];}bool empty() {std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}
private:std::mutex _mutex;size_t _idx;std::vector<Address> _hosts;
};class Discoverer {
public:using ptr = std::shared_ptr<Discoverer>;Discoverer(const Requestor::ptr& requestor) :_requestor(requestor) {}// 进行服务发现的接口。传入的连接是注册中心的连接,这样才能向注册中心发送请求// 将能提供服务的主机信息放到host中bool serviceDiscovery(const BaseConnection::ptr& conn, const std::string& method, Address& host) {{//当前所保管的提供者信息存在,则直接返回地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(method);if (it != _method_hosts.end()) {if (it->second->empty() == false) {host = it->second->chooseHost();return true;}}}//当前服务的提供者为空// 1. 构建服务发现请求auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);// 2. 发送服务发现请求BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("服务发现失败!");return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (!service_rsp) {ELOG("服务发现失败!响应类型转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());return false;}// 成功接收到了响应,获取响应中的主机信息std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());if (method_host->empty()) {ELOG("%s 服务发现失败!没有能够提供服务的主机!", method.c_str());return false;}host = method_host->chooseHost();_method_hosts[method] = method_host;return true;}// 这个接口是提供给Dispatcher模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg) {// 1. 判断是上线还是下线请求,如果都不是那就不用处理了auto optype = msg->optype();std::string method = msg->method();std::unique_lock<std::mutex> lock(_mutex);if (optype == ServiceOptype::SERVICE_ONLINE) {// 2. 上线请求:找到MethodHost,向其中新增一个主机地址auto it = _method_hosts.find(method);if (it == _method_hosts.end()) {auto method_host = std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method] = method_host;}else {it->second->appendHost(msg->host());}}else if (optype == ServiceOptype::SERVICE_OFFLINE) {// 3. 下线请求:找到MethodHost,从其中删除一个主机地址auto it = _method_hosts.find(method);if (it == _method_hosts.end()) {return;}it->second->removeHost(msg->host());}// 我们是不关心响应的// 当能提供某项服务的全部主机下线后,我们不会将方法与主机的映射关系删除}
private:std::mutex _mutex;// 某一种服务与能提供这种服务的所有主机信息的映射std::unordered_map<std::string, MethodHost::ptr> _method_hosts;// 发送请求需要通过Resquestor模块Requestor::ptr _requestor;
};
在MethodHost中,保存主机信息时,既可以选择vector,也可以选择unordered_set。使用vector时选择主机的效率更高,使用unordered_set时,插入、删除的效率会更高,明显是选择主机的接口被调用的频率更高,所以我们就使用vector。
业务客户端封装
现在我们已经将RPC功能、服务注册与发现功能都实现了,RPC相关的业务功能就已经全部完成了。我们现在要将RPC功能、服务的注册与发现功能、网络通信功能合并。
我们需要封装出3个客户端:
- 服务注册客户端:应用于服务提供者向服务中心进行服务注册
- 服务发现客户端:应用于服务调用者向服务中心进行服务发现
- RPC客户端:可以进行RPC调用,同时也可以进行服务发现。
// 服务注册客户端: 实现服务注册功能
class RegistryClient
{
public:using ptr = std::shared_ptr<RegistryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string& ip, int port) :_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<cxfrpc::Dispatcher>()){// 向Dispatcher模块注册服务响应的回调函数auto rsp_cb = std::bind(&cxfrpc::client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<cxfrpc::BaseMessage>(cxfrpc::MType::RSP_SERVICE, rsp_cb);// 注册客户端接收到消息时的回调函数auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_client = cxfrpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}// 向外提供的服务注册接口// 这里的接口不需要连接了,连接从_client中来bool registryMethod(const std::string& method, const Address& host){return _provider->registryMethod(_client->connection(), method, host);}
private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;
};// 服务发现客户端: 实现服务发现功能
class DiscoveryClient
{
public:using ptr = std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心DiscoveryClient(const std::string& ip, int port) :_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()){// 向Dispatcher模块注册服务响应的回调函数 auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);// 向Dispatcher模块注册服务请求的回调函数 --- 对于服务上线/下线的回调函数auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);// 注册客户端接收到消息时的回调函数auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool serviceDiscovery(const std::string& method, Address& host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}
private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;
};// RPC客户端: 实现RPC远程调用和服务发现功能
class RpcClient
{
public:using ptr = std::shared_ptr<RpcClient>;// 没有启用服务发现功能时,传入服务提供者的地址,启用了服务发现功能时,传入注册中心的地址RpcClient(bool enableDiscovery, const std::string& ip, int port);bool call(const std::string& method, const Json::Value& params, Json::Value& result);bool call(const std::string& method, const Json::Value& params, RpcCaller::JsonAsyncResponse& result);bool call(const std::string& method, const Json::Value& params, const RpcCaller::JsonResponseCallback& cb);
private:bool _enableDiscovery; // 是否启用服务发现功能DiscoveryClient::ptr _discovery_client; // 服务发现客户端RpcCaller::ptr _caller;Requestor::ptr _requestor;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;
};
对于这个RPC客户端,我们需要思考一个问题:RPC客户端是采用长连接还是短连接?
短链接是每次要调用那个功能时,直接读取提前缓存的能提供服务的主机信息,或者通过服务发现客户端获取到能提供服务的主机信息,然后实例化一个RPC客户端对象,向能提供服务的主机发送请求,获取到响应之后,释放掉RPC客户端。
- 优点:思想简单,用的时候创建
- 缺点:RPC调用效率稍低,RPC调用完毕后需要关闭客户端,异步处理的时候较为麻烦。异步调用时,call函数调用完了还不能关闭客户端,必须等到拿到响应了才能关闭客户端,所以必须设置一个回调函数,在获取到响应时关闭客户端。相当于没有异步,直接是回调函数。
长连接思想:在客户端这边又一个连接池,在第一次实例化客户端进行RPC调用后,并不会关闭客户端,而是将客户端放入连接池中,在后续再次发起相同RPC调用的时候,在连接池中发现有已连接成功的客户端,这时候,就可以直接取出客户端对象,发起RPC调用。
- 难点:一旦收到服务下线通知,就需要从连接池中删除对应的客户端对象
- 优点:进行二次相同rpc调用时,效率高(直接复用之前的连接)
- 缺点:在服务下线后,需要对连接池中的连接进行处理
长连接说明:假设现在有两个主机信息能提供Add服务,127.0.0.1:8080和127.0.0.1:9090,当我们要进行RPC调用时,会创建一个RPC客户端,连接127.0.0.1:8080,发送请求后将这个RPC客户端放入到连接池中,下一次要进行RPC调用时,会采用轮询的方式,创建RPC客户端,连接127.0.0.1:9090,发送请求后将这个RPC客户端放入到连接池中。再下次要进行RPC调用时,轮询到了127.0.0.1:8080,会发现连接池中已经有这个连接了,如果连接池中的这个客户端已经完成了RPC调用的过程,那么直接使用这个客户端,就不需要再进行创建RPC客户端对象了。
在这里,我们采用长连接。
// RPC客户端: 实现RPC远程调用和服务发现功能
class RpcClient
{
public:using ptr = std::shared_ptr<RpcClient>;// 没有启用服务发现功能时,传入服务提供者的地址,启用了服务发现功能时,传入注册中心的地址RpcClient(bool enableDiscovery, const std::string& ip, int port) :_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<cxfrpc::client::RpcCaller>(_requestor)){// 注册RPC响应的回调函数auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandlers<BaseMessage>(MType::RSP_RPC, rsp_cb);// 如果启用了服务发现,地址信息是注册中心的,是服务发现客户端需要连接的地址,则通过地址信息实例化_discovery_client// 如果未启用服务发现,地址信息是服务提供者的,则直接实例化_rpc_clientif (_enableDiscovery){_discovery_client = std::make_shared<DiscoveryClient>(ip, port);}else{auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string& method, const Json::Value& params, Json::Value& result){// 获取服务提供者:1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) return false;// 通过客户端连接,发送RPC请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params, RpcCaller::JsonAsyncResponse& result){// 获取服务提供者:1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) return false;// 通过客户端连接,发送RPC请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params, const RpcCaller::JsonResponseCallback& cb){// 获取服务提供者:1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) return false;// 通过客户端连接,发送RPC请求return _caller->call(client->connection(), method, params, result);}
private:// 封装一下对_rpc_clients的操作// 向连接池插入一个客户端对象void putClient(const Address& host, BaseClient::ptr& client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}// 从连接池删除一个客户端对象void delClient(const Address& host){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}// 新建一个客户端对象BaseClient::ptr newClient(const Address& host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}// 根据地址查找客户端BaseClient::ptr getClient(const Address& host){std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) return BaseClient::ptr();return it->second;}// 根据方法名称查找客户端BaseClient::ptr getClient(const std::string& method){BaseClient::ptr client;if (_enableDiscovery){// 通过服务发现,获取服务提供者的地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false){ELOG("当前%s服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}// 查看服务提供者是否已经有实例化客户端,有则直接使用,没有则创建client = getClient(host);if (client.get() == nullptr){client = newClient(host);}}else{client = _rpc_client;}return client;}
private:struct AddressHash{size_t operator()(const Address& host) const{std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery; // 是否启用服务发现功能DiscoveryClient::ptr _discovery_client; // 服务发现客户端Requestor::ptr _requestor;RpcCaller::ptr _caller;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client; // 用于未启用服务发现std::mutex _mutex;// 提供服务的主机信息与RPC客户端对象的映射关系std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients; // 用于服务发现的客户端连接池
};
在这里unordered_map的第一个参数是Address这种自定义类型,无法计算哈希值,所以需要传入一个计算Address哈希值的函数。std::hash这个结构体中有一个对于0的重载,所以可以当成仿函数使用,内部实现了对各种内置类型的哈希值的计算。
当服务下线时,就需要从连接池中删除掉RPC客户端对象了。要在服务下线通知的回调函数中进行这个操作。
class Discoverer {
public:using ptr = std::shared_ptr<Discoverer>;// 服务下线时的回调函数,用于将一个客户端从连接池中删除using OfflineCallback = std::function<void(const Address&)>;Discoverer(const Requestor::ptr& requestor, const OfflineCallback &cb) :_requestor(requestor), _offline_callback(cb) {}// 进行服务发现的接口。传入的连接是注册中心的连接,这样才能向注册中心发送请求// 将能提供服务的主机信息放到host中bool serviceDiscovery(const BaseConnection::ptr& conn, const std::string& method, Address& host){{//当前所保管的提供者信息存在,则直接返回地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(method);if (it != _method_hosts.end()){if (it->second->empty() == false){host = it->second->chooseHost();return true;}}}//当前服务的提供者为空// 1. 构建服务发现请求auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);// 2. 发送服务发现请求BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false){ELOG("服务发现失败!");return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (!service_rsp){ELOG("服务发现失败!响应类型转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK){ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());return false;}// 成功接收到了响应,获取响应中的主机信息std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());if (method_host->empty()){ELOG("%s 服务发现失败!没有能够提供服务的主机!", method.c_str());return false;}host = method_host->chooseHost();_method_hosts[method] = method_host;return true;}// 这个接口是提供给Dispatcher模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr& conn, const ServiceRequest::ptr& msg){// 1. 判断是上线还是下线请求,如果都不是那就不用处理了auto optype = msg->optype();std::string method = msg->method();std::unique_lock<std::mutex> lock(_mutex);if (optype == ServiceOptype::SERVICE_ONLINE){// 2. 上线请求:找到MethodHost,向其中新增一个主机地址auto it = _method_hosts.find(method);if (it == _method_hosts.end()){auto method_host = std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method] = method_host;}else{it->second->appendHost(msg->host());}}else if (optype == ServiceOptype::SERVICE_OFFLINE){// 3. 下线请求:找到MethodHost,从其中删除一个主机地址auto it = _method_hosts.find(method);if (it == _method_hosts.end()){return;}it->second->removeHost(msg->host());_offline_callback(msg->host());}// 我们是不关心响应的// 当能提供某项服务的全部主机下线后,我们不会将方法与主机的映射关系删除}
private:// 服务下线时的回调函数OfflineCallback _offline_callback;std::mutex _mutex;// 某一种服务与能提供这种服务的所有主机信息的映射std::unordered_map<std::string, MethodHost::ptr> _method_hosts;// 发送请求需要通过Resquestor模块Requestor::ptr _requestor;
};
给Discoverer增加了一个成员变量,是服务下线时的回调函数,则其他使用了Discoverer的地方需要改变创建的方式。
// 服务发现客户端: 实现服务发现功能
class DiscoveryClient
{
public:using ptr = std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心DiscoveryClient(const std::string& ip, int port, const Discoverer::OfflineCallback& cb) :_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),_dispatcher(std::make_shared<Dispatcher>()){// 向Dispatcher模块注册服务响应的回调函数 auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);// 向Dispatcher模块注册服务请求的回调函数 --- 对于服务上线/下线的回调函数auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);// 注册客户端接收到消息时的回调函数auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool serviceDiscovery(const std::string& method, Address& host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}
private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;
};// RPC客户端: 实现RPC远程调用和服务发现功能
class RpcClient
{
public:using ptr = std::shared_ptr<RpcClient>;// 没有启用服务发现功能时,传入服务提供者的地址,启用了服务发现功能时,传入注册中心的地址RpcClient(bool enableDiscovery, const std::string& ip, int port) :_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<cxfrpc::client::RpcCaller>(_requestor)){// 注册RPC响应的回调函数auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);// 如果启用了服务发现,地址信息是注册中心的,是服务发现客户端需要连接的地址,则通过地址信息实例化_discovery_client// 如果未启用服务发现,地址信息是服务提供者的,则直接实例化_rpc_clientif (_enableDiscovery){auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);}else{auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string& method, const Json::Value& params, Json::Value& result){// 获取服务提供者:1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) return false;// 通过客户端连接,发送RPC请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params, RpcCaller::JsonAsyncResponse& result){// 获取服务提供者:1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) return false;// 通过客户端连接,发送RPC请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params, const RpcCaller::JsonResponseCallback& cb){// 获取服务提供者:1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) return false;// 通过客户端连接,发送RPC请求return _caller->call(client->connection(), method, params, cb);}
private:// 封装一下对_rpc_clients的操作// 向连接池插入一个客户端对象void putClient(const Address& host, BaseClient::ptr& client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}// 从连接池删除一个客户端对象void delClient(const Address& host){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}// 新建一个客户端对象BaseClient::ptr newClient(const Address& host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}// 根据地址查找客户端BaseClient::ptr getClient(const Address& host){std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) return BaseClient::ptr();return it->second;}// 根据方法名称查找客户端BaseClient::ptr getClient(const std::string& method){BaseClient::ptr client;if (_enableDiscovery){// 通过服务发现,获取服务提供者的地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false){ELOG("当前%s服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}// 查看服务提供者是否已经有实例化客户端,有则直接使用,没有则创建client = getClient(host);if (client.get() == nullptr){client = newClient(host);}}else{client = _rpc_client;}return client;}
private:struct AddressHash{size_t operator()(const Address& host) const{std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery; // 是否启用服务发现功能DiscoveryClient::ptr _discovery_client; // 服务发现客户端Requestor::ptr _requestor;RpcCaller::ptr _caller;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client; // 用于未启用服务发现std::mutex _mutex;// 提供服务的主机信息与RPC客户端对象的映射关系std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients; // 用于服务发现的客户端连接池
};
注册中心服务端实现
在整个项目中,需要封装3个服务端:
- RPC服务端
- 注册中心服务端
- 发布订阅服务端(后边实现)
我们先实现注册中心服务端:需要实现对服务提供者和服务调用者的管理,并提供对服务发现和服务注册请求的回调函数。
// 注册中心服务端
class RegistryServer
{
public:using ptr = std::shared_ptr<RegistryServer>;RegistryServer(int port) :_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<cxfrpc::Dispatcher>()){// 向Dispatcher模块注册服务注册请求与服务发现请求的回调函数auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);// 注册接收到消息时的回调函数和连接关闭时的回调函数_server = cxfrpc::ServerFactory::create(port);auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, _dispatcher.get(),std::placehandlers::_1, std::placehandler::_2);_server->setMessageCallback(message_cb);auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}
private:// 连接关闭的回调函数void onConnShutdown(const BaseConnection::ptr& conn){_pd_manager->onConnShutdown(conn);}
private:PDManager::ptr _pd_manager; // 对服务提供者与服务调用者的管理Dispatcher::ptr _dispatcher; // 需要向Dispatcher模块注册服务注册与服务发现的回调函数BaseServer::ptr _server;
};
RPC服务端实现
RPC服务端:包含一个RPC服务端,以及一个服务注册客户端(需要在启动后,先连接注册中心,进行所能提供的服务注册)。
- RPC服务端:主要针对RPC请求进行处理
- 服务注册客户端:主要实现向服务中心进行服务注册
// RPC服务端
class RpcServer
{
public:using ptr = std::shared_ptr<RpcServer>;// RpcServer会有两套地址信息:// 1. RPC服务端地址信息// 2. 注册中心服务端地址信息RpcServer(const Address& access_addr,bool enableRegistry = false,const Address& registry_server_addr = Address()) :_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<cxfrpc::server::RpcRouter>()),_dispatcher(std::make_shared<cxfrpc::Dispatcher>()){// 若启用了服务注册功能,则初始化服务注册客户端if (_enableRegistry){_reg_client = std::make_shared<client::RegistryClient>(registry_server_addr.first, registry_server_addr.second);}// 向Dispatcher模块注册对于RPC请求的回调处理函数auto rpc_cb = std::bind(&cxfrpc::server::RpcRouter::onRpcRequest, _router.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<cxfrpc::RpcRequest>(cxfrpc::MType::REQ_RPC, rpc_cb);// 注册接收到消息时的回调处理函数_server = cxfrpc::ServerFactory::create(_access_addr.second);auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}// 服务注册接口void registerMethod(const ServiceDescribe::ptr& service){if (_enableRegistry){_reg_client->registryMethod(service->method(), _access_addr);}_router->registerMethod(service);}void start(){_server->start();}
private:bool _enableRegistry; // 是否启用服务注册功能Address _access_addr;client::RegistryClient::ptr _reg_client; // 服务注册客户端RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;
};
简单RPC功能测试
因为我们对RPC客户端和服务端进以先测试以下简单RPC功能是否正常。
服务端:
void Add(const Json::Value &req, Json::Value &rsp)
{int num1 = req["num1"].asInt();int num2 = req["num2"].asInt();rsp = num1 + num2;
}
int main()
{std::unique_ptr<cxfrpc::server::SDescribeFactory> desc_factory(new cxfrpc::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamDesc("num1", cxfrpc::server::VType::INTEGRAL);desc_factory->setParamDesc("num2", cxfrpc::server::VType::INTEGRAL);desc_factory->setReturnType(cxfrpc::server::VType::INTEGRAL);desc_factory->setCallback(Add);cxfrpc::server::RpcServer server(cxfrpc::Address("127.0.0.1", 8080));server.registerMethod(desc_factory->build());server.start();return 0;
}
客户端:
void callback(const Json::Value &result)
{ILOG("callback result: %d", result.asInt());
}int main()
{cxfrpc::client::RpcClient client(false, "127.0.0.1", 8080);Json::Value params, result;params["num1"] = 11;params["num2"] = 22;bool ret = client.call("Add", params, result);if (ret != false) {ILOG("result: %d", result.asInt());}cxfrpc::client::RpcCaller::JsonAsyncResponse res_future;params["num1"] = 33;params["num2"] = 44;ret = client.call("Add", params, res_future);if (ret != false) {result = res_future.get();ILOG("result: %d", result.asInt());}params["num1"] = 55;params["num2"] = 66;ret = client.call("Add", params, callback);DLOG("---------------------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}
可以正常进行RPC调用。
基于服务注册与发现的RPC功能测试
注册中心服务端:
int main()
{cxfrpc::server::RegistryServer reg_server(8080);reg_server.start();return 0;
}
RPC服务端:
void Add(const Json::Value &req, Json::Value &rsp)
{int num1 = req["num1"].asInt();int num2 = req["num2"].asInt();rsp = num1 + num2;
}
int main()
{std::unique_ptr<cxfrpc::server::SDescribeFactory> desc_factory(new cxfrpc::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamDesc("num1", cxfrpc::server::VType::INTEGRAL);desc_factory->setParamDesc("num2", cxfrpc::server::VType::INTEGRAL);desc_factory->setReturnType(cxfrpc::server::VType::INTEGRAL);desc_factory->setCallback(Add);cxfrpc::server::RpcServer server(cxfrpc::Address("127.0.0.1", 8888), true, cxfrpc::Address("127.0.0.1", 8080));server.registerMethod(desc_factory->build());server.start();return 0;
}
客户端:
void callback(const Json::Value &result)
{ILOG("callback result: %d", result.asInt());
}int main()
{cxfrpc::client::RpcClient client(true, "127.0.0.1", 8080);Json::Value params, result;params["num1"] = 11;params["num2"] = 22;bool ret = client.call("Add", params, result);if (ret != false) {ILOG("result: %d", result.asInt());}cxfrpc::client::RpcCaller::JsonAsyncResponse res_future;params["num1"] = 33;params["num2"] = 44;ret = client.call("Add", params, res_future);if (ret != false) {result = res_future.get();ILOG("result: %d", result.asInt());}params["num1"] = 55;params["num2"] = 66;ret = client.call("Add", params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}
先启动注册中心服务器,再启动RPC服务器,连接注册中心服务器并进行服务注册,再启动RPC客户端,连接注册中心并进行服务发现。
可以看到,此时是可以正常进行RPC调用的。现在我们让RPC服务端退出,此时就没有服务提供者了,然后再让RPC服务端请求服务。
可以看到RPC服务端退出时,注册中心客户端显示了服务下线,并且此时没办法进行RPC调用。
主题发布与订阅
发布与订阅实际上就是多个客户端与一个服务端实现中间数据转发的功能。
应用场景1:假设现在有一个新闻系统,一个用户编辑了关于某位歌手的绯闻的新闻,然后将其发布。这条新闻既属于音乐新闻,也属于花边新闻。现在新闻系统想将这条用户推送给其他用户,必然不可能推送给所有的用户,那应该推送给那些用户呢?此时就引入了一个主题的概念。用户通过客户端既可以发送消息,也可以创建主题,发布的消息一定要属于某一个或某几个主题。像这条新闻就既属于音乐主题,也属于花边主题,用户是可以通过客户端订阅主题的。新闻系统在推送时,会根据这条新闻的主题,推送给订阅了主题的用户。
应用场景2:假设现在有一个购物网站,有客户端、中转服务器、若干个具体的业务处理服务器,当一个用户通过客户端发送请求时,会经由中转服务器交给具体的业务处理服务器进行处理。假设现在一个用户的客户端发送了一个购物请求,中转服务器应该将这个请求交给哪一个中转服务器进行处理呢?此时也可以使用主题。让这些业务处理服务器去订阅主题,通过主题决定交给哪一个业务处理服务器进行处理。
这两种场景都是对主题的应用,前者是广播,后者是单播。我们的项目中采用的是前者。
服务端实现
服务端设计:
- 对外提供一个主题操作请求的回调处理函数,未来注册到Dispatcher模块。
- 对外提供一个订阅者连接断开的回调处理函数,未来设置到网络功能的连接断开处理函数中。
- 对数据的管理:hash_map<主题名称,主题描述>、hash_map<连接,订阅者>。
在订阅者描述中,会有这个订阅者对应的连接,以及这个订阅者订阅的所有的主题。在主题描述中,会有这个主题的名称,以及这订阅了这个主题的所有订阅者的描述。
我们先来实现出一个对数据进行管理的类。
class TopicManager
{
public:using ptr = std::shared_ptr<TopicManager>;TopicManager() {}// 对于服务操作请求的回调函数,需要注册到Dispatcher模块void onTopicRequest(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){TopicOptype topic_optype = msg->optype();bool ret = true;switch (topic_optype){//主题的创建case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;//主题的删除case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;//主题的订阅case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break;//主题的取消订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break;//主题消息的发布case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break;default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE);}// 只有当没找到主题时,ret才会是falseif (!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn, msg);}//一个订阅者在连接断开时的处理---删除其关联的数据void onShutdown(const BaseConnection::ptr& conn){//消息发布者断开连接,不需要任何操作; 消息订阅者断开连接需要删除管理数据//因为我们只对主题和订阅者进行了管理,并没有对发布者进行管理//1. 判断断开连接的是否是订阅者,不是的话则直接返回std::vector<Topic::ptr> topics;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto it = _subscribers.find(conn);if (it == _subscribers.end()){return;//断开的连接,不是一个订阅者的连接}subscriber = it->second;//2. 获取到订阅者退出,受影响的主题对象for (auto& topic_name : subscriber->topics){auto topic_it = _topics.find(topic_name);if (topic_it == _topics.end()) continue;topics.push_back(topic_it->second);}//4. 从订阅者映射信息中,删除订阅者_subscribers.erase(it);}//3. 从受影响的主题对象中,移除订阅者for (auto& topic : topics){topic->removeSubscriber(subscriber);}}
private:// 发送错误应答,最后一个参数是错误类型void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode){auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRCode(rcode);conn->send(msg_rsp);}// 发送正确应答void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRCode(RCode::RCODE_OK);conn->send(msg_rsp);}// 创建主题void topicCreate(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){std::unique_lock<std::mutex> lock(_mutex);//构造一个主题对象,添加映射关系的管理std::string topic_name = msg->topicKey();auto topic = std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name, topic));}// 删除主题void topicRemove(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){// 1. 查看当前主题,有哪些订阅者,然后从订阅者中将主题信息删除掉// 2. 删除主题的数据 -- 主题名称与主题对象的映射关系std::string topic_name = msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//在删除主题之前,先找出会受到影响的订阅者auto it = _topics.find(topic_name);if (it == _topics.end()) return;subscribers = it->second->subscribers;_topics.erase(it);//删除当前的主题映射关系,}for (auto& subscriber : subscribers){subscriber->removeTopic(topic_name);}}// 订阅主题bool topicSubscribe(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){//1. 先找出主题对象,以及订阅者对象// 如果没有找到主题--就要报错; 但是如果没有找到订阅者对象,那就要构造一个订阅者Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()) return false;topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}}//2. 在主题对象中,新增一个订阅者对象关联的连接; 在订阅者对象中新增一个订阅的主题topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}// 取消订阅主题void topicCancel(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){//1. 先找出主题对象,和订阅者对象Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end()){topic = topic_it->second;}auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}}//2. 从主题对象中删除当前的订阅者连接; 从订阅者信息中删除所订阅的主题名称if (subscriber) subscriber->removeTopic(msg->topicKey());if (topic && subscriber) topic->removeSubscriber(subscriber);}// 主题消息发布bool topicPublish(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()) return false;topic = topic_it->second;}topic->pushMessage(msg);return true;}
private:// 对订阅者的描述struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn; // 订阅者对应的连接std::unordered_set<std::string> topics; //订阅者所订阅的主题名称Subscriber(const BaseConnection::ptr& c) : conn(c) { }//订阅主题的时候调用void appendTopic(const std::string& topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}//主题被删除 或者 取消订阅的时候,调用void removeTopic(const std::string& topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};// 对主题的描述struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name; // 主题名称std::unordered_set<Subscriber::ptr> subscribers; //当前主题的订阅者Topic(const std::string& name) : topic_name(name) {}//新增订阅的时候调用void appendSubscriber(const Subscriber::ptr& subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}//取消订阅 或者 订阅者连接断开 的时候调用void removeSubscriber(const Subscriber::ptr& subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}//向订阅了该主题的所有订阅者发送消息,收到消息发布请求的时候调用void pushMessage(const BaseMessage::ptr& msg){std::unique_lock<std::mutex> lock(_mutex);for (auto& subscriber : subscribers){subscriber->conn->send(msg);}}};
private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics; // 主题名称与主题描述的映射std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers; // 连接与订阅者描述的映射
};
现在,我们基于这个对数据进行管理的类封装出发布订阅服务端。
// 发布订阅服务端
class TopicServer
{
public:using ptr = std::shared_ptr<TopicServer>;TopicServer(int port) :_topic_manager(std::make_shared<TopicManager>()),_dispatcher(std::make_shared<cxfrpc::Dispatcher>()){auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);_server = cxfrpc::ServerFactory::create(port);auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}
private:void onConnShutdown(const BaseConnection::ptr& conn){_topic_manager->onShutdown(conn);}
private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;
};
客户端实现
客户端设计:
- 对外提供5个操作接口:主题的创建、删除、订阅、取消订阅、发布消息。
- 对外提供一个对于发布消息的处理接口,未来需要注册到Dispatcher模块。
- 内部管理的数据:主题名称与消息处理回调函数的映射关系。
因为一个客户端什么时候接收到服务器推送过来的消息是不一定的,所以无法对消息进行同步处理,只能进行异步处理。异步处理就是先设置好每个主题消息的回调函数,接收到消息后,直接调用回调函数进行处理。
我们先来实现一个对于数据进行管理的类
class TopicManager
{
public:// 回调函数 --- 传入主题名称和要发送的消息using SubCallback = std::function<void(const std::string& key, const std::string& msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr& requestor) : _requestor(requestor) {}// 下面5个函数是提供给用户的// 创建主题bool create(const BaseConnection::ptr& conn, const std::string& key){return commonRequest(conn, key, TopicOptype::TOPIC_CREATE);}// 删除主题bool remove(const BaseConnection::ptr& conn, const std::string& key){return commonRequest(conn, key, TopicOptype::TOPIC_REMOVE);}// 订阅主题,需要设置一个接收到该主题消息的回调函数bool subscribe(const BaseConnection::ptr& conn, const std::string& key, const SubCallback& cb){addSubscribe(key, cb);bool ret = commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);if (ret == false){delSubscribe(key);return false;}return true;}// 取消订阅主题bool cancel(const BaseConnection::ptr& conn, const std::string& key){delSubscribe(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);}// 主题消息发布bool publish(const BaseConnection::ptr& conn, const std::string& key, const std::string& msg){return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);}// 接收到消息发布请求的处理函数,注册到Dispatcher模块中void onPublish(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){//1. 从消息中取出操作类型进行判断,是否是消息请求auto type = msg->optype();if (type != TopicOptype::TOPIC_PUBLISH){ELOG("收到了错误类型的主题操作!");return;}//2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();//3. 通过主题名称,查找对应主题的回调处理函数,有在处理,无在报错auto callback = getSubscribe(topic_key);if (!callback){ELOG("收到了 %s 主题消息,但是该消息无主题处理回调!", topic_key.c_str());return;}return callback(topic_key, topic_msg);}
private:// 下面3个函数是操作_topic_callbacks的封装void addSubscribe(const std::string& key, const SubCallback& cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key, cb));}void delSubscribe(const std::string& key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback getSubscribe(const std::string& key){std::unique_lock<std::mutex> lock(_mutex);auto it = _topic_callbacks.find(key);if (it == _topic_callbacks.end()) return SubCallback();return it->second;}bool commonRequest(const BaseConnection::ptr& conn, const std::string& key,TopicOptype type, const std::string& msg = ""){//1. 构造请求对象,并填充数据auto msg_req = MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setOptype(type);msg_req->setTopicKey(key);if (type == TopicOptype::TOPIC_PUBLISH){msg_req->setTopicMsg(msg);}//2. 向服务端发送请求,等待响应BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false){ELOG("主题操作请求失败!");return false;}//3. 判断请求处理是否成功auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if (!topic_rsp_msg){ELOG("主题操作响应,向下类型转换失败!");return false;}if (topic_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()).c_str());return false;}return true;}
private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks;Requestor::ptr _requestor;
};
现在,我们基于这个对数据进行管理的类封装出发布订阅客户端。
class TopicClient
{
public:// 传入服务器的IP地址和端口号TopicClient(const std::string& ip, int port) :_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_topic_manager(std::make_shared<TopicManager>(_requestor)) {auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}bool create(const std::string& key) {return _topic_manager->create(_rpc_client->connection(), key);}bool remove(const std::string& key) {return _topic_manager->remove(_rpc_client->connection(), key);}bool subscribe(const std::string& key, const TopicManager::SubCallback& cb) {return _topic_manager->subscribe(_rpc_client->connection(), key, cb);}bool cancel(const std::string& key) {return _topic_manager->cancel(_rpc_client->connection(), key);}bool publish(const std::string& key, const std::string& msg) {return _topic_manager->publish(_rpc_client->connection(), key, msg);}void shutdown() {_rpc_client->shutdown();}
private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;
};
发布订阅测试
服务端:
int main()
{auto server = std::make_shared<cxfrpc::server::TopicServer>(8080);server->start();return 0;
}
消息发布客户端:
int main()
{//1. 实例化客户端对象auto client = std::make_shared<cxfrpc::client::TopicClient>("127.0.0.1", 8080);//2. 创建主题bool ret = client->create("hello");if (ret == false) {ELOG("创建主题失败!");}//3. 向主题发布消息for (int i = 0; i < 10; i++) {client->publish("hello", "Hello World-" + std::to_string(i));}client->shutdown();return 0;
}
消息订阅客户端:
void callback(const std::string &key, const std::string &msg)
{ILOG("%s 主题收到推送过来的消息: %s", key.c_str(), msg.c_str());
}int main()
{//1. 实例化客户端对象auto client = std::make_shared<cxfrpc::client::TopicClient>("127.0.0.1", 7070);//2. 创建主题bool ret = client->create("hello");if (ret == false) {ELOG("创建主题失败!");}//3. 订阅主题ret = client->subscribe("hello", callback);//4. 等待->退出std::this_thread::sleep_for(std::chrono::seconds(10));client->shutdown();return 0;
}
先启动服务端,再启动两个消息订阅客户端,再启动一个消息发布客户端。
可以看到,消息发布客户端发送了一次消息,两个消息订阅客户端都接收到了。此时就是借助发布订阅服务端实现了广播的功能。