当前位置: 首页 > news >正文

MQTT通信实现方案(Spring Boot 3 集成MQTT)

最近项目需求使用mqtt进行数据通信连接,有很多成熟的案例,我这里主要是记录一下Springboot3集成的mqtt。
一般常用的 Eclipse Paho的类库,又分为3版本和5版本。org.eclipse.paho.mqttv5.client 和 org.eclipse.paho.mqttv3.client 都是 Eclipse Paho 项目提供的 MQTT 客户端库,但前者实现了更新的 MQTT 5.0 协议,而后者实现了 MQTT 3.1.1 协议。

它们之间的主要区别,分为几个方面进行详细说明。

核心总结

  • MQTTv3 客户端:实现了 MQTT 3.1.1 协议,成熟、稳定、广泛使用,是当前大多数物联网应用的基础。

  • MQTTv5 客户端:实现了 MQTT 5.0 协议,在 v3.1.1 的基础上增加了大量新特性和改进,旨在解决大规模、复杂场景下的通信问题,是未来的发展方向。

作者这里使用的是Spring集成的MQTT。maven依赖为:

依赖配置 (pom.xml)

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.20</version>
</dependency>

进入到依赖可以看到Spring对3版本和5版本都进行了引入

<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId><version>5.5.20</version><scope>compile</scope></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version><scope>compile</scope></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version><scope>compile</scope><optional>true</optional></dependency></dependencies>

完整代码实现

1. MQTT配置属性类

@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {/*** MQTT代理服务器地址*/private String brokerUrl;/*** 客户端ID*/private String clientId;/*** 网关设备 ID(物联网接入服务 ID)*/private String interfaceDeviceId;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接超时时间(秒)*/private Integer connectionTimeout;/*** 保持活动间隔(秒)*/private Integer keepAliveInterval;/*** 重连延迟时间(毫秒)*/private Integer reconnectDelay;/*** 是否清除会话*/private Boolean cleanSession;/*** 最大飞行消息数*/private Integer maxInflight;/*** 默认主题*/private String defaultTopic;/*** 默认服务质量等级*/private Integer defaultQos;/*** 是否启用自定义重连机制*/private boolean enableCustomReconnect;/*** 最大重试次数(0表示无限重试)*/private int maxReconnectAttempts;/*** 初始重连延迟(毫秒)*/private int initialReconnectDelay;/*** 最大重连延迟(毫秒)*/private int maxReconnectDelay;/*** 重连延迟倍增因子*/private double reconnectBackoffMultiplier;/*** mqtt上报信息物模型标识前缀 测试环境:trailer_watch_model 正式环境:'0202001'*/private String physicalModel;/*** 主题配置*/private Topic topic = new Topic();/*** 主题配置类*/@Datapublic static class Topic {/*** 发送接收固定topic前缀*/private String prefix;/*** 发送者主题配置*/private Sender sender = new Sender();/*** 接收者主题配置*/private Receiver receiver = new Receiver();/*** 发送者主题配置类*/@Datapublic static class Sender {/*** 位置上报*/private String tpos;/*** 巡检上报*/private String tstatus;/*** 指令响应上报(首次响应)*/private String instructResp;/*** 指令执行结果事件*/private String instructResult;/*** 连接情况上报 connection_agent.online 为上线;connection_agent.offline 为下线*/private String connectionAgent;}/*** 接收者主题配置类*/@Datapublic static class Receiver {/*** 是否开启集群部署:开启后拼接前缀【只在订阅前添加前缀】*/private Boolean doShare;/*** 做集群部署时,按照规则增加前缀:$share/{deviceId}  deviceId为网关设备 ID(物联网接入服务 ID)* 例如:$share/25352d632800/$iot/v1/device/25352d632800/connection_agent/functions/call*/private String share;/*** 原先单体部署时使用前缀*/private String prefix;/*** 指令接收主题*/private String instructrceive;}}
}     

2. MQTT配置类

/*** @author wp* @Description: MQTT配置类,负责创建和配置MQTT客户端* @date 2025-09-05 9:35*/
@Slf4j
@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate MqttReconnectManager reconnectManager;@Autowiredprivate MqttGetFIle mqttGetFIle;/*** 创建MQTT连接选项*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});// 设置超时时间 单位为秒options.setConnectionTimeout(mqttProperties.getConnectionTimeout());// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());// 禁用Paho内置重连,使用自定义重连options.setAutomaticReconnect(false);// // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.setCleanSession(mqttProperties.getCleanSession());options.setMaxInflight(mqttProperties.getMaxInflight());if (mqttProperties.getBrokerUrl().startsWith("ssl")) {// 加载证书及私钥this.configureTls(options);} else if (mqttProperties.getBrokerUrl().startsWith("tcp")) {// tcp 开头连接为本地开发测试options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());}return options;}/*** 创建MQTT异步客户端*/@Beanpublic IMqttAsyncClient mqttAsyncClient(MqttConnectOptions options, MqttCallbackHandler callbackHandler) {try {// 使用内存持久化MemoryPersistence persistence = new MemoryPersistence();// 创建客户端IMqttAsyncClient client = new MqttAsyncClient(mqttProperties.getBrokerUrl(), this.dealClientId(), persistence);// 设置回调处理器client.setCallback(callbackHandler);reconnectManager.initialize(client, options);// 初始连接尝试initialConnectWithRetry(client, options);return client;} catch (MqttException e) {log.error("创建MQTT客户端失败", e);throw new RuntimeException("无法创建MQTT客户端", e);}}/*** 初始连接带重试机制*/private void initialConnectWithRetry(IMqttAsyncClient client, MqttConnectOptions options) {new Thread(() -> {int attempt = 0;int delay = mqttProperties.getInitialReconnectDelay();while (!client.isConnected() && !Thread.currentThread().isInterrupted()) {try {attempt++;log.info("尝试第{}次连接MQTT代理: {}", attempt, mqttProperties.getBrokerUrl());client.connect(options).waitForCompletion();log.info("成功连接到MQTT代理");// 连接成功后订阅指定主题this.subscribe(client);break;} catch (Exception e) {log.warn("连接MQTT代理失败,{}秒后重试...", delay / 1000);if (mqttProperties.getMaxReconnectAttempts() > 0 && attempt >= mqttProperties.getMaxReconnectAttempts()) {log.error("已达到最大连接尝试次数({}),停止尝试", mqttProperties.getMaxReconnectAttempts());break;}try {Thread.sleep(delay);} catch (InterruptedException ie) {Thread.currentThread().interrupt();log.error("连接线程被中断", ie);break;}// 计算下一次延迟delay = (int) Math.min(delay * mqttProperties.getReconnectBackoffMultiplier(), mqttProperties.getMaxReconnectDelay());}}}, "MQTT-Initial-Connect").start();}// {deviceId}-channel-{channelId}// deviceId为设备在天枢云上注册的设备ID// channelId表示该设备的通道标识, 为整型数字, 从数字0开始, 依次递增(这里采用16为随机数字)public String dealClientId() {String channelId = RandomUtil.randomString("0123456789", 16);return mqttProperties.getInterfaceDeviceId() + "-channel-" + channelId;}public void subscribe(IMqttAsyncClient client) {try {List<String> topics = new ArrayList<>();if (mqttProperties.getTopic().getReceiver().getDoShare()) {// $share/25352d632800/$iot/v1/device/25352d632800/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getReceiver().getShare() + mqttProperties.getInterfaceDeviceId() + "/" + mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());} else {// $iot/v1/device/{网关设备 id}/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());}// 对mqtt下行主题进行订阅for (String topic : topics) {client.subscribe(topic, mqttProperties.getDefaultQos());log.info("MQTT主题订阅成功:topic --> {}", topic);}} catch (MqttException e) {log.error("MQTT主题订阅失败:{}", e.getMessage(), e);}}private void configureTls(MqttConnectOptions connOpts) {try {KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());caKs.load(null);// 加载CA证书X509Certificate ca = getCertificate(mqttGetFIle.getSslCaFile());caKs.setCertificateEntry("ca", ca);KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());ks.load(null);// 加载客户端证书X509Certificate cert = getCertificate(mqttGetFIle.getSslCertFile());// 加载私钥PrivateKey privateKey = getPrivateKey(mqttGetFIle.getSslKeyFile());ks.setCertificateEntry("certificate", cert);ks.setKeyEntry("private-key", privateKey, new char[]{}, new Certificate[]{cert});TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());tmf.init(caKs);KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());kmf.init(ks, "".toCharArray());SSLContext sslContext = SSLContext.getInstance("TLSv1.2");sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);connOpts.setSocketFactory(sslContext.getSocketFactory());} catch (Exception ex) {log.error("证书加载异常:{}", ex.getMessage(), ex);}}private X509Certificate getCertificate(InputStream in) {try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {// 使用Bouncy Castle的PEMParser读取PEM文件PEMParser pemParser = new PEMParser(reader);Object object = pemParser.readObject();if (object instanceof X509CertificateHolder) {X509CertificateHolder certificateHolder = (X509CertificateHolder) object;Security.addProvider(new BouncyCastleProvider());JcaX509CertificateConverter converter = new JcaX509CertificateConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);return converter.getCertificate(certificateHolder);} else {throw new IllegalArgumentException("不支持的证书格式");}} catch (Exception ex) {log.error("读取证书时出现错误:{}", ex.getMessage(), ex);}return null;}private PrivateKey getPrivateKey(InputStream in) {try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {// 使用 Bouncy Castle 的 PEMParser 读取 PEM 文件PEMParser pemParser = new PEMParser(reader);Object object = pemParser.readObject();// 确认读取的是 PEMKeyPair 对象if (object instanceof PEMKeyPair) {PEMKeyPair pemKeyPair = (PEMKeyPair) object;PrivateKeyInfo privateKeyInfo =pemKeyPair.getPrivateKeyInfo();// 将 PrivateKeyInfo 转换为 PrivateKeySecurity.addProvider(new BouncyCastleProvider());JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);return converter.getPrivateKey(privateKeyInfo);} else {throw new IllegalArgumentException("不支持的私钥格式");}} catch (Exception ex) {log.error("读取私钥时出现错误:{}", ex.getMessage(), ex);}return null;}
}

3. MQTT回调处理器

/*** @author wp* @Description: MQTT回调处理器 处理连接丢失、消息到达和交付完成事件* @date 2025-09-05 9:51*/
@Slf4j
@Component
public class MqttCallbackHandler implements MqttCallback {@Autowiredprivate MqttReconnectManager reconnectManager;@Autowiredprivate MqttReceiveMsgHandler receiveMsgHandler;/*** 当连接丢失时调用*/@Overridepublic void connectionLost(Throwable cause) {log.warn("MQTT连接丢失", cause);// 触发自定义重连机制reconnectManager.triggerReconnect();}/*** 当消息到达时调用*/@Overridepublic void messageArrived(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}", topic, message.getQos(), payload);if (ObjectUtil.isEmpty(payload)) {return;}receiveMsgHandler.handler(payload);}/*** 当消息交付完成时调用*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {try {log.debug("消息交付完成 - 主题: {}", token.getTopics() != null ? token.getTopics()[0] : "未知");} catch (Exception e) {log.error("获取交付完成消息主题失败", e);}}
}

4.MQTT重连管理器

/*** @author wp* @Description: MQTT重连管理器  负责处理连接丢失后的自动重连* @date 2025-09-05 13:20*/
@Slf4j
@Component
public class MqttReconnectManager {@Autowiredprivate MqttProperties mqttProperties;private final ScheduledExecutorService scheduler;private final AtomicInteger reconnectAttempts = new AtomicInteger(0);private final AtomicInteger currentDelay = new AtomicInteger(0);private final AtomicReference<IMqttAsyncClient> clientRef = new AtomicReference<>();private final AtomicReference<MqttConnectOptions> optionsRef = new AtomicReference<>();private volatile boolean isReconnecting = false;private volatile boolean isShutdown = false;@Autowiredpublic MqttReconnectManager() {this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {Thread thread = new Thread(r, "MQTT-Reconnect-Thread");thread.setDaemon(true);return thread;});}/*** 初始化重连管理器*/public void initialize(IMqttAsyncClient client, MqttConnectOptions options) {clientRef.set(client);optionsRef.set(options);currentDelay.set(mqttProperties.getInitialReconnectDelay());}/*** 触发重连*/public void triggerReconnect() {if (isShutdown || isReconnecting || !mqttProperties.isEnableCustomReconnect()) {return;}synchronized (this) {if (isReconnecting) {return;}isReconnecting = true;}scheduler.execute(this::doReconnect);}/*** 执行重连操作*/private void doReconnect() {IMqttAsyncClient client = clientRef.get();MqttConnectOptions options = optionsRef.get();if (client == null || options == null || isShutdown) {isReconnecting = false;return;}try {// 检查最大重试次数if (mqttProperties.getMaxReconnectAttempts() > 0 &&reconnectAttempts.get() >= mqttProperties.getMaxReconnectAttempts()) {log.warn("已达到最大重连次数({}),停止重连", mqttProperties.getMaxReconnectAttempts());isReconnecting = false;return;}log.info("尝试第{}次重连...", reconnectAttempts.incrementAndGet() + 1);if (client.isConnected()) {log.info("客户端已连接,无需重连");resetReconnectState();return;}// 尝试连接client.connect(options).waitForCompletion();log.info("重连成功!");// 重连成功后重新订阅主题subscribe(client);resetReconnectState();} catch (Exception e) {log.warn("重连失败,{}秒后再次尝试", currentDelay.get() / 1000);// 计算下一次重连延迟(使用退避算法)int nextDelay = (int) (currentDelay.get() * mqttProperties.getReconnectBackoffMultiplier());nextDelay = Math.min(nextDelay, mqttProperties.getMaxReconnectDelay());currentDelay.set(nextDelay);// 调度下一次重连if (!isShutdown) {scheduler.schedule(this::doReconnect, currentDelay.get(), TimeUnit.MILLISECONDS);}} finally {isReconnecting = false;}}public void subscribe(IMqttAsyncClient client) {try {List<String> topics = new ArrayList<>();if (mqttProperties.getTopic().getReceiver().getDoShare()) {// $share/25352d632800/$iot/v1/device/25352d632800/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getReceiver().getShare() + mqttProperties.getInterfaceDeviceId() + "/" + mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());} else {// $iot/v1/device/{网关设备 id}/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());}// 对mqtt下行主题进行订阅for (String topic : topics) {client.subscribe(topic, 1);log.info("MQTT主题订阅成功:topic --> {}", topic);}} catch (MqttException e) {log.error("MQTT主题订阅失败:{}", e.getMessage(), e);}}/*** 重置重连状态*/private void resetReconnectState() {reconnectAttempts.set(0);currentDelay.set(mqttProperties.getInitialReconnectDelay());isReconnecting = false;}/*** 停止重连管理器*/@PreDestroypublic void shutdown() {isShutdown = true;scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}log.info("MQTT重连管理器已关闭");}/*** 获取当前重连状态*/public boolean isReconnecting() {return isReconnecting;}/*** 获取重连尝试次数*/public int getReconnectAttempts() {return reconnectAttempts.get();}
}

5.SSL连接证书配置类

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;/*** @author wp* @Description: 获取ca认证文件* @date 2025-09-04 16:07*/
@Slf4j
@Component
public class MqttGetFIle {@Autowiredprivate MqttProperties mqttProperties;public InputStream getSslCaFile() {try {String path = getFilePath("ca.crt");return getInputStream(path);} catch (Exception e) {log.error("读取ca证书出现异常:{}", e.getMessage(), e);}return null;}public InputStream getSslCertFile() {try {String path = getFilePath(mqttProperties.getClientId() + ".crt");return getInputStream(path);} catch (Exception e) {log.error("读取客户端证书出现异常:{}", e.getMessage(), e);}return null;}public InputStream getSslKeyFile() {try {String path = getFilePath(mqttProperties.getClientId() + ".key");return getInputStream(path);} catch (Exception e) {log.error("读取私钥出现异常:{}", e.getMessage(), e);}return null;}private String getFilePath(String fileName) {return String.format("ssl/%s/" + fileName, mqttProperties.getClientId());}private InputStream getInputStream(String path) throws IOException {ClassPathResource classPathResource = new ClassPathResource(path);if (classPathResource.exists()) {return classPathResource.getInputStream();}return new FileInputStream(new File(applicationHome.getDir(), path));}private ApplicationHome applicationHome = new ApplicationHome(getClass());
}  

6.MQTT服务接口

/*** @author wp* @Description:  MQTT服务接口 提供MQTT消息发布和订阅功能* @date 2025-09-05 9:48*/
public interface MqttService {/*** 发布消息到指定主题* @param topic 主题* @param payload 消息内容* @param qos 服务质量等级 (0,1,2)* @param retained 是否保留消息*/void publish(String topic, String payload, int qos, boolean retained);/*** 发布消息到指定主题(默认配置QoS,不保留)* @param topic 主题* @param payload 消息内容*/void publish(String topic, String payload);/*** 订阅指定主题* @param topic 主题,支持通配符* @param qos 服务质量等级*/void subscribe(String topic, int qos);/*** 取消订阅指定主题* @param topic 主题*/void unsubscribe(String topic);/*** 检查MQTT客户端是否已连接* @return 连接状态*/boolean isConnected();/*** 断开MQTT连接*/void disconnect();/*** 重新连接MQTT代理*/void reconnect();
}

7.MQTT服务实现类

/*** @author wp* @Description: MQTT服务实现类* @date 2025-09-05 9:48*/
@Slf4j
@Service
public class MqttServiceImpl implements MqttService {@Autowiredprivate MqttProperties mqttProperties;private final IMqttAsyncClient mqttClient;@Autowired@Lazypublic MqttServiceImpl(IMqttAsyncClient mqttClient) {this.mqttClient = mqttClient;}@Overridepublic void publish(String topic, String payload, int qos, boolean retained) {try {if (isConnected()) {mqttClient.publish(topic, payload.getBytes(), qos, retained);log.debug("已发布消息到主题 {}: {}", topic, payload);} else {log.warn("MQTT客户端未连接,无法发布消息");}} catch (MqttException e) {log.error("发布消息到主题 {} 失败", topic, e);}}@Overridepublic void publish(String topic, String payload) {publish(topic, payload, mqttProperties.getDefaultQos(), false);}@Overridepublic void subscribe(String topic, int qos) {try {if (isConnected()) {mqttClient.subscribe(topic, qos);log.info("已订阅主题: {}", topic);} else {log.warn("MQTT客户端未连接,无法订阅主题");}} catch (MqttException e) {log.error("订阅主题 {} 失败", topic, e);}}@Overridepublic void unsubscribe(String topic) {try {if (isConnected()) {mqttClient.unsubscribe(topic);log.info("已取消订阅主题: {}", topic);} else {log.warn("MQTT客户端未连接,无法取消订阅主题");}} catch (MqttException e) {log.error("取消订阅主题 {} 失败", topic, e);}}@Overridepublic boolean isConnected() {return mqttClient != null && mqttClient.isConnected();}@Overridepublic void disconnect() {try {if (isConnected()) {mqttClient.disconnect();log.info("已断开MQTT连接");}} catch (MqttException e) {log.error("断开MQTT连接失败", e);}}@Overridepublic void reconnect() {try {if (!isConnected()) {mqttClient.reconnect();log.info("正在重新连接MQTT代理...");}} catch (MqttException e) {log.error("重新连接MQTT代理失败", e);}}
}

8.配置文件

#mqtt配置信息
mqtt:# MQTT代理服务器地址broker-url: tcp://localhost:1883# 客户端IDclient-id: 2405b7249600# 网关设备 ID(物联网接入服务 ID)interface-device-id: 2405b7249600#  用户名username: admin# 密码password: public# 连接超时时间(秒)connection-timeout: 30# 保持活动间隔(秒)keep-alive-interval: 60# 重连延迟时间(毫秒)reconnect-delay: 5000# 是否清除会话clean-session: true# 最大飞行消息数max-inflight: 10# 重连相关配置enable-custom-reconnect: truemax-reconnect-attempts: 0  # 0表示无限重试initial-reconnect-delay: 1000  # 初始重连延迟1秒max-reconnect-delay: 60000     # 最大重连延迟60秒reconnect-backoff-multiplier: 1.5  # 重连延迟倍增因子# 默认主题default-topic: test/topic# 默认服务质量等级default-qos: 1

9.使用示例

@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@PostMapping("/publish")public String publishMessage(@RequestParam String topic,@RequestParam String message) {boolean result = mqttService.publish(topic, message);return result ? "消息发布成功" : "消息发布失败";}
}



功能特点

  1. 自动重连机制:连接断开后会自动尝试重连,采用指数退避策略;

  2. 启动时自动连接:服务启动后会自动尝试连接MQTT服务器;

  3. 完整的配置管理:通过配置文件灵活配置MQTT参数;

  4. 异步处理:重连和连接操作使用异步处理,避免阻塞主线程;

  5. 异常处理:完善的异常处理和日志记录;

  6. 灵活的订阅机制:支持配置多个订阅主题和对应的QoS等级。

补充:MQTTv5与MQTTv3客户端区别

主要区别详述

1. 协议版本与兼容性

特性MQTTv3 客户端 (3.1.1)MQTTv5 客户端 (5.0)
协议版本MQTT 3.1.1MQTT 5.0
向后兼容是(与 3.1.1 broker 兼容)是。v5 客户端可以连接到 v3.1.1 的 Broker,但会自动降级功能,无法使用 v5 的新特性。
向前兼容否。v3 客户端无法连接到仅支持 v5 的 Broker。是(设计上考虑了未来的扩展)

关键点:v5 协议设计时就考虑了向后兼容。一个 v5 客户端可以安全地连接到一个 v3.1.1 的 Broker,只是会话和消息都将以 v3.1.1 的形式进行。反之则不行。

2. 新特性和增强功能 (MQTTv5 独有)
这是最核心的区别。MQTTv5 引入了大量新特性,这些在 v3 客户端中是完全不支持的。

特性类别MQTTv5 新特性说明MQTTv3 支持情况
原因码Reason Code在确认包(如 CONNACK, PUBACK, DISCONNECT)中返回操作成功或失败的具体原因(如“未授权”、“主题名无效”、“超出配额”等),极大方便了调试和故障排除。只有简单的返回码(如 0=成功,4=连接拒绝-用户名密码错误),信息量很少。
属性Properties几乎所有报文都可以附加一组自定义属性(键值对),极大地扩展了协议的灵活性和可扩展性。例如,可以设置消息过期时间、标识内容类型(如 JSON/XML)等。不支持。报文格式固定,扩展性差。
共享订阅Shared Subscriptions允许将消息负载均衡到一个订阅组中的多个客户端。语法:$share/{ShareName}/{TopicFilter}。这是实现消费者组、避免单点故障和负载均衡的关键特性。不支持。要实现类似功能,需要在应用层自己实现复杂的逻辑,或者依赖 Broker 的非标扩展。
主题别名Topic Alias客户端和 Broker 可以为长主题名分配一个简短的整数别名(如 123),在后续通信中用这个别名代替长主题名,显著减少网络带宽消耗。不支持。每次发布消息都必须携带完整的长主题名。
服务端断开Server DisconnectBroker 可以在 DISCONNECT 报文中携带原因码和属性,告知客户端断开连接的具体原因(如“因管理操作断开”),客户端可以据此采取不同策略(如是否重连)。Broker 断开连接时,客户端只能收到一个简单的 TCP 断开信号,无法知道原因。
流量控制Flow Control通过接收最大值属性,客户端可以控制从 Broker 接收消息的速率,防止被消息淹没。不支持。Broker 会以最快速度向下推消息。
载荷格式和内容类型Payload Format Indicator可以明确指示消息负载是 UTF-8 文本还是二进制数据。不支持。接收方需要自己猜测载荷格式。
请求/响应模式Request/Response通过响应主题和对比数据属性,原生支持了请求/响应模式,无需再约定固定的请求/响应主题,使得 RPC over MQTT 更加标准化。需要手动在消息载荷或主题中约定请求ID和响应主题,是应用层协议。
用户属性User Properties可以在 PUBLISH 等报文中附加自定义的字符串键值对,用于传递元数据(如消息来源、设备ID、地理位置等),类似于 HTTP 的 Header。不支持。只能将元数据放入消息载荷中,破坏了业务数据与元数据的分离。

3. API 差异
由于 v5 引入了新概念,其 Java API 也与 v3 有所不同,两者不兼容。

  • MQTTv3:

    • 连接参数:MqttConnectOptions
    • 消息类:MqttMessage
    • 回调接口:MqttCallback (包含 connectionLost, messageArrived, deliveryComplete)
  • MQTTv5:

    • 连接参数:MqttConnectionOptions (增加了设置属性等功能)
    • 消息类:MqttMessage (但内部增加了设置属性、载荷格式等方法)
    • 回调接口:MqttCallback (方法签名不同,参数中包含了大量 v5 特有的信息,如原因码、属性包等)

示例:发布消息的 API 对比

// MQTTv3 发布消息
MqttMessage messageV3 = new MqttMessage(payload);
messageV3.setQos(1);
client.publish("my/long/topic/name", messageV3);// MQTTv5 发布消息(可以使用属性)
MqttMessage messageV5 = new MqttMessage(payload);
messageV5.setQos(1);
// 设置 v5 属性:消息过期时间 60 秒,内容类型为 JSON
messageV5.setProperties(new MqttProperties());
messageV5.getProperties().setMessageExpiryInterval(60L);
messageV5.getProperties().setContentType("application/json");client.publish("my/long/topic/name", messageV5);

如何选择?

场景推荐选择理由
新项目MQTTv5 客户端面向未来,可以直接利用 v5 的新特性(如共享订阅、原因码)来构建更健壮、更高效的系统。只要 Broker 支持(如 EMQX 5.x, HiveMQ 4.x, Mosquitto 2.0+),就应优先选择 v5。
现有项目维护MQTTv3 客户端如果系统稳定运行,且没有使用 v5 特性的迫切需求,继续使用 v3 可以避免不必要的升级和测试成本。
必须连接旧 BrokerMQTTv3 客户端如果 Broker 是旧版本(如 Mosquitto 1.x, EMQX 4.x),不支持 MQTT 5.0,则必须使用 v3 客户端。
需要共享订阅、流量控制等高级特性MQTTv5 客户端如果你的架构严重依赖这些特性,那么 v5 是唯一的选择。v3 无法原生实现。
资源极度受限的设备MQTTv3 客户端v5 协议包略大一些(因为属性等字段),在极端情况下,v3 可能更节省一点点资源。但对于绝大多数设备,这点差异可忽略不计。

总结表格

特性MQTTv3 ClientMQTTv5 Client
协议版本3.1.15.0
核心特性基础发布/订阅、QoS、保留消息、遗嘱消息包含 v3 所有特性,并新增大量增强功能
扩展性差,固定报文格式极好,支持属性
错误诊断简单,信息量少详细的原因码,易于调试
带宽优化主题别名
负载均衡无原生支持原生共享订阅
流量控制有(接收最大值)
元数据支持需放入载荷用户属性
API 兼容性与 v5 不兼容与 v3 不兼容,但可降级连接

结论:对于新项目,如果基础设施(Broker)支持,应毫不犹豫地选择 MQTTv5 客户端,以充分利用其现代、强大和灵活的特性。

http://www.dtcms.com/a/397965.html

相关文章:

  • 做网站客户需求网站建设与运行的盈利收入
  • Sass:CSS 预处理器
  • CSS元素的总宽度计算规则
  • WPS表格和Excel中快速选择有批注的全部单元格
  • 108. 将有序数组转换为二叉搜索树【 力扣(LeetCode) 】
  • 构建你的 MCP 能力层:.NET 9 + SK 的系统方案
  • 好网站分享建设一个网站的具体流程
  • 缓存优化技术指南:让数据访问快如闪电
  • 算法相关问题记录
  • DV OV EV SSL证书验证级别
  • 中山做网站哪家公司好网页设计模板html图片
  • AI赋能 破局重生 嬗变图强 | 安贝斯受邀参加2025第三届智能物联网与安全科技应用大会暨第七届智能化信息化年度峰会
  • ASP.NET 学习总结
  • 基于ASP.NET+SQL Server简单的 MVC 电商网站
  • 开源生态与技术民主化 - 从LLaMA到DeepSeek的开源革命(LLaMA、DeepSeek-V3、Mistral 7B)
  • 电路方案分析(二十三)Hi-Fi耳机放大器电源参考设计
  • 快速识别可访问端口号:Python 实现端口扫描
  • 【汽车篇】AI深度学习在汽车激光焊接外观检测的应用
  • 广州专业建站旅游景区网站建设规划
  • 【第30话:路径规划】自动驾驶中Hybrid A星(A*)搜索算法的详细推导及代码示例
  • [算法导论] 正则匹配 . *
  • 电子商务网站开发教程网站源码.net
  • (三)React+.Net+Typescript全栈(动态Router/Redux/RTK Query获取后端数据)
  • elasticsearch的使用、api调用、更新、持久化
  • Jenkins(速通版)
  • IDEA新建SpringBoot项目时没有低版本Java选项
  • Jupyter Lab 汉化
  • Amazon Chime SDK 详解:AWS 的实时音视频利器
  • python学智能算法(三十八)|使用Numpy和PyTorch模块绘制正态分布函数图
  • 佛山网站建设no.1开源站群cms