当前位置: 首页 > news >正文

6,Receiving Messages:@KafkaListener Annotation

@KafkaListener注释用于将bean方法指定为侦听器容器的侦听器。bean被包装在一个配置了各种功能的MessagingMessageListenerAdapter中,例如在必要时转换数据以匹配方法参数的转换器。

您可以使用#{…}或属性占位符(${…})使用SpEL配置注释上的大多数属性。有关更多信息,请参阅Javadoc。

Record Listeners

@KafkaListener注释为简单的POJO监听器提供了一种机制。以下示例显示了如何使用它:

public class Listener {@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")public void listen(String data) {...}}

此机制需要在您的@Configuration类之一上添加@EnableKafka注释和侦听器容器工厂,用于配置底层ConcurrentMessageListenerContainer。默认情况下,需要一个名为kafkaListencerContainerFactory的bean。以下示例显示了如何使用ConcurrentMessageListenerContainer:

@Configuration
@EnableKafka
public class KafkaConfig {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");...return props;}
}

请注意,要设置容器属性,必须在工厂上使用getContainerProperties()方法。它被用作注入容器的实际属性的模板。

从2.1.1版本开始,您现在可以为注释创建的消费者设置client.id属性。clientIdPrefix的后缀为-n,其中n是一个整数,表示使用并发时的容器号。

从2.2版本开始,您现在可以通过使用注释本身的属性来覆盖容器工厂的并发性和autoStartup属性。属性可以是简单的值、属性占位符或SpEL表达式。以下示例显示了如何执行此操作:

@KafkaListener(id = "myListener", topics = "myTopic",autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {...
}

Explicit Partition Assignment

您还可以使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。以下示例显示了如何执行此操作:

@KafkaListener(id = "thing2", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

您可以在partitions或partitionOffsets属性中指定每个分区,但不能同时指定两者。

与大多数注释属性一样,您可以使用SpEL表达式;有关如何生成大量分区列表的示例,请参阅手动分配所有分区。

从2.5.5版本开始,您可以对所有分配的分区应用初始偏移:

@KafkaListener(id = "thing3", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" },partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

*通配符表示partitions属性中的所有分区。每个@TopicPartition中只能有一个带有通配符的@PartitionOffset。

此外,当侦听器实现ConsumerSeekAware时,即使使用手动分配,现在也会调用onPartionsAssigned。例如,这允许在当时进行任何任意的寻道操作。

从2.6.4版本开始,您可以指定以逗号分隔的分区列表或分区范围:

@KafkaListener(id = "pp", autoStartup = "false",topicPartitions = @TopicPartition(topic = "topic1",partitions = "0-5, 7, 10-15"))
public void process(String in) {...
}

范围是包容性的;上面的示例将分配分区0、1、2、3、4、5、7、10、11、12、13、14、15。

指定初始偏移量时可以使用相同的技术:

@KafkaListener(id = "thing3", topicPartitions ={ @TopicPartition(topic = "topic1",partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

初始偏移将应用于所有6个分区。

从3.2开始,@PartitionOffset支持SeekPosition。结束,寻找位置。开始,寻找位置。TIMESTAMP,seekPosition匹配seekPosition枚举名称:

@KafkaListener(id = "seekPositionTime", topicPartitions = {@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")})
})
public void listen(ConsumerRecord<?, ?> record) {...
}

如果seekPosition设置为END或BEGINNING,将忽略initialOffset和relativeToCurrent。如果seekPosition设置TIMESTAMP,initialOffset表示时间戳。

Manual Acknowledgment

使用手动确认模式时,您还可以向听众提供确认。要激活手动确认模式,您需要将ContainerProperties中的确认模式设置为相应的手动模式。以下示例还显示了如何使用不同的容器工厂。此自定义容器工厂必须通过调用getContainerProperties(),然后对其调用setAckMode,将AckMode设置为手动类型。否则,Acknowledgment对象将为null。

@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {...ack.acknowledge();
}

Consumer Record Metadata

最后,可以从消息头中获得有关记录的元数据。您可以使用以下标头名称来检索邮件的标头:

卡夫卡标头。抵消

卡夫卡标头。已接收_KEY

卡夫卡标头。RECEIVED_TOPIC

卡夫卡标头。已接收_分区

卡夫卡标头。已接收_时间戳

卡夫卡标头。时间戳类型

从版本2.5开始,如果传入记录具有空键,则RECEIVED_KEY不存在;以前,标头填充了null值。此更改是为了使框架与不存在空值标头的spring消息传递约定保持一致。

以下示例显示了如何使用标头:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {...
}

必须在侦听器方法的具体实现上指定参数注释(@Payload,@Header);如果它们是在接口上定义的,则不会被检测到。

从2.5版本开始,您可以在ConsumerRecordMetadata参数中接收记录元数据,而不是使用离散的标头。

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {...
}

这包含ConsumerRecord中除键和值之外的所有数据。

Batch Listeners

从1.1版本开始,您可以配置@KafkaListener方法来接收从消费者调查中收到的整批消费者记录。

批处理侦听器不支持非阻塞检索。

要配置侦听器容器工厂以创建批侦听器,可以设置batchListener属性。以下示例显示了如何执行此操作:

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setBatchListener(true);return factory;
}

从2.8版本开始,您可以使用@KafkaListener注释上的batch属性覆盖工厂的batchListener属性。这一点,再加上对容器错误处理程序的更改,允许将同一工厂用于记录和批处理侦听器。

从2.9.6版本开始,容器工厂为recordMessageConverter和batchMessageConverter属性提供了单独的设置程序。以前,只有一个属性messageConverter适用于记录和批处理侦听器。

以下示例显示了如何接收有效载荷列表:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {...
}

主题、分区、偏移量等都可以在与有效载荷并行的标头中找到。以下示例显示了如何使用标头:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,@Header(KafkaHeaders.OFFSET) List<Long> offsets) {...
}

或者,您可以接收消息列表<?>每个消息中都有每个偏移量和其他详细信息的对象,但它必须是方法上定义的唯一参数(使用手动提交时,除了可选的Acknowledgment和/或Consumer参数)。以下示例显示了如何执行此操作:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {...
}@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {...
}@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {...
}

在这种情况下,不会对有效载荷进行转换。

如果BatchMessagingMessageConverter配置了RecordMessageConverter,您还可以向Message参数添加泛型类型,然后转换有效载荷。有关更多信息,请参阅使用批处理监听器的有效载荷转换。

您还可以收到ConsumerRecord列表<?,?>对象,但它必须是方法上定义的唯一参数(除了可选的Acknowledgment,当使用手动提交和Consumer<,?>参数时)。以下示例显示了如何执行此操作:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {...
}@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {...
}

从2.2版本开始,侦听器可以接收完整的ConsumerRecords<?,?>poll()方法返回的对象,让监听器访问其他方法,如partitions()(返回列表中的TopicPartition实例)和records(TopicPartition)(获取选择性记录)。同样,这必须是方法上的唯一参数(除了可选的Acknowledgment,当使用手动提交或Consumer<?,?>参数时)。以下示例显示了如何执行此操作:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {...
}

如果容器工厂配置了RecordFilterStrategy,则ConsumerRecords会忽略它?,?>侦听器,并发出警告日志消息。如果列表<?,则只能使用批处理侦听器筛选记录?>使用听众的形式。默认情况下,记录一次过滤一个;从2.8版本开始,您可以重写filterBatch,在一次调用中过滤整个批处理。

Annotation Properties

从2.0版本开始,id属性(如果存在)用作Kafka consumer group.id属性,覆盖消费者工厂中配置的属性(如果有的话)。您还可以显式设置groupId或将idIsGroup设置为false,以恢复使用消费者工厂group.id的先前行为。

您可以在大多数注释属性中使用属性占位符或SpEL表达式,如下例所示:

@KafkaListener(topics = "${some.property}")@KafkaListener(topics = "#{someBean.someProperty}",groupId = "#{someBean.someProperty}.group")

从2.1.2版本开始,SpEL表达式支持一个特殊的标记:__listener。它是一个伪bean名称,表示此注释所在的当前bean实例。

考虑以下示例:

@Bean
public Listener listener1() {return new Listener("topic1");
}@Bean
public Listener listener2() {return new Listener("topic2");
}

给定前面示例中的bean,我们可以使用以下内容:

public class Listener {private final String topic;public Listener(String topic) {this.topic = topic;}@KafkaListener(topics = "#{__listener.topic}",groupId = "#{__listener.topic}.group")public void listen(...) {...}public String getTopic() {return this.topic;}}

如果,在不太可能的情况下,您有一个名为__listener的实际bean,您可以使用beanRef属性更改表达式标记。以下示例显示了如何执行此操作:

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")

从2.2.4版本开始,您可以直接在注释上指定Kafka消费者属性,这些属性将覆盖消费者工厂中配置的任何同名属性。您不能以这种方式指定group.id和client.id属性;他们将被忽视;使用groupId和clientIdPrefix注释属性。

属性被指定为具有正常Java属性文件格式的单个字符串:foo:bar、foo=bar或foo-bar,如下例所示:

@KafkaListener(topics = "myTopic", groupId = "group", properties = {"max.poll.interval.ms:60000",ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

以下是使用RoutingKafkaTemplate中的示例的相应侦听器示例。

@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {System.out.println("1: " + in);
}@KafkaListener(id = "two", topics = "two",properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {System.out.println("2: " + new String(in));
}

http://www.dtcms.com/a/268659.html

相关文章:

  • 诊断工程师进阶篇 --- 车载诊断怎么与时俱进?
  • vue3 字符包含
  • vue openlayer 找出直线上的某一个点 , 点距离直线 最短路径的点 WKT坐标转换为GeoJSON坐标
  • iOS Widget 开发-1:什么是 iOS Widget?开发前的基本认知
  • 亚马逊运营进阶指南:如何用AI工具赋能广告运营
  • 期待在 VR 森林体验模拟中实现与森林的 “虚拟复现”​
  • 华锐视点 VR 污水处理技术对激发学习兴趣的作用​
  • 北京-4年功能测试2年空窗-报培训班学测开-第四十四天
  • UI + MCP Client + MCP Server实验案例
  • 【机器学习笔记 Ⅱ】11 决策树模型
  • Spring Boot 操作 Redis 时 KeySerializer 和 HashKeySerializer 有什么区别?
  • day16——Java集合进阶(Collection、List、Set)
  • Kafka消息积压的原因分析与解决方案
  • 网络安全之重放攻击:原理、危害与防御之道
  • windows grpcurl
  • 用安卓手机给苹果手机设置使用时长限制,怎样将苹果手机的某些APP设置为禁用?有三种方法
  • 软件工程功能点估算基础
  • QML Row与Column布局
  • YOLOv11 架构优化:提升目标检测性能
  • 国内免代理免费使用Gemini大模型实战
  • Vue的生命周期(Vue2)
  • Maven继承:多模块项目高效管理秘笈
  • 微软重磅开源Magentic-UI!
  • 【Rust CLI项目】Rust CLI命令行处理csv文件项目实战
  • AI Tool Calling 实战——让 LLM 控制 Java 工具
  • java-Milvus 连接池(多key)与自定义端点监听设计
  • C++开源项目—2048.cpp
  • 部署MongoDB
  • 接口漏洞怎么抓?Fiddler 中文版 + Postman + Wireshark 实战指南
  • 记录一个关于Maven配置TSF的报错问题