当前位置: 首页 > news >正文

Kafka在Springboot项目中的实践

目录

1、kafka原理介绍

1.1、Kafka的ack机制

1.2、kafka集群组成

2、原生kafka消费

3、SpringBoot集成


前言:

        在之前的介绍中,kafka里面的Topic可以进行多个分区:每个Partion内部是有顺序的、不可变的消息队列, 并且可以持续的添加。

        每个 partition 是一个 append-only 的日志文件,消息按写入顺序严格排序。

基于以上诸多因素:Kafka 适合高吞吐量和流式数据处理。

如下图所示:

关于更多kafka消费模式的介绍,可参考:

kafka消费的模式及消息积压处理方案_kafka消费的三种模式-CSDN博客文章浏览阅读1.1k次,点赞24次,收藏27次。 Kafka消费积压(Consumer Lag)是指消费者处理消息的速度跟不上生产者发送消息的速度,导致消息在Kafka主题中堆积。1. 如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数(提高kafka的并行度),同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)2. 若是下游数据处理不及时,则提高每批次拉取的数量。批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。_kafka消费的三种模式 https://dyclt.blog.csdn.net/article/details/148747641?spm=1011.2415.3001.5331

关于kafka的原理介绍,可参考:

关于MQ之kafka的深入研究_mq kafaka-CSDN博客文章浏览阅读1.6k次,点赞22次,收藏15次。Kafka、RabbitMQ、RocketMQ 和 ActiveMQ 是流行的消息队列解决方案,它们在架构设计、性能、特性和适用场景上各有不同。Kafka 适合高吞吐量和流式数据处理,RabbitMQ 适合需要复杂路由和灵活性场景,RocketMQ 适用于高并发的应用场景,而 ActiveMQ 则适合企业级 Java 应用集成。_mq kafaka https://dyclt.blog.csdn.net/article/details/148535599?spm=1011.2415.3001.5331


1、kafka原理介绍

1.1、Kafka的ack机制

        producer在向kafka写入消息的时候,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置 的值为 0,1,all

0:

        代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效 率最高。


1:

        代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。


all:

        代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保 leader发送成功和所有的副本都完成备份。安全性最⾼高,但是效率最低。

⚠️注意:

        如果往不存在的topic写数据,kafka会⾃动创建topic,partition和replication的数量 默认配置都是1。

1.2、kafka集群组成

        当消息从producer推送给broker,消费者会从broker的leader节点去读取数据。每个broker对应的不同的节点。

如下所示:


2、原生kafka消费

1、pom 引入核心依赖

引入依赖时,尽量选择和 kafka 版本对应的依赖版本。

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>4.0.0</version>
</dependency>

2、提供者客户端代码

  1. 设置提供者客户端属性(可选属性都被定义在 ProducerConfig 类中)
  2. 设置要发送的消息
  3. 发送(有三种发送方式,下面代码中都有)

代码如下所示:

public class MyProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 第一步:设置提供者属性Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 第二步:设置要发送的消息ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testKey", "testValue");// 第三部:发送消息// send(producer, record);// sendSync(producer, record);sendASync(producer, record);}}/*** 发送方式1:单向推送,不关心服务器的应答*/private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) {producer.send(record);}/*** 发送方式2:同步推送,得到服务器的应答前会阻塞当前线程*/private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {RecordMetadata metadata = producer.send(record).get();System.out.println(metadata.topic());System.out.println(metadata.partition());System.out.println(metadata.offset());}/*** 发送方式3:异步推送,不需等待服务器应答,当服务器有应答后会触发函数回调*/private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) {producer.send(record, (metadata, exception) -> {if (exception != null) {throw new RuntimeException("向 kafka 推送失败", exception);}System.out.println(metadata.topic());System.out.println(metadata.partition());System.out.println(metadata.offset());});}
}

3、消费者客户端代码:

消费者客户端要做三件事:

  1. 设置消费者客户端属性(可选属性都被定义在 ConsumerConfig 类中)
  2. 设置消费者订阅的主题
  3. 拉取消息
  4. 提交 offset(有两种提交方式,下面代码中都有)
public class MyConsumer {public static void main(String[] args) {// 第一步:设置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {// 第二步:设置要订阅的主题consumer.subscribe(Collections.singletonList("testTopic"));while (true) {// 第三步:拉取消息,100 代表最大等待时间,如果时间到了还没有拉取到消息就不阻塞了继续往后执行ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}// 第四步:提交 offset// consumer.commitSync(); // 同步提交,表示必须等到 offset 提交完毕,再去消费下⼀批数据consumer.commitSync(); // 异步提交,表示发送完提交 offset 请求后,就开始消费下⼀批数据了。不⽤等到Broker的确认。}}}
}


3、SpringBoot集成

springboot 版本是最常用的,比原生客户端使用方便。但是道理是一样的,底层也是原生客户端。

1、pom引入依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.0</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

2、yaml 配置文件

把原生客户端中的属性配置,配置在 yaml 中。

如下所示:

spring:kafka:bootstrap-servers: 192.168.2.28:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: testGroupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、客户端代码

  1. 注入 KafkaTemplate
  2. 发送
@RestController
public class ProducerController {/*** kafka*/private KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@GetMapping("/test")public void send() {// 发送 kafka 消息kafkaTemplate.send("testTopic", "testKey", "testValue");}}

4、消费者

只需要监听主题就可以,如下所示:

@RestController
public class ConsumerController {// 监听 kafka 消息@KafkaListener(topics = {"testTopic"})public void test(ConsumerRecord<?, ?> record) {System.out.println(record.value());}
}


总结

        Kafka 是一个强大的分布式流处理平台,凭借其 高吞吐量持久化水平扩展 和 实时处理能力,成为大数据和实时系统的核心组件。


参考文章:

1、Kafka 在 java 中的基本使用_java kafka-CSDN博客文章浏览阅读973次,点赞5次,收藏5次。Kafka 在 java 中的基本使用、原生客户端、集成 springboot_java kafka https://blog.csdn.net/ougaii_/article/details/146944003?spm=1000.2115.3001.10526&utm_medium=distribute.pc_feed_blog_category.none-task-blog-classify_tag-19-146944003-null-null.nonecase&depth_1-utm_source=distribute.pc_feed_blog_category.none-task-blog-classify_tag-19-146944003-null-null.nonecase

http://www.dtcms.com/a/307529.html

相关文章:

  • vue3.0 + TypeScript 中使用 axios 同时进行二次封装
  • ESXI虚拟交换机 + H3C S5120交换机 + GR5200路由器组网笔记
  • 数据结构与算法:队列的表示和操作的实现
  • Linux 下自动化脚本安装Jdk、Nginx等软件
  • Java语言/Netty框架的新能源汽车充电桩系统平台
  • 《人工智能导论》(python版)第2章 python基础2.2编程基础
  • Rust视频处理开源项目精选
  • FFmpegHandler 功能解析,C语言程序化设计与C++面向对象设计的核心差异
  • 【日常问题解决方案】VS2022不小心解决方案资源管理器把关掉了怎么办
  • spring cloud alibaba ——gateway网关
  • Day36| 1049. 最后一块石头的重量 II、494.目标和、474.一和零
  • 图论-最短路Dijkstra算法
  • 澳交所技术重构窗口开启,中资科技企业如何破局?——从ASX清算系统转型看跨境金融基础设施的赋能路径
  • Python爬虫07_Requests爬取图片
  • 基于Spring Boot实现中医医学处方管理实践
  • 【05】大恒相机SDK C#开发 —— Winform中采集图像并显示
  • 金融分类提示词演示
  • 【03】大恒相机SDK C#开发 —— 回调采集图像,关闭相机
  • STM32学习记录--Day4
  • TOC-Transformer-LSTM-ABKDE,计算机一区算法龙卷风优化算法应用到概率区间预测!Matlab实现
  • 九识智能与星逻智能达成战略合作,共推“无人车 + 无人机”空地一体巡检升级
  • Java中的“Dead Code”
  • 基于 Amazon Nova Sonic 和 MCP 构建语音交互 Agent
  • set_max_delay为何失效了?
  • Python爬虫06_Requests政府采购严重违法失信行为信息记录爬取
  • 全栈:怎么把IDEA和Maven集成一下?
  • 【盘古100Pro+开发板实验例程】FPGA学习 | 基于紫光 FPGA 的键控 LED 流水灯
  • 水库泄洪声光电监测预警系统解决方案
  • Kubernetes (K8s) 部署资源的完整配置OceanBase
  • sqli-labs:Less-13关卡详细解析