当前位置: 首页 > news >正文

SpringBoot使用MQTT协议简述

在 Spring Boot 中使用 MQTT 协议连接硬件设备,可以通过以下步骤实现。这里以 Eclipse Paho MQTT 客户端为例:

1. 添加 Maven 依赖

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- MQTT 依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
</dependencies>

2. 配置 MQTT 连接参数

application.properties 中添加:

# MQTT 配置
mqtt.broker-url=tcp://your-broker-address:1883
mqtt.client-id=spring-boot-client
mqtt.username=your-username
mqtt.password=your-password
mqtt.default-topic=hardware/commands

3. 创建 MQTT 配置类

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;// 创建 MQTT 客户端工厂@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());factory.setConnectionOptions(options);return factory;}// 配置消息发送处理器(向硬件发送指令)@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-out", mqttClientFactory());handler.setAsync(true); // 异步发送handler.setDefaultTopic("hardware/commands"); // 默认发送主题return handler;}
}

4. 创建消息网关接口(用于发送消息)

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String payload);void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}

5. 实现消息接收(监听硬件消息)

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Component;@Component
public class MqttInboundConfig {@Value("${mqtt.client-id}")private String clientId;// 配置消息订阅@Beanpublic MqttPahoMessageDrivenChannelAdapter inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(brokerUrl, clientId + "-in","hardware/data",     // 订阅硬件数据主题"hardware/status"    // 订阅状态主题);adapter.setCompletionTimeout(5000);adapter.setQos(1); // 服务质量等级adapter.setOutputChannelName("mqttInputChannel");return adapter;}// 处理收到的消息@ServiceActivator(inputChannel = "mqttInputChannel")public void handleMessage(String payload) {System.out.println("Received from hardware: " + payload);// 这里添加业务逻辑,如解析硬件数据}
}

6. 控制器示例(发送指令到硬件)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HardwareController {@Autowiredprivate MqttGateway mqttGateway;@PostMapping("/send-command")public String sendCommand(@RequestParam String command) {// 发送指令到默认主题mqttGateway.sendToMqtt(command);return "Command sent: " + command;}@PostMapping("/send-custom")public String sendCustom(@RequestParam String topic, @RequestParam String message) {// 发送到指定主题mqttGateway.sendToMqtt(message, topic);return "Sent to " + topic + ": " + message;}
}

7. 主应用类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.config.EnableIntegration;@SpringBootApplication
@EnableIntegration
public class MqttApplication {public static void main(String[] args) {SpringApplication.run(MqttApplication.class, args);}
}

关键点说明:

  1. 双向通信

    • 发送指令:通过 MqttGateway 向硬件发布消息
    • 接收数据:通过 MqttPahoMessageDrivenChannelAdapter 订阅硬件消息
  2. 主题管理

    • 命令主题:hardware/commands(服务器→硬件)
    • 数据主题:hardware/data(硬件→服务器)
    • 状态主题:hardware/status(硬件状态推送)
  3. QoS 级别

    adapter.setQos(1); // 可选 0、1、2
    
    • 0:最多一次(可能丢失)
    • 1:至少一次(可能重复)
    • 2:精确一次(可靠但慢)

硬件端对接建议:

  1. 消息格式:推荐使用 JSON
    {"sensor":"temp","value":25.6,"unit":"C"}
    
  2. 保持连接:硬件应实现自动重连机制
  3. 心跳检测:使用 MQTT 的 Keep Alive 参数

测试工具:

  • MQTT.fx:桌面客户端模拟硬件测试
  • Mosquitto:本地搭建 MQTT 服务器
    docker run -it -p 1883:1883 eclipse-mosquitto
    

增强功能:

  1. 消息持久化
    options.setCleanSession(false); // 保留会话
    
  2. SSL/TLS 加密
    options.setSocketFactory(SSLSocketFactory.getDefault());
    
  3. 遗嘱消息(LWT)
    options.setWill("hardware/status", "offline".getBytes(), 1, true);
    

完整项目结构:

src
├── main
│   ├── java
│   │   └── com
│   │       └── example
│   │           ├── MqttApplication.java
│   │           ├── config
│   │           │   ├── MqttConfig.java
│   │           │   └── MqttInboundConfig.java
│   │           ├── gateway
│   │           │   └── MqttGateway.java
│   │           └── controller
│   │               └── HardwareController.java
│   └── resources
│       └── application.properties

通过以上实现,Spring Boot 应用即可通过 MQTT 协议与硬件设备进行双向通信。

相关文章:

  • 十、【核心功能篇】项目与模块管理:前端页面开发与后端 API 联调实战
  • 华为OD机试真题——阿里巴巴找黄金宝箱(II)(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • leetcode450.删除二叉搜索树中的节点:迭代法巧用中间节点应对多场景删除
  • Oracle的NVL函数
  • MCP协议开发规范
  • 第八章 Wireshark工具的安装与使用
  • 数据治理是什么意思?数据治理平台有哪些?
  • JDBC 核心执行流程详解
  • Python对接GPT-4o API接口:聊天与文件上传功能详解
  • [预训练]Encoder-only架构的预训练任务核心机制
  • 【大模型/MCP】MCP简介
  • 【论文精读】2024 CVPR--Upscale-A-Video现实世界视频超分辨率(RealWorld VSR)
  • 力扣HOT100之动态规划:118. 杨辉三角
  • C/C++ 面试复习笔记(1)
  • MySQL进阶篇(存储引擎、索引、视图、SQL性能优化、存储过程、触发器、锁)
  • Vue-Router中的三种路由历史模式详解
  • 第一章 项目总览
  • udp 传输实时性测量
  • 4.1.4 基于数据帧做SQL查询
  • RabbitMQ备份与恢复技术详解:策略、工具与最佳实践
  • 全网商城系统/安卓优化大师老版本下载
  • 部队网站建设/品牌推广软文200字
  • php网站开发流程步骤/新的数据新闻
  • 深圳专业做网站哪家好/近期舆情热点事件
  • 东莞网站优化关键词费用/济南最新消息
  • 开发一个app要多久/武汉百度快照优化排名