MQTT通信实现方案(Spring Boot 3 集成MQTT)
最近项目需求使用mqtt进行数据通信连接,有很多成熟的案例,我这里主要是记录一下Springboot3集成的mqtt。
一般常用的 Eclipse Paho的类库,又分为3版本和5版本。org.eclipse.paho.mqttv5.client 和 org.eclipse.paho.mqttv3.client 都是 Eclipse Paho 项目提供的 MQTT 客户端库,但前者实现了更新的 MQTT 5.0 协议,而后者实现了 MQTT 3.1.1 协议。
它们之间的主要区别,分为几个方面进行详细说明。
核心总结
-
MQTTv3 客户端:实现了 MQTT 3.1.1 协议,成熟、稳定、广泛使用,是当前大多数物联网应用的基础。
-
MQTTv5 客户端:实现了 MQTT 5.0 协议,在 v3.1.1 的基础上增加了大量新特性和改进,旨在解决大规模、复杂场景下的通信问题,是未来的发展方向。
作者这里使用的是Spring集成的MQTT。maven依赖为:
依赖配置 (pom.xml)
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.20</version>
</dependency>
进入到依赖可以看到Spring对3版本和5版本都进行了引入
<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId><version>5.5.20</version><scope>compile</scope></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version><scope>compile</scope></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version><scope>compile</scope><optional>true</optional></dependency></dependencies>
完整代码实现
1. MQTT配置属性类
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {/*** MQTT代理服务器地址*/private String brokerUrl;/*** 客户端ID*/private String clientId;/*** 网关设备 ID(物联网接入服务 ID)*/private String interfaceDeviceId;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接超时时间(秒)*/private Integer connectionTimeout;/*** 保持活动间隔(秒)*/private Integer keepAliveInterval;/*** 重连延迟时间(毫秒)*/private Integer reconnectDelay;/*** 是否清除会话*/private Boolean cleanSession;/*** 最大飞行消息数*/private Integer maxInflight;/*** 默认主题*/private String defaultTopic;/*** 默认服务质量等级*/private Integer defaultQos;/*** 是否启用自定义重连机制*/private boolean enableCustomReconnect;/*** 最大重试次数(0表示无限重试)*/private int maxReconnectAttempts;/*** 初始重连延迟(毫秒)*/private int initialReconnectDelay;/*** 最大重连延迟(毫秒)*/private int maxReconnectDelay;/*** 重连延迟倍增因子*/private double reconnectBackoffMultiplier;/*** mqtt上报信息物模型标识前缀 测试环境:trailer_watch_model 正式环境:'0202001'*/private String physicalModel;/*** 主题配置*/private Topic topic = new Topic();/*** 主题配置类*/@Datapublic static class Topic {/*** 发送接收固定topic前缀*/private String prefix;/*** 发送者主题配置*/private Sender sender = new Sender();/*** 接收者主题配置*/private Receiver receiver = new Receiver();/*** 发送者主题配置类*/@Datapublic static class Sender {/*** 位置上报*/private String tpos;/*** 巡检上报*/private String tstatus;/*** 指令响应上报(首次响应)*/private String instructResp;/*** 指令执行结果事件*/private String instructResult;/*** 连接情况上报 connection_agent.online 为上线;connection_agent.offline 为下线*/private String connectionAgent;}/*** 接收者主题配置类*/@Datapublic static class Receiver {/*** 是否开启集群部署:开启后拼接前缀【只在订阅前添加前缀】*/private Boolean doShare;/*** 做集群部署时,按照规则增加前缀:$share/{deviceId} deviceId为网关设备 ID(物联网接入服务 ID)* 例如:$share/25352d632800/$iot/v1/device/25352d632800/connection_agent/functions/call*/private String share;/*** 原先单体部署时使用前缀*/private String prefix;/*** 指令接收主题*/private String instructrceive;}}
}
2. MQTT配置类
/*** @author wp* @Description: MQTT配置类,负责创建和配置MQTT客户端* @date 2025-09-05 9:35*/
@Slf4j
@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate MqttReconnectManager reconnectManager;@Autowiredprivate MqttGetFIle mqttGetFIle;/*** 创建MQTT连接选项*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});// 设置超时时间 单位为秒options.setConnectionTimeout(mqttProperties.getConnectionTimeout());// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());// 禁用Paho内置重连,使用自定义重连options.setAutomaticReconnect(false);// // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.setCleanSession(mqttProperties.getCleanSession());options.setMaxInflight(mqttProperties.getMaxInflight());if (mqttProperties.getBrokerUrl().startsWith("ssl")) {// 加载证书及私钥this.configureTls(options);} else if (mqttProperties.getBrokerUrl().startsWith("tcp")) {// tcp 开头连接为本地开发测试options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());}return options;}/*** 创建MQTT异步客户端*/@Beanpublic IMqttAsyncClient mqttAsyncClient(MqttConnectOptions options, MqttCallbackHandler callbackHandler) {try {// 使用内存持久化MemoryPersistence persistence = new MemoryPersistence();// 创建客户端IMqttAsyncClient client = new MqttAsyncClient(mqttProperties.getBrokerUrl(), this.dealClientId(), persistence);// 设置回调处理器client.setCallback(callbackHandler);reconnectManager.initialize(client, options);// 初始连接尝试initialConnectWithRetry(client, options);return client;} catch (MqttException e) {log.error("创建MQTT客户端失败", e);throw new RuntimeException("无法创建MQTT客户端", e);}}/*** 初始连接带重试机制*/private void initialConnectWithRetry(IMqttAsyncClient client, MqttConnectOptions options) {new Thread(() -> {int attempt = 0;int delay = mqttProperties.getInitialReconnectDelay();while (!client.isConnected() && !Thread.currentThread().isInterrupted()) {try {attempt++;log.info("尝试第{}次连接MQTT代理: {}", attempt, mqttProperties.getBrokerUrl());client.connect(options).waitForCompletion();log.info("成功连接到MQTT代理");// 连接成功后订阅指定主题this.subscribe(client);break;} catch (Exception e) {log.warn("连接MQTT代理失败,{}秒后重试...", delay / 1000);if (mqttProperties.getMaxReconnectAttempts() > 0 && attempt >= mqttProperties.getMaxReconnectAttempts()) {log.error("已达到最大连接尝试次数({}),停止尝试", mqttProperties.getMaxReconnectAttempts());break;}try {Thread.sleep(delay);} catch (InterruptedException ie) {Thread.currentThread().interrupt();log.error("连接线程被中断", ie);break;}// 计算下一次延迟delay = (int) Math.min(delay * mqttProperties.getReconnectBackoffMultiplier(), mqttProperties.getMaxReconnectDelay());}}}, "MQTT-Initial-Connect").start();}// {deviceId}-channel-{channelId}// deviceId为设备在天枢云上注册的设备ID// channelId表示该设备的通道标识, 为整型数字, 从数字0开始, 依次递增(这里采用16为随机数字)public String dealClientId() {String channelId = RandomUtil.randomString("0123456789", 16);return mqttProperties.getInterfaceDeviceId() + "-channel-" + channelId;}public void subscribe(IMqttAsyncClient client) {try {List<String> topics = new ArrayList<>();if (mqttProperties.getTopic().getReceiver().getDoShare()) {// $share/25352d632800/$iot/v1/device/25352d632800/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getReceiver().getShare() + mqttProperties.getInterfaceDeviceId() + "/" + mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());} else {// $iot/v1/device/{网关设备 id}/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());}// 对mqtt下行主题进行订阅for (String topic : topics) {client.subscribe(topic, mqttProperties.getDefaultQos());log.info("MQTT主题订阅成功:topic --> {}", topic);}} catch (MqttException e) {log.error("MQTT主题订阅失败:{}", e.getMessage(), e);}}private void configureTls(MqttConnectOptions connOpts) {try {KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());caKs.load(null);// 加载CA证书X509Certificate ca = getCertificate(mqttGetFIle.getSslCaFile());caKs.setCertificateEntry("ca", ca);KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());ks.load(null);// 加载客户端证书X509Certificate cert = getCertificate(mqttGetFIle.getSslCertFile());// 加载私钥PrivateKey privateKey = getPrivateKey(mqttGetFIle.getSslKeyFile());ks.setCertificateEntry("certificate", cert);ks.setKeyEntry("private-key", privateKey, new char[]{}, new Certificate[]{cert});TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());tmf.init(caKs);KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());kmf.init(ks, "".toCharArray());SSLContext sslContext = SSLContext.getInstance("TLSv1.2");sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);connOpts.setSocketFactory(sslContext.getSocketFactory());} catch (Exception ex) {log.error("证书加载异常:{}", ex.getMessage(), ex);}}private X509Certificate getCertificate(InputStream in) {try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {// 使用Bouncy Castle的PEMParser读取PEM文件PEMParser pemParser = new PEMParser(reader);Object object = pemParser.readObject();if (object instanceof X509CertificateHolder) {X509CertificateHolder certificateHolder = (X509CertificateHolder) object;Security.addProvider(new BouncyCastleProvider());JcaX509CertificateConverter converter = new JcaX509CertificateConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);return converter.getCertificate(certificateHolder);} else {throw new IllegalArgumentException("不支持的证书格式");}} catch (Exception ex) {log.error("读取证书时出现错误:{}", ex.getMessage(), ex);}return null;}private PrivateKey getPrivateKey(InputStream in) {try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {// 使用 Bouncy Castle 的 PEMParser 读取 PEM 文件PEMParser pemParser = new PEMParser(reader);Object object = pemParser.readObject();// 确认读取的是 PEMKeyPair 对象if (object instanceof PEMKeyPair) {PEMKeyPair pemKeyPair = (PEMKeyPair) object;PrivateKeyInfo privateKeyInfo =pemKeyPair.getPrivateKeyInfo();// 将 PrivateKeyInfo 转换为 PrivateKeySecurity.addProvider(new BouncyCastleProvider());JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);return converter.getPrivateKey(privateKeyInfo);} else {throw new IllegalArgumentException("不支持的私钥格式");}} catch (Exception ex) {log.error("读取私钥时出现错误:{}", ex.getMessage(), ex);}return null;}
}
3. MQTT回调处理器
/*** @author wp* @Description: MQTT回调处理器 处理连接丢失、消息到达和交付完成事件* @date 2025-09-05 9:51*/
@Slf4j
@Component
public class MqttCallbackHandler implements MqttCallback {@Autowiredprivate MqttReconnectManager reconnectManager;@Autowiredprivate MqttReceiveMsgHandler receiveMsgHandler;/*** 当连接丢失时调用*/@Overridepublic void connectionLost(Throwable cause) {log.warn("MQTT连接丢失", cause);// 触发自定义重连机制reconnectManager.triggerReconnect();}/*** 当消息到达时调用*/@Overridepublic void messageArrived(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}", topic, message.getQos(), payload);if (ObjectUtil.isEmpty(payload)) {return;}receiveMsgHandler.handler(payload);}/*** 当消息交付完成时调用*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {try {log.debug("消息交付完成 - 主题: {}", token.getTopics() != null ? token.getTopics()[0] : "未知");} catch (Exception e) {log.error("获取交付完成消息主题失败", e);}}
}
4.MQTT重连管理器
/*** @author wp* @Description: MQTT重连管理器 负责处理连接丢失后的自动重连* @date 2025-09-05 13:20*/
@Slf4j
@Component
public class MqttReconnectManager {@Autowiredprivate MqttProperties mqttProperties;private final ScheduledExecutorService scheduler;private final AtomicInteger reconnectAttempts = new AtomicInteger(0);private final AtomicInteger currentDelay = new AtomicInteger(0);private final AtomicReference<IMqttAsyncClient> clientRef = new AtomicReference<>();private final AtomicReference<MqttConnectOptions> optionsRef = new AtomicReference<>();private volatile boolean isReconnecting = false;private volatile boolean isShutdown = false;@Autowiredpublic MqttReconnectManager() {this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {Thread thread = new Thread(r, "MQTT-Reconnect-Thread");thread.setDaemon(true);return thread;});}/*** 初始化重连管理器*/public void initialize(IMqttAsyncClient client, MqttConnectOptions options) {clientRef.set(client);optionsRef.set(options);currentDelay.set(mqttProperties.getInitialReconnectDelay());}/*** 触发重连*/public void triggerReconnect() {if (isShutdown || isReconnecting || !mqttProperties.isEnableCustomReconnect()) {return;}synchronized (this) {if (isReconnecting) {return;}isReconnecting = true;}scheduler.execute(this::doReconnect);}/*** 执行重连操作*/private void doReconnect() {IMqttAsyncClient client = clientRef.get();MqttConnectOptions options = optionsRef.get();if (client == null || options == null || isShutdown) {isReconnecting = false;return;}try {// 检查最大重试次数if (mqttProperties.getMaxReconnectAttempts() > 0 &&reconnectAttempts.get() >= mqttProperties.getMaxReconnectAttempts()) {log.warn("已达到最大重连次数({}),停止重连", mqttProperties.getMaxReconnectAttempts());isReconnecting = false;return;}log.info("尝试第{}次重连...", reconnectAttempts.incrementAndGet() + 1);if (client.isConnected()) {log.info("客户端已连接,无需重连");resetReconnectState();return;}// 尝试连接client.connect(options).waitForCompletion();log.info("重连成功!");// 重连成功后重新订阅主题subscribe(client);resetReconnectState();} catch (Exception e) {log.warn("重连失败,{}秒后再次尝试", currentDelay.get() / 1000);// 计算下一次重连延迟(使用退避算法)int nextDelay = (int) (currentDelay.get() * mqttProperties.getReconnectBackoffMultiplier());nextDelay = Math.min(nextDelay, mqttProperties.getMaxReconnectDelay());currentDelay.set(nextDelay);// 调度下一次重连if (!isShutdown) {scheduler.schedule(this::doReconnect, currentDelay.get(), TimeUnit.MILLISECONDS);}} finally {isReconnecting = false;}}public void subscribe(IMqttAsyncClient client) {try {List<String> topics = new ArrayList<>();if (mqttProperties.getTopic().getReceiver().getDoShare()) {// $share/25352d632800/$iot/v1/device/25352d632800/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getReceiver().getShare() + mqttProperties.getInterfaceDeviceId() + "/" + mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());} else {// $iot/v1/device/{网关设备 id}/connection_agent/functions/calltopics.add(mqttProperties.getTopic().getPrefix() + mqttProperties.getInterfaceDeviceId() + mqttProperties.getTopic().getReceiver().getInstructrceive());}// 对mqtt下行主题进行订阅for (String topic : topics) {client.subscribe(topic, 1);log.info("MQTT主题订阅成功:topic --> {}", topic);}} catch (MqttException e) {log.error("MQTT主题订阅失败:{}", e.getMessage(), e);}}/*** 重置重连状态*/private void resetReconnectState() {reconnectAttempts.set(0);currentDelay.set(mqttProperties.getInitialReconnectDelay());isReconnecting = false;}/*** 停止重连管理器*/@PreDestroypublic void shutdown() {isShutdown = true;scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}log.info("MQTT重连管理器已关闭");}/*** 获取当前重连状态*/public boolean isReconnecting() {return isReconnecting;}/*** 获取重连尝试次数*/public int getReconnectAttempts() {return reconnectAttempts.get();}
}
5.SSL连接证书配置类
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;/*** @author wp* @Description: 获取ca认证文件* @date 2025-09-04 16:07*/
@Slf4j
@Component
public class MqttGetFIle {@Autowiredprivate MqttProperties mqttProperties;public InputStream getSslCaFile() {try {String path = getFilePath("ca.crt");return getInputStream(path);} catch (Exception e) {log.error("读取ca证书出现异常:{}", e.getMessage(), e);}return null;}public InputStream getSslCertFile() {try {String path = getFilePath(mqttProperties.getClientId() + ".crt");return getInputStream(path);} catch (Exception e) {log.error("读取客户端证书出现异常:{}", e.getMessage(), e);}return null;}public InputStream getSslKeyFile() {try {String path = getFilePath(mqttProperties.getClientId() + ".key");return getInputStream(path);} catch (Exception e) {log.error("读取私钥出现异常:{}", e.getMessage(), e);}return null;}private String getFilePath(String fileName) {return String.format("ssl/%s/" + fileName, mqttProperties.getClientId());}private InputStream getInputStream(String path) throws IOException {ClassPathResource classPathResource = new ClassPathResource(path);if (classPathResource.exists()) {return classPathResource.getInputStream();}return new FileInputStream(new File(applicationHome.getDir(), path));}private ApplicationHome applicationHome = new ApplicationHome(getClass());
}
6.MQTT服务接口
/*** @author wp* @Description: MQTT服务接口 提供MQTT消息发布和订阅功能* @date 2025-09-05 9:48*/
public interface MqttService {/*** 发布消息到指定主题* @param topic 主题* @param payload 消息内容* @param qos 服务质量等级 (0,1,2)* @param retained 是否保留消息*/void publish(String topic, String payload, int qos, boolean retained);/*** 发布消息到指定主题(默认配置QoS,不保留)* @param topic 主题* @param payload 消息内容*/void publish(String topic, String payload);/*** 订阅指定主题* @param topic 主题,支持通配符* @param qos 服务质量等级*/void subscribe(String topic, int qos);/*** 取消订阅指定主题* @param topic 主题*/void unsubscribe(String topic);/*** 检查MQTT客户端是否已连接* @return 连接状态*/boolean isConnected();/*** 断开MQTT连接*/void disconnect();/*** 重新连接MQTT代理*/void reconnect();
}
7.MQTT服务实现类
/*** @author wp* @Description: MQTT服务实现类* @date 2025-09-05 9:48*/
@Slf4j
@Service
public class MqttServiceImpl implements MqttService {@Autowiredprivate MqttProperties mqttProperties;private final IMqttAsyncClient mqttClient;@Autowired@Lazypublic MqttServiceImpl(IMqttAsyncClient mqttClient) {this.mqttClient = mqttClient;}@Overridepublic void publish(String topic, String payload, int qos, boolean retained) {try {if (isConnected()) {mqttClient.publish(topic, payload.getBytes(), qos, retained);log.debug("已发布消息到主题 {}: {}", topic, payload);} else {log.warn("MQTT客户端未连接,无法发布消息");}} catch (MqttException e) {log.error("发布消息到主题 {} 失败", topic, e);}}@Overridepublic void publish(String topic, String payload) {publish(topic, payload, mqttProperties.getDefaultQos(), false);}@Overridepublic void subscribe(String topic, int qos) {try {if (isConnected()) {mqttClient.subscribe(topic, qos);log.info("已订阅主题: {}", topic);} else {log.warn("MQTT客户端未连接,无法订阅主题");}} catch (MqttException e) {log.error("订阅主题 {} 失败", topic, e);}}@Overridepublic void unsubscribe(String topic) {try {if (isConnected()) {mqttClient.unsubscribe(topic);log.info("已取消订阅主题: {}", topic);} else {log.warn("MQTT客户端未连接,无法取消订阅主题");}} catch (MqttException e) {log.error("取消订阅主题 {} 失败", topic, e);}}@Overridepublic boolean isConnected() {return mqttClient != null && mqttClient.isConnected();}@Overridepublic void disconnect() {try {if (isConnected()) {mqttClient.disconnect();log.info("已断开MQTT连接");}} catch (MqttException e) {log.error("断开MQTT连接失败", e);}}@Overridepublic void reconnect() {try {if (!isConnected()) {mqttClient.reconnect();log.info("正在重新连接MQTT代理...");}} catch (MqttException e) {log.error("重新连接MQTT代理失败", e);}}
}
8.配置文件
#mqtt配置信息
mqtt:# MQTT代理服务器地址broker-url: tcp://localhost:1883# 客户端IDclient-id: 2405b7249600# 网关设备 ID(物联网接入服务 ID)interface-device-id: 2405b7249600# 用户名username: admin# 密码password: public# 连接超时时间(秒)connection-timeout: 30# 保持活动间隔(秒)keep-alive-interval: 60# 重连延迟时间(毫秒)reconnect-delay: 5000# 是否清除会话clean-session: true# 最大飞行消息数max-inflight: 10# 重连相关配置enable-custom-reconnect: truemax-reconnect-attempts: 0 # 0表示无限重试initial-reconnect-delay: 1000 # 初始重连延迟1秒max-reconnect-delay: 60000 # 最大重连延迟60秒reconnect-backoff-multiplier: 1.5 # 重连延迟倍增因子# 默认主题default-topic: test/topic# 默认服务质量等级default-qos: 1
9.使用示例
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@PostMapping("/publish")public String publishMessage(@RequestParam String topic,@RequestParam String message) {boolean result = mqttService.publish(topic, message);return result ? "消息发布成功" : "消息发布失败";}
}
功能特点
-
自动重连机制:连接断开后会自动尝试重连,采用指数退避策略;
-
启动时自动连接:服务启动后会自动尝试连接MQTT服务器;
-
完整的配置管理:通过配置文件灵活配置MQTT参数;
-
异步处理:重连和连接操作使用异步处理,避免阻塞主线程;
-
异常处理:完善的异常处理和日志记录;
-
灵活的订阅机制:支持配置多个订阅主题和对应的QoS等级。
补充:MQTTv5与MQTTv3客户端区别
主要区别详述
1. 协议版本与兼容性
特性 | MQTTv3 客户端 (3.1.1) | MQTTv5 客户端 (5.0) |
---|---|---|
协议版本 | MQTT 3.1.1 | MQTT 5.0 |
向后兼容 | 是(与 3.1.1 broker 兼容) | 是。v5 客户端可以连接到 v3.1.1 的 Broker,但会自动降级功能,无法使用 v5 的新特性。 |
向前兼容 | 否。v3 客户端无法连接到仅支持 v5 的 Broker。 | 是(设计上考虑了未来的扩展) |
关键点:v5 协议设计时就考虑了向后兼容。一个 v5 客户端可以安全地连接到一个 v3.1.1 的 Broker,只是会话和消息都将以 v3.1.1 的形式进行。反之则不行。
2. 新特性和增强功能 (MQTTv5 独有)
这是最核心的区别。MQTTv5 引入了大量新特性,这些在 v3 客户端中是完全不支持的。
特性类别 | MQTTv5 新特性 | 说明 | MQTTv3 支持情况 |
---|---|---|---|
原因码 | Reason Code | 在确认包(如 CONNACK, PUBACK, DISCONNECT)中返回操作成功或失败的具体原因(如“未授权”、“主题名无效”、“超出配额”等),极大方便了调试和故障排除。 | 只有简单的返回码(如 0=成功,4=连接拒绝-用户名密码错误),信息量很少。 |
属性 | Properties | 几乎所有报文都可以附加一组自定义属性(键值对),极大地扩展了协议的灵活性和可扩展性。例如,可以设置消息过期时间、标识内容类型(如 JSON/XML)等。 | 不支持。报文格式固定,扩展性差。 |
共享订阅 | Shared Subscriptions | 允许将消息负载均衡到一个订阅组中的多个客户端。语法:$share/{ShareName}/{TopicFilter}。这是实现消费者组、避免单点故障和负载均衡的关键特性。 | 不支持。要实现类似功能,需要在应用层自己实现复杂的逻辑,或者依赖 Broker 的非标扩展。 |
主题别名 | Topic Alias | 客户端和 Broker 可以为长主题名分配一个简短的整数别名(如 123),在后续通信中用这个别名代替长主题名,显著减少网络带宽消耗。 | 不支持。每次发布消息都必须携带完整的长主题名。 |
服务端断开 | Server Disconnect | Broker 可以在 DISCONNECT 报文中携带原因码和属性,告知客户端断开连接的具体原因(如“因管理操作断开”),客户端可以据此采取不同策略(如是否重连)。 | Broker 断开连接时,客户端只能收到一个简单的 TCP 断开信号,无法知道原因。 |
流量控制 | Flow Control | 通过接收最大值属性,客户端可以控制从 Broker 接收消息的速率,防止被消息淹没。 | 不支持。Broker 会以最快速度向下推消息。 |
载荷格式和内容类型 | Payload Format Indicator | 可以明确指示消息负载是 UTF-8 文本还是二进制数据。 | 不支持。接收方需要自己猜测载荷格式。 |
请求/响应模式 | Request/Response | 通过响应主题和对比数据属性,原生支持了请求/响应模式,无需再约定固定的请求/响应主题,使得 RPC over MQTT 更加标准化。 | 需要手动在消息载荷或主题中约定请求ID和响应主题,是应用层协议。 |
用户属性 | User Properties | 可以在 PUBLISH 等报文中附加自定义的字符串键值对,用于传递元数据(如消息来源、设备ID、地理位置等),类似于 HTTP 的 Header。 | 不支持。只能将元数据放入消息载荷中,破坏了业务数据与元数据的分离。 |
3. API 差异
由于 v5 引入了新概念,其 Java API 也与 v3 有所不同,两者不兼容。
-
MQTTv3:
- 连接参数:MqttConnectOptions
- 消息类:MqttMessage
- 回调接口:MqttCallback (包含 connectionLost, messageArrived, deliveryComplete)
-
MQTTv5:
- 连接参数:MqttConnectionOptions (增加了设置属性等功能)
- 消息类:MqttMessage (但内部增加了设置属性、载荷格式等方法)
- 回调接口:MqttCallback (方法签名不同,参数中包含了大量 v5 特有的信息,如原因码、属性包等)
示例:发布消息的 API 对比
// MQTTv3 发布消息
MqttMessage messageV3 = new MqttMessage(payload);
messageV3.setQos(1);
client.publish("my/long/topic/name", messageV3);// MQTTv5 发布消息(可以使用属性)
MqttMessage messageV5 = new MqttMessage(payload);
messageV5.setQos(1);
// 设置 v5 属性:消息过期时间 60 秒,内容类型为 JSON
messageV5.setProperties(new MqttProperties());
messageV5.getProperties().setMessageExpiryInterval(60L);
messageV5.getProperties().setContentType("application/json");client.publish("my/long/topic/name", messageV5);
如何选择?
场景 | 推荐选择 | 理由 |
---|---|---|
新项目 | MQTTv5 客户端 | 面向未来,可以直接利用 v5 的新特性(如共享订阅、原因码)来构建更健壮、更高效的系统。只要 Broker 支持(如 EMQX 5.x, HiveMQ 4.x, Mosquitto 2.0+),就应优先选择 v5。 |
现有项目维护 | MQTTv3 客户端 | 如果系统稳定运行,且没有使用 v5 特性的迫切需求,继续使用 v3 可以避免不必要的升级和测试成本。 |
必须连接旧 Broker | MQTTv3 客户端 | 如果 Broker 是旧版本(如 Mosquitto 1.x, EMQX 4.x),不支持 MQTT 5.0,则必须使用 v3 客户端。 |
需要共享订阅、流量控制等高级特性 | MQTTv5 客户端 | 如果你的架构严重依赖这些特性,那么 v5 是唯一的选择。v3 无法原生实现。 |
资源极度受限的设备 | MQTTv3 客户端 | v5 协议包略大一些(因为属性等字段),在极端情况下,v3 可能更节省一点点资源。但对于绝大多数设备,这点差异可忽略不计。 |
总结表格
特性 | MQTTv3 Client | MQTTv5 Client |
---|---|---|
协议版本 | 3.1.1 | 5.0 |
核心特性 | 基础发布/订阅、QoS、保留消息、遗嘱消息 | 包含 v3 所有特性,并新增大量增强功能 |
扩展性 | 差,固定报文格式 | 极好,支持属性 |
错误诊断 | 简单,信息量少 | 详细的原因码,易于调试 |
带宽优化 | 无 | 主题别名 |
负载均衡 | 无原生支持 | 原生共享订阅 |
流量控制 | 无 | 有(接收最大值) |
元数据支持 | 需放入载荷 | 用户属性 |
API 兼容性 | 与 v5 不兼容 | 与 v3 不兼容,但可降级连接 |
结论:对于新项目,如果基础设施(Broker)支持,应毫不犹豫地选择 MQTTv5 客户端,以充分利用其现代、强大和灵活的特性。