SpringBoot使用MQTT协议简述
在 Spring Boot 中使用 MQTT 协议连接硬件设备,可以通过以下步骤实现。这里以 Eclipse Paho MQTT 客户端为例:
1. 添加 Maven 依赖
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- MQTT 依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
</dependencies>
2. 配置 MQTT 连接参数
在 application.properties
中添加:
# MQTT 配置
mqtt.broker-url=tcp://your-broker-address:1883
mqtt.client-id=spring-boot-client
mqtt.username=your-username
mqtt.password=your-password
mqtt.default-topic=hardware/commands
3. 创建 MQTT 配置类
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;// 创建 MQTT 客户端工厂@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());factory.setConnectionOptions(options);return factory;}// 配置消息发送处理器(向硬件发送指令)@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-out", mqttClientFactory());handler.setAsync(true); // 异步发送handler.setDefaultTopic("hardware/commands"); // 默认发送主题return handler;}
}
4. 创建消息网关接口(用于发送消息)
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String payload);void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}
5. 实现消息接收(监听硬件消息)
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Component;@Component
public class MqttInboundConfig {@Value("${mqtt.client-id}")private String clientId;// 配置消息订阅@Beanpublic MqttPahoMessageDrivenChannelAdapter inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(brokerUrl, clientId + "-in","hardware/data", // 订阅硬件数据主题"hardware/status" // 订阅状态主题);adapter.setCompletionTimeout(5000);adapter.setQos(1); // 服务质量等级adapter.setOutputChannelName("mqttInputChannel");return adapter;}// 处理收到的消息@ServiceActivator(inputChannel = "mqttInputChannel")public void handleMessage(String payload) {System.out.println("Received from hardware: " + payload);// 这里添加业务逻辑,如解析硬件数据}
}
6. 控制器示例(发送指令到硬件)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HardwareController {@Autowiredprivate MqttGateway mqttGateway;@PostMapping("/send-command")public String sendCommand(@RequestParam String command) {// 发送指令到默认主题mqttGateway.sendToMqtt(command);return "Command sent: " + command;}@PostMapping("/send-custom")public String sendCustom(@RequestParam String topic, @RequestParam String message) {// 发送到指定主题mqttGateway.sendToMqtt(message, topic);return "Sent to " + topic + ": " + message;}
}
7. 主应用类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.config.EnableIntegration;@SpringBootApplication
@EnableIntegration
public class MqttApplication {public static void main(String[] args) {SpringApplication.run(MqttApplication.class, args);}
}
关键点说明:
-
双向通信:
- 发送指令:通过
MqttGateway
向硬件发布消息 - 接收数据:通过
MqttPahoMessageDrivenChannelAdapter
订阅硬件消息
- 发送指令:通过
-
主题管理:
- 命令主题:
hardware/commands
(服务器→硬件) - 数据主题:
hardware/data
(硬件→服务器) - 状态主题:
hardware/status
(硬件状态推送)
- 命令主题:
-
QoS 级别:
adapter.setQos(1); // 可选 0、1、2
- 0:最多一次(可能丢失)
- 1:至少一次(可能重复)
- 2:精确一次(可靠但慢)
硬件端对接建议:
- 消息格式:推荐使用 JSON
{"sensor":"temp","value":25.6,"unit":"C"}
- 保持连接:硬件应实现自动重连机制
- 心跳检测:使用 MQTT 的
Keep Alive
参数
测试工具:
- MQTT.fx:桌面客户端模拟硬件测试
- Mosquitto:本地搭建 MQTT 服务器
docker run -it -p 1883:1883 eclipse-mosquitto
增强功能:
- 消息持久化:
options.setCleanSession(false); // 保留会话
- SSL/TLS 加密:
options.setSocketFactory(SSLSocketFactory.getDefault());
- 遗嘱消息(LWT):
options.setWill("hardware/status", "offline".getBytes(), 1, true);
完整项目结构:
src
├── main
│ ├── java
│ │ └── com
│ │ └── example
│ │ ├── MqttApplication.java
│ │ ├── config
│ │ │ ├── MqttConfig.java
│ │ │ └── MqttInboundConfig.java
│ │ ├── gateway
│ │ │ └── MqttGateway.java
│ │ └── controller
│ │ └── HardwareController.java
│ └── resources
│ └── application.properties
通过以上实现,Spring Boot 应用即可通过 MQTT 协议与硬件设备进行双向通信。