【从零实现Json-Rpc框架】- 项目实现 -抽象消息类实现篇
📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、抽象层通信基类的实现
- 🏳️🌈二、JsonMessage类实现
- 🏳️🌈三、消息请求类 及 消息响应类
- 🏳️🌈四、请求类
- 🏳️🌈五、响应类
- 🏳️🌈六、工厂类
- 🏳️🌈七、代码测试
- 👥总结
📢前言
前几篇文章中,笔者介绍了rpc
的原理和目的,也介绍了需要使用的部分第三方库
和我们所需实现的功能
现在我们着手项目实现篇章
,目前零碎接口 和 项目消息字段类型 都已经完成了
接下来我们将着手 抽象层的实现,首当其冲的是通信抽象实现
中的消息抽象实现
这里的代码,比如说 MType
就是用我们之前封装的 fields.hpp
代码中的消息类型定义,所以如果有需要的话,可以去前面的文章中取,或者笔者的仓库中linux_test/lesson/CSDN_code/project1_RPC/source/common
中也有
🏳️🌈一、抽象层通信基类的实现
大家是否还记得这张图,在抽象层介绍中笔者有提到过
在这一层中,我们需要针对不同的请求,从 BaseMessage
中派生出不同的请求和响应类型,以便于在针对指定消息处理时,能够更加轻松的获取或设置请求及响应中的各项数据元素。
当然这部分仅仅只是抽象层通信消息基类的实现,至于还有的抽象通信层
还有的协议、缓冲区、连接、服务端、客户端的实现,笔者在下一篇文章再在介绍,本篇文章就主要介绍消息基类
其实基类抽象类的实现就是为之后可能会出现的各个部分的功能做一个父类定义,方便后面管理理解
// 基类消息类
class BaseMessage{
public:
using ptr = std::shared_ptr<BaseMessage>;
// 析构函数
virtual ~BaseMessage(){}
// 设置消息id
virtual void setId(const std::string& id){ _rid = id; }
// 获取消息id
virtual std::string rid() const{ return _rid; }
// 设置消息类型
virtual void setMType(MType type){ _mtype = type; }
// 获取消息类型
virtual MType mtype() const{ return _mtype; }
// 序列化消息
virtual std::string serialize() = 0;
// 反序列化消息
virtual bool unserialize(const std::string& msg) = 0;
// 检查消息是否合法
virtual bool check() = 0;
private:
MType _mtype;
std::string _rid;
};
总的来说,我们需要实现了一个基于 JSON 格式
的 RPC
框架消息系统,包含 消息基类、具体消息类型 和 工厂模式。
以下是核心组件及其作用:
🏳️🌈二、JsonMessage类实现
首先,RPC(远程过程调用)通常用于不同服务之间的通信,可能跨语言、跨平台。JSON
作为一种轻量级的数据交换格式,具有可读性好、易于解析的特点
根据上面的抽象层整体思路图
我们可以知道,我们整个rpc我们要先将基类消息派生出Json消息的子类对象,再根据JsonMessage
去制作消息请求类和消息应答类,再细分成服务、主题和PRC调用
class JsonMessage : public BaseMessage{
public:
using ptr = std::shared_ptr<BaseMessage>;
// 序列化
virtual std::string serialize() override {
std::string body;
bool ret = JSON::serialize(_body, body);
if(ret == false){
return body;
}
return body;
}
// 反序列化
virtual bool unserialize(const std::string& msg) override{
return JSON::unserialize(msg, _body);
}
// 检查
virtual bool check() = 0;
protected:
Json::Value _body;
};
🏳️🌈三、消息请求类 及 消息响应类
因为后面的服务接口需要传入host,它包括了ip和端口号,所以我们可以提前定义一下
typedef std::pair<std::string, int> Address;
// Json消息请求类
class JsonRequest : public JsonMessage{
public:
using ptr = std::shared_ptr<JsonRequest>;
};
// Json消息响应类
class JsonResponse : public JsonMessage{
public:
using ptr = std::shared_ptr<JsonResponse>;
virtual bool check() override{
// 在响应中没大部分的相应都只有响应状态码
// 因此只需要判断响应状态码字段是否存在,类型是否正确即可
if(_body[KEY_RCODE].isNull() == true){
ELOG("响应中没有相应状态码");
return false;
}
if(_body[KEY_RCODE].isIntegral() == false){
ELOG("响应状态码字段类型错误!");
return false;
}
return true;
}
// 获取响应状态码
virtual RCode rcode(){
return (RCode)_body[KEY_RCODE].asInt();
}
// 设置响应状态码
virtual void setRcode(RCode rcode){
_body[KEY_RCODE] = (int)rcode;
}
};
🏳️🌈四、请求类
// RPC请求类
class RpcRequest : public JsonRequest{
public:
using ptr = std::shared_ptr<RpcRequest>;
// 检查
virtual bool check() override{
// rpc 请求中,包含请求方法名称-字符串,参数字段-对象
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false){
ELOG("RPC请求中没有方法名称或方法名称类型错误!");
return false;
}
if(_body[KEY_PARAMS].isNull() == true ||
_body[KEY_PARAMS].isObject() == false){
ELOG("RPC请求中没有参数信息或参数信息类型错误");
return false;
}
return true;
}
// 获取方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置方法名称
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取参数
Json::Value params(){
return _body[KEY_PARAMS];
}
// 设置参数
void setParams(const Json::Value& params){
_body[KEY_PARAMS] = params;
}
};
// 主题请求类
class TopicRequest : public JsonRequest{
public:
using ptr = std::shared_ptr<TopicRequest>;
// 检查
virtual bool check() override{
if(_body[KEY_TOPIC_KEY].isNull() == true ||
_body[KEY_TOPIC_KEY].isString() == false){
ELOG("主题请求中没有主题名称或主题名称类型类型错误!");
return false;
}
if(_body[KEY_TOPIC_MSG].isNull() == true ||
_body[KEY_TOPIC_MSG].isString() == false){
ELOG("主题请求中没有主题消息或主题消息类型错误");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("主题请求中没有操作类型或操作类型类型错误!");
return false;
}
return true;
}
// 获取主题名称
std::string topicKey(){
return _body[KEY_TOPIC_KEY].asString();
}
// 设置主题名称
void setTopicKey(const std::string& topic_key){
_body[KEY_TOPIC_KEY] = topic_key;
}
// 获取主题消息
std::string topicMsg(){
return _body[KEY_TOPIC_MSG].asString();
}
// 设置主题消息
void setTopicMsg(const std::string& topic_msg){
_body[KEY_TOPIC_MSG] = topic_msg;
}
// 获取主题请求操作类型
TopicOptype optype(){
return (TopicOptype)_body[KEY_OPTYPE].asInt();
}
// 设置主题请求操作类型
void setOptype(TopicOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
};
// 服务请求类
class ServiceRequest : public JsonResponse{
public:
using ptr = std::shared_ptr<ServiceRequest>;
virtual bool check() override{
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false){
ELOG("服务请求中没有方法名称或方法名称类型错误!");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("服务请求中没有操作类型或操作类型类型错误");
return false;
}
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) &&
(_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isObject() == false ||
_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||
_body[KEY_HOST][KEY_HOST_IP].isString() == false ||
_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||
_body[KEY_HOST][KEY_HOST_PORT].isIntegral() == false
)){
ELOG("服务请求中主机地址信息错误!");
return false;
}
return true;
}
// 获取方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置方法名称
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取操作类型
ServiceOptype Serviceoptype(){
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
// 设置操作类型
void setServiceOptype(ServiceOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
// 获取主机地址
Address host(){
Address addr;
addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();
return addr;
}
void setHost(const Address& host){
Json::Value val;
val[KEY_HOST_IP] = host.first;
val[KEY_HOST_PORT] = host.second;
_body[KEY_HOST] = val;
}
};
🏳️🌈五、响应类
// RPC响应类
class RpcResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<RpcResponse>;
virtual bool check() override{
// rpc 请求中,包含请求方法名称-字符串,参数字段-对象
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false){
ELOG("RPC响应中没有响应状态码或响应状态码类型错误!");
return false;
}
// 因为返回类型多种多样,就不需要进行判断了
if(_body[KEY_RESULT].isNull() == true){
ELOG("RPC响应中没有响应结果或响应结果类型错误!");
return false;
}
return true;
}
// 获取响应结果
Json::Value result(){
return _body[KEY_RESULT];
}
// 设置响应结果
void setResult(const Json::Value& result){
_body[KEY_RESULT] = result;
}
};
// 主题响应类
class TopicResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<TopicResponse>;
};
// 服务响应类
class ServiceResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<ServiceResponse>;
virtual bool check() override{
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false){
ELOG("服务响应中没有响应状态码或响应状态码类型错误!");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("服务响应中没有操作类型或操作类型的类型错误!");
return false;
}
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) && (
_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false ||
_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isArray() == false
)){
ELOG("服务发现响应中响应信息错误!");
return false;
}
return true;
}
// 获取响应操作类型
ServiceOptype optype(){
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
// 设置响应操作类型
void setOptype(ServiceOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
// 设置响应方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置响应方法
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取响应主机地址
std::vector<Address> Hosts(){
std::vector<Address> addrs;
int sz = _body[KEY_HOST].size();
for(int i = 0; i < sz; ++i){
Address addr;
addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();
addrs.push_back(addr);
}
return addrs;
}
// 设置响应主机地址
void setHost(std::vector<Address> addrs){
for(auto& addr : addrs){
Json::Value val;
val[KEY_HOST_IP] = addr.first;
val[KEY_HOST_PORT] = addr.second;
_body[KEY_HOST].append(val);
}
}
};
🏳️🌈六、工厂类
为了方便使用和创建,以及封装管理,我们可以使用一个类来统一管理,这个模式称为工厂
这里使用了两种方法,一个是实参,一个是模板,可以自行选择
// 实现一个消息对象的生产工厂
class MessageFactory{
public:
static BaseMessage::ptr create(MType mtype){
switch(mtype){
case MType::REQ_RPC : return std::make_shared<RpcRequest>();
case MType::RSP_RPC : return std::make_shared<RpcResponse>();
case MType::REQ_TOPIC : return std::make_shared<TopicRequest>();
case MType::RSP_TOPIC : return std::make_shared<TopicResponse>();
case MType::REQ_SERVICE : return std::make_shared<ServiceRequest>();
case MType::RSP_SERVICE : return std::make_shared<ServiceResponse>();
default: return BaseMessage::ptr(); // 类型错误,返回一个空的指针对象
}
}
template<typename T, typename... Args>
static std::shared_ptr<T> create(Args&& ...args){
return std::make_shared<T>(std::forward<Args>(args)...);
}
};
🏳️🌈七、代码测试
其实笔者并没有介绍为什么这里要有一个这样的接口,为什么要这样封装。说实话,我也说不明白。但
大概意思就是,rpc的整体原理就是这样,每个接口要干的事情,前人都已经确定好了,虽然细节上或有差异,但是整体上就是这么一个逻辑
至于里面的各个接口的细节,比较重要的都已经注释了
为了方便有需要的朋友测试代码,判断是否正确,这里提供一下测试代码
#include "message.hpp"
int main(){
std::cout << "RpcRequest test:" << std::endl;
// 模板的 create 函数,返回值是具体类型的指针
rpc::RpcRequest::ptr rrp = rpc::MessageFactory::create<rpc::RpcRequest>();
Json::Value param;
param["num1"] = 11;
param["num2"] = 22;
rrp->setMethod("Add");
rrp->setParams(param);
std::string str1 = rrp->serialize();
std::cout << str1 << std::endl;
// 参数为 MType 的 create 函数,返回值是 BaseMessage::ptr
rpc::BaseMessage::ptr rrp1 = rpc::MessageFactory::create(fields::MType::REQ_RPC);
bool ret = rrp1->unserialize(str1);
if(ret == false)
return -1;
ret = rrp1->check();
if(ret == false)
return -1;
// dynamic_pointer_cast 是 C++ 标准库中为智能指针 std::shared_ptr 提供的 动态类型转换工具,专门用于处理继承体系中的向下转型
rpc::RpcRequest::ptr rrp2 = std::dynamic_pointer_cast<rpc::RpcRequest>(rrp1);
std::cout << rrp2->method() << std::endl;
std::cout << rrp2->params()["num1"].asInt() << std::endl;
std::cout << rrp2->params()["num2"].asInt() << std::endl;
std::cout << "TopicRequest test end" << std::endl;
std::cout << "-------------------------" << std::endl << std::endl;
std::cout << "TopicRequest test:" << std::endl;
rpc::TopicRequest::ptr trp = rpc::MessageFactory::create<rpc::TopicRequest>();
trp->setTopicKey("test");
trp->setTopicMsg("hello world");
trp->setOptype(fields::TopicOptype::TOPIC_PUBLISH);
std::string str2 = trp->serialize();
std::cout << str2 << std::endl;
rpc::BaseMessage::ptr trp1 = rpc::MessageFactory::create(fields::MType::REQ_TOPIC);
ret = trp1->unserialize(str2);
if(ret == false)
return -1;
ret = trp1->check();
if(ret == false)
return -1;
rpc::TopicRequest::ptr trp2 = std::dynamic_pointer_cast<rpc::TopicRequest>(trp1);
std::cout << trp2->topicKey() << std::endl;
std::cout << trp2->topicMsg() << std::endl;
std::cout << (int)trp2->optype() << std::endl;
std::cout << "TopicRequest test end" << std::endl;
std::cout << "------------------------------" << std::endl << std::endl;
std::cout << "ServiceRequest test:" << std::endl;
rpc::ServiceRequest::ptr srp = rpc::MessageFactory::create<rpc::ServiceRequest>();
srp->setMethod("Add");
srp->setServiceOptype(fields::ServiceOptype::SERVICE_REGISTRY);
srp->setHost(rpc::Address("127.0.0.1", 8080));
std::string str3 = srp->serialize();
std::cout << str3 << std::endl;
rpc::BaseMessage::ptr srp1 = rpc::MessageFactory::create(fields::MType::REQ_SERVICE);
ret = srp1->unserialize(str3);
if(ret == false)
return -1;
ret = srp1->check();
if(ret == false)
return -1;
rpc::ServiceRequest::ptr srp2 = std::dynamic_pointer_cast<rpc::ServiceRequest>(srp1);
std::cout << srp2->method() << std::endl;
std::cout << (int)srp2->Serviceoptype() << std::endl;
std::cout << srp2->host().first << std::endl;
std::cout << srp2->host().second << std::endl;
std::cout << "ServiceRequest test end" << std::endl;
std::cout << "------------------------------" << std::endl <<std::endl;
std::cout << "RpcResponse test:" << std::endl;
rpc::RpcResponse::ptr rsp = rpc::MessageFactory::create<rpc::RpcResponse>();
rsp->setRcode(fields::RCode::RCODE_OK);
rsp->setResult(33);
std::string str4 = rsp->serialize();
std::cout << str4 << std::endl;
rpc::BaseMessage::ptr rsp1 = rpc::MessageFactory::create(fields::MType::RSP_RPC);
ret = rsp1->unserialize(str4);
if(ret == false)
return -1;
ret = rsp1->check();
if(ret == false)
return -1;
rpc::RpcResponse::ptr rsp2 = std::dynamic_pointer_cast<rpc::RpcResponse>(rsp1);
std::cout << (int)rsp2->rcode() << std::endl;
std::cout << rsp2->result().asInt() << std::endl;
std::cout << "RpcResponse test end" << std::endl;
std::cout << "------------------------------" << std::endl << std::endl;
std::cout << "TopicResponse test:" << std::endl;
rpc::TopicResponse::ptr tsp = rpc::MessageFactory::create<rpc::TopicResponse>();
tsp->setRcode(fields::RCode::RCODE_OK);
std::string str5 = tsp->serialize();
std::cout << str5 << std::endl;
rpc::BaseMessage::ptr tsp1 = rpc::MessageFactory::create(fields::MType::RSP_TOPIC);
ret = tsp1->unserialize(str5);
if(ret == false)
return -1;
ret = tsp1->check();
if(ret == false)
return -1;
rpc::TopicResponse::ptr tsp2 = std::dynamic_pointer_cast<rpc::TopicResponse>(tsp1);
std::cout << (int)tsp2->rcode() << std::endl;
std::cout << "TopicResponse test end" << std::endl;
std::cout << "------------------------------" << std::endl << std::endl;
std::cout << "ServiceResponse test:" << std::endl;
rpc::ServiceResponse::ptr ssp = rpc::MessageFactory::create<rpc::ServiceResponse>();
ssp->setRcode(fields::RCode::RCODE_OK);
ssp->setOptype(fields::ServiceOptype::SERVICE_DISCOVERY);
ssp->setMethod("Add");
std::vector<rpc::Address> addrs;
addrs.push_back(rpc::Address("127.0.0.1", 8080));
addrs.push_back(rpc::Address("127.0.0.1", 9090));
addrs.push_back(rpc::Address("127.0.0.1", 9999));
ssp->setHost(addrs);
std::string str6 = ssp->serialize();
std::cout << str6 << std::endl;
rpc::BaseMessage::ptr ssp1 = rpc::MessageFactory::create(fields::MType::RSP_SERVICE);
ret = ssp1->unserialize(str6);
if(ret == false)
return -1;
ret = ssp1->check();
if(ret == false)
return -1;
rpc::ServiceResponse::ptr ssp2 = std::dynamic_pointer_cast<rpc::ServiceResponse>(ssp1);
std::cout << (int)ssp2->rcode() << std::endl;
std::cout << (int)ssp2->optype() << std::endl;
std::cout << ssp2->method() << std::endl;
for(auto addr : ssp2->Hosts()){
std::cout << addr.first << ":" << addr.second << std::endl;
}
std::cout << "ServiceResponse test end" << std::endl;
std::cout << "------------------------------" << std::endl << std::endl;
return 0;
}
正确执行结果如下
RpcRequest test:
{
"method" : "Add",
"parameters" :
{
"num1" : 11,
"num2" : 22
}
}
Add
11
22
TopicRequest test end
-------------------------
TopicRequest test:
{
"optype" : 4,
"topic_key" : "test",
"topic_msg" : "hello world"
}
test
hello world
4
TopicRequest test end
------------------------------
ServiceRequest test:
{
"host" :
{
"host_ip" : "127.0.0.1",
"port" : 8080
},
"method" : "Add",
"optype" : 0
}
Add
0
127.0.0.1
8080
ServiceRequest test end
------------------------------
RpcResponse test:
{
"rcode" : 0,
"result" : 33
}
0
33
RpcResponse test end
------------------------------
TopicResponse test:
{
"rcode" : 0
}
0
TopicResponse test end
------------------------------
ServiceResponse test:
{
"host" :
[
{
"host_ip" : "127.0.0.1",
"port" : 8080
},
{
"host_ip" : "127.0.0.1",
"port" : 9090
},
{
"host_ip" : "127.0.0.1",
"port" : 9999
}
],
"method" : "Add",
"optype" : 1,
"rcode" : 0
}
0
1
Add
127.0.0.1:8080
127.0.0.1:9090
127.0.0.1:9999
ServiceResponse test end
------------------------------
👥总结
本篇博文对 【从零实现Json-Rpc框架】- 项目实现 -抽象消息类实现篇 做了一个较为详细的介绍,不知道对你a有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~