Spring Boot集成MQTT与单片机通信
Spring Boot 中集成 MQTT 协议为单片机提供通信接口,这是一个物联网应用中常见的需求。下面我将为你梳理实现步骤、关键代码和注意事项。
为了更直观地理解整个系统的架构和数据流动,你可以参考下面的序列图,它展示了单片机通过MQTT与Spring Boot应用交互的典型过程:
🧩 Spring Boot 集成 MQTT 服务
实现 Spring Boot 与 MQTT 协议的集成,并通过 MQTT 协议为单片机提供接口,主要涉及 Spring Boot 服务端 和 单片机客户端 两部分的开发。Spring Boot 应用作为 MQTT 的客户端(可能同时也是消息代理),与单片机设备进行基于主题的发布/订阅通信。
🌐 Spring Boot 端实现
Spring Boot 应用程序需要引入 MQTT 客户端库,配置连接至 MQTT 代理(Broker),并实现消息的发送与接收逻辑。
第一步:添加依赖
在 pom.xml
中添加 MQTT 客户端依赖。常用的库是 Eclipse Paho 或 mica-mqtt。
xml
<!-- 使用 Eclipse Paho (可选) --> <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version> </dependency><!-- 或者使用 mica-mqtt-spring-boot-starter (可选) --> <dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqtt-spring-boot-starter</artifactId><version>${最新版本}</version> <!-- 请查看Gitee或官网获取最新版本 --> </dependency>
第二步:配置 MQTT 连接参数
在 application.yml
中配置 MQTT 连接参数。这些参数需要与你选择的 MQTT 代理(如 EMQX、HiveMQ、Mosquitto 或阿里云等公有云 MQTT 服务)的设置相匹配。
yaml
# application.yml 配置示例 (以 mica-mqtt 为例) mqtt:client:enabled: trueip: 127.0.0.1 # MQTT 代理服务器地址port: 1883 # MQTT 代理服务器端口client-id: spring-server # 客户端ID,确保唯一性user-name: admin # 认证用户名(如果有)password: 123456 # 认证密码(如果有)keep-alive-secs: 60 # 心跳间隔,单位秒timeout: 5 # 超时时间,单位秒reconnect: true # 是否自动重连clean-session: true # 是否清除会话 # 如果使用 TLS/SSL,需要进行相关配置:cite[3] # ssl: # enabled: false # keystore-path: # keystore-pass: # truststore-path: # truststore-pass:
第三步:编写消息发布服务
创建一个服务用于向单片机发布(发送)消息。
java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;@Service public class MqttPublisherService {@Autowiredprivate MqttClientTemplate mqttClientTemplate; // mica-mqtt 提供的模板/*** 向指定主题发布消息* @param topic 主题,如 "device/control"* @param message 消息内容* @param qos 服务质量等级 (0, 1, 2)*/public void sendMessage(String topic, String message, int qos) {mqttClientTemplate.publish(topic, message, qos);} }
第四步:编写消息订阅监听
单片机发送到特定主题的消息,Spring Boot 应用可以通过监听这些主题来接收。
java
import org.springframework.stereotype.Service; import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener; import net.dreamlu.iot.mqtt.core.client.MqttClientCreator; import jakarta.annotation.PostConstruct;@Service public class MqttSubscriberService {private final MqttClientCreator mqttClientCreator;public MqttSubscriberService(MqttClientCreator mqttClientCreator) {this.mqttClientCreator = mqttCreator;}@PostConstructpublic void subscribeToTopics() {// 假设单片机发布数据的主题是 "sensor/data"String singleChipTopic = "sensor/data";// 添加订阅及其监听器mqttClientCreator.addSubQosMap(singleChipTopic, 1) // 主题和QoS.addMessageListener((topic, payload) -> {// 处理接收到的来自单片机的消息String message = new String(payload, StandardCharsets.UTF_8);System.out.println("Received from topic: " + topic + ", message: " + message);// 在这里添加你的业务逻辑,例如解析数据、存储到数据库等}).subscribe().join(); // 执行订阅} }
如果使用 mica-mqtt,也可以使用注解方式订阅:
java
import service; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import net.dreamlu.iot.mqtt.core.client.MqttClientSubscribe; import lombok.extern.slf4j.Slf4j;@Slf4j @Service public class MqttMessageListenerService {// 使用注解订阅主题,支持通配符 # 和 +@MqttClientSubscribe("/sensor/#") // 监听所有以 "sensor/" 开头的主题public void handleSensorData(String topic, byte[] payload) {String message = new String(payload, StandardCharsets.UTF_8);log.info("Received message from topic '{}': {}", topic, message);// 根据不同的主题或消息内容,编写相应的处理逻辑// 例如:if ("sensor/temperature".equals(topic)) { ... }} }
第五步:处理连接事件(可选)
你还可以监听 MQTT 客户端的连接和断开事件,以便进行相应的处理。
java
import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import net.dreamlu.iot.mqtt.core.client.MqttConnectedEvent; import net.dreamlu.iot.mqtt.core.client.MqttDisconnectEvent; import lombok.extern.slf4j.Slf4j;@Slf4j @Service public class MqttClientConnectListener {@EventListenerpublic void onConnected(MqttConnectedEvent event) {log.info("MQTT Client connected successfully.");// 连接成功后的操作,如重新订阅主题}@EventListenerpublic void onDisconnect(MqttDisconnectEvent event) {log.warn("MQTT Client disconnected.");// 连接断开后的处理,如记录日志或尝试重新初始化} }
🔌 单片机端连接 MQTT
单片机需要通过支持 MQTT 协议的库连接到同一个 MQTT 代理(Broker)。实现方式取决于单片机型号和使用的网络模块(如 ESP8266、SIM800C 等)。
常见实现方式:
使用 AT 指令:对于像 ESP8266、SIM800C 这样的模块,单片机可以通过串口发送 AT 指令,让模块建立 TCP 连接,然后按照 MQTT 协议格式组装和发送数据包。
移植 MQTT 客户端库:在单片机资源允许的情况下(如 STM32F103、ESP32),可以移植轻量级的 MQTT 客户端库(如 Eclipse Paho MQTT Embedded C),直接处理 MQTT 协议的逻辑。
单片机端关键步骤:
网络连接:确保单片机通过 WiFi(如 ESP8266/ESP32)或移动网络(如 SIM800C)能够连接到互联网并与 MQTT 代理服务器建立 TCP 连接。
MQTT 连接:
构造 MQTT Connect 报文:包含客户端 ID(ClientID)、用户名、密码(如果需要)、遗嘱消息、心跳间隔(Keep Alive)等。
客户端 ID 需确保唯一性,通常可用设备标识符。
如果 Broker 要求认证,需提供正确的用户名和密码。
订阅主题:单片机向 Broker 发送 Subscribe 报文,订阅它关心的控制主题(例如
device/${clientId}/control
),以便接收来自 Spring Boot 应用的控制指令。发布消息:单片机将传感器数据或状态信息封装成 MQTT Publish 报文,发送到 Spring Boot 应用订阅的主题(例如
sensor/data/${clientId}
)。心跳保持:按照 Connect 报文指定的心跳间隔,定期向 Broker 发送 PingReq 报文,保持长连接活性。
断线重连:实现断线检测和重连机制,确保网络异常后能恢复连接。
单片机代码逻辑片段(概念性示例):
c
// 伪代码,基于 Paho MQTT Embedded C 库概念 void main() {// 1. 初始化网络连接 (例如, 配置ESP8266连接WiFi)network_init();// 2. 配置MQTT客户端MQTTClient client;MQTTClient_Init(&client, network_send, network_receive, system_current_time_ms);MQTTClient_ConnectOptions opts = MQTTClient_ConnectOptions_initializer;opts.clientID = "single_chip_001";opts.username = "admin";opts.password = "123456";opts.keepAliveInterval = 60;opts.cleansession = 1;// 3. 连接MQTT Brokerif (MQTTClient_Connect(&client, &opts) != SUCCESS) {// 处理连接失败while(1) { // 重连逻辑delay_ms(5000);if (MQTTClient_Connect(&client, &opts) == SUCCESS) break;}}// 4. 订阅控制主题MQTTClient_Subscribe(&client, "device/control/#", 1);// 5. 主循环while(1) {// 检查并接收网络数据,处理MQTT消息(调用MQTTClient_Yield()或类似函数)MQTTClient_Yield(&client, 1000); // 等待1秒处理消息// 6. 定时发布传感器数据if (time_to_publish()) {char sensor_data[50];read_sensor(sensor_data);MQTTClient_Publish(&client, "sensor/data", sensor_data, strlen(sensor_data), 1, 0);}// 7. 处理心跳 (库内部可能自动处理)// MQTTClient_Ping(&client); // 如果需要手动触发} }// 消息到达回调函数(需设置) void message_arrived_callback(MessageData* data) {char topic[data->topicName->len + 1];memcpy(topic, data->topicName->string, data->topicName->len);topic[data->topicName->len] = '\0';char message[data->message->payloadlen + 1];memcpy(message, data->message->payload, data->message->payloadlen);message[data->message->payloadlen] = '\0';// 根据topic和message解析控制命令并执行if (strstr(topic, "device/control/led") != NULL) {control_led(message); // 例如,控制LED} }
⚙️ 进阶考虑与优化
QoS 等级:根据业务场景选择合适的 服务质量(QoS)。
QoS 0:最多交付一次,可能丢失。
QoS 1:至少交付一次,可能重复。
QoS 2:恰好交付一次,可靠但开销大。单片机与服务器间重要指令或数据可考虑 QoS 1。
主题设计:设计清晰、分层的主題结构,便于管理和订阅过滤。例如:
factoryA/line1/device001/sensor/temperature
device/001/control/led
身份认证与安全:
为每个单片机设备分配独立的 客户端 ID、用户名和密码。
考虑使用 TLS/SSL 加密 MQTT 通信通道,防止数据窃听和篡改。
消息序列化格式:单片机与 Spring Boot 间传输的消息可采用 JSON、Protobuf 等格式,便于扩展和解析。
异常处理与重连:无论在 Spring Boot 端还是单片机端,都要充分考虑网络不稳定性和 Broker 重启等情况,实现 robust 的断线重连机制。
Spring Boot 集群部署:如果 Spring Boot 应用以集群方式部署,需要注意 MQTT 客户端连接的管理,或者考虑使用共享订阅(
$share/group/topic
)来均衡负载。保留消息和遗嘱消息:
可利用 保留消息(Retained Message) 让新订阅者立即获取最新状态。
使用 遗嘱消息(Will Message) 让 Broker 在单片机异常断开时通知其他客户端。
💎 总结
总结来说,实现 Spring Boot 结合 MQTT 为单片机提供接口,核心是让双方作为 MQTT 客户端连接到同一个 MQTT 代理(Broker),并通过主题进行异步的发布/订阅通信。
Spring Boot 端侧重集成 MQTT 客户端库、配置连接、实现消息监听与业务逻辑处理以及主动下发控制指令。单片机端则需实现网络连接、MQTT 协议处理(通常借助库或精细的 AT 指令控制)、数据上报和指令接收执行。