当前位置: 首页 > news >正文

应用层自定义协议序列与反序列化

目录

一、网络版计算器

二、网络版本计算器实现

2.1源代码

2.2测试结果


一、网络版计算器

应用层定义的协议:

应用层进行网络通信能否使用如下的协议进行通信呢?

在操作系统内核中是以这种协议进行通信的,但是在应用层禁止以这种协议进行通信,原因如下:

  1. 我们写的服务端程序是在Linux系统上运行的,但是客户端程序不一定是在Linux系统上运行的,可能是在Windos系统上运行的,服务端在不同平台上运行,不同的平台下内存对齐等等可能不同,那么对同一个结构体定义出来的结构体变量大小可能不同。还有服务端程序是由C++语言写的,客户端程序可能是由其他语言写的,如:Java语言、Python语言写的,那么类型大小可能不一样。
  2. 即使对内存对齐问题做出解决,但是我们的程序随时随地根据需求需要做出更改,那么之前写的协议就需要做出改变,重新进行内存等问题的处理,重新进行测试,出现问题的可能性很大,这样写出来的代码可扩展性很差。

操作系统内核能够以这种方式进行通信的原因:操作系统都是用C语言写的,操作系统一旦写好,基本不会做改变,使用这种协议进行本地通信实现起来简单。

哪如何进行网络通信呢?

第一种约定方案:客户端发送一个形如"1+1"的字符串,中间不存在空格

第二种约定方案:定义结构体来表示我们需要交互的信息, 发送数据时将这个结构体按照一个规则转换成字符串, 接收到数据的时候再按照相同的规则把字符串转化回结构体。这个过程叫做 “序列化” 和 "反序列化。

  • 传输层和网络层是操作系统部分,数据链路层对应的是驱动。
  • 一个fd代表一个链接,一个链接有两个缓冲区,read()、write()、send()、recv()这些函数本质是拷贝函数,将内容拷贝到换冲区,从缓冲区中拷贝内容。
  • 发送数据的本质:是从发送方的发送缓冲区把数据通过协议栈和网络拷贝给接受方的接收缓冲区,这也就是为什么TCP协议是全双工通信模式。
  • 应用层将数据拷贝给传输层,数据什么时候发、一次发多少、出错了怎么办应用层不用管由操作系统来决定,这也就是TCP称为传输控制协议的原因。
  • 传输数据过程其实就是生产消费模型,发送和接收缓冲区就是临界资源,read()阻塞就是接收缓冲区是空的,write()阻塞就是发送缓冲区满了。

二、网络版本计算器实现

序列化和反序列化的工具:Jsoncpp用于将数据序列化为字符串的C++库

安装Jsoncpp库

ubuntu: sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel

  • 对于系统头文件默认是在/usr/include目录下找,要想使用必须这样包含头文件

     #include <jsoncpp/json/json.h>
    

【第一次测试】

这个报错是链接报错,gcc、g++默认是认识C/C++库,但不认识第三方提供的库,在makefile文件中添加

-ljsoncpp,告诉编译器指定到那个库中去找。

2.1源代码

SeverMain.cc(服务端入口)

#include <iostream>
#include <functional>#include "Sever.hpp"
#include "IoService.hpp"
#include "Calculate.hpp"int main()
{Scream();uint16_t port = 8888;Calculate cal;IoService ioService(std::bind(&Calculate::Operation, &cal, std::placeholders::_1));Sever sever(std::bind(&IoService::IoExecute, &ioService, std::placeholders::_1, std::placeholders::_2),port);sever.Init();sever.Loop();return 0;
}

Sever.hpp(服务端)

#pragma once
#include <functional>#include "Socket.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
#include "IoService.hpp"using namespace socket_n;
const static int gport = 8888;
using ioService_t = std::function<void(SockPtr, InetAddr &)>; // 可调用对象的类型,Io业务处理类型class Sever
{
public:Sever(ioService_t ioService, uint16_t port = gport): _port(port), _ifRunning(false), _listenSocket(), _ioService(ioService){}void Init(){// 创建监听套接字、绑定、设置监听状态_listenSocket.CreatListenSocket(_port);}struct ThreadData{ThreadData(SockPtr sockfd, InetAddr addr, Sever *pSever): _sockfd(sockfd), _addr(addr), _pSever(pSever){}SockPtr _sockfd;InetAddr _addr;Sever *_pSever;};void Loop(){_ifRunning = true;while (_ifRunning){InetAddr client_addr;SockPtr SockfdSmartPtr = _listenSocket.Accept(client_addr);if (SockfdSmartPtr == nullptr){LOG(WARNING, "accept error\n");continue;}LOG(INFO, "get a new link, client info: %s\n", client_addr.AddrStr().c_str());// 多线程处理业务pthread_t tid;// 智能指针和继承???????ThreadData *td = new ThreadData(SockfdSmartPtr, client_addr, this); // td必须是动态开辟出来的pthread_create(&tid, nullptr, Execute, td);}_ifRunning = false;}static void *Execute(void *args){pthread_detach(pthread_self()); // 将自己和主线程分离ThreadData *td = static_cast<ThreadData *>(args);td->_pSever->_ioService(td->_sockfd, td->_addr); // 回调交互业务函数,执行交互业务td->_sockfd->Close();                            // 业务处理完,也不再进行网络通信,关闭该套接字delete td;return nullptr;}~Sever(){}private:bool _ifRunning;uint16_t _port;TcpSocket _listenSocket;ioService_t _ioService;
};

Socket.hpp(套接字)

#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <functional>
#include <memory>#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>#include "Log.hpp"
#include "InetAddr.hpp"namespace socket_n
{// 父类提供接口,子类实现接口const static int gsockfd = -1;const static int gbacklog = 8;enum{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR};class Socket;using SockPtr = std::shared_ptr<Socket>; // 父类中也用到了,所以在父类前面声明class Socket{public:virtual void CreatSocket() = 0;virtual void BindSocket(uint16_t port) = 0;virtual void SetListenStatus(int balcklog = gbacklog) = 0;virtual SockPtr Accept(InetAddr &client_addr) = 0;virtual bool Connect(const std::string &sever_ip, uint16_t sever_port) = 0;virtual int Sockfd() = 0;virtual void Close() = 0;virtual ssize_t Recv(std::string &out) = 0;virtual ssize_t Send(const std::string &in) = 0;// 创建监听套接字void CreatListenSocket(uint16_t port){CreatSocket();BindSocket(port);SetListenStatus();}// 客户端套接字bool CreatClientSocket(const std::string &sever_ip, uint16_t sever_port){CreatSocket();return Connect(sever_ip, sever_port);}};class TcpSocket : public Socket{public:TcpSocket(int sockfd = gsockfd): _sockfd(sockfd){}TcpSocket(TcpSocket &socket): _sockfd(socket._sockfd){}void CreatSocket() override{_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG(FATAL, "socket creat error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket creat success,sockfd: %d\n", _sockfd);}void BindSocket(uint16_t port) override{struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success\n");}void SetListenStatus(int balcklog) override{if (listen(_sockfd, balcklog) < 0){LOG(FATAL, "set listen error\n");exit(LISTEN_ERROR);}LOG(INFO, "listen success\n");}SockPtr Accept(InetAddr &client_addr) override{struct sockaddr_in client;socklen_t len = sizeof(client);int sockfd = accept(_sockfd, (struct sockaddr *)&client, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");return nullptr; // 返回一个用nullptr构造的智能指针}client_addr = InetAddr(client);LOG(INFO, "get a new link, client info: %s\n", client_addr.AddrStr().c_str());return std::make_shared<TcpSocket>(sockfd); // C++14}bool Connect(const std::string &sever_ip, uint16_t sever_port) override{struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(sever_port);::inet_pton(AF_INET, sever_ip.c_str(), &server.sin_addr);int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));if (n < 0){return false;}return true;}int Sockfd() override{return _sockfd;}void Close() override{if (_sockfd > 0)close(_sockfd);}ssize_t Recv(std::string &out) override{char inbuffer[1024];ssize_t n = recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);if (n > 0){inbuffer[n] = 0;out = inbuffer;}return n;}ssize_t Send(const std::string &in) override{return send(_sockfd, in.c_str(), in.size(), 0);}private:int _sockfd;};
}

IoSever.hpp(IO交互)

#pragma once
#include <memory>
#include <functional>#include "InetAddr.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include "Protocol.hpp"using namespace socket_n;
using process_t = std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)>;
class IoService
{
public:IoService(process_t process): _process(process){}void IoExecute(SockPtr sockPtr, InetAddr &addr){std::string packageStream;std::shared_ptr<Request> req_ptr = std::make_shared<Request>();std::string recvMessage;std::string sendMessage;while (true){// 读内容ssize_t n = sockPtr->Recv(recvMessage);// 数据读取出错或者没数据了,直接break,关闭套接字,停止对该客户端的服务// 客户端可以重新建立连接if (n <= 0){LOG(ERROR, "read error or rend end: %s\n", addr.AddrStr().c_str());break;}packageStream += recvMessage;// 提取一个完整的报文std::string jsonStr = DeCode(packageStream);if (jsonStr == "")continue;// 反序列化req_ptr->Deserialize(jsonStr);// 计算auto res_ptr = _process(req_ptr);// 序列化res_ptr->Serialize(sendMessage);// 添加报头sendMessage = EnCode(sendMessage);// 向套接字中写sockPtr->Send(sendMessage);}}~IoService(){}private:process_t _process;
};

Protocol.hpp(计算器协议)

#pragma once
#include <string>
#include <jsoncpp/json/json.h>// 应用层协议完整的报头+报文格式
//"len\r\n{jsonStr}\r\n" —— "len":报文的长度;"\r\n":区分len和报文
// 使用"\r\n"作为分割原因是在Debug测试时打印出来的结果:一行是len、一行是报文的形式,也可以用其他作为分割
//"{jsonStr}\r\n"能不能只以"\r\n"作为分割呢?
// 不能因为报文中有可能也有\r\n,如果查找\r\n,取其前面的内容作为报文是不行的
static const std::string sep = "\r\n";
// 添加报头
std::string EnCode(const std::string &jsonStr)
{int len = jsonStr.size();std::string lenStr = std::to_string(len);return lenStr + sep + jsonStr + sep;
}
//"le"、"len\r"
//"len\r\n{js"
//"len\r\n{jsonStr}\r\nlen\r\n{js"
// 从传的流中提取出一个完整的报文,如果流中没有一个完整的报文返回空,如果有返回一个完整的报文字符串
std::string DeCode(std::string &packageStream)
{size_t n = packageStream.find(sep);if (n == std::string::npos)return "";std::string numStr = packageStream.substr(0, n);int len = std::stoi(numStr);// 当前这个完整报文的长度int total = numStr.size() + len + 2 * sep.size();if (packageStream.size() < total)return "";int pos = n + sep.size();std::string jsonStr = packageStream.substr(pos, len);// 提取到一个完整的jsonStr串,删掉packageStream中头部字段packageStream.erase(0, total);return jsonStr;
}// 请求
class Request
{
public:Request(int x = 1, int y = 1, char oper = '+'): _x(x), _y(y), _oper(oper){}// 序列化:将结构化字段转换成字符串void Serialize(std::string &out){// 先定义一个中间值,Value类型的对象rootJson::Value root;root["x"] = _x; // 一个键值对应一个value值,value可以是任意类型,包括对象root["y"] = _y;root["oper"] = _oper;// 再定义一个FastWriter类型的对象,使用其内部的write()方法将中间值root序列化成字符串Json::FastWriter writer;std::string s = writer.write(root);out = s;}// 反序列化:将字符串转换成结构化字段bool Deserialize(std::string &in){Json::Value root;// 将序列化后的字符串,转化成Value的中间值Json::Reader reader;bool res = reader.parse(in, root);if (res){_x = root["x"].asInt(); // 将root中的"x"对应的value值作为正数给给_x_y = root["y"].asInt();_oper = root["oper"].asInt();return true;}return false;}int RetX(){return _x;}int RetY(){return _y;}char RetOper(){return _oper;}~Request(){}// private:int _x;int _y;char _oper;
};// 回应
class Response
{
public:Response(): _result(2), _code(0), _describe("Calculation successful"){}// 序列化void Serialize(std::string &out){// 先定义一个中间值,Value类型的对象rootJson::Value root;root["result"] = _result; // 一个键值对应一个value值,value可以是任意类型,包括对象root["code"] = _code;root["describe"] = _describe;// 再定义一个FastWriter类型的对象,使用其内部的write()方法将中间值root序列化成字符串Json::FastWriter writer;std::string s = writer.write(root);out = s;}// 反序列化bool Deserialize(std::string &in){Json::Value root;// 将序列化后的字符串,转化成Value的中间值Json::Reader reader;bool res = reader.parse(in, root);if (res){_result = root["result"].asInt(); // 将root中的"x"对应的value值作为正数给给_x_code = root["code"].asInt();_describe = root["describe"].asString();return true;}return false;}~Response(){}// private:int _result;int _code;             // 结果码0:计算成功 1:除0错误 2:其他非法操作std::string _describe; // 对结果码的描述
};

Calculate.hpp(计算功能)

#pragma once
#include <memory>
#include <climits>#include "Protocol.hpp"class Calculate
{
public:
Calculate()
{}
std::shared_ptr<Response> Operation(std::shared_ptr<Request> reqPtr)
{int x = reqPtr->RetX();int y = reqPtr->RetY();char oper = reqPtr->RetOper();std::shared_ptr<Response> resPtr = std::make_shared<Response>();switch (oper){case '+':{resPtr->_result = x + y;break;}case '-':{resPtr->_result = x - y;break;}case '*':{resPtr->_result = x * y;break;}case '/':{if (y == 0){resPtr->_result = INT_MAX;resPtr->_code = 1;resPtr->_describe = "Division by zero error";}else{resPtr->_result = x / y;}break;}default:{resPtr->_result = INT_MIN;resPtr->_code = 2;resPtr->_describe = "Other illegal operations";break;}}return resPtr;
}
~Calculate()
{}private:
};

Log.hpp(日志)

#pragma once
#include <string>
#include <ctime>
#include <fstream>
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
#include <stdarg.h>
#include <pthread.h>#define SCREAM 1
#define FILE 2
enum 
{DEBUG = 1,INFO,WARNING,ERROR,FATAL
};
//日志消息 [日志等级][pid][filename][filenumber][time] 日志内容
class LMessage
{
public:std::string _level;//信息等级pid_t _pid;//进程idstd::string _filename;//所在文件int _filenumber;//所在文件行号std::string _cur_time; //打印时间std::string _message_info;//日志内容
};
//获取当前时间
std::string GetCurTime()
{//从过去的那一个时刻到现在累计的秒数time_t now = time(nullptr);//将时间戳转换成年月日时分秒的字符串struct tm* cur_time = localtime(&now);char buffer[100];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d", cur_time->tm_year+1900,cur_time->tm_mon+1,cur_time->tm_mday,cur_time->tm_hour,cur_time->tm_min,cur_time->tm_sec);return buffer;
}pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;
const std::string g_file = "./log.txt";
//日志
class Log
{
private:void FlushToScream(LMessage& message){//对指定的锁进行加锁pthread_mutex_lock(&mutex1); //[日志等级][pid][filename][filenumber][time] 日志内容printf("[%s][%d][%s][%d][%s]%s",message._level.c_str(), message._pid,message._filename.c_str(),message._filenumber,message._cur_time.c_str(),message._message_info.c_str());//对指定的锁进行解锁pthread_mutex_unlock(&mutex1);}void FlushToFile(LMessage& message){//对指定的锁进行加锁pthread_mutex_lock(&mutex1); //std::ofstream out(_file)//每次写入前先前会先清空文件std::ofstream out(_file, std::ios::app);//向文件追加写入if(!out.is_open())return ;char buffer[1024];snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s]%s",message._level.c_str(), message._pid,message._filename.c_str(),message._filenumber,message._cur_time.c_str(),message._message_info.c_str());out.write(buffer, strlen(buffer));out.close();//对指定的锁进行解锁pthread_mutex_unlock(&mutex1);}void Fussh(LMessage& message){if(_type == SCREAM){//向显示器中打印FlushToScream(message);}else if(_type == FILE){//向文件中打印FlushToFile(message);}}
public:Log()//默认是向显示器中打印:_type(SCREAM),_file(g_file){}//为了方便将等级名称输出到文件/显示屏中,将数字转换成字符串const std::string LevelToString(int level){switch(level){case DEBUG:return "DEBUG";case INFO:return "INFO";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOW";//表示未知的}}//通过log.LogMessage("xx",12,INFO,"ctear %d thread success",pid)这种形式将消息写进日志中//通过外部传一个消息,由于外部有时会需要传一个可变参数,message_info在传的时候需要设计成可变参数void LogMessage(const std::string filename, int filenumber, int level, const char* format, ...){LMessage message;message._level = LevelToString(level);message._pid = getpid();message._filename = filename;message._filenumber = filenumber;message._cur_time = GetCurTime();//定义一个apva_list ap;//初始化ap,让ap指向可变参数va_start(ap,format);char info[1024];//将格式化形式传进来,可变参数传进来,自动将转换成字符串放到字符数组中vsnprintf(info, sizeof(info), format, ap);//销毁apva_end(ap);message._message_info = info;//将消息写入到文件或显示器;Fussh(message);}    ~Log(){pthread_mutex_destroy(&mutex1);pthread_mutex_destroy(&mutex2);}void FlusshScream(){_type = SCREAM;}void FlusshFile(){_type = FILE;}
private:int _type;const std::string _file;
};Log lg;
#define LOG(level, format, ...) do{ lg.LogMessage(__FILE__, __LINE__, level, format, ##__VA_ARGS__); }while(0)
#define Scream() do{ lg.FlusshScream(); }while(0)
#define File() do{ lg.FlusshFile(); }while(0)

InetAddr.hpp

#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>class InetAddr
{
public:InetAddr(){}InetAddr(const struct sockaddr_in &addr) : _addr(addr){_port = ntohs(addr.sin_port);char ip_buf[32];::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));_ip = ip_buf;}bool operator==(const InetAddr &addr){return (this->_ip == addr._ip && this->_port == addr._port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}struct sockaddr_in Addr(){return _addr;}std::string AddrStr(){return _ip + ":" + std::to_string(_port);}~InetAddr(){}private:std::string _ip;uint16_t _port;struct sockaddr_in _addr;
};

ClientMain.cc(客户端入口)

#include <iostream>
#include <string>#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>#include "Socket.hpp"
#include "Protocol.hpp"
#include "InetAddr.hpp"using namespace socket_n;int main(int argc, char *argv[])
{if (argc != 3){std::cout << argv[0] << "serve-ip serve-port" << std::endl;exit(0);}std::string severIp = argv[1];uint16_t severPort = std::stoi(argv[2]);// 1.创建socket链接远端服务器SockPtr socketPtr = std::make_shared<TcpSocket>();if (!socketPtr->CreatClientSocket(severIp, severPort)){std::cout << "creat or connect error" << std::endl;exit(1);}std::shared_ptr<Request> reqPtr = std::make_shared<Request>();std::shared_ptr<Response> resPtr = std::make_shared<Response>();std::string packageStream;while (true){// 设置requestreqPtr->_x = 888;reqPtr->_y = 1;reqPtr->_oper = '*';// 序列化std::string sendMessage;reqPtr->Serialize(sendMessage);// 添加报头sendMessage = EnCode(sendMessage);// 向套接字中写socketPtr->Send(sendMessage);while (true){// 接收信息std::string recvMessage;ssize_t n = socketPtr->Recv(recvMessage);if (n <= 0)break;packageStream += recvMessage;// 提取一个完整的报文std::string jison = DeCode(packageStream);if (jison == "")continue;std::cout << "jison string: " << std::endl;std::cout << jison << std::endl;// 反序列化resPtr->Deserialize(jison);// 打印resultstd::cout << "result: " << resPtr->_result<< "code: " << resPtr->_code<< "describe: " << resPtr->_describe << std::endl;}sleep(2);}socketPtr->Close();return 0;
}

2.2测试结果

相关文章:

  • 数据赋能(209)——质量管理——时效性原则
  • 模型测试报错:有2张显卡但cuda.device_count()显示GPU卡数量只有一张
  • 昇腾的CANN是什么?跟英伟达CUDA的有什么联系和区别?【浅谈版】
  • 智能决策支持系统的系统结构:四库架构与融合范式
  • P1537 数字反转(升级版)详解
  • 【unity游戏开发入门到精通——UGUI】整体控制一个UGUI面板的淡入淡出——CanvasGroup画布组组件的使用
  • 深入探索 AAC 编码原理与 ADTS 格式:音频世界的智慧结晶
  • MCP多智能体消息传递机制(Message Passing Between Agents)
  • 注入内部Bean
  • 数据结构---
  • Scrapy框架之【settings.py文件】详解
  • Xilinx FPGA | 管脚约束 / 时序约束 / 问题解析
  • Qwen3:快慢思考融合,一键启停
  • 【Vue】性能优化与调试技巧
  • ipvsadm,是一个什么工具?
  • MySQL 中日期相减的完整指南
  • 【赵渝强老师】TiDB生态圈组件
  • 如何优化MySQL主从复制的性能?
  • 130. 被围绕的区域
  • 使用DeepSeek协助恢复历史数据
  • 多地晒五一假期前两日成绩单,湖南单日客流同比增长逾三成
  • 澳大利亚大选今日投票:聚焦生活成本与“特朗普问题”
  • “五一”假期国铁集团计划日均开行旅客列车超1.2万列
  • 西湖大学2025年上海市综合评价招生简章发布
  • 视频丨中国海警位中国黄岩岛领海及周边区域执法巡查
  • “80后”商洛市委副书记、市政府党组副书记赵孝任商洛市副市长