3、Configuring Topics
如果您在应用程序上下文中定义了KafkaAdmin bean,它可以自动向代理添加主题。为此,您可以将每个主题的NewTopic@Bean添加到应用程序上下文中。2.3版本引入了一个新的类TopicBuilder,使创建此类bean更加方便。以下示例显示了如何执行此操作:
@Bean
public KafkaAdmin admin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");return new KafkaAdmin(configs);
}@Bean
public NewTopic topic1() {return TopicBuilder.name("thing1").partitions(10).replicas(3).compact().build();
}@Bean
public NewTopic topic2() {return TopicBuilder.name("thing2").partitions(10).replicas(3).config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd").build();
}@Bean
public NewTopic topic3() {return TopicBuilder.name("thing3").assignReplicas(0, List.of(0, 1)).assignReplicas(1, List.of(1, 2)).assignReplicas(2, List.of(2, 0)).config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd").build();
}
从2.6版本开始,您可以省略partitions()和/或replias(),代理默认值将应用于这些属性。代理版本必须至少为2.4.0才能支持此功能-请参阅KIP-464。
@Bean
public NewTopic topic4() {return TopicBuilder.name("defaultBoth").build();
}@Bean
public NewTopic topic5() {return TopicBuilder.name("defaultPart").replicas(1).build();
}@Bean
public NewTopic topic6() {return TopicBuilder.name("defaultRepl").partitions(3).build();
}
从2.7版本开始,您可以在一个KafkaAdmin中声明多个NewTopics。NewTopics bean定义:
@Bean
public KafkaAdmin.NewTopics topics456() {return new NewTopics(TopicBuilder.name("defaultBoth").build(),TopicBuilder.name("defaultPart").replicas(1).build(),TopicBuilder.name("defaultRepl").partitions(3).build());
}
使用Spring Boot时,KafkaAdmin bean会自动注册,因此您只需要NewTopic(和/或NewTopics)@Beans。
默认情况下,如果代理不可用,则会记录一条消息,但上下文会继续加载。您可以通过编程调用管理员的initialize()方法,稍后重试。如果您希望将此情况视为致命,请将管理员的fatalIfBrokerNotAvailable属性设置为true。随后,上下文初始化失败。
如果代理支持它(1.0.0或更高版本),如果发现现有主题的分区数少于NewTopic.numPartitions,管理员会增加分区数。
从2.7版本开始,KafkaAdmin提供了在运行时创建和检查主题的方法。
创建或修改主题
描述主题
对于更高级的功能,您可以直接使用AdminClient。以下示例显示了如何执行此操作:
@Autowired
private KafkaAdmin admin;...AdminClient client = AdminClient.create(admin.getConfigurationProperties());...client.close();
从2.9.10、3.0.9版本开始,您可以提供一个Predicate<NewTopic>,用于确定是否应考虑创建或修改特定的NewTopic bean。例如,如果您有多个指向不同集群的KafkaAdmin实例,并且希望选择应由每个管理员创建或修改的主题,则这很有用。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));