【Kafka】架构原理、消息丢失、重复消费、顺序消费、事务消息
【Kafka】架构原理、消息丢失、重复消费、顺序消费、事务消息
- 1、为什么要使用消息队列?
- 2、Kafka的架构是怎么样的?
- 2.1 Kafka 为什么有 Topic 还要用 Partition?
- 2.2 Kafka 消息的发送过程简单介绍一下?
- 2.3 介绍下Kafka的数据存储结构?
- 3、消息丢失问题
- 3.1 Kafka如何保证消息不丢失?
- 3.2 为什么Kafka没办法100%保证消息不丢失?
- 4、重复消费问题
- 4.1 Kafka的三种消息传递语义
- 5、顺序消费问题
- 6、事务消息
- 6.1 Kafka支持事务消息吗?如何实现的?
- 7、重平衡问题
- 7.1 什么是Kafka的重平衡机制?
- 7.2 MQ的重平衡会带来哪些问题?
- 7.3 什么是Kafka的渐进式重平衡?
- 8、选举问题
- 8.1 Kafka 几种选举过程简单介绍一下?
- 8.2 Kafka 高水位了解过吗?
- 9、批量消费问题
- 9.1 Kafka的批量消费如何确保消息不丢?
- 10、Kafka 为什么这么快?
1、为什么要使用消息队列?
使用消息队列的主要目的:解耦、异步、削峰填谷
。
解耦:在一个复杂的系统中,不同的模块或服务之间可能需要相互依赖,如果直接使用函数调用或者 API 调用的方式,会造成模块之间的耦合,当其中一个模块发生改变时,需要同时修改调用方和被调用方的代码。而使用消息队列作为中间件,不同的模块可以将消息发送到消息队列中,不需要知道具体的接收方是谁,接收方可以独立地消费消息,实现了模块之间的解耦。
异步:有些操作比较耗时,例如发送邮件、生成报表等,如果使用同步的方式处理,会阻塞主线程或者进程,导致系统的性能下降。而使用消息队列,可以将这些操作封装成消息,放入消息队列中,异步地处理这些操作,不影响主流程的执行,提高了系统的性能和响应速度。
削峰填谷:削峰填谷是一种在高并发场景下平衡系统压力的技术,通常用于平衡系统在高峰期和低谷期的资源利用率,提高系统的吞吐量和响应速度。在削峰填谷的过程中,通常使用消息队列作为缓冲区,将请求放入消息队列中,然后在系统负载低的时候进行处理。这种方式可以将系统的峰值压力分散到较长的时间段内,减少瞬时压力对系统的影响,从而提高系统的稳定性和可靠性。
另外消息队列还有以下优点:
- 可靠性高:消息队列通常具有高可靠性,可以实现消息的持久化存储、消息的备份和故障恢复等功能,保证消息不会丢失。
- 扩展性好:通过增加消息队列实例或者添加消费者实例,可以实现消息队列的水平扩展,提高系统的处理能力。
- 灵活性高:消息队列通常支持多种消息传递模式,如点对点模式和发布/订阅模式,可以根据不同的业务场景选择不同的模式。
2、Kafka的架构是怎么样的?
Kafka 的整体架构比较简单,是显式分布式架构,主要由 Producer(生产者)、broker(Kafka集群)和 consumer(消费者) 组成。
1、生产者(Producer):生产者负责将消息发布到Kafka集群中的一个或多个主题(Topic),每个Topic包含一个或多个分区(Partition)。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
2、消费者(Consumer):消费者负责从Kafka集群中的一个或多个主题消费消息,并将消费的偏移量(Offset)提交回Kafka以保证消息的顺序性和一致性。
偏移量:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
3、Kafka集群:Kafka集群是由多个Kafka节点(Broker)组成的分布式系统。每个节点都可以存储一个或多个主题(topic)的分区(partition)副本,以提供高可用性和容错能力。
如下图中,包含了 Broker1、Broker2和 Broker3组成了一个集群。用来提升高可用性。
在集群中,每个分区(partition)都可以有多个副本。这些副本中包含了一个 Leader (也可以叫做Leader Partition 或者 Leader Replication) 和多个 Follower (也可以叫做Follower Partition 或者 Follower Replication),只有 Leader 才能处理生产者和消费者的请求,而 Follower 只是 Leader 的备份,用于提供数据的冗余备份和容错能力。如果 Leader 发生故障,Kafka 集群会自动将 Follower 提升为新的 Leader ,从而实现高可用性和容错能力。
4、ZooKeeper:ZooKeeper是Kafka集群中使用的分布式协调服务,用于维护Kafka集群的状态和元数据信息,例如主题和分区的分配信息、消费者组和消费者偏移量等。(消费者组的偏移量从kafka 0.9版本开始就不再存放在zk中,而是存放在内部的topic中)
2.1 Kafka 为什么有 Topic 还要用 Partition?
Topic和Partition是kafka中比较重要的概念。
主题:Topic是Kafka中承载消息的逻辑容器。可以理解为一个消息队列。生产者将消息发送到特定的Topic,消费者从Topic中读取消息。Topic可以被认为是逻辑上的消息流。在实际使用中多用来区分具体的业务。
分区:Partition。是Topic的物理分区。一个Topic可以被分成多个Partition,每个Partition是一个有序且持久化存储的日志文件。每个Partition都存储了一部分消息,并且有一个唯一的标识符(称为Partition ID)。
在软件领域中,任何问题都可以加一个中间层来解决,而这,就是类似的思想,在Topic的基础上,再细粒度的划分出了一层,主要能带来以下几个好处:
-
提升吞吐量:通过将一个Topic分成多个Partition,可以实现消息的并行处理。每个Partition可以由不同的消费者组进行独立消费,这样就可以提高整个系统的吞吐量。
-
负载均衡:Partition的数量通常比消费者组的数量多,这样可以使每个消费者组中的消费者均匀地消费消息。当有新的消费者加入或离开消费者组时,可以通过重新分配Partition的方式进行负载均衡。
-
扩展性:通过增加Partition的数量,可以实现Kafka集群的扩展性。更多的Partition可以提供更高的并发处理能力和更大的存储容量。
综上,Topic是逻辑上的消息分类,而Partition是物理上的消息分区。通过将Topic分成多个Partition,可以实现提升吞吐量、负载均衡、以及增加可扩展性。
2.2 Kafka 消息的发送过程简单介绍一下?
当我们使用Kafka发送消息时,一般有两种方式,分别是同步发送(producer.send(msg).get() )及异步发送(producer.send(msg, callback))。
同步发送的时候,可以在发送消息后,通过get方法等待消息结果:producer.send(record).get(); ,这种情况能够准确的拿到消息最终的发送结果,要么是成功,要么是失败。
而异步发送,是采用了callback的方式进行回调的,可以大大的提升消息的吞吐量,也可以根据回调来判断消息是否发送成功。
不管是同步发送还是异步发送,最终都需要在Producer端把消息发送到Broker中,那么这个过程大致如下:
Kafka 的 Producer 在发送消息时通常涉及两个线程,主线程(Main)和发送线程(Sender)和一个消息累加器(RecordAccumulator)
。
1、主线程(Main)
Main线程是 Producer 的入口,负责初始化 Producer 的配置、创建 KafkaProducer 实例并执行发送逻辑。它会按照用户定义的发送方式(同步或异步)发送消息,然后等待消息发送完成。
一条消息的发送,在调用send方法后,会经过拦截器、序列化器及分区器。
● 拦截器主要用于在消息发送之前和之后对消息进行定制化的处理,如对消息进行修改、记录日志、统计信息等。
● 序列化器负责将消息的键和值对象转换为字节数组,以便在网络上传输。
● 分区器决定了一条消息被发送到哪个 Partition 中。它根据消息的键(如果有)或者特定的分区策略,选择出一个目标 Partition。
2、消息累加器(RecordAccumulator)
RecordAccumulator在 Kafka Producer 中起到了消息积累和批量发送的作用,当 Producer 发送消息时,不会立即将每条消息发送到 Broker,而是将消息添加到 RecordAccumulator 维护的内部缓冲区中,RecordAccumulator 会根据配置的条件(如batch.size、linger.ms)对待发送的消息进行批量处理。
当满足指定条件时,RecordAccumulator 将缓冲区中的消息组织成一个批次(batch),然后一次性发送给 Broker。如果发送失败或发生错误,RecordAccumulator 可以将消息重新分配到新的批次中进行重试。这样可以确保消息不会丢失,同时提高消息的可靠性。
3、发送线程(Sender)
Send线程是负责实际的消息发送和处理的。发送线程会定期从待发送队列中取出消息,并将其发送到对应的 Partition 的 Leader Broker 上。它主要负责网络通信操作,并处理发送请求的结果,包括确认的接收、错误处理等。
NetworkClient 和 Selector 是两个重要的组件,分别负责网络通信和 I/O 多路复用。
发送线程会把消息发送到Kafka集群中对应的Partition的Partition Leader,Partition Leader 接收到消息后,会对消息进行一系列的处理。它会将消息写入本地的日志文件(Log)
4、In-Sync Replicas(同步副本)
为了保证数据的可靠性和高可用性,Kafka 使用了消息复制机制。Leader Broker 接收到消息后,会将消息复制到其他副本(Partition Follower)。副本是通过网络复制数据的,它们会定期从 Leader Broker 同步消息。
每一个Partition Follower在写入本地log之后,会向Leader发送一个ACK。
但是我们的Producer其实也是需要依赖ACK才能知道消息有没有投递成功的,而这个ACK是何时发送的,Producer又要不要关心呢?这就涉及到了kafka的ack机制,生产者会根据设置的 request.required.acks 参数不同,选择等待或或直接发送下一条消息:
● request.required.acks = 0
○ 表示 Producer 不等待来自 Leader 的 ACK 确认,直接发送下一条消息。在这种情况下,如果 Leader 分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
● request.required.acks = 1
○ 表示 Producer 等待来自 Leader 的 ACK 确认,当收到确认后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader 服务器,但并不保证 Follow 节点已经同步完成。所以如果在消息已经被写入 Leader 分片,但是还未同步到 Follower 节点,此时Leader 分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
● request.required.acks = -1
○ Leader会把消息复制到集群中的所有ISR(In-Sync Replicas,同步副本),要等待所有ISR的ACK确认后,再向Producer发送ACK消息,然后Producer再继续发下一条消息。
2.3 介绍下Kafka的数据存储结构?
Kafka 的存储理念非常简洁:它将所有收到的消息简单地以顺序追加(Append-Only)的方式写入磁盘文件。 这种利用顺序磁盘 I/O 的方式,这也是kafka性能好的重要原因之一。
Kafka 的存储结构是一个从逻辑概念到物理文件的层级映射关系:
逻辑概念:Topic -> Partition
物理文件:Partition -> Log Segment 文件
- Topic:逻辑上的消息分类,相当于消息队列的名字
- Partition: Topic 下会划分多个分区,每个分区对应一个有序、不可变的消息队列。这是 Kafka 并行处理和水平扩展的基础。
○ 在物理上,每个 Partition 对应磁盘上的一个文件夹。
/tmp/kafka-logs/
├── my-topic-0/ # Partition 0 对应的文件夹
├── my-topic-1/ # Partition 1 对应的文件夹
├── my-topic-2/ # Partition 2 对应的文件夹
└── ...
1、Segment
虽然每个 Partition 是一个逻辑上的日志,但物理上它并不会被存储为一个巨大的文件,而是被切割成多个大小相等的 Segment 文件。便于过期数据的删除、提升查找效率。
每个 segment 包含两类文件(同名不同后缀):
- 日志数据文件(.log)
○ 存储消息本体,采用顺序写,性能极高(磁盘顺序写接近内存速度)。
○ 文件名是该 segment 第一条消息的 offset,比如:
/tmp/kafka-logs/
├── my-topic-0/ # Partition 0 对应的文件夹
│ ├── 00000000000000000000.log
│ └── leader-epoch-checkpoint
├── my-topic-1/ # Partition 1 对应的文件夹
│ ├── 00000000000000000000.log
└── ...
- 索引文件(.index / .timeindex)
○ .index:offset 索引,存储相对 offset 与物理位置的映射。
○ .timeindex:时间索引,存储消息时间戳与物理位置的映射。
○ 便于快速查找消息位置。
/tmp/kafka-logs/
├── my-topic-0/ # Partition 0 对应的文件夹
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── 00000000000000000005.index
│ ├── 00000000000000000005.log
│ ├── 00000000000000000005.timeindex
│ └── leader-epoch-checkpoint
├── my-topic-1/ # Partition 1 对应的文件夹
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── ...
│ └── leader-epoch-checkpoint
└── ...
leader-epoch-checkpoint:用于存储 Leader Epoch 信息,主要用于防止数据丢失和保证数据一致性,与事务和副本同步相关。
2、整体结构
了解了上面的内容之后,就可以画出一张Kafka的数据存储的结构图了:
3、消息的写入与读取流程
写入(生产者)
- 生产者发送消息到指定 Topic 的 Partition。
- Broker 接收到消息后,将其顺序追加到该 Partition 当前活跃的 Segment(即最新的那个 .log 文件)的末尾。
- 只有当消息被写入磁盘(根据配置,可以是刷盘也可以是 PageCache)后,这次写入才被认为是成功的。
读取(消费者)
- 消费者指定要消费的 Topic、Partition 以及 Offset。
- Broker 根据 Offset 找到对应的 Segment 文件(通过文件名快速定位)。
- 使用该 Segment 的 .index 文件,快速定位到该 Offset 在 .log 文件中的大致物理位置。
- 从该物理位置开始,在 .log 文件中进行顺序扫描,直到找到精确的消息。
- 将消息发送给消费者。
3、消息丢失问题
3.1 Kafka如何保证消息不丢失?
Kafka作为一个消息中间件,他需要结合消息生产者和消费者一起才能工作,一次消息发送包含以下是三个过程:
1)Producer 端发送消息给 Kafka Broker 。
2)Kafka Broker 将消息进行同步并持久化数据。
3)Consumer 端从Kafka Broker 将消息拉取并进行消费。
Kafka只对已提交的消息做最大限度的持久化保证不丢失,但是没办法保证100%。
1、Producer
消息的生产者端,最怕的就是消息发送给Kafka集群的过程中失败,所以,我们需要有机制来确保消息能够发送成功,但是,因为存在网络问题,所以基本没有什么办法可以保证一次消息一定能成功。
所以,就需要有一个确认机制来告诉生产者这个消息是否有发送成功,如果没成功,需要重新发送直到成功。
我们通常使用Kafka发送消息的时候,通常使用的producer.send(msg)其实是一种异步发送,发送消息的时候,方法会立即返回,但是并不代表消息一定能发送成功。(producer.send(msg).get() 是同步等待返回的。)
那么,为了保证消息不丢失,通常会建议使用producer.send(msg, callback)方法,这个方法支持传入一个callback,我们可以在消息发送时进行重试。
同时,我们也可以通过给producer设置一些参数来提升发送成功率:
acks=-1 // 表示 Leader 和 Follower 都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。
retries=3 // 生产端的重试次数
retry.backoff.ms = 300 //消息发送超时或失败后,间隔的重试时间
1、acks = 0: 表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
2、acks = -1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
3、acks = 1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。
2、Broker
Kafka的集群有一些机制来保证消息的不丢失,比如复制机制、持久化存储机制以及ISR机制。
● 持久化存储:Kafka使用持久化存储来存储消息。这意味着消息在写入Kafka时将被写入磁盘,这种方式可以防止消息因为节点宕机而丢失。
● ISR复制机制:Kafka使用ISR机制来确保消息不会丢失,Kafka使用复制机制来保证数据的可靠性。每个分区都有多个副本,副本可以分布在不同的节点上。当一个节点宕机时,其他节点上的副本仍然可以提供服务,保证消息不丢失。
在服务端,也有一些参数配置可以调节来避免消息丢失:
replication.factor //表示分区副本的个数,replication.factor >1 当leader 副本挂了,follower副本会被选举为leader继续提供服务。
min.insync.replicas //表示 ISR 最少的副本数量,通常设置 min.insync.replicas >1,这样才有可用的follower副本执行替换,保证消息不丢失
unclean.leader.election.enable = false //是否可以把非 ISR 集合中的副本选举为 leader 副本。
3、Consumer
作为Kafka的消费者端,只需要确保投递过来的消息能正常消费,并且不会胡乱的提交偏移量就行了。
Kafka消费者会跟踪每个分区的偏移量,消费者每次消费消息时,都会将偏移量向后移动。当消费者宕机或者不可用时,Kafka会将该消费者所消费的分区的偏移量保存下来,下次该消费者重新启动时,可以从上一次的偏移量开始消费消息。
另外,Kafka消费者还可以组成消费者组,每个消费者组可以同时消费多个分区。当一个消费者组中的消费者宕机或者不可用时,其他消费者仍然可以消费该组的分区,保证消息不丢失。
为了保证消息不丢失,建议使用手动提交偏移量的方式,避免拉取了消息以后,业务逻辑没处理完,提交偏移量后但是消费者挂了的问题:
enable.auto.commit=false
3.2 为什么Kafka没办法100%保证消息不丢失?
依靠Kafka自身,是没有办法100%保证可靠性的。
1、生产者
Kafka允许生产者以异步方式发送消息,这意味着生产者在发送消息后不会等待确认。当然,我们可以注册一个回调等待消息的成功回调。
但是,如果生产者在发送消息之后,Kafka的集群发生故障或崩溃,而消息尚未被完全写入Kafka的日志中,那么这些消息可能会丢失。虽然后续有可能会重试,但是,如果重试也失败了呢?如果这个过程中刚好生产者也崩溃了呢?那就可能会导致没有人知道这个消息失败了,就导致不会重试了。
2、消费者
消费者来说比较简单,只要保证在消息成功时,才提交偏移量就行了,这样就不会导致消息丢失了。
3、Broker
Kafka使用日志来做消息的持久化的,日志文件是存储在磁盘之上的,但是如果Broker在消息尚未完全写入日志之前崩溃,那么这些消息可能会丢失了。
而且,操作系统在写磁盘之前,会先把数据写入Page Cache中,然后再由操作系统中自己决定什么时候同步到磁盘当中,而在这个过程中,如果还没来得及同步到磁盘中,就直接宕机了,那这个消息也就丢了。
当然,也可以通过配置log.flush.interval.messages=1,来实现类似于同步刷盘的功能,但是回到了前面说的情况,还没来得及做持久化,就宕机了。
即使Kafka中引入了副本机制来提升消息的可靠性,但是如果发生同步延迟,还没来及的同步,主副本就挂掉了,那么消息就可能会发生丢失。
这几种情况,只从Broker的角度分析,Broker自身是没办法保证消息不丢失的,但是如果配合Producer,再配合request.required.acks = -1 这种ACK策略,可以确保消息持久化成功之后,才会ACK给Producer,那么, 如果我们的Producer在一定时间段内,没有收到ACK,是可以重新发送的。
但是,这种重新发送,就回到了我们前面介绍生产者的时候的问题,生产者也有可能挂,重新发送也有可能会没有发送依据,导致消息最终丢失。
所以,只靠Kafka自己,其实是没有办法保证极端情况下的消息100%不丢失的。
但是,可以在做一些机制来保证,比如引入分布式事务,或者引入本地消息表等,保证在Kafka Broker没有保存消息成功时,可以重新投递消息。这样才行。
4、重复消费问题
首先,在Kafka中,每个消费者都必须加入至少一个消费者组。同一个消费者组内的消费者可以共享消费者的负载。因此,如果一个消息被消费组中的任何一个消费者消费了,那么其他消费者就不会再收到这个消息了。
另外,消费者可以通过手动提交消费位移来控制消息的消费情况。通过手动提交位移,消费者可以跟踪自己已经消费的消息,确保不会重复消费同一消息。
另外可以借助Kafka的Exactly-once消费语义,其实就是引入了事务,消费者使用事务来保证消息的消费和位移提交是原子的,而生产者可以使用事务来保证消息的生产和位移提交是原子的。Exactly-once消费语义则解决了重复问题,但需要更复杂的设置和配置。
4.1 Kafka的三种消息传递语义
在Kafka中,有三种常见的消息传递语义:At-least-once、At-most-once和Exactly-once。其中At-least-once和Exactly-once是最常用的。
● At most once—消息可能会丢,但绝不会重复传递;
● At least once—消息绝不会丢,但可能会重复传递;
● Exactly once—每条消息只会被精确地传递一次:既不会多,也不会少;
1、At-least-once消费语义
At-least-once消费语义意味着消费者至少消费一次消息,但可能会重复消费同一消息。在At-least-once语义中,当消费者从Kafka服务器读取消息时,消息的偏移量会被记录下来。一旦消息被成功处理,消费者会将位移提交回Kafka服务器。如果消息处理失败,消费者不会提交位移。这意味着该消息将在下一次重试时再次被消费。
At-least-once语义通常用于实时数据处理或消费者不能容忍数据丢失的场景,例如金融交易或电信信令。
2、Exactly-once消费语义
Exactly-once消费语义意味着每个消息仅被消费一次,且不会被重复消费。在Exactly-once语义中,Kafka保证消息只被处理一次,同时保持消息的顺序性。为了实现Exactly-once语义,Kafka引入了一个新的概念:事务。
事务是一系列的读写操作,这些操作要么全部成功,要么全部失败。在Kafka中,生产者和消费者都可以使用事务,以保证消息的Exactly-once语义。具体来说,消费者可以使用事务来保证消息的消费和位移提交是原子的,而生产者可以使用事务来保证消息的生产和位移提交是原子的。
At-least-once消费语义保证了数据的可靠性,但可能会导致数据重复。而Exactly-once消费语义则解决了重复问题,但需要更复杂的设置和配置。选择哪种消费语义取决于业务需求和数据可靠性要求。
5、顺序消费问题
Kafka的消息是存储在指定的topic中的某个partition中的。并且一个topic是可以有多个partition的。同一个partition中的消息是有序的,但是跨partition,或者跨topic的消息就是无序的了。
为什么同一个partition的消息是有序的?
因为当生产者向某个partition发送消息时,消息会被追加到该partition的日志文件(log)中,并且被分配一个唯一的 offset,文件的读写是有顺序的。而消费者在从该分区消费消息时,会从该分区的最早 offset 开始逐个读取消息,保证了消息的顺序性。
基于此,想要实现消息的顺序消费,可以有以下几个办法:
1、在一个topic中,只创建一个partition,这样这个topic下的消息都会按照顺序保存在同一个partition中,这就保证了消息的顺序消费。
2、发送消息的时候指定partition,如果一个topic下有多个partition,那么我们可以把需要保证顺序的消息都发送到同一个partition中,这样也能做到顺序消费。
6、事务消息
6.1 Kafka支持事务消息吗?如何实现的?
在Kafka中,事务消息可以确保一组生产或消费操作要么全部成功,要么全部失败,以保证消息处理的原子性。也就是说,他的作用是保证一组消息要么都成功,要么都失败。
只不过,通常在分布式系统中,我们通常说的事务消息,如RocketMQ的事务消息保证的时候本地事务和发MQ能作为一个原子性,即要么一起成功,要么一起失败。
所以,Kafka的事务消息只保证他自己的消息发送的原子性。而RocketMQ的事务消息是保证本地事务和发消息的原子性
。
Kafka为了实现事务,有几个关键组件:
1、Transaction Coordinator(事务协调器)
事务协调器是Kafka的一个特殊组件,负责管理生产者的事务状态。它会分配一个唯一的Transaction ID给每个生产者,并维护一个Transaction Log来记录事务的开始、提交或中止等状态信息。
2、Producer ID (PID) 和 Epoch
Kafka为每个生产者(Producer)分配一个唯一的PID(Producer ID)和Epoch(版本号)。PID用于标识生产者,Epoch用于标识该生产者的事务版本。通过这两个字段,可以防止因重启等导致的重复消息和过期消息。
3、Transaction Log(事务日志)
事务日志是Kafka的一个特殊内部主题,用于记录事务的开始、提交、回滚等操作。事务协调器通过该日志确保事务的原子性和一致性。
过程大致如下:
- 开启事务
生产者在初始化时需要配置transactional.id来启用事务,在每次事务开始时,生产者会给协调者发请求来开启事务,协调者在事务日志中记录下事务 ID。
-
发送消息
生产者发消息之前,先发送请求事务协调器,让他记录消息发送的主题和分区。接下来开始向Broker发消息, -
提交事务
当所有消息发送完毕,生产者调用commitTransaction()来提交事务。此时,事务协调器会将所有消息的状态改为已提交,并通知消费者可以读取这些消息。 -
回滚事务
如果在事务过程中出现异常,生产者可以调用abortTransaction()回滚事务。事务协调器会将所有相关消息标记为中止状态,确保消费者无法读取未完成的消息。
这里的事务提交和回滚,其实也是遵循了一个2阶段提交的。协调者在接收到提交或者回滚请求时,第一阶段,他会把事务的状态设置为“预提交”,并写入事务日志。接下来第二阶段,协调者在事务相关的所有分区中,都会写一条“事务结束”的特殊消息,当 Kafka 的消费者,也就是客户端,读到这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。最后,协调者记录最后一条事务日志,标识这个事务已经结束了。
7、重平衡问题
7.1 什么是Kafka的重平衡机制?
Kafka 的重平衡机制是指在消费者组中新增或删除消费者时,Kafka 集群会重新分配主题分区给各个消费者,以保证每个消费者消费的分区数量尽可能均衡。
重平衡机制的目的是实现消费者的负载均衡和高可用性,以确保每个消费者都能够按照预期的方式消费到消息。
重平衡的 3 个触发条件:
● 消费者组成员数量发生变化。(新消费者的加入或者退出)
● 订阅主题(Topic)数量发生变化。
● 订阅主题的分区(Partition)数发生变化。
还有两种异常情况:
● 组协调器(Group Coordinator) 是 Kafka 负责管理消费者组的 Broker 节点。如果它崩溃或者发生故障,Kafka 需要重新选举新的 Group Coordinator,并进行重平衡。
● 当消费者组中的 Leader 消费者崩溃或退出。Kafka 需要选举新的 Leader,重新进行重平衡。
当Kafka 集群要触发重平衡机制时,大致的步骤如下:
-
暂停消费:在重平衡开始之前,Kafka 会暂停所有消费者的拉取操作,以确保不会出现重平衡期间的消息丢失或重复消费。
-
计算分区分配方案:Kafka 集群会根据当前消费者组的消费者数量和主题分区数量,计算出每个消费者应该分配的分区列表,以实现分区的负载均衡。
-
通知消费者:一旦分区分配方案确定,Kafka 集群会将分配方案发送给每个消费者,告诉它们需要消费的分区列表,并请求它们重新加入消费者组。
-
重新分配分区:在消费者重新加入消费者组后,Kafka 集群会将分区分配方案应用到实际的分区分配中,重新分配主题分区给各个消费者。
-
恢复消费:最后,Kafka 会恢复所有消费者的拉取操作,允许它们消费分配给自己的分区。
Kafka 的重平衡机制能够有效地实现消费者的负载均衡和高可用性,提高消息的处理能力和可靠性。但是,由于重平衡会带来一定的性能开销和不确定性,因此在设计应用时需要考虑到重平衡的影响,并采取一些措施来降低重平衡的频率和影响。
在重平衡过程中,所有 Consumer 实例都会停止消费,等待重平衡完成。但是目前并没有什么好的办法来解决重平衡带来的STW,只能尽量避免它的发生。
7.2 MQ的重平衡会带来哪些问题?
1、STW
最容易理解的就是STW的问题。 因为在重平衡过程中,消费者可能会短暂停止消费,等待新的分区/队列分配,导致吞吐下降。
当然,也可以使用Kafka中的渐进式重平衡或 下一代重平衡协议(Kafka 4.0)减少影响。 或者RocketMQ本身的重平衡机制带来的STW问题也比较小。
2、重复消费
除了STW的问题还有一个就是可能导致消费重复的问题,当消费者重新分配队列/分区时,可能会重新拉取道未提交的消息,导致消息被多次消费。
比如说开始分给了消费者A,A还没消费完还没提交偏移量,发生了重平衡,这个消息就会再分给消费者B,这时候就出现了重复消费了。
3、消息堆积
Kafka 旧版本(4.0之前)重平衡时,所有消费者都会暂停,导致短时间内消息积压。
RocketMQ 采用 定时重平衡(默认 20s),如果消费者宕机,消息可能会积压在该消费者上,直到下一次重平衡。
7.3 什么是Kafka的渐进式重平衡?
渐进式重平衡,目的是在进行消费者组重平衡时,尽可能减少数据中断和不必要的分区变更。它由 Cooperative Sticky Assignor提供支持,并在 Kafka 2.4.0 引入。
开启了CooperativeStickyAssignor之后,Kafka 通过 两阶段分配 机制实现渐进式重平衡,下面举个例子,假设有一个topic,共6个partition,一个消费者,其中有两个消费者再消费数据:
渐进式重平衡,则采用2阶段方式。
第一阶段:部分撤销
● ConsumerA 和 ConsumerB 不会释放所有分区,而是部分释放。
● Kafka 计算 最小变更 方案,并决定 ConsumerA、ConsumerB 各释放一个分区,给 ConsumerC 用。即可能是ConsumerA 释放 Partition-2;ConsumerB 释放 Partition-5
第二阶段:重新分配
● Kafka 重新分配被释放的分区,则 ConsumerC 消费 Partition-2 和 Partition-5
这样重平衡过程,就可以减少所有分区都停止消费的情况,而只有其中部分分区需要重新分配而已。以上使用增加消费者的方式距离的,如果是减少消费者其实也一样。只有移除的消费者释放自己的分区,再重新分配给其他分区就好了,其他分区自己的那部分不用改变。
8、选举问题
8.1 Kafka 几种选举过程简单介绍一下?
1、Partition Leader 选举
Kafka 中的每个 Partition 都有一个 Leader,负责处理该 Partition 的读写请求。在正常情况下,Leader 和 ISR 集合中的所有副本保持同步,Leader 接收到的消息也会被 ISR 集合中的副本所接收。当 leader 副本宕机或者无法正常工作时,需要选举新的 leader 副本来接管分区的工作。
Leader 选举的过程如下:
● 每个参与选举的副本会尝试向 ZooKeeper 上写入一个临时节点,表示它们正在参与 Leader 选举;
● 所有写入成功的副本会在 ZooKeeper 上创建一个序列号节点,并将自己的节点序列号写入该节点;
● 节点序列号最小的副本会被选为新的 Leader,并将自己的节点名称写入 ZooKeeper 上的 /broker/…/leader 节点中。
2、Controller 选举
Kafka 集群中只能有一个 Controller 节点,用于管理分区的副本分配、leader 选举等任务。当一个Broker变成Controller后,会在Zookeeper的/controller节点 中记录下来。然后其他的Broker会实时监听这个节点,主要就是避免当这个controller宕机的话,就需要进行重新选举。
Controller选举的过程如下:
● 所有可用的 Broker 向 ZooKeeper 注册自己的 ID,并监听 ZooKeeper 中 /controller 节点的变化。
● 当 Controller 节点出现故障时,ZooKeeper 会删除 /controller 节点,这时所有的 Broker 都会监听到该事件,并开始争夺 Controller 的位置。
● 为了避免出现多个 Broker 同时竞选 Controller 的情况,Kafka 设计了一种基于 ZooKeeper 的 Master-Slave 机制,其中一个 Broker 成为 Master,其它 Broker 成为 Slave。Master 负责选举 Controller,并将选举结果写入 ZooKeeper 中,而 Slave 则监听 /controller 节点的变化,一旦发现 Master 发生故障,则开始争夺 Master 的位置。
● 当一个 Broker 发现 Controller 失效时,它会向 ZooKeeper 写入自己的 ID,并尝试竞选 Controller 的位置。如果他创建临时节点成功,则该 Broker 成为新的 Controller,并将选举结果写入 ZooKeeper 中。
● 其它的 Broker 会监听到 ZooKeeper 中 /controller 节点的变化,一旦发现选举结果发生变化,则更新自己的元数据信息,然后与新的 Controller 建立连接,进行后续的操作。
8.2 Kafka 高水位了解过吗?
高水位(HW,High Watermark)是Kafka中的一个重要的概念,主要是用于管理消费者的进度和保证数据的可靠性的。
高水位标识了一个特定的消息偏移量(offset),即一个分区中已提交消息的最高偏移量(offset),消费者只能拉取到这个 offset 之前的消息。消费者可以通过跟踪高水位来确定自己消费的位置。
在Kafka中,HW主要有两个作用:
● 消费进度管理:消费者可以通过记录上一次消费的偏移量,然后将其与分区的高水位进行比较,来确定自己的消费进度。消费者可以在和高水位对比之后继续消费新的消息,确保不会错过任何已提交的消息。这样,消费者可以按照自己的节奏进行消费,不受其他消费者的影响。
● 数据的可靠性:高水位还用于确保数据的可靠性。在Kafka中,只有消息被写入主副本(Leader Replica)并被所有的同步副本(In-Sync Replicas,ISR)确认后,才被认为是已提交的消息。高水位表示已经被提交的消息的边界。只有高水位之前的消息才能被认为是已经被确认的,其他的消息可能会因为副本故障或其他原因而丢失。
还有一个概念,叫做LEO,即 Log End Offset,他是日志最后消息的偏移量。 它标识当前日志文件中下一条待写入消息的 offset。
当消费者消费消息时,它可以使用高水位作为参考点,只消费高水位之前的消息,以确保消费的是已经被确认的消息,从而保证数据的可靠性。如上图,只消费offet为6之前的消息。
我们都知道,在Kafka中,每个分区都有一个Leader副本和多个Follower副本。
当Leader副本发生故障时,Kafka会选择一个新的Leader副本。这个切换过程中,需要保证数据的一致性,即新的Leader副本必须具有和旧Leader副本一样的消息顺序。
为了实现这个目标,Kafka引入了Leader Epoch的概念。Leader Epoch是一个递增的整数,每次副本切换时都会增加。它用于标识每个Leader副本的任期。
每个副本都会维护自己的Leader Epoch记录。它记录了副本所属的分区在不同Leader副本之间切换时的任期。
在副本切换过程中,新的Leader会检查旧Leader副本的Leader Epoch和高水位。只有当旧Leader副本的Leader Epoch小于等于新Leader副本的Leader Epoch,并且旧Leader副本的高水位小于等于新Leader副本的高水位时,新Leader副本才会接受旧Leader副本的数据。
通过使用Leader Epoch和高水位的验证,Kafka可以避免新的Leader副本接受旧Leader副本之后的消息,从而避免数据回滚。只有那些在旧Leader副本的Leader Epoch和高水位之前的消息才会被新Leader副本接受。
9、批量消费问题
批量消费指的是一次性拉过来一批消息,然后进行批量处理。
Kafka想要实现批量消费有很多种方案。其中比较简单的就是基于@KafkaListener 实现,这也是比较推荐的方案。
9.1 Kafka的批量消费如何确保消息不丢?
首先配置成手动提交,其次是待所有消息处理完后判断是否都成功,任意一条失败则不提交偏移量,千万不要在finally中提交偏移量。
1、丢消息的第一种情况
当使用自动提交的时候,可能会丢消息。假如kafka中有以下配置:
enable.auto.commit=true
auto.commit.interval.ms=5000
这样配置表示每隔 5 秒自动提交当前 poll 到的最大 offset。 那么就会出现这样的情况:
● 消费者从 Kafka 拉取了一批消息。
● Kafka 客户端自动在 5 秒后提交 offset。
● 但是应用代码还没处理完这批消息,有可能执行过程中出错或者失败了。
● 但是 Kafka 因为接收到了offset,那么他就会认为这批消息已经处理完,不再重新发送了。
2、丢消息的第2种情况
在finally中调用偏移量提交,这时候会把最大的偏移量+1提交掉,也就意味着,不管你的try执行成功还是失败,都会提交,那么就会出现上面一样的情况,消息执行失败,但是偏移量被提交了,导致丢消息。
@KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {try{// 批量处理逻辑}finally{ack.acknowledge(); //手动提交偏移量}}
10、Kafka 为什么这么快?
kafka是一个成熟的消息队列,一直以性能高著称,它之所以能够实现高吞吐量和低延迟,主要是由于以下几个方面的优化。
1、消息发送
- 批量发送:Kafka 通过将多个消息打包成一个批次,减少了网络传输和磁盘写入的次数,从而提高了消息的吞吐量和传输效率。
- 异步发送:生产者可以异步发送消息,不必等待每个消息的确认,这大大提高了消息发送的效率。
- 消息压缩:支持对消息进行压缩,减少网络传输的数据量。
- 并行发送:通过将数据分布在不同的分区(Partitions)中,生产者可以并行发送消息,从而提高了吞吐量。
2、消息存储
-
零拷贝技术:Kafka 使用零拷贝技术来避免了数据的拷贝操作,降低了内存和 CPU 的使用率,提高了系统的性能。
-
磁盘顺序写入:Kafka把消息存储在磁盘上,且以顺序的方式写入数据。顺序写入比随机写入速度快很多,因为它减少了磁头寻道时间。避免了随机读写带来的性能损耗,提高了磁盘的使用效率。
-
页缓存:Kafka 将其数据存储在磁盘中,但在访问数据时,它会先将数据加载到操作系统的页缓存中,并在页缓存中保留一份副本,从而实现快速的数据访问。
-
稀疏索引:Kafka 存储消息是通过分段的日志文件,每个分段都有自己的索引文件。这些索引文件中的条目不是对分段中的每条消息都建立索引,而是每隔一定数量的消息建立一个索引点,这就构成了稀疏索引。稀疏索引减少了索引大小,使得加载到内存中的索引更小,提高了查找特定消息的效率。
-
分区和副本:Kafka 采用分区和副本的机制,可以将数据分散到多个节点上进行处理,从而实现了分布式的高可用性和负载均衡。
3、消息消费
- 消费者群组:通过消费者群组可以实现消息的负载均衡和容错处理。
- 并行消费:不同的消费者可以独立地消费不同的分区,实现消费的并行处理。
- 批量拉取:Kafka支持批量拉取消息,可以一次性拉取多个消息进行消费。减少网络消耗,提升性能
面试问题:
一、Kafka的架构是怎么样的?
Kafka主要由生产者、kafka集群、消费者组成。
Kafka集群是由多个Broker组成的分布式系统,每一个broker中存放某一个topic的leader partition,该topic的副本partition分布在其他broker中,通过将Topic分成多个Partition,可以实现提升吞吐量、负载均衡、以及增加可扩展性。
二、Kafka如何保证消息不丢失?Kafka能否100%保证消息不丢失?
如何保证消息不丢失:
生产者角度,通常会建议使用producer.send(msg, callback)方法,这个方法支持传入一个callback,我们可以在消息发送时进行重试。其次是ack确认机制配置为需要leader和副本都确认接收成功才进行确认。
broker角度,通过持久化机制和副本机制,来保证消息不丢失。
消费者角度,确认消息处理成功后,再手动提交偏移量。
Kafka不能100%保证消息不丢失,主要原因是在生产者发送消息后,broker集群发生崩溃后,可能导致消息丢失。
如果生产者在发送消息之后,Kafka的集群发生故障或崩溃,而消息尚未被完全写入Kafka的日志中,那么这些消息可能会丢失。虽然后续有可能会重试,但是,如果重试也失败了呢?如果这个过程中刚好生产者也崩溃了呢?那就可能会导致没有人知道这个消息失败了,就导致不会重试了。
三、Kafka如何确保不重复消费?
消费者可以通过手动提交消费位移来控制消息的消费情况。通过手动提交位移,消费者可以跟踪自己已经消费的消息,确保不会重复消费同一消息。
另外可以借助Kafka的Exactly-once消费语义配置,通过引入了事务,消费者使用事务来保证消息的消费和位移提交是原子的。
四、Kafka如何实现顺序消费?
Kafka的消息是存储在指定的topic中的某个partition中的。并且一个topic是可以有多个partition的。同一个partition中的消息是有序的,但是跨partition,或者跨topic的消息就是无序的了。实现顺序消费,有以下2个方式:
1、在一个topic中,只创建一个partition,这样这个topic下的消息都会按照顺序保存在同一个partition中,这就保证了消息的顺序消费。
2、发送消息的时候指定partition,如果一个topic下有多个partition,那么我们可以把需要保证顺序的消息都发送到同一个partition中,这样也能做到顺序消费。
五、Kafka的事务消息是怎样的?
在Kafka中,事务消息可以确保一组生产或消费操作要么全部成功,要么全部失败,以保证消息处理的原子性。也就是说,他的作用是保证一组消息要么都成功,要么都失败。
只不过,通常在分布式系统中,我们通常说的事务消息,如RocketMQ的事务消息保证的时候本地事务和发MQ能作为一个原子性,即要么一起成功,要么一起失败。
所以,Kafka的事务消息只保证他自己的消息发送的原子性。而RocketMQ的事务消息是保证本地事务和发消息的原子性
。
参考链接:
1、https://www.yuque.com/hollis666/wk6won/glnsckpypwycgh54
2、https://www.yuque.com/hollis666/wk6won/imx4a7z8zq65erlo
3、https://www.yuque.com/hollis666/wk6won/rqzepcxvq2a1w2e9
4、https://www.yuque.com/hollis666/wk6won/kpvazhtr1ukqoyx7