当前位置: 首页 > news >正文

spring中的@KafkaListener 注解详解

@KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析:

基本用法

@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {System.out.println("Received Message: " + message);
}

主要属性

1. 必需属性

  • topics / topicPattern:指定监听的 topic
    • topics:逗号分隔的 topic 列表
    • `topicPattern**:使用正则表达式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")

2. 消费者配置

  • groupId:指定消费者组 ID
  • containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "myFactory")

3. 消息处理

  • id:为监听器指定唯一 ID
  • concurrency:设置并发消费者数量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")

4. 高级配置

  • containerGroup:指定容器组(Spring Kafka 2.5+)
  • errorHandler:指定错误处理器
  • idIsGroup:是否使用监听器 ID 作为组 ID(默认 false)

消息处理方法签名

监听器方法可以接受多种形式的参数:

  1. 简单消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message) { ... }
    
  2. 带元数据的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> record) { ... }
    
  3. 批量消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(List<String> messages) { ... }
    
  4. 带确认的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message, Acknowledgment ack) {// 处理消息后手动确认ack.acknowledge();
    }
    

配置选项

可以通过 @KafkaListenercontainerFactory 属性引用自定义配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;
}@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }

错误处理

可以通过以下方式处理错误:

  1. 配置错误处理器

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setErrorHandler(new SeekToCurrentErrorHandler());return factory;
    }
    
  2. 使用 @SendTo 发送到死信队列

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    @SendTo("myDltTopic")
    public String listen(String message) {// 处理失败时返回错误消息return "error";
    }
    

注意事项

  1. 监听器方法应该是 public 的
  2. 避免在监听器方法中执行长时间运行的操作
  3. 考虑消息处理的幂等性
  4. 对于批量处理,确保方法参数是 List 类型
  5. 在 Spring Boot 中,许多配置可以通过 application.properties/yml 设置

完整示例

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}
}@Service
public class KafkaMessageListener {@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "kafkaListenerContainerFactory")public void listen(String message, Acknowledgment ack) {try {System.out.println("Received Message: " + message);// 业务处理逻辑ack.acknowledge();} catch (Exception e) {// 错误处理}}
}

@KafkaListener 注解提供了灵活的方式来消费 Kafka 消息,开发者可以根据具体需求进行配置和扩展。

ConcurrentKafkaListenerContainerFactory详解

在Spring Kafka中,ConcurrentKafkaListenerContainerFactory是一个核心配置类,用于创建并发消息监听容器,支持多线程消费Kafka消息,以下是其详细介绍:

1、核心作用

  1. 并发消费支持:通过创建多个KafkaMessageListenerContainer实例(每个对应一个线程),实现多线程并发消费消息。例如设置concurrency=3会创建3个消费者线程,每个线程处理分配到的分区。
  2. 线程安全保障:生成的ConcurrentMessageListenerContainer内部委托给多个单线程的KafkaMessageListenerContainer实例,保证线程安全性(Kafka Consumer本身非线程安全)。

2、关键特性

  1. 并发度配置

    • 通过setConcurrency()方法设置并发消费者数量,可提高消息处理速度和吞吐量。
    • 配置规则为concurrency<=分区数/应用实例数,设置过多会导致线程闲置。
  2. 批量处理支持

    • 通过setBatchListener(true)启用批量消费
    • 配合MAX_POLL_RECORDS_CONFIG参数控制单次poll最大返回记录数
  3. 错误处理机制

    • 可配置自定义错误处理器(如SeekToCurrentErrorHandler
    • 支持重试策略集成
  4. 分区分配控制

    • 可自定义分区分配逻辑
    • 配合group.id实现消费者组协调

3、配置示例

@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setPollTimeout(3000); // 设置轮询超时factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 设置错误处理器return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消费配置return new DefaultKafkaConsumerFactory<>(props);}
}

4、使用场景

  1. 高吞吐量需求:通过增加并发消费者数量提升处理能力
  2. 批量数据处理:需要批量处理消息的场景
  3. 复杂错误处理:需要自定义错误处理逻辑的场景
  4. 多主题监听:需要同时监听多个主题的场景

5、注意事项

  1. 顺序性问题:并发消费可能导致消息顺序混乱,需业务保证
  2. 重复处理问题:需实现幂等性处理机制
  3. 数据库访问:需注意并发访问控制
  4. 资源限制:并发度设置需考虑系统资源限制

在这里插入图片描述

相关文章:

  • uni-app学习笔记二十三--交互反馈showToast用法
  • LeetCode - 560. 和为 K 的子数组
  • 【西门子杯工业嵌入式-5-串口实现数据收发】
  • java中static学习笔记
  • ubuntu下编译osg3.6.5源码
  • AT_abc409_e [ABC409E] Pair Annihilation
  • 【JAVA】javadoc —— 如何生成标准的 Java API 文档
  • 12.7Swing控件6 JList
  • SQL Server从入门到项目实践(超值版)读书笔记 16
  • 用 DeepSeek 高效完成数据分析与挖掘
  • 时序数据库IoTDB结合SeaTunnel实现高效数据同步
  • 浅谈未来汽车电子电气架构发展趋势中的通信部分
  • 码蹄杯真题分享
  • LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
  • 前端现行架构浅析
  • masm32汇编实现扫雷进程注入
  • 深入理解 x86 汇编中的符号扩展指令:从 CBW 到 CDQ 的全解析
  • Chainlink Automation 深度解析与实战
  • 【算法】【优选算法】优先级队列
  • LUFFY(路飞): 使用DeepSeek指导Qwen强化学习
  • 做易拉宝的素材网站/windows优化大师免费
  • 做页面设计的网站/百度云链接
  • wordpress 跟随滚动/搜索引擎优化seo什么意思
  • 网站用axure做的rp格式/网站推广平台搭建
  • 同方云罐网站设计/南宁做网站公司
  • 住房和城乡建设部网站登录/优秀企业网站欣赏