当前位置: 首页 > news >正文

Spring Boot 集成 MQTT 实现消息发布与订阅

一、MQTT 是什么?

MQTT(Message Queuing Telemetry Transport)是一种 轻量级发布/订阅消息传输协议,广泛用于 物联网(IoT)设备通信。它基于 TCP/IP 协议,具有以下特点:

  • 📦 轻量级,网络带宽占用低

  • 🔁 支持自动重连与消息重发

  • 🔔 支持 QoS(服务质量等级)保证消息可靠性

  • 📡 典型应用场景:智能家居、工业监控、电力系统、设备遥测等

  项目Demo源码
GitHub: https://github.com/lpsqq1944900433/mqtt-demo/

Gitee: https://gitee.com/QQ1944900433/mqtt-demo

在本文中,我们将基于 Spring Boot + Eclipse Paho 客户端,实现一个最小可运行的 MQTT 消息发布与订阅系统。


二、项目结构概览

项目结构如下:

mqtt-demo├─ com.lps.mqttdemo│   ├─ config│   │   └─ MqttConfig.java           # MQTT连接配置类│   ├─ publisher│   │   └─ Publisher.java            # MQTT消息发布者│   ├─ subscriber│   │   └─ Subscriber.java           # MQTT消息订阅者│   └─ MqttDemoApplication.java      # 启动类└─ resources└─ application.yml               # 配置文件

依赖引入:

        <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

三、配置文件(application.yml)

# MQTT 配置
mqtt:# MQTT Broker 地址broker-url: tcp://192.168.88.178:1883# 客户端 IDclient-id: mqtt-demo-client# 订阅的主题topic: lps-test-topic# 默认 QoS 等级(0: 最多一次, 1: 至少一次, 2: 仅一次)default-qos: 1# 连接超时时间(秒)connection-timeout: 30# 保持连接间隔(秒)keep-alive-interval: 60# 是否自动重连automatic-reconnect: true# 是否清除会话clean-session: true# Spring Boot 应用配置
spring:application:name: mqtt-demo# 服务器端口
server:port: 8080

💡 建议:

  • 如果你的 MQTT Broker 是 EMQXMosquitto,可以用 tcp://127.0.0.1:1883 进行测试。

  • 在生产环境中建议启用 SSL(ssl://broker地址:8883)。


四、MqttConfig 配置类详解

package com.lps.mqttdemo.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** MQTT 配置类* 用于配置 MQTT 客户端连接参数*/
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.connection-timeout}")private int connectionTimeout;@Value("${mqtt.keep-alive-interval}")private int keepAliveInterval;@Value("${mqtt.automatic-reconnect}")private boolean automaticReconnect;@Value("${mqtt.clean-session}")private boolean cleanSession;/*** 创建 MQTT 连接选项*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setConnectionTimeout(connectionTimeout);options.setKeepAliveInterval(keepAliveInterval);options.setAutomaticReconnect(automaticReconnect);options.setCleanSession(cleanSession);return options;}/*** 创建 MQTT 客户端(用于发布消息)*/@Beanpublic MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-publisher");client.connect(mqttConnectOptions);return client;}/*** 创建订阅端 MQTT 客户端*/@Bean(name = "subscriberMqttClient")public MqttClient subscriberMqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-subscriber");client.connect(mqttConnectOptions);return client;}
}

🧩 说明:

  • 使用 @Bean 管理两个独立的 MQTT 客户端:发布端和订阅端;

  • MqttConnectOptions 用于配置连接参数;

  • 支持自动重连、保活机制,保证长连接稳定。


五、Subscriber(订阅者)

package com.lps.mqttdemo.subscriber;import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** MQTT 消息订阅者* 在应用启动时自动订阅指定主题并接收消息*/
@Component
public class Subscriber {@Autowired@Qualifier("subscriberMqttClient")private MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;@Value("${mqtt.default-qos}")private int qos;/*** 应用启动后自动订阅主题*/@PostConstructpublic void subscribe() {try {// 设置回调处理器mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT 连接丢失: " + cause.getMessage());// 自动重连由配置处理}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String payload = new String(message.getPayload());System.out.println("========================================");System.out.println("收到 MQTT 消息:");System.out.println("主题: " + topic);System.out.println("QoS: " + message.getQos());System.out.println("消息内容: " + payload);System.out.println("消息ID: " + message.getId());System.out.println("是否保留消息: " + message.isRetained());System.out.println("========================================");}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 发布完成回调(订阅者不需要实现)}});// 订阅主题mqttClient.subscribe(topic, qos);System.out.println("成功订阅主题: " + topic + ",QoS: " + qos);} catch (MqttException e) {System.err.println("订阅主题失败: " + e.getMessage());e.printStackTrace();}}
}

逻辑说明:

  • 使用 @PostConstruct 实现应用启动后自动订阅;

  • 实现 MqttCallback 接口监听消息;

  • 当消息到达时打印详细信息;

  • 可在此扩展业务逻辑(如存入数据库、转发到消息队列)。


六、Publisher(发布者)

 
package com.lps.mqttdemo.publisher;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;/*** MQTT 消息发布者* 提供 REST 接口用于发布 MQTT 消息*/
@RestController
@RequestMapping("/publish")
public class Publisher {@Autowiredprivate MqttClient mqttClient;@Value("${mqtt.topic}")private String defaultTopic;@Value("${mqtt.default-qos}")private int defaultQos;/*** 发布消息到默认主题** @param message 要发布的消息内容* @return 发布结果*/@PostMappingpublic ResponseEntity<String> publishMessage(@RequestParam String message) {return publishMessage(defaultTopic, message, defaultQos);}/*** 发布消息到指定主题** @param topic   主题名称* @param message 消息内容* @param qos     QoS 等级(0, 1, 2)* @return 发布结果*/@PostMapping("/{topic}")public ResponseEntity<String> publishToTopic(@PathVariable String topic,@RequestParam String message,@RequestParam(required = false, defaultValue = "1") int qos) {return publishMessage(topic, message, qos);}/*** 发布消息的核心方法** @param topic   主题* @param message 消息内容* @param qos     QoS 等级* @return 发布结果*/private ResponseEntity<String> publishMessage(String topic, String message, int qos) {try {// 确保客户端已连接if (!mqttClient.isConnected()) {mqttClient.reconnect();}// 创建 MQTT 消息MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));mqttMessage.setQos(qos);mqttMessage.setRetained(false);// 发布消息mqttClient.publish(topic, mqttMessage);return ResponseEntity.ok(String.format("消息已成功发布到主题 [%s],QoS: %d,内容: %s", topic, qos, message));} catch (MqttException e) {return ResponseEntity.status(500).body("发布消息失败: " + e.getMessage());}}
}

🌍 接口测试:

1️⃣ 发布默认主题消息

curl -X POST "http://localhost:8080/publish?message=hello_mqtt"

2️⃣ 发布自定义主题消息

curl -X POST "http://localhost:8080/publish/myTopic?message=test_msg&qos=1"

控制台输出示例:

成功订阅主题: lps-test-topic,QoS: 1 ========== 收到MQTT消息 ========== 主题: lps-test-topic QoS: 1 内容: hello_mqtt ================================


七、常见问题与优化建议

🧠 1. 客户端断开重连问题

如果 Broker 重启或网络异常,isConnected() 检查 + reconnect() 是基本方案。
更优的方案是:

  • 使用 Paho 的 MqttAsyncClient

  • 或者使用 Spring Integration MQTT 框架管理连接状态


🧠 2. 多主题订阅

mqttClient.subscribe(new String[]{"topic1", "topic2"}, new int[]{1, 1});

🧠 3. QoS 说明

等级含义场景
0最多一次(不保证送达)传感器实时数据
1至少一次(可能重复)一般消息
2仅一次(最可靠)控制指令、金融场景

🧠 4. 提升健壮性建议

  • 建议封装 MqttService 类统一管理发布/订阅逻辑;

  • 增加心跳检测与状态日志;

  • 使用 CompletableFuture 或线程池异步发布,提升吞吐;

  • 如果系统使用 RocketMQ/Kafka,可将 MQTT 作为接入层网关。


八、总结

本文完整演示了如何在 Spring Boot 中使用 Eclipse Paho 实现 MQTT 消息的发布与订阅功能。
整个流程简单明了,适合初学者快速上手,同时也能作为物联网项目的基础通信模块。

http://www.dtcms.com/a/565469.html

相关文章:

  • 【Linux系统编程】进程概念(二)进程的概念和基本操作
  • 建收费网站营销型网站建设风格设定包括哪些方面?
  • 【深度强化学习】#6 Soft Actor-Critic:最大熵与重参数化技巧
  • 教务管理系统源码
  • Demis Hassabis带领DeepMind告别纯科研时代:当AI4S成为新叙事,伦理考验仍在继续
  • 顺德网站建设域名网站的规划与建设_按时间顺序可以分为哪等五个阶段
  • 基于Python与Streamlit的救援物资调度双层规划模型实现方案
  • 高阶金融衍生品系统实战:TRS收益互换与场外期权的万亿级交易架构设计
  • 没有备案的网站可以用ip访问吗wifi小程序源码
  • idea远程debug 断点调试
  • Windows 10 停服下的国产化迁移:统信 UOS 工具核心技术深度解析
  • QML-Model-View
  • 电子电路原理第二十一章(稳压电源)
  • 存储连接方式与RAID重构解析,2018年5月第二题
  • 沈阳网站建设方案服务wordpress自定义背景颜色
  • 【个人成长笔记】在Linux系统中常见压缩与解压文件及文件夹命令(亲测有效)
  • 打印机驱动网能解决打印机驱动问题么?惠普打印机驱动故障问题修复
  • 通州网站建设服务七台河网站制作
  • idea配置代码注释模板
  • 前端文件上传终极指南:从原理到架构实践!
  • 【一问专栏】链表:数据世界的“寻宝游戏“——详解应用场景与独特优势
  • Linux 线程
  • 【Android项目】KMMV项目随笔
  • vmware windows和linux系统共享和映射物理机目录
  • 机器学习日报11
  • 宿州品牌网站建设公司淘宝网站建设单子好接吗
  • 大数据成矿预测系列(六) | 从“看图像”到“读结构”:图卷积神经网络如何赋能地质“图谱”推理
  • AI研究-118 具身智能 Mobile-ALOHA 解读:移动+双臂模仿学习的开源方案(含论文/代码/套件链接)
  • 超越“盒子”:虚拟机在云计算与AI时代的颠覆性未来应用展望
  • 外国人可以在中国做网站吗cnzz网站建设