当前位置: 首页 > news >正文

MQTT QoS 2 详细流程解析

QoS 2 概述

QoS 2 是 MQTT 协议中最复杂但也最可靠的消息传输级别,它通过四次握手机制确保消息仅一次投递(Exactly Once Delivery)。这意味着:

  • 消息绝对不会丢失
  • 消息绝对不会重复
  • 但网络开销最大,延迟最高

四次握手协议详解

QoS 2 通过四次握手确保消息的仅一次投递,这是MQTT协议中最复杂但也最可靠的机制。

四次握手的设计原理

为什么需要四次握手?

传统的三次握手(如TCP)只能保证数据传输的可靠性,但无法解决应用层重复处理的问题。QoS 2面临的核心挑战是:

  1. 网络可能丢包 - 需要重发机制
  2. 重发可能导致重复 - 需要去重机制
  3. 去重需要状态同步 - 需要双方确认机制
  4. 状态同步需要可靠传输 - 需要分阶段确认

四次握手的分阶段设计

阶段一:消息传输确认
发送端 ──PUBLISH──→ 接收端    (传输消息内容)
发送端 ←──PUBREC──── 接收端    (确认收到消息)阶段二:状态同步确认  
发送端 ──PUBREL───→ 接收端    (请求释放消息ID)
发送端 ←──PUBCOMP─── 接收端    (确认已释放消息ID)

每次握手的具体作用:

四次握手状态机

发送端状态                              接收端状态
INITIAL                                INITIAL|                                      ||══════ ① PUBLISH ═══════>              |↓                                      ↓
WAIT_PUBREC                           RECEIVED_PUBLISH|                                      |── 处理消息内容|<═════ ② PUBREC ════════              |── 保存消息ID到去重表↓                                      ↓
WAIT_PUBCOMP                          WAIT_PUBREL|                                      ||══════ ③ PUBREL ═══════>              ||                                      ↓|                                  READY_COMPLETE|<═════ ④ PUBCOMP ═══════              |── 从去重表删除消息ID↓                                      ↓
COMPLETED                             COMPLETED

详细流程分析

四次握手详细流程实现

第一次握手:PUBLISH (发送端 → 接收端)

作用:传输消息内容

这是四次握手的核心步骤,承载实际的业务数据。

发送端逻辑
private void publishBytes(String topic, byte[] payload, MqttQoS qos, boolean retain) {// QoS 2 需要分配消息IDint messageIdNum = nextMessageId();// 构建PUBLISH消息MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false,           // isDup: 初次发送为falseEXACTLY_ONCE,    // QoS 2retain, 0);MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, messageIdNum);ByteBuf payloadBuf = Unpooled.wrappedBuffer(payload);MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, payloadBuf);// 关键:缓存PUBLISH消息用于重发cachePublishMsg(EXACTLY_ONCE, payloadBufBak, variableHeader, fixedHeader, null, channel);// 发送消息,进入WAIT_PUBREC状态channel.writeAndFlush(publishMessage);log.info("QoS2: PUBLISH sent, messageId={}, waiting for PUBREC", messageIdNum);
}
PUBLISH消息重发机制
private void cachePublishMsg(MqttQoS qos, ByteBuf byteBuf, MqttPublishVariableHeader variableHeader,MqttFixedHeader mqttFixedHeaderInfo, ChannelHandlerContext context, Channel channels) {// 构建重发消息,关键:设置isDup=true表示这是重复消息MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true,  // isDup=true,重发时标记为重复qos, false,mqttFixedHeaderInfo.remainingLength());MqttPublishMessage cachePubMessage = new MqttPublishMessage(fixedHeader, variableHeader, byteBuf);// 启动定时重发任务:每1秒检查一次,如果没收到PUBREC就重发PUBLISHScheduledFuture<?> scheduledFuture = TimerData.scheduledThreadPoolExecutor.scheduleAtFixedRate(new MonitorMsgTime(variableHeader.packetId(), cachePubMessage, context, channels), 1000, 1000, TimeUnit.MILLISECONDS);// 保存任务引用,收到PUBREC时会取消这个任务TimerData.scheduledFutureMap.put(variableHeader.packetId(), scheduledFuture);
}

第二次握手:PUBREC (接收端 → 发送端)

作用:确认消息接收,但流程未完成

接收端逻辑
private void handlePublish(MqttPublishMessage msg) {try {// 先处理消息内容(重要:QoS 2在第一步就处理消息)if (messageHandler != null) {messageHandler.accept(msg);}MqttQoS qos = msg.fixedHeader().qosLevel();if (qos == EXACTLY_ONCE) {// 发送PUBREC确认:告诉发送端"我收到了PUBLISH,但还没完全处理完"MqttMessage pubrecMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0),MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()));channel.writeAndFlush(pubrecMessage);log.info("QoS2: PUBREC sent, messageId={}, waiting for PUBREL", msg.variableHeader().packetId());}} catch (Exception e) {log.error("QoS2: Error handling PUBLISH message", e);}
}

关键理解点:

为什么需要PUBREL?

考虑这个场景:

  • 接收端在发送PUBREC之前就已经处理了消息内容
  • PUBREC只是告诉发送端"我收到了,别再重发PUBLISH了"
  • 此时接收端必须保持消息ID在去重表中,防止重复处理
  • 协议流程还没有结束,不能释放资源
    / 接收端的去重表管理
    Set<Integer> pendingMessageIds = Collections.synchronizedSet(new HashSet<>());private void handlePublish(MqttPublishMessage msg) {int messageId = msg.variableHeader().packetId();// 检查是否重复消息if (!pendingMessageIds.contains(messageId)) {// 首次收到:处理消息 + 加入去重表messageHandler.accept(msg);pendingMessageIds.add(messageId);log.info("QoS2: First PUBLISH received, messageId={}, processed and cached", messageId);} else {// 重复消息:只响应PUBREC,不重复处理log.warn("QoS2: Duplicate PUBLISH received, messageId={}, ignored", messageId);}// 无论是否重复,都要发送PUBRECMqttMessage pubrecMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0),MqttMessageIdVariableHeader.from(messageId));channel.writeAndFlush(pubrecMessage);log.info("QoS2: PUBREC sent, messageId={}, waiting for PUBREL", messageId);
    }

    第三次握手:PUBREL (发送端 → 接收端)

    作用:请求释放消息ID,允许接收端清理去重状态

    PUBREL的关键作用:

  • 状态同步信号:告诉接收端"现在可以安全地释放这个消息ID了"
  • 防止永久资源占用:如果没有PUBREL,接收端的去重表会无限增长
  • 确保协议完整性:只有收到PUBREL,接收端才能确定发送端已经确认消息传输成功
1. 发送端发送PUBLISH(msgId=123)
2. 接收端处理消息,发送PUBREC(123)
3. 网络问题:PUBREC丢失
4. 发送端重发PUBLISH(123)
5. 接收端收到重复PUBLISH,不重复处理,但再次发送PUBREC(123)

此时如果没有PUBREL机制:

  • 接收端永远不知道什么时候可以从去重表中删除msgId=123
  • 去重表会无限增长,最终内存溢出
  • 消息ID会被永久占用

PUBREL解决方案:

  • 只有发送端确认收到PUBREC后,才发送PUBREL
  • PUBREL是"释放许可",告诉接收端可以清理状态了
private void receivePubAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {// 注意:在QoS 2中,这个方法处理的是PUBREC消息,不是PUBACKMqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader();int msgId = variableHeader.messageId();// 第一阶段完成:取消PUBLISH重发任务ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(msgId);if (scheduledFuture != null) {scheduledFuture.cancel(true);log.info("QoS2: PUBREC received, cancelled PUBLISH retransmission for messageId={}", msgId);}// 构建PUBREL消息MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE,  // 注意:PUBREL本身使用QoS 1false, 0);MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(msgId, MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttPubReplyMessageVariableHeader);// 发送PUBREL并启动第二阶段重发机制if (ctx != null) {ctx.writeAndFlush(mqttMessageBack).addListener(future -> {// 第二阶段:开始PUBREL重发机制cachePubrelMsg(msgId, ctx);});} else {context.channel().writeAndFlush(mqttMessageBack).addListener(future -> {cachePubrelMsg(msgId, ctx);});}log.info("QoS2: PUBREL sent, messageId={}, waiting for PUBCOMP", msgId);
}

PUBREL重发机制

private void cachePubrelMsg(int messageId, ChannelHandlerContext context) {// 构建PUBREL重发消息MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageId);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);// 关键差异:PUBREL重发间隔是6秒,比PUBLISH的1秒更长// 因为此时第一阶段已经完成,不那么紧急ScheduledFuture<?> scheduledFuture = TimerData.scheduledThreadPoolExecutor.scheduleAtFixedRate(new MonitorMsgTime(messageId, mqttMessageBack, context, null), 6000, 6000, TimeUnit.MILLISECONDS);TimerData.scheduledFutureMap.put(messageId, scheduledFuture);log.info("QoS2: PUBREL retransmission scheduled, messageId={}, interval=6s", messageId);
}

第四次握手:PUBCOMP (接收端 → 发送端)

作用:确认消息ID已释放,协议完成

这是四次握手的最终确认,标志着整个QoS 2协议的完成。

接收端:收到PUBREL后发送PUBCOMP
private void handlePubrel(ChannelHandlerContext ctx, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();int messageId = messageIdVariableHeader.messageId();log.info("QoS2: PUBREL received for messageId={}, ready to complete protocol", messageId);// 关键操作:从去重表中移除消息IDboolean removed = pendingMessageIds.remove(messageId);if (removed) {log.info("QoS2: MessageId={} removed from deduplication table", messageId);} else {log.warn("QoS2: MessageId={} not found in deduplication table", messageId);}// 构建PUBCOMP最终确认MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE,  // PUBCOMP使用QoS 0,因为PUBREL会重发false, 0x02);MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageId);MqttMessage pubcompMessage = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);// 发送最终确认if (ctx != null) {ctx.writeAndFlush(pubcompMessage);} else {context.channel().writeAndFlush(pubcompMessage);}log.info("QoS2: PUBCOMP sent for messageId={}, protocol completed on receiver side", messageId);
}

PUBCOMP的关键作用:

  1. 资源释放确认:告诉发送端"我已经清理了去重状态"
  2. 协议完成信号:标志着整个QoS 2流程的结束
  3. 允许消息ID重用:双方都可以重新使用该消息ID
发送端:收到PUBCOMP完成协议
private void receivePubcomp(MqttMessage mqttMessage) {MqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader();int mesgId = variableHeader.messageId();log.info("QoS2: PUBCOMP received for messageId={}, protocol completed", mesgId);// 第二阶段完成:取消PUBREL重发任务ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(mesgId);if (scheduledFuture != null) {scheduledFuture.cancel(true);log.info("QoS2: PUBREL retransmission cancelled for messageId={}", mesgId);}// 清理发送端状态(如果有)// publishStates.remove(mesgId);// QoS 2 协议完全结束,消息确保仅一次投递log.info("QoS2: Full protocol completed successfully for messageId={}", mesgId);log.info("QoS2: MessageId={} can now be reused", mesgId);
}

四次握手总结

双重保障机制

阶段一保障(PUBLISH↔PUBREC):

目标:确保消息内容可靠传输
机制:PUBLISH重发 + PUBREC确认
结果:消息内容成功投递,接收端已处理

阶段二保障(PUBREL↔PUBCOMP):

目标:确保状态同步,防止重复
机制:PUBREL重发 + PUBCOMP确认  
结果:双方状态清理,消息ID可重用

重发策略差异

// 阶段一:消息传输阶段 - 重发频率高
// PUBLISH重发:每1秒检查,业务数据传输优先级高
ScheduledFuture<?> publishRetry = scheduledExecutor.scheduleAtFixedRate(publishRetryTask, 1000, 1000, TimeUnit.MILLISECONDS);// 阶段二:状态同步阶段 - 重发频率低  
// PUBREL重发:每6秒检查,状态同步相对不紧急
ScheduledFuture<?> pubrelRetry = scheduledExecutor.scheduleAtFixedRate(pubrelRetryTask, 6000, 6000, TimeUnit.MILLISECONDS);

为什么必须四次握手?

三次握手的问题:

发送端 ──PUBLISH──→ 接收端  (传输消息)
发送端 ←──PUBREC──── 接收端  (确认接收)  
发送端 ──ACK─────→ 接收端  (确认完成)问题:接收端不知道发送端是否收到了PUBREC
结果:接收端不敢释放消息ID,造成资源泄漏

四次握手的解决方案:

发送端 ──PUBLISH──→ 接收端  (传输消息)
发送端 ←──PUBREC──── 接收端  (确认接收)
发送端 ──PUBREL───→ 接收端  (请求释放ID) 
发送端 ←──PUBCOMP─── 接收端  (确认已释放)优势:双向确认,状态同步,资源安全释放

QoS 2的业务价值

适用场景:

  • 🏦 金融支付:转账消息不能丢失也不能重复
  • 📦 库存管理:库存扣减操作必须精确
  • 🔒 权限控制:权限变更指令不能重复执行
  • 📊 计费系统:计费事件必须准确记录

代价分析:

  • ✅ 可靠性:100% 仅一次投递
  • ❌ 性能:4倍网络开销,2倍延迟
  • ❌ 复杂性:状态管理复杂,内存占用高

QoS 2 关键时序图

时间线    发送端                                    接收端
T1       |                                           ||========== PUBLISH(msgId=123) ============>||  (启动PUBLISH重发定时器:1秒间隔)             |-- 处理消息内容
T2       |                                           ||<========= PUBREC(msgId=123) ==============||  (取消PUBLISH重发定时器)                     |
T3       |                                           ||========== PUBREL(msgId=123) ============>||  (启动PUBREL重发定时器:6秒间隔)             |-- 释放消息ID资源
T4       |                                           ||<========= PUBCOMP(msgId=123) =============||  (取消PUBREL重发定时器)                     |
T5       |                                           ||  QoS 2 协议完成                           |  QoS 2 协议完成

总结

QoS 2 的复杂性主要体现在:

  1. 两阶段协议:PUBLISH→PUBREC→PUBREL→PUBCOMP
  2. 双重状态管理:需要维护两套不同的重发机制
  3. 严格的消息去重:接收端必须记录已处理的消息ID
  4. 资源管理复杂:需要在正确的时机清理状态和定时器

但正是这种复杂性保证了消息的绝对可靠传输,在金融、支付、库存管理等对数据一致性要求极高的场景中,QoS 2 是必不可少的选择。

http://www.dtcms.com/a/270349.html

相关文章:

  • 爬虫-request处理POST
  • pytorch深度学习-ResNet残差网络-CIFAR-10
  • 利用AI技术快速提升图片编辑效率的方法
  • Mapper接口是什么
  • HarmonyOS从入门到精通:自定义组件开发指南(四):组件状态管理之父子组件通信
  • 跨越十年的C++演进:C++23新特性全解析
  • VR法庭相比传统法庭有哪些优势​
  • WebClient与HTTPInterface远程调用对比
  • 第8章:应用层协议HTTP、SDN软件定义网络、组播技术、QoS
  • SPI / I2C / UART 哪个更适合初学者?
  • 通过“逆向侦测”驾驭涌现复杂性的认知架构与技术实现
  • 短视频矩阵管理平台的崛起:源头厂商的深度解析
  • C# Type.GetProperties() 获取不到值的笔记
  • SQL注入与防御-第六章-2:利用操作系统--执行操作系统命令
  • 图像梯度处理与边缘检测:OpenCV 实战指南
  • 【牛客刷题】小红的v三元组
  • FastAPI Docker环境管理脚本使用指南
  • 虚拟机忘记密码怎么办
  • nmon使用方法
  • 征程 6|工具链量化简介与代码实操
  • 云原生安全观察:零信任架构与动态防御的下一代免疫体系
  • 人物设定一秒入魂!RAIDEN-R1提出可验证奖励新范式,让CoT推理更“人格一致”
  • SpringAI学习笔记-MCP客户端简单示例
  • python采集商品详情数据接口json数据返回参考
  • 前端面试常考题目详解​
  • 解决阿里云ubuntu内存溢出导致vps死机无法访问 - 永久性增加ubuntu的swap空间 - 阿里云Linux实例内存溢出(OOM)问题修复方案
  • (四)机器学习小白入门YOLOv :图片标注实操手册
  • 深度学习环境配置:PyTorch、CUDA和Python版本选择
  • 工作中的思考
  • 推荐系统中的相似度