
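Example: Configuring Multiple Kafka Instances in Spring Cloud

The following example shows how to configure multiple Kafka instances in a Spring Cloud application.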

1. Configuration file settings

# application.yml
spring:
  kafka:
    # First Kafka configuration
    first:
      bootstrap-servers: localhost:9092
      consumer:
        group-id: first-group
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Second Kafka configuration
    second:
      bootstrap-servers: localhost:9093
      consumer:
        group-id: second-group
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

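2. Configuration class definition

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class MultipleKafkaConfig {

    // Settings bound from spring.kafka.first; marked primary so unqualified injection resolves to them
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.first")
    public KafkaProperties firstKafkaProperties() {
        return new KafkaProperties();
    }

    // Settings bound from spring.kafka.second
    @Bean
    @ConfigurationProperties(prefix = "spring.kafka.second")
    public KafkaProperties secondKafkaProperties() {
        return new KafkaProperties();
    }

    // Note: newer Spring Boot versions may deprecate or drop the no-arg
    // buildProducerProperties()/buildConsumerProperties(); adjust to your version.
    @Bean
    @Primary
    public ProducerFactory<String, String> firstProducerFactory() {
        return new DefaultKafkaProducerFactory<>(firstKafkaProperties().buildProducerProperties());
    }

    @Bean
    public ProducerFactory<String, String> secondProducerFactory() {
        return new DefaultKafkaProducerFactory<>(secondKafkaProperties().buildProducerProperties());
    }

    @Bean
    @Primary
    public ConsumerFactory<String, String> firstConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(firstKafkaProperties().buildConsumerProperties());
    }

    @Bean
    public ConsumerFactory<String, String> secondConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(secondKafkaProperties().buildConsumerProperties());
    }

    // One KafkaTemplate per cluster; inject by name with @Qualifier
    @Bean
    @Primary
    public KafkaTemplate<String, String> firstKafkaTemplate() {
        return new KafkaTemplate<>(firstProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> secondKafkaTemplate() {
        return new KafkaTemplate<>(secondProducerFactory());
    }
}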
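For orientation, the maps returned by buildConsumerProperties() and buildProducerProperties() are keyed by Kafka client property names (for example bootstrap.servers) rather than by the YAML names (bootstrap-servers). The sketch below is dependency-free and only mimics the shape of the consumer maps for the two clusters configured above. The class ConsumerPropsSketch and its helper are hypothetical, values are shown as plain strings for readability, and it is not Spring Boot's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: mimics the shape of the consumer property map
// built for each cluster. This is not Spring Boot's code.
class ConsumerPropsSketch {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Build a map keyed by Kafka client property names.
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // One independent map per cluster, so the two never share settings.
        System.out.println(consumerProps("localhost:9092", "first-group"));
        System.out.println(consumerProps("localhost:9093", "second-group"));
    }
}
```

Because each factory receives its own map, changing a setting for one cluster leaves the other untouched.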

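3. Usage example

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MultiKafkaService {

    @Autowired
    @Qualifier("firstKafkaTemplate")
    private KafkaTemplate<String, String> firstKafkaTemplate;

    @Autowired
    @Qualifier("secondKafkaTemplate")
    private KafkaTemplate<String, String> secondKafkaTemplate;

    // Send to the first cluster (localhost:9092)
    public void sendToFirstKafka(String topic, String message) {
        firstKafkaTemplate.send(topic, message);
    }

    // Send to the second cluster (localhost:9093)
    public void sendToSecondKafka(String topic, String message) {
        secondKafkaTemplate.send(topic, message);
    }
}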

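4. Consumer configuration

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MultiKafkaListeners {

    // No containerFactory given, so the default kafkaListenerContainerFactory (first cluster) is used
    @KafkaListener(topics = "first-topic", groupId = "first-group")
    public void listenFirstKafka(String message) {
        System.out.println("Received from first Kafka: " + message);
    }

    // Explicitly bound to the second cluster's container factory
    @KafkaListener(topics = "second-topic", groupId = "second-group",
            containerFactory = "secondKafkaListenerContainerFactory")
    public void listenSecondKafka(String message) {
        System.out.println("Received from second Kafka: " + message);
    }
}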

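5. Container factory configuration

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig {

    @Autowired
    @Qualifier("firstConsumerFactory")
    private ConsumerFactory<String, String> firstConsumerFactory;

    @Autowired
    @Qualifier("secondConsumerFactory")
    private ConsumerFactory<String, String> secondConsumerFactory;

    // Default factory: used by @KafkaListener methods that do not name a containerFactory
    @Bean
    @Primary
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstConsumerFactory);
        return factory;
    }

    // Factory for the second cluster; referenced by name from @KafkaListener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> secondKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerFactory);
        return factory;
    }
}

This approach lets a single application work with several Kafka clusters at the same time, each with its own configuration and purpose.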

6. Alternative approach

6.1 The configuration file can also take the following form

# application.yml
spring:
  kafka:
    multi:
      # First Kafka configuration
      first:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: first-group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Second Kafka configuration
      second:
        bootstrap-servers: localhost:9093
        consumer:
          topic: xxxxxx
          group-id: second-group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

6.2 A configuration class that reads all the configurations

import lombok.Data;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Data  // Generates getters and setters; optional, hand-written accessors work too
@Configuration
@RefreshScope  // Spring Cloud configuration refresh; optional, depending on your environment
@ConfigurationProperties(prefix = "spring.kafka")
public class MultiKafkaProperties {

    // The keys are "first" and "second"; each KafkaProperties value holds the matching configuration
    private Map<String, KafkaProperties> multi;
}
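To make the map binding above more concrete, here is a minimal sketch of how flat property keys under spring.kafka.multi group by their first path segment, which becomes the map key. This is a simplified stand-in written with the standard library only. The class MultiBindingSketch and the method groupByName are hypothetical, and Spring Boot's real binder does considerably more, such as type conversion and relaxed name matching.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for map binding: groups flat property
// keys under a prefix by their first path segment. Not Spring's Binder.
class MultiBindingSketch {

    static Map<String, Map<String, String>> groupByName(Map<String, String> flat, String prefix) {
        Map<String, Map<String, String>> grouped = new TreeMap<>();
        String dotted = prefix + ".";
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(dotted)) {
                continue; // outside the prefix, ignored
            }
            String rest = key.substring(dotted.length());
            int dot = rest.indexOf('.');
            if (dot < 0) {
                continue; // no nested property below a cluster name
            }
            String name = rest.substring(0, dot);       // e.g. "first"
            String property = rest.substring(dot + 1);  // e.g. "bootstrap-servers"
            grouped.computeIfAbsent(name, k -> new LinkedHashMap<>())
                   .put(property, entry.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("spring.kafka.multi.first.bootstrap-servers", "localhost:9092");
        flat.put("spring.kafka.multi.first.consumer.group-id", "first-group");
        flat.put("spring.kafka.multi.second.bootstrap-servers", "localhost:9093");
        flat.put("spring.kafka.multi.second.consumer.group-id", "second-group");
        // prints [first, second]
        System.out.println(groupByName(flat, "spring.kafka.multi").keySet());
    }
}
```

Adding a third cluster then only requires a new block under multi in the YAML; no Java code has to change.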

6.3 Registering all consumers and producers in the container

import cn.hutool.extra.spring.SpringUtil;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;  // on Spring Boot 3.x this is jakarta.annotation.Resource
import java.util.Map;

@Configuration
public class MultiKafkaConfig {

    @Resource  // The configuration class above that reads the multiple Kafka configurations
    private MultiKafkaProperties multiKafkaProperties;

    // Exposes an instance whose init() runs at startup; other beans can @DependsOn this bean name
    @Bean(initMethod = "init")
    public MultiKafkaConfig multiKafkaConfigInit() {
        return new MultiKafkaConfig();
    }

    public void init() {
        Map<String, KafkaProperties> multi = multiKafkaProperties.getMulti();
        if (CollectionUtils.isEmpty(multi)) {
            // Nothing was read from the configuration, so return immediately
            return;
        }
        multi.forEach((keyName, valueProperty) -> {
            ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory =
                    buildListenerContainerFactory(valueProperty);
            // Register the consumer container factory
            String containerFactoryName = keyName + "ContainerFactory";
            // Register the container factory in Spring. This uses the hutool utility class;
            // you can also write your own helper that registers through the ApplicationContext.
            SpringUtil.registerBean(containerFactoryName, listenerContainerFactory);
            // Register the producer template. The name is built from the configuration key and
            // can be changed freely. When injecting with @Autowired, pair it with @Qualifier to
            // select the name, for example @Qualifier("firstKafkaTemplate").
            String kafkaTemplateName = keyName + "KafkaTemplate";
            SpringUtil.registerBean(kafkaTemplateName, buildKafkaTemplate(valueProperty));
        });
    }

    private KafkaTemplate<String, String> buildKafkaTemplate(KafkaProperties properties) {
        ProducerFactory<String, String> producerFactory = buildProducerFactory(properties);
        return new KafkaTemplate<>(producerFactory);
    }

    private ConcurrentKafkaListenerContainerFactory<String, String> buildListenerContainerFactory(
            KafkaProperties properties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> consumerFactory = buildConsumerFactory(properties);
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(Boolean.FALSE);
        factory.setConcurrency(properties.getListener().getConcurrency());
        // Manual acknowledgment: listeners must call Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    private ConsumerFactory<String, String> buildConsumerFactory(KafkaProperties properties) {
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
    }

    private ProducerFactory<String, String> buildProducerFactory(KafkaProperties properties) {
        return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());
    }
}
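The registration code above derives bean names from the configuration keys by appending ContainerFactory and KafkaTemplate. As a quick check of which names end up in the container for the keys first and second, the following dependency-free sketch reproduces only that naming step. The class BeanNameSketch and its methods are hypothetical helpers for illustration, not part of Spring or hutool.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the naming scheme applied at registration time.
class BeanNameSketch {

    static String containerFactoryName(String key) {
        return key + "ContainerFactory";
    }

    static String kafkaTemplateName(String key) {
        return key + "KafkaTemplate";
    }

    // Map each configuration key to the two bean names derived from it.
    static Map<String, List<String>> beanNames(List<String> keys) {
        Map<String, List<String>> names = new LinkedHashMap<>();
        for (String key : keys) {
            names.put(key, List.of(containerFactoryName(key), kafkaTemplateName(key)));
        }
        return names;
    }

    public static void main(String[] args) {
        // prints:
        // first -> [firstContainerFactory, firstKafkaTemplate]
        // second -> [secondContainerFactory, secondKafkaTemplate]
        beanNames(List.of("first", "second"))
                .forEach((key, names) -> System.out.println(key + " -> " + names));
    }
}
```

Note that these names differ from the hand-written beans in section 5, where the second factory is called secondKafkaListenerContainerFactory; the containerFactory attribute of @KafkaListener must match whichever scheme is in use.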

hutool Maven dependency

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.25</version>
</dependency>

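Obtaining a producer directly from the Spring container

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Can be added on the class to make sure the configuration and factory beans above are loaded first
@DependsOn(value = "multiKafkaConfigInit")
@Component
public class Xxxxxxxx {

    @Autowired
    @Qualifier("firstKafkaTemplate")
    private KafkaTemplate<String, String> firstKafkaTemplate;

    // Once injected, use the template in business methods to send messages
}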

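Consumer

import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Can be added on the class to make sure the configuration and factory beans above are loaded first
@DependsOn(value = "multiKafkaConfigInit")
@Component
public class xxxx {

    // Note: the sample YAML in 6.1 sets consumer.topic only under second; define it
    // under first as well, otherwise this placeholder cannot be resolved.
    @KafkaListener(topics = "${spring.kafka.multi.first.consumer.topic}",
            groupId = "${spring.kafka.multi.first.consumer.group-id}",
            containerFactory = "firstContainerFactory")
    public void process(String data, Acknowledgment ack) {
        // ....... consumer business logic
        // The container factory uses AckMode.MANUAL, so acknowledge explicitly once processing succeeds
        ack.acknowledge();
    }
}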

(If needed, @ConditionalOnProperty can be used to control whether this class is loaded.)

Parts of the content above were generated by a large language model; verify it before relying on it.
