当前位置: 首页 > news >正文

Springboot使用Integration实现MQTT发送和接收消息

特别说明:本文仅用于记录学习过程,方便日后进行查阅和帮助有需要的人。如造成不便,敬请谅解。

一、读取application.yml配置文件

配置文件内容详见https://blog.csdn.net/shenxiaomo1688/article/details/151898078?spm=1001.2014.3001.5502

import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProps {private String host;private String clientId;private String topic;private MqttConnectOptions options;
}

二、创建工厂构造器类

主要实现:

1、构建客户端工厂;

2、获取发送消息的处理器;

3、获取接收消息的处理器;

4、获取订阅主题的适配器。

在这个类中,涉及到一个重要的注解

@EnableIntegration

其作用是让Spring扫描到该类。

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** @author: * @Desc:EnableIntegration注解用于开启Spring Integration的功能,让Spring能扫描到@ServiceActivator注解的Bean* Spring Integration接收消息的步骤:* 1. 创建接收消息的信息通道* 2.创建消息处理器* 3.@ServiceActivator注解绑定消息处理器到指定消息通道* 4.创建消息发送适配器* @create: 2025-09-20 15:54**/
@Configuration
@EnableIntegration
@Slf4j
public class FactoryBuilder {private MqttProps props;/* ** 获取mqtt客户端* @author * @create 2025/9/20**/public MqttPahoClientFactory buildMqttFactory() {DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = props.getOptions();//配置连接地址options.setServerURIs(new String[]{props.getHost()});clientFactory.setConnectionOptions(options);return clientFactory;}/* ** 获取mqtt发送消息的处理器* ServiceActivator注解用于将mqtt消息处理器绑定到消息通道上* @author * @create 2025/9/20*/@Bean@ServiceActivator(outputChannel = MqttConstants.OUT_CHANNEL)public MqttPahoMessageHandler outHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(props.getClientId(), buildMqttFactory());return messageHandler;}/* ** 获取mqtt接收消息的处理器* ServiceActivator注解用于将mqtt消息处理器绑定到消息通道上* @author * @create 2025/9/20*/@Bean@ServiceActivator(inputChannel = MqttConstants.IN_CHANNEL)public MessageHandler inHandler(MqttPahoMessageHandler messageHandler) {return  new MessageHandler(){@Overridepublic void handleMessage(Message<?> message) {messageHandler.handleMessage(message);log.info("收到mqtt消息:{}", message.getPayload());}};}/* ** 获取订阅主题的适配器* @author * @create 2025/9/20*/@Beanpublic MessageProducer getAdapter() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(props.getClientId(), buildMqttFactory(), props.getTopic());//设置消息转换器adapter.setConverter(new DefaultPahoMessageConverter());//设置订阅通道adapter.setOutputChannel(inChannel());return adapter;}/* ** 获取发送消息通道* @author * @create 2025/9/20*/@Bean(name = MqttConstants.OUT_CHANNEL)public MessageChannel outChannel() {return new DirectChannel();}/* ** 获取接收消息通道* @author * @create 2025/9/20*/@Bean(name = MqttConstants.IN_CHANNEL)public MessageChannel inChannel() {return new DirectChannel();}
}

使用到的常量类:

public class MqttConstants {public static final String OUT_CHANNEL = "out";public static final String IN_CHANNEL = "in";
}

三、创建发送消息的service

这里涉及到一个重要的注解IntegrationComponentScan:

其作用是让Spring能扫描到@MessagingGateway注解的接口
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;/*** @Desc:MessagingGateway的作用:拦截发送的消息,投放到发送通道* @IntegrationComponentScan:让Spring能扫描到@MessagingGateway注解的接口* @create: 2025-09-20 16:42**/
@IntegrationComponentScan
@MessagingGateway(defaultRequestChannel = MqttConstants.OUT_CHANNEL)
public interface MqttService {// 发送消息void send(String topic, String message);
}

关于Integration知识图谱

注:未完待续...

http://www.dtcms.com/a/393130.html

相关文章:

  • 中国传统文化上衣下裳
  • zk管理kafkakafka-broker通信
  • 前端开发技术趋势Web Components
  • Python tarfile库详解
  • ​​[硬件电路-287]:高性能六通道数字隔离器CA-IS3763L 功能概述与管脚定义
  • 错题集系统接口文档
  • 【RAG-LLM】InfoGain-RAG基于文档信息增益的RAG
  • Browser-Use深度解析:重新定义AI与浏览器的智能协作
  • 【Mysql】事务隔离级别、索引原理、/redolog/undolog/binlog区别、主从复制原理
  • AWS 全景速查手册
  • 小米Openvela城市沙龙
  • Python数据分析:求矩阵的秩。啥是矩阵秩?听故事学线代并用Python实现,娘来太容易学会了!
  • UI Toolkit自定义元素
  • redis未授权访问-漏洞复现
  • PR调节器与PI调节器的区别
  • Unity核心概念⑫:碰撞检测
  • 【读论文】面向工业的ASR语音大模型
  • 重谈IO——五种IO模型及其分类
  • 数据库造神计划第十七天---索引(2)
  • 【开题答辩实录分享】以《车联网位置信息管理软件》为例进行答辩实录分享
  • (3)机器学习-模型介绍
  • 如何在 Ubuntu 20.04 LTS 上安装 MySQL 8
  • MuMu模拟器使用入门实践指南:从ADB连接到Frida动态分析
  • 条款5:优先选用auto, 而非显示类型声明
  • 强化学习原理(一)
  • 解读43页PPT经营分析与决策支持系统建设方案交流及解决经验
  • ubuntu24设置证书登录及问题排查
  • MySQL 备份与恢复完全指南:从理论到实战
  • 2011/12 JLPT听力原文 问题四
  • 实战free_s:在高并发缓存系统中落地“内存释放更安全——free_s函数深度解析与free全方位对比”