当前位置: 首页 > news >正文

[小白]spring boot接入emqx

每次接都要搜索好久。

JDK 11 和 Spring Boot 2.7.7 的版本信息, EMQX 5.8.8

安装EMQX 5.8.8

docker pull emqx/emqx:5.8.8
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.8.8

http://localhost:18083
默认用户名及密码(此处用户名密码是登录的用户名和密码,与后面的连接时的用户名和密码不同):
admin
public

添加依赖

在 pom.xml 中添加以下依赖,确保版本与 Spring Boot 2.7.7 兼容:

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.7.7</version></dependency><!-- Spring Integration MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.15</version> <!-- 与 Spring Boot 2.7.7 兼容 --></dependency><!-- Eclipse Paho MQTT 客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>

配置 application.yml


在 src/main/resources/application.yml 中配置 EMQX 连接参数。EMQX 5.8.8 默认监听 1883 端口(MQTT TCP)。假设本地运行,匿名认证(生产环境请配置用户名/密码)

mqtt:broker: tcp://localhost:1883client-id: spring-boot-client-${random.uuid}username: adminpassword: admindefault-topic: test/topicqos: 1keepalive: 60connection-timeout: 5000clean-session: true

配置 MQTT 客户端


创建 MqttConfig.java,配置 Spring Integration 的 MQTT 入站和出站适配器:

package com.example.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttConfig {@Value("${mqtt.broker}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.default-topic}")private String defaultTopic;@Value("${mqtt.qos}")private int qos;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());options.setKeepAliveInterval(60);options.setConnectionTimeout(5000);options.setCleanSession(true);factory.setConnectionOptions(options);return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MqttPahoMessageDrivenChannelAdapter inbound(MqttPahoClientFactory mqttClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic);adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(qos);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return message -> {String payload = (String) message.getPayload();String topic = (String) message.getHeaders().get("mqtt_receivedTopic");System.out.println("接收到 MQTT 消息: 主题=" + topic + ", 内容=" + payload);};}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler(clientId + "-outbound", mqttClientFactory);messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(qos);return messageHandler;}
}

关于订阅主题以下代码可订阅多个

MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, "test/topic", "sensor/#", "device/+");

#:匹配多级主题,如 sensor/data/temp。
+:匹配单级主题,如 device/123。

如需订阅所有主题 ,可以直接用#,如下代码

MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, "test/topic", "#");

发布消息服务


创建 MqttService.java 用于发送消息:

package com.example.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class MqttService {@Autowiredprivate MessageChannel mqttOutboundChannel;public void publish(String topic, String payload) {mqttOutboundChannel.send(MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build());System.out.println("已发布消息到主题 " + topic + ": " + payload);}
}

测试 Controller


创建 MqttController.java 用于 REST 接口测试:

package com.example.controller;import com.example.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MqttController {@Autowiredprivate MqttService mqttService;@GetMapping("/publish")public String publish(@RequestParam String message) {mqttService.publish("test/topic", message);return "消息已发送: " + message;}
}

测试

订阅:应用启动后自动订阅 test/topic,发送消息到该主题会在控制台打印。
发布:访问 http://localhost:8080/publish?message=Hello EMQX!,用 MQTTX 或 EMQX Dashboard 验证消息。

MQTT桌面客户端软件使用 MQTTX即可

http://www.dtcms.com/a/516498.html

相关文章:

  • Spring Boot 实现GZIP压缩优化
  • Spring Boot使用Redis实现消息队列
  • 互联网大厂Java面试实战:以Spring Boot与微服务为核心的技术场景剖析
  • 做网站页面的软件毕业设计网站成品
  • 《一个浏览器多人用?Docker+Neko+cpolar实现跨网共享》
  • design设计网站网站优化方法页面
  • C++基础:(十七)模版进阶:深入探索非类型参数、特化、分离编译与实战技巧
  • 《Git:从入门到精通(五)—— Git:Gitee远程仓库创建与克隆指南》
  • UML学习文档(一)
  • 淘宝放单网站开发网站wordpress错误
  • Latex中的错误汇总
  • huggingface transformers调试问题--加载本地路径模型时pdb断点消失
  • KMP算法详解 -- 串的模式匹配
  • 用php做网站的方法学网站建设前途
  • 网站不用下载免费软件曰本孕妇做爰网站
  • 【微信小程序 + 消息订阅 + 授权】 微信小程序实现消息订阅流程介绍,代码示例(仅前端)
  • 网站开发找哪家什么查网站是否降权
  • 【经典书籍】C++ Primer 第13类继承精华讲解
  • “VMware与vmx86驱动程序版本不匹配:预期为:417,实际为416。”解决步骤,亲测有效!!!
  • 查找组成一个偶数最接近的两个素数
  • 获取文件版本(C++源码)
  • 济南网站建设鲁icp备附近展览制作工厂
  • 在Windows WSL2中安装Ubuntu和Docker的完整指南
  • Ubuntu 22 .04安装CUDA, cuDNN, TensorRT
  • Linux编辑神器——vim工具的使用
  • UPS-不间断电源系统
  • AMDGPU/KFD IV(Interrupt Vector)信息结构及实现
  • 网站开发公司计划书如何做英文网站的外链
  • 彬县网站建设it外包前景
  • 网站集约化做暧暧国外网站