
How to Integrate Kafka with the Spring Framework

Integrating Kafka with Spring Boot

Add the Maven dependency

<!-- the version is managed by Spring Boot's dependency management -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configure application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: ip:port
      topics: topics
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      topics: topics
      bootstrap-servers: ip:port
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
        ssl.truststore.location: client.truststore.jks
        ssl.truststore.password: trus_password
        ssl.endpoint.identification.algorithm:

Create the Kafka producer and consumer

In a Spring Boot application, once application.properties or application.yml is configured correctly, Spring Boot's Kafka auto-configuration (KafkaAutoConfiguration) automatically creates and wires KafkaTemplate, the consumer factories, and other related beans:

  • KafkaTemplate: used to send messages to Kafka

  • ConsumerFactory: a factory that creates Kafka consumers

  • KafkaListenerContainerFactory: creates message listener containers for @KafkaListener methods

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaMessageService {

    @Value("${spring.kafka.producer.topics}")
    private String outputTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Listen for messages on the input topic.
     * @param message the received message
     */
    @KafkaListener(topics = "${spring.kafka.consumer.topics}")
    public void listen(String message) {
        log.info("Received message: message = {}", message);
        // todo: process the message
        String processedMessage = message;
        // send to the output topic
        kafkaTemplate.send(outputTopic, processedMessage);
        log.info("Sent processed message: {}", processedMessage);
    }
}

Manually configuring the Kafka producer and consumer

If more complex configuration is needed, you can also define your own Kafka configuration classes.

Kafka consumer configuration class:

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    // the @Value fields referenced below were omitted in the original;
    // the property paths follow the application.yml shown earlier
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.consumer.properties.sasl.mechanism}")
    private String saslMechanism;
    @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
    private String saslJaasConfig;
    @Value("${spring.kafka.consumer.properties.ssl.truststore.location}")
    private String truststoreLocation;
    @Value("${spring.kafka.consumer.properties.ssl.truststore.password}")
    private String truststorePassword;
    @Value("${spring.kafka.consumer.properties.ssl.endpoint.identification.algorithm:}")
    private String endpointIdentificationAlgorithm;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put("ssl.truststore.location", truststoreLocation);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.endpoint.identification.algorithm", endpointIdentificationAlgorithm);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // number of concurrent consumers, i.e. independent consumer threads processing messages in parallel
        factory.setConcurrency(3);
        // manual offset commit
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

Kafka producer configuration class:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(4);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

Listening for and processing messages:

@Component
@Slf4j
public class KafkaMessageProcess {

    @Value("${spring.kafka.producer.topics}")
    private String outTopic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
    public void listen(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Acknowledgment acknowledgment) {
        log.info("Received message: topic = {}, message = {}", topic, message);
        process(message);
        // Manually acknowledge: commits the current message's offset to Kafka.
        // Kafka records this offset, marking the message (and all earlier ones) as successfully consumed.
        acknowledgment.acknowledge();
    }

    private void process(String message) {
        // todo: process the message
    }
}
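The topics attribute in the listener above uses a SpEL expression so that one comma-separated property subscribes the listener to several topics at once. The splitting itself behaves exactly like plain String.split; a small sketch (the topic names are made up for illustration):

```java
public class TopicSplitDemo {
    public static void main(String[] args) {
        // hypothetical value of ${spring.kafka.consumer.topics}
        String topicsProperty = "orders,payments,refunds";
        // what #{'${spring.kafka.consumer.topics}'.split(',')} evaluates to
        String[] topics = topicsProperty.split(",");
        for (String topic : topics) {
            System.out.println(topic);
        }
    }
}
```

With a single topic in the property, split returns a one-element array, so the same expression works for both cases.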

Source-code analysis of @KafkaListener

Registration of @KafkaListener

  1. Scan for the annotation: during bean initialization, KafkaListenerAnnotationBeanPostProcessor, which implements BeanPostProcessor, scans every bean looking for @KafkaListener annotations.
// KafkaListenerAnnotationBeanPostProcessor (some code omitted)
// This method comes from BeanPostProcessor, one of Spring's core extension points,
// which allows custom processing after a bean has been initialized.
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // find classes annotated with @KafkaListener
    Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);
    // find methods annotated with @KafkaListener
    Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
            (methodx) -> {
                Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);
                return !listenerMethods.isEmpty() ? listenerMethods : null;
            });
    // iterate over the discovered methods and parse their signatures
    for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
        Method method = entry.getKey();
        for (KafkaListener listener : entry.getValue()) {
            // once found, hand off to the parse-and-register logic
            this.processKafkaListener(listener, method, bean, beanName);
        }
    }
    return bean;
}
  2. Parse the annotation: extract topics, groupId, containerFactory, and the other attributes.
// KafkaListenerAnnotationBeanPostProcessor
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
    // parse the annotation and wrap the annotation metadata, method, bean, and other static configuration into the endpoint
    this.processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
    String containerFactory = this.resolve(kafkaListener.containerFactory());
    KafkaListenerContainerFactory<?> listenerContainerFactory = this.resolveContainerFactory(kafkaListener, containerFactory, beanName);
    // register the endpoint built from the listener and method discovered in the previous step
    this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
  3. Register the listener endpoint: KafkaListenerEndpointRegistrar.registerEndpoint() registers the listener.
// KafkaListenerEndpointRegistrar
public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        // startImmediately decides when the container starts:
        // true:  create and start the matching MessageListenerContainer (the Kafka consumer container) right away
        // false: only save the endpoint into the endpointDescriptors collection; containers are created and started later in one pass
        if (this.startImmediately) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor), true);
        } else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

// creates and starts all KafkaMessageListenerContainers in one pass
protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
    synchronized (this.listenerContainers) {
        // create the MessageListenerContainer; the creation step itself is analyzed in the next section
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        // keep the created container in a thread-safe map
        this.listenerContainers.put(id, container);
        if (startImmediately) {
            // start it
            startIfNecessary(container);
        }
    }
}
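The two-phase behavior of startImmediately can be modeled with a small, self-contained sketch. The class and method names here are illustrative, not Spring's; it only mirrors the control flow: endpoints registered before the context refresh are collected, registerAllEndpoints() starts them in one pass, and anything registered afterwards starts immediately.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the registrar's two-phase registration.
public class ToyRegistrar {
    private final List<String> endpointDescriptors = new ArrayList<>();
    private final Map<String, Boolean> started = new HashMap<>(); // endpoint -> running?
    private boolean startImmediately = false;

    public void registerEndpoint(String endpoint) {
        if (startImmediately) {
            started.put(endpoint, true);       // create + start the container right away
        } else {
            endpointDescriptors.add(endpoint); // just remember it for later
        }
    }

    // called once, when the application context finishes refreshing
    public void registerAllEndpoints() {
        for (String endpoint : endpointDescriptors) {
            started.put(endpoint, true);
        }
        endpointDescriptors.clear();
        startImmediately = true;               // late registrations now start immediately
    }

    public boolean isStarted(String endpoint) {
        return started.getOrDefault(endpoint, false);
    }

    public static void main(String[] args) {
        ToyRegistrar registrar = new ToyRegistrar();
        registrar.registerEndpoint("listenerA");              // collected, not started
        System.out.println(registrar.isStarted("listenerA")); // false
        registrar.registerAllEndpoints();                     // context refreshed
        System.out.println(registrar.isStarted("listenerA")); // true
        registrar.registerEndpoint("listenerB");              // starts immediately now
        System.out.println(registrar.isStarted("listenerB")); // true
    }
}
```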

KafkaListenerContainerFactory: creating the listener container

KafkaMessageListenerContainer is one of Spring Kafka's core components. It manages and runs the message-listening logic of a Kafka consumer: it wraps the native KafkaConsumer and provides thread management, message polling, listener invocation, error handling, and more.

Returning briefly to the configuration at the start: the container factory we configured is ConcurrentKafkaListenerContainerFactory, and concurrency = 3 means three threads consume messages concurrently. In that case ConcurrentKafkaListenerContainerFactory creates a ConcurrentMessageListenerContainer.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // number of concurrent consumers
    factory.setConcurrency(3);
    // manual offset commit
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

ConcurrentMessageListenerContainer holds a list that will later be filled with KafkaMessageListenerContainer child containers, one per unit of concurrency:

private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
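Each child container runs its own consumer, and when the listener subscribes by topic, Kafka's consumer-group rebalancing spreads the topic's partitions across those consumers. As a simplified, self-contained illustration of that distribution (a plain round-robin, not Kafka's actual configurable assignor):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin spread of partitions over concurrent child containers.
// The real assignment is done by a partition assignor inside the consumer group.
public class PartitionSpread {
    public static List<List<Integer>> spread(int partitions, int concurrency) {
        List<List<Integer>> perContainer = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            perContainer.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            perContainer.get(p % concurrency).add(p); // partition p -> container p mod concurrency
        }
        return perContainer;
    }

    public static void main(String[] args) {
        // 6 partitions over concurrency = 3: each container gets 2 partitions
        System.out.println(spread(6, 3));
        // 2 partitions over concurrency = 3: one container sits idle
        System.out.println(spread(2, 3));
    }
}
```

This is also why setting concurrency higher than the partition count buys nothing: the extra consumers have no partitions to read.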

Container creation code

// AbstractKafkaListenerContainerFactory
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);
    JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
    }
    endpoint.setupListenerContainer(instance, this.messageConverter);
    // initialize the container's configuration: the endpoint carries static configuration
    // (topic information, the @KafkaListener-annotated method, the bean, etc.),
    // which is copied into the container here
    initializeContainer(instance, endpoint);
    customizeContainer(instance);
    return instance;
}

protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);

// ConcurrentKafkaListenerContainerFactory
// This is the template method pattern: the abstract class defines the overall flow,
// and the concrete creation step is implemented by the subclass.
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
    TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
    if (topicPartitions != null && topicPartitions.length > 0) {
        ContainerProperties properties = new ContainerProperties(topicPartitions);
        return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
    }
    else {
        Collection<String> topics = endpoint.getTopics();
        if (!topics.isEmpty()) { // NOSONAR
            ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
    }
}

Starting the container and consuming messages

As mentioned above, once the container is created there is a startup step, the line startIfNecessary(container);, which actually starts the container, in turn triggering initialization of the consumer thread (ListenerConsumer) and the start of the message-consumption flow.

// KafkaListenerEndpointRegistry
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

// goes through AbstractMessageListenerContainer.start()
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            doStart();
        }
    }
}

// ConcurrentMessageListenerContainer.doStart() performs the real startup logic
protected void doStart() {
    if (!isRunning()) {
        // create one child container per unit of concurrency
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            // start the child container
            container.start();
            // keep it in the child-container list
            this.containers.add(container);
        }
    }
}

// KafkaMessageListenerContainer.doStart() starts the child container
protected void doStart() {
    // create the consumer thread
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    // latch used to block until the consumer thread has actually started
    this.startLatch = new CountDownLatch(1);
    // submit to the executor: the consumer thread starts asynchronously
    this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

The message-consumption logic lives in ListenerConsumer, which implements Runnable. Its run() method polls messages from Kafka and then invokes our custom business method reflectively to process them.

// ListenerConsumer
public void run() {
    while (isRunning()) {
        try {
            // poll messages from Kafka and invoke the business method reflectively
            pollAndInvoke();
        }
        catch (Exception e) {
            handleConsumerException(e);
        }
        finally {
            clearThreadState();
        }
    }
}

protected void pollAndInvoke() {
    // poll messages
    ConsumerRecords<K, V> records = doPoll();
    // invoke our custom handler method reflectively to process the messages
    invokeIfHaveRecords(records);
}