Spring Cloud中配置多个 Kafka 实例的示例
在Spring Cloud中配置多个Kafka实例的示例:
1. 配置文件设置
# application.yml
spring:
  kafka:
    # First Kafka cluster
    first:
      bootstrap-servers: localhost:9092
      consumer:
        group-id: first-group
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Second Kafka cluster
    second:
      bootstrap-servers: localhost:9093
      consumer:
        group-id: second-group
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
2. 配置类定义
/**
 * Declares a separate set of Kafka client beans for each of the two configured
 * clusters, "first" and "second".
 *
 * <p>For every cluster this class exposes a {@link KafkaProperties} holder, a
 * {@link ProducerFactory}, a {@link ConsumerFactory} and a {@link KafkaTemplate}.
 * All "first" beans carry {@code @Primary}; the "second" beans do not.
 */
@Configuration
public class MultipleKafkaConfig {

    // ------------------------------------------------------------------
    // First cluster (primary)
    // ------------------------------------------------------------------

    /**
     * @return the property holder bound to the {@code spring.kafka.first} prefix
     */
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.first")
    public KafkaProperties firstKafkaProperties() {
        return new KafkaProperties();
    }

    /**
     * @return a producer factory built from the first cluster's producer settings
     */
    @Bean
    @Primary
    public ProducerFactory<String, String> firstProducerFactory() {
        KafkaProperties settings = firstKafkaProperties();
        return new DefaultKafkaProducerFactory<>(settings.buildProducerProperties());
    }

    /**
     * @return a consumer factory built from the first cluster's consumer settings
     */
    @Bean
    @Primary
    public ConsumerFactory<String, String> firstConsumerFactory() {
        KafkaProperties settings = firstKafkaProperties();
        return new DefaultKafkaConsumerFactory<>(settings.buildConsumerProperties());
    }

    /**
     * @return a template that sends through {@link #firstProducerFactory()}
     */
    @Bean
    @Primary
    public KafkaTemplate<String, String> firstKafkaTemplate() {
        ProducerFactory<String, String> producers = firstProducerFactory();
        return new KafkaTemplate<>(producers);
    }

    // ------------------------------------------------------------------
    // Second cluster
    // ------------------------------------------------------------------

    /**
     * @return the property holder bound to the {@code spring.kafka.second} prefix
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.kafka.second")
    public KafkaProperties secondKafkaProperties() {
        return new KafkaProperties();
    }

    /**
     * @return a producer factory built from the second cluster's producer settings
     */
    @Bean
    public ProducerFactory<String, String> secondProducerFactory() {
        KafkaProperties settings = secondKafkaProperties();
        return new DefaultKafkaProducerFactory<>(settings.buildProducerProperties());
    }

    /**
     * @return a consumer factory built from the second cluster's consumer settings
     */
    @Bean
    public ConsumerFactory<String, String> secondConsumerFactory() {
        KafkaProperties settings = secondKafkaProperties();
        return new DefaultKafkaConsumerFactory<>(settings.buildConsumerProperties());
    }

    /**
     * @return a template that sends through {@link #secondProducerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> secondKafkaTemplate() {
        ProducerFactory<String, String> producers = secondProducerFactory();
        return new KafkaTemplate<>(producers);
    }
}
3. 使用示例
/**
 * Thin sending facade over the two {@link KafkaTemplate} beans, one per Kafka
 * cluster. Each template is selected by its bean name through {@code @Qualifier}.
 */
@Service
public class MultiKafkaService {

    /** Template registered under the bean name {@code firstKafkaTemplate}. */
    @Autowired
    @Qualifier("firstKafkaTemplate")
    private KafkaTemplate<String, String> firstKafkaTemplate;

    /** Template registered under the bean name {@code secondKafkaTemplate}. */
    @Autowired
    @Qualifier("secondKafkaTemplate")
    private KafkaTemplate<String, String> secondKafkaTemplate;

    /**
     * Sends a message through the first template.
     *
     * @param topic   destination topic
     * @param message payload to send
     */
    public void sendToFirstKafka(String topic, String message) {
        this.firstKafkaTemplate.send(topic, message);
    }

    /**
     * Sends a message through the second template.
     *
     * @param topic   destination topic
     * @param message payload to send
     */
    public void sendToSecondKafka(String topic, String message) {
        this.secondKafkaTemplate.send(topic, message);
    }
}
4. 消费者配置
/**
 * Example listeners, one per Kafka cluster. Each listener prints the received
 * payload to standard output.
 */
@Component
public class MultiKafkaListeners {

    /**
     * Consumes {@code first-topic} in consumer group {@code first-group}.
     * No {@code containerFactory} is named on this listener.
     *
     * @param message the record value
     */
    @KafkaListener(topics = "first-topic", groupId = "first-group")
    public void listenFirstKafka(String message) {
        String line = "Received from first Kafka: " + message;
        System.out.println(line);
    }

    /**
     * Consumes {@code second-topic} in consumer group {@code second-group},
     * using the container factory bean named
     * {@code secondKafkaListenerContainerFactory}.
     *
     * @param message the record value
     */
    @KafkaListener(
            topics = "second-topic",
            groupId = "second-group",
            containerFactory = "secondKafkaListenerContainerFactory")
    public void listenSecondKafka(String message) {
        String line = "Received from second Kafka: " + message;
        System.out.println(line);
    }
}
5. 容器工厂配置
/**
 * Declares one listener container factory per Kafka cluster, each wired to the
 * matching {@link ConsumerFactory} bean.
 */
@Configuration
public class KafkaListenerConfig {

    /** Consumer factory bean named {@code firstConsumerFactory}. */
    @Autowired
    @Qualifier("firstConsumerFactory")
    private ConsumerFactory<String, String> firstConsumerFactory;

    /** Consumer factory bean named {@code secondConsumerFactory}. */
    @Autowired
    @Qualifier("secondConsumerFactory")
    private ConsumerFactory<String, String> secondConsumerFactory;

    /**
     * @return the primary container factory, backed by the first consumer factory
     */
    @Bean
    @Primary
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        return containerFactoryFor(firstConsumerFactory);
    }

    /**
     * @return the container factory backed by the second consumer factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> secondKafkaListenerContainerFactory() {
        return containerFactoryFor(secondConsumerFactory);
    }

    /** Creates a fresh container factory that uses the given consumer factory. */
    private ConcurrentKafkaListenerContainerFactory<String, String> containerFactoryFor(
            ConsumerFactory<String, String> consumers) {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumers);
        return containerFactory;
    }
}
这种方式可以让你在同一个应用中同时使用多个Kafka集群,每个集群可以有不同的配置和用途。
6. 其他形式
6.1 配置文件也可以写成如下形式
# application.yml
spring:
  kafka:
    multi:
      # First Kafka cluster
      first:
        bootstrap-servers: localhost:9092
        consumer:
          # Custom key, resolved only through the
          # ${spring.kafka.multi.first.consumer.topic} placeholder on @KafkaListener.
          topic: first-topic
          group-id: first-group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Second Kafka cluster
      second:
        bootstrap-servers: localhost:9093
        consumer:
          # Custom key, available as ${spring.kafka.multi.second.consumer.topic}.
          topic: xxxxxx
          group-id: second-group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
6.2 配置类读取所有的配置
import lombok.Data;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;import java.util.Map;@Data // get set生成,非必要 可以换手动get set
@Configuration
@RefreshScope // spring cloud 配置刷新注解 可以没有,具体看自身环境
@ConfigurationProperties(prefix = "spring.kafka")
public class MultiKafkaProperties {// 此时key 就是 first 和 second, 而 KafkaProperties 就是对应的配置private Map<String, KafkaProperties> multi;}
6.3 将所有消费者容器工厂(ContainerFactory)和生产者模板(KafkaTemplate)注册到 Spring 容器中
import cn.hutool.extra.spring.SpringUtil;import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.CollectionUtils;import javax.annotation.Resource;
import java.util.Map;@Configuration
public class MultiKafkaConfig {@Resource // 上面读取多个kafka配置的配置类private MultiKafkaProperties multiKafkaProperties;@Bean(initMethod = "init")public MultiKafkaConfig multiKafkaConfigInit() {return new MultiKafkaConfig();}public void init() {Map<String, KafkaProperties> multi = multiKafkaProperties.getMulti();if (CollectionUtils.isEmpty(multi)) {// 配置中没有读到 没有直接返回return;}multi.forEach((keyName, valueProperty) -> {ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory = buildListenerContainerFactory(valueProperty);// 注册消费者ContainerFactoryString containerFactoryName = keyName + "ContainerFactory";// 注册容器工厂到spring中,这里用了hutool工具类,也可以自己写个工具类从applicationContext写入SpringUtil.registerBean(containerFactoryName, listenerContainerFactory);// 注册生产者Template 名字是通过配置中的key名拼接的,这里名字可以自由换,通过@Autowired 从容器获取时,需要配合 @Qualifier 注解指定名字 例如 (@Qualifier("firstKafkaTemplate"))String kafkaTemplateName = keyName + "KafkaTemplate";SpringUtil.registerBean(kafkaTemplateName, buildKafkaTemplate(valueProperty));});}private KafkaTemplate<String, String> buildKafkaTemplate(KafkaProperties properties) {ProducerFactory<String, String> producerFactory = buildProducerFactory(properties);return new KafkaTemplate<>(producerFactory);}private ConcurrentKafkaListenerContainerFactory<String, String> buildListenerContainerFactory(KafkaProperties properties) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();ConsumerFactory<String, String> consumerFactory = buildConsumerFactory(properties);factory.setConsumerFactory(consumerFactory);factory.setBatchListener(Boolean.FALSE);factory.setConcurrency(properties.getListener().getConcurrency());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}private ConsumerFactory<String, String> buildConsumerFactory(KafkaProperties properties) {return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());}private ProducerFactory<String, 
String> buildProducerFactory(KafkaProperties properties) {return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());}}
hutool Maven配置
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.25</version>
</dependency>
从spring容器中直接获取生产者
/**
 * Example of obtaining a dynamically registered producer template from the
 * Spring container by bean name.
 */
@Component
@DependsOn("multiKafkaConfigInit") // optional on the class: ensures the bean that registers the factories is initialized first
public class Xxxxxxxx {

    /** Template registered under the bean name {@code firstKafkaTemplate}. */
    @Autowired
    @Qualifier("firstKafkaTemplate")
    private KafkaTemplate<String, String> firstKafkaTemplate;

    // Business methods use the injected template to send their messages.
}
消费者
@DependsOn(value = "multiKafkaConfigInit") // 所在类上可以添加 保证上面的配置类工厂类先加载完成
@Component
public class xxxx{@KafkaListener(topics = "${spring.kafka.multi.first.consumer.topic}", groupId = "${spring.kafka.multi.first.consumer.group-id}", containerFactory = "firstContainerFactory")public void process(String data, Acknowledgment ack) {// .......消费业务}}
(如有需要可以通过 @ConditionalOnProperty 控制该类是否加载)
以上部分内容由大模型生成,请注意甄别!