【项目】分布式Json-RPC框架 - 抽象层与具象层实现
目录
常用零碎功能接口实现
日志宏的实现
Json序列化与反序列化
UUID
项目消息类型字段定义
请求、响应正文字段宏定义
消息类型定义
响应码类型定义
RPC请求类型定义
主题操作类型定义
服务操作类型定义
抽象层实现
BaseMessage的抽象
BaseBuffer的抽象
BaseProtocol的抽象
BaseConnection的抽象
BaseServer的抽象
BaseClient的抽象
具象层的实现
消息类型的具象
Request
Response
消息类型测试
BaseBuffer的具象
BaseProtocol的具象
BaseConnection的具象
BaseServer的具象
BaseClient的具象
客户端与服务端通信测试
Dispatcher模块实现
常用零碎功能接口实现
日志宏的实现
意义:快速定位程序运行逻辑出错的位置。
项目在运行中可能会出现各种问题,出问题不可怕,关键的是要能找到问题,并解决问题。
解决问题的方式:
- gdb调试:逐步调试过于繁琐,缓慢。主要用于程序崩溃后的定位。
- 系统运行日志分析:在任何程序运行有可能逻辑错误的位置进行输出提示,快速定位逻辑问题的位置。
#define LOG(msg) printf("%s\n", msg)int main()
{printf("%s\n", "hello world!");LOG("hello world!");return 0;
}
无论是直接打印,还是使用宏,效果是相同的。但是使用宏会有一个问题,宏是没有类型校验的,这个宏只能输出固定的模式,也就是说我们传入的一定要是字符串,当我们传入整型时,就会崩溃。此时可以实意format让宏更加灵活。
#define LOG(format, msg) printf(format "\n", msg)int main()
{LOG("%d", 77);return 0;
}
为了让打印的消息更加详细,可以给宏加上预编译选项。
#define LOG(format, msg) printf("[%s:%d] " format "\n", __FILE__, __LINE__, msg)int main()
{LOG("%d", 77);return 0;
}
__FILE__在编译时会被替换为文件名,__LINE__在编译时会被替换为行号。
注意:一个宏必须定义在一行,若要换行,必须实意\对末尾的换行进行转义。
我们在日志中打印出时间。
#define LOG(format, msg)\
{\time_t t = time(NULL);\struct tm *lt = localtime(&t);\char time_tmp[32] = {0};\strftime(time_tmp, 31, "%m-%d %T", lt);\printf("[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, msg);\
}
现在这个宏仍然是有问题的,因为传入的参数是固定的,一定要是2个。
#define LOG(format, ...)\
{\time_t t = time(NULL);\struct tm *lt = localtime(&t);\char time_tmp[32] = {0};\strftime(time_tmp, 31, "%m-%d %T", lt);\printf("[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, __VA_ARGS__);\
}
现在这样变成了宏的参数最少要有2个,可以使用##__VA_ARGS__,表示允许可变参数为空,当可变参数为空时,自动去除前面的逗号。
#define LOG(format, ...)\
{\time_t t = time(NULL);\struct tm *lt = localtime(&t);\char time_tmp[32] = {0};\strftime(time_tmp, 31, "%m-%d %T", lt);\printf("[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__);\
}
这样最少只需要传入1个参数即可。
我们要知道,日志是分等级的。
#define LDBG 0 // 调试日志
#define LINF 1 // 提示日志
#define LERR 2 // 错误日志#define LDEFAULT LDBG// 控制日志输出等级
#define LOG(level, format, ...)\
{\if(level >= LDEFAULT)\{\time_t t = time(NULL);\struct tm *lt = localtime(&t);\char time_tmp[32] = {0};\strftime(time_tmp, 31, "%m-%d %T", lt);\printf("[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__);\}\
}// ##__VA_ARGS__表示的就是可变参数中传入进来的参数
#define DLOG(format, ...) LOG(LDBG, format, ##__VA_ARGS__);
#define ILOG(format, ...) LOG(LINF, format, ##__VA_ARGS__);
#define ELOG(format, ...) LOG(LERR, format, ##__VA_ARGS__);
Json序列化与反序列化
直接使用之前的序列化与反序列化代码,并修改一下输出,使用日志输出即可。
class JSON
{
public:// 实现数据的序列化static bool serialize(const Json::Value& val, std::string& body){std::stringstream ss;// 先实例化出一个工厂类对象Json::StreamWriterBuilder swb;// 通过工厂类对象来生产派生类对象std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());// 这里直接将序列化后的结果打印出来int ret = sw->write(val, &ss);if (ret != 0){ELOG("json serialize failed!");return false;}body = ss.str();return true;}// 实现数据的反序列化static bool unserialize(const std::string& body, Json::Value& val){// 实例化工厂对象Json::CharReaderBuilder crb;// 通过工厂类对象来生产派生类对象std::string errs;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);if (ret == false){// 宏是以C语言接口输出的,所以要适配C语言的接口ELOG("json unserialize failed: %s", errs.c_str());return false;}return true;}
};
UUID
UUID也叫通用唯一识别码,通常由32位16进制数字字符组成。UUID的标准型式包含32个16进制数字字符,以连字号分为五段,形式为8-4-4-4-12的32个字符,
如:550e8400-e29b-41d4-a716-446655440000。
我们这里采用16个16进制数来表示,每个16进制数都用2个位来表示,不够则补0,两个位的16进制数最大是255。UUID=8个16进制数组成的随机数+8个16进制数组成的序号。
C++中,这个函数可以用于生成随机数。他是通过硬件生成随机数的,可以保证不重复,但是效率较低。
也可以使用这个,这个函数生成的随机数范围是在2^19937之内。因为生成的随机数范围越大,则重复的概率越小。优点是快,但是与C语言一样,需要种一棵随机数种子。这个种子如果使用系统时间是不好的。因为可能在一秒钟之内,需要生成多个随机数,此时生成的随机数都是相同的。所以,我们选择的是每次都使用random_device生成一个随机数,然后将生成的随机数作为种子,通过mt19927_64生成8个随机数。这些随机数是十进制的,我们将其转换为十六进制,然后拼接在一起。因为mt199337_64生成的随机数范围太大了,所以我们需要限制它生成的随机数范围。两个位的十六进制数最大是255,所以我们的范围控制在0-255。
这个函数可以用于限定生成随机数的范围。
class UUID
{
public:static std::string uuid(){std::stringstream ss;// 1. 构造一个机器随机数对象std::random_device rd;// 2. 以及其随机数位种子构造伪随机数对象std::mt19937 generator(rd());// 3. 构造限定数据范围的对象std::uniform_int_distribution<int> distribution(0, 255);// 4. 生成8个随机数,按照特定格式组织称为十六进制数字字符的字符串for (int i = 0; i < 8; i++){if (i == 4 || i == 6) ss << "-";// 将随机数转为十六进制,并且不足两位补0ss << std::setw(2) << std::setfill('0') << std::hex << distribution(generator);}ss << "-";// 5. 定义一个8字节序号,逐字节组织称为十六进制数字字符的字符串// 注意:seq一定要设置为静态的,这样才能让序号保持持续增长// atomic是原子操作,fetch_add是原子加法,返回旧值static std::atomic<size_t> seq(1); // size_t就是8字节size_t cur = seq.fetch_add(1); // cur就是一个8字节的数字for (int i = 7; i >= 0; i--){if (i == 5) ss << "-";ss << std::setw(2) << std::setfill('0') << std::hex << ((cur >> (i * 8)) & 0xFF);}return ss.str();}
};
对于cur,现在是十进制的,我们需要将其转换为十六进制,我们是逐字节转换,一个字节是8个比特位,所以一次转一个字节是((cur>>(i*8))& 0xFF,按位与上0xFF就是取出低8位,为了保证原来的顺序,所以应该从高位取,所以从7往下减。
int main()
{cxfrpc::UUID uid;for(int i = 0;i < 10;i ++){std::cout << uid.uuid() << std::endl;}return 0;
}
项目消息类型字段定义
我们的项目所采用的协议如下,这一节主要是规范请求和响应的一些字段信息。
为什么要定义消息类型呢?
假设我们现在在进行RPC请求,请求正文部分肯定需要有方法名称,如果我们没有进行消息类型定义,我们可能会写这样的代码:
Json::Value val;
val["method"] = "xxx";
未来如果method字段需要修改,就需要将全部代码都进行修改,所以需要定义消息类型。
#define METHOD "method"Json::Value val;
val[METHOD] = "xxx";
请求、响应正文字段宏定义
- RPC请求:方法名称、方法参数
- 发布订阅请求:主题名称、操作类型、主题消息
- 服务操作请求:方法名称、操作类型、主机信息(IP地址、端口号)
- 响应:响应状态码、结果
namespace cxfrpc
{
#define KEY_METHOD "method" // RPC请求方法名称
#define KEY_PARAMS "parameters" // RPC请求参数
#define KEY_TOPIC_KEY "topic_key" // 主题名称
#define KEY_TOPIC_MSG "topic_msg" // 主题消息
#define KEY_OPTYPE "optype" // 操作类型,这里可以与服务操作的操作类型共享
#define KEY_HOST "host" // 主机信息
#define KEY_HOST_IP "ip" // 主机的IP地址
#define KEY_HOST_PORT "port" // 主机的端口号
#define KEY_RCODE "rcode" // 响应状态码
#define KEY_RESULT "result" // 结果
}
消息类型定义
这里就是请求的报头字段中的消息类型的定义。消息类型有6种:
- RPC请求和响应
- 主题操作请求和响应
- 服务操作请求和响应
// 请求/响应报头部分的消息类型定义
enum class MType
{REQ_RPC = 0, // RPC请求RSP_RPC, // RPC响应REQ_TOPIC, // 主题请求RSP_TOPIC, // 主题响应REQ_SERVICE, // 服务请求RSP_SERVICE // 服务响应
};
响应码类型定义
响应码一共有以下类型:
- 成功处理
- 消息解析失败
- 无效消息
- 连接已断开
- 无效的RPC参数
- 没有找到对应的服务
- 无效的操作类型
- 没有找到对应的主题
// 响应码类型定义
enum class RCode
{RCODE_OK = 0, // 成功处理RCODE_PARSE_FAILED, // 消息解析失败RCODE_INVALID_MSG, // 无效消息RCODE_DISCONNECTED, // 连接已断开RCODE_INVALID_PARAMS, // 无效的RPC参数RCODE_NOT_FOUND_SERVICE, // 没有找到对应的服务RCODE_INVALID_OPTYPE, // 无效的操作类型RCODE_NOT_FOUND_TOPIC // 没有找到对应的主题
};// 将响应码转为描述
static std::string errReason(RCode code)
{static std::unordered_map<RCode, std::string> err_map = {{RCode::RCODE_OK, "成功处理!"},{RCode::RCODE_PARSE_FAILED, "消息解析失败!"},{RCode::RCODE_INVALID_MSG, "无效消息!"},{RCode::RCODE_DISCONNECTED, "连接已断开!"},{RCode::RCODE_INVALID_PARAMS, "无效的RPC参数!"},{RCode::RCODE_NOT_FOUND_SERVICE, "没有找到对应的服务!"},{RCode::RCODE_INVALID_OPTYPE, "无效的操作类型!"},{RCode::RCODE_NOT_FOUND_TOPIC, "没有找到对应的主题!"}};auto it = err_map.find(code);if (it == err_map.end()){return "未知错误!";}return it->second;
}
RPC请求类型定义
我们给用户提供了三种不同的RPC请求方式:
- 同步请求:等待收到响应后返回
- 异步请求:返回异步对象,在需要的时候通过异步对象获取响应结果(还未收到结果会阻塞)
- 回调请求:设置回调函数,通过回调函数对响应进行处理
// RPC请求类型定义
enum class RType
{REQ_SYNC = 0,REQ_ASYNC,REQ_CALLBACK
};
主题操作类型定义
- 主题创建
- 主题删除
- 主题订阅
- 主题取消订阅
- 主题消息发布
// 主题操作类型定义
enum class TopicOptype
{TOPIC_CREATE = 0,TOPIC_REMOVE,TOPOC_SUBSCRIBE,TOPIC_CANCEL,TOPIC_PUBLISH
};
服务操作类型定义
- 服务注册
- 服务发现
- 服务上线
- 服务下线
// 服务操作类型定义
enum class ServiceOptype
{SERVICE_REGISTRY = 0,SERVICE_DISCOVERY,SERVICE_ONLINE,SERVICE_OFFLINE
};
抽象层实现
在抽象层,我们主要是对网络通信和协议定制进行抽象。抽象的意思就是定义出一些基类,在基类中不实现出具体的功能,基类的接口只是给上层使用的,真正实现具体功能是在派生类中实现,上层通过基类就可以访问到派生类的函数了。这样当底层应用的技术发生变化时,不需要修改抽象层及上层,只需要修改底层的具象层即可。
BaseMessage的抽象
BaseMessage是对于消息类型的抽象,未来可以通过派生类实现出不同的消息类型。
// BaseMessage的抽象
class BaseMessage
{
public:using ptr = std::shared_ptr<BaseMessage>;virtual ~BaseMessage() {}// 设置消息IDvirtual void setId(const std::string& id) { _rid = id; }// 获取消息IDvirtual std::string rid() { return _rid; }// 设置消息类型virtual void setMType(MType mtype) { _mtype = mtype; }// 获取消息类型virtual MType mtype() { return _mtype; }// 序列化virtual std::string serialize() = 0;// 反序列化virtual bool unserialize(const std::string& msg) = 0;// 检查反序列化完成之后的消息是否是一条完整的消息virtual bool check() = 0;
private:MType _mtype; // 消息类型std::string _rid; // 消息ID
};
BaseBuffer的抽象
BaseBuffer是对于缓冲区的抽象。
// BaseBuffer的抽象
class BaseBuffer
{
public:using ptr = std::shared_ptr<BaseBuffer>;virtual size_t readableSize() = 0; // 返回当前缓冲区有多少数据virtual int32_t peekInt32() = 0; // 尝试从缓冲区取出4字节数据,但是不从缓冲区删除virtual void retrieveInt32() = 0; // 从缓冲区删除4字节的数据 virtual int32_t readInt32() = 0; // 从缓冲区取出4字节数据,并从缓冲区删除virtual std::string retrieveAsString(size_t len) = 0; // 从缓冲区中取出指定长度的字符串
};
BaseProtocol的抽象
BaseProtocol是对协议的抽象。
// BaseProtocol的抽象
class BaseProtocol
{
public:using ptr = std::shared_ptr<BaseProtocol>;virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0; // 判断缓冲区中是否有一条或以上的消息virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0; // 从缓冲区中获取一条消息virtual std::string serialize(const BaseMessage::ptr& msg) = 0; // 对消息进行序列化
};
BaseConnection的抽象
BaseConnection是对连接的抽象。
// BaseConnection的抽象
class BaseConnection
{
public:using ptr = std::shared_ptr<BaseConnection>;virtual void send(const BaseMessage::ptr& msg) = 0; // 发送消息virtual void shutdown() = 0; // 关闭连接virtual bool connected() = 0; // 判断连接是否正常
};
BaseServer的抽象
BaseServer是对服务器的抽象。
// 设置一些回调函数类型
using ConnectionCallback = std::function<void(const BaseConnection::ptr&)>;
using CloseCallback = std::function<void(const BaseConnection::ptr&)>;
using MessageCallback = std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>;// BaseServer的抽象
class BaseServer
{
public:using ptr = std::shared_ptr<BaseServer>;virtual void setConnectionCallback(const ConnectionCallback& cb) { _cb_connection = cb; }virtual void setCloseCallback(const CloseCallback& cb) { _cb_close = cb; }virtual void setMessageCallback(const MessageCallback& cb) { _cb_message = cb; }virtual void start() = 0; // 启动服务器
protected:ConnectionCallback _cb_connection; // 连接建立成功时的回调函数CloseCallback _cb_close; // 连接关闭时的回调函数MessageCallback _cb_message; // 接收到消息时的回调函数
};
BaseClient的抽象
BaseClient是对客户端的抽象。
// BaseClient的抽象
class BaseClient
{
public:using ptr = std::shared_ptr<BaseClient>;virtual void setConnectionCallback(const ConnectionCallback& cb) { _cb_connection = cb; }virtual void setCloseCallback(const CloseCallback& cb) { _cb_close = cb; }virtual void setMessageCallback(const MessageCallback& cb) { _cb_message = cb; }virtual void connect() = 0; // 连接服务器virtual void shutdown() = 0; // 关闭连接virtual void send(const BaseMessage::ptr&) = 0; // 发送数据virtual BaseConnection::ptr connection() = 0; // 获取连接virtual bool connected() = 0; // 判断连接是否正常
protected:ConnectionCallback _cb_connection; // 连接建立成功时的回调函数CloseCallback _cb_close; // 连接关闭时的回调函数MessageCallback _cb_message; // 接收到消息时的回调函数
};
具象层的实现
在具象层,主要是基于抽象层的抽象,去实现出具体的功能。因为抽象层是对网络通信和协议定制的抽象,所以在具象层,实现的也是网络通信和协议定制的功能。
消息类型的具象
在抽象层,对于消息类型是BaseMessage,但是消息类型是有具体的类型的:
- RPC请求/响应
- 主题操作请求/响应
- 服务操作请求/响应
在我们的项目中,消息都是基于Json的,所以它们的序列化和反序列化是完全一样的,所以我们可以先定义一个BaseMessage的派生类JsonMessage,在里面先将序列化与反序列化接口实现。
class JsonMessage : public BaseMessage
{
public:using ptr = std::shared_ptr<JsonMessage>;virtual std::string serialize() override{std::string body;bool ret = JSON::serialize(_body, body);if (ret == false){return std::string();}return body;}virtual bool unserialize(const std::string& msg) override{return JSON::unserialize(msg, _body);}virtual bool check() = 0;
protected:Json::Value _body; // 正文部分
};
这里的序列化实际上就是使用我们之前定义好的序列化接口。check不应该在这里实现,因为对于不同类型的消息,检测手段是不同的。
Request
我们先对请求类型进行具象
class JsonRequest : public JsonMessage
{
public:using ptr = std::shared_ptr<JsonRequest>;
};// RPC请求的具象
class RpcRequest : public JsonRequest
{
public:using ptr = std::shared_ptr<RpcRequest>;virtual bool check() override{// RPC请求中,包含请求方法名称,类型为字符串,和参数字段if (_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false){// 如果RPC请求中,方法名称为空,或者类型不为字符串,说明错误ELOG("RPC请求中没有方法名称或方法名称类型错误!");return false;}if (_body[KEY_PARAMS].isNull() == true){// 如果RPC请求中,参数信息为空,说明错误ELOG("RPC请求中没有参数信息!");return false;}return true;}std::string method() { return _body[KEY_METHOD].asString(); }void setMethod(const std::string& method_name) { _body[KEY_METHOD] = method_name; }Json::Value params() { return _body[KEY_PARAMS]; }void setParams(const Json::Value& params) { _body[KEY_PARAMS] = params; }
};
Json中有一个isMember,用于判断一个Json::Value中是否有指定名称的字段,也可以使用isNull。
除了检测RPC请求的字段信息之外,还需要有一些其他的功能。作为客户端,需要能够组织请求,作为服务端,需要能从请求中获取内容。
// 主题操作请求的具象
class TopicRequest : public JsonRequest
{
public:using ptr = std::shared_ptr<TopicRequest>;virtual bool check() override{// 在主题操作请求中,会有主题名称,类型是字符串,操作类型,类型是整数// 对于消息发布请求,还有消息内容if (_body[KEY_TOPIC_KEY].isNull() == true ||_body[KEY_TOPIC_KEY].isString() == false){ELOG("主题请求中没有主题名称或主题名称类型错误!");return false;}if (_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false){ELOG("主题请求中没有操作类型或操作类型错误!");return false;}// 对于消息发布请求的特殊处理if (_body[KEY_OPTYPE].asInt() == (int)TopicOptype::TOPIC_PUBLISH &&(_body[KEY_TOPIC_MSG].isNull() == true ||_body[KEY_TOPIC_MSG].isString() == false)){ELOG("主题消息发布请求中没有消息内容字段或消息内容类型错误!");return false;}return true;}// 主题名称std::string topicKey() { return _body[KEY_TOPIC_KEY].asString(); }void setTopicKey(const std::string& key) { _body[KEY_TOPIC_KEY] = key; }// 操作类型TopicOptype optype() { return (TopicOptype)_body[KEY_OPTYPE].asInt(); }void setOptype(TopicOptype optype) { _body[KEY_OPTYPE] = (int)optype; }// 消息内容std::string topicMsg() { return _body[KEY_TOPIC_MSG].asString(); }void setTopicMsg(const std::string& msg) { _body[KEY_TOPIC_MSG] = msg; }
};// IP地址与端口号的映射
typedef std::pair<std::string, int> Address;// 服务操作请求的具象
class ServiceRequest : public JsonRequest
{
public:using ptr = std::shared_ptr<ServiceRequest>;virtual bool check() override{// 服务请求中,包含请求的方法名称,类型为字符串,操作类型,类型为整型// 对于服务注册/上线/下线请求,还会有主机信息字段,对于服务发现请求则没有if (_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false){ELOG("服务请求中没有方法名称或方法名称类型错误!");return false;}if (_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false){ELOG("服务请求中没有操作类型或操作类型的类型错误!");return false;}if (_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) &&(_body[KEY_HOST].isNull() == true ||_body[KEY_HOST].isObject() == false ||_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||_body[KEY_HOST][KEY_HOST_IP].isString() == false ||_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||_body[KEY_HOST][KEY_HOST_PORT].isIntegral() == false)){ELOG("服务请求中主机地址信息为空或错误!");return false;}return true;}std::string method() { return _body[KEY_METHOD].asString(); }void setMethod(const std::string& name) { _body[KEY_METHOD] = name; }ServiceOptype optype() { return (ServiceOptype)_body[KEY_OPTYPE].asInt(); }void setOptype(ServiceOptype optype) { _body[KEY_OPTYPE] = (int)optype; }Address host(){Address addr;addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();return addr;}void setHost(const Address& host){Json::Value val;val[KEY_HOST_IP] = host.first;val[KEY_HOST_PORT] = host.second;_body[KEY_HOST] = val;}
};
Response
现在对响应类型进行具象。
class JsonResponse : public JsonMessage
{
public:using ptr = std::shared_ptr<JsonResponse>;virtual bool check() override{// 大部分响应中只有一个响应状态码,类型为整型if (_body[KEY_RCODE].isNull() == true){ELOG("响应中没有响应状态码!");return false;}if (_body[KEY_RCODE].isIntegral() == false){ELOG("响应状态码类型错误!");return false;}return true;}virtual RCode rcode() { return (RCode)_body[KEY_RCODE].asInt(); }virtual void setRCode(RCode rcode) { _body[KEY_RCODE] = (int)rcode; }
};
因为大部分的响应中都只有一个响应状态码,所以可以直接对响应状态码进行判断。后面每个具体的响应类型根据自己的实际情况选择是否实现自己的check。并且每个响应都需要设置和获取响应状态码,所以我们可以在这里直接定义。
// RPC响应的具象
class RpcResponse : public JsonResponse {
public:using ptr = std::shared_ptr<RpcResponse>;virtual bool check() override{if (_body[KEY_RCODE].isNull() == true ||_body[KEY_RCODE].isIntegral() == false){ELOG("响应中没有响应状态码,或状态码类型错误!");return false;}if (_body[KEY_RESULT].isNull() == true){ELOG("响应中没有Rpc调用结果,或结果类型错误!");return false;}return true;}Json::Value result() { return _body[KEY_RESULT]; }void setResult(const Json::Value& result) { _body[KEY_RESULT] = result; }
};// 主题操作响应的具象
class TopicResponse : public JsonResponse
{
public:using ptr = std::shared_ptr<TopicResponse>;
};// 服务操作响应的具象
class ServiceResponse : public JsonResponse
{
public:using ptr = std::shared_ptr<ServiceResponse>;virtual bool check() override{if (_body[KEY_RCODE].isNull() == true ||_body[KEY_RCODE].isIntegral() == false){ELOG("响应中没有响应状态码,或状态码类型错误!");return false;}if (_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false){ELOG("响应中没有操作类型,或操作类型的类型错误!");return false;}if (_body[KEY_OPTYPE].asInt() == (int)(ServiceOptype::SERVICE_DISCOVERY) &&(_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false ||_body[KEY_HOST].isNull() == true ||_body[KEY_HOST].isArray() == false)){ELOG("服务发现响应中响应信息字段错误!");return false;}return true;}ServiceOptype optype() { return (ServiceOptype)_body[KEY_OPTYPE].asInt(); }void setOptype(ServiceOptype optype) { _body[KEY_OPTYPE] = (int)optype; }std::string method() { return _body[KEY_METHOD].asString(); }void setMethod(const std::string& method) { _body[KEY_METHOD] = method; }void setHost(std::vector<Address> addrs){for (auto& addr : addrs){Json::Value val;val[KEY_HOST_IP] = addr.first;val[KEY_HOST_PORT] = addr.second;_body[KEY_HOST].append(val);}}std::vector<Address> hosts(){std::vector<Address> addrs;int sz = _body[KEY_HOST].size();for (int i = 0; i < sz; i++){Address addr;addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();addrs.push_back(addr);}return addrs;}
};
服务注册/下线/上线的响应,只需要有一个状态码即可。但是对于服务发现,除了要有状态码,还要有能提供服务的主机信息和方法名称。为了能够区分响应是服务注册/下线/上线,还是服务发现,给这个响应添加一个字段,用于区分。
实现一个消息对象的生成工厂,未来要创建消息对象时,不需要new或者make_shared了。这样能够增加代码的可维护性。因为如果使用new或make_shared,项目代码中肯定会有大量应用,未来如果创建对象的参数变化了,则需要修改大量代码。
// 生成消息对象的工厂
class MessageFactory
{
public:// 第一种方法,使用父类指针指向子类对象static BaseMessage::ptr create(MType mtype){switch (mtype){case MType::REQ_RPC: return std::make_shared<RpcRequest>();case MType::RSP_RPC: return std::make_shared<RpcResponse>();case MType::REQ_TOPIC: return std::make_shared<TopicRequest>();case MType::RSP_TOPIC: return std::make_shared<TopicResponse>();case MType::REQ_SERVICE: return std::make_shared<ServiceRequest>();case MType::RSP_SERVICE: return std::make_shared<ServiceResponse>();}return BaseMessage::ptr();}// 第二种方法,使用子类指针指向子类对象。利用模板函数template<typename T, typename ...Args>static std::shared_ptr<T> create(Args&& ...args){return std::make_shared<T>(std::forward<Args>(args)...);}
};
消息类型测试
RPC请求
int main()
{// RPC请求// 构建一个RPC请求并进行序列化cxfrpc::RpcRequest::ptr rrp = cxfrpc::MessageFactory::create<cxfrpc::RpcRequest>();Json::Value param;param["num1"] = 33;param["num2"] = 44;rrp->setMethod("Add");rrp->setParams(param);std::string str = rrp->serialize();std::cout << str << std::endl;std::cout << "-------------------------------------------------" << std::endl;// 对上面序列化的结果进行反序列化cxfrpc::BaseMessage::ptr bmp = cxfrpc::MessageFactory::create(cxfrpc::MType::REQ_RPC);bool ret = bmp->unserialize(str);if(ret == false) { return -1; }ret = bmp->check();if(ret == false) { return -1; }cxfrpc::RpcRequest::ptr rrp2 = std::dynamic_pointer_cast<cxfrpc::RpcRequest>(bmp);std::cout << rrp2->method() << std::endl;std::cout << rrp2->params()["num1"].asInt() << std::endl;std::cout << rrp2->params()["num2"].asInt() << std::endl;return 0;
}
主题操作请求
int main()
{// 主题操作请求// 构建一个主题操作请求并进行序列化cxfrpc::TopicRequest::ptr trp = cxfrpc::MessageFactory::create<cxfrpc::TopicRequest>();trp->setTopicKey("new");trp->setOptype(cxfrpc::TopicOptype::TOPIC_PUBLISH);trp->setTopicMsg("hello world!");std::string str = trp->serialize();std::cout << str << std::endl;std::cout << "-------------------------------------------------" << std::endl;// 对上面序列化的结果进行反序列化cxfrpc::BaseMessage::ptr bmp = cxfrpc::MessageFactory::create(cxfrpc::MType::REQ_TOPIC);bool ret = bmp->unserialize(str);if(ret == false) { return -1; }ret = bmp->check();if(ret == false) { return -1; }cxfrpc::TopicRequest::ptr trp2 = std::dynamic_pointer_cast<cxfrpc::TopicRequest>(bmp);std::cout << trp2->topicKey() << std::endl;std::cout << (int)trp2->optype() << std::endl;std::cout << trp2->topicMsg() << std::endl;return 0;
}
服务操作请求
int main()
{// 服务操作请求// 构建一个服务操作请求并进行序列化cxfrpc::ServiceRequest::ptr srp = cxfrpc::MessageFactory::create<cxfrpc::ServiceRequest>();srp->setMethod("Add");srp->setOptype(cxfrpc::ServiceOptype::SERVICE_REGISTRY);srp->setHost(cxfrpc::Address("127.0.0.1", 8080));std::string str = srp->serialize();std::cout << str << std::endl;std::cout << "-------------------------------------------------" << std::endl;// 对上面序列化的结果进行反序列化cxfrpc::BaseMessage::ptr bmp = cxfrpc::MessageFactory::create(cxfrpc::MType::REQ_SERVICE);bool ret = bmp->unserialize(str);if(ret == false) { return -1; }ret = bmp->check();if(ret == false) { return -1; }cxfrpc::ServiceRequest::ptr srp2 = std::dynamic_pointer_cast<cxfrpc::ServiceRequest>(bmp);std::cout << srp2->method() << std::endl;std::cout << (int)srp2->optype() << std::endl;std::cout << srp2->host().first << std::endl;std::cout << srp2->host().second << std::endl;return 0;
}
RPC响应
int main()
{// RPC响应// 构建一个RPC响应并进行序列化cxfrpc::RpcResponse::ptr trp = cxfrpc::MessageFactory::create<cxfrpc::RpcResponse>();trp->setRCode(cxfrpc::RCode::RCODE_OK);trp->setResult(77);std::string str = trp->serialize();std::cout << str << std::endl;std::cout << "-------------------------------------------------" << std::endl;// 对上面序列化的结果进行反序列化cxfrpc::BaseMessage::ptr bmp = cxfrpc::MessageFactory::create(cxfrpc::MType::RSP_RPC);bool ret = bmp->unserialize(str);if(ret == false) { return -1; }ret = bmp->check();if(ret == false) { return -1; }cxfrpc::RpcResponse::ptr trp2 = std::dynamic_pointer_cast<cxfrpc::RpcResponse>(bmp);std::cout << (int)trp2->rcode() << std::endl;std::cout << trp2->result().asInt() << std::endl;return 0;
}
主题操作响应
int main()
{// 主题操作响应// 构建一个主题操作响应并进行序列化cxfrpc::TopicResponse::ptr trp = cxfrpc::MessageFactory::create<cxfrpc::TopicResponse>();trp->setRCode(cxfrpc::RCode::RCODE_OK);std::string str = trp->serialize();std::cout << str << std::endl;std::cout << "-------------------------------------------------" << std::endl;// 对上面序列化的结果进行反序列化cxfrpc::BaseMessage::ptr bmp = cxfrpc::MessageFactory::create(cxfrpc::MType::RSP_TOPIC);bool ret = bmp->unserialize(str);if(ret == false) { return -1; }ret = bmp->check();if(ret == false) { return -1; }cxfrpc::TopicResponse::ptr trp2 = std::dynamic_pointer_cast<cxfrpc::TopicResponse>(bmp);std::cout << (int)trp2->rcode() << std::endl;return 0;
}
服务操作响应
int main()
{// 主题操作响应// 构建一个主题操作响应并进行序列化cxfrpc::ServiceResponse::ptr trp = cxfrpc::MessageFactory::create<cxfrpc::ServiceResponse>();trp->setRCode(cxfrpc::RCode::RCODE_OK);trp->setMethod("Add");trp->setOptype(cxfrpc::ServiceOptype::SERVICE_DISCOVERY);std::vector<cxfrpc::Address> addrs;addrs.push_back(cxfrpc::Address("127.0.0.1", 8080));addrs.push_back(cxfrpc::Address("127.0.0.2", 8081));trp->setHost(addrs);std::string str = trp->serialize();std::cout << str << std::endl;std::cout << "-------------------------------------------------" << std::endl;// 对上面序列化的结果进行反序列化cxfrpc::BaseMessage::ptr bmp = cxfrpc::MessageFactory::create(cxfrpc::MType::RSP_SERVICE);bool ret = bmp->unserialize(str);if(ret == false) { return -1; }ret = bmp->check();if(ret == false) { return -1; }cxfrpc::ServiceResponse::ptr trp2 = std::dynamic_pointer_cast<cxfrpc::ServiceResponse>(bmp);std::cout << (int)trp2->rcode() << std::endl;std::cout << (int)trp2->optype() << std::endl;std::cout << trp2->method() << std::endl;std::vector<cxfrpc::Address> addrs1 = trp2->hosts();for(auto &addr : addrs1){std::cout << addr.first << " : " << addr.second << std::endl;}return 0;
}
接下来就是网络通信相关的操作了。
BaseBuffer的具象
我们底层通信使用的是muduo库,所以具体的BaseBuffer就是MuduoBuffer。MuduoBuffer实际上就是对muduo::net::Buffer的封装。
// BaseBuffer的具象
class MuduoBuffer : public BaseBuffer
{
public:using ptr = std::shared_ptr<MuduoBuffer>;MuduoBuffer(muduo::net::Buffer* buf) :_buf(buf) {}virtual size_t readableSize() override { return _buf->readableBytes(); }virtual int32_t peekInt32() override{// muduo是一个网络库,从缓冲区取出一个4字节的整型时,会将网络字节序转换成本地字节序// 所以未来在发送时,是需要将本地字节序转为网络字节序的return _buf->peekInt32();}virtual void retrieveInt32() override { return _buf->retrieveInt32(); }virtual int32_t readInt32() override { return _buf->readInt32(); }virtual std::string retrieveAsString(size_t len) override { return _buf->retrieveAsString(len); }
private:muduo::net::Buffer* _buf; // 这里一定不能使用智能指针,因为缓冲区不能随意释放
};// 生产BaseBuffer的工厂
class BufferFactory
{
public:template<typename ...Args>static BaseBuffer::ptr create(Args&& ...args){return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);}
};
BaseProtocol的具象
对于协议,我们采用的是LV格式的协议。
// BaseProtocol的具象
class LVProtocol : public BaseProtocol
{
public:// |--Len--|--VALUE--|// |--Len--|--mtype--|--idlen--|--id--|--body--|using ptr = std::shared_ptr<LVProtocol>;//判断缓冲区中的数据量是否足够一条消息的处理virtual bool canProcessed(const BaseBuffer::ptr& buf) override{// 我们的协议字段中,头4个字节表示的就是一条报文的长度// 所以我们判断缓冲区的数据量释放足够一条消息的思路就是从缓冲区读取4字节数据,不取出,然后判断if (buf->readableSize() < lenFieldsLength) { return false; }int32_t total_len = buf->peekInt32();if (buf->readableSize() < (total_len + lenFieldsLength)) { return false; }return true;}// 从缓冲区中获取一条消息virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) override{//当调用onMessage的时候,默认认为缓冲区中的数据足够一条完整的消息int32_t total_len = buf->readInt32(); //读取总长度MType mtype = (MType)buf->readInt32(); // 读取数据类型int32_t idlen = buf->readInt32(); // 读取id长度int32_t body_len = total_len - idlen - idlenFieldsLength - mtypeFieldsLength;std::string id = buf->retrieveAsString(idlen);std::string body = buf->retrieveAsString(body_len);msg = MessageFactory::create(mtype);if (msg.get() == nullptr){ELOG("消息类型错误,构造消息对象失败!");return false;}bool ret = msg->unserialize(body);if (ret == false){ELOG("消息正文反序列化失败!");return false;}msg->setId(id);msg->setMType(mtype);return true;}// 对消息进行序列化// 传入一个结构化的数据,将其序列化并添加上报头virtual std::string serialize(const BaseMessage::ptr& msg) override{// |--Len--|--mtype--|--idlen--|--id--|--body--|// 对结构化数据msg进行序列化,作为请求/响应的正文std::string body = msg->serialize();// 组织好报头的各个字段并添加,注意:对于数字,一定要将主机字节序转为网络字节序std::string id = msg->rid();auto mtype = htonl((int32_t)msg->mtype());int32_t idlen = htonl(id.size());int32_t h_total_len = mtypeFieldsLength + idlenFieldsLength + id.size() + body.size();int32_t n_total_len = htonl(h_total_len);std::string result;result.reserve(h_total_len); // 提前开好空间,避免频繁扩容result.append((char*)&n_total_len, lenFieldsLength);result.append((char*)&mtype, mtypeFieldsLength);result.append((char*)&idlen, idlenFieldsLength);result.append(id);result.append(body);return result;}
private:const size_t lenFieldsLength = 4; // 报头中Len字段的长度,单位字节const size_t mtypeFieldsLength = 4; // 报头中mtype字段的长度,单位字节const size_t idlenFieldsLength = 4; // 报头中idlen字段的长度,单位字节
};// 生产BaseProtocol的工厂
class ProtocolFactory
{
public:template<typename ...Args>static BaseProtocol::ptr create(Args&& ...args){return std::make_shared<LVProtocol>(std::forward<Args>(args)...);}
};
可以看到,在BaseMessage和BaseProtocol中都有序列化的接口,在BaseMessage的序列化接口是对消息的正文部分进行序列化,序列化后只是一条消息的有效载荷。在BaseProtocol的序列化接口是对整条消息进行序列化,除了会对消息正文进行序列化,还会进行添加报头,当然,对于消息正文的序列化调用的就是BaseMessage中实现的接口。
BaseConnection的具象
// BaseConnection的具象
class MuduoConnection : public BaseConnection
{
public:using ptr = std::shared_ptr<MuduoConnection>;MuduoConnection(const muduo::net::TcpConnectionPtr& conn,const BaseProtocol::ptr& protocol) :_protocol(protocol), _conn(conn) {}virtual void send(const BaseMessage::ptr& msg) override{std::string body = _protocol->serialize(msg);_conn->send(body);}virtual void shutdown() override { _conn->shutdown(); }virtual bool connected() override { return _conn->connected(); }
private:BaseProtocol::ptr _protocol;muduo::net::TcpConnectionPtr _conn;
};// 生产BaseConnection的工厂
class ConnectionFactory
{
public:template<typename ...Args>static BaseConnection::ptr create(Args&& ...args){return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);}
};
BaseServer的具象
设置回调函数的工作已经在父类中完成了,所以这里只需要完成start即可。
// BaseServer的具象
class MuduoServer : public BaseServer
{
public:using ptr = std::shared_ptr<MuduoServer>;MuduoServer(int port) :_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),"MuduoServer", muduo::net::TcpServer::kReusePort){}virtual void start() override{_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_server.start(); // 开始监听_baseloop.loop(); // 开始死循环事件监控}
private:// 服务器是基于muduo库搭建的,所以一定要有下面两个成员变量muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;
};
在这里,我们让start的工作就是设置muduo库的回调函数,并启动服务器。注意这里是设置muduo库的回调函数,不是设置BaseServer的回调函数,设置BaseServer的回调函数在基类中已经提供了相应接口了。接下里就需要在MuduoServer中实现两个回调函数了。
连接建立/断开时的回调函数:服务端应该要对连接进行封装,然后管理起来。因为服务端对于连接是基于BaseConnection的,而不是Muduo库的TcpConnection。
// 连接建立/断开时的回调函数
void onConnection(const muduo::net::TcpConnectionPtr& conn)
{if (conn->connected()){std::cout << "连接建立!" << std::endl;// 对连接进行封装,并管理起来auto muduo_conn = ConnectionFactory::create(conn, _protocol);{std::unique_lock<std::mutex> lock(_mutex);_conns.insert(std::make_pair(conn, muduo_conn));}// 设置了回调函数就调用if (_cb_connection) _cb_connection(muduo_conn);}else{std::cout << "连接断开!" << std::endl;BaseConnection::ptr muduo_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()) return;muduo_conn = it->second;_conns.erase(conn);}if (_cb_close) _cb_close(muduo_conn);}
}
接收到消息时的回调函数:首先,先对muduo库的Buffer进行封装,封装出一个BaseBuffer。然后基于协议对消息进行处理即可。
// BaseServer的具象
class MuduoServer : public BaseServer
{
public:using ptr = std::shared_ptr<MuduoServer>;MuduoServer(int port) :_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),"MuduoServer", muduo::net::TcpServer::kReusePort),_protocol(ProtocolFactory::create()){}virtual void start() override{_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.start(); // 开始监听_baseloop.loop(); // 开始死循环事件监控}
private:// 连接建立/断开时的回调函数void onConnection(const muduo::net::TcpConnectionPtr& conn){if (conn->connected()){std::cout << "连接建立!" << std::endl;// 对连接进行封装,并管理起来auto muduo_conn = ConnectionFactory::create(conn, _protocol);{std::unique_lock<std::mutex> lock(_mutex);_conns.insert(std::make_pair(conn, muduo_conn));}// 设置了回调函数就调用if (_cb_connection) _cb_connection(muduo_conn);}else{std::cout << "连接断开!" << std::endl;BaseConnection::ptr muduo_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()) return;muduo_conn = it->second;_conns.erase(conn);}if (_cb_close) _cb_close(muduo_conn);}}void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp){DLOG("连接有数据到来,开始处理!");auto base_buf = BufferFactory::create(buf);while (1){if (_protocol->canProcessed(base_buf) == false){//数据不足if (base_buf->readableSize() > maxDataSize){conn->shutdown();ELOG("缓冲区中数据过大!");return;}break;}BaseMessage::ptr msg;bool ret = _protocol->onMessage(base_buf, msg);if (ret == false){conn->shutdown();ELOG("缓冲区中数据错误!");return;}BaseConnection::ptr base_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()){conn->shutdown();return;}base_conn = it->second;}if (_cb_message) _cb_message(base_conn, msg);}}
private:const size_t maxDataSize = (1 << 16);BaseProtocol::ptr _protocol;std::mutex _mutex;// _conns保存连接与封装的映射关系std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;// 服务器是基于muduo库搭建的,所以一定要有下面两个成员变量muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;
};// 生产BaseServer的工厂
class ServerFactory
{
public:template<typename ...Args>static BaseServer::ptr create(Args&& ...args){return std::make_shared<MuduoServer>(std::forward<Args>(args)...);}
};
可能会有一些恶意主机,缓冲区中数据很多,但是不足一条数据。例如前4字节是一个非常大的数据,导致缓冲区中虽然有很多数据,但是还是数据不足。针对这种情况,直接关闭连接。所以,我们增加了一个成员变量maxDataSize,用于判断是否太大。
用户在使用时,只需要设置好3个回调函数,并通过start启动即可。
BaseClient的具象
// BaseClient的具象
class MuduoClient : public BaseClient
{
public:using ptr = std::shared_ptr<MuduoClient>;MuduoClient(const std::string& sip, int sport) :_protocol(ProtocolFactory::create()),_baseloop(_loopthread.startLoop()),_downlatch(1),_client(_baseloop, muduo::net::InetAddress(sip, sport), "MuduoClient"){}virtual void connect() override{// 这里的回调函数不能在构造函数中设置,因为回调函数需要用户自己传入DLOG("设置回调函数,连接服务器");_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));//设置连接消息的回调_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//连接服务器_client.connect();_downlatch.wait();DLOG("连接服务器成功!");}virtual void shutdown() override { return _client.disconnect(); }// 用户都是直接调用业务接口的,这里的send是给业务层提供的virtual bool send(const BaseMessage::ptr& msg) override{if (connected() == false) {ELOG("连接已断开!");return false;}_conn->send(msg);return true;}virtual BaseConnection::ptr connection() override { return _conn; }virtual bool connected() { return (_conn && _conn->connected()); }
private:void onConnection(const muduo::net::TcpConnectionPtr& conn){if (conn->connected()){std::cout << "连接建立!\n";_downlatch.countDown();//计数--,为0时唤醒阻塞_conn = ConnectionFactory::create(conn, _protocol);}else{std::cout << "连接断开!\n";_conn.reset();}}void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp){DLOG("连接有数据到来,开始处理!");auto base_buf = BufferFactory::create(buf);while (1) {if (_protocol->canProcessed(base_buf) == false) {//数据不足if (base_buf->readableSize() > maxDataSize) {conn->shutdown();ELOG("缓冲区中数据过大!");return;}break;}BaseMessage::ptr msg;bool ret = _protocol->onMessage(base_buf, msg);if (ret == false) {conn->shutdown();ELOG("缓冲区中数据错误!");return;}if (_cb_message) _cb_message(_conn, msg);}}
private:const size_t maxDataSize = (1 << 16);BaseProtocol::ptr _protocol;BaseConnection::ptr _conn;muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::EventLoop* _baseloop;muduo::net::TcpClient _client;
};// 生产BaseClient的工厂
class ClientFactory
{
public:template<typename ...Args>static BaseClient::ptr create(Args&& ...args){return std::make_shared<MuduoClient>(std::forward<Args>(args)...);}
};
客户端与服务端通信测试
服务端代码:
void onMessage(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::BaseMessage::ptr &msg)
{std::string body = msg->serialize();std::cout << body << std::endl;auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::RpcResponse>();rpc_req->setId("7777");rpc_req->setMType(cxfrpc::MType::RSP_RPC);rpc_req->setRCode(cxfrpc::RCode::RCODE_OK);rpc_req->setResult(33);conn->send(rpc_req);
}int main()
{auto server = cxfrpc::ServerFactory::create(8080);server->setMessageCallback(onMessage);server->start();return 0;
}
客户端代码:
void onMessage(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::BaseMessage::ptr &msg)
{std::string body = msg->serialize();std::cout << body << std::endl;
}int main()
{auto client = cxfrpc::ClientFactory::create("127.0.0.1", 8080);client->setMessageCallback(onMessage);client->connect();auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::RpcRequest>();rpc_req->setId("7777");rpc_req->setMType(cxfrpc::MType::REQ_RPC);rpc_req->setMethod("Add");Json::Value param;param["num1"] = 11;param["num2"] = 22;rpc_req->setParams(param);client->send(rpc_req);// 等待客户端收到响应再关闭连接std::this_thread::sleep_for(std::chrono::seconds(10));client->shutdown();return 0;
}
可以看到,是可以进行通信的。
Dispatcher模块实现
服务端和客户端中都会有一个MessageCallback函数对接收到的请求或响应进行处理。我们知道,服务端会接收到不同类型的请求,客户端会接收到不同类型的响应。因为请求和响应具有多样性,所以在MessageCallback中需要判断消息类型,根据不同的类型做出不同的处理。如果使用if语句进行判断是一件不好的事情。
程序设计中需要遵循一个原则:开闭原则 --- 对修改关闭,对扩展开放。当后期维护代码或新增功能时:不去修改以前的代码,而是新增当前需要的代码。如果在MessageCallback中直接使用if,未来新增功能,需要修改原来的代码,为了遵循开闭原则,就诞生了Dispatcher模块。
用户可以提前给Dispatcher模块注册业务处理函数,当消息到来时,可以自动调用相应的业务处理函数进行处理。未来要新增功能时,只需要将新功能的业务处理函数注册进Dispatcher模块即可,不需要修改原先的代码。
这个模块中,有一个哈希表,保存的就是消息类型与业务回调函数的映射关系,并向外提供一个注册业务函数的接口。模块中还有一个onMessage接口,并将其设置到Protocol的onMesgeCallback中。Protocol的onMessssageCallback将消息反序列完成之后,就可以调用Dispatcher模块的onMessage函数根据消息的类型进行处理了。
class Dispatcher
{
public:using ptr = std::shared_ptr<Dispatcher>;// 注册映射关系void registerHandler(MType mtype, const MessageCallback& handler){std::unique_lock<std::mutex> lock(_mutex);_handlers.insert(std::make_pair(mtype, handler));}void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){// 找到消息类型对于的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it = _handlers.find(msg->mtype());if (it != _handlers.end()) return it->second(conn, msg);ELOG("收到未知类型的消息!");conn->shutdown();}
private:std::mutex _mutex;std::unordered_map<MType, MessageCallback> _handlers; // 消息类型与回调函数的映射关系
}
应用到刚刚的客户端与服务端,看看是否有问题。
服务端:
// RPC请求处理函数
void onRpcRequest(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::BaseMessage::ptr &msg)
{std::cout << "接收到了RPC请求: ";std::string body = msg->serialize();std::cout << body << std::endl;auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::RpcResponse>();rpc_req->setId("7777");rpc_req->setMType(cxfrpc::MType::RSP_RPC);rpc_req->setRCode(cxfrpc::RCode::RCODE_OK);rpc_req->setResult(33);conn->send(rpc_req);
}// 主题操作请求处理函数
void onTopicRequest(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::BaseMessage::ptr &msg)
{std::cout << "接收到了主题操作请求: ";std::string body = msg->serialize();std::cout << body << std::endl;auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::TopicResponse>();rpc_req->setId("1111");rpc_req->setMType(cxfrpc::MType::RSP_TOPIC);rpc_req->setRCode(cxfrpc::RCode::RCODE_OK);conn->send(rpc_req);
}int main()
{// 注册映射关系auto dispatcher = std::make_shared<cxfrpc::Dispatcher>();dispatcher->registerHandler(cxfrpc::MType::REQ_RPC, onRpcRequest);dispatcher->registerHandler(cxfrpc::MType::REQ_TOPIC, onTopicRequest);auto server = cxfrpc::ServerFactory::create(8080);// 这是类的成员函数,所以要进行绑定auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, dispatcher.get(),std::placeholders::_1, std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}
客户端:
// RPC响应回调函数
void onRpcResponse(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::BaseMessage::ptr &msg)
{std::cout << "接收到了RPC响应: ";std::string body = msg->serialize();std::cout << body << std::endl;
}// 主题操作响应回调函数
void onTopicResponse(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::BaseMessage::ptr &msg)
{std::cout << "接收到了主题操作响应: ";std::string body = msg->serialize();std::cout << body << std::endl;
}int main()
{// 注册映射关系auto dispatcher = std::make_shared<cxfrpc::Dispatcher>();dispatcher->registerHandler(cxfrpc::MType::RSP_RPC, onRpcResponse);dispatcher->registerHandler(cxfrpc::MType::RSP_TOPIC, onTopicResponse);auto client = cxfrpc::ClientFactory::create("127.0.0.1", 8080);// 这是类的成员函数,所以要进行绑定auto message_cb = std::bind(&cxfrpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::RpcRequest>();rpc_req->setId("7777");rpc_req->setMType(cxfrpc::MType::REQ_RPC);rpc_req->setMethod("Add");Json::Value param;param["num1"] = 11;param["num2"] = 22;rpc_req->setParams(param);client->send(rpc_req);auto topic_req = cxfrpc::MessageFactory::create<cxfrpc::TopicRequest>();topic_req->setId("2222");topic_req->setMType(cxfrpc::MType::REQ_TOPIC);topic_req->setOptype(cxfrpc::TopicOptype::TOPIC_CREATE);topic_req->setTopicKey("news");client->send(topic_req);// 等待客户端收到响应再关闭连接std::this_thread::sleep_for(std::chrono::seconds(10));client->shutdown();return 0;
}
我们上面的代码可以运行,但是它们不够灵活。对于请求和响应的回调函数,接收消息都是BaseMessage类型的指针,父类指针是没办法调用子类的接口的,也就是说我们没办法调用RpcRequest等内部自己实现的接口。所以,最好的方法是将BaseMessage修改成对应的消息类型。可以在内部让用户使用dynamic_cast进行向下转换,但是这样增加了用户使用接口的困难性。
我们理想的回调函数是这样的:
// RPC响应回调函数
void onRpcResponse(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::RpcResponse::ptr &msg)
{std::cout << "接收到了RPC响应: ";std::string body = msg->serialize();std::cout << body << std::endl;
}// 主题操作响应回调函数
void onTopicResponse(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::TopicResponse::ptr &msg)
{std::cout << "接收到了主题操作响应: ";std::string body = msg->serialize();std::cout << body << std::endl;
}
// RPC请求处理函数
void onRpcRequest(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::RpcRequest::ptr &msg)
{std::cout << "接收到了RPC请求: ";std::string body = msg->serialize();std::cout << body << std::endl;auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::RpcResponse>();rpc_req->setId("7777");rpc_req->setMType(cxfrpc::MType::RSP_RPC);rpc_req->setRCode(cxfrpc::RCode::RCODE_OK);rpc_req->setResult(33);conn->send(rpc_req);
}// 主题操作请求处理函数
void onTopicRequest(const cxfrpc::BaseConnection::ptr &conn, cxfrpc::TopicRequest::ptr &msg)
{std::cout << "接收到了主题操作请求: ";std::string body = msg->serialize();std::cout << body << std::endl;auto rpc_req = cxfrpc::MessageFactory::create<cxfrpc::TopicResponse>();rpc_req->setId("1111");rpc_req->setMType(cxfrpc::MType::RSP_TOPIC);rpc_req->setRCode(cxfrpc::RCode::RCODE_OK);conn->send(rpc_req);
}
但是,这样做之后,每个回调函数的类型就不一致了,Dispatcher的注册接口就没办法适配了,所以Dispatcher的注册接口需要使用模板,但是unordered_map存储的类型必须是一致的,没办法存储模板参数,所以此时可以定义一个类Callback,让unordered_map的第二个参数存储Callback。Callback的成员变量就是消息的回调函数,所以Callback也需要设置为模板,这样unordered_map又无法存储了,所以可以将Callback定义为基类,让子类成为模板类。让子类以多态的形式实现onMessage,这样通过基类的指针就可以调用到子类的onMessage函数了。
class Callback
{
public:using ptr = std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) = 0;
};template<typename T>
class CallbackT : public Callback
{
public:using ptr = std::shared_ptr<CallbackT<T>>;CallbackT(const T& handler) :_handler(handler){}void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) override{auto type_msg = std::dynamic_pointer_cast<T>(msg);_handler(conn, type_msg);}
private:T _handler; // 对于消息的处理函数
};class Dispatcher
{
public:using ptr = std::shared_ptr<Dispatcher>;// 注册映射关系template<typename T>void registerHandler(MType mtype, const T& handler){std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<Callback<T>>(handler);_handlers.insert(std::make_pair(mtype, handler));}void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){// 找到消息类型对于的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it = _handlers.find(msg->mtype());if (it != _handlers.end()) return it->second(conn, msg);ELOG("收到未知类型的消息!");conn->shutdown();}
private:std::mutex _mutex;std::unordered_map<MType, Callback::ptr> _handlers; // 消息类型与回调函数的映射关系
};
现在registerHandler这个模板函数中,模板参数是回调函数类型。所以,我们要进行修改,让注册接口registerHandler的参数是消息类型。
class Callback
{
public:using ptr = std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) = 0;
};template<typename T>
class CallbackT : public Callback
{
public:using ptr = std::shared_ptr<CallbackT<T>>;using MessageCallback = std::function<void(const BaseConnection::ptr& conn, std::shared_ptr<T>& msg)>;CallbackT(const MessageCallback& handler) :_handler(handler){}void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) override{auto type_msg = std::dynamic_pointer_cast<T>(msg);_handler(conn, type_msg);}
private:MessageCallback _handler; // 对于消息的处理函数
};class Dispatcher
{
public:using ptr = std::shared_ptr<Dispatcher>;// 注册映射关系template<typename T>void registerHandler(MType mtype, const typename CallbackT<T>::MessageCallback& handler){// 上面需要加上一个typename表示这是一个类型std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype, cb));}void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){// 找到消息类型对于的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it = _handlers.find(msg->mtype());if (it != _handlers.end()) return it->second->onMessage(conn, msg);ELOG("收到未知类型的消息!");conn->shutdown();}
private:std::mutex _mutex;std::unordered_map<MType, Callback::ptr> _handlers; // 消息类型与回调函数的映射关系
};
现在,我们在注册回调函数时,就需要指明以下消息类型。
// 注册映射关系
auto dispatcher = std::make_shared<cxfrpc::Dispatcher>();
dispatcher->registerHandler<cxfrpc::RpcRequest>(cxfrpc::MType::REQ_RPC, onRpcRequest);
dispatcher->registerHandler<cxfrpc::TopicRequest>(cxfrpc::MType::REQ_TOPIC, onTopicRequest);
// 注册映射关系
auto dispatcher = std::make_shared<cxfrpc::Dispatcher>();
dispatcher->registerHandler<cxfrpc::RpcResponse>(cxfrpc::MType::RSP_RPC, onRpcResponse);
dispatcher->registerHandler<cxfrpc::TopicResponse>(cxfrpc::MType::RSP_TOPIC, onTopicResponse);
现在,我们已经实现了项目的网络通信功能和分发功能。