SpringBoot整合MQTT实战:基于EMQX构建高可靠物联网通信,从零到一实现设备云端双向对话
一、引言
随着物联网(IoT)技术的快速发展,MQTT(Message Queuing Telemetry Transport)协议因其轻量级、低功耗和高效的特点,已成为物联网设备通信的事实标准。本文将详细介绍如何使用SpringBoot框架整合MQTT协议,基于开源MQTT代理EMQX实现设备与服务器之间的双向通信。
二、技术选型与环境准备
2.1 技术栈介绍
-
SpringBoot 2.7.x:简化Spring应用初始搭建和开发过程
-
EMQX 5.0:开源的大规模分布式MQTT消息服务器
-
Eclipse Paho:流行的MQTT客户端库
-
Lombok:简化Java Bean编写
2.2 环境准备
-
安装EMQX服务器(可使用Docker快速部署):
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14
-
确保Java开发环境(JDK 11+)和Maven已安装
三、SpringBoot项目集成MQTT
3.1 创建SpringBoot项目并添加依赖
在pom.xml
中添加必要的依赖:
<dependencies><!-- SpringBoot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- MQTT Paho Client --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>
3.2 配置MQTT连接参数
在application.yml
中添加配置:
mqtt:broker-url: tcp://localhost:1883username: emqxpassword: publicclient-id: springboot-serverdefault-topic: device/statustimeout: 30keepalive: 60qos: 1clean-session: true
创建配置类MqttProperties.java
:
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {private String brokerUrl;private String username;private String password;private String clientId;private String defaultTopic;private int timeout;private int keepalive;private int qos;private boolean cleanSession;
}
3.3 实现MQTT客户端配置
创建MqttConfiguration.java
:
@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {private final MqttProperties mqttProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepalive());options.setCleanSession(mqttProperties.isCleanSession());options.setAutomaticReconnect(true);return options;}@Beanpublic IMqttClient mqttClient() throws MqttException {IMqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.connect(mqttConnectOptions());return client;}
}
3.4 实现MQTT消息发布服务
创建MqttPublisher.java
:
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;public void publish(String topic, String payload) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}MqttMessage message = new MqttMessage(payload.getBytes());message.setQos(mqttProperties.getQos());message.setRetained(true);mqttClient.publish(topic, message);log.info("MQTT message published to topic: {}, payload: {}", topic, payload);}public void publish(String payload) throws MqttException {publish(mqttProperties.getDefaultTopic(), payload);}
}
3.5 实现MQTT消息订阅服务
创建MqttSubscriber.java
:
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {subscribe(mqttProperties.getDefaultTopic());}public void subscribe(String topic) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);log.info("Subscribed to MQTT topic: {}", topic);}private void handleMessage(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);// 这里可以添加业务逻辑处理接收到的消息processMessage(topic, payload);}private void processMessage(String topic, String payload) {// 示例:解析JSON格式的消息try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);// 根据不同的topic和payload内容进行业务处理if (topic.startsWith("device/status")) {handleDeviceStatus(jsonNode);} else if (topic.startsWith("device/control")) {handleDeviceControl(jsonNode);}} catch (JsonProcessingException e) {log.error("Failed to parse MQTT message payload: {}", payload, e);}}private void handleDeviceStatus(JsonNode jsonNode) {// 处理设备状态上报String deviceId = jsonNode.get("deviceId").asText();String status = jsonNode.get("status").asText();log.info("Device {} status updated to: {}", deviceId, status);}private void handleDeviceControl(JsonNode jsonNode) {// 处理设备控制指令响应String deviceId = jsonNode.get("deviceId").asText();String command = jsonNode.get("command").asText();String result = jsonNode.get("result").asText();log.info("Device {} executed command {} with result: {}", deviceId, command, result);}
}
四、实现双向通信
4.1 服务器向设备发送控制指令
创建REST API接口用于发送控制指令:
@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {private final MqttPublisher mqttPublisher;@PostMapping("/control")public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {try {ObjectMapper mapper = new ObjectMapper();String payload = mapper.writeValueAsString(command);String topic = "device/control/" + command.getDeviceId();mqttPublisher.publish(topic, payload);return ResponseEntity.ok("Control command sent successfully");} catch (Exception e) {log.error("Failed to send control command", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send control command: " + e.getMessage());}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class DeviceCommand {private String deviceId;private String command;private Map<String, Object> params;}
}
4.2 设备模拟客户端
为了测试双向通信,我们可以创建一个简单的设备模拟客户端:
@Component
@Slf4j
public class DeviceSimulator {private final MqttPublisher mqttPublisher;private final MqttProperties mqttProperties;private IMqttClient deviceClient;public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {this.mqttPublisher = mqttPublisher;this.mqttProperties = mqttProperties;initDeviceClient();}private void initDeviceClient() {try {String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);deviceClient = new MqttClient(mqttProperties.getBrokerUrl(), deviceId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);deviceClient.connect(options);// 订阅控制主题String controlTopic = "device/control/" + deviceId;deviceClient.subscribe(controlTopic, (topic, message) -> {String payload = new String(message.getPayload());log.info("Device received control command: {}", payload);// 模拟设备执行命令并返回响应executeCommand(payload, deviceId);});// 模拟设备定期上报状态simulatePeriodicStatusReport(deviceId);} catch (MqttException e) {log.error("Failed to initialize device simulator", e);}}private void executeCommand(String payload, String deviceId) {try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);String command = jsonNode.get("command").asText();// 模拟命令执行Thread.sleep(1000); // 模拟执行耗时// 构造响应ObjectNode response = mapper.createObjectNode();response.put("deviceId", deviceId);response.put("command", command);response.put("result", "success");response.put("timestamp", System.currentTimeMillis());// 发布响应String responseTopic = "device/control/response/" + deviceId;mqttPublisher.publish(responseTopic, response.toString());} catch (Exception e) {log.error("Failed to execute command", e);}}private void simulatePeriodicStatusReport(String deviceId) {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {ObjectMapper mapper = new ObjectMapper();ObjectNode status = mapper.createObjectNode();status.put("deviceId", deviceId);status.put("status", "online");status.put("cpuUsage", Math.random() * 100);status.put("memoryUsage", 30 + Math.random() * 50);status.put("timestamp", System.currentTimeMillis());String topic = "device/status/" + deviceId;mqttPublisher.publish(topic, status.toString());} catch (Exception e) {log.error("Failed to send status report", e);}}, 0, 10, TimeUnit.SECONDS);}
}
五、测试与验证
5.1 测试设备状态上报
-
启动SpringBoot应用
-
观察日志输出,应该能看到设备模拟客户端定期上报状态信息
5.2 测试服务器控制指令
使用Postman或curl发送控制指令:
curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{"deviceId": "device-123456","command": "restart","params": {"delay": 5}
}'
5.3 验证双向通信
-
服务器发送控制指令到特定设备
-
设备接收指令并执行
-
设备发送执行结果回服务器
-
服务器接收并处理设备响应
六、高级功能扩展
6.1 消息持久化与QoS级别
-
QoS 0:最多一次,消息可能丢失
-
QoS 1:至少一次,消息不会丢失但可能重复
-
QoS 2:恰好一次,消息不丢失且不重复
根据业务需求选择合适的QoS级别:
// 在发布消息时设置QoS
message.setQos(2); // 使用最高级别的QoS
6.2 安全配置
-
启用TLS加密:
mqtt:broker-url: ssl://localhost:8883
-
配置EMQX的ACL规则,限制客户端权限
6.3 集群部署
对于生产环境,可以部署EMQX集群:
# 启动第一个节点
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14# 启动第二个节点
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14
6.4 消息桥接与WebHook
通过EMQX的桥接功能,可以将消息转发到其他MQTT服务器或Kafka等消息队列。也可以通过WebHook将消息推送到HTTP服务。
七、总结
本文详细介绍了如何使用SpringBoot整合MQTT协议,基于EMQX实现设备与服务器之间的双向通信。主要内容包括:
-
SpringBoot项目中集成MQTT客户端
-
实现消息发布和订阅功能
-
设计双向通信机制
-
设备模拟与测试验证
-
高级功能扩展建议
这种架构非常适合物联网场景,能够支持海量设备连接和实时消息通信。开发者可以根据实际业务需求,在此基础上进行扩展和优化,构建稳定可靠的物联网平台。
八、参考资料
-
EMQX官方文档:Introduction | EMQX 5.0 Docs
-
Eclipse Paho项目:Eclipse Paho | The Eclipse Foundation
-
MQTT协议规范:MQTT Version 3.1.1
-
Spring Boot官方文档:Spring Boot