MQTT-SpringBoot整合
MQTT-SpringBoot
创建简单 SpringBoot 项目
导入必须依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.study</groupId><artifactId>MqttDemo</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootMqttDemo</name><description>SpringBootMqttDemo</description><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- spring boot项目web开发的起步依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring boot项目集成消息中间件基础依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><!-- spring boot项目和mqtt客户端集成起步依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version></dependency><!-- lombok依赖 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- fastjson依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.study.mqtt.demo.MqttDemoApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>
增加MQTT相关配置
application.yml
spring:mqtt:# mqtt 服务器地址url: tcp://192.168.40.128:1883# 订阅客户端IDsubClientId: sub_client_id_1# 订阅主题subTopic: lq/iot/demo/# 发布客户端IDpubClientId: pub_client_id_1# 用户名username: admin# 密码password: admin123456
编写对应Java类
配置类
MqttConfig.java
package com.study.mqtt.demo.domain;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {private String username;private String password;private String url;private String subClientId ;private String subTopic ;private String pubClientId ;
}
启动类增加开启配置
MqttDemoApplication.java
package com.study.mqtt.demo;import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;@SpringBootApplication
@EnableConfigurationProperties(value = MqttConfig.class)
public class MqttDemoApplication {public static void main(String[] args) {SpringApplication.run(MqttDemoApplication.class, args);}}
创建MQTT连接工厂类
package com.study.mqtt.demo.factory;import com.study.mqtt.demo.domain.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;@Configuration
public class MqttFactory {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {// 创建客户端工厂DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfig.getUrl()});options.setCleanSession(true);factory.setConnectionOptions(options);return factory;}
}
接收消息处理类
ReceiveMsgHandler
package com.study.mqtt.demo.handler;import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;@Component
public class ReceiveMsgHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {System.out.println("接收到消息对象:" + message);// 消息内容Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();Object mqttReceivedTopic = headers.get("mqtt_receivedTopic");System.out.println("接收的消息主题:" + mqttReceivedTopic);System.out.println("接收的消息内容:" + payload);}
}
接收消息配置类
MqttInboundConfig.java
package com.study.mqtt.demo.inbound;import com.study.mqtt.demo.domain.MqttConfig;
import com.study.mqtt.demo.factory.MqttFactory;
import com.study.mqtt.demo.handler.ReceiveMsgHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttInboundConfig {@Autowiredprivate MqttConfig mqttConfig ;@Autowiredprivate ReceiveMsgHandler receiveMsgHandler;/*** 配置消息接收通道* @return*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置接收适配器*/@Beanpublic MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() ,mqttConfig.getSubClientId() ,mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ;adapter.setConverter(new DefaultPahoMessageConverter());// 质量服务等级adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter ;}/*** 配置接收消息处理器* @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel") // 指定处理消息使用得通道public MessageHandler messageHandler() {return this.receiveMsgHandler ;}
}
发送消息配置类
MqttOutboundConfig.java
package com.study.mqtt.demo.outbound;import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttOutboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate MqttPahoClientFactory pahoClientFactory ;@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler mqttOutboundMassageHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getUrl() ,mqttConfig.getPubClientId() , pahoClientFactory ) ;messageHandler.setAsync(true);messageHandler.setDefaultQos(0);messageHandler.setDefaultTopic("default");return messageHandler ;}
}
发送消息网关接口类
MqttGateway.java
package com.study.mqtt.demo.gateway;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 发送mqtt消息* @param topic 主题* @param payload 内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送包含qos的消息* @param topic 主题* @param qos 对消息处理的几种机制。* * 0 发送成功就算完成,会出现消息丢失* * 1 增加消息重试机制,消息发送失败会重新发送,会出现重复消息* * 2 多了一次去重的动作,确保只有一次消息推给订阅者。* @param payload 消息体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
发送消息服务类
MqttMsgSenderService.java
package com.study.mqtt.demo.service;import com.study.mqtt.demo.gateway.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MqttMsgSenderService {@Autowiredprivate MqttGateway mqttGateway;public void send(String topic, String payload) {mqttGateway.sendToMqtt(topic, payload);}public void send(String topic, int qos, String payload) {mqttGateway.sendToMqtt(topic, qos, payload);}}
测试验证
订阅消息验证
- 启动项目
- 发送消息
- 主题为配置文件中配置的订阅主题
lq/iot/demo/
- 发送时间:
2025-05-25 21:29:26:439
- 主题为配置文件中配置的订阅主题
- 订阅收到消息
- 接收到消息的时间:
Sun May 25 21:29:26 GMT+08:00 2025
- 接收到的主题:
lq/iot/demo/
- 接收到的内容:
{ "msg":"spring boot mqtt demo" }
- 接收到消息的时间:
发送消息验证
- 编写测试类
- 发送主题:
sb/mqtt/test
- 发送内容:
hello world !=> 当前时间
- 发送主题:
package com.study.mqtt.demo;import com.study.mqtt.demo.service.MqttMsgSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Date;@SpringBootTest(classes = MqttDemoApplication.class)
class MqttDemoApplicationTests {@Autowiredprivate MqttMsgSenderService mqttMsgSenderService;@Testvoid contextLoads() {}@Testvoid sendMsg(){mqttMsgSenderService.send("sb/mqtt/test", "hello world ! => " + new Date());}}
- 创建订阅者
- 订阅主题:
sb/mqtt/test
- 订阅主题:
- 运行测试类
- 订阅者接收消息
- 主题:
sb/mqtt/test
- 主题: