当前位置: 首页 > news >正文

设计自己的小传输协议 状态机解析与封装抽象

设计自己的小传输协议 状态机解析与封装抽象

​ 笔者先直接给出代码:

C++头文件

#ifndef DATAPROTOCOLIZEDCONTROLLER_H
#define DATAPROTOCOLIZEDCONTROLLER_H
#include "DataHeader.h"
#include <QObject>
class QSaveFile;
class DataSinker;
class QElapsedTimer;class DataProtocolizedController : public QObject {Q_OBJECT
public:explicit DataProtocolizedController(DataSinker* sinker, QObject* parent = nullptr);~DataProtocolizedController();/*** @brief reset reset the protocal machines*/void reset();public slots:void sendData(int chunkSize = DataHeaderUtils::DEF_CHUNK_SZ,qint64 startOffset = 0);void onBytes(const QByteArray& bytes);
signals:// frame is ready to be sent to other placesvoid frameReady(const QByteArray& frame);// process for sendingsvoid sendProgress(quint64 sent, quint64 total);// receiving processvoid receiveProgress(const QString& fileId, quint64 received, quint64 total);// parse dead :(void protocolError(const QString& reason);// parsing deadvoid parsingBad();private:static constexpr int PARSE_TIMEOUT_MS = 5000;void sendStartFrame(quint64 fileId,const QString& name,quint64 totalSize);void sendDataFrame(quint64 fileId,quint64 offset,const QByteArray& chunk,quint64 totalSize);void sendEndFrame(quint64 fileId,quint64 totalSize);void parse();bool parseHeader();bool parseName();bool parseData();bool ensureSize(int need) const;DataHeaderUtils::DataHeader internal_header;QByteArray rx_buffer;quint64 sendFileSize { 0 };quint64 sendFileHasSent { 0 };DataHeaderUtils::CurrentState state {DataHeaderUtils::CurrentState::ReadingHeader};QString curFileIdStr; // string 形式的 fileId(方便信号里传)quint64 curReceived { 0 };DataSinker* sinker;QElapsedTimer* elasped_timeout;
};#endif // DATAPROTOCOLIZEDCONTROLLER_H

C++文件

#include "DataProtocolizedController.h"
#include "DataSinker.h"
#include <QDir>
#include <QElapsedTimer>
#include <QSaveFile>
namespace {
static constexpr quint32 MAX_NAME_LEN = 64 * 1024; // 64 KiB
static constexpr quint32 MAX_PAYLOAD_SIZE = 10 * 1024 * 1024; // 10 MiB
static constexpr quint64 MAX_TOTAL_SIZE = (1ULL << 40); //
}DataProtocolizedController::~DataProtocolizedController() {if (elasped_timeout) {delete elasped_timeout; // Clean up the elapsed timer}
}DataProtocolizedController::DataProtocolizedController(DataSinker* sinker, QObject* parent): QObject(parent), sinker(sinker), state(DataHeaderUtils::CurrentState::ReadingHeader) {if (!sinker) {throw std::invalid_argument("Sinker can not be null!");}sinker->setSinker(this);elasped_timeout = new QElapsedTimer();
}void DataProtocolizedController::sendData(int chunkSize, qint64 startOffset) {const auto fileID = sinker->id_generator();sendFileSize = sinker->size();sendStartFrame(fileID, sinker->provide_name(), sendFileSize);if (startOffset < 0 || startOffset > sendFileSize)startOffset = 0;sendFileHasSent = startOffset;while (sendFileHasSent < sendFileSize) {const qint64 bytesToRead = qMin(static_cast<qint64>(chunkSize),static_cast<qint64>(sendFileSize - sendFileHasSent));QByteArray chunk = sinker->requestChunkForEncoding(sendFileHasSent, bytesToRead);const quint64 offset= sendFileHasSent;sendDataFrame(fileID, offset, chunk, sendFileSize);sendFileHasSent += chunk.size();emit sendProgress(sendFileHasSent, sendFileSize);}sendEndFrame(fileID, sendFileSize);sinker->sendEnd();
}void DataProtocolizedController::onBytes(const QByteArray& bytes) {rx_buffer.append(bytes);parse();
}void DataProtocolizedController::sendStartFrame(quint64 fileId, const QString& name, quint64 totalSize) {QByteArray _name = name.toUtf8();DataHeaderUtils::DataHeader h {};h.header_magic = DataHeaderUtils::HEADER_MAGIC;h.protocol_version = DataHeaderUtils::HEADER_VERSION;h.current_state = static_cast<quint8>(DataHeaderUtils::OperationState::Start);// h.reserved = 0;h.fileId = fileId;h.offset = 0;h.totalSize = totalSize;h.nameLen = static_cast<quint32>(_name.size());h.payloadSize = 0;QByteArray frame = DataHeaderUtils::packUpHeader(h);frame.append(_name);emit frameReady(frame);
}void DataProtocolizedController::sendDataFrame(quint64 fileId, quint64 offset, const QByteArray& chunk, quint64 totalSize) {DataHeaderUtils::DataHeader h {};h.header_magic = DataHeaderUtils::HEADER_MAGIC;h.protocol_version = DataHeaderUtils::HEADER_VERSION;h.current_state = static_cast<quint8>(DataHeaderUtils::OperationState::Data);h.fileId = fileId;h.offset = offset;h.totalSize = totalSize;h.nameLen = 0;h.payloadSize = static_cast<quint32>(chunk.size());QByteArray frame = DataHeaderUtils::packUpHeader(h);frame.append(chunk);emit frameReady(frame);
}void DataProtocolizedController::sendEndFrame(quint64 fileId, quint64 totalSize) {DataHeaderUtils::DataHeader h {};h.header_magic = DataHeaderUtils::HEADER_MAGIC;h.protocol_version = DataHeaderUtils::HEADER_VERSION;h.current_state = static_cast<quint8>(DataHeaderUtils::OperationState::End);// h.reserved = 0;h.fileId = fileId;h.offset = totalSize;h.totalSize = totalSize;h.nameLen = 0;h.payloadSize = 0;QByteArray frame = DataHeaderUtils::packUpHeader(h);emit frameReady(frame);
}void DataProtocolizedController::parse() {if (!elasped_timeout->isValid()) {elasped_timeout->start();}bool advanced = true;while (advanced) {advanced = false;if (elasped_timeout->elapsed() > PARSE_TIMEOUT_MS) {// emit protocolError(QStringLiteral("Parsing timeout, clearing buffer and resetting state."));emit parsingBad();qDebug() << "Parsing bad!";rx_buffer.clear();elasped_timeout->restart(); // 超时后重新计时state = DataHeaderUtils::CurrentState::ReadingHeader;return;}switch (state) {case DataHeaderUtils::CurrentState::ReadingHeader:if (parseHeader()) {advanced = true;elasped_timeout->restart();}break;case DataHeaderUtils::CurrentState::ReadingName:if (parseName()) {advanced = true;elasped_timeout->restart();}break;case DataHeaderUtils::CurrentState::ReadingPayload:if (parseData()) {advanced = true;elasped_timeout->restart();}break;}}
}
bool DataProtocolizedController::parseHeader() {// 不够一个头,等待更多数据if (rx_buffer.size() < static_cast<int>(DataHeaderUtils::DATAHEADER_SIZE))return false;// 尝试读取头DataHeaderUtils::DataHeader h {};QByteArray headerBytes = rx_buffer.left(static_cast<int>(DataHeaderUtils::DATAHEADER_SIZE));QDataStream ds(headerBytes);ds.setByteOrder(QDataStream::BigEndian);if (!DataHeaderUtils::readOutHeader(ds, h)) {emit protocolError(QStringLiteral("Header corrupt or CRC mismatch, discarding 1 byte"));rx_buffer.remove(0, 1);return true; // 继续 while-loop,再试}// 基于整型直接判断 magic 与 versionif (h.header_magic != DataHeaderUtils::HEADER_MAGIC) {emit protocolError(QStringLiteral("Magic mismatch, discarding 1 byte"));rx_buffer.remove(0, 1);return true;}if (h.protocol_version != DataHeaderUtils::HEADER_VERSION) {emit protocolError(QStringLiteral("Unsupported protocol version"));// 版本不兼容,直接清空缓存,等待上层重新同步(或断开)rx_buffer.clear();return false;}// 限制防护if (h.nameLen > MAX_NAME_LEN || h.payloadSize > MAX_PAYLOAD_SIZE || h.totalSize > MAX_TOTAL_SIZE) {emit protocolError(QStringLiteral("Header fields too large, clearing buffer"));rx_buffer.clear();return false;}// 头部无误:移除缓存中的 headerrx_buffer.remove(0, static_cast<int>(DataHeaderUtils::DATAHEADER_SIZE));internal_header = h;curFileIdStr = QString::number(h.fileId);// 依帧类型决定下一状态const auto opState = static_cast<DataHeaderUtils::OperationState>(h.current_state);switch (opState) {case DataHeaderUtils::OperationState::Start:if (h.nameLen == 0) {emit protocolError(QStringLiteral("Start frame with zero nameLen"));rx_buffer.clear();return false;}curReceived = 0;state = DataHeaderUtils::CurrentState::ReadingName;break;case DataHeaderUtils::OperationState::Data:state = DataHeaderUtils::CurrentState::ReadingPayload;break;case DataHeaderUtils::OperationState::End: {if (!sinker->receiveEnd()) {emit protocolError(QStringLiteral("sinker's receiveEnd() failed"));}state = DataHeaderUtils::CurrentState::ReadingHeader;break;}default:emit protocolError(QStringLiteral("Unknown operation state"));rx_buffer.clear();return false;}return true;
}bool DataProtocolizedController::parseName() {const int need = static_cast<int>(internal_header.nameLen);if (!ensureSize(need))return false;const QByteArray nameBytes = rx_buffer.left(need);rx_buffer.remove(0, need);sinker->consume_name(QString::fromUtf8(nameBytes));state = DataHeaderUtils::CurrentState::ReadingHeader;return true;
}bool DataProtocolizedController::parseData() {const int need = static_cast<int>(internal_header.payloadSize);if (!ensureSize(need))return false;const QByteArray payload = rx_buffer.left(need);rx_buffer.remove(0, need);if (!sinker->consumeChunkBuffer(payload)) {emit protocolError(QStringLiteral("consumeChunkBuffer error"));} else {curReceived += static_cast<quint64>(payload.size());emit receiveProgress(curFileIdStr, curReceived, internal_header.totalSize);}// Data 一帧解析完,回到读下一帧 headerstate = DataHeaderUtils::CurrentState::ReadingHeader;return true;
}void DataProtocolizedController::reset() {rx_buffer.clear();state = DataHeaderUtils::CurrentState::ReadingHeader;curFileIdStr.clear();curReceived = 0;internal_header = DataHeaderUtils::DataHeader();sendFileSize = 0;sendFileHasSent = 0;if (elasped_timeout) {elasped_timeout->invalidate(); // 相当于停止计时}emit protocolError(QStringLiteral("Controller has been reset."));
}bool DataProtocolizedController::ensureSize(int need) const {return rx_buffer.size() >= need;
}

一、发送端实现

Sinker是作什么的?

#pragma once#include <QByteArray>
#include <QObject>
#include <QString>
class DataProtocolizedController;
class DataSinker {friend class DataProtocolizedController;public:virtual ~DataSinker() = default;virtual QString provide_name() = 0;virtual void consume_name(const QString& name) = 0;virtual qint64 id_generator() = 0;virtual QByteArray requestChunkForEncoding(quint64 offset, quint64 bytesForRead) = 0;virtual quint64 size() = 0;virtual bool receiveEnd() = 0;virtual bool consumeChunkBuffer(const QByteArray& chunk) = 0;virtual void sendEnd() = 0;private:inline void setSinker(DataProtocolizedController* controller) {this->controller = controller;}DataProtocolizedController* controller;
};

​ Sinker是一个简单的插槽,她负责实现我们对应的协议数据填装,比如说,我们如何一步一步封装我们的数据,如何接受和处理我们的数据。等等。

​ 这里,我们结合下面的协议类本身一起看:

1.1 初始化与依赖注入

发送端核心类 DataProtocolizedController 接收一个实现了 DataSinker 接口的对象,用于抽象文件或数据源的读写逻辑。构造函数中:

DataProtocolizedController::DataProtocolizedController(DataSinker* sinker, QObject* parent): QObject(parent), sinker(sinker), state(CurrentState::ReadingHeader)
{if (!sinker) throw std::invalid_argument("Sinker 不能为空");sinker->setSinker(this);elapsed_timer = new QElapsedTimer();
}
  • 依赖注入:将文件读写细节由 DataSinker 负责,实现职责单一、易于测试。
  • 定时器准备:接收端同样借助 QElapsedTimer 实现超时控制。

1.2 分块读取与发送流程

核心方法 sendData(int chunkSize, qint64 startOffset) 支持从任意偏移(断点续传)开始分块发送:

  1. 生成文件会话 ID:通过 sinker->id_generator(),确保每个文件或任务拥有唯一标识。
  2. 发送开始帧sendStartFrame(fileID, name, totalSize),上层收到信号后打包并发往套接字。
  3. 循环分块
    • 计算本次应读大小:bytesToRead = min(chunkSize, totalSize - sentSoFar)
    • 调用 sinker->requestChunkForEncoding(offset, bytesToRead) 获取数据块
    • 调用 sendDataFrame(...) 构建数据帧并通过 frameReady() 信号交给网络层
    • 发出 sendProgress(sentSoFar, totalSize) 信号,UI 可据此更新进度条
  4. 发送结束帧sendEndFrame(fileID, totalSize),触发对端完成写入或清理资源。
  5. 释放或重用资源sinker->sendEnd(),可在此处做句柄关闭、缓存释放等。
while (sendFileHasSent < sendFileSize) {QByteArray chunk = sinker->requestChunkForEncoding(sendFileHasSent, bytesToRead);sendDataFrame(fileID, sendFileHasSent, chunk, sendFileSize);sendFileHasSent += chunk.size();emit sendProgress(sendFileHasSent, sendFileSize);
}
sendEndFrame(fileID, sendFileSize);
sinker->sendEnd();

​ 这样设计,我们可以让重试或从中断处恢复变得轻而易举,只需要拿到上一次从传递的offset,咱们就可与继续发送之前缺失的数据即可。所有帧通过信号异步给网络层,UI 与业务逻辑不会被 I/O 操作阻塞。同样的,还有

  • 通过信号实时上报,便于可视化监控和限速策略。

接受端的实现

2.1 缓冲区与数据拼接

网络数据到达时,一律由 onBytes(const QByteArray &bytes) 方法追加到内部缓冲区 rx_buffer,然后调用 parse() 进入解析状态机:

void DataProtocolizedController::onBytes(const QByteArray& bytes) {rx_buffer.append(bytes);parse();
}
  • 粘包/分包兼容:无论收到多少字节,都能完整拼接或保留剩余,等待下一次解析。
  • 按需消费:每次解析只摘取需要的字节,其余留待下次。

2.2 状态机驱动的解析流程

解析方法将状态分为三种:

  1. ReadingHeader:读取固定长度的协议头(已在工具类封装)
  2. ReadingName:如果是 “Start” 帧,读取文件名字段
  3. ReadingPayload:如果是 “Data” 帧,读取当前数据块

每解析完一个阶段,就更新状态并重置计时器,直到缓冲区已无可解析或完成整个文件传输。

while (advanced) {advanced = false;if (state == ReadingHeader && parseHeader()) { advanced = true; restartTimer(); }else if (state == ReadingName && parseName())  { advanced = true; restartTimer(); }else if (state == ReadingPayload && parseData()) { advanced = true; restartTimer(); }
}

​ 这种方式让我们可以可能的完整分析我们的数据。

2.3 数据写入与进度回调

  • 文件名处理sinker->consume_name(name),上层可做目录创建、文件句柄打开等操作。
  • 数据块写入sinker->consumeChunkBuffer(payload),底层完成文件写入或内存缓存。
  • 结束回调sinker->receiveEnd(),通知文件写入完毕。

同时,receiveProgress(fileId, curReceived, totalSize) 信号可协助在 UI 展示接收进度或统计传输速率。


三、容错与安全机制

3.1 CRC 校验与错误重试

借助工具类 DataHeaderUtils::readOutHeader() 完成头部 CRC 校验。遇到校验失败或魔数/版本不吻合时:

  • 触发 protocolError(reason) 信号,让上层记录日志或告知用户。
  • 丢弃 rx_buffer.remove(0, 1) 逐字节重试,直到重新同步。
if (!readOutHeader(ds, h)) {emit protocolError("CRC mismatch");rx_buffer.remove(0, 1);return true; // 继续 while-loop
}

3.2 合理限制字段大小

为防止恶意构造超大 nameLenpayloadSizetotalSize,用 constexpr 常量在解析时校验,超限直接清空缓存并报错:

if (h.nameLen > MAX_NAME_LEN || h.payloadSize > MAX_PAYLOAD_SIZE) {emit protocolError("字段过大,清空缓存");rx_buffer.clear();return false;
}

设计思路:“信任但验证”,既保证性能,也防止内存爆炸攻击。

3.3 超时控制

使用 QElapsedTimer elapsed_timer 记录每次 parse() 后的进度。若连续超过 PARSE_TIMEOUT_MS 毫秒无任何解析进展,则触发 parsingBad() 并重置状态机与缓冲区,避免死循环或协议挂起。

if (elapsed_timer->elapsed() > PARSE_TIMEOUT_MS) {emit parsingBad();rx_buffer.clear();state = ReadingHeader;elapsed_timer->restart();return;
}

重点聊聊协议解析在做什么?

以下内容可作为博客中专门剖析解析逻辑的章节,重点讲解 parseHeader(),并简要串联 parseName()parseData() 的配合流程。

在整个接收端逻辑中,parseHeader() 是状态机的第一步,也是协议同步和容错的关键。它做了以下几件事:

bool DataProtocolizedController::parseHeader() {// 1. 缓冲区长度检查if (rx_buffer.size() < DataHeaderUtils::DATAHEADER_SIZE)return false;// 2. 读取头部二进制QByteArray headerBytes = rx_buffer.left(DataHeaderUtils::DATAHEADER_SIZE);QDataStream ds(headerBytes);ds.setByteOrder(QDataStream::BigEndian);// 3. CRC 与格式校验DataHeaderUtils::DataHeader h{};if (!DataHeaderUtils::readOutHeader(ds, h)) {emit protocolError("Header corrupt or CRC mismatch, discarding 1 byte");rx_buffer.remove(0, 1);return true;  // 继续尝试解析下一个字节}// 4. Magic 与版本检查if (h.header_magic != DataHeaderUtils::HEADER_MAGIC) {emit protocolError("Magic mismatch, discarding 1 byte");rx_buffer.remove(0, 1);return true;}if (h.protocol_version != DataHeaderUtils::HEADER_VERSION) {emit protocolError("Unsupported protocol version");rx_buffer.clear();return false;}// 5. 安全边界校验if (h.nameLen > MAX_NAME_LEN || h.payloadSize > MAX_PAYLOAD_SIZE || h.totalSize > MAX_TOTAL_SIZE) {emit protocolError("Header fields too large, clearing buffer");rx_buffer.clear();return false;}// 6. 同步成功:消费 header、保存状态rx_buffer.remove(0, DataHeaderUtils::DATAHEADER_SIZE);internal_header = h;curFileIdStr    = QString::number(h.fileId);// 7. 根据帧类型切换状态机switch (static_cast<OperationState>(h.current_state)) {case OperationState::Start:if (h.nameLen == 0) {emit protocolError("Start frame with zero nameLen");rx_buffer.clear();return false;}curReceived = 0;state       = CurrentState::ReadingName;break;case OperationState::Data:state = CurrentState::ReadingPayload;break;case OperationState::End:if (!sinker->receiveEnd())emit protocolError("sinker's receiveEnd() failed");state = CurrentState::ReadingHeader;break;default:emit protocolError("Unknown operation state");rx_buffer.clear();return false;}return true;
}

1. 缓冲区长度检查

  • 目的:确保缓存中至少有一个完整的协议头大小(DATAHEADER_SIZE
  • 效果:提前返回 false,交由外层 while 循环等待更多网络数据。
if (rx_buffer.size() < DATAHEADER_SIZE)return false;

2. CRC 与格式校验

  • readOutHeader()
    • 先通过 QDataStream 按大端顺序读取所有字段。
    • 内部做 CRC 校验,保证头部比特级无误。
  • 错误策略
    • 如果校验失败,抛出 protocolError,并 丢弃 1 字节 继续尝试同步。
    • 逐字节滑动窗口策略能快速跳过 “脏” 区域,最快恢复到正确帧边界。
if (!DataHeaderUtils::readOutHeader(ds, h)) {emit protocolError("CRC mismatch");rx_buffer.remove(0, 1);return true;
}

3. Magic 与版本检查

  • 魔数 (header_magic)
    • 防止与其它协议数据混淆;
    • 同样采用 “丢弃 1 字节 + 重试” 的方式重新同步。
  • 协议版本 (protocol_version)
    • 如果发生版本升级或不兼容,这里一次性清空缓冲并报错,交由上层决定是否断开或重置。
if (h.header_magic != HEADER_MAGIC) {}
if (h.protocol_version != HEADER_VERSION) {emit protocolError("Unsupported version");rx_buffer.clear();return false;
}

4. 安全边界校验

为避免内存耗尽和攻击,将必要字段(如 nameLenpayloadSizetotalSize)与编译期常量对比:

if (h.nameLen > MAX_NAME_LEN || h.payloadSize > MAX_PAYLOAD_SIZE || h.totalSize > MAX_TOTAL_SIZE) {emit protocolError("Header fields too large");rx_buffer.clear();return false;
}

5. 状态机切换

根据 current_state(Start/Data/End)决定下一步如何处理:

状态下一动作会做什么
StartReadingName读取文件名字段,长度 h.nameLen
DataReadingPayload读取数据块,长度 h.payloadSize
EndReadingHeader调用 sinker->receiveEnd(),完成文件收尾

在进入新的子状态前,需要先 消费掉 这次的头部字节:

rx_buffer.remove(0, DATAHEADER_SIZE);
internal_header = h;

6. 与 parseName()parseData() 的配合

  • parseName()
    • 等待 rx_buffer 中累积到 h.nameLen 字节才返回 true
    • 读取完毕后将文件名传给 sinker->consume_name(),并重置状态回 ReadingHeader
  • parseData()
    • 同理等待 h.payloadSize 字节。
    • 每消费完一个数据块,会触发 receiveProgress(),并回到 ReadingHeader

这一 三段式解析(头→名→Payload)配合外层循环,既保证了高效流式消费,也天然解决了 TCP 粘包与半包问题。


七、为什么这么设计?

  1. 最快恢复同步
    • CRC/Magic 校验失败时逐字节跳过,几乎能在「第一个合法魔数」位置恢复。
  2. 高内聚低耦合
    • parseHeader() 专注协议头,parseName()parseData() 各司其职。
  3. 可观测性强
    • 每一步通过信号上报出错/进度,易于日志记录与问题追踪。
  4. 安全健壮
    • 字段越界检查、版本兼容检查、超时保护,多层防护确保长期稳定运行。
http://www.dtcms.com/a/299987.html

相关文章:

  • Java设计模式之行为型模式(中介者模式)实现方式详解
  • 函数参数的解包与顺序匹配机制
  • Go的管道——channel
  • HTML5元素相关补充
  • HighlightingSystem
  • MATLAB近红外光谱分析技术及实践技术应用
  • C++ 类型萃取:深入理解与实践
  • 【AcWing 143题解】最大异或对
  • Android-广播详解
  • 零拷贝应用场景
  • 【Spring AI】大模型服务平台-阿里云百炼
  • 基于cooragent的旅游多智能体的MCP组件安装与其开发
  • javaSE 6
  • connect系统调用及示例
  • Go-Elasticsearch v9 安装与版本兼容性
  • Docker常用命令详解:以Nginx为例
  • 求hom_math_2d的角度值
  • Aerospike架构深度解析:打造web级分布式应用的理想数据库
  • JS实现数字变化时,上下翻滚动画效果
  • 本地部署智能家居集成解决方案 ESPHome 并实现外部访问
  • 五分钟系列-文本搜索工具grep
  • 【工具】好用的浏览器AI助手
  • 【MySQL】VARCHAR(10) 和 VARCHAR(100) 的区别
  • 大模型蒸馏(distillation)---从DeepseekR1-1.5B到Qwen-2.5-1.5B蒸馏
  • 拒绝SQL恐惧:用Python+pyqt打造任意Excel数据库查询系统
  • C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(四)
  • 丝杆升降机应用在食品机械行业有什么特殊的要求吗
  • Java BeanUtils 类详解:作用、语法与示例
  • springboot 基于签名的安全通信
  • 深入解析YARN中的FairScheduler与CapacityScheduler:资源分配策略的核心区别