How the Spring Framework integrates with Kafka
Integrating Kafka with Spring Boot
Add the Maven dependencies
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configure application.yml
spring:
  kafka:
    producer:
      bootstrap-servers: ip:port
      topics: topics
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      topics: topics
      bootstrap-servers: ip:port
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
      ssl.truststore.location: client.truststore.jks
      ssl.truststore.password: trus_password
      ssl.endpoint.identification.algorithm:
Create the Kafka producer and consumer
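In a Spring Boot application, once application.properties or application.yml is configured correctly, Spring Boot's Kafka auto-configuration (KafkaAutoConfiguration) automatically creates and wires the KafkaTemplate, the ConsumerFactory, and the other related beans.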
- KafkaTemplate: sends messages to Kafka.
- ConsumerFactory: the factory that creates Kafka consumers.
- KafkaListenerContainerFactory: creates the message listener containers for @KafkaListener methods.
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaMessageService {

    @Value("${spring.kafka.producer.topics}")
    private String outputTopic;

    // Field injection cannot target a final field, so the field is not final
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Listens for messages on the input topic.
     * @param message the received message
     */
    @KafkaListener(topics = "${spring.kafka.consumer.topics}")
    public void listen(String message) {
        log.info("Received message: message = {}", message);
        // TODO: process the message; as a placeholder it is forwarded unchanged
        String processedMessage = message;
        // Send to the output topic
        kafkaTemplate.send(outputTopic, processedMessage);
        log.info("Sent Processed Message: {}", processedMessage);
    }
}
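Manually configuring the Kafka producer and consumer
If you need more complex configuration, you can also define your own Kafka configuration classes.
Kafka consumer configuration class: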
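@Configuration
@Slf4j
public class KafkaConsumerConfig {

    // Values injected from application.yml. The property keys below are an assumption:
    // they mirror the YAML shown earlier and must be adjusted if your layout differs.
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;
    @Value("${spring.kafka.properties.ssl.truststore.location}")
    private String truststoreLocation;
    @Value("${spring.kafka.properties.ssl.truststore.password}")
    private String truststorePassword;
    @Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm:}")
    private String endpointIdentificationAlgorithm;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put("ssl.truststore.location", truststoreLocation);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.endpoint.identification.algorithm", endpointIdentificationAlgorithm);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Number of concurrent consumers: behaves like several independent consumers processing messages in parallel
        factory.setConcurrency(3);
        // Commit offsets manually
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}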
Kafka producer configuration class:
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(4);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
Listening for messages and processing them:
@Component
@Slf4j
public class KafkaMessageProcess {

    @Value("${spring.kafka.producer.topics}")
    private String outTopic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
    public void listen(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Acknowledgment acknowledgment) {
        log.info("Received message: topic = {}, message = {}", topic, message);
        // Run the business logic before acknowledging
        process(message);
        // Manually acknowledge the message, which commits its offset to Kafka.
        // Kafka records this offset to mark the message (and all messages before it) as successfully consumed.
        acknowledgment.acknowledge();
    }

    private void process(String message) {
        // TODO: process the message
    }
}
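To make the effect of acknowledgment.acknowledge() more concrete, here is a minimal in-memory sketch of offset commits. It does not use the Kafka client at all: OffsetCommitSketch and its methods are hypothetical names introduced only for illustration, and a list stands in for a single partition. It assumes the usual Kafka convention that the committed offset is the offset of the next record to read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory stand-in for one partition and one consumer group.
class OffsetCommitSketch {
    private final List<String> log = new ArrayList<>(); // the partition's records
    private long committedOffset = 0;                   // next offset to read after a restart

    void append(String record) {
        log.add(record);
    }

    // Returns every record from the committed offset onwards,
    // which is what a restarted consumer of this group would receive.
    List<String> pollFromCommitted() {
        return new ArrayList<>(log.subList((int) committedOffset, log.size()));
    }

    // Acknowledging the record at `offset` commits offset + 1,
    // so that record and everything before it count as consumed.
    void acknowledge(long offset) {
        committedOffset = Math.max(committedOffset, offset + 1);
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        OffsetCommitSketch partition = new OffsetCommitSketch();
        partition.append("a");
        partition.append("b");
        partition.append("c");

        partition.acknowledge(1); // ack "b": "a" and "b" now count as consumed
        System.out.println(partition.pollFromCommitted()); // prints [c]
    }
}
```

If the listener failed before acknowledging, the committed offset would stay where it was and the same records would be delivered again after a restart, which is why the acknowledgment is placed after the processing step.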
KafkaListener source code analysis
Registration of @KafkaListener
- Scan for annotations: during the bean initialization phase, KafkaListenerAnnotationBeanPostProcessor, which implements BeanPostProcessor, scans every bean for @KafkaListener annotations.
KafkaListenerAnnotationBeanPostProcessor

// Note: parts of the code are omitted
// This method comes from the BeanPostProcessor interface, one of the Spring Framework's core extension points;
// it allows custom processing after a bean has been initialized.
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // Find @KafkaListener annotations declared at class level
    Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);
    // Find methods annotated with @KafkaListener
    Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (methodx) -> {
        Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);
        return !listenerMethods.isEmpty() ? listenerMethods : null;
    });
    // Walk the annotated methods found above (the outer loop is omitted in this excerpt)
    Iterator var13 = annotatedMethods.entrySet().iterator();
    Map.Entry<Method, Set<KafkaListener>> entry = (Map.Entry)var13.next();
    Method method = (Method)entry.getKey();
    Iterator var11 = ((Set)entry.getValue()).iterator();
    while(var11.hasNext()) {
        KafkaListener listener = (KafkaListener)var11.next();
        // Hand each listener found over to the parsing and registration logic
        this.processKafkaListener(listener, method, bean, beanName);
    }
    return bean;
}
- Parse the annotation: extract topics, groupId, containerFactory, and the other attributes.
KafkaListenerAnnotationBeanPostProcessor

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
    // Parse the annotation and pack its metadata, the method, the bean, and other static configuration into the endpoint
    this.processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
    String containerFactory = this.resolve(kafkaListener.containerFactory());
    KafkaListenerContainerFactory<?> listenerContainerFactory = this.resolveContainerFactory(kafkaListener, containerFactory, beanName);
    // Register the endpoint that now wraps the listener, method, etc. found in the previous step
    this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
- Register the listener endpoint: call KafkaListenerEndpointRegistrar.registerEndpoint() to register the listener.
KafkaListenerEndpointRegistrar

public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized(this.endpointDescriptors) {
        // Whether to start immediately:
        // true: create and start the corresponding MessageListenerContainer (the Kafka consumer container) right away
        // false: only store the endpoint in endpointDescriptors; the containers are created and started together later
        if (this.startImmediately) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor), true);
        } else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

// Creates the listener containers for all stored endpoints in one pass
protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true; // trigger immediate startup
    }
}

// KafkaListenerEndpointRegistry (the endpointRegistry used above)
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
    synchronized (this.listenerContainers) {
        // Create the MessageListenerContainer; the creation itself is analyzed in the next section
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        // Store the new container in a thread-safe map
        this.listenerContainers.put(id, container);
        if (startImmediately) {
            // Start it
            startIfNecessary(container);
        }
    }
}
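The three steps above (scan, parse, register) can be sketched in plain Java without Spring. Everything in this block is hypothetical and exists only for illustration: DemoListener stands in for @KafkaListener, DemoEndpoint for the endpoint, and DemoRegistrar for the registrar with its startImmediately flag. "Starting" an endpoint is reduced to recording its topic, and, unlike the excerpt above, the sketch clears its pending list after the bulk registration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @KafkaListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoListener {
    String topic();
}

// Hypothetical endpoint: the static metadata of one listener method.
class DemoEndpoint {
    final Object bean;
    final Method method;
    final String topic;

    DemoEndpoint(Object bean, Method method, String topic) {
        this.bean = bean;
        this.method = method;
        this.topic = topic;
    }
}

// Hypothetical registrar: queues endpoints first, starts them later.
class DemoRegistrar {
    private final List<DemoEndpoint> pending = new ArrayList<>();
    final List<String> startedTopics = new ArrayList<>();
    private boolean startImmediately = false;

    // Steps 1 and 2: scan the bean's methods and turn each annotated one into an endpoint.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoListener listener = method.getAnnotation(DemoListener.class);
            if (listener != null) {
                registerEndpoint(new DemoEndpoint(bean, method, listener.topic()));
            }
        }
    }

    // Step 3: start right away, or queue the endpoint for the later bulk start.
    synchronized void registerEndpoint(DemoEndpoint endpoint) {
        if (startImmediately) {
            start(endpoint);
        } else {
            pending.add(endpoint);
        }
    }

    // Starts everything queued so far, then switches to immediate startup.
    synchronized void registerAllEndpoints() {
        for (DemoEndpoint endpoint : pending) {
            start(endpoint);
        }
        pending.clear();
        startImmediately = true;
    }

    // "Starting" is simulated by recording the topic.
    private void start(DemoEndpoint endpoint) {
        startedTopics.add(endpoint.topic);
    }

    public static void main(String[] args) {
        DemoRegistrar registrar = new DemoRegistrar();
        registrar.scan(new DemoOrderService());
        System.out.println(registrar.startedTopics); // still empty: the endpoint is only queued
        registrar.registerAllEndpoints();
        System.out.println(registrar.startedTopics); // the queued endpoint has now been started
    }
}

// Hypothetical business bean with one listener method and one ordinary method.
class DemoOrderService {
    @DemoListener(topic = "orders")
    public void onOrder(String message) {
    }

    public void notAListener(String message) {
    }
}
```

The flag explains why listeners declared on beans that are created late still work: once the bulk registration has run, any endpoint registered afterwards is started on the spot instead of being queued.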
KafkaListenerContainerFactory creates the listener container
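KafkaMessageListenerContainer is one of Spring Kafka's core components. It manages and runs the message-listening logic of a Kafka consumer: it wraps the native KafkaConsumer and provides thread management, message polling, listener invocation, and error handling.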
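Let us return for a moment to the configuration from the beginning. The container factory configured there is ConcurrentKafkaListenerContainerFactory, and concurrency=3 means that three threads are started to process messages concurrently. In that case ConcurrentKafkaListenerContainerFactory creates a ConcurrentMessageListenerContainer.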
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Number of concurrent consumers
    factory.setConcurrency(3);
    // Commit offsets manually
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
ConcurrentMessageListenerContainer holds a list of child containers. When it starts, it creates as many KafkaMessageListenerContainer children as concurrency specifies.
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
Container creation code
AbstractKafkaListenerContainerFactory

public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);
    JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
    }
    endpoint.setupListenerContainer(instance, this.messageConverter);
    // Initialize the container's configuration. The endpoint carries the static configuration
    // (topic information, the method annotated with @KafkaListener, the bean, and so on),
    // and that information is copied into the container here.
    initializeContainer(instance, endpoint);
    customizeContainer(instance);
    return instance;
}

protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);

ConcurrentKafkaListenerContainerFactory

// The subclass method is called here. This is the template method pattern: the abstract class
// defines the overall flow and the subclass supplies the concrete steps.
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
    TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
    if (topicPartitions != null && topicPartitions.length > 0) {
        ContainerProperties properties = new ContainerProperties(topicPartitions);
        return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
    }
    else {
        Collection<String> topics = endpoint.getTopics();
        if (!topics.isEmpty()) { // NOSONAR
            ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
    }
}
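The template method pattern mentioned in the comment above can be shown in isolation. The following is a minimal sketch in plain Java, not Spring Kafka code: SketchContainer, SketchContainerFactory, and SketchConcurrentFactory are hypothetical names, and the shared steps are reduced to copying the endpoint id onto the container.

```java
import java.util.List;

// Hypothetical stand-in for a listener container.
class SketchContainer {
    final String kind;
    final List<String> topics;
    String beanName;

    SketchContainer(String kind, List<String> topics) {
        this.kind = kind;
        this.topics = topics;
    }
}

// The abstract factory fixes the order of the steps (the template method)...
abstract class SketchContainerFactory {
    final SketchContainer createListenerContainer(String endpointId, List<String> topics) {
        // Step supplied by the subclass: which container type to build.
        SketchContainer instance = createContainerInstance(topics);
        // Shared step: copy endpoint metadata onto the new container.
        if (endpointId != null) {
            instance.beanName = endpointId;
        }
        return instance;
    }

    protected abstract SketchContainer createContainerInstance(List<String> topics);
}

// ...and the concrete factory only decides which container to instantiate.
class SketchConcurrentFactory extends SketchContainerFactory {
    @Override
    protected SketchContainer createContainerInstance(List<String> topics) {
        return new SketchContainer("concurrent", topics);
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        SketchContainerFactory factory = new SketchConcurrentFactory();
        SketchContainer container = factory.createListenerContainer("orderListener", List.of("orders"));
        System.out.println(container.kind + " container '" + container.beanName + "' for " + container.topics);
    }
}
```

Because the flow lives in the abstract class, a different container type only needs another subclass that overrides createContainerInstance; the shared configuration steps stay in one place.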
Starting the container and consuming messages
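As mentioned earlier, a container still has to be started after it has been created. That is what the line startIfNecessary(container); does: it actually starts the container, which in turn initializes the consumer thread (ListenerConsumer) and begins the message consumption flow.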
KafkaListenerEndpointRegistry

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

// Called next: the start() method of AbstractMessageListenerContainer
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            doStart();
        }
    }
}

// Called next: ConcurrentMessageListenerContainer.doStart(), which holds the actual startup logic
protected void doStart() {
    if (!isRunning()) {
        // Create as many child containers as concurrency specifies
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            // Start the child container
            container.start();
            // Keep it in the list of child containers
            this.containers.add(container);
        }
    }
}

// Called next: KafkaMessageListenerContainer.doStart(), which starts one child container
protected void doStart() {
    // Create the consumer runnable
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    // Latch used to block until the consumer thread has actually started
    this.startLatch = new CountDownLatch(1);
    // Submit to the executor so the consumer thread starts asynchronously
    this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}
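The startup chain above combines two ideas: a parent container that creates one child per unit of concurrency, and a latch that lets the starting thread wait until each consumer thread is really running. The following plain-Java sketch imitates both. ChildContainerSketch and ParentContainerSketch are hypothetical names, the five-second timeout is an arbitrary assumption, and the consumer task does nothing except signal that it has started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical child container: one consumer task guarded by a start latch.
class ChildContainerSketch {
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private volatile boolean running;

    void start(ExecutorService executor) {
        running = true;
        // Hand the consumer task to the pool so that it starts asynchronously.
        executor.submit(() -> {
            // Signal that the consumer thread is actually running;
            // a real consumer would then enter its poll loop.
            startLatch.countDown();
        });
        try {
            // Block until the consumer thread has started, or give up after the timeout.
            if (!startLatch.await(5, TimeUnit.SECONDS)) {
                running = false;
                throw new IllegalStateException("consumer thread did not start in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
            throw new IllegalStateException("interrupted while waiting for the consumer thread", e);
        }
    }

    boolean isRunning() {
        return running;
    }
}

// Hypothetical parent container: creates and starts `concurrency` children.
class ParentContainerSketch {
    private final int concurrency;
    private final ExecutorService executor;
    private final List<ChildContainerSketch> containers = new ArrayList<>();

    ParentContainerSketch(int concurrency) {
        this.concurrency = concurrency;
        this.executor = Executors.newFixedThreadPool(concurrency);
    }

    void start() {
        for (int i = 0; i < concurrency; i++) {
            ChildContainerSketch child = new ChildContainerSketch();
            child.start(executor);   // start the child container
            containers.add(child);   // keep it in the list of children
        }
    }

    int childCount() {
        return containers.size();
    }

    boolean allRunning() {
        return containers.stream().allMatch(ChildContainerSketch::isRunning);
    }

    void stop() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        ParentContainerSketch parent = new ParentContainerSketch(3);
        parent.start();
        System.out.println(parent.childCount() + " child containers started");
        parent.stop();
    }
}
```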
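The consumption logic lives in ListenerConsumer, which implements the run() method of the Runnable interface. Inside run() it polls for messages and invokes our own business method via reflection, where the messages are processed.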
ListenerConsumer

public void run() {
    while (isRunning()) {
        try {
            // Poll Kafka for messages and invoke the business method via reflection
            pollAndInvoke();
        }
        catch (Exception e) {
            handleConsumerException(e);
        }
        finally {
            clearThreadState();
        }
    }
}

protected void pollAndInvoke() {
    // Poll for messages
    ConsumerRecords<K, V> records = doPoll();
    // Invoke our own method via reflection to process the messages
    invokeIfHaveRecords(records);
}