Json-rpc通信项目(基于C++ Jsoncpp muduo库)
一、介绍RPC
RPC(Remote Procedure Call)远程过程调用,一种通过网络从远程计算器上请求服务,而不需要了解底层网络通信细节,RPC可以使用多种网络协议进行通信,并且在TCP/IP网络四层模型中跨越了传输层和应用层。RPC就是像调用本地方法一样调用远程方法。
项目技术选取:
1.实现一个远程调用接口call,然后通过传入函数名参数来调用RPC接口
2.选取JSON类型来进行网络传输的参数和返回值映射到RPC接口上
3.网络传输:选取muduo库
4.序列化和反序列化:JSON
主要环境为Linux(Ubuntu)
二、 介绍JSON
Json是一种数据交换格式,它采用完全独立于变成语言的文本格式来存储和表示数据
JsonCpp库主要是用于实现Json格式数据的序列化和反序列化,他实现了将多个数据对象组织成为Json格式字符串,以及将json格式字符串解析得到多个数据对象的功能
class JSON {public:static bool serialize(const Json::Value &val, std::string &body) {std::stringstream ss;//先实例化一个工厂类对象Json::StreamWriterBuilder swb;//通过工厂类对象来生产派生类对象std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());int ret = sw->write(val, &ss);if (ret != 0) {ELOG("json serialize failed!");return false;}body = ss.str();return true;}static bool unserialize(const std::string &body, Json::Value &val) {Json::CharReaderBuilder crb;std::string errs;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);if (ret == false) {ELOG("json unserialize failed : %s", errs.c_str());return false;}return true;}};
三、介绍Muduo库
Muduo库是一个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库,是一款基于主从Reactor模型的网络库,主要使用的线程模型是 one loop per thread即一个线程只能有一个事件循环,用于响应计时器和IO事件
一个文件描述符只能由一个线程进行读写,这里只做简单叙述,后面具体项目代码中会使用
四、C++11 异步操作
std::future 是C++11标准库中的一个模版类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候获取任务的执行结果,他能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。
五、项目设计
理解项目功能
本质上来讲,我们要实现远程调用思想并不复杂,其实就是客户端想要完成某个任务的处理,但是这个处理过程并不是自己来完成,而是将请求发送到服务器上,让服务器来帮助其完成处理过程,并返回结果,客户端拿到结果后返回
但是一对多火一对一的关系中,一但服务端掉线,则客户端无法进行远端调用,且服务器的负载也会较高,我们选择设计为分布式的架构(由多个节点组成一个系统,这些节点通常指的是服务器,将不同的业务或者同一个业务拆分在不同节点上,通过协同工作解决高并发的问题,提高系统扩展性和可用性)主要是增加一个注册中心,基于注册中心不同的服务向注册中心进行注册,客户端在进行调用时先通过注册中心进行服务发现,找到能够提供服务的服务器
项目还提供了发布订阅功能,依托客户端围绕服务端进行消息的转发
项目主要三个功能:
1.RPC调用
2.服务的注册与发现以及服务的下线上线通知
3.消息的发布订阅
六、框架设计
主要将整个项目分为三层来进行实现
1.抽象层:将底层的网络通信以及应用层协议以及请求响应进行抽象,使项目更具扩展性和灵活性
2.具象层:针对抽象的功能进行具体的实现
3.业务层:基于抽象的框架在上层实现项目所需功能
整体框架设计:
服务端模块划分
首先我们要清楚服务端的功能需求:
1.基于网络通信接收客户端的请求,提供rpc服务
2.基于网络通信接收客户端的请求,提供服务注册与发现,上线下线通知
3.基于网络通信接收客户端的请求,提供主题操作(创建,删除,订阅,取消订阅),消息发布功能
按照功能需求可划分模块
1.Network :网络通信模块
网络通信模块,实现底层的网络通信功能,这个模块的本质上也是一个比较复杂庞大的模块,上面我们也说了,这里使用muduo库来进行搭建
2.Protocol :应用层通信协议模块
应用层通信协议模块存在的意义就是解析数据,解决通信中有可能存在的粘包问题,能够获取到一条完整的消息
我们选取的方式为LV格式
Length:固定为4字节长度,用于表示后续的本条消息数据长度
MType:固定4字节长度,用于表示该条消息的类型
1.rpc调用请求,相应类型消息
2.发布,订阅,取消订阅,消息推送类型消息
3.主题创建,删除类型消息
4.服务注册,发现,上线,下线消息类型
IDLength:固定4字节长度,用于描述后续ID字段的实际长度
MID:用于唯一标识消息,ID字段长度不固定
Body:消息主题正文数据字段,是请求或响应的实际内容字段
3.Dispatcher : 消息分发处理模块
区分消息类型,根据不同的类型,调用不同的业务处理函数进行消息处理
当在网络通信收到数据后,在onMessage回调函数中对数据进行应用层协议解析,得到一条实际消息载荷后,我们就该决定这条消息代表着客户端的什么请求,以及应该如何处理,本模块主要哈希表来表示消息类型和回调函数的映射关系,即,收到消息后,在模块中找到对应的处理回调函数进行调用即可
消息类型:1.rpc请求与响应 2.服务注册。发现,上线,下线 请求与响应 3. 主题创建,删除,订阅,取消订阅 请求与响应 消息发布的请求与响应
4.RpcRouter : 远端调用路由功能模块
提供rpc请求的处理回调函数,内部所要实现的功能,分辨出客户端请求的服务进行处理得到结果进行响应
1.具备一个rpc路由管理,其中包含对于每个服务的参数校验功能
2.具备一个方法名称和方法业务回调的映射
3.必须向外提供rpc请求的业务处理函数
5.Publish—Subscribe :发布订阅功能模块
针对发布订阅请求进行处理,提供一个回调函数设置给Dispatch模块
主题的创建,主题的删除,主题的订阅,主题的取消订阅,主题消息的发布
该模块必须具备一个主题管理,且主题中需要保存订阅了该主题的客户端连接,当主题收到一条消息,需要将这条消息推送给订阅了该主题的所有客户端
必须具备一个订阅者管理,且每个订阅者描述中都必须保存自己所订阅的主题名称,当一个订阅客户端断开连接时,能够找到订阅信息的关联关系,进行删除
必须向外提供 主题创建,销毁,主题订阅,取消订阅,能够找到订阅信息的关联关系,进行删除
6.Registry—Discovery :服务注册,发现,上线,下线功能模块
针对服务注册与发现请求的处理
1.服务注册:服务provider告诉中转中心,自己能提供哪些服务
2.服务发现:服务caller询问中转中心,谁能提供指定服务
3.服务上线:在一个provider上线了指定服务后,通知发现过该服务的客户端有provider可以提供该服务
4.服务下线:在一个provider断开连接,通知发现过该服务的caller,谁下线了哪个服务
必须具备一个服务发现者的管理:
方法与发现者:当一个客户端进行服务发现的时候,进行记录谁发现过该服务,当有一个新的提供者上线的时 候,可以通知该发现者
连接与发现者:当一个发现者断开连接了,删除关联关系,往后就不需要通知了
必须具备一个服务提供者的管理:
连接与提供者:当一个提供者断开连接的时候,能够通知该提供者提供的服务对应的发现者,该主机的该服务下线了
方法与提供者:能够知道谁的哪些方法下线了,然后通知发现过该方法的客户端
7.Server :基于以上模块整合而出的服务端模块
RpcServer:rpc功能模块与网络通信部分结合
RegistryServer:服务发现注册功能与网络通信部分结合
TopicServer:发布订阅功能模块与网络通信部分结合
客户端模块划分
1.Protocol:应用层通信协议模块
2.Network:网络通信模块
3.Dispatch:消息分发处理模块
4.Requestor:请求管理模块
5.RpcCaller:远端调用功能模块
6.Publish-Subscribe:发布订阅功能模块
7.Registry-Discovery:服务注册,发现,上线,下线功能模块
8.Client:基于以上模块整合而出的客户端模块
重复模块不再过多赘述
Requestor模块:
针对客户端的每一条请求进行管理,以便于对请求对应的响应做出合适的操作
首先我们要思考一个问题:对于客户端来说,更多的是请求方,是主动发起请求服务的一方,而在多线程网络通信中,可能会存在时序的问题,其次muduo库这种异步IO网络通信库,通常是IO操作都是异步操作,无法直接在发送请求后去等待该条请求的响应
解决办法:给每一个请求设置请求ID,将数据存入hash_map中,以请求ID作为映射,并向外提供获取指定请求ID响应的阻塞接口,这样只要知道的请求ID即可获取到自己想要的响应
RpcCall模块:
向用户提供进行rpc调用模块
1.同步调用:发起调用后,等收到响应结果后返回
2.异步调用:发起调用后立即返回,在想获取结果的时候进行获取
3.回调调用:发起调用的同事设置结果的处理回调,收到响应后自动对结果进行回调处理
Registry-Discovery模块
注册者:作为Rpc服务的提供者,需要向注册中心提供服务,因为需要实现向服务器注册服务的功能
发现者:作为Rpc服务的调用者,需要先进行服务发现,获取地址后需要管理起来留用,且作为发现者,需要关注注册中心发过来的服务上线/下线消息,及时对已经下线的服务和主机进行管理
Client整合模块:
RegistryClient:服务注册功能模块与网络通信客户端结合
DiscoveryClient:服务发现功能模块与网络通信客户端结合
RpcClient:Discovery 与RPC功能模块与网络通信客户端结合
TopicClient:发布订阅功能模块与网络通信客户端结合
七、项目实现
常用接口实现
日志:
一个项目的实现,日志是必不可少的,主要用来开酥定位程序运行逻辑出错的位置
出现问题不是最可怕的,可怕的是找不到问题出现在哪
#define LDBG 0#define LINF 1#define LERR 2#define LDEFAULT LDBG#define LOG(level, format, ...) {\if (level >= LDEFAULT){\time_t t = time(NULL);\struct tm *lt = localtime(&t);\char time_tmp[32] = {0};\strftime(time_tmp, 31, "%m-%d %T", lt);\fprintf(stdout, "[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__);\}\}#define DLOG(format, ...) LOG(LDBG, format, ##__VA_ARGS__);#define ILOG(format, ...) LOG(LINF, format, ##__VA_ARGS__);#define ELOG(format, ...) LOG(LERR, format, ##__VA_ARGS__);
Json序列化/反序列化
class JSON {public:static bool serialize(const Json::Value &val, std::string &body) {std::stringstream ss;//先实例化一个工厂类对象Json::StreamWriterBuilder swb;//通过工厂类对象来生产派生类对象std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());int ret = sw->write(val, &ss);if (ret != 0) {ELOG("json serialize failed!");return false;}body = ss.str();return true;}static bool unserialize(const std::string &body, Json::Value &val) {Json::CharReaderBuilder crb;std::string errs;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);if (ret == false) {ELOG("json unserialize failed : %s", errs.c_str());return false;}return true;}};
UUID生成
什么是UUID?UUID也叫通用唯一识别码,通常由32位16进制数字字符组成
8-4-4-4-12
我们采用生成8个随机数字,8个字节序号,16字节数组生成
class UUID {public:static std::string uuid() {std::stringstream ss;//1. 构造一个机器随机数对象std::random_device rd;//2. 以机器随机数为种子构造伪随机数对象std::mt19937 generator (rd());//3. 构造限定数据范围的对象std::uniform_int_distribution<int> distribution(0, 255);//4. 生成8个随机数,按照特定格式组织成为16进制数字字符的字符串for (int i = 0; i < 8; i++) {if (i == 4 || i == 6) ss << "-";ss << std::setw(2) << std::setfill('0') <<std::hex << distribution(generator);}ss << "-";//5. 定义一个8字节序号,逐字节组织成为16进制数字字符的字符串static std::atomic<size_t> seq(1); // 00 00 00 00 00 00 00 01size_t cur = seq.fetch_add(1);for (int i = 7; i >= 0; i--) {if (i == 5) ss << "-";ss << std::setw(2) << std::setfill('0') << std::hex << ((cur >> (i*8)) & 0xFF);}return ss.str();}
定义
1.请求字段宏定义
方法名称 方法参数 主题名称 主题消息 操作类型 IP地址 port端口 响应码 调用结果
#define KEY_METHOD "method"#define KEY_PARAMS "parameters"#define KEY_TOPIC_KEY "topic_key"#define KEY_TOPIC_MSG "topic_msg"#define KEY_OPTYPE "optype"#define KEY_HOST "host"#define KEY_HOST_IP "ip"#define KEY_HOST_PORT "port"#define KEY_RCODE "rcode"#define KEY_RESULT "result"
2.消息类型的定义
PRC 请求,响应
主题操作 请求,响应
服务操作 请求,响应
enum class MType {REQ_RPC = 0,RSP_RPC,REQ_TOPIC,RSP_TOPIC,REQ_SERVICE,RSP_SERVICE};
3.响应码类型定义
成功处理
解析失败
消息中字段缺失或错误导致无效消息
链接断开
无效的Rpc调用参数
Rpc服务不存在
无效的Topic操作类型
主题不存在
无效的服务操作类型
enum class RCode {RCODE_OK = 0,RCODE_PARSE_FAILED,RCODE_ERROR_MSGTYPE,RCODE_INVALID_MSG,RCODE_DISCONNECTED,RCODE_INVALID_PARAMS,RCODE_NOT_FOUND_SERVICE,RCODE_INVALID_OPTYPE,RCODE_NOT_FOUND_TOPIC,RCODE_INTERNAL_ERROR};static std::string errReason(RCode code) {static std::unordered_map<RCode, std::string> err_map = {{RCode::RCODE_OK, "成功处理!"},{RCode::RCODE_PARSE_FAILED, "消息解析失败!"},{RCode::RCODE_ERROR_MSGTYPE, "消息类型错误!"},{RCode::RCODE_INVALID_MSG, "无效消息"},{RCode::RCODE_DISCONNECTED, "连接已断开!"},{RCode::RCODE_INVALID_PARAMS, "无效的Rpc参数!"},{RCode::RCODE_NOT_FOUND_SERVICE, "没有找到对应的服务!"},{RCode::RCODE_INVALID_OPTYPE, "无效的操作类型"},{RCode::RCODE_NOT_FOUND_TOPIC, "没有找到对应的主题!"},{RCode::RCODE_INTERNAL_ERROR, "内部错误!"}};auto it = err_map.find(code);if (it == err_map.end()) {return "未知错误!";}return it->second;}
4.RPC请求类型定义
同步请求
异步请求
回调请求
enum class RType {REQ_ASYNC = 0,REQ_CALLBACK};
5.主题操作类型定义
主题创建
主题删除
主题订阅
主题取消订阅
主题消息发布
enum class TopicOptype {TOPIC_CREATE = 0,TOPIC_REMOVE,TOPIC_SUBSCRIBE,TOPIC_CANCEL,TOPIC_PUBLISH};
6.服务操作类型定义
服务注册
服务发现
服务上线
服务下线
enum class ServiceOptype {SERVICE_REGISTRY = 0,SERVICE_DISCOVERY,SERVICE_ONLINE,SERVICE_OFFLINE,SERVICE_UNKNOW};
通信抽象实现
提供一些必要的接口,具体实现由派生类实现
BaseMessage BaseBuffer BaseProtocol BaseConnection BaseServe BaseClient
namespace myrpc {class BaseMessage {public:using ptr = std::shared_ptr<BaseMessage>;virtual ~BaseMessage(){}virtual void setId(const std::string &id) {_rid = id;}virtual std::string rid() { return _rid; }virtual void setMType(MType mtype) {_mtype = mtype;}virtual MType mtype() { return _mtype; }virtual std::string serialize() = 0;virtual bool unserialize(const std::string &msg) = 0;virtual bool check() = 0;private:MType _mtype;std::string _rid;};class BaseBuffer {public:using ptr = std::shared_ptr<BaseBuffer>;virtual size_t readableSize() = 0;virtual int32_t peekInt32() = 0;virtual void retrieveInt32() = 0;virtual int32_t readInt32() = 0;virtual std::string retrieveAsString(size_t len) = 0;};class BaseProtocol {public:using ptr = std::shared_ptr<BaseProtocol>;virtual bool canProcessed(const BaseBuffer::ptr &buf) = 0;virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg) = 0;virtual std::string serialize(const BaseMessage::ptr &msg) = 0;};class BaseConnection {public:using ptr = std::shared_ptr<BaseConnection>;virtual void send(const BaseMessage::ptr &msg) = 0;virtual void shutdown() = 0;virtual bool connected() = 0;};using ConnectionCallback = std::function<void(const BaseConnection::ptr&)>;using CloseCallback = std::function<void(const BaseConnection::ptr&)>;using MessageCallback = std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>;class BaseServer {public:using ptr = std::shared_ptr<BaseServer>;virtual void setConnectionCallback(const ConnectionCallback& cb) {_cb_connection = cb;}virtual void setCloseCallback(const CloseCallback& cb) {_cb_close = cb;}virtual void setMessageCallback(const MessageCallback& cb) {_cb_message = cb;}virtual void start() = 0;protected:ConnectionCallback _cb_connection;CloseCallback _cb_close;MessageCallback _cb_message;};class BaseClient {public:using ptr = std::shared_ptr<BaseClient>;virtual void setConnectionCallback(const ConnectionCallback& cb) {_cb_connection = cb;}virtual void setCloseCallback(const CloseCallback& cb) {_cb_close = cb;}virtual void setMessageCallback(const MessageCallback& cb) {_cb_message = cb;}virtual void connect() = 0;virtual void shutdown() = 0;virtual bool send(const BaseMessage::ptr&) = 0;virtual BaseConnection::ptr connection() = 0;virtual bool connected() = 0;protected:ConnectionCallback _cb_connection;CloseCallback _cb_close;MessageCallback _cb_message;};
}
消息抽象实现
class JsonMessage : public BaseMessage {public:using ptr = std::shared_ptr<JsonMessage>;virtual std::string serialize() override {std::string body;bool ret = JSON::serialize(_body, body);if (ret == false) {return std::string();}return body;}virtual bool unserialize(const std::string &msg) override {return JSON::unserialize(msg, _body);}protected:Json::Value _body;};class JsonRequest : public JsonMessage {public:using ptr = std::shared_ptr<JsonRequest>;};class JsonResponse : public JsonMessage {public:using ptr = std::shared_ptr<JsonResponse>;virtual bool check() override {//在响应中,大部分的响应都只有响应状态码//因此只需要判断响应状态码字段是否存在,类型是否正确即可if (_body[KEY_RCODE].isNull() == true) {ELOG("响应中没有响应状态码!");return false;}if (_body[KEY_RCODE].isIntegral() == false) {ELOG("响应状态码类型错误!");return false;}return true;}virtual RCode rcode() {return (RCode)_body[KEY_RCODE].asInt();}virtual void setRCode(RCode rcode) {_body[KEY_RCODE] = (int)rcode;}};
Muduo封装实现
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpClient.h>
#include "detail.hpp"
#include "fields.hpp"
#include "abstract.hpp"
#include "message.hpp"
#include <mutex>
#include <unordered_map>namespace myrpc {class MuduoBuffer : public BaseBuffer {public:using ptr = std::shared_ptr<MuduoBuffer>;MuduoBuffer(muduo::net::Buffer *buf):_buf(buf){}virtual size_t readableSize() override {return _buf->readableBytes();}virtual int32_t peekInt32() override {//muduo库是一个网络库,从缓冲区取出一个4字节整形,会进行网络字节序的转换return _buf->peekInt32();}virtual void retrieveInt32() override {return _buf->retrieveInt32();}virtual int32_t readInt32() override {return _buf->readInt32();}virtual std::string retrieveAsString(size_t len) override {return _buf->retrieveAsString(len);}private:muduo::net::Buffer *_buf;};class BufferFactory {public:template<typename ...Args>static BaseBuffer::ptr create(Args&& ...args) {return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);}};class LVProtocol : public BaseProtocol {public:// |--Len--|--VALUE--|// |--Len--|--mtype--|--idlen--|--id--|--body--|using ptr = std::shared_ptr<LVProtocol>;//判断缓冲区中的数据量是否足够一条消息的处理virtual bool canProcessed(const BaseBuffer::ptr &buf) override {if (buf->readableSize() < lenFieldsLength) {return false;}int32_t total_len = buf->peekInt32();//DLOG("total_len:%d", total_len);if (buf->readableSize() < (total_len + lenFieldsLength)) {return false;}return true;}virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg) override {//当调用onMessage的时候,默认认为缓冲区中的数据足够一条完整的消息int32_t total_len = buf->readInt32(); //读取总长度MType mtype = (MType)buf->readInt32(); // 读取数据类型int32_t idlen = buf->readInt32(); // 读取id长度int32_t body_len = total_len - idlen - idlenFieldsLength - mtypeFieldsLength;std::string id = buf->retrieveAsString(idlen);std::string body = buf->retrieveAsString(body_len);msg = MessageFactory::create(mtype);if (msg.get() == nullptr) {ELOG("消息类型错误,构造消息对象失败!");return false;}bool ret = msg->unserialize(body);if (ret == false) {ELOG("消息正文反序列化失败!");return false;}msg->setId(id);msg->setMType(mtype);return true;}virtual std::string serialize(const BaseMessage::ptr &msg) override {// |--Len--|--mtype--|--idlen--|--id--|--body--|std::string body = msg->serialize();std::string id = msg->rid();auto mtype = htonl((int32_t)msg->mtype());int32_t idlen = htonl(id.size());int32_t h_total_len = mtypeFieldsLength + idlenFieldsLength + id.size() + body.size();int32_t n_total_len = htonl(h_total_len);//DLOG("h_total_len:%d", h_total_len);std::string result;result.reserve(h_total_len);result.append((char*)&n_total_len, lenFieldsLength);result.append((char*)&mtype, mtypeFieldsLength);result.append((char*)&idlen, idlenFieldsLength);result.append(id);result.append(body);return result;}private:const size_t lenFieldsLength = 4;const size_t mtypeFieldsLength = 4;const size_t idlenFieldsLength = 4;};class ProtocolFactory {public:template<typename ...Args>static BaseProtocol::ptr create(Args&& ...args) {return std::make_shared<LVProtocol>(std::forward<Args>(args)...);}};class MuduoConnection : public BaseConnection {public:using ptr = std::shared_ptr<MuduoConnection>;MuduoConnection(const muduo::net::TcpConnectionPtr &conn,const BaseProtocol::ptr &protocol) : _protocol(protocol), _conn(conn) {}virtual void send(const BaseMessage::ptr &msg) override {std::string body = _protocol->serialize(msg);_conn->send(body);}virtual void shutdown() override {_conn->shutdown();}virtual bool connected() override {_conn->connected();}private:BaseProtocol::ptr _protocol;muduo::net::TcpConnectionPtr _conn;};class ConnectionFactory {public:template<typename ...Args>static BaseConnection::ptr create(Args&& ...args) {return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);}};class MuduoServer : public BaseServer {public:using ptr = std::shared_ptr<MuduoServer>;MuduoServer(int port) : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "MuduoServer", muduo::net::TcpServer::kReusePort),_protocol(ProtocolFactory::create()){}virtual void start() {_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.start();//先开始监听_baseloop.loop();//开始死循环事件监控}private:void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {std::cout << "连接建立!\n";auto muduo_conn = ConnectionFactory::create(conn, _protocol);{std::unique_lock<std::mutex> lock(_mutex);_conns.insert(std::make_pair(conn, muduo_conn));}if (_cb_connection) _cb_connection(muduo_conn);}else {std::cout << "连接断开!\n";BaseConnection::ptr muduo_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()) {return;}muduo_conn = it->second;_conns.erase(conn);}if (_cb_close) _cb_close(muduo_conn);}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){DLOG("连接有数据到来,开始处理!");auto base_buf = BufferFactory::create(buf);while(1) {if (_protocol->canProcessed(base_buf) == false) {//数据不足if (base_buf->readableSize() > maxDataSize) {conn->shutdown();ELOG("缓冲区中数据过大!");return ;}//DLOG("数据量不足!");break;}//DLOG("缓冲区中数据可处理!");BaseMessage::ptr msg;bool ret = _protocol->onMessage(base_buf, msg);if (ret == false) {conn->shutdown();ELOG("缓冲区中数据错误!");return ;}//DLOG("消息反序列化成功!")BaseConnection::ptr base_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()) {conn->shutdown();return;}base_conn = it->second;}//DLOG("调用回调函数进行消息处理!");if (_cb_message) _cb_message(base_conn, msg);}}private:const size_t maxDataSize = (1 << 16);BaseProtocol::ptr _protocol;muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;std::mutex _mutex;std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;};class ServerFactory {public:template<typename ...Args>static BaseServer::ptr create(Args&& ...args) {return std::make_shared<MuduoServer>(std::forward<Args>(args)...);}};class MuduoClient : public BaseClient {public:using ptr = std::shared_ptr<MuduoClient>;MuduoClient(const std::string &sip, int sport):_protocol(ProtocolFactory::create()),_baseloop(_loopthread.startLoop()),_downlatch(1),_client(_baseloop, muduo::net::InetAddress(sip, sport), "MuduoClient"){}virtual void connect() override {DLOG("设置回调函数,连接服务器");_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));//设置连接消息的回调_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//连接服务器_client.connect();_downlatch.wait();DLOG("连接服务器成功!");}virtual void shutdown() override {return _client.disconnect();}virtual bool send(const BaseMessage::ptr &msg) override {if (connected() == false) {ELOG("连接已断开!");return false;}_conn->send(msg);}virtual BaseConnection::ptr connection() override {return _conn;}virtual bool connected() {return (_conn && _conn->connected());}private:void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {std::cout << "连接建立!\n";_downlatch.countDown();//计数--,为0时唤醒阻塞_conn = ConnectionFactory::create(conn, _protocol);}else {std::cout << "连接断开!\n";_conn.reset();}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){DLOG("连接有数据到来,开始处理!");auto base_buf = BufferFactory::create(buf);while(1) {if (_protocol->canProcessed(base_buf) == false) {//数据不足if (base_buf->readableSize() > maxDataSize) {conn->shutdown();ELOG("缓冲区中数据过大!");return ;}//DLOG("数据量不足!");break;}//DLOG("缓冲区中数据可处理!");BaseMessage::ptr msg;bool ret = _protocol->onMessage(base_buf, msg);if (ret == false) {conn->shutdown();ELOG("缓冲区中数据错误!");return ;}//DLOG("缓冲区中数据解析完毕,调用回调函数进行处理!");if (_cb_message) _cb_message(_conn, msg);}}private:const size_t maxDataSize = (1 << 16);BaseProtocol::ptr _protocol;BaseConnection::ptr _conn;muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::EventLoop *_baseloop;muduo::net::TcpClient _client;};class ClientFactory {public:template<typename ...Args>static BaseClient::ptr create(Args&& ...args) {return std::make_shared<MuduoClient>(std::forward<Args>(args)...);}};
}
不同消息封装实现
Request : Rpc,Topic,Service (继承于JsonReque)
Response:Rpc,Topic,Service (继承于JsonResponse)
class RpcRequest : public JsonRequest {public:using ptr = std::shared_ptr<RpcRequest>;virtual bool check() override {//rpc请求中,包含请求方法名称-字符串,参数字段-对象if (_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false) {ELOG("RPC请求中没有方法名称或方法名称类型错误!");return false;}if (_body[KEY_PARAMS].isNull() == true ||_body[KEY_PARAMS].isObject() == false) {ELOG("RPC请求中没有参数信息或参数信息类型错误!");return false;}return true;}std::string method() {return _body[KEY_METHOD].asString();}void setMethod(const std::string &method_name) {_body[KEY_METHOD] = method_name;}Json::Value params() {return _body[KEY_PARAMS];}void setParams(const Json::Value ¶ms) {_body[KEY_PARAMS] = params;}};class TopicRequest : public JsonRequest {public:using ptr = std::shared_ptr<TopicRequest>;virtual bool check() override {//rpc请求中,包含请求方法名称-字符串,参数字段-对象if (_body[KEY_TOPIC_KEY].isNull() == true ||_body[KEY_TOPIC_KEY].isString() == false) {ELOG("主题请求中没有主题名称或主题名称类型错误!");return false;}if (_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false) {ELOG("主题请求中没有操作类型或操作类型的类型错误!");return false;}if (_body[KEY_OPTYPE].asInt() == (int)TopicOptype::TOPIC_PUBLISH &&(_body[KEY_TOPIC_MSG].isNull() == true ||_body[KEY_TOPIC_MSG].isString() == false)) {ELOG("主题消息发布请求中没有消息内容字段或消息内容类型错误!");return false;}return true;}std::string topicKey() {return _body[KEY_TOPIC_KEY].asString();}void setTopicKey(const std::string &key) {_body[KEY_TOPIC_KEY] = key;}TopicOptype optype() {return (TopicOptype)_body[KEY_OPTYPE].asInt();}void setOptype(TopicOptype optype) {_body[KEY_OPTYPE] = (int)optype;}std::string topicMsg() {return _body[KEY_TOPIC_MSG].asString();}void setTopicMsg(const std::string &msg) {_body[KEY_TOPIC_MSG] = msg;}};class ServiceRequest : public JsonRequest {public:using ptr = std::shared_ptr<ServiceRequest>;virtual bool check() override {//rpc请求中,包含请求方法名称-字符串,参数字段-对象if (_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false) {ELOG("服务请求中没有方法名称或方法名称类型错误!");return false;}if (_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false) {ELOG("服务请求中没有操作类型或操作类型的类型错误!");return false;}if (_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) &&(_body[KEY_HOST].isNull() == true ||_body[KEY_HOST].isObject() == false ||_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||_body[KEY_HOST][KEY_HOST_IP].isString() == false ||_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||_body[KEY_HOST][KEY_HOST_PORT].isIntegral() == false)) {ELOG("服务请求中主机地址信息错误!");return false;}return true;}std::string method() {return _body[KEY_METHOD].asString();}void setMethod(const std::string &name) {_body[KEY_METHOD] = name;}ServiceOptype optype() {return (ServiceOptype)_body[KEY_OPTYPE].asInt();}void setOptype(ServiceOptype optype) {_body[KEY_OPTYPE] = (int)optype;}Address host() {Address addr;addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();return addr;}void setHost(const Address &host) {Json::Value val;val[KEY_HOST_IP] = host.first;val[KEY_HOST_PORT] = host.second;_body[KEY_HOST] = val;}};class RpcResponse : public JsonResponse {public:using ptr = std::shared_ptr<RpcResponse>;virtual bool check() override {if (_body[KEY_RCODE].isNull() == true ||_body[KEY_RCODE].isIntegral() == false) {ELOG("响应中没有响应状态码,或状态码类型错误!");return false;}if (_body[KEY_RESULT].isNull() == true) {ELOG("响应中没有Rpc调用结果,或结果类型错误!");return false;}return true;}Json::Value result() {return _body[KEY_RESULT];}void setResult(const Json::Value &result) {_body[KEY_RESULT] = result;}};class TopicResponse : public JsonResponse {public:using ptr = std::shared_ptr<TopicResponse>;};class ServiceResponse : public JsonResponse {public:using ptr = std::shared_ptr<ServiceResponse>;virtual bool check() override {if (_body[KEY_RCODE].isNull() == true ||_body[KEY_RCODE].isIntegral() == false) {ELOG("响应中没有响应状态码,或状态码类型错误!");return false;}if (_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false) {ELOG("响应中没有操作类型,或操作类型的类型错误!");return false;}if (_body[KEY_OPTYPE].asInt() == (int)(ServiceOptype::SERVICE_DISCOVERY) &&(_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false ||_body[KEY_HOST].isNull() == true ||_body[KEY_HOST].isArray() == false)) {ELOG("服务发现响应中响应信息字段错误!");return false;}return true;}ServiceOptype optype() {return (ServiceOptype)_body[KEY_OPTYPE].asInt();}void setOptype(ServiceOptype optype) {_body[KEY_OPTYPE] = (int)optype;}std::string method() {return _body[KEY_METHOD].asString();}void setMethod(const std::string &method) {_body[KEY_METHOD] = method;}void setHost(std::vector<Address> addrs) {for (auto &addr : addrs) {Json::Value val;val[KEY_HOST_IP] = addr.first;val[KEY_HOST_PORT] = addr.second;_body[KEY_HOST].append(val);}}std::vector<Address> hosts() {std::vector<Address> addrs;int sz = _body[KEY_HOST].size();for (int i = 0; i < sz; i++) {Address addr;addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();addrs.push_back(addr);}return addrs;}};
我们可以采用工厂模式来生产消息
class MessageFactory {public:static BaseMessage::ptr create(MType mtype) {switch(mtype) {case MType::REQ_RPC : return std::make_shared<RpcRequest>();case MType::RSP_RPC : return std::make_shared<RpcResponse>();case MType::REQ_TOPIC : return std::make_shared<TopicRequest>();case MType::RSP_TOPIC : return std::make_shared<TopicResponse>();case MType::REQ_SERVICE : return std::make_shared<ServiceRequest>();case MType::RSP_SERVICE : return std::make_shared<ServiceResponse>();}return BaseMessage::ptr();}template<typename T, typename ...Args>static std::shared_ptr<T> create(Args&& ...args) {return std::make_shared<T>(std::forward(args)...);}};
Dispatcher实现
注册消息类型-回调函数映射关系 提供消息处理接口
namespace myrpc {class Callback {public:using ptr = std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr &conn, BaseMessage::ptr &msg) = 0;};template<typename T>class CallbackT : public Callback{public:using ptr = std::shared_ptr<CallbackT<T>>;using MessageCallback = std::function<void(const BaseConnection::ptr &conn, std::shared_ptr<T> &msg)>;CallbackT(const MessageCallback &handler):_handler(handler) { }void onMessage(const BaseConnection::ptr &conn, BaseMessage::ptr &msg) override {auto type_msg = std::dynamic_pointer_cast<T>(msg);_handler(conn, type_msg);}private:MessageCallback _handler;};class Dispatcher {public:using ptr = std::shared_ptr<Dispatcher>;template<typename T>void registerHandler(MType mtype, const typename CallbackT<T>::MessageCallback &handler) {std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype, cb));}void onMessage(const BaseConnection::ptr &conn, BaseMessage::ptr &msg) {//找到消息类型对应的业务处理函数,进行调用即可std::unique_lock<std::mutex> lock(_mutex);auto it = _handlers.find(msg->mtype());if (it != _handlers.end()) {return it->second->onMessage(conn, msg);}//没有找到指定类型的处理回调--因为客户端和服务端都是我们自己设计的,因此不可能出现这种情况ELOG("收到未知类型的消息: %d!", msg->mtype());conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType, Callback::ptr> _handlers;};
服务端RpcRouter实现
提供Rpc请求处理回调函数
内部的服务管理:方法名称 参数信息 对外提供参数校验接口
namespace myrpc {namespace server {enum class VType {BOOL = 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};class ServiceDescribe {public: using ptr = std::shared_ptr<ServiceDescribe>;using ServiceCallback = std::function<void(const Json::Value&, Json::Value &)>;using ParamsDescribe = std::pair<std::string, VType>;ServiceDescribe(std::string &&mname, std::vector<ParamsDescribe> &&desc, VType vtype, ServiceCallback &&handler) : _method_name(std::move(mname)),_callback(std::move(handler)), _params_desc(std::move(desc)), _return_type(vtype){}const std::string &method() { return _method_name; }//针对收到的请求中的参数进行校验bool paramCheck(const Json::Value ¶ms){//对params进行参数校验---判断所描述的参数字段是否存在,类型是否一致for (auto &desc : _params_desc) {if (params.isMember(desc.first) == false) {ELOG("参数字段完整性校验失败!%s 字段缺失!", desc.first.c_str());return false;}if (check(desc.second, params[desc.first]) == false) {ELOG("%s 参数类型校验失败!", desc.first.c_str());return false;}}return true;}bool call(const Json::Value ¶ms, Json::Value &result) {_callback(params, result);if (rtypeCheck(result) == false) {ELOG("回调处理函数中的响应信息校验失败!");return false;}return true;}private:bool rtypeCheck(const Json::Value &val) {return check(_return_type, val);}bool check(VType vtype, const Json::Value &val) {switch(vtype) {case VType::BOOL : return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name; // 方法名称ServiceCallback _callback; // 实际的业务回调函数std::vector<ParamsDescribe> _params_desc; // 参数字段格式描述VType _return_type; //结果作为返回值类型的描述};class SDescribeFactory {public:void setMethodName(const std::string &name) {_method_name = name;}void setReturnType(VType vtype) {_return_type = vtype;}void setParamsDesc(const std::string &pname, VType vtype) {_params_desc.push_back(ServiceDescribe::ParamsDescribe(pname, vtype));}void setCallback(const ServiceDescribe::ServiceCallback &cb) {_callback = cb;}ServiceDescribe::ptr build() {return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));}private:std::string _method_name;ServiceDescribe::ServiceCallback _callback; // 实际的业务回调函数std::vector<ServiceDescribe::ParamsDescribe> _params_desc; // 参数字段格式描述VType _return_type; //结果作为返回值类型的描述};class ServiceManager {public:using ptr = std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr &desc) {std::unique_lock<std::mutex> lock(_mutex);_services.insert(std::make_pair(desc->method(), desc));}ServiceDescribe::ptr select(const std::string &method_name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(method_name);if (it == _services.end()) {return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name) {std::unique_lock<std::mutex> lock(_mutex);_services.erase(method_name);}private:std::mutex _mutex;std::unordered_map<std::string, ServiceDescribe::ptr> _services;};class RpcRouter {public:using ptr = std::shared_ptr<RpcRouter>;RpcRouter(): _service_manager(std::make_shared<ServiceManager>()){}//这是注册到Dispatcher模块针对rpc请求进行回调处理的业务函数void onRpcRequest(const BaseConnection::ptr &conn, RpcRequest::ptr &request){//1. 查询客户端请求的方法描述--判断当前服务端能否提供对应的服务auto service = _service_manager->select(request->method());if (service.get() == nullptr) {ELOG("%s 服务未找到!", request->method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_NOT_FOUND_SERVICE);}//2. 进行参数校验,确定能否提供服务if (service->paramCheck(request->params()) == false) {ELOG("%s 服务参数校验失败!", request->method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_INVALID_PARAMS);}//3. 调用业务回调接口进行业务处理Json::Value result;bool ret = service->call(request->params(), result);if (ret == false) {ELOG("%s 服务参数校验失败!", request->method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_INTERNAL_ERROR);}//4. 处理完毕得到结果,组织响应,向客户端发送return response(conn, request, result, RCode::RCODE_OK);}void registerMethod(const ServiceDescribe::ptr &service) {return _service_manager->insert(service);}private:void response(const BaseConnection::ptr &conn, const RpcRequest::ptr &req, const Json::Value &res, RCode rcode) {auto msg = MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(myrpc::MType::RSP_RPC);msg->setRCode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};}
}
服务端 Publish Subscribe实现
对外提供主题操作处理回调函数
对外提供消息发布处理回调函数
内部进行主题及订阅者的管理
namespace myrpc {namespace server {class TopicManager {public:using ptr = std::shared_ptr<TopicManager>;TopicManager() {}void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {TopicOptype topic_optype = msg->optype();bool ret = true;switch(topic_optype){//主题的创建case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;//主题的删除case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;//主题的订阅case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break;//主题的取消订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break;//主题消息的发布case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break;default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE);}if (!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn, msg);}//一个订阅者在连接断开时的处理---删除其关联的数据void onShutdown(const BaseConnection::ptr &conn) {//消息发布者断开连接,不需要任何操作; 消息订阅者断开连接需要删除管理数据//1. 判断断开连接的是否是订阅者,不是的话则直接返回std::vector<Topic::ptr> topics;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto it = _subscribers.find(conn);if (it == _subscribers.end()) {return;//断开的连接,不是一个订阅者的连接}subscriber = it->second;//2. 获取到订阅者退出,受影响的主题对象for (auto &topic_name : subscriber->topics) {auto topic_it = _topics.find(topic_name);if (topic_it == _topics.end()) continue;topics.push_back(topic_it->second);}//4. 从订阅者映射信息中,删除订阅者_subscribers.erase(it);}//3. 从受影响的主题对象中,移除订阅者for (auto &topic : topics) {topic->removeSubscriber(subscriber);}}private:void errorResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg, RCode rcode) {auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRCode(rcode);conn->send(msg_rsp);}void topicResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRCode(RCode::RCODE_OK);conn->send(msg_rsp);}void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);//构造一个主题对象,添加映射关系的管理std::string topic_name = msg->topicKey();auto topic = std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name, topic));}void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {// 1. 查看当前主题,有哪些订阅者,然后从订阅者中将主题信息删除掉// 2. 删除主题的数据 -- 主题名称与主题对象的映射关系std::string topic_name = msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//在删除主题之前,先找出会受到影响的订阅者auto it = _topics.find(topic_name);if (it == _topics.end()) {return;}subscribers = it->second->subscribers;_topics.erase(it);//删除当前的主题映射关系,}for (auto &subscriber : subscribers) {subscriber->removeTopic(topic_name);}}bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1. 先找出主题对象,以及订阅者对象// 如果没有找到主题--就要报错; 但是如果没有找到订阅者对象,那就要构造一个订阅者Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()) {return false;}topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()) {subscriber = sub_it->second;}else {subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}}//2. 在主题对象中,新增一个订阅者对象关联的连接; 在订阅者对象中新增一个订阅的主题topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1. 先找出主题对象,和订阅者对象Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end()) {topic = topic_it->second;}auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()) {subscriber = sub_it->second;}}//2. 从主题对象中删除当前的订阅者连接; 从订阅者信息中删除所订阅的主题名称if (subscriber) subscriber->removeTopic(msg->topicKey());if (topic && subscriber) topic->removeSubscriber(subscriber);}bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()) {return false;}topic = topic_it->second;}topic->pushMessage(msg);return true;}private:struct Subscriber {using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_set<std::string> topics;//订阅者所订阅的主题名称Subscriber(const BaseConnection::ptr &c): conn(c) { }//订阅主题的时候调用void appendTopic(const std::string &topic_name) {std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}//主题被删除 或者 取消订阅的时候,调用void removeTopic(const std::string &topic_name) {std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};struct Topic {using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; //当前主题的订阅者Topic(const std::string &name) : topic_name(name){}//新增订阅的时候调用void appendSubscriber(const Subscriber::ptr &subscriber) {std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}//取消订阅 或者 订阅者连接断开 的时候调用void removeSubscriber(const Subscriber::ptr &subscriber) {std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}//收到消息发布请求的时候调用void pushMessage(const BaseMessage::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : subscribers) {subscriber->conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};}
}
服务端 Registry Discovery实现
对外提供服务操作(注册,发现)消息处理回调函数
内部进行服务发现者的管理
内部进行服务提供者的管理
namespace myrpc {namespace server {class ProviderManager {public:using ptr = std::shared_ptr<ProviderManager>;struct Provider {using ptr = std::shared_ptr<Provider>;std::mutex _mutex;BaseConnection::ptr conn;Address host;std::vector<std::string> methods;Provider(const BaseConnection::ptr &c, const Address &h):conn(c), host(h){}void appendMethod(const std::string &method) {std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//当一个新的服务提供者进行服务注册的时候调用void addProvider(const BaseConnection::ptr &c, const Address &h, const std::string &method) {Provider::ptr provider;//查找连接所关联的服务提供者对象,找到则获取,找不到则创建,并建立关联{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {provider = it->second;}else {provider = std::make_shared<Provider>(c, h);_conns.insert(std::make_pair(c, provider));}//method方法的提供主机要多出一个,_providers新增数据auto &providers = _providers[method];providers.insert(provider);}//向服务对象中新增一个所能提供的服务名称provider->appendMethod(method);}//当一个服务提供者断开连接的时候,获取他的信息--用于进行服务下线通知Provider::ptr getProvider(const BaseConnection::ptr &c) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {return it->second;}return Provider::ptr();}//当一个服务提供者断开连接的时候,删除它的关联信息void delProvider(const BaseConnection::ptr &c) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it == _conns.end()) {//当前断开连接的不是一个服务提供者return;}//如果是提供者,看看提供了什么服务,从服务者提供信息中删除当前服务提供者for (auto & method : it->second->methods) {auto &providers = _providers[method];providers.erase(it->second);}//删除连接与服务提供者的关联关系_conns.erase(it);}std::vector<Address> methodHosts(const std::string &method) {std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end()) {return std::vector<Address>();}std::vector<Address> result;for (auto &provider : it->second) {result.push_back(provider->host);}return result;}private:std::mutex _mutex;std::unordered_map<std::string, std::set<Provider::ptr>> _providers;std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns;};class DiscovererManager {public:using ptr = std::shared_ptr<DiscovererManager>;struct Discoverer {using ptr = std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn; //发现者关联的客户端连接std::vector<std::string> methods; //发现过的服务名称Discoverer(const BaseConnection::ptr &c) : conn(c){}void appendMethod(const std::string &method) {std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//当每次客户端进行服务发现的时候新增发现者,新增服务名称Discoverer::ptr addDiscoverer(const BaseConnection::ptr &c, const std::string &method) {Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it != _conns.end()) {discoverer = it->second;}else {discoverer = std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c, discoverer));}auto &discoverers = _discoverers[method];discoverers.insert(discoverer);}discoverer->appendMethod(method);return discoverer;}//发现者客户端断开连接时,找到发现者信息,删除关联数据void delDiscoverer(const BaseConnection::ptr &c) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(c);if (it == _conns.end()) {//没有找到连接对应的发现者信息,代表客户端不是一个服务发现者return;}for (auto &method : it->second->methods) {auto discoverers = _discoverers[method];discoverers.erase(it->second);}_conns.erase(it);}//当有一个新的服务提供者上线,则进行上线通知void onlineNotify(const std::string &method, const Address &host) {return notify(method, host, ServiceOptype::SERVICE_ONLINE);}//当有一个服务提供者断开连接,则进行下线通知void offlineNotify(const std::string &method, const Address &host) {return notify(method, host, ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method, const Address &host, ServiceOptype optype) {std::unique_lock<std::mutex> lock(_mutex);auto it = _discoverers.find(method);if (it == _discoverers.end()) {//这代表这个服务当前没有发现者return;}auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setOptype(optype);for (auto &discoverer : it->second) {discoverer->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers;std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns;};class PDManager {public:using ptr = std::shared_ptr<PDManager>;PDManager():_providers(std::make_shared<ProviderManager>()),_discoverers(std::make_shared<DiscovererManager>()){}void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {//服务操作请求:服务注册/服务发现/ServiceOptype optype = msg->optype();if (optype == ServiceOptype::SERVICE_REGISTRY){//服务注册:// 1. 新增服务提供者; 2. 进行服务上线的通知ILOG("%s:%d 注册服务 %s", msg->host().first.c_str(), msg->host().second, msg->method().c_str());_providers->addProvider(conn, msg->host(), msg->method());_discoverers->onlineNotify(msg->method(), msg->host());return registryResponse(conn, msg);} else if (optype == ServiceOptype::SERVICE_DISCOVERY){//服务发现:// 1. 新增服务发现者ILOG("客户端要进行 %s 服务发现!", msg->method().c_str());_discoverers->addDiscoverer(conn, msg->method());return discoveryResponse(conn, msg);}else {ELOG("收到服务操作请求,但是操作类型错误!");return errorResponse(conn, msg);}}void onConnShutdown(const BaseConnection::ptr &conn) {auto provider = _providers->getProvider(conn);if (provider.get() != nullptr) {ILOG("%s:%d 服务下线", provider->host.first.c_str(), provider->host.second);for (auto &method : provider->methods) {_discoverers->offlineNotify(method, provider->host);}_providers->delProvider(conn);}_discoverers->delDiscoverer(conn);}private:void errorResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {auto msg_rsp = MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_SERVICE);msg_rsp->setRCode(RCode::RCODE_INVALID_OPTYPE);msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);conn->send(msg_rsp);}void registryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {auto msg_rsp = MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_SERVICE);msg_rsp->setRCode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);conn->send(msg_rsp);}void discoveryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {auto msg_rsp = MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_SERVICE);msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);std::vector<Address> hosts = _providers->methodHosts(msg->method());if (hosts.empty()) {msg_rsp->setRCode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);} msg_rsp->setRCode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};}
}
服务端整合
namespace myrpc {namespace server {//注册中心服务端:只需要针对服务注册与发现请求进行处理即可class RegistryServer {public:using ptr = std::shared_ptr<RegistryServer>;RegistryServer(int port):_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<myrpc::Dispatcher>()){auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);_server = myrpc::ServerFactory::create(port);auto message_cb = std::bind(&myrpc::Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start() {_server->start();}private:void onConnShutdown(const BaseConnection::ptr &conn) {_pd_manager->onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class RpcServer {public:using ptr = std::shared_ptr<RpcServer>;//rpc——server端有两套地址信息:// 1. rpc服务提供端地址信息--必须是rpc服务器对外访问地址(云服务器---监听地址和访问地址不同)// 2. 注册中心服务端地址信息 -- 启用服务注册后,连接注册中心进行服务注册用的RpcServer(const Address &access_addr, bool enableRegistry = false, const Address ®istry_server_addr = Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<myrpc::server::RpcRouter>()),_dispatcher(std::make_shared<myrpc::Dispatcher>()) {if (enableRegistry) {_reg_client = std::make_shared<client::RegistryClient>(registry_server_addr.first, registry_server_addr.second);}//当前成员server是一个rpcserver,用于提供rpc服务的auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<myrpc::RpcRequest>(myrpc::MType::REQ_RPC, rpc_cb);_server = myrpc::ServerFactory::create(access_addr.second);auto message_cb = std::bind(&myrpc::Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr &service) {if (_enableRegistry) {_reg_client->registryMethod(service->method(), _access_addr);}_router->registerMethod(service);}void start() {_server->start();}private:bool _enableRegistry;Address _access_addr;client::RegistryClient::ptr _reg_client;RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class TopicServer {public:using ptr = std::shared_ptr<TopicServer>;TopicServer(int port):_topic_manager(std::make_shared<TopicManager>()),_dispatcher(std::make_shared<myrpc::Dispatcher>()){auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);_server = myrpc::ServerFactory::create(port);auto message_cb = std::bind(&myrpc::Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start() {_server->start();}private:void onConnShutdown(const BaseConnection::ptr &conn) {_topic_manager->onShutdown(conn);}private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}
客户端Requestor实现
提供发送请求的接口
内部进行请求 响应的管理
namespace myrpc {namespace client {class Requestor {public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr) {ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("请求类型未知!!");}delDescribe(rid);}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);return true;}private:RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};}
}
客户端 RpcCaller实现
namespace myrpc {namespace client {class RpcCaller {public:using ptr = std::shared_ptr<RpcCaller>;using JsonAsyncResponse = std::future<Json::Value>;using JsonResponseCallback = std::function<void(const Json::Value&)>;RpcCaller(const Requestor::ptr &requestor): _requestor(requestor){}//requestor中的处理是针对BaseMessage进行处理的//用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value ¶ms, Json::Value &result) {DLOG("开始同步rpc调用...");//1. 组织请求auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2. 发送请求bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);if (ret == false) {ELOG("同步Rpc请求失败!");return false;}DLOG("收到响应,进行解析,获取结果!");//3. 等待响应auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return false;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc请求出错:%s", errReason(rpc_rsp_msg->rcode()));return false;}result = rpc_rsp_msg->result();DLOG("结果设置完毕!");return true;}bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value ¶ms, JsonAsyncResponse &result) {//向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);auto json_promise = std::make_shared<std::promise<Json::Value>>() ;result = json_promise->get_future();Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback, this, json_promise, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);if (ret == false) {ELOG("异步Rpc请求失败!");return false;}return true;}bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value ¶ms, const JsonResponseCallback &cb) {auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1, this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false) {ELOG("回调Rpc请求失败!");return false;}return true;}private:void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg) {auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return ;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc回调请求出错:%s", errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg) {auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return ;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));return ;}result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};}
}
客户端 Publish Subscribe实现
提供消息发布接口
提供主题操作接口
内部进行主题及订阅者的管理
namespace myrpc {namespace client {class TopicManager {public:using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr &requestor) : _requestor(requestor) {}bool create(const BaseConnection::ptr &conn, const std::string &key) {return commonRequest(conn, key, TopicOptype::TOPIC_CREATE);}bool remove(const BaseConnection::ptr &conn, const std::string &key) {return commonRequest(conn, key, TopicOptype::TOPIC_REMOVE);}bool subscribe(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb) {addSubscribe(key, cb);bool ret = commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);if (ret == false) {delSubscribe(key);return false;}return true;}bool cancel(const BaseConnection::ptr &conn, const std::string &key) {delSubscribe(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);}bool publish(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg) {return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);}void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1. 从消息中取出操作类型进行判断,是否是消息请求auto type = msg->optype();if (type != TopicOptype::TOPIC_PUBLISH) {ELOG("收到了错误类型的主题操作!");return ;}//2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();//3. 通过主题名称,查找对应主题的回调处理函数,有在处理,无在报错auto callback = getSubscribe(topic_key);if (!callback) {ELOG("收到了 %s 主题消息,但是该消息无主题处理回调!", topic_key.c_str());return ;}return callback(topic_key, topic_msg);}private:void addSubscribe(const std::string &key, const SubCallback &cb) {std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key, cb));}void delSubscribe(const std::string &key) {std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback getSubscribe(const std::string &key) {std::unique_lock<std::mutex> lock(_mutex);auto it = _topic_callbacks.find(key);if (it == _topic_callbacks.end()) {return SubCallback();}return it->second;}bool commonRequest(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = "") {//1. 构造请求对象,并填充数据auto msg_req = MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setOptype(type);msg_req->setTopicKey(key);if (type == TopicOptype::TOPIC_PUBLISH) {msg_req->setTopicMsg(msg);}//2. 向服务端发送请求,等待响应BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("主题操作请求失败!");return false;}//3. 判断请求处理是否成功auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if (!topic_rsp_msg) {ELOG("主题操作响应,向下类型转换失败!");return false;}if (topic_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()));return false;}return true;}private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks;Requestor::ptr _requestor;};}
}
客户端Registry Discovery 实现
提供服务发现接口
提供服务注册接口
提供服务上线下线操作,通知处理回调函数
内部进行发现的服务与主机信息管理
namespace myrpc {namespace client {class Provider {public:using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr &requestor) : _requestor(requestor){}bool registryMethod(const BaseConnection::ptr &conn, const std::string &method, const Address &host) {auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("%s 服务注册失败!", method.c_str());return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("响应类型向下转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务注册失败,原因:%s", errReason(service_rsp->rcode()).c_str());return false;}return true;}private:Requestor::ptr _requestor;};class MethodHost {public:using ptr = std::shared_ptr<MethodHost>;MethodHost(): _idx(0){}MethodHost(const std::vector<Address> &hosts):_hosts(hosts.begin(), hosts.end()), _idx(0){}void appendHost(const Address &host) {//中途收到了服务上线请求后被调用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address &host) {//中途收到了服务下线请求后被调用std::unique_lock<std::mutex> lock(_mutex);for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {if (*it == host) {_hosts.erase(it);break;}}}Address chooseHost() {std::unique_lock<std::mutex> lock(_mutex);size_t pos = _idx++ % _hosts.size();return _hosts[pos];}bool empty() {std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;std::vector<Address> _hosts;};class Discoverer {public:using OfflineCallback = std::function<void(const Address&)>;using ptr = std::shared_ptr<Discoverer>;Discoverer(const Requestor::ptr &requestor, const OfflineCallback &cb) : _requestor(requestor), _offline_callback(cb){}bool serviceDiscovery(const BaseConnection::ptr &conn, const std::string &method, Address &host) {{//当前所保管的提供者信息存在,则直接返回地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(method);if (it != _method_hosts.end()) {if (it->second->empty() == false) {host = it->second->chooseHost();return true;}}}//当前服务的提供者为空auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("服务发现失败!");return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (!service_rsp) {ELOG("服务发现失败!响应类型转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());return false;}//能走到这里,代表当前是没有对应的服务提供主机的std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());if (method_host->empty()) {ELOG("%s 服务发现失败!没有能够提供服务的主机!", method.c_str());return false;}host = method_host->chooseHost();_method_hosts[method] = method_host;return true;}//这个接口是提供给Dispatcher模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {//1. 判断是上线还是下线请求,如果都不是那就不用处理了auto optype = msg->optype();std::string method = msg->method();std::unique_lock<std::mutex> lock(_mutex);if (optype == ServiceOptype::SERVICE_ONLINE){//2. 上线请求:找到MethodHost,向其中新增一个主机地址auto it = _method_hosts.find(method);if (it == _method_hosts.end()) {auto method_host = std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method] = method_host;}else {it->second->appendHost(msg->host());}} else if (optype == ServiceOptype::SERVICE_OFFLINE){//3. 下线请求:找到MethodHost,从其中删除一个主机地址auto it = _method_hosts.find(method);if (it == _method_hosts.end()) {return;}it->second->removeHost(msg->host());_offline_callback(msg->host());}}private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_map<std::string, MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;};}
}
客户端整合
namespace myrpc {namespace client {class RegistryClient {public:using ptr = std::shared_ptr<RegistryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string &ip, int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务注册接口bool registryMethod(const std::string &method, const Address &host) {return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};class DiscoveryClient {public:using ptr = std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息,用于连接注册中心DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb): _requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),_dispatcher(std::make_shared<Dispatcher>()){auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client = ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool serviceDiscovery(const std::string &method, Address &host) {return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};class RpcClient {public:using ptr = std::shared_ptr<RpcClient>;//enableDiscovery--是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<myrpc::client::RpcCaller>(_requestor)) {//针对rpc请求后的响应进行的回调处理auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);//如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client//如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_clientif (_enableDiscovery) {auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);}else {auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string &method, const Json::Value ¶ms, Json::Value &result) {//获取服务提供者:1. 服务发现; 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string &method, const Json::Value ¶ms, RpcCaller::JsonAsyncResponse &result) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string &method, const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}//3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, cb);}private:BaseClient::ptr newClient(const Address &host) {auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}BaseClient::ptr getClient(const Address &host) {std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) {return BaseClient::ptr();}return it->second;}BaseClient::ptr getClient(const std::string method) {BaseClient::ptr client;if (_enableDiscovery) {//1. 通过服务发现,获取服务提供者地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false) {ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}//2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建client = getClient(host);if (client.get() == nullptr) {//没有找打已实例化的客户端,则创建client = newClient(host);}}else {client = _rpc_client;}return client;}void putClient(const Address &host, BaseClient::ptr &client) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}void delClient(const Address &host) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash {size_t operator()(const Address &host) const{std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;Requestor::ptr _requestor;RpcCaller::ptr _caller;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现std::mutex _mutex;//<"127.0.0.1:8080", client1>std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;//用于服务发现的客户端连接池};class TopicClient {public:TopicClient(const std::string &ip, int port):_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_topic_manager(std::make_shared<TopicManager>(_requestor)) {auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}bool create(const std::string &key) {return _topic_manager->create(_rpc_client->connection(), key);}bool remove(const std::string &key) {return _topic_manager->remove(_rpc_client->connection(), key);}bool subscribe(const std::string &key, const TopicManager::SubCallback &cb) {return _topic_manager->subscribe(_rpc_client->connection(), key, cb);}bool cancel(const std::string &key) {return _topic_manager->cancel(_rpc_client->connection(), key);}bool publish(const std::string &key, const std::string &msg) {return _topic_manager->publish(_rpc_client->connection(), key, msg);}void shutdown() {_rpc_client->shutdown();}private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现};}
}
八、项目总结
项目功能:实现了基础的rpc远程调用功能,以及基于服务注册与发现的rpc远程调用功能,简单的发布订阅功能
所用技术:json序列化反序列化,网络通信Muduo,rpc,发布订阅
框架设计:
抽象层:针对底层的网络通信和协议部分进行了抽象,降低框架的依赖,提高框架的灵活度,以及可维护性
具象层:针对抽象的功能进行具体的实现(Muduo库搭建高性能客户端服务器,TLV应用层协议格式,消息类型)
业务层:基础rpc,服务发现与注册以及上线下线通知,发布订阅
具体模块划分:
应用层协议的抽象与实现
网络通信模块的抽象与实现
消息的抽象与实现
rpc客户端与服务端业务
服务发现与注册业务
发布订阅业务
项目代码
Json-Rpc框架