当前位置: 首页 > news >正文

Kafka 学习笔记

文章目录

  • 一、消息队列应用场景
  • 二、Kafka 集群搭建
  • 三、消息测试
  • 四、Java API
  • 五、Kafka 相关概念
  • 六、Kafka 幂等性
    • 6.1 Kafka 幂等性的背景
    • 6.2 Kafka 幂等性机制
  • 七、分区和副本
    • 7.1 生产者分区写入策略
      • 1️⃣ 轮询分区策略
      • 2️⃣ 随机分区策略
      • 3️⃣ 按 Key 分区策略
      • 4️⃣ 自定义分区策略
    • 7.2 Rebalance 机制
    • 7.3 消费者分区消费策略
      • 1️⃣ RangeAssignor
      • 2️⃣ RoundRobinAssignor
      • 3️⃣ StickyAssignor
    • 7.4 副本的 ACK 机制


一、消息队列应用场景

  • 异步处理:当系统中某些任务耗时较长(如发送邮件、图片处理、视频转码、接口调用等),可以将任务发送到消息队列中,由后台异步处理,提高主流程的响应速度和系统吞吐量。如:用户注册后发送欢迎邮件,将邮件发送任务放入队列,由专门的邮件服务异步处理。
  • 系统解耦:消息队列作为中间层,使得各系统或模块之间无需直接调用,通过消息传递实现耦合的最小化,便于系统的扩展与维护。如:电商平台下单后,订单系统、库存系统、物流系统通过消息队列通信,避免模块之间强依赖。
  • 流量削峰(削峰填谷):在高并发场景下,将突发的请求流量写入队列中,后台服务按自身处理能力从队列中逐步拉取处理,避免系统瞬时压力过大导致崩溃。如:秒杀活动中,用户下单请求进入队列,后端按顺序处理订单,避免数据库或服务被瞬间打垮。
  • 日志处理:将系统产生的日志消息发送到消息队列中,由专门的日志消费程序进行存储、分析或异步处理,提高主系统性能,便于实现日志中心化管理。如:Web 服务将访问日志发送到 Kafka,由日志系统统一消费并存入 Elasticsearch。

二、Kafka 集群搭建

为了简化服务配置,这里通过 docker 构建一个包含 3个 Kafka Broker + 1个 ZooKeeper 的集群,docker-compose.yml 配置文件:

services:zookeeper:image: zookeeper:3.7container_name: zookeeperports:- "2181:2181"environment:ZOO_MY_ID: 1ZOO_PORT: 2181kafka1:image: confluentinc/cp-kafka:latestcontainer_name: kafka1ports:- "9091:9091"depends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.18.137:9091KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTkafka2:image: confluentinc/cp-kafka:latestcontainer_name: kafka2ports:- "9092:9092"depends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 2KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.18.137:9092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTkafka3:image: confluentinc/cp-kafka:latestcontainer_name: kafka3ports:- "9093:9093"depends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 3KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.18.137:9093KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTkafka-ui:image: provectuslabs/kafka-ui:latestcontainer_name: kafka-uiports:- "9090:8080"depends_on:- kafka1- kafka2- kafka3environment:- KAFKA_CLUSTERS_0_NAME=local-kafka- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9091,kafka2:9092,kafka3:9093

启动集群:

PS C:\Users\atreus\DockerProjects\KafkaCluster> docker-compose up -d
[+] Running 5/5✔ Container zookeeper  Started                                                                                                                                                        0.2s ✔ Container kafka1     Started                                                                                                                                                        0.6s ✔ Container kafka3     Started                                                                                                                                                        0.4s ✔ Container kafka2     Started                                                                                                                                                        0.5s ✔ Container kafka-ui   Started                                                                                                                                                        0.8s 
PS C:\Users\atreus\DockerProjects\KafkaCluster> docker ps           
CONTAINER ID   IMAGE                           COMMAND                   CREATED          STATUS          PORTS                                                  NAMES
c478dfa1cd7b   provectuslabs/kafka-ui:latest   "/bin/sh -c 'java --…"   11 minutes ago   Up 11 seconds   0.0.0.0:9090->8080/tcp                                 kafka-ui
4438f682c119   confluentinc/cp-kafka:latest    "/etc/confluent/dock…"   3 hours ago      Up 11 seconds   0.0.0.0:9092->9092/tcp                                 kafka2
23ddef90230d   confluentinc/cp-kafka:latest    "/etc/confluent/dock…"   3 hours ago      Up 11 seconds   9092/tcp, 0.0.0.0:9093->9093/tcp                       kafka3
3f412db6a68d   confluentinc/cp-kafka:latest    "/etc/confluent/dock…"   3 hours ago      Up 11 seconds   0.0.0.0:9091->9091/tcp, 9092/tcp                       kafka1
927d4bc46bc0   zookeeper:3.7                   "/docker-entrypoint.…"   3 hours ago      Up 12 seconds   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
PS C:\Users\atreus\DockerProjects\KafkaCluster> docker exec -it zookeeper bash  
root@927d4bc46bc0:/apache-zookeeper-3.7.2-bin# bin/zkCli.sh -server localhost:2181
Connecting to localhost:2181
...
Welcome to ZooKeeper!
...
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://3f412db6a68d:9091"],"jmx_port":-1,"port":9091,"host":"3f412db6a68d","version":5,"timestamp":"1748079296428"}
[zk: localhost:2181(CONNECTED) 2]

ls /brokers/ids 输出 [1, 2, 3] 表明三个 Kafka 应用已经启动成功。

在这里插入图片描述

关闭集群:

PS C:\Users\atreus\DockerProjects\KafkaCluster> docker-compose stop
[+] Stopping 5/5✔ Container kafka-ui   Stopped                                                                                                                                                        2.6s ✔ Container kafka2     Stopped                                                                                                                                                       10.5s ✔ Container kafka1     Stopped                                                                                                                                                        1.5s ✔ Container kafka3     Stopped                                                                                                                                                        1.7s ✔ Container zookeeper  Stopped                                                                                                                                                        0.8s 
PS C:\Users\atreus\DockerProjects\KafkaCluster> 

三、消息测试

创建 topic:

[appuser@3f412db6a68d bin]$ kafka-topics --create --bootstrap-server localhost:9091 --topic test_topic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test_topic.
[appuser@3f412db6a68d bin]$ sh /bin/kafka-topics --list --bootstrap-server localhost:9091
test_topic
[appuser@3f412db6a68d bin]$ 

生产消息:

sh /bin/kafka-console-producer --broker-list localhost:9091 --topic test_topic

消费消息:

sh /bin/kafka-console-consumer --bootstrap-server localhost:9091 --topic test_topic --from-beginning

四、Java API

生产者:

public class KafkaProducerTest {private static final Logger log = LoggerFactory.getLogger(KafkaProducerTest.class);public static void main(String[] args) throws ExecutionException, InterruptedException {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9091");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 10; i++) {String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", Integer.toString(i), now);Future<RecordMetadata> future = producer.send(record);future.get();log.info("Sent record: " + record);}// 关闭生产者producer.close();}
}

消费者:

public class KafkaConsumerTest {private static final Logger log = LoggerFactory.getLogger(KafkaConsumerTest.class);public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9091");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 接收消息consumer.subscribe(Arrays.asList("test_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {log.info(record.toString());}}}
}

五、Kafka 相关概念

概念说明
BrokerKafka 集群中的一个服务器节点,负责接收、存储和转发消息。每个 Broker 可以处理多个 Topic 的数据。Kafka 集群通常包含多个 Broker 实现负载均衡和容错。
Producer消息的发送者,负责将数据发送到指定的 Topic。支持同步/异步、分区策略、批量发送等功能。
Consumer消息的接收者,从 Topic 中拉取消息进行处理。通常属于某个 Consumer Group,以支持并行消费。
TopicKafka 中消息的逻辑分类,每条消息必须写入某个 Topic。一个 Topic 可以包含多个 Partition,用于分布式扩展。
PartitionTopic 的物理划分,一个 Partition 是一个有序的、不可变的消息序列。消息在分区中通过 Offset 定位。Partition 的数量影响并发度和吞吐量。
Replica分区的副本。用于高可用性,每个 Partition 有一个 leader 和多个 follower。leader 处理读写请求,follower 进行数据同步,客户端只与 Leader 通信。
Consumer Group消费者组。一个组内的多个 Consumer 共同消费一个 Topic 的不同 Partition。实现负载均衡和容错。每个 Partition 在同一个 Consumer Group 内只能被一个 Consumer 消费,但一个 Consumer 可以消费多个 Partition
Offset消息在分区中的唯一编号,表示消费位置。消费者使用 offset 记录进度,可以手动或自动提交。Kafka 默认不会删除已消费的数据,而是根据保留策略保留一段时间。

在这里插入图片描述
图片来自:https://blog.csdn.net/qq_45076180/article/details/111344822


六、Kafka 幂等性

Kafka 的幂等性是指 即使 producer 发生重试,也能保证消息只会被写入一次(Exactly Once),不会出现重复数据。

6.1 Kafka 幂等性的背景

在以下场景中,Kafka Producer 可能会重复发送消息:

  • 由于网络抖动或超时,producer 没有收到 ack,主动重发。
  • Leader 崩溃,producer 发送到了新的 leader。
  • Producer 重启,重复发送缓冲区中的消息。

默认情况下,Kafka 的 Producer 是 at least once(至少一次)语义,可能会出现重复消息。

6.2 Kafka 幂等性机制

Producer 会为每个消息自动带上:

  • PID(Producer ID)标识某个 producer 实例。
  • Sequence Number 标识消息在当前 partition 中的顺序。
  • Broker 会为每个 (PID, Partition) 记录最后一个成功的 Sequence Number
  • 重试发送时,如果发现 sequence number 是重复的,就丢弃消息,不写入

开启幂等性只需设置:

props.put("enable.idempotence", "true");

七、分区和副本

7.1 生产者分区写入策略

Kafka 中的每个 Topic 可以被划分为多个 Partition(分区),Producer 在发送消息时需要决定消息写入哪个分区。Kafka 提供了多种分区策略,常见的有以下几种:

1️⃣ 轮询分区策略

  • 特点:默认策略(当 key 为空时),Producer 会将消息轮流发送到各个分区,实现负载均衡
  • 使用场景:不需要按照某个业务 key 做分区,只需要均匀分配消息。
  • 缺点:同一类型的消息可能被写入不同分区,消息有序性无法保证
new ProducerRecord<>("topic", null, value);  // key 为 null

2️⃣ 随机分区策略

  • 特点:Kafka 在早期版本支持的方式,Producer 会随机选择一个分区。现代版本中,如果设置了 key 为 null,通常实际使用的是轮询策略。
  • 缺点:随机性导致分布不均衡的可能性较大。

3️⃣ 按 Key 分区策略

  • 特点:Kafka 使用 key 的 hash 值对分区数取模进行分区分配:partition = hash(key) % numPartitions
  • 优点:确保相同 key 的消息总是发送到相同的分区,从而可以保证有序性
  • 缺点:可能会导致分区数据倾斜(分区过热)。同时由于 Kafka 的消费是以分区为单位的,一个分区在一个时刻只能被一个 Consumer 消费,因此按 Key 分区也会限制消息并行消费能力。
  • 使用场景:订单、用户行为等需要按业务 key 保序的场景。
new ProducerRecord<>("topic", "key", value);  // 相同 key 的消息落在同一分区

4️⃣ 自定义分区策略

  • 特点:开发者可以实现 org.apache.kafka.clients.producer.Partitioner 接口,定义自己的分区规则。
  • 使用场景:需要复杂逻辑(如动态路由、权重、地理位置等)来决定消息投递的分区。

示例:

public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, Object value, int numPartitions) {// 根据 key 或 value 实现自定义逻辑if (key.toString().startsWith("vip")) {return 0;  // vip 用户全部写入分区 0} else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

然后在配置中指定:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "atreus.ink.MyCustomPartitioner");

7.2 Rebalance 机制

在 Kafka 中,一个 Topic 通常被划分为多个 Partition,而一个 Consumer Group 中的多个消费者实例会分配到不同的分区进行消费。为了确保消费的负载均衡和分区的唯一消费(即一个分区同一时刻只能被一个消费者消费),Kafka 使用 Rebalance 来动态分配分区。Rebalance 期间所有消费者会暂停拉取数据,会导致短暂的消费中断,对于低延迟要求的系统,这是不可忽略的开销。

Kafka 中的 Rebalance 通常会在以下几种情况发生:

  • Consumer Group 中有新 Consumer 加入。
  • 已有 Consumer 宕机或主动退出。
  • 订阅的 Topic 数量变化。
  • Topic 中 Partition 数量发生变化(如扩容)。
  • 手动触发,如调用 unsubscribe()subscribe() 重新订阅 Topic。

7.3 消费者分区消费策略

1️⃣ RangeAssignor

每个消费者按 Topic 排序后顺序分配一段连续的分区。

C1 -> 分区 0, 1, 2
C2 -> 分区 3, 4, 5

2️⃣ RoundRobinAssignor

将所有 Topic 的分区合并为一个列表,然后按轮询方式分配给消费者。

C1 -> 分区 0, 2, 4
C2 -> 分区 1, 3, 5

3️⃣ StickyAssignor

基于 RoundRobin 的改进,尽可能维持上次分配结果,保持分区与消费者的粘性关系,尽量减少分区迁移,从而减少因频繁 Rebalance 导致的消费中断。

7.4 副本的 ACK 机制

生产者在发送消息时,可以设置 acks 参数来控制 Kafka 何时认为消息写入成功:

acks含义优点风险或劣势
0不等待任何 Broker 的确认,发送后即认为成功。最高吞吐,最低延迟无法保证消息是否成功写入
1只等待 Leader 副本写入成功即可返回 ACK。较快,适度可靠如果 Leader 写入后崩溃,Follower 可能未同步
-1all等待所有 ISR(in-sync replicas)中的副本都写入成功后才 ACK。最强可靠性,避免数据丢失延迟较高,吞吐相对较低

文章转载自:

http://NdYAxSQG.fqpgf.cn
http://Mh7iS4wU.fqpgf.cn
http://2mp8qgiR.fqpgf.cn
http://8giTeWjw.fqpgf.cn
http://HM7Ohjcy.fqpgf.cn
http://SffMKxTH.fqpgf.cn
http://aRVdqVT0.fqpgf.cn
http://qhopbKIT.fqpgf.cn
http://4SAXuAx6.fqpgf.cn
http://1MWhUjRP.fqpgf.cn
http://t5Fgwl3p.fqpgf.cn
http://hmyfDLvU.fqpgf.cn
http://gK8zNwHQ.fqpgf.cn
http://dCRLxmPh.fqpgf.cn
http://j2RgbsUv.fqpgf.cn
http://LhPXp7w3.fqpgf.cn
http://bACONDQu.fqpgf.cn
http://g7zy62Hm.fqpgf.cn
http://nF4YF9TI.fqpgf.cn
http://Sn6TQFLj.fqpgf.cn
http://CQlU8u6h.fqpgf.cn
http://3jsa5KJh.fqpgf.cn
http://x0GA5KCh.fqpgf.cn
http://Fj71hR32.fqpgf.cn
http://AfoV2Uns.fqpgf.cn
http://7mjRfyef.fqpgf.cn
http://rchDFCBu.fqpgf.cn
http://EQplraqL.fqpgf.cn
http://1SzCC6E5.fqpgf.cn
http://1WwctrtR.fqpgf.cn
http://www.dtcms.com/a/379466.html

相关文章:

  • 机械零件极限应力线图
  • 萤石安全生产监管解决方案:构建企业安全智能化防护网
  • sqlmap常用命令
  • MID认证:全球电力计量市场的通行证与中国协议兼容性分析
  • STM32开发(USART:IIC总线)
  • Spring框架中用到的设计模式
  • 从源码和设计模式深挖AQS(AbstractQueuedSynchronizer)
  • 四、计算机网络与分布式系统(中)
  • 半导体学习笔记
  • 深入解析Dart虚拟机运行原理
  • 一文教您解决Ubuntu ModuleNotFoundError: No module named ‘_tkinter‘问题
  • 部署合约常见的问题
  • Python快速入门专业版(二十三):for循环基础:遍历字符串、列表与range()函数(计数案例)
  • MySQL 非空约束(NOT NULL):看似简单,却决定数据质量的关键细节
  • 【笔记】悬架减振器的阻尼带宽
  • C++:迭代器失效问题(vector为例)
  • TDengine 选择函数 TAIL() 用户手册
  • 在Linux系统中清理大文件的方法
  • oracle里的int类型
  • 【开关电源篇】整流及其滤波电路的工作原理和设计指南-超简单解读
  • 第五章 Logstash深入指南
  • 猫狗识别算法在智能喂食器上的应用
  • 数据库事务详解
  • Linux学习:基于环形队列的生产者消费者模型
  • size()和length()的区别
  • Windows系统下安装Dify
  • 企业云环境未授权访问漏洞 - 安全加固笔记
  • sv时钟块中default input output以及@(cb)用法总结
  • 广谱破局!芦康沙妥珠单抗覆罕见突变,一解“少数派”的用药困境
  • Guli Mall 25/08/12(高级上部分)