
Integrating a Kafka Cluster with Spring Boot 3

1. Adding the dependency in build.gradle

implementation 'org.springframework.kafka:spring-kafka:3.2.0'

2. Adding the kafka-log.yml file

Create kafka-log.yml under resources/config and configure the topic and the consumer group:

# Kafka consumer group
kafka:
  consumer:
    group:
      log-data: log-data-group
    topic:
      log-data: log-data-topic
    auto-startup: false

Load the custom yml file:

@Configuration
public class YmlConfiguration {

    @Bean
    public PropertySourcesPlaceholderConfigurer properties() {
        PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();
        YamlPropertiesFactoryBean yaml = new YamlPropertiesFactoryBean();
        // Load the custom yml file from the classpath
        yaml.setResources(new ClassPathResource("config/kafka-log.yml"));
        configurer.setProperties(yaml.getObject());
        return configurer;
    }
}
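
With the placeholder configurer in place, the keys from kafka-log.yml resolve like any other property, both in @Value fields and in the @KafkaListener placeholders used later. A small illustrative sketch (the class below is not part of the original article, it only demonstrates the resolution):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Illustrative only: the custom yml keys resolve through the placeholder configurer
@Component
public class KafkaLogProperties {

    @Value("${kafka.consumer.topic.log-data}")
    private String logDataTopic;   // resolves to log-data-topic

    @Value("${kafka.consumer.group.log-data}")
    private String logDataGroup;   // resolves to log-data-group

    public String getLogDataTopic() {
        return logDataTopic;
    }

    public String getLogDataGroup() {
        return logDataGroup;
    }
}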

3. application.yml configuration

spring:
  kafka:
    bootstrap-servers: 192.168.0.81:9092,192.168.0.82:9092,192.168.0.83:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 254288
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      auto-topic-creation:
        auto-create: true
      properties:
        linger.ms: 1
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: log-data-group
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
# Set per environment as needed; for example, do not start the consumer by default in development
kafka:
  consumer:
    auto-startup: false
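
If profile-specific configuration files are used, the switch can be flipped per environment. A sketch of what that might look like (file split and values below are illustrative, not from the original article):

# application-dev.yml: keep the log consumer stopped in development
kafka:
  consumer:
    auto-startup: false

# application-prod.yml: start the log consumer automatically in production
kafka:
  consumer:
    auto-startup: true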

4. Producer implementation

@Service
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String data) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.whenComplete((sendResult, ex) -> {
            if (ex != null) {
                log.error("Kafka send message error = {}, topic = {}, data = {}", ex.getMessage(), topic, data);
            } else {
                // Handle the successful send
                log.info("Message sent successfully: {}", sendResult);
            }
        });
    }
}
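
Calling the producer is then a matter of injecting it and passing the topic name that was defined in kafka-log.yml. A minimal usage sketch (the LogPublisher class below is hypothetical and only shows the wiring):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

// Hypothetical caller: publishes raw JSON log lines to the log-data topic
@Service
public class LogPublisher {

    private final KafkaProducer kafkaProducer;

    @Value("${kafka.consumer.topic.log-data}")
    private String logTopic;

    public LogPublisher(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public void publish(String logJson) {
        kafkaProducer.sendMessage(logTopic, logJson);
    }
}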

5. Consumer implementation

@Component
public class KafkaConsumerGroupManager {

    private final KafkaAdmin kafkaAdmin;
    private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    public KafkaConsumerGroupManager(KafkaAdmin kafkaAdmin,
                                     ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
        this.kafkaAdmin = kafkaAdmin;
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }

    public void ensureConsumerGroupExists(String groupId) {
        // Create an AdminClient from the Spring-managed Kafka configuration and close it when done
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            // Check whether the consumer group already exists
            ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();
            boolean groupExists = consumerGroupListings.stream()
                    .anyMatch(listing -> listing.groupId().equals(groupId));
            if (!groupExists) {
                // If it does not exist, assign the group id to the listener container factory
                // (the group itself is created when a consumer with this id first joins)
                kafkaListenerContainerFactory.getContainerProperties().setGroupId(groupId);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to check consumer group existence", e);
        }
    }
}
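
The article does not show who calls ensureConsumerGroupExists. A minimal sketch, assuming it is invoked once on startup with the group id from kafka-log.yml (the class name and wiring below are hypothetical):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

// Hypothetical startup hook: checks the consumer group once the application is ready
@Component
public class ConsumerGroupInitializer {

    private final KafkaConsumerGroupManager groupManager;

    @Value("${kafka.consumer.group.log-data}")
    private String logDataGroup;

    public ConsumerGroupInitializer(KafkaConsumerGroupManager groupManager) {
        this.groupManager = groupManager;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        groupManager.ensureConsumerGroupExists(logDataGroup);
    }
}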
@Service
@Slf4j
public class KafkaConsumer {

    private final ElasticsearchOperations elasticsearchOperations;

    public KafkaConsumer(ElasticsearchOperations elasticsearchOperations) {
        this.elasticsearchOperations = elasticsearchOperations;
    }

    /**
     * Consume log data.
     *
     * @param message the raw JSON log message
     */
    @KafkaListener(topics = {"${kafka.consumer.topic.log-data}"},
            groupId = "${kafka.consumer.group.log-data}",
            autoStartup = "${kafka.consumer.auto-startup}")
    public void consumer(String message) {
        this.bulkIndexJsonData(message);
    }

    public void bulkIndexJsonData(String jsonData) {
        List<IndexQuery> queries = new ArrayList<>();
        IndexQuery query = new IndexQuery();
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        queries.add(query);
        elasticsearchOperations.bulkIndex(queries, IndexCoordinates.of("log"));
    }
}
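
Because auto-startup is false by default, the listener will not consume anything until it is started explicitly. One way to do that at runtime is through KafkaListenerEndpointRegistry. The sketch below assumes an id such as "log-data-listener" is added to the @KafkaListener annotation; neither the id nor the helper class is part of the original article:

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

// Hypothetical helper for starting/stopping the log-data listener at runtime
@Service
public class LogListenerControl {

    private final KafkaListenerEndpointRegistry registry;

    public LogListenerControl(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void start() {
        // Requires @KafkaListener(id = "log-data-listener", ...) on the consumer method
        MessageListenerContainer container = registry.getListenerContainer("log-data-listener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    public void stop() {
        MessageListenerContainer container = registry.getListenerContainer("log-data-listener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}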

That completes the setup. After one cluster outage we found that logs could not be queried. The investigation showed that the consumer had originally been configured with auto-offset-reset: earliest, which made it re-consume everything from the beginning of the topic after the restart. Fortunately the Elasticsearch writes had been made idempotent, so the re-consumption did not create duplicate documents.
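The article does not show how that idempotency was achieved. One common approach is to derive a deterministic document id from each message, so that a re-consumed record overwrites the existing document instead of adding a duplicate. A sketch under that assumption (the class and the logId parameter are hypothetical):

import java.util.List;

import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;

// Sketch: idempotent indexing by assigning a deterministic document id
public class IdempotentLogIndexer {

    private final ElasticsearchOperations elasticsearchOperations;

    public IdempotentLogIndexer(ElasticsearchOperations elasticsearchOperations) {
        this.elasticsearchOperations = elasticsearchOperations;
    }

    public void index(String logId, String jsonData) {
        IndexQuery query = new IndexQuery();
        // Re-consumed messages with the same id overwrite the existing document
        query.setId(logId);
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        elasticsearchOperations.bulkIndex(List.of(query), IndexCoordinates.of("log"));
    }
}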
