MQTT QoS 2 详细流程解析
QoS 2 概述
QoS 2 是 MQTT 协议中最复杂但也最可靠的消息传输级别,它通过四次握手机制确保消息仅一次投递(Exactly Once Delivery)。这意味着:
- 消息绝对不会丢失
- 消息绝对不会重复
- 但网络开销最大,延迟最高
四次握手协议详解
QoS 2 通过四次握手确保消息的仅一次投递,这是MQTT协议中最复杂但也最可靠的机制。
四次握手的设计原理
为什么需要四次握手?
传统的三次握手(如TCP)只能保证数据传输的可靠性,但无法解决应用层重复处理的问题。QoS 2面临的核心挑战是:
- 网络可能丢包 - 需要重发机制
- 重发可能导致重复 - 需要去重机制
- 去重需要状态同步 - 需要双方确认机制
- 状态同步需要可靠传输 - 需要分阶段确认
四次握手的分阶段设计
阶段一:消息传输确认
发送端 ──PUBLISH──→ 接收端 (传输消息内容)
发送端 ←──PUBREC──── 接收端 (确认收到消息)阶段二:状态同步确认
发送端 ──PUBREL───→ 接收端 (请求释放消息ID)
发送端 ←──PUBCOMP─── 接收端 (确认已释放消息ID)
每次握手的具体作用:
四次握手状态机
发送端状态 接收端状态
INITIAL INITIAL| ||══════ ① PUBLISH ═══════> |↓ ↓
WAIT_PUBREC RECEIVED_PUBLISH| |── 处理消息内容|<═════ ② PUBREC ════════ |── 保存消息ID到去重表↓ ↓
WAIT_PUBCOMP WAIT_PUBREL| ||══════ ③ PUBREL ═══════> || ↓| READY_COMPLETE|<═════ ④ PUBCOMP ═══════ |── 从去重表删除消息ID↓ ↓
COMPLETED COMPLETED
详细流程分析
四次握手详细流程实现
第一次握手:PUBLISH (发送端 → 接收端)
作用:传输消息内容
这是四次握手的核心步骤,承载实际的业务数据。
发送端逻辑
private void publishBytes(String topic, byte[] payload, MqttQoS qos, boolean retain) {// QoS 2 需要分配消息IDint messageIdNum = nextMessageId();// 构建PUBLISH消息MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, // isDup: 初次发送为falseEXACTLY_ONCE, // QoS 2retain, 0);MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, messageIdNum);ByteBuf payloadBuf = Unpooled.wrappedBuffer(payload);MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, payloadBuf);// 关键:缓存PUBLISH消息用于重发cachePublishMsg(EXACTLY_ONCE, payloadBufBak, variableHeader, fixedHeader, null, channel);// 发送消息,进入WAIT_PUBREC状态channel.writeAndFlush(publishMessage);log.info("QoS2: PUBLISH sent, messageId={}, waiting for PUBREC", messageIdNum);
}
PUBLISH消息重发机制
private void cachePublishMsg(MqttQoS qos, ByteBuf byteBuf, MqttPublishVariableHeader variableHeader,MqttFixedHeader mqttFixedHeaderInfo, ChannelHandlerContext context, Channel channels) {// 构建重发消息,关键:设置isDup=true表示这是重复消息MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, // isDup=true,重发时标记为重复qos, false,mqttFixedHeaderInfo.remainingLength());MqttPublishMessage cachePubMessage = new MqttPublishMessage(fixedHeader, variableHeader, byteBuf);// 启动定时重发任务:每1秒检查一次,如果没收到PUBREC就重发PUBLISHScheduledFuture<?> scheduledFuture = TimerData.scheduledThreadPoolExecutor.scheduleAtFixedRate(new MonitorMsgTime(variableHeader.packetId(), cachePubMessage, context, channels), 1000, 1000, TimeUnit.MILLISECONDS);// 保存任务引用,收到PUBREC时会取消这个任务TimerData.scheduledFutureMap.put(variableHeader.packetId(), scheduledFuture);
}
第二次握手:PUBREC (接收端 → 发送端)
作用:确认消息接收,但流程未完成
接收端逻辑
private void handlePublish(MqttPublishMessage msg) {try {// 先处理消息内容(重要:QoS 2在第一步就处理消息)if (messageHandler != null) {messageHandler.accept(msg);}MqttQoS qos = msg.fixedHeader().qosLevel();if (qos == EXACTLY_ONCE) {// 发送PUBREC确认:告诉发送端"我收到了PUBLISH,但还没完全处理完"MqttMessage pubrecMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0),MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()));channel.writeAndFlush(pubrecMessage);log.info("QoS2: PUBREC sent, messageId={}, waiting for PUBREL", msg.variableHeader().packetId());}} catch (Exception e) {log.error("QoS2: Error handling PUBLISH message", e);}
}
关键理解点:
为什么需要PUBREL?
考虑这个场景:
- 接收端在发送PUBREC之前就已经处理了消息内容
- PUBREC只是告诉发送端"我收到了,别再重发PUBLISH了"
- 此时接收端必须保持消息ID在去重表中,防止重复处理
- 协议流程还没有结束,不能释放资源
/ 接收端的去重表管理 Set<Integer> pendingMessageIds = Collections.synchronizedSet(new HashSet<>());private void handlePublish(MqttPublishMessage msg) {int messageId = msg.variableHeader().packetId();// 检查是否重复消息if (!pendingMessageIds.contains(messageId)) {// 首次收到:处理消息 + 加入去重表messageHandler.accept(msg);pendingMessageIds.add(messageId);log.info("QoS2: First PUBLISH received, messageId={}, processed and cached", messageId);} else {// 重复消息:只响应PUBREC,不重复处理log.warn("QoS2: Duplicate PUBLISH received, messageId={}, ignored", messageId);}// 无论是否重复,都要发送PUBRECMqttMessage pubrecMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0),MqttMessageIdVariableHeader.from(messageId));channel.writeAndFlush(pubrecMessage);log.info("QoS2: PUBREC sent, messageId={}, waiting for PUBREL", messageId); }
第三次握手:PUBREL (发送端 → 接收端)
作用:请求释放消息ID,允许接收端清理去重状态
PUBREL的关键作用:
- 状态同步信号:告诉接收端"现在可以安全地释放这个消息ID了"
- 防止永久资源占用:如果没有PUBREL,接收端的去重表会无限增长
- 确保协议完整性:只有收到PUBREL,接收端才能确定发送端已经确认消息传输成功
1. 发送端发送PUBLISH(msgId=123) 2. 接收端处理消息,发送PUBREC(123) 3. 网络问题:PUBREC丢失 4. 发送端重发PUBLISH(123) 5. 接收端收到重复PUBLISH,不重复处理,但再次发送PUBREC(123)
此时如果没有PUBREL机制:
- 接收端永远不知道什么时候可以从去重表中删除msgId=123
- 去重表会无限增长,最终内存溢出
- 消息ID会被永久占用
PUBREL解决方案:
- 只有发送端确认收到PUBREC后,才发送PUBREL
- PUBREL是"释放许可",告诉接收端可以清理状态了
private void receivePubAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {// 注意:在QoS 2中,这个方法处理的是PUBREC消息,不是PUBACKMqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader();int msgId = variableHeader.messageId();// 第一阶段完成:取消PUBLISH重发任务ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(msgId);if (scheduledFuture != null) {scheduledFuture.cancel(true);log.info("QoS2: PUBREC received, cancelled PUBLISH retransmission for messageId={}", msgId);}// 构建PUBREL消息MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, // 注意:PUBREL本身使用QoS 1false, 0);MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(msgId, MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttPubReplyMessageVariableHeader);// 发送PUBREL并启动第二阶段重发机制if (ctx != null) {ctx.writeAndFlush(mqttMessageBack).addListener(future -> {// 第二阶段:开始PUBREL重发机制cachePubrelMsg(msgId, ctx);});} else {context.channel().writeAndFlush(mqttMessageBack).addListener(future -> {cachePubrelMsg(msgId, ctx);});}log.info("QoS2: PUBREL sent, messageId={}, waiting for PUBCOMP", msgId);
}
PUBREL重发机制
private void cachePubrelMsg(int messageId, ChannelHandlerContext context) {// 构建PUBREL重发消息MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageId);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);// 关键差异:PUBREL重发间隔是6秒,比PUBLISH的1秒更长// 因为此时第一阶段已经完成,不那么紧急ScheduledFuture<?> scheduledFuture = TimerData.scheduledThreadPoolExecutor.scheduleAtFixedRate(new MonitorMsgTime(messageId, mqttMessageBack, context, null), 6000, 6000, TimeUnit.MILLISECONDS);TimerData.scheduledFutureMap.put(messageId, scheduledFuture);log.info("QoS2: PUBREL retransmission scheduled, messageId={}, interval=6s", messageId);
}
第四次握手:PUBCOMP (接收端 → 发送端)
作用:确认消息ID已释放,协议完成
这是四次握手的最终确认,标志着整个QoS 2协议的完成。
接收端:收到PUBREL后发送PUBCOMP
private void handlePubrel(ChannelHandlerContext ctx, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();int messageId = messageIdVariableHeader.messageId();log.info("QoS2: PUBREL received for messageId={}, ready to complete protocol", messageId);// 关键操作:从去重表中移除消息IDboolean removed = pendingMessageIds.remove(messageId);if (removed) {log.info("QoS2: MessageId={} removed from deduplication table", messageId);} else {log.warn("QoS2: MessageId={} not found in deduplication table", messageId);}// 构建PUBCOMP最终确认MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, // PUBCOMP使用QoS 0,因为PUBREL会重发false, 0x02);MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageId);MqttMessage pubcompMessage = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);// 发送最终确认if (ctx != null) {ctx.writeAndFlush(pubcompMessage);} else {context.channel().writeAndFlush(pubcompMessage);}log.info("QoS2: PUBCOMP sent for messageId={}, protocol completed on receiver side", messageId);
}
PUBCOMP的关键作用:
- 资源释放确认:告诉发送端"我已经清理了去重状态"
- 协议完成信号:标志着整个QoS 2流程的结束
- 允许消息ID重用:双方都可以重新使用该消息ID
发送端:收到PUBCOMP完成协议
private void receivePubcomp(MqttMessage mqttMessage) {MqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader();int mesgId = variableHeader.messageId();log.info("QoS2: PUBCOMP received for messageId={}, protocol completed", mesgId);// 第二阶段完成:取消PUBREL重发任务ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(mesgId);if (scheduledFuture != null) {scheduledFuture.cancel(true);log.info("QoS2: PUBREL retransmission cancelled for messageId={}", mesgId);}// 清理发送端状态(如果有)// publishStates.remove(mesgId);// QoS 2 协议完全结束,消息确保仅一次投递log.info("QoS2: Full protocol completed successfully for messageId={}", mesgId);log.info("QoS2: MessageId={} can now be reused", mesgId);
}
四次握手总结
双重保障机制
阶段一保障(PUBLISH↔PUBREC):
目标:确保消息内容可靠传输 机制:PUBLISH重发 + PUBREC确认 结果:消息内容成功投递,接收端已处理
阶段二保障(PUBREL↔PUBCOMP):
目标:确保状态同步,防止重复 机制:PUBREL重发 + PUBCOMP确认 结果:双方状态清理,消息ID可重用
重发策略差异
// 阶段一:消息传输阶段 - 重发频率高 // PUBLISH重发:每1秒检查,业务数据传输优先级高 ScheduledFuture<?> publishRetry = scheduledExecutor.scheduleAtFixedRate(publishRetryTask, 1000, 1000, TimeUnit.MILLISECONDS);// 阶段二:状态同步阶段 - 重发频率低 // PUBREL重发:每6秒检查,状态同步相对不紧急 ScheduledFuture<?> pubrelRetry = scheduledExecutor.scheduleAtFixedRate(pubrelRetryTask, 6000, 6000, TimeUnit.MILLISECONDS);
为什么必须四次握手?
三次握手的问题:
发送端 ──PUBLISH──→ 接收端 (传输消息) 发送端 ←──PUBREC──── 接收端 (确认接收) 发送端 ──ACK─────→ 接收端 (确认完成)问题:接收端不知道发送端是否收到了PUBREC 结果:接收端不敢释放消息ID,造成资源泄漏
四次握手的解决方案:
发送端 ──PUBLISH──→ 接收端 (传输消息) 发送端 ←──PUBREC──── 接收端 (确认接收) 发送端 ──PUBREL───→ 接收端 (请求释放ID) 发送端 ←──PUBCOMP─── 接收端 (确认已释放)优势:双向确认,状态同步,资源安全释放
QoS 2的业务价值
适用场景:
- 🏦 金融支付:转账消息不能丢失也不能重复
- 📦 库存管理:库存扣减操作必须精确
- 🔒 权限控制:权限变更指令不能重复执行
- 📊 计费系统:计费事件必须准确记录
代价分析:
- ✅ 可靠性:100% 仅一次投递
- ❌ 性能:4倍网络开销,2倍延迟
- ❌ 复杂性:状态管理复杂,内存占用高
QoS 2 关键时序图
时间线 发送端 接收端 T1 | ||========== PUBLISH(msgId=123) ============>|| (启动PUBLISH重发定时器:1秒间隔) |-- 处理消息内容 T2 | ||<========= PUBREC(msgId=123) ==============|| (取消PUBLISH重发定时器) | T3 | ||========== PUBREL(msgId=123) ============>|| (启动PUBREL重发定时器:6秒间隔) |-- 释放消息ID资源 T4 | ||<========= PUBCOMP(msgId=123) =============|| (取消PUBREL重发定时器) | T5 | || QoS 2 协议完成 | QoS 2 协议完成
总结
QoS 2 的复杂性主要体现在:
- 两阶段协议:PUBLISH→PUBREC→PUBREL→PUBCOMP
- 双重状态管理:需要维护两套不同的重发机制
- 严格的消息去重:接收端必须记录已处理的消息ID
- 资源管理复杂:需要在正确的时机清理状态和定时器
但正是这种复杂性保证了消息的绝对可靠传输,在金融、支付、库存管理等对数据一致性要求极高的场景中,QoS 2 是必不可少的选择。