当前位置: 首页 > wzjs >正文

给网站app做后台的公司微信商城小程序怎么自己开发

给网站app做后台的公司,微信商城小程序怎么自己开发,社交账号登录wordpress,苏州集团网站设计开发Java集成MQTT和Kafka实现高可用方案 1. 概述 在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。 MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和…

Java集成MQTT和Kafka实现高可用方案

1. 概述

在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。

MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和低带宽、高延迟网络。而Kafka是一个分布式流处理平台,提供高吞吐量、可扩展性和持久性。将两者结合,可以创建一个既能处理大量IoT设备连接,又能保证消息可靠传递和处理的系统。

2. 架构设计

我们的高可用架构设计如下:
在这里插入图片描述

主要组件:

  • MQTT集群:使用EMQ X等MQTT代理实现集群
  • Kafka集群:作为中央消息总线和持久化层
  • 桥接组件:将MQTT消息转发到Kafka
  • Java应用服务:处理和分析消息
  • 监控系统:确保整个系统的健康运行

3. Java集成MQTT实现

3.1 Maven依赖

<dependencies><!-- MQTT客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Spring Integration MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.15</version></dependency><!-- Spring Boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.7.8</version></dependency>
</dependencies>

3.2 MQTT配置类

@Configuration
public class MqttConfig {@Value("${mqtt.broker.urls}")private String[] brokerUrls;  // 多个MQTT代理地址,用于故障转移@Value("${mqtt.client.id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.topics}")private String[] topics;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();// 设置多个服务器地址,实现故障转移options.setServerURIs(brokerUrls);// 设置自动重连options.setAutomaticReconnect(true);options.setKeepAliveInterval(30);options.setConnectionTimeout(30);// 设置遗嘱消息,当客户端异常断开时发送options.setWill("clients/status", (clientId + ": disconnected").getBytes(), 1, true);if (username != null && !username.isEmpty()) {options.setUserName(username);options.setPassword(password.toCharArray());}// 设置清除会话,false表示客户端断开连接后,服务器保留其订阅信息options.setCleanSession(false);factory.setConnectionOptions(options);return factory;}// 出站通道(用于发送消息)@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}// 出站消息处理器@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultQos(1);return messageHandler;}// 入站通道(用于接收消息)@Beanpublic MessageChannel mqttInboundChannel() {return new DirectChannel();}// 入站消息适配器@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-sub", mqttClientFactory(), topics);adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInboundChannel());return adapter;}
}

3.3 MQTT服务类

@Service
@Slf4j
public class MqttService {private final MessageChannel mqttOutboundChannel;@Autowiredpublic MqttService(MessageChannel mqttOutboundChannel) {this.mqttOutboundChannel = mqttOutboundChannel;}// 发布消息到MQTT主题public void publish(String topic, String payload) {log.info("Publishing message to topic {}: {}", topic, payload);Message<String> message = MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.QOS, 1).setHeader(MqttHeaders.RETAINED, false).build();mqttOutboundChannel.send(message);}// 处理接收到的MQTT消息@ServiceActivator(inputChannel = "mqttInboundChannel")public void handleMessage(Message<?> message) {String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);String payload = message.getPayload().toString();log.info("Received message from topic {}: {}", topic, payload);// 这里可以添加消息处理逻辑,或者转发到Kafka}
}

4. Java集成Kafka实现

4.1 Maven依赖

<dependencies><!-- Kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.5</version></dependency>
</dependencies>

4.2 Kafka配置类

@Configuration
public class KafkaConfig {@Value("${kafka.bootstrap.servers}")private String bootstrapServers;@Value("${kafka.consumer.group.id}")private String consumerGroupId;// Kafka生产者配置@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();// 设置Kafka集群地址configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 高可用配置// acks=all表示所有副本都确认后才认为消息发送成功configProps.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数configProps.put(ProducerConfig.RETRIES_CONFIG, 10);// 启用幂等性,确保消息不会重复发送configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 批处理大小configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);// 批处理延迟configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);// 缓冲区大小configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// Kafka消费者配置@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 高可用配置// 自动提交偏移量configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 从最早的消息开始消费configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 最大拉取记录数configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 心跳间隔configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);// 会话超时configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);// 最大拉取间隔configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 设置并发消费者数量factory.setConcurrency(3);// 批量消费factory.setBatchListener(true);// 手动提交偏移量factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

4.3 Kafka服务类

@Service
http://www.dtcms.com/wzjs/808041.html

相关文章:

  • 公司展示网站费用网站嵌套代码
  • 给工厂做英文外贸网站福州网站怎么做
  • 用模版做网站的好处和坏处会做网站有什么可以做吗
  • 异地网站建设公司wordpress 主题 英文版
  • 网站功能模块结构图广州白云机场网站建设
  • 中国建设工程标准化协会网站亚马逊新店投广告是免费的吗
  • 网站代理商公司网站建设合同模板
  • 有没有做装修中介的网站网站联系我们怎么做
  • 做运动鞋的网站视频如何找有需求做网站的公司
  • 六安网站制作费用进出口采购网
  • 保定企业建站系统模板在线做视频的网站
  • 福州网站建设免费网站app哪个好
  • 怎么用手机做网站平台南京鼓楼做网站的公司
  • 网站建设静态代码猪八戒包装设计
  • 廊坊公司快速建站爱采购卖家版app下载
  • 网站开发与维护总结wordpress静态页面
  • 一站式商家服务平台石碣镇网站仿做
  • 网站域名注销流程哈尔滨手机网页制作
  • 四川同风源建设工程有限公司网站适合网站开发的python
  • 淘宝做网站很便宜福州云建站模版
  • 如何在学校内网建立网站寮步网站建设价钱
  • 商业网站的后缀三合一模板网站
  • 如何针对你的网站做搜索优化陕西机械加工网
  • 为什么网站需要静态化生成html网站产品内容在数据库
  • 吴中区做网站的公司做电影网站需要注意什么
  • 什么网站做的好看又便宜账户竞价托管费用
  • 做网站能赚钱吗软件app开发公司哪家好
  • 哪里有个人卖房网站仿制网站建设
  • ui素材网站移动应用开发实训报告
  • 网站建设与管理课程用Wordpress建的网站有