【Linux笔记】网络部分——应用层自定义协议与序列化
35.应用层自定义协议与序列化
文章目录
- 35.应用层自定义协议与序列化
- 应用层协议的概念与实现
- 再谈协议
- 网络版计算器
- 结构化数据与网络通信的关系
- 序列化和反序列化
- TCP套接字的缓冲区机制
- 套接字缓冲区结构理解难点
- TCP与UDP数据传输机制对比
- 网络计算器实现与代码结构
- Protocol.hpp
- TcpServer.hpp
- TcpServer.cc
- TcpClient.cc
应用层协议的概念与实现
再谈协议
协议是⼀种 “约定”. socket api
的接⼝, 在读写数据时, 都是按 “字符串” 的⽅式来发送接收的。程序员编写的解决实际问题和满足需求的网络程序都在应用层。
在网络通信中,客户端和服务器之间需要传输结构化的数据,这些数据通常以结构体或类的形式表示。例如,一个结构体可能包含三个成员:头像路径(string pass)、昵称(string nike name)和消息内容(string message)。这些数据需要被转化为一个大字符串进行网络传输,接收方再将这个大字符串反向转化为结构体。这种双方约定好的结构化数据被称为协议。
协议的本质是双方对数据格式的共识,确保发送和接收的数据能够被正确解析和处理。在网络基础中,我们已经讨论过协议的概念。
网络版计算器
现在,我们以网络版本的计算器为例,说明协议的实现方式。
- 客户端可以发送形如"一加一"的字符串,其中包含两个操作数和一个运算符,且数字和运算符之间没有空格。服务器收到这样的字符串后,按照约定进行解析和处理。这种方案要求客户端和服务器严格遵守约定的格式。
- 另一种方案是直接定义一个结构体,包含整型变量
x
和y
,以及字符变量oper
。客户端可以将这个结构体以二进制形式直接发送给服务器。由于客户端和服务器使用相同的结构化数据定义,服务器可以正确读取和解析二进制数据。然而,这种方法存在兼容性问题,例如客户端和服务器可能运行在不同位数的系统上(32位或64位),或者使用不同的对齐方式,导致结构体大小不一致。因此,直接传输二进制结构体的方法虽然可行,但对客户端和服务器的要求较高,需要考虑内存对齐、平台兼容性等问题。
结构化数据与网络通信的关系
上层软件在处理消息时,需要对消息进行管理,因此需要结构化的数据。结构化的数据可以是类或结构体,它们能够清晰地描述消息的组成部分。在网络通信中,发送方将结构化的数据序列化为字符串,接收方则将字符串反序列化为结构化的数据。这一过程确保了数据在网络传输中的完整性和上层处理的便捷性。网络层(如TCP)并不关心发送的具体内容,它只负责传输字节流。真正的数据解释由上层软件完成,因此序列化和反序列化是连接上层软件和网络层的关键环节。协议的定义是双方共同认可的结构化数据格式,序列化和反序列化则是实现协议的具体手段
序列化和反序列化
在网络通信中,为了避免直接传输二进制结构体带来的兼容性问题,通常采用序列化和反序列化的方法。
-
序列化是将多个字符串合并为一个大”字符串“的过程,而反序列化是将这个大字符串重新拆分为原始数据结构的过程。
-
例如,在聊天场景中,一条消息可能包含内容、时间和昵称三个部分。这三个部分可以分别作为字符串发送,但更好的方法是将它们合并为一个大的字符串,中间用空格分隔,如"你好 时间 新时代好青年"。这个大字符串通过网络发送给对方后,对方再按照约定的格式将其拆分为原始的三个字符串。这个过程称为序列化和反序列化。
-
序列化的优点是可以避免二进制数据的兼容性问题,因为字符串格式在不同平台和语言中具有更好的一致性。此外,序列化后的数据更易于处理和调试。反序列化则是将序列化后的数据恢复为原始结构的过程。
TCP套接字的缓冲区机制
TCP套接字在操作系统内部会创建两个缓冲区:发送缓冲区和接收缓冲区。每个TCP套接字都有自己的一对缓冲区,主机A和主机B各自维护自己的发送和接收缓冲区。
- 发送数据时,用户层的数据通过文件描述符传递给操作系统,实际上是将其拷贝到发送缓冲区中,而不是直接发送到网络。操作系统会根据TCP/IP协议栈的规则将数据封装并发送。数据的刷新和发送时间由操作系统自主决定,类似于文件操作中数据从文件缓冲区刷新到磁盘的过程。
- 接收数据时,发送方的数据通过网络传输到接收方的接收缓冲区。如果接收缓冲区为空,读取操作(如read或receive)会阻塞应用层进程;一旦接收缓冲区中有数据,操作系统会将数据拷贝到套接字描述符对应的缓冲区中,供应用程序读取。
- 因此,网络数据的发送和接收本质上是数据的拷贝过程。TCP的全双工特性意味着数据可以同时在两个方向上传输,每个方向都有独立的发送和接收缓冲区。UDP协议则不存在这种缓冲区机制,因为UDP是无连接的协议。
这种机制类似于文件操作中数据从用户缓冲区拷贝到文件缓冲区的过程。TCP的流式特性要求应用程序正确处理数据的边界问题,因为TCP不保证数据包的完整性,而是以字节流的形式传输数据。因此,应用程序需要通过协议设计或额外的逻辑来确保数据的正确解析。
套接字缓冲区结构理解难点
- 部分学习者对单个文件描述符包含两个缓冲区的概念存在理解困难。具体表现为难以接受一个套接字文件描述符(sock fd)同时管理发送和接收两个队列的事实。这种困惑源于对Linux文件系统实现机制的不完全理解。在Linux系统中,每个文件描述符都关联一个file结构体,而网络套接字的file结构体中包含一个private_data指针,这个void类型指针可以指向任意数据结构。当创建网络套接字时,操作系统内部会生成一个socket结构体,private_data指针就会指向这个特定的socket结构体。对于普通文件,这个字段通常不使用,但对于网络文件,它会指向预先定义好的socket对象。
- Linux内核中套接字的实现采用多层结构设计。最上层的file结构体通过private_data指针指向socket结构体,socket结构体又包含指向sock结构的指针。sock结构是网络协议栈中的核心数据结构,负责管理具体的网络通信细节。这种层级关系表现为:进程的文件描述符表项指向file结构体,file结构体的private_data指向socket结构体,socket结构体再通过指针关联到sock结构体。
- 报文管理的方法:客户端向服务器发送消息时,服务器会收到来自客户端的报文。服务器可能同时接收来自数百甚至上千个客户端的报文,这些报文在服务器内部协议栈中处于不同处理阶段,有的刚被接收,有的在网络层,有的在传输层,还有的尚未交付给上层应用。操作系统内部会存在大量未被交付的报文,需要对这些报文进行管理。管理报文的方法是先描述再组织,使用结构体描述报文对象,这个结构体在内核中被称为
sk_buff
。sk_buff
代表socket
缓冲区,用于描述一个报文的结构体,包含指向缓冲区头尾的指针,以及指向有效数据的指针。- 操作系统通过链表管理所有的
sk_buff
,将报文管理转化为对链表的管理。每个报文对应一个sk_buff
结构体,操作系统内有十个报文就对应十个sk_buff,再将这些sk_buff连接起来。sk_buff结构体包含描述报文的结构和缓冲区,缓冲区中存储报头和数据请求。通过hand指针指向缓冲区头部,end指针指向缓冲区结尾,date指针指向有效数据,操作系统可以高效管理缓冲区。操作系统对报文的管理实际上是对sk_buff链表的管理。 - 运行时:文件描述符帮助找到对应的socket,将报文放入接收队列或发送队列。TCP协议提供接收和发送缓冲区,这些缓冲区在操作系统内部实现时通过sk_buff链表管理。上层应用发送数据时,实际上是将数据拷贝到下层,包装成sk_buff后添加到对应socket的链表中,放入发送队列,由操作系统负责后续发送工作。网络发送的本质是数据拷贝而非直接发送。
Linux 2.6.18版本内核源码:
一句话总结:TCP协议支持全双工通信,这是因为TCP套接字在底层通过文件系统实现,双方各自拥有独立的接收和发送缓冲区。
TCP与UDP数据传输机制对比
TCP协议在数据传输过程中不关心数据的具体内容和结构,只负责将接收到的字节流发送给对方。当发送缓冲区有20个字节数据而接收缓冲区只剩10个字节空间时,TCP会根据流量控制机制只发送10个字节。上层应用读取数据时可能只获取到部分报文,需要应用层自行处理报文完整性。TCP被称为面向字节流的协议,因为它不对报文进行任何完整性处理,完全由应用层保证。UDP则完全不同,它要求操作系统必须将整个报文完整发送,不允许部分发送。UDP被称为面向数据报的协议,就像快递必须完整送达不能只送半个包裹。TCP则类似于自来水供应,只负责输送水流,用户自行决定如何接收和使用。应用层需要自行处理TCP传输中可能出现的报文不完整问题,比如读取到半条消息时需要等待后续数据拼接完整。这种应用层需要处理的数据包粘包问题只存在于TCP协议中,UDP不存在这个问题
为解决报文粘包问题,我们提出了两种方案:使用特殊字符分隔和添加长度前缀。第一种方案利用不会出现在运算表达式中的特殊字符(如换行符)来分隔不同报文,可以按行读取处理。第二种更优方案是在报文前添加head_len字段表示有效载荷长度,并用空格和换行符增强可读性。
网络计算器实现与代码结构
实现网络版本的计算器,客户端发送算式给服务器,服务器计算之后发送回客户端。与之前不同的是我们需要定制协议并使用JSONCPP
对数据进行序列化和反序列化。之前没有明显对数据进行序列化和反序列化,但其实也约定了类似的协议如按行读取等。
网络版本的计算器实现需要定制协议和计算器模块。协议定义在protocol.hpp中,计算器模块定义在calculator.hpp中。计算器提供的服务将被整合到TCP服务器的回调中,当数据到达时,服务器将数据回调给计算器处理。协议的定义包括请求和响应类,请求类包含x、y和操作符,响应类包含结果和错误码。
所有工程文件:
博客中只列出重要文件,其他文件在之前的博客有实现
Protocol.hpp
#pragma once#include <iostream>
#include <string>
#include <sstream>
#include <jsoncpp/json/json.h>
#include "Log.hpp"using namespace My_Log;const std::string sep = "\r\n";bool Encode(std::string &message)
{int size = message.size();std::string tmp = std::to_string(size) + sep + message + sep;message = tmp;return true;
}bool Decode(std::string &package, std::string *message)
{int pos = package.find(sep);if (pos == std::string::npos){return false;}std::string msglen_str = (package.substr(0, pos));int msglen = std::stoi(msglen_str);int full_msglen = msglen_str.size() + msglen + 2 * sep.size();if (package.size() < full_msglen){return false;}*message = package.substr(pos + sep.size(), msglen);package.erase(0, full_msglen);return true;
}class Request
{
public:Request(int x = 0, int y = 0, char oper = 0) : _x(x), _y(y), _oper(oper){}bool Serialize(std::string &out_string){Json::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::StreamWriterBuilder wbuilder;std::unique_ptr<Json::StreamWriter> write(wbuilder.newStreamWriter());std::stringstream ss;write->write(root, &ss);out_string = ss.str();return true;}bool Deserialize(std::string &in_string){Json::Value root;Json::Reader reader;bool n = reader.parse(in_string, root);if (!n){LOG(LogLevel::WARNING) << "Failed to parse JSON: " << reader.getFormattedErrorMessages();return false;}_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();return true;}int GetX() const { return _x; }int GetY() const { return _y; }char GetOper() const { return _oper; }~Request() {}private:int _x;int _y;char _oper;
};class Response
{
public:Response(int result = 0, int code = 0) : _result(result), _code(code){}Response(const Response& response){_result = response.GetResult();_code = response.GetCode();}bool Serialize(std::string &out_string){Json::Value root;root["result"] = _result;root["code"] = _code;Json::StreamWriterBuilder wbuilder;std::unique_ptr<Json::StreamWriter> write(wbuilder.newStreamWriter());std::stringstream ss;write->write(root, &ss);out_string = ss.str();return true;}bool Deserialize(std::string &in_string){Json::Value root;Json::Reader reader;bool n = reader.parse(in_string, root);if (!n){LOG(LogLevel::WARNING) << "Failed to parse JSON: " << reader.getFormattedErrorMessages();return false;}_result = root["result"].asInt();_code = root["code"].asInt();return true;}void SetResult(int result) { _result = result; }void SetCode(int code) { _code = code; }int GetResult() const { return _result; }int GetCode() const { return _code; }~Response() {}private:int _result;int _code;
};
- JSONCPP的使用可以查找资料,这里主要是思路,关于使用JSON实现序列化和反序列化接口,只需要会用即可,不清楚可以查询大模型。
TcpServer.hpp
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <pthread.h>
#include <functional>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>#include "Common.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"using namespace My_Log;
using namespace My_ThreadPool;const in_port_t defport = 8080;
const std::string defip = "127.0.0.1";using handtask_t = std::function<void()>;
using task_t = std::function<std::string(std::string&)>;class TcpServer
{
private:class ThreadData{public:int sockfd;TcpServer *self;};void HanderRequest(int sockfd){while (true){char recvbuf[1024] = {0};std::string package;int n = recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);if (n == 0){LOG(LogLevel::INFO) << "client close";break;}else if (n < 0){LOG(LogLevel::WARNING) << "recv: " << strerror(errno);break;}recvbuf[n] = '\0';LOG(LogLevel::INFO) << "recvbuf is : " << recvbuf;package += recvbuf;std::string res = _task(package);LOG(LogLevel::INFO) << "result is : " << res;::send(sockfd, res.c_str(), res.size(), 0);}::close(sockfd);}public:TcpServer(task_t task, in_port_t port = defport, std::string ip = defip): _task(task), _listensockfd(-1), _isrunning(false), _addr(port){}void InitServer(){// 创建套接字_listensockfd = socket(AF_INET, SOCK_STREAM, 0);if (_listensockfd < 0){LOG(LogLevel::FATAL) << "socket err: " << strerror(errno);exit(SOCKET_ERR);}LOG(LogLevel::INFO) << "socket success";// bind套接字int n = bind(_listensockfd, _addr.NetAddr(), _addr.NetAddrLen());if (n < 0){LOG(LogLevel::FATAL) << "bind: " << strerror(errno);exit(BIND_ERR);}LOG(LogLevel::INFO) << "bind success";// 监听套接字n = listen(_listensockfd, 4);if (n < 0){LOG(LogLevel::FATAL) << "listen: " << strerror(errno);exit(LISTEN_ERR);}LOG(LogLevel::INFO) << "listen success";}void Start(){_isrunning = true;while (true){struct sockaddr_in cliaddr;socklen_t len = sizeof(cliaddr);int sockfd = accept(_listensockfd, CONV(&cliaddr), &len);if (sockfd < 0){LOG(LogLevel::WARNING) << "accept: fail";}LOG(LogLevel::INFO) << "accept success, sockfd is : " << sockfd;// 打印客户信息InetAddr cli(cliaddr);LOG(LogLevel::INFO) << "client info is : " << cli.GetStrAddr();// 线程池版本ThreadPool<handtask_t>::getinstance()->Equeue([sockfd, this](){ this->HanderRequest(sockfd); });}}~TcpServer(){}private:int _listensockfd;InetAddr _addr;bool _isrunning;task_t _task;
};
TcpServer.cc
#include <iostream>
#include <string>
#include <memory>
#include <functional>
#include "TcpServer.hpp"
#include "Calculator.hpp"
#include "Protocol.hpp"using namespace My_Log;using cal_t = std::function<Response(const Request &req)>;class Prase
{
public:Prase(cal_t cal) : _cal(cal) {}std::string Entry(std::string &package){std::string messag;std::string resstr;// 解码 len\r\n{json}\r\nwhile (Decode(package, &messag)){// 判空if (messag.empty())break;// 反序列化Request req;if (!req.Deserialize(messag))break;// 计算Response res = _cal(req);// 序列化std::string tmp;res.Serialize(tmp);// 编码if (!Encode(tmp))break;resstr += tmp;}return resstr;}~Prase() {}private:cal_t _cal;
};int main()
{Calculator cal;Prase pra([&cal](const Request &req){ return cal.Execute(req); });std::unique_ptr<TcpServer> ser_ptr = std::make_unique<TcpServer>([&pra](std::string &package){ return pra.Entry(package); });ser_ptr->InitServer();ser_ptr->Start();ser_ptr->~TcpServer();return 0;
}
TcpClient.cc
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "Common.hpp"
#include "Protocol.hpp"int main(int argc, char* argv[])
{//获取服务器信息if(argc != 3){std::cout << "Usage: " << argv[0] << " <server_ip> <server_port>" << std::endl;return 1;}std::string ser_ip = argv[1];int ser_port = std::stoi(argv[2]);//创建套接字int sockfd = socket(AF_INET, SOCK_STREAM, 0);if(sockfd < 0){std::cerr << "socket err" << std::endl;return 2;}struct sockaddr_in ser_addr;ser_addr.sin_family = AF_INET;ser_addr.sin_port = htons(ser_port);ser_addr.sin_addr.s_addr = inet_addr(ser_ip.c_str());int n = connect(sockfd, CONV(&ser_addr), sizeof(ser_addr));if(n < 0){std::cerr << "connect err" << std::endl;return 3;}//循环发送接收逻辑std::string message;while(true){char serbuff[1024];int x,y;char oper;std::cout << "input x# ";std::cin >> x;std::cout << "input y# ";std::cin >> y;std::cout << "input oper# ";std::cin >> oper;Request req(x, y, oper);req.Serialize(message);Encode(message);//发送数据n = ::send(sockfd, message.c_str(), message.size(), 0);if(n < 0){std::cerr << "send err" << std::endl;break;}n = ::recv(sockfd, serbuff, sizeof(serbuff), 0);if(n < 0){std::cerr << "recv err" << std::endl;break;}serbuff[n] = 0;std::string package = serbuff;std::string resstr;Decode(package, &resstr);Response resp;resp.Deserialize(resstr);std::cout << "server message# " << resp.GetResult() << '[' << resp.GetCode() << ']' << std::endl;}::close(sockfd);return 0;
}
客户端与服务器的完整交互过程包括:序列化请求、encode报文、发送请求、接收应答、decode应答、反序列化应答数据。这个过程与服务器的处理流程完全一致。在代码实现上,客户端首先构建请求并序列化,然后encode添加报头,通过socket发送。服务器返回应答后,客户端需要decode解析报文,反序列化得到结构化数据response。最后可以处理response,比如打印计算结果result和状态码code。如果code为0表示结果可信,非零则表示计算出现问题。整个过程中,客户端和服务器都使用相同的协议定义,确保通信的一致性。代码编译通过后,可以进行手动测试,启动服务器和客户端进行连接测试。