当前位置: 首页 > news >正文

3、Configuring Topics

如果您在应用程序上下文中定义了KafkaAdmin bean,它可以自动向代理添加主题。为此,您可以将每个主题的NewTopic@Bean添加到应用程序上下文中。2.3版本引入了一个新的类TopicBuilder,使创建此类bean更加方便。以下示例显示了如何执行此操作:

@Bean
public KafkaAdmin admin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");return new KafkaAdmin(configs);
}@Bean
public NewTopic topic1() {return TopicBuilder.name("thing1").partitions(10).replicas(3).compact().build();
}@Bean
public NewTopic topic2() {return TopicBuilder.name("thing2").partitions(10).replicas(3).config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd").build();
}@Bean
public NewTopic topic3() {return TopicBuilder.name("thing3").assignReplicas(0, List.of(0, 1)).assignReplicas(1, List.of(1, 2)).assignReplicas(2, List.of(2, 0)).config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd").build();
}

从2.6版本开始,您可以省略partitions()和/或replias(),代理默认值将应用于这些属性。代理版本必须至少为2.4.0才能支持此功能-请参阅KIP-464。

@Bean
public NewTopic topic4() {return TopicBuilder.name("defaultBoth").build();
}@Bean
public NewTopic topic5() {return TopicBuilder.name("defaultPart").replicas(1).build();
}@Bean
public NewTopic topic6() {return TopicBuilder.name("defaultRepl").partitions(3).build();
}

从2.7版本开始,您可以在一个KafkaAdmin中声明多个NewTopics。NewTopics bean定义:

@Bean
public KafkaAdmin.NewTopics topics456() {return new NewTopics(TopicBuilder.name("defaultBoth").build(),TopicBuilder.name("defaultPart").replicas(1).build(),TopicBuilder.name("defaultRepl").partitions(3).build());
}

使用Spring Boot时,KafkaAdmin bean会自动注册,因此您只需要NewTopic(和/或NewTopics)@Beans。

默认情况下,如果代理不可用,则会记录一条消息,但上下文会继续加载。您可以通过编程调用管理员的initialize()方法,稍后重试。如果您希望将此情况视为致命,请将管理员的fatalIfBrokerNotAvailable属性设置为true。随后,上下文初始化失败。

如果代理支持它(1.0.0或更高版本),如果发现现有主题的分区数少于NewTopic.numPartitions,管理员会增加分区数。

从2.7版本开始,KafkaAdmin提供了在运行时创建和检查主题的方法。

创建或修改主题

描述主题

对于更高级的功能,您可以直接使用AdminClient。以下示例显示了如何执行此操作:

@Autowired
private KafkaAdmin admin;...AdminClient client = AdminClient.create(admin.getConfigurationProperties());...client.close();

从2.9.10、3.0.9版本开始,您可以提供一个Predicate<NewTopic>,用于确定是否应考虑创建或修改特定的NewTopic bean。例如,如果您有多个指向不同集群的KafkaAdmin实例,并且希望选择应由每个管理员创建或修改的主题,则这很有用。

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));

http://www.dtcms.com/a/267886.html

相关文章:

  • I-Cache、D-Cache 和 SRAM 的区别与联系
  • 系统架构设计师论文分享-论软件体系结构的演化
  • Docker容器中安装MongoDB,导入数据
  • nvm常用指令汇总
  • Spark流水线数据质量检查组件
  • 【认知】如何在高强度工作中保持心理健康和情绪稳定?
  • WizTree v4.2.5 x86 x64 单文件版
  • 让你的asp.net网站在调试模式下也能在局域网通过ip访问
  • Java 双亲委派机制笔记
  • GitCode项目创建指南
  • 一文掌握Qt Quick数字图像处理项目开发(基于Qt 6.9 C++和QML,代码开源)
  • 【黑马点评】(二)缓存
  • PyTorch 2.7深度技术解析:新一代深度学习框架的革命性演进
  • Python作业1
  • 实现Spring MVC登录验证与拦截器保护:从原理到实战
  • Jiraph​ 简介
  • React 各颜色转换方法、颜色值换算工具HEX、RGB/RGBA、HSL/HSLA、HSV、CMYK
  • AcWing--873.欧拉函数
  • ARMv8 创建1、2、3级页表代码与注释
  • 【C++基础】内存管理四重奏:malloc/free vs new/delete - 面试高频考点与真题解析
  • Windows 11 Enterprise LTSC 转 IoT
  • C++ i386/AMD64平台汇编指令对齐长度获取实现
  • LangChain:构建一个Agent(入门篇四)
  • [leetcode] C++ 并查集模板
  • 【机器学习笔记 Ⅱ】1 神经网络
  • 云原生 Serverless 架构下的智能弹性伸缩与成本优化实践
  • 基于HTML与Java的简易在线会议系统实现
  • Javaweb - 10.5 HttpServletRequest 和 HttpServletResponse
  • Flink ClickHouse 连接器维表源码深度解析
  • 【Note】《Kafka: The Definitive Guide》第四章:Kafka 消费者全面解析:如何从 Kafka 高效读取消息