当前位置: 首页 > news >正文

Kafka客户端整合

本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。

文章目录

  • 前言
  • 一、传统客户端整合
    • 1.1、生产者
    • 1.2、消费者
  • 二、Spring Boot 整合Kafka


前言

  在前两篇中,初步介绍了Kafka的服务器相关操作,本篇介绍Kafka与JAVA进行整合。

一、传统客户端整合

  这里的客户端,指的是相比于运行了Kafka进程的服务端,生产者消费者,都是属于客户端的。
  首先需要引入依赖:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

1.1、生产者

  生产者的编程模型:

  1. 设置基本属性
  2. 设置Kafka服务器的信息,并进行参数的配置,常见的配置有序列化和反序列化器
  3. 创建生产者KafkaProducer
  4. 创建ProducerRecord,构建消息,需要指定Topic
  5. 利用KafkaProducer的send方法发送消息,并且可以获取消息发送的结果(可选,支持同步和异步回调)
  6. 关闭KafkaProducer(可以在创建KafkaProducer时利用try…with…resource,因为KafkaProducer继承了Closeable接口)
public class MyProducer {private static final String BOOTSTRAP_SERVERS ="服务器1:9092,服务器2:9092,服务器3:9092";private static final String TOPIC = "test-cluster-topic";public static void main(String[] args) throws InterruptedException, ExecutionException {//设置发送者的基本属性Properties props = new Properties();//设置kafka服务器的信息props.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//配置key 和 value的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//创建生产者Producer<String, String> producer = new KafkaProducer<>(props);CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {//构建消息  需要设置topicProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>(TOPIC, Integer.toString(i), "test" + i);//发送消息,分为单向发送 和 接收返回值
//            producer.send(stringStringProducerRecord);//接收返回值 也分为同步发送和异步发送//同步发送,在接收到返回之前,会阻塞当前线程RecordMetadata recordMetadata = producer.send(stringStringProducerRecord).get();
//            int partition = recordMetadata.partition();
//            String topic = recordMetadata.topic();
//            long offset = recordMetadata.offset();
//            String message = recordMetadata.toString()
//
//            System.out.println(partition + " " + topic + " " + offset + " " + message);//异步发送,不阻塞,使用回调函数接收producer.send(stringStringProducerRecord, (metadata, exception) -> {if (exception != null) {System.out.println("消息发送错误!" + exception.getMessage());exception.printStackTrace();} else {int partition1 = recordMetadata.partition();String topic1 = recordMetadata.topic();long offset1 = recordMetadata.offset();String message1 = recordMetadata.toString();System.out.println(partition1 + " " + topic1 + " " + offset1 + " " + message1);}//处理完成后 countDownLatch - 1countDownLatch.countDown();});}countDownLatch.await();producer.close();}
}

1.2、消费者

  消费者的编程模型:

  1. 设置基本属性
  2. 设置Kafka服务器的信息,并进行参数的配置,常见的配置有序列化和反序列化器
  3. 指定消费者组名
  4. 创建消费者KafkaConsumer
  5. KafkaConsumer订阅Topic
  6. 利用KafkaConsumer的poll从服务器拉取消息
  7. 利用KafkaConsumer的commitAsync或commitSync提交偏移量,前者是异步提交,后者是同步提交
/*** 消费者端*/
public class MyConsumer {private static final String BOOTSTRAP_SERVERS ="服务器1:9092,服务器2:9092,服务器3:9092";private static final String TOPIC = "test-cluster-topic";public static void main(String[] args) {//1.设置基本属性Properties props = new Properties();//2.设置kafka服务器的信息props.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//3.消费者需要设置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//创建客户端Consumer<String, String> consumer = new KafkaConsumer<>(props);//订阅Topicconsumer.subscribe(Collections.singleton(TOPIC));//接收消息while (true) {//超时时间1000msConsumerRecords<String, String> records = consumer.poll(1000);records.forEach(record -> {System.out.println(record.offset() + " " + record.key() + " " + record.value());});//提交offsetconsumer.commitAsync();}}
}

  从上述的整合过程中,可以发现几点:

  • 生产者和消费者不关心具体的Partition,而是只关注Topic这一个逻辑存储单元,生产者的消息到达服务器后,如何分配给具体的Partition存储结构,以及消费者从哪一个Partition上拉取消息,都是服务端内部实现的细节。
  • 服务端对于offset偏移量,并非是自己控制,而是交给客户端去控制。为什么要这样设计?因为偏移量是随着用户处理消息而变更,用户在处理业务的过程中本身就有很多不确定性,并且业务场景也不同,服务端无法预判到客户端所有的业务场景。假设有一个业务场景,数据分析任务,需要反复读取同一时间段的消息做离线计算,那么交给服务端去控制offset,一定是没有客户端自主去控制灵活的。如果需要适配所有客户端的消费场景(重放、事务、多线程等),服务端复杂度飙升,性能和稳定性下降。但是将控制offset的权限交给客户端,服务端不能控制客户端的行为,仅仅是提供了一个API给客户端,那客户端可以选择调用,也可以选择不调用,同时在调用的过程中,还有可能存在问题。所以服务端在offset的控制上,必定是有一个兜底方案的。
  • 生产者发送消息,可以是像案例中,在循环中单条发送的,但是消费者接收消息,通过poll拉取到的却是一个实现了迭代器的可迭代的集合。那生产者发送消息,是支持批量缓存,然后在同一批次中发送到服务端的,就比如Redis的pipeline,或者MySQL的批量插入。
    在这里插入图片描述

二、Spring Boot 整合Kafka

  Spring Boot整合Kafka,比起前者要简单的多,首先依旧要引入依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

  生产者端和消费者端只需要通过注解即可:

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;// 发送消息@GetMapping("/kafka/normal/{message}")public void sendMessage1(@PathVariable("message") String normalMessage) {kafkaTemplate.send("test-cluster-topic", normalMessage);}
}
@Component
public class KafkaConsumer {// 消费监听@KafkaListener(topics = {"test-cluster-topic"})public void onMessage1(ConsumerRecord<?, ?> record) {// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "-" + record.value());}
}

  但是配置文件中需要进行配置,相当于将原生案例中的写在代码中的Properties相关配置,放在了配置文件中:

spring.application.name=my-kafka-demo
spring.kafka.bootstrap-servers=服务器1:9092,服务器2:9092,服务器3:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=test
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

在这里插入图片描述
在这里插入图片描述

http://www.dtcms.com/a/618274.html

相关文章:

  • 购物网站建设方案手机建立网站的软件
  • 力扣hot100----1day
  • 二叉树的前序遍历解题思路
  • python手写数字识别计分系统+CNN模型+YOLOv5模型 深度学习 计算机毕业设计(建议收藏)✅
  • 网站服务器租赁价格上海低价网站建设
  • 基于Python房价预测系统 数据分析 Flask框架 爬虫 随机森林回归预测模型、链家二手房 可视化大屏 大数据毕业设计(附源码)✅
  • linux服务-tomcat原理与安装
  • Kotlinx.serialization 对多态对象(sealed class )支持更好用
  • ArkTS接口与泛型在HarmonyOS应用开发中的深度应用
  • 4.4 跨越文本边界!多模态Agent开发实战,视觉+语言融合的新可能
  • 【数据结构】从零开始认识B树 --- 高效外查找的数据结构
  • 东莞seo网站排名优化建立外贸网站多少钱
  • 有没有什么做地堆的网站wordpress 文章摘要字数
  • stateflow和shareflow的区别
  • Qt QLibrary程序在运行时加载外部库
  • 电线电缆做销售哪个网站好海南哪家公司做网站做的好
  • 做it题的网站知名网站欣赏
  • 番禺做网站哪家强网站定位方案
  • 当AI学会叠衣服,我们才会真正需要它
  • Python中的输出函数
  • flash网站制作下载网站可以备案先提交类别后来改么
  • Maya 集成 pycharm(下载devkit、设置python运行环境、安装mayacharm插件、设置debug的配置)
  • AI工具在CTF中的战术应用
  • 乐清做网站建设行业管理信息系统官网
  • Rust 异步编程深度解析:从 Future 到运行时
  • Streaming ELT with Flink CDC · OceanBase Sink
  • 环境变量与地址
  • C/C++爱心①
  • 7.4、Python-变量的作用域
  • 英文专业的网站建设网站设计建设流程