
The Best Ways to Integrate Kafka with Spring

This article briefly surveys the options available for producing and consuming Kafka messages in a Java application.

Available libraries
  • kafka-clients (the plain Java client)
  • Spring for Apache Kafka (spring-kafka)
  • Spring Integration Kafka
  • Spring Cloud Stream Binder Kafka

Integrating the Java Kafka client with Spring

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
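With plain Spring (no Boot auto-configuration) you declare the producer and consumer beans yourself. Below is a minimal sketch of such a Java configuration; the broker address, group id and String serializers are illustrative assumptions, not values from the article.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaConfig {

    // Producer side: factory + template for sending messages
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer side: factory + listener container factory used by @KafkaListener methods
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}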
Integration with Spring Boot

With Spring Boot versions before 1.5 you have to write the Java configuration yourself; from 1.5 onwards, auto-configuration is provided. See the org.springframework.boot.autoconfigure.kafka package for details. The main classes are:

  • KafkaAutoConfiguration spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(this.properties.buildProducerProperties());
    }

}

  • KafkaAnnotationDrivenConfiguration spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {
    }

}

  • ConcurrentKafkaListenerContainerFactoryConfigurer spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

public class ConcurrentKafkaListenerContainerFactoryConfigurer {

    private KafkaProperties properties;

    /**
     * Set the {@link KafkaProperties} to use.
     * @param properties the properties
     */
    void setKafkaProperties(KafkaProperties properties) {
        this.properties = properties;
    }

    /**
     * Configure the specified Kafka listener container factory. The factory can be
     * further tuned and default settings can be overridden.
     * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
     * instance to configure
     * @param consumerFactory the {@link ConsumerFactory} to use
     */
    public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
            ConsumerFactory<Object, Object> consumerFactory) {
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        Listener container = this.properties.getListener();
        ContainerProperties containerProperties = listenerContainerFactory.getContainerProperties();
        if (container.getAckMode() != null) {
            containerProperties.setAckMode(container.getAckMode());
        }
        if (container.getAckCount() != null) {
            containerProperties.setAckCount(container.getAckCount());
        }
        if (container.getAckTime() != null) {
            containerProperties.setAckTime(container.getAckTime());
        }
        if (container.getPollTimeout() != null) {
            containerProperties.setPollTimeout(container.getPollTimeout());
        }
        if (container.getConcurrency() != null) {
            listenerContainerFactory.setConcurrency(container.getConcurrency());
        }
    }

}

Setting the concurrency creates multiple KafkaMessageListenerContainer instances, which is equivalent to one application instance running several consumers. With Spring Boot 1.5 and later, usage is straightforward: inject the KafkaTemplate to send messages directly, and a little configuration is all it takes to consume them, as shown in the sketch below.
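A minimal sketch of that usage in a Boot 1.5+ application; the topic name, group id and broker address are illustrative assumptions, not values from the article.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// application.properties (assumed values):
// spring.kafka.bootstrap-servers=localhost:9092
// spring.kafka.consumer.group-id=demo-group

@Component
public class DemoMessaging {

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public DemoMessaging(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Send a message to a hypothetical topic named "demo-topic"
    public void send(String payload) {
        kafkaTemplate.send("demo-topic", payload);
    }

    // Consume from the same topic; the listener container factory
    // comes from the auto-configuration shown above
    @KafkaListener(topics = "demo-topic")
    public void listen(String payload) {
        System.out.println("received: " + payload);
    }
}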

spring integration kafka

Spring Integration is Spring's implementation of the Enterprise Integration Patterns, and Spring Integration Kafka builds on Spring for Apache Kafka to provide inbound and outbound channel adapters. Starting from version 2.0, this project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x.

There is no auto-configuration for this option, and it introduces the Spring Integration concepts on top, so overall it is somewhat more complex.

Consumer configuration
@Bean
public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> kafkaConsumerFactory) {
    return new KafkaMessageListenerContainer<>(kafkaConsumerFactory,
            new ContainerProperties(new TopicPartitionInitialOffset(topic, 0)));
}

@Bean
public ConsumerFactory<String, String> kafkaConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public PollableChannel fromKafka() {
    return new QueueChannel();
}
Producer configuration
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression(topic));
    handler.setMessageKeyExpression(new LiteralExpression(messageKey));
    return handler;
}

@Bean
public ProducerFactory<String, String> kafkaProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(kafkaProducerFactory());
}
Sending and receiving messages
@Autowired
@Qualifier("fromKafka")
private PollableChannel fromKafka;

@Autowired
@Qualifier("toKafka")
MessageChannel toKafka;

Message<?> msg = fromKafka.receive(10000L);
toKafka.send(new GenericMessage<Object>(UUID.randomUUID().toString()));

spring cloud stream

Built on top of Spring Integration, with some extra processing and a further layer of abstraction for Spring Cloud environments. For details, see the spring cloud stream kafka example and the spring-cloud-stream-binder-kafka property configuration.
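A minimal consumer sketch with the Kafka binder, using the annotation-based programming model of that era; the destination, group and broker address are illustrative assumptions.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// application.properties (assumed values):
// spring.cloud.stream.kafka.binder.brokers=localhost:9092
// spring.cloud.stream.bindings.input.destination=demo-topic
// spring.cloud.stream.bindings.input.group=demo-group

@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }

    // Messages arriving on the bound "input" channel (backed by the Kafka topic) are delivered here
    @StreamListener(Sink.INPUT)
    public void handle(String payload) {
        System.out.println("received: " + payload);
    }
}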

doc

  • spring-kafka
  • spring-integration
  • spring-integration-kafka
  • spring-integration-samples-kafka
  • spring-cloud-stream
  • Spring Boot and Kafka integration
  • Summary of approaches for handling very low Kafka consumer throughput