
IoT: Best Practices for Implementing an MQTT Server with Vert.x [Reactive]

Hi everyone, I'm Laokou. Follow along to learn how to implement an MQTT server with Vert.x.

Implementing the MQTT Server [Reactive]

vertx-mqtt project link

Approach

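1. Start the MQTT server, bind it to many ports, and record those ports in a cache. Register the service with Nacos and expose an API through which clients obtain an IP and port [load balancing].
2. The MQTT client connects to the MQTT server and reports data.
3. The MQTT server receives the data and forwards it through the MQ.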
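
The load-balancing idea in step 1 can be sketched in a few lines. This is a minimal illustration, assuming the API simply rotates over the cached ports; `RoundRobinPortSelector` is a hypothetical class that does not appear in the original project, and the real endpoint may select ports differently.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not part of the original project): hands out cached ports in rotation. */
final class RoundRobinPortSelector {

    private final List<Integer> ports;

    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinPortSelector(List<Integer> ports) {
        if (ports.isEmpty()) {
            throw new IllegalArgumentException("no ports available");
        }
        // Take an immutable snapshot so later changes to the cache do not affect this selector.
        this.ports = List.copyOf(ports);
    }

    /** Returns the next port; floorMod keeps the index valid even after the counter overflows. */
    int next() {
        int index = Math.floorMod(counter.getAndIncrement(), ports.size());
        return ports.get(index);
    }
}
```

A caller would build the selector from the cached port list and return `next()` together with the host IP on every request, so that clients spread across the listening ports.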

The code is fairly simple, so I won't walk through it line by line.
Implementation

View the source code

Installing Kafka

Everything is started with a single docker-compose command.

If you have not installed Docker yet, see the following two articles:

# Installing Docker on Ubuntu 20.04

# Installing Docker 23.0.6 on CentOS 7

services:
  kafka:
    image: bitnami/kafka:4.0.0
    container_name: kafka
    tty: true
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      # Node ID
      - KAFKA_BROKER_ID=1
      # Enable KRaft, i.e. run Kafka without ZooKeeper
      - KAFKA_ENABLE_KRAFT=yes
      # Kafka roles: this node acts as both broker and controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # Name of the listener that handles controller requests
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # Security protocol used by each listener
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Socket listeners of the Kafka server
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # Address advertised to clients
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      # Allow the PLAINTEXT listener (default false; not recommended in production)
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Maximum and initial heap size of the broker
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
      # Controller quorum voters (cluster addresses)
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      # Node ID
      - KAFKA_CFG_NODE_ID=1
    restart: always
    privileged: true
    networks:
      - laokou_network
networks:
  laokou_network:
    driver: bridge

# Create the topics (run from Kafka's bin directory) => each topic has 3 partitions and 1 replica
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_report --partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_reply --partitions 3 --replication-factor 1

Kafka [Reactive]

1. Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.5</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.3.23</version>
</dependency>

2. Code

KafkaAutoConfig

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.internals.ProducerFactory;

/**
 * @author laokou
 */
@Configuration
public class KafkaAutoConfig {

    // Blocking template, only registered in servlet applications.
    @Bean("defaultKafkaTemplate")
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
    public DefaultKafkaTemplate defaultKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        return new DefaultKafkaTemplate(kafkaTemplate);
    }

    // Reactive sender, only registered in reactive applications.
    @Bean(value = "reactiveKafkaSender")
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    public KafkaSender reactiveKafkaSender(SenderOptions<String, String> senderOptions) {
        return new ReactiveKafkaSender(
                new reactor.kafka.sender.internals.DefaultKafkaSender<>(ProducerFactory.INSTANCE, senderOptions));
    }

    // Producer settings taken from spring.kafka.producer.
    @Bean
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    public SenderOptions<String, String> senderOptions(KafkaProperties kafkaProperties) {
        Map<String, Object> props = new HashMap<>();
        KafkaProperties.Producer producer = kafkaProperties.getProducer();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
        props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, (int) producer.getBatchSize().toBytes());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (int) producer.getBufferMemory().toBytes());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return SenderOptions.create(props);
    }

    // Consumer settings taken from spring.kafka.consumer.
    @Bean
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
        Map<String, Object> props = new HashMap<>();
        KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.create(props);
    }

}

KafkaSender

import reactor.core.publisher.Flux;

/**
 * @author laokou
 */
public interface KafkaSender {

    Flux<Boolean> send(String topic, String payload);

}

ReactiveKafkaSender

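import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.internals.DefaultKafkaSender;

/**
 * @author laokou
 */
@Slf4j
@RequiredArgsConstructor
public class ReactiveKafkaSender implements KafkaSender {

    private final DefaultKafkaSender<String, String> defaultKafkaSender;

    @Override
    public Flux<Boolean> send(String topic, String payload) {
        // Partition, timestamp, key and correlation metadata are left null.
        return defaultKafkaSender.send(Mono.just(SenderRecord.create(topic, null, null, null, payload, null)))
            .map(result -> {
                Exception exception = result.exception();
                if (ObjectUtils.isNotNull(exception)) {
                    log.error("[Kafka] => Failed to send message, error: {}", exception.getMessage(), exception);
                    return false;
                }
                else {
                    return true;
                }
            });
    }

}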

3. YAML configuration [automatic batch commit]

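spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: laokou-mqtt
      # Enable periodic auto-commit of consumed offsets
      enable-auto-commit: true
      # Maximum number of records returned by a single poll() call
      max-poll-records: 50
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Number of times a message is resent after an error.
      retries: 5
      # When several messages go to the same partition, the producer puts them in the same batch.
      # This sets how much memory one batch may use, in bytes.
      batch-size: 16384
      # Size of the producer's memory buffer.
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0   : the producer does not wait for any response from the server before treating a write as successful.
      # acks=1   : the producer receives a success response as soon as the partition leader has received the message.
      # acks=all : the producer receives a success response only after all in-sync replicas have received the message.
      acks: 0
    listener:
      # Number of threads running in the listener container.
      concurrency: 5
      # Batch commit mode
      ack-mode: batch
      # Batch listener type
      type: batch
      # Whether a missing topic is treated as a fatal error
      missing-topics-fatal: false
    admin:
      auto-create: false

mqtt-server [Reactive]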

Dependencies

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mqtt</artifactId>
    <version>4.5.14</version>
</dependency>

VertxConfig

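import java.util.concurrent.TimeUnit;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author laokou
 */
@Configuration
public class VertxConfig {

    @Bean(destroyMethod = "close")
    public Vertx vertx() {
        VertxOptions vertxOptions = new VertxOptions();
        // Blocked-thread warnings fire after 30 seconds on event-loop and worker threads.
        vertxOptions.setMaxEventLoopExecuteTime(30);
        vertxOptions.setWorkerPoolSize(40);
        vertxOptions.setMaxWorkerExecuteTime(30);
        vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
        vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
        vertxOptions.setPreferNativeTransport(true);
        vertxOptions.setInternalBlockingPoolSize(40);
        // At least 32 event loops, or twice the number of CPU cores if that is larger.
        vertxOptions.setEventLoopPoolSize(Math.max(32, 2 * CpuCoreSensor.availableProcessors()));
        return Vertx.vertx(vertxOptions);
    }

}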

MqttServerProperties [username and password are configured here]

import java.util.concurrent.TimeUnit;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @author laokou
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt-server")
public class MqttServerProperties {

    private boolean auth = true;
    private String username = "vertx";
    private String password = "laokou123";
    private String host = "0.0.0.0";
    private int port = 0;
    private int threadSize = 32;
    private int maxMessageSize = 8196;
    private boolean isAutoClientId = true;
    private int maxClientIdLength = 30;
    private int timeoutOnConnect = 90;
    private boolean useWebSocket = false;
    private int webSocketMaxFrameSize = 65536;
    private boolean perFrameWebSocketCompressionSupported = true;
    private boolean perMessageWebSocketCompressionSupported = true;
    private int webSocketCompressionLevel = 6;
    private boolean webSocketAllowServerNoContext = false;
    private boolean webSocketPreferredClientNoContext = false;
    private boolean tcpNoDelay = true;
    private boolean tcpKeepAlive = false;
    private int tcpKeepAliveIdleSeconds = -1;
    private int tcpKeepAliveCount = -1;
    private int tcpKeepAliveIntervalSeconds = -1;
    private int soLinger = -1;
    private int idleTimeout = 0;
    private int readIdleTimeout = 0;
    private int writeIdleTimeout = 0;
    private TimeUnit idleTimeoutUnit = TimeUnit.SECONDS;
    private boolean ssl = false;
    private boolean tcpFastOpen = false;
    private boolean tcpCork = false;
    private boolean tcpQuickAck = false;
    private int tcpUserTimeout = 0;

}

VertxMqttServer

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.List;
import java.util.Optional;

import javax.net.ssl.SSLServerSocketFactory;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.vertx.core.Vertx;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * @author laokou
 */
@Slf4j
public final class VertxMqttServer {

    private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many()
        .multicast()
        .onBackpressureBuffer(Integer.MAX_VALUE, false);

    private volatile Flux<MqttServer> mqttServer;

    private final Vertx vertx;

    private final MqttServerProperties properties;

    private final List<ReactiveMessageHandler> reactiveMessageHandlers;

    private volatile boolean isClosed = false;

    public VertxMqttServer(final Vertx vertx, final MqttServerProperties properties,
            List<ReactiveMessageHandler> reactiveMessageHandlers) {
        this.properties = properties;
        this.vertx = vertx;
        this.reactiveMessageHandlers = reactiveMessageHandlers;
    }

    public Flux<MqttServer> start() {
        return mqttServer = getMqttServerOptions()
            .map(mqttServerOption -> MqttServer.create(vertx, mqttServerOption)
                .exceptionHandler(error -> log.error("[Vertx-MQTT-Server] => MQTT server failed to start, error: {}",
                        error.getMessage(), error))
                .endpointHandler(endpoint -> Optional.ofNullable(authHandler(endpoint))
                    .ifPresent(e -> e
                        .closeHandler(close -> log.info("[Vertx-MQTT-Server] => MQTT client connection closed"))
                        .subscribeHandler(subscribe -> {
                            for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
                                log.info("[Vertx-MQTT-Server] => MQTT client subscribed to topic: {}",
                                        topicSubscription.topicName());
                            }
                        })
                        .disconnectHandler(disconnect -> log.info("[Vertx-MQTT-Server] => MQTT client disconnected"))
                        .pingHandler(ping -> log.info("[Vertx-MQTT-Server] => MQTT client sent a heartbeat"))
                        .publishHandler(messageSink::tryEmitNext)
                        // Do not keep the session
                        .accept(false)))
                .listen(mqttServerOption.getPort(), mqttServerOption.getHost(), asyncResult -> {
                    if (isClosed) {
                        return;
                    }
                    if (asyncResult.succeeded()) {
                        log.info("[Vertx-MQTT-Server] => MQTT server started, host: {}, port: {}",
                                mqttServerOption.getHost(), mqttServerOption.getPort());
                        // Write the port to the cache
                        PortCache.add(mqttServerOption.getPort());
                    }
                    else {
                        log.error("[Vertx-MQTT-Server] => MQTT server failed to start, host: {}, port: {}, error: {}",
                                mqttServerOption.getHost(), mqttServerOption.getPort(),
                                asyncResult.cause().getMessage(), asyncResult.cause());
                    }
                }))
            // Without cache(), subscribing again in stop() would re-run this cold pipeline
            // and create new servers instead of closing the ones that are already running.
            .cache();
    }

    public Flux<MqttServer> stop() {
        isClosed = true;
        return mqttServer.doOnNext(server -> server.close(completionHandler -> {
            if (completionHandler.succeeded()) {
                log.info("[Vertx-MQTT-Server] => MQTT server stopped");
            }
            else {
                log.error("[Vertx-MQTT-Server] => MQTT server failed to stop, error: {}",
                        completionHandler.cause().getMessage(), completionHandler.cause());
            }
        }));
    }

    public Flux<Boolean> publish() {
        return messageSink.asFlux().flatMap(message -> {
            // @formatter:off
            // log.info("[Vertx-MQTT-Server] => MQTT server received a message, topic: {}, payload: {}", message.topicName(), message.payload().toString());
            // @formatter:on
            // Dispatch the message to every handler whose topic filter matches.
            return Flux
                .fromStream(reactiveMessageHandlers.stream()
                    .filter(reactiveMessageHandler -> reactiveMessageHandler.isSubscribe(message.topicName())))
                .flatMap(reactiveMessageHandler -> reactiveMessageHandler
                    .handle(new MqttMessage(message.payload(), message.topicName())));
        });
    }

    // With the default port 0 the OS picks a free port; the probe socket is closed again on return.
    private int detectAvailablePort(String host) {
        try (ServerSocket socket = SSLServerSocketFactory.getDefault().createServerSocket()) {
            socket.bind(new InetSocketAddress(host, properties.getPort()));
            return socket.getLocalPort();
        }
        catch (IOException e) {
            throw new RuntimeException("Port auto-detection failed", e);
        }
    }

    // One set of options (and therefore one port) per configured "thread".
    private Flux<MqttServerOptions> getMqttServerOptions() {
        return Flux.range(1, Math.max(properties.getThreadSize(), CpuCoreSensor.availableProcessors()))
            .map(item -> getMqttServerOption());
    }

    /**
     * Authentication.
     */
    private MqttEndpoint authHandler(MqttEndpoint endpoint) {
        MqttAuth mqttAuth = endpoint.auth();
        if (properties.isAuth()) {
            if (ObjectUtils.isNull(mqttAuth)) {
                endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                return null;
            }
            if (!ObjectUtils.equals(mqttAuth.getUsername(), properties.getUsername())
                    || !ObjectUtils.equals(mqttAuth.getPassword(), properties.getPassword())) {
                endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                return null;
            }
        }
        return endpoint;
    }

    // @formatter:off
    private MqttServerOptions getMqttServerOption() {
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(properties.getHost());
        mqttServerOptions.setPort(detectAvailablePort(properties.getHost()));
        mqttServerOptions.setMaxMessageSize(properties.getMaxMessageSize());
        mqttServerOptions.setAutoClientId(properties.isAutoClientId());
        mqttServerOptions.setMaxClientIdLength(properties.getMaxClientIdLength());
        mqttServerOptions.setTimeoutOnConnect(properties.getTimeoutOnConnect());
        mqttServerOptions.setUseWebSocket(properties.isUseWebSocket());
        mqttServerOptions.setWebSocketMaxFrameSize(properties.getWebSocketMaxFrameSize());
        mqttServerOptions.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
        mqttServerOptions.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
        mqttServerOptions.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
        mqttServerOptions.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
        mqttServerOptions.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
        mqttServerOptions.setTcpNoDelay(properties.isTcpNoDelay());
        mqttServerOptions.setTcpKeepAlive(properties.isTcpKeepAlive());
        mqttServerOptions.setTcpKeepAliveIdleSeconds(properties.getTcpKeepAliveIdleSeconds());
        mqttServerOptions.setTcpKeepAliveIntervalSeconds(properties.getTcpKeepAliveIntervalSeconds());
        mqttServerOptions.setTcpKeepAliveCount(properties.getTcpKeepAliveCount());
        mqttServerOptions.setSoLinger(properties.getSoLinger());
        mqttServerOptions.setIdleTimeout(properties.getIdleTimeout());
        mqttServerOptions.setReadIdleTimeout(properties.getReadIdleTimeout());
        mqttServerOptions.setWriteIdleTimeout(properties.getWriteIdleTimeout());
        mqttServerOptions.setIdleTimeoutUnit(properties.getIdleTimeoutUnit());
        mqttServerOptions.setSsl(properties.isSsl());
        mqttServerOptions.setTcpFastOpen(properties.isTcpFastOpen());
        mqttServerOptions.setTcpCork(properties.isTcpCork());
        mqttServerOptions.setTcpQuickAck(properties.isTcpQuickAck());
        mqttServerOptions.setTcpUserTimeout(properties.getTcpUserTimeout());
        return mqttServerOptions;
    }
    // @formatter:on

}
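
The port detection in `detectAvailablePort` relies on a standard trick: binding a socket to port 0 asks the operating system for any free port. Below is a minimal standalone sketch of that idea using a plain `ServerSocket`; `FreePortFinder` is a hypothetical name, not a class from the original project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Hypothetical helper (not part of the original project). */
final class FreePortFinder {

    private FreePortFinder() {
    }

    /** Binds a temporary socket to port 0 and returns the port the OS assigned to it. */
    static int findFreePort(String host) {
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(new InetSocketAddress(host, 0));
            return socket.getLocalPort();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Port auto-detection failed", e);
        }
    }
}
```

Note that the probe socket is closed before the MQTT server binds the port, so another process could in principle grab the port in between. As far as I know, vertx-mqtt's `MqttServer` also provides `actualPort()`, so listening on port 0 directly and reading the bound port afterwards may be a way to avoid the probe altogether.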

PortCache [caches the ports]

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * @author laokou
 */
public final class PortCache {

    private PortCache() {
    }

    public static final List<Integer> PORTS = new CopyOnWriteArrayList<>();

    public static void add(int port) {
        PORTS.add(port);
    }

    public static List<Integer> get() {
        return PORTS;
    }

    public static void clear() {
        PORTS.clear();
    }

}

ReactiveMessageHandler [message handling; nothing special, it just forwards messages to the MQ]

import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

/**
 * @author laokou
 */
public interface ReactiveMessageHandler {

    boolean isSubscribe(String topic);

    Flux<Boolean> handle(MqttMessage mqttMessage);

}

/**
 * Handler for property reply messages.
 *
 * @author laokou
 */
@Component
@RequiredArgsConstructor
public class ReactivePropertyReplyMessageHandler implements ReactiveMessageHandler {

    private final KafkaSender kafkaSender;

    @Override
    public boolean isSubscribe(String topic) {
        return TopicUtils.match("/+/+/property/reply", topic);
    }

    @Override
    public Flux<Boolean> handle(MqttMessage mqttMessage) {
        return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPLY, mqttMessage.getPayload().toString());
    }

}

/**
 * Handler for property report messages.
 *
 * @author laokou
 */
@Component
@RequiredArgsConstructor
public class ReactivePropertyReportMessageHandler implements ReactiveMessageHandler {

    private final KafkaSender kafkaSender;

    @Override
    public boolean isSubscribe(String topic) {
        return TopicUtils.match("/+/+/property/report", topic);
    }

    @Override
    public Flux<Boolean> handle(MqttMessage mqttMessage) {
        return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPORT, mqttMessage.getPayload().toString());
    }

}
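
The handlers above rely on `TopicUtils.match`, whose source is not shown in this article. As a rough idea of what such a matcher could look like, here is a minimal sketch of MQTT topic-filter matching with the `+` (single level) and `#` (multi level) wildcards. `TopicMatcher` is a hypothetical stand-in, and the real `TopicUtils` may behave differently, for example with respect to `$`-prefixed topics, which this sketch does not treat specially.

```java
/** Hypothetical stand-in for the project's TopicUtils; the original source is not shown. */
final class TopicMatcher {

    private TopicMatcher() {
    }

    /** Returns true if the topic name matches the MQTT topic filter. */
    static boolean match(String filter, String topic) {
        // The limit -1 keeps empty levels, e.g. the one before a leading '/'.
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // '#' matches all remaining levels (including none), but must be the last level.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            // '+' matches exactly one level; anything else must be equal.
            if (!level.equals("+") && !level.equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }
}
```

With this sketch, `/+/+/property/report` matches `/p1/d1/property/report` (the two `+` levels stand for something like a product and a device identifier) but not `/p1/d1/property/reply` or `/p1/property/report`.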

YAML configuration

spring:
  application:
    name: ${SERVICE_ID:laokou-mqtt}
  threads:
    virtual:
      enabled: true
  mqtt-server:
    auth: true
    username: vertx
    password: laokou123
    # Open 8196 ports
    thread-size: 8196

Starting the MQTT server

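import java.util.List;
import java.util.concurrent.ExecutorService;

import io.vertx.core.Vertx;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import reactor.core.scheduler.Schedulers;

/**
 * @author laokou
 */
@Slf4j
@EnableDiscoveryClient
@RequiredArgsConstructor
@EnableConfigurationProperties
@SpringBootApplication(scanBasePackages = "org.laokou")
public class MqttServerApp implements CommandLineRunner {

    private final Vertx vertx;

    private final MqttServerProperties properties;

    private final List<ReactiveMessageHandler> reactiveMessageHandlers;

    private final ExecutorService virtualThreadExecutor;

    @Override
    public void run(String... args) {
        virtualThreadExecutor.execute(this::listenMessage);
    }

    private void listenMessage() {
        VertxMqttServer vertxMqttServer = new VertxMqttServer(vertx, properties, reactiveMessageHandlers);
        // Start the server
        vertxMqttServer.start().subscribeOn(Schedulers.boundedElastic()).subscribe();
        // Forward incoming messages
        vertxMqttServer.publish().subscribeOn(Schedulers.boundedElastic()).subscribe();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Clear the cache
            PortCache.clear();
            // Stop the server
            vertxMqttServer.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
        }));
    }

}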

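Once it is running, test it yourself. There is not much more to say: Vert.x implements everything for us and we are simply calling its API. Have fun!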
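
For a quick manual test without pulling in an MQTT client library, one option is to send a raw CONNECT packet over a TCP socket. The sketch below builds such a packet according to the MQTT 3.1.1 wire format, with the username, password, and clean-session flags set. `MqttConnectPacket` is a hypothetical helper that is not part of the original project; in practice an existing client such as MQTTX or a client library is usually more convenient.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for manual testing (not part of the original project). */
final class MqttConnectPacket {

    private MqttConnectPacket() {
    }

    /** Encodes an MQTT 3.1.1 CONNECT packet with username, password, and clean session set. */
    static byte[] encode(String clientId, String username, String password, int keepAliveSeconds) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeString(body, "MQTT");                  // protocol name
        body.write(0x04);                           // protocol level 4 = MQTT 3.1.1
        body.write(0x80 | 0x40 | 0x02);             // flags: username, password, clean session
        body.write((keepAliveSeconds >> 8) & 0xFF); // keep-alive, high byte
        body.write(keepAliveSeconds & 0xFF);        // keep-alive, low byte
        writeString(body, clientId);
        writeString(body, username);
        writeString(body, password);

        ByteArrayOutputStream packet = new ByteArrayOutputStream();
        packet.write(0x10);                         // fixed header: packet type CONNECT
        writeRemainingLength(packet, body.size());
        packet.write(body.toByteArray(), 0, body.size());
        return packet.toByteArray();
    }

    /** Writes a string as a 2-byte big-endian length followed by its UTF-8 bytes. */
    private static void writeString(ByteArrayOutputStream out, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.write((bytes.length >> 8) & 0xFF);
        out.write(bytes.length & 0xFF);
        out.write(bytes, 0, bytes.length);
    }

    /** Writes the variable-length "remaining length" field: 7 bits per byte, high bit = more bytes follow. */
    private static void writeRemainingLength(ByteArrayOutputStream out, int length) {
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80;
            }
            out.write(digit);
        }
        while (length > 0);
    }
}
```

Writing `MqttConnectPacket.encode("c1", "vertx", "laokou123", 60)` to a socket connected to one of the server's ports should, per the MQTT 3.1.1 specification, be answered with a four-byte CONNACK (`0x20 0x02 0x00 0x00`) when the credentials are accepted.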

I'm Laokou. See you next time!
