当前位置: 首页 > wzjs >正文

一元云淘网站开发手机视频网站开发

一元云淘网站开发,手机视频网站开发,企业网站内容运营方案策划,做旅行网站的依据及意义1、简介 对于经常需要变更kafka主题的场景&#xff0c;为了实现动态监听topic的功能&#xff0c;可以使用以下方式。 2、使用步骤 2.1、添加依赖 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactI…
1、简介

        对于经常需要变更kafka主题的场景,为了实现动态监听topic的功能,可以使用以下方式。

2、使用步骤
2.1、添加依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version>
</dependency>
2.2、nacos中配置
    # kafka 配置
spring:kafka:bootstrap-servers: ip地址:9092topics: topic1,tpic2producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerenable-idempotence: trueacks: alltransactional-id: kafka-groupconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: kafka-clickhouse-groupauto-offset-reset: latestenable-auto-commit: falseisolation-level: read_committedallow-auto-create-topics: truelistener:ack-mode: MANUAL_IMMEDIATEconcurrency: 3
2.3、配置类
package org.aecsi.kafkadatatock.config;import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;@Configuration
@RequiredArgsConstructor
@EnableKafka
@RefreshScope
public class KafkaConfig {private final KafkaProperties kafkaProperties;@Beanpublic KafkaAdmin kafkaAdmin() {return new KafkaAdmin(kafkaProperties.buildAdminProperties());}@Beanpublic AdminClient adminClient(KafkaAdmin kafkaAdmin) {return AdminClient.create(kafkaAdmin.getConfigurationProperties());}@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildProducerProperties());configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-clickhouse-producer");DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configProps);factory.setTransactionIdPrefix("kafka-clickhouse-producer-");return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}@Bean@RefreshScopepublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildConsumerProperties());configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true);configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaTransactionManager<String, String> transactionManager) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setTransactionManager(transactionManager);return factory;}@Bean@RefreshScopepublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setAutoStartup(true);return factory;}@Beanpublic ApplicationRunner kafkaListenerStarter(KafkaListenerEndpointRegistry registry) {return args -> {// 启动所有 Kafka 监听器registry.start();};}
}

接收消息类

@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", autoStartup = "false")@Transactional(transactionManager = "transactionManager")public void processMessage(ConsumerRecord<String, String> record,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {try {log.info("kafka 接受 topic: {} 消息", topic);
//          处理消息acknowledgment.acknowledge();} catch (Exception e) {log.error("Error processing message for topic {}: {}", topic, e.getMessage());throw e;}}

 主启动类添加一个注解

@EnableConfigurationProperties(KafkaProperties.class)
3、总结

      实现kafka动态获取topic还有其他方式,博主目前只验证这一种,其他方式待更新。


文章转载自:

http://lIWrvXhj.whnps.cn
http://yuqwSnHk.whnps.cn
http://F0Hcppro.whnps.cn
http://0xKbRqzC.whnps.cn
http://qK0ripzY.whnps.cn
http://NqdMHYHR.whnps.cn
http://jIPfNcbm.whnps.cn
http://i5tSWnEm.whnps.cn
http://fEuI4OXc.whnps.cn
http://mnE5WZsA.whnps.cn
http://eWHv6jTV.whnps.cn
http://gf5Zqpvy.whnps.cn
http://9bBCqg2K.whnps.cn
http://FvDpsFtp.whnps.cn
http://kqKPJ20m.whnps.cn
http://IrzJusFS.whnps.cn
http://et0yyXS8.whnps.cn
http://zjmhTn8L.whnps.cn
http://FjA4mqv1.whnps.cn
http://B3jDrH6K.whnps.cn
http://j9ZjgNKI.whnps.cn
http://1lxzUPc8.whnps.cn
http://NreDALgy.whnps.cn
http://ajujrd9T.whnps.cn
http://Fnk5YFxp.whnps.cn
http://R5nsZGeu.whnps.cn
http://kl1nimnV.whnps.cn
http://tdZUdHP9.whnps.cn
http://beTYoD4o.whnps.cn
http://1d0vZOIc.whnps.cn
http://www.dtcms.com/wzjs/690232.html

相关文章:

  • 有没有专门发布毕业设计代做网站网站建设顾问站建
  • 神木自适应网站开发wordpress 开发商城
  • wordpress 建站插件个人简历样本
  • 企业怎么建网站网站商城建设费用
  • 网站建设及维护成本铁岭做网站哪家好
  • wordpress站点图标正能量不良网站推荐2020
  • 网站数据库怎么配置石家庄网站建设浩森宇特
  • 杭州网站建设手机版外包做网站
  • 九江市住房和城乡建设局官方网站织梦软件展示网站源码
  • 青岛网站推广服务租号网站开发
  • 网站建设方案交换认苏州久远网络网站动态页面怎么做
  • 建设银行海淀支行 网站商城站人工售票时间表
  • 网站建设所需人员uc浏览器在线网页
  • 烟台网站建设 制作 推广装饰行业网站建设方案
  • 网站排名优化学习广州公司注册最新流程
  • 网站建设要学习什么建设企业网站得花多少
  • 电信 网站备案iis下建多个网站
  • 婚纱摄影平台新网网站内部优化
  • 有公网ip 如何做一网站SSC网站开发H5
  • 免费做字体的网站上海做网站站优云一一十七
  • 网站主页设计模板图片宁德做网站公司
  • 安徽安能建设集团网站免费php网站有哪些
  • tomcat做网站企业网站建设知识
  • 建设网站需要的关键技术wordpress标签自动生成插件
  • 贵司不断优化网站建设查营业执照怎么查询
  • 简单模板网站制作时间三亚app开发公司
  • 深圳微信网站建设如何用华为云服务器做网站
  • 任县网站建设设计公司官网格式设计
  • ai做网站步骤新手怎么做电商在哪个网站
  • 怎么做自己的外卖网站网站策划与建设阶段的推广的目标