当前位置: 首页 > news >正文

Kafka如何保证「消息不丢失」,「顺序传输」,「不重复消费」,以及为什么会发生重平衡(reblanace)

前言

上一篇文章总结了kafka为什么快,下面来总结一下,kafka高频的常见的问题。内容有点多,全部看完需要有一定的耐心。

kafka如何保证消息不丢失

Producer端

要保证消息不丢失,第一点要做的就是要保证消息从producer端发送到了kafka的broker中,并且broker把消息保存了下来。
由于在发送消息的过程中有可能会发生网络故障,broker故障等原因导致消息发送失败,因此在producer端有两种方式来避免消息丢失。

接收发送消息回执

我们在使用kafka发送消息的时候,通常是使用producer.send(msg)方法,但是这个方法其实是一种异步发送,调用此方法发送消息的时候,虽然会立即返回,但是并不代表消息真的发送成功了。
1、所以可以使用同步发送消息,producer.send(msg).get()此方法会执行同步发生消息,并等待结果返回
2、也可以使用带回调函数的异步方法,producer.send(msg,callback),用回调函数来监听消息的发送结果,如果发送失败了,可以在回调函数里面进行重试。

producer参数配置

producer也提供了一些配置参数来避免消息丢失。

// 此配置表示,Leader和Follower全部成功接收消息后才确认收到消息,
// 可以最大限度保证消息不丢失,但是吞吐量会下降
acks = -1 
// producer 发送消息失败后,自动重试次数
retries = 3
// 发送消息失败后的重试时间间隔
retry.backoff.ms = 300
Broker端

当消息发送到broker后,broker需要保证此消息不会丢失,我们都知道,kafka是会将消息持久化到磁盘中的。
但是kafka为了保持性能采用了,页缓存+异步刷盘的形式将消息持久化到磁盘的。也就是批量定时将消息持久化到磁盘。
但是页缓存如果还没来的及将消息刷到磁盘,broker就挂了,还是会有消息丢失的风险,因此kafka又提供了partition的ISR(同步副本机制),即每一个patrtition都会有一个唯一的Leader和一到多个Follower,Leader专门处理一些事务类型的请求,Follower负责同步Leader的数据。当leader挂了后,会重新从Follower中选举出新的Leader,保证消息能够最终持久化。
另外,在producer中的配置参数acks,配置不同的值,broker也是会做不同的处理的。

acks=0:表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
acks =-1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
acks=1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。

producer中还有一些参数的配置也是会起到避免消息丢失的作用

//表示分区副本的个数,replication.factor>1,当Leader挂了,follower会被选举为leader继续提供服务
replication.factor=2//表示 ISR 最少的副本数量,通常设置 replication.factormin.insync.replicas>1,这样才有可用的follower副本执行替换,保证消息不丢失
replication.factormin.insync.replicas=2//是否可以把非ISR集合中的副本选举为leader
min.inunclean.leader.election.enable= false
Consumer端

Consumer端,只要保证消息接收到不胡乱的提交offset就行,kafka本身也是会记录每个pratition的偏移量,但是为了业务的可靠性,也可以自己存储一份offset,防止由于业务代码的问题导致消息没有处理就提交的offset,有自己存储才这一份offset就可以对偏移量进行一个回拨。

为了避免消息丢失,建议使用手动提交偏移量的方式,防止消息的业务逻辑未处理完,提交偏移量后消费者挂了的问题。

enable.auto.commit=false

kafka如何保证消息的顺序传输

我们知道,kafka的消息实际是存在某个topic的partition中的,一个topic有多个partition分区,同一个partition中的消息是有序的,跨partition的消息是无序的。
这个是怎么实现的呢?
因为我们在【Kafka为什么吞吐量大,速度快?】这篇文章里面总结了,kafka写入磁盘时是顺序写的,并且会被分配一个唯一的offset,所以同一个partition保存的数据都是有序的。而在读取消息时,消费者会从该分区最早的offset开始,依次读取消息,保证了消息顺序消费。

具体实现顺序发送消息有两种方式:
1、在使用kafka时,对需要保证顺序消费的topic,只创建一个partition,这样消息就都会顺序的存储到这一个partition中,也就能保证顺序消费了。
2、当一个topic有多个partition时,对需要保证顺序的消息,都发到指定的partition即可,这样也能保证顺序消费。

注:需要注意一点,虽然发送时保证了顺序,也都写到了同一个partition中,但在消费端,也要保证顺序消费,即单线程处理消息。

目前kafka4.0,可以允许一个consumer group下的多个消费者同时消费同一个partition了。
其借助新推出的共享组(Shared Group)特性来达成这一功能,且支持逐条消息确认,从而让消费模式更具灵活性,还能助力提升吞吐量。以往版本默认仅允许一个消费者组内单个消费者消费一个特定分区,当消费者多于分区时,多余消费者会闲置,共享组则可避免出现该类资源浪费情况。

将消息发到指定partition中也有几种方式。
1、发送消息,组装producerRecord时,指定partition

// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要发送消息的topic
String topic ="jimer_topic";
// 发送的消息内容
String message =Hello World!";
// 指定发送消息的分区
int partition =0;// 创建包含分区信息的ProducerRecord
ProducerRecordProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, message);
// 发送消息
producer.send(record);
//关闭Kafka生产者
producer.close();

2、指定消息的key,保证相同key的消息发送到同一个partition

// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要发送消息的topic
String topic ="jimer_topic";
// 发送的消息内容
String message =Hello World";
// 指定发送消息的key
String msg_key = "order_msg";// 创建包含消息key的producerRecord
ProducerRecordProducerRecord<String,String> record = 
new ProducerRecord<>(topic, null,msg_key, message);
// 发送消息
producer.send(record);
//关闭Kafka生产者
producer.close();

3、自定义Partitioner
除了上面两种方式外,还可以自定义指定分区的方式。通过实现Partitioner这个接口,具体实现partition方法,就可以了。具体使用的时候,在初始化Producer时,指定具体的partition实现类即可。
例如:

public class MyPartitioner implatents Partitioner{@Override
public void configure(Map<String,?> configs){// 可以在这里处理和获取分区器的配置参数
}
@0verride
public int partition(String topic, Object key, byte[] keyBytes, 
Object value,byte[] valueBytes,Cluster cluster){int partition =  int ss = new Random().nextInt(2);// 返回分区编号return partition;
}
@0verride
public void close(){// 可以在这里进行一些清理操作
}

使用时

Properties propsProducer = new Properties();propsProducer.put("acks", "all"); // 全部ISR列表中的副本接收成功后返回propsProducer.put("retries", 3);//失败时重试次数propsProducer.put("partitioner.class", "com.jimoer.MyPartitioner"); // 指定自定义分区器类
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(propsProducer);

kafka如何保证消息不重复消费

什么情况下会导致消息被重复消费呢?

1、生产者,生产者可能重复推送了一条消息到kafka,例如:某接口未做幂等处理,接口中会发送kafka消息。
2、kafka,在消费者消费完消息后,提交offset时,kafka突然挂了,导致kafka认为此消息还未消费,又重新推送了该条消息,导致了重复消费消息。
3、消费者,在消费者消费完消息后,提交offset时,Consumer突然宕机挂掉,这个时候,kafka未接收到已处理的offset值,当Consumer恢复后,会重新消费此部分消息。
4、还有一种情况,Kafka 存在 Partition Balance 机制,会将多个 Partition 均衡分配给多个消费者。若 Consumer 在默认 5 分钟内未处理完一批消息,会触发 Rebalance 机制,导致 offset 自动提交失败,重新 Rebalance 后,消费者会从之前未提交的 offset 位置开始消费,从而造成消息重复消费。

那么我们该如何防止消息被重复消费呢

其实上面的1、2、3、4这些情况都可以用幂等机制来防止消息被重复消费。为消息生成 一个唯一标识,并保存到 mysql 或 redis 中,处理消息前先到 mysql 或 redis 中判断该消息是否已被消费过。

但是第4种情况,前提是要先优化消费端处理性能,避免触发 Rebalance,例如:采用异步方式处理消息、缩短单个消息消费时长、调整消息处理超时时间、减少一次性从 Broker 拉取的数据条数等。

kafka什么情况下会发生reblanace(重平衡)

Kafka 的重平衡(Rebalance)是指消费者组(Consumer Group)内的消费者与分区(Partition)之间的分配关系发生重新调整的过程
主要有以下几种情况会触发:
1、消费者组成员数量发生变化。((新消费者的加入或者退出)
2、订阅主题(Topic)数量发生变化。
3、订阅主题的分区(Partition)数发生变化。

还有某些异常情况也会触发Rebalance:
1、消费端处理消息超时,上面我们说到过,若消费者未在设定时间内处理完消息,消费者组会认为当前消费者有问题了,会触发Rebalance,重新分配消息。又或者当前消费者挂了,也是一样会触发Rebalance。
2、组协调器(Group Coordinator)是 Kafka 负责管理消费者组的 Broker 节点。如果它崩溃或者发生故障。Kafka 需要重新选举新的 Group Coordinator ,并进行重平衡。

当Kafka 集群要触发重平衡机制时,大致的步骤如下:
1.暂停消费:在重平衡开始之前,Kafka 会暂停所有消费者的拉取操作,以确保不会出现重平衡期间的消息丢失或重复消费。
2.计算分区分配方案:Kafka 集群会根据当前消费者组的消费者数量和主题分区数量,计算出每个消费者应该分配的分区列表,以实现分区的负载均衡。
3.通知消费者:一旦分区分配方案确定,Kafka 集群会将分配方案发送给每个消费者,告诉它们需要消费的分区列表,并请求它们重新加入消费者组。
4.重新分配分区:在消费者重新加入消费者组后,Kafka 集群会将分区分配方案应用到实际的分区分配中,重新分配主题分区给各个消费者。
5.恢复消费:最后,Kafka 会恢复所有消费者的拉取操作,允许它们消费分配给自己的分区。

http://www.dtcms.com/a/341571.html

相关文章:

  • 攻克PostgreSQL专家认证
  • Git Commit 提交信息标准格式
  • Python打卡Day47 注意力热图可视化
  • 字符设备驱动、块设备驱动和网络设备驱动
  • Gitee仓库 日常操作详细步骤
  • Linux服务器性能优化总结
  • 【数据结构】快速排序算法精髓解析
  • shell脚本——搜索某个目录下带指定前缀的文件
  • 50.Seata-AT模式
  • Cyberduck (FTP和SFTP工具) v9.2.3.43590
  • 189.轮转数组
  • 设计模式的一些笔记
  • list集合可以一边遍历一遍修改元素吗?
  • Rust 入门 包 (二十一)
  • 计算机网络基础复习
  • 【数据分享】295个地级市互联网用户、邮电业务数据(2001-2022)
  • win10安装最新docker 4.44.2版图文教程(2025版)
  • 3.Shell脚本修炼手册之---Shell 变量基础知识
  • Android动画小补充
  • 【Obsidian插件】HiNote
  • 爬虫项目实践之淘宝商品详情数据采集​||电商API接口
  • 结构化 OCR 技术:破解各类检测报告信息提取难题
  • 5.Kotlin作用于函数let、run、with、apply、also
  • SpringCloud微服务架构入门指南
  • Day12--滑动窗口与双指针--2762. 不间断子数组,LCP 68. 美观的花束,2743. 计算没有重复字符的子字符串数量
  • day075-MySQL数据库服务安装部署与基础服务管理命令
  • Unity 开源分享一个轻量路点编辑器插件 常用于对象寻路
  • 在IDEA中DEBUG调试时查看MyBatis-Plus动态生成的SQL语句
  • 数据结构:AVL 树
  • RHCA05-文件系统调优