当前位置: 首页 > news >正文

2、Connecting to Kafka

KafkaAdmin-请参阅配置主题

ProducerFactory-请参阅发送消息

ConsumerFactory-请参阅接收消息

从2.5版本开始,每个版本都扩展了KafkaResourceFactory。这允许在运行时通过向引导服务器的配置中添加Supplier<String>来更改引导服务器:setBootstrapServersSupplier(()->…)。所有新连接都将调用此命令以获取服务器列表。消费者和生产者通常寿命较长。要关闭现有的生产者,请在DefaultKafkaProducerFactory上调用reset()。要关闭现有的Consumers,请在KafkaListenerEndpointRegistry上调用stop()(然后调用start())和/或在任何其他侦听器容器bean上调用stop()和start()。

为了方便起见,该框架还提供了一个支持两组引导服务器的ABSwitch集群;其中一个在任何时候都是活动的。通过调用setBootstrapServersSupplier()配置ABSwitchCluster并将其添加到生产者和消费者工厂以及KafkaAdmin。当你想切换时,在生产者工厂上调用primary()或secondary()并调用reset()来建立新的连接;对于消费者,stop()和start()都是监听器容器。使用@KafkaListeners时,停止()和启动()KafkaListenerEndpointRegistry bean。

有关更多信息,请参阅Javadocs。

Factory Listeners

从2.5版本开始,DefaultKafkaProducerFactory和DefaultKafkaConsumerFactory可以配置一个监听器,以便在创建或关闭生产者或消费者时接收通知。

interface Listener<K, V> {default void producerAdded(String id, Producer<K, V> producer) {}default void producerRemoved(String id, Producer<K, V> producer) {}}
interface Listener<K, V> {default void consumerAdded(String id, Consumer<K, V> consumer) {}default void consumerRemoved(String id, Consumer<K, V> consumer) {}}

在每种情况下,id都是通过将客户端id属性(在创建后从metrics()中获得)附加到工厂beanName属性中创建的,用..分隔。。

例如,这些监听器可用于在创建新客户端时创建和绑定Micrometer KafkaClientMetrics实例(并在客户端关闭时关闭它)。

该框架提供了正是这样做的监听器;请参见千分尺本地度量。

Default client ID prefixes

从版本3.2开始,对于使用Spring.application.name属性定义应用程序名称的Spring Boot应用程序,此名称现在用作这些客户端类型的自动生成客户端ID的默认前缀:

不使用消费者群体的消费者客户

生产商客户

管理员客户端

这使得在服务器端更容易识别这些客户端,以便进行故障排除或应用配额。

Client TypeWithout application nameWith application name

consumer without consumer group

consumer-null-1

myapp-consumer-1

consumer with consumer group "mygroup"

consumer-mygroup-1

consumer-mygroup-1

producer

producer-1

myapp-producer-1

admin

adminclient-1

myapp-admin-1

http://www.dtcms.com/a/268020.html

相关文章:

  • css模块化以及rem布局
  • linux/ubuntu日志管理--/dev/log 的本质与作用
  • arm 精准总线错误与非精准总线错误
  • C#使用Qdrant实现向量存储及检索
  • 基于ARM+FPGA的光栅尺精密位移加速度测试解决方案
  • 【精密测量】基于ARM+FPGA的多路光栅信号采集方案
  • 【PyTorch 当前版本不支持 NVIDIA GeForce RTX 5060 Ti处理办法】
  • 求医十年,病因不明,ChatGPT:你看起来有基因突变
  • 群晖(Synology)存储ext4视频文件删除的恢复方法
  • Java--指定控制台System.out.println的颜色
  • 408第三季part2 - 计算机网络 - 应用层
  • 洛谷 P1005 [NOIP 2007 提高组] 矩阵取数游戏
  • CentOS-6如何配置网络设置IP? 笔记250706
  • brpc怎么解决C++静态初始化顺序难题的?
  • 2025年7月最新多语言模型研发效能分析(Gemini 2.5 vs Claude 4 vs GPT-4.1)
  • Monorepo+Turborepo+Next常问问题详解
  • GitHub 趋势日报 (2025年07月04日)
  • Gin Web 服务集成 Consul:从服务注册到服务发现实践指南(下)
  • # IS-IS 协议 | LSP 传输与链路状态数据库同步机制
  • 网络爬虫认证的综合分析:从HTTP模拟到浏览器自动化
  • mac中创建 .command 文件,执行node服务
  • 微信小程序71~80
  • 善用关系网络:开源AI大模型、AI智能名片与S2B2C商城小程序赋能下的成功新路径
  • Web后端开发-SpringBootWeb入门、Http协议、Tomcat
  • Gin 框架中如何实现 JWT 鉴权中间件
  • 学习栈和队列的插入和删除操作
  • 网安系列【8】之暴力破解入门
  • 【机器学习深度学习】多分类评估策略
  • Solidity——什么是低级调用(low-level calls)和操作码的内联汇编
  • 一次内存“卡顿”全流程实战分析:从制造问题到优化解决