Spring Boot 集成 MQTT 实现消息发布与订阅
一、MQTT 是什么?
MQTT(Message Queuing Telemetry Transport)是一种 轻量级发布/订阅消息传输协议,广泛用于 物联网(IoT)设备通信。它基于 TCP/IP 协议,具有以下特点:
-  
📦 轻量级,网络带宽占用低
 -  
🔁 支持自动重连与消息重发
 -  
🔔 支持 QoS(服务质量等级)保证消息可靠性
 -  
📡 典型应用场景:智能家居、工业监控、电力系统、设备遥测等
 
  项目Demo源码
 GitHub: https://github.com/lpsqq1944900433/mqtt-demo/
Gitee: https://gitee.com/QQ1944900433/mqtt-demo
在本文中,我们将基于 Spring Boot + Eclipse Paho 客户端,实现一个最小可运行的 MQTT 消息发布与订阅系统。


二、项目结构概览
项目结构如下:
mqtt-demo├─ com.lps.mqttdemo│   ├─ config│   │   └─ MqttConfig.java           # MQTT连接配置类│   ├─ publisher│   │   └─ Publisher.java            # MQTT消息发布者│   ├─ subscriber│   │   └─ Subscriber.java           # MQTT消息订阅者│   └─ MqttDemoApplication.java      # 启动类└─ resources└─ application.yml               # 配置文件
 
依赖引入:
        <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency> 
三、配置文件(application.yml)
# MQTT 配置
mqtt:# MQTT Broker 地址broker-url: tcp://192.168.88.178:1883# 客户端 IDclient-id: mqtt-demo-client# 订阅的主题topic: lps-test-topic# 默认 QoS 等级(0: 最多一次, 1: 至少一次, 2: 仅一次)default-qos: 1# 连接超时时间(秒)connection-timeout: 30# 保持连接间隔(秒)keep-alive-interval: 60# 是否自动重连automatic-reconnect: true# 是否清除会话clean-session: true# Spring Boot 应用配置
spring:application:name: mqtt-demo# 服务器端口
server:port: 8080 
💡 建议:
-  
如果你的 MQTT Broker 是 EMQX 或 Mosquitto,可以用
tcp://127.0.0.1:1883进行测试。 -  
在生产环境中建议启用 SSL(
ssl://broker地址:8883)。 
四、MqttConfig 配置类详解
package com.lps.mqttdemo.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** MQTT 配置类* 用于配置 MQTT 客户端连接参数*/
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.connection-timeout}")private int connectionTimeout;@Value("${mqtt.keep-alive-interval}")private int keepAliveInterval;@Value("${mqtt.automatic-reconnect}")private boolean automaticReconnect;@Value("${mqtt.clean-session}")private boolean cleanSession;/*** 创建 MQTT 连接选项*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setConnectionTimeout(connectionTimeout);options.setKeepAliveInterval(keepAliveInterval);options.setAutomaticReconnect(automaticReconnect);options.setCleanSession(cleanSession);return options;}/*** 创建 MQTT 客户端(用于发布消息)*/@Beanpublic MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-publisher");client.connect(mqttConnectOptions);return client;}/*** 创建订阅端 MQTT 客户端*/@Bean(name = "subscriberMqttClient")public MqttClient subscriberMqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-subscriber");client.connect(mqttConnectOptions);return client;}
} 
🧩 说明:
-  
使用
@Bean管理两个独立的 MQTT 客户端:发布端和订阅端; -  
MqttConnectOptions用于配置连接参数; -  
支持自动重连、保活机制,保证长连接稳定。
 
五、Subscriber(订阅者)
package com.lps.mqttdemo.subscriber;import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** MQTT 消息订阅者* 在应用启动时自动订阅指定主题并接收消息*/
@Component
public class Subscriber {@Autowired@Qualifier("subscriberMqttClient")private MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;@Value("${mqtt.default-qos}")private int qos;/*** 应用启动后自动订阅主题*/@PostConstructpublic void subscribe() {try {// 设置回调处理器mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT 连接丢失: " + cause.getMessage());// 自动重连由配置处理}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String payload = new String(message.getPayload());System.out.println("========================================");System.out.println("收到 MQTT 消息:");System.out.println("主题: " + topic);System.out.println("QoS: " + message.getQos());System.out.println("消息内容: " + payload);System.out.println("消息ID: " + message.getId());System.out.println("是否保留消息: " + message.isRetained());System.out.println("========================================");}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 发布完成回调(订阅者不需要实现)}});// 订阅主题mqttClient.subscribe(topic, qos);System.out.println("成功订阅主题: " + topic + ",QoS: " + qos);} catch (MqttException e) {System.err.println("订阅主题失败: " + e.getMessage());e.printStackTrace();}}
} 
✅ 逻辑说明:
-  
使用
@PostConstruct实现应用启动后自动订阅; -  
实现
MqttCallback接口监听消息; -  
当消息到达时打印详细信息;
 -  
可在此扩展业务逻辑(如存入数据库、转发到消息队列)。
 
六、Publisher(发布者)
package com.lps.mqttdemo.publisher;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;/*** MQTT 消息发布者* 提供 REST 接口用于发布 MQTT 消息*/
@RestController
@RequestMapping("/publish")
public class Publisher {@Autowiredprivate MqttClient mqttClient;@Value("${mqtt.topic}")private String defaultTopic;@Value("${mqtt.default-qos}")private int defaultQos;/*** 发布消息到默认主题** @param message 要发布的消息内容* @return 发布结果*/@PostMappingpublic ResponseEntity<String> publishMessage(@RequestParam String message) {return publishMessage(defaultTopic, message, defaultQos);}/*** 发布消息到指定主题** @param topic   主题名称* @param message 消息内容* @param qos     QoS 等级(0, 1, 2)* @return 发布结果*/@PostMapping("/{topic}")public ResponseEntity<String> publishToTopic(@PathVariable String topic,@RequestParam String message,@RequestParam(required = false, defaultValue = "1") int qos) {return publishMessage(topic, message, qos);}/*** 发布消息的核心方法** @param topic   主题* @param message 消息内容* @param qos     QoS 等级* @return 发布结果*/private ResponseEntity<String> publishMessage(String topic, String message, int qos) {try {// 确保客户端已连接if (!mqttClient.isConnected()) {mqttClient.reconnect();}// 创建 MQTT 消息MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));mqttMessage.setQos(qos);mqttMessage.setRetained(false);// 发布消息mqttClient.publish(topic, mqttMessage);return ResponseEntity.ok(String.format("消息已成功发布到主题 [%s],QoS: %d,内容: %s", topic, qos, message));} catch (MqttException e) {return ResponseEntity.status(500).body("发布消息失败: " + e.getMessage());}}
} 
 
🌍 接口测试:
1️⃣ 发布默认主题消息
curl -X POST "http://localhost:8080/publish?message=hello_mqtt" 
2️⃣ 发布自定义主题消息
curl -X POST "http://localhost:8080/publish/myTopic?message=test_msg&qos=1" 
控制台输出示例:
成功订阅主题: lps-test-topic,QoS: 1 ========== 收到MQTT消息 ========== 主题: lps-test-topic QoS: 1 内容: hello_mqtt ================================ 
七、常见问题与优化建议
🧠 1. 客户端断开重连问题
如果 Broker 重启或网络异常,isConnected() 检查 + reconnect() 是基本方案。
 更优的方案是:
-  
使用 Paho 的
MqttAsyncClient -  
或者使用 Spring Integration MQTT 框架管理连接状态
 
🧠 2. 多主题订阅
mqttClient.subscribe(new String[]{"topic1", "topic2"}, new int[]{1, 1}); 
🧠 3. QoS 说明
| 等级 | 含义 | 场景 | 
|---|---|---|
| 0 | 最多一次(不保证送达) | 传感器实时数据 | 
| 1 | 至少一次(可能重复) | 一般消息 | 
| 2 | 仅一次(最可靠) | 控制指令、金融场景 | 
🧠 4. 提升健壮性建议
-  
建议封装
MqttService类统一管理发布/订阅逻辑; -  
增加心跳检测与状态日志;
 -  
使用
CompletableFuture或线程池异步发布,提升吞吐; -  
如果系统使用 RocketMQ/Kafka,可将 MQTT 作为接入层网关。
 
八、总结
本文完整演示了如何在 Spring Boot 中使用 Eclipse Paho 实现 MQTT 消息的发布与订阅功能。
 整个流程简单明了,适合初学者快速上手,同时也能作为物联网项目的基础通信模块。
