Kafka客户端整合
本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。
文章目录
- 前言
- 一、传统客户端整合
- 1.1、生产者
- 1.2、消费者
- 二、Spring Boot 整合Kafka
前言
在前两篇中,初步介绍了Kafka的服务器相关操作,本篇介绍Kafka与JAVA进行整合。
一、传统客户端整合
这里的客户端,指的是相比于运行了Kafka进程的服务端,生产者和消费者,都是属于客户端的。
首先需要引入依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>
1.1、生产者
生产者的编程模型:
- 设置基本属性
- 设置Kafka服务器的信息,并进行参数的配置,常见的配置有序列化和反序列化器
- 创建生产者KafkaProducer
- 创建ProducerRecord,构建消息,需要指定Topic
- 利用KafkaProducer的send方法发送消息,并且可以获取消息发送的结果(可选,支持同步和异步回调)
- 关闭KafkaProducer(可以在创建KafkaProducer时利用try…with…resource,因为KafkaProducer继承了Closeable接口)
public class MyProducer {private static final String BOOTSTRAP_SERVERS ="服务器1:9092,服务器2:9092,服务器3:9092";private static final String TOPIC = "test-cluster-topic";public static void main(String[] args) throws InterruptedException, ExecutionException {//设置发送者的基本属性Properties props = new Properties();//设置kafka服务器的信息props.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//配置key 和 value的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//创建生产者Producer<String, String> producer = new KafkaProducer<>(props);CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {//构建消息 需要设置topicProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>(TOPIC, Integer.toString(i), "test" + i);//发送消息,分为单向发送 和 接收返回值
// producer.send(stringStringProducerRecord);//接收返回值 也分为同步发送和异步发送//同步发送,在接收到返回之前,会阻塞当前线程RecordMetadata recordMetadata = producer.send(stringStringProducerRecord).get();
// int partition = recordMetadata.partition();
// String topic = recordMetadata.topic();
// long offset = recordMetadata.offset();
// String message = recordMetadata.toString()
//
// System.out.println(partition + " " + topic + " " + offset + " " + message);//异步发送,不阻塞,使用回调函数接收producer.send(stringStringProducerRecord, (metadata, exception) -> {if (exception != null) {System.out.println("消息发送错误!" + exception.getMessage());exception.printStackTrace();} else {int partition1 = recordMetadata.partition();String topic1 = recordMetadata.topic();long offset1 = recordMetadata.offset();String message1 = recordMetadata.toString();System.out.println(partition1 + " " + topic1 + " " + offset1 + " " + message1);}//处理完成后 countDownLatch - 1countDownLatch.countDown();});}countDownLatch.await();producer.close();}
}
1.2、消费者
消费者的编程模型:
- 设置基本属性
- 设置Kafka服务器的信息,并进行参数的配置,常见的配置有序列化和反序列化器
- 指定消费者组名
- 创建消费者KafkaConsumer
- KafkaConsumer订阅Topic
- 利用KafkaConsumer的poll从服务器拉取消息
- 利用KafkaConsumer的commitAsync或commitSync提交偏移量,前者是异步提交,后者是同步提交
/*** 消费者端*/
public class MyConsumer {private static final String BOOTSTRAP_SERVERS ="服务器1:9092,服务器2:9092,服务器3:9092";private static final String TOPIC = "test-cluster-topic";public static void main(String[] args) {//1.设置基本属性Properties props = new Properties();//2.设置kafka服务器的信息props.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//3.消费者需要设置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//创建客户端Consumer<String, String> consumer = new KafkaConsumer<>(props);//订阅Topicconsumer.subscribe(Collections.singleton(TOPIC));//接收消息while (true) {//超时时间1000msConsumerRecords<String, String> records = consumer.poll(1000);records.forEach(record -> {System.out.println(record.offset() + " " + record.key() + " " + record.value());});//提交offsetconsumer.commitAsync();}}
}
从上述的整合过程中,可以发现几点:
- 生产者和消费者不关心具体的Partition,而是只关注Topic这一个逻辑存储单元,生产者的消息到达服务器后,如何分配给具体的Partition存储结构,以及消费者从哪一个Partition上拉取消息,都是服务端内部实现的细节。
- 服务端对于offset偏移量,并非是自己控制,而是交给客户端去控制。为什么要这样设计?因为偏移量是随着用户处理消息而变更,用户在处理业务的过程中本身就有很多不确定性,并且业务场景也不同,服务端无法预判到客户端所有的业务场景。假设有一个业务场景,
数据分析任务,需要反复读取同一时间段的消息做离线计算,那么交给服务端去控制offset,一定是没有客户端自主去控制灵活的。如果需要适配所有客户端的消费场景(重放、事务、多线程等),服务端复杂度飙升,性能和稳定性下降。但是将控制offset的权限交给客户端,服务端不能控制客户端的行为,仅仅是提供了一个API给客户端,那客户端可以选择调用,也可以选择不调用,同时在调用的过程中,还有可能存在问题。所以服务端在offset的控制上,必定是有一个兜底方案的。 - 生产者发送消息,可以是像案例中,在循环中单条发送的,但是消费者接收消息,通过poll拉取到的却是一个实现了迭代器的可迭代的集合。那生产者发送消息,是支持批量缓存,然后在同一批次中发送到服务端的,就比如Redis的pipeline,或者MySQL的批量插入。

二、Spring Boot 整合Kafka
Spring Boot整合Kafka,比起前者要简单的多,首先依旧要引入依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
生产者端和消费者端只需要通过注解即可:
@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;// 发送消息@GetMapping("/kafka/normal/{message}")public void sendMessage1(@PathVariable("message") String normalMessage) {kafkaTemplate.send("test-cluster-topic", normalMessage);}
}
@Component
public class KafkaConsumer {// 消费监听@KafkaListener(topics = {"test-cluster-topic"})public void onMessage1(ConsumerRecord<?, ?> record) {// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "-" + record.value());}
}
但是配置文件中需要进行配置,相当于将原生案例中的写在代码中的Properties相关配置,放在了配置文件中:
spring.application.name=my-kafka-demo
spring.kafka.bootstrap-servers=服务器1:9092,服务器2:9092,服务器3:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=test
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


