Springboot使用Integration实现MQTT发送和接收消息
特别说明:本文仅用于记录学习过程,方便日后进行查阅和帮助有需要的人。如造成不便,敬请谅解。
一、读取application.yml配置文件
配置文件内容详见https://blog.csdn.net/shenxiaomo1688/article/details/151898078?spm=1001.2014.3001.5502
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProps {private String host;private String clientId;private String topic;private MqttConnectOptions options;
}
二、创建工厂构造器类
主要实现:
1、构建客户端工厂;
2、获取发送消息的处理器;
3、获取接收消息的处理器;
4、获取订阅主题的适配器。
在这个类中,涉及到一个重要的注解
@EnableIntegration
其作用是让Spring扫描到该类。
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** @author: * @Desc:EnableIntegration注解用于开启Spring Integration的功能,让Spring能扫描到@ServiceActivator注解的Bean* Spring Integration接收消息的步骤:* 1. 创建接收消息的信息通道* 2.创建消息处理器* 3.@ServiceActivator注解绑定消息处理器到指定消息通道* 4.创建消息发送适配器* @create: 2025-09-20 15:54**/
@Configuration
@EnableIntegration
@Slf4j
public class FactoryBuilder {private MqttProps props;/* ** 获取mqtt客户端* @author * @create 2025/9/20**/public MqttPahoClientFactory buildMqttFactory() {DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = props.getOptions();//配置连接地址options.setServerURIs(new String[]{props.getHost()});clientFactory.setConnectionOptions(options);return clientFactory;}/* ** 获取mqtt发送消息的处理器* ServiceActivator注解用于将mqtt消息处理器绑定到消息通道上* @author * @create 2025/9/20*/@Bean@ServiceActivator(outputChannel = MqttConstants.OUT_CHANNEL)public MqttPahoMessageHandler outHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(props.getClientId(), buildMqttFactory());return messageHandler;}/* ** 获取mqtt接收消息的处理器* ServiceActivator注解用于将mqtt消息处理器绑定到消息通道上* @author * @create 2025/9/20*/@Bean@ServiceActivator(inputChannel = MqttConstants.IN_CHANNEL)public MessageHandler inHandler(MqttPahoMessageHandler messageHandler) {return new MessageHandler(){@Overridepublic void handleMessage(Message<?> message) {messageHandler.handleMessage(message);log.info("收到mqtt消息:{}", message.getPayload());}};}/* ** 获取订阅主题的适配器* @author * @create 2025/9/20*/@Beanpublic MessageProducer getAdapter() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(props.getClientId(), buildMqttFactory(), props.getTopic());//设置消息转换器adapter.setConverter(new DefaultPahoMessageConverter());//设置订阅通道adapter.setOutputChannel(inChannel());return adapter;}/* ** 获取发送消息通道* @author * @create 2025/9/20*/@Bean(name = MqttConstants.OUT_CHANNEL)public MessageChannel outChannel() {return new DirectChannel();}/* ** 获取接收消息通道* @author * @create 2025/9/20*/@Bean(name = MqttConstants.IN_CHANNEL)public MessageChannel inChannel() {return new DirectChannel();}
}
使用到的常量类:
public class MqttConstants {public static final String OUT_CHANNEL = "out";public static final String IN_CHANNEL = "in";
}
三、创建发送消息的service
这里涉及到一个重要的注解IntegrationComponentScan:
其作用是让Spring能扫描到@MessagingGateway注解的接口
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;/*** @Desc:MessagingGateway的作用:拦截发送的消息,投放到发送通道* @IntegrationComponentScan:让Spring能扫描到@MessagingGateway注解的接口* @create: 2025-09-20 16:42**/
@IntegrationComponentScan
@MessagingGateway(defaultRequestChannel = MqttConstants.OUT_CHANNEL)
public interface MqttService {// 发送消息void send(String topic, String message);
}
关于Integration知识图谱
注:未完待续...