当前位置: 首页 > news >正文

MQTT-SpringBoot整合

MQTT-SpringBoot

创建简单 SpringBoot 项目

导入必须依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.study</groupId><artifactId>MqttDemo</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootMqttDemo</name><description>SpringBootMqttDemo</description><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- spring boot项目web开发的起步依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring boot项目集成消息中间件基础依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><!-- spring boot项目和mqtt客户端集成起步依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version></dependency><!-- lombok依赖 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- fastjson依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.study.mqtt.demo.MqttDemoApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>

增加MQTT相关配置

application.yml

spring:mqtt:# mqtt 服务器地址url: tcp://192.168.40.128:1883# 订阅客户端IDsubClientId: sub_client_id_1# 订阅主题subTopic: lq/iot/demo/# 发布客户端IDpubClientId: pub_client_id_1# 用户名username: admin# 密码password: admin123456

编写对应Java类

配置类

MqttConfig.java

package com.study.mqtt.demo.domain;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {private String username;private String password;private String url;private String subClientId ;private String subTopic ;private String pubClientId ;
}

启动类增加开启配置

MqttDemoApplication.java

package com.study.mqtt.demo;import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;@SpringBootApplication
@EnableConfigurationProperties(value = MqttConfig.class)
public class MqttDemoApplication {public static void main(String[] args) {SpringApplication.run(MqttDemoApplication.class, args);}}

创建MQTT连接工厂类

package com.study.mqtt.demo.factory;import com.study.mqtt.demo.domain.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;@Configuration
public class MqttFactory {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {// 创建客户端工厂DefaultMqttPahoClientFactory  factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfig.getUrl()});options.setCleanSession(true);factory.setConnectionOptions(options);return factory;}
}

接收消息处理类

ReceiveMsgHandler

package com.study.mqtt.demo.handler;import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;@Component
public class ReceiveMsgHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {System.out.println("接收到消息对象:" + message);// 消息内容Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();Object mqttReceivedTopic = headers.get("mqtt_receivedTopic");System.out.println("接收的消息主题:" + mqttReceivedTopic);System.out.println("接收的消息内容:" + payload);}
}

接收消息配置类

MqttInboundConfig.java

package com.study.mqtt.demo.inbound;import com.study.mqtt.demo.domain.MqttConfig;
import com.study.mqtt.demo.factory.MqttFactory;
import com.study.mqtt.demo.handler.ReceiveMsgHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttInboundConfig {@Autowiredprivate MqttConfig mqttConfig ;@Autowiredprivate ReceiveMsgHandler receiveMsgHandler;/*** 配置消息接收通道* @return*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置接收适配器*/@Beanpublic MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter  =new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() ,mqttConfig.getSubClientId() ,mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ;adapter.setConverter(new DefaultPahoMessageConverter());// 质量服务等级adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter ;}/*** 配置接收消息处理器* @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel") // 指定处理消息使用得通道public MessageHandler messageHandler() {return this.receiveMsgHandler ;}
}

发送消息配置类

MqttOutboundConfig.java

package com.study.mqtt.demo.outbound;import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttOutboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate MqttPahoClientFactory pahoClientFactory ;@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler mqttOutboundMassageHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getUrl() ,mqttConfig.getPubClientId() , pahoClientFactory ) ;messageHandler.setAsync(true);messageHandler.setDefaultQos(0);messageHandler.setDefaultTopic("default");return messageHandler ;}
}

发送消息网关接口类

MqttGateway.java

package com.study.mqtt.demo.gateway;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 发送mqtt消息* @param topic 主题* @param payload 内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送包含qos的消息* @param topic 主题* @param qos 对消息处理的几种机制。*          * 0 发送成功就算完成,会出现消息丢失*          * 1 增加消息重试机制,消息发送失败会重新发送,会出现重复消息*          * 2 多了一次去重的动作,确保只有一次消息推给订阅者。* @param payload 消息体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

发送消息服务类

MqttMsgSenderService.java

package com.study.mqtt.demo.service;import com.study.mqtt.demo.gateway.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MqttMsgSenderService {@Autowiredprivate MqttGateway mqttGateway;public void send(String topic, String payload) {mqttGateway.sendToMqtt(topic, payload);}public void send(String topic, int qos, String payload) {mqttGateway.sendToMqtt(topic, qos, payload);}}

测试验证

订阅消息验证

  • 启动项目

在这里插入图片描述

  • 发送消息
    • 主题为配置文件中配置的订阅主题 lq/iot/demo/
    • 发送时间: 2025-05-25 21:29:26:439

在这里插入图片描述

  • 订阅收到消息
    • 接收到消息的时间:Sun May 25 21:29:26 GMT+08:00 2025
    • 接收到的主题:lq/iot/demo/
    • 接收到的内容:{ "msg":"spring boot mqtt demo" }

在这里插入图片描述

发送消息验证

  • 编写测试类
    • 发送主题:sb/mqtt/test
    • 发送内容:hello world !=> 当前时间
package com.study.mqtt.demo;import com.study.mqtt.demo.service.MqttMsgSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Date;@SpringBootTest(classes = MqttDemoApplication.class)
class MqttDemoApplicationTests {@Autowiredprivate MqttMsgSenderService mqttMsgSenderService;@Testvoid contextLoads() {}@Testvoid sendMsg(){mqttMsgSenderService.send("sb/mqtt/test", "hello world ! => " + new Date());}}
  • 创建订阅者
    • 订阅主题: sb/mqtt/test

在这里插入图片描述

  • 运行测试类

在这里插入图片描述

  • 订阅者接收消息
    • 主题:sb/mqtt/test

在这里插入图片描述

相关文章:

  • 6.4.3_有向无环图描述表达式
  • JAVA 项目中 maven pom.xml 和 properties 配置文件、spring 配置文件,以及环境变量的关系
  • 深入理解Istio:全面解析与实践指南
  • 向量数据库选型实战指南:Milvus架构深度解析与技术对比
  • Lua 脚本在 Redis 中的运用-22
  • 每日Prompt:龙虎斗
  • Oracle附加日志概述
  • 华为OD机试真题——字符串序列判定(2025B卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • Go语言中常见的6个设计模式
  • 非常适合初学者的Golang教程
  • pyhton基础【4】判断
  • 位运算的小结
  • 深度图数据增强-形态学腐蚀操作
  • 【MySQL系列】SQL 分组统计与排序
  • leetcode 2131. 连接两字母单词得到的最长回文串 中等
  • 财管-1-财务分析、评价和预测
  • Vue3 + TypeScript + el-input 实现人民币金额的输入和显示
  • 17. Qt系统相关:文件操作
  • 【医学影像 AI】医学影像 AI 入门:PyTorch 基础与数据加载
  • Seaborn库的定义与核心功能
  • 百度网站验证是/电商网站设计模板
  • 导购网站制作/建站软件
  • iis7.5 网站打不开/我想注册一个网站怎么注册
  • 教你用模板做网站/北京seo包年
  • 商业网站的建设/做网络推广好吗
  • 经营性网站备案需要哪些材料/今日新闻联播