当前位置: 首页 > news >正文

Spring整合MQTT使用

MQTT使用

消息发送和接收

引入依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

编写发送消息客户端

clientId不能重复,重复的clientId会导致已连接的客户端下线

    /*** 消息发送* @throws MqttException*/public void send() throws MqttException {//url和clientId请修改为自己的MqttClient client = new MqttClient("tcp://101.43.94.164:1883", "testuser1-send1");//设置回调client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("消息到达");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("消息发送完成");}});//连接参数设置MqttConnectOptions connectOptions = new MqttConnectOptions();//用户名和密码请修改为自己的connectOptions.setUserName("testuser1");connectOptions.setPassword("testuser1".toCharArray());client.connect(connectOptions);//主题String topic = "topic/test2";int qos = 1;String msg = "Hello MQTT 1";MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(qos);client.publish(topic, message);client.disconnect();client.close();}

消息接收

请先启动消息接收客户端,再开启消息发送客户端,否则消息发送后没有客户端接收,消息会被丢失

    public void recevie() throws MqttException, InterruptedException {MqttClient client = new MqttClient("tcp://101.43.94.164:1883", "testuser1-receive123");MqttConnectOptions connectOptions = new MqttConnectOptions();connectOptions.setUserName("testuser1");connectOptions.setPassword("testuser1".toCharArray());connectOptions.setAutomaticReconnect(true);client.connect(connectOptions);String topic = "topic/test2";CountDownLatch countDownLatch = new CountDownLatch(1);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("消息到达:" + mqttMessage.toString() + " " + s);countDownLatch.countDown();}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("消息发送完成");}});client.subscribe(topic, 1);countDownLatch.await();client.disconnect();client.close();}

topic(主题)

MQTT 协议规定主题的长度为两个字节,因此主题最多可包含 65,535 个字符。
MQTT中的t主题不需要用户提前创建,如果代码中使用了不存在的主题,MQTT会自动创建这个主题。

mqtt通过/对消息分层。为了便于理解,不建议使用/chat和chat/。

topic订阅规则

订阅一个主题很简单,如果主题是topic,那么那么使用/topic就可以订阅了;但是如果想一次订阅多个主题时,就需要用到通配符了。
常用通配符:

  1. +:用于单个主题层级匹配的通配符
test/+/ip   //有效
test+       //无效
test/+      //无效

如果客户端订阅了test/+/ip主题,那么会收到以下主题的消息:

test/1/ip
test/2/ip
test/3/ip

不会收到以下主题的消息:

test/ip
test/wind/1/ip
  1. #:用于匹配主题中任意层级的通配符;多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符

如果订阅了test/#主题,那么会收到以下主题的消息:

test
test/ip
test/1/ip
  1. :以:以:以$SYS开头的主题为系统主题,系统主题主要用户获取MQTT服务器自身运行状态、消息统计、客户端上下线事件等数据

通配符主题订阅的性能弱于普通主题订阅,且会消耗更多的服务器资源,用户可根据实际业务情况选择订阅类型。

MQTT和Spring的整合

引入依赖

        <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

核心配置:

@Configuration
public class MQTTConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});// 连接断开时自动重连options.setAutomaticReconnect(true);// MQTT服务器认证用户名options.setUserName(username);// MQTT服务器认证密码options.setPassword(password.toCharArray());factory.setConnectionOptions(options);System.out.println("Connecting to broker: " + brokerUrl + " OK.");return factory;}@Beanpublic MqttPahoMessageDrivenChannelAdapter mqttInbound() {// 创建入站适配器:监听MQTT消息并转发到Spring Integration通道MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", // 入站客户端ID(唯一,避免与其他客户端冲突)mqttClientFactory(),    // 使用前面定义的客户端工厂"testSub/#"  // 订阅的MQTT主题("#"为通配符,表示订阅"testSub/"下所有子主题));// 消息接收后转发到的通道名称adapter.setOutputChannelName("mqttInputChannel");// 设置订阅QoS级别(1表示确保消息至少到达一次)adapter.setQos(1);return adapter;}/***  网关指定通过该网关发送的消息,默认会被路由到名为 指定的消息通道(这里是配置类中定义的 mqttOutboundChannel Bean),*  最终由通道连接的 MqttPahoMessageHandler 处理并发送到 MQTT 服务器。* @return*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-outbound",mqttClientFactory());handler.setAsync(true);handler.setDefaultQos(1);return handler;}@Beanpublic MessageChannel mqttInputChannel() {// 入站消息通道(接收MQTT消息)return new DirectChannel();}@Beanpublic MessageChannel mqttOutboundChannel() {// 出站消息通道(发送MQTT消息)return new DirectChannel();}
}

MessageChannel 是 Spring Integration 的核心概念,相当于消息流转的“管道”:

  • mqttInputChannel:入站适配器(mqttInbound)接收的 MQTT 消息会流入此通道,后续可通过 @ServiceActivator 绑定处理器消费消息(当前代码未实现,需补充消息处理逻辑)
  • mqttOutboundChannel:业务代码通过 MqttMessageGateway 发送的消息会先流入此通道,再由出站处理器(mqttOutbound)发送到 MQTT 服务器。

发送消息

/*** Description: @MessagingGateway 注解的接口,用于发送 MQTT 消息,会自动生成代理类,代理类会将方法参数转换为 Spring Integration 消息(Message),*    通过 defaultRequestChannel = "mqttOutboundChannel" 指定: 所有通过该网关发送的消息,默认会被路由到名为 mqttOutboundChannel*    的消息通道(对应配置类中定义的 mqttOutboundChannel Bean),最终由通道连接的 MqttPahoMessageHandler 处理并发送到 MQTT 服务器。*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}

接收消息

@Component
public class MqttMessageListener {@Bean@ServiceActivator(inputChannel = "mqttInputChannel") //绑定消息入站通道public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();String payload = message.getPayload().toString();Logger log = LoggerFactory.getLogger(MqttMessageListener.class);log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);try {if (topic.startsWith("testSub/")) {//进行业务处理}} else if (topic.startsWith("report/allpoints")) {//进行业务处理}}} catch (Exception e) {log.error("[MQTT] 消息处理失败: {}", e.getMessage());}}};}
}
  1. 接收消息: MQTT服务器 → mqttInbound(入站适配器,订阅主题)→ mqttInputChannel(通道)→ 消息处理器(需补充,如存储/业务处理)。
  2. 发送消息: 业务代码 → MqttMessageGateway(消息网关)→ mqttOutboundChannel(通道)→ mqttOutbound(出站处理器)→ MQTT服务器。

参考

  1. MQTT 主题与通配符(Topics & Wildcards)入门手册
  2. Java对接MQTT协议的完整实现

文章转载自:

http://sQI7Mwc7.qrwnj.cn
http://WMMONu1p.qrwnj.cn
http://gYHC6RFb.qrwnj.cn
http://0QJa9CA8.qrwnj.cn
http://nCQXM05E.qrwnj.cn
http://Oqqmoa6e.qrwnj.cn
http://6hMg7TFA.qrwnj.cn
http://WICHObnN.qrwnj.cn
http://HGRWzkV5.qrwnj.cn
http://GQHsug2e.qrwnj.cn
http://0AHsfqhw.qrwnj.cn
http://GY76Ufil.qrwnj.cn
http://2XsFk6Ga.qrwnj.cn
http://1DD5E1jV.qrwnj.cn
http://g7pwUCxK.qrwnj.cn
http://bk9GGqdB.qrwnj.cn
http://sRHBoigz.qrwnj.cn
http://4kQnrfg0.qrwnj.cn
http://VLQSxjO3.qrwnj.cn
http://lEJ2GiVM.qrwnj.cn
http://MtMbTduU.qrwnj.cn
http://PtWVjzxw.qrwnj.cn
http://uTASu65H.qrwnj.cn
http://dAu1ME6W.qrwnj.cn
http://FEwVTfxT.qrwnj.cn
http://fGDBdbSq.qrwnj.cn
http://fZo4BMG7.qrwnj.cn
http://litzJRon.qrwnj.cn
http://C5P3Z3BT.qrwnj.cn
http://kdsw5W6O.qrwnj.cn
http://www.dtcms.com/a/369671.html

相关文章:

  • AI应用开发-技术架构 PAFR介绍
  • 9月5日星期五今日早报简报微语报早读
  • Zynq-7000 上 RT-Thread 的 MMU 与 SMP 优势分析
  • 【完整源码+数据集+部署教程】西兰花实例分割系统源码和数据集:改进yolo11-AggregatedAtt
  • 数据库查询优化
  • PiscCode基于 Mediapipe 实现轨迹跟踪
  • 硬件(三) 通信方式、串口通信
  • 在 CentOS 上完整安装 Docker 指南
  • 详解人造卫星遭遇的地球反射光与月球反射光
  • NAF、INRAS、NACF论文解读
  • 【Linux】系统部分——进程间通信1(管道)
  • 从策略到实效|Adobe Target 实战应用与成功案例
  • 连锁门店可用性监测和进程监测最佳实践
  • 残差网络ResNet
  • 人工智能之数学基础:逻辑回归算法的概率密度函数与分布函数
  • Pinia 两种写法全解析:Options Store vs Setup Store(含实践与场景对比)
  • MySQL抛出的Public Key Retrieval is not allowed
  • 贵州移动创维E900V22F-S905L3SB-全分区备份
  • HarmonyOSAI编程自然语言代码生成
  • 系统性学习数据结构-第三讲-栈和队列
  • 远程协作下的项目失控:不是信任危机,而是感知缺失
  • 从零打造商业级LLMOps平台:开源项目LMForge详解,助力多模型AI Agent开发!
  • 【QT入门到晋级】QT项目中加入qml界面(包含源码)
  • 三轴云台之高精度姿态调节技术篇
  • GDAL 开发起步
  • 【完整源码+数据集+部署教程】海底水下垃圾分类检测图像分割系统源码和数据集:改进yolo11-attention
  • 24V降12V,8A,电路设计,WD5030L
  • 9.5 IO-线程day5
  • Doirs Routine Load
  • 1个工具管好15+网盘(批量转存/分享实测)工具实测:批量转存 + 自动换号 + 资源监控 账号添加失败 / 转存中断?这样解决(含功能详解)