当前位置: 首页 > news >正文

【Kafka】重点概念和架构总结

【Kafka】重点概念和架构总结

  • 【一】Kafka 核心概念与作用
    • 【1】基本概念
    • 【2】分区 (Partitions)
      • (1)作用与重要性​:
      • (2)关键特性​:
      • (3)副本 (Replicas)
      • (4)偏移量 (Offset)
      • (5)代理 Broker 和集群
      • (6)ZooKeeper / KRaft
  • 【二】Kafka 架构设计
    • 【1】整体架构
    • 【2】写入流程
    • 【3】读取流程
    • 【4】存储设计
  • 【三】高性能高可用配置
    • 【1】生产者配置优化
    • 【2】消费者配置优化
    • 【3】Broker配置优化
    • 【4】主题级别配置
  • 【四】Kafka集群搭建
    • (1)单机多Broker集群(开发环境)
    • 【2】多节点生产集群
    • 【3】集群验证和管理
  • 【五】Spring Boot整合Kafka配置详解
    • 【1】application.yml配置
    • 【2】配置项详解
      • (1)​生产者关键配置​:
      • (2)​消费者关键配置​:
      • (3)​监听器关键配置​:
  • 【六】注意事项
    • 【1】生产环境注意事项
    • 【2】消费者注意事项
    • 【3】生产者注意事项
    • 【4】Spring集成注意事项
    • 【5】常见问题处理
  • 【七】常见问题解决方案
    • 【1】Kafka 如何保证消息的消费顺序?
    • 【2】Kafka 如何保证消息不丢失?
      • (1)生产者丢失消息的情况
      • (2)消费者丢失消息的情况
      • (3)Kafka 弄丢了消息
        • 1-设置 acks = all
        • 2-设置 replication.factor >= 3
        • 3-设置 min.insync.replicas > 1
        • 4-设置 unclean.leader.election.enable = false
    • 【3】Kafka 如何保证消息不重复消费?
      • (1)kafka 出现消息重复消费的原因
      • (2)解决方案
    • 【4】Kafka 重试机制
      • (1)消费失败会怎么样?
      • (2)默认会重试多少次?
      • (3)如何自定义重试次数以及时间间隔?
      • (4)如何在重试失败后进行告警?
      • (5)重试失败后的数据如何再次处理?

【一】Kafka 核心概念与作用

在这里插入图片描述

【1】基本概念

(1)​消息 (Message)​​:Kafka 中的基本数据单元,包含键、值和时间戳
(2)主题 (Topic)​​:消息的逻辑分类,相当于数据库中的表
(3)生产者 (Producer)​​:向主题发布消息的客户端
(4)消费者 (Consumer)​​:从主题订阅并处理消息的客户端
(5)消费者组 (Consumer Group)​​:一组共同消费一个或多个主题的消费者

【2】分区 (Partitions)

Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。

(1)作用与重要性​:

(1)并行处理基础​:每个分区可以独立处理,允许水平扩展
(2)消息顺序保证​:同一分区内的消息保持顺序性(FIFO)
(3)负载均衡​:将数据分布到多个Broker上,提高吞吐量
(4)消费者扩展​:分区数决定了一个消费者组的最大并发消费者数量

(2)关键特性​:

(1)分区数量在创建主题时指定,后期可增加但不能减少
(2)消息通过分区器(Partitioner)决定写入哪个分区
(3)默认分区策略:键哈希(相同键的消息进入同一分区)或轮询

(3)副本 (Replicas)

​(1)作用与重要性​:
1-数据高可用​:防止单点故障导致数据丢失
2-读写分离​:副本可以处理读请求,分担主分区压力
​(2)副本类型​:
1-Leader副本​:处理所有读写请求
2-Follower副本​:从Leader异步复制数据,作为备份

(3)ISR (In-Sync Replicas)​​:
1-与Leader保持同步的副本集合
2-只有ISR中的副本有资格被选为新的Leader

(4)介绍
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。

生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。

(4)偏移量 (Offset)

​作用与重要性​:
(1)消息位置标识​:每个分区中的消息都有唯一的顺序ID
(2)消费进度跟踪​:消费者通过提交offset来记录消费位置
(3)消息重放能力​:可以指定offset重新消费历史消息

offset管理​:
(1)自动提交:定期自动提交已消费消息的offset
(2)手动提交:消费者控制提交时机,确保"至少一次"或"恰好一次"语义

(5)代理 Broker 和集群

(1)Broker​:Kafka服务器实例,负责消息存储和转发。可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
(2)集群​:多个Broker组成的分布式系统
(3)Controller​:集群中负责管理分区和副本的特殊Broker

(6)ZooKeeper / KRaft

(1)ZooKeeper​(传统):管理集群元数据、Broker注册、Leader选举
(2)KRaft​(新):Kafka自带的元数据管理系统,消除对ZooKeeper的依赖

作用
(1)Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
(2)Topic 注册:在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1
(3)负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

【二】Kafka 架构设计

【1】整体架构

【2】写入流程

(1)Producer根据分区策略选择目标分区
(2)消息被追加到分区的Leader副本
(3)Leader将消息复制到所有Follower副本
(4)收到足够副本的确认后,向Producer返回成功

【3】读取流程

(1)Consumer向Broker发送拉取请求
(2)Broker返回指定offset之后的消息
(3)Consumer处理消息并提交新的offset
(4)Consumer定期心跳表明存活状态

【4】存储设计

(1)​分段日志​:每个分区被分为多个segment文件
(2)索引文件​:每个segment有对应的偏移量和时间戳索引
(3)零拷贝技术​:使用sendfile系统调用提高数据传输效率

【三】高性能高可用配置

【1】生产者配置优化

# 高吞吐配置
acks=1                          # 平衡吞吐和可靠性
compression.type=snappy         # 消息压缩
linger.ms=5                     # 批量发送等待时间
batch.size=16384                # 批量大小# 高可靠性配置
acks=all                        # 需要所有ISR确认
max.in.flight.requests.per.connection=1  # 保证顺序
enable.idempotence=true         # 启用幂等性
retries=10                      # 重试次数

【2】消费者配置优化

# 性能配置
fetch.min.bytes=1               # 最小拉取字节数
fetch.max.wait.ms=500           # 拉取等待时间
max.poll.records=500            # 单次拉取最大记录数# 可靠性配置
enable.auto.commit=false         # 手动提交offset
auto.offset.reset=latest         # 无offset时从最新开始
isolation.level=read_committed   # 只读取已提交的消息

【3】Broker配置优化

# 存储配置
num.partitions=3                 # 默认分区数
default.replication.factor=3    # 默认副本数
min.insync.replicas=2           # 最小同步副本数
log.retention.hours=168         # 数据保留时间# 网络配置
num.network.threads=3           # 网络线程数
num.io.threads=8                # IO线程数
socket.send.buffer.bytes=102400 # 发送缓冲区大小
socket.receive.buffer.bytes=102400 # 接收缓冲区大小

【4】主题级别配置

# 创建高性能主题
kafka-topics.sh --create \--topic high-throughput-topic \--partitions 6 \--replication-factor 3 \--config retention.ms=604800000 \--config segment.bytes=1073741824

【四】Kafka集群搭建

(1)单机多Broker集群(开发环境)

# broker0.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs-0
num.partitions=3# broker1.properties  
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
num.partitions=3# broker2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
num.partitions=3

启动命令:

kafka-server-start.sh config/broker0.properties
kafka-server-start.sh config/broker1.properties  
kafka-server-start.sh config/broker2.properties

【2】多节点生产集群

每个节点配置:

# 节点特有配置
broker.id=唯一ID
listeners=PLAINTEXT://主机IP:9092
log.dirs=/data/kafka/logs# 集群通用配置
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3

【3】集群验证和管理

# 查看集群状态
kafka-topics.sh --describe --bootstrap-server localhost:9092# 创建主题
kafka-topics.sh --create \--topic test-topic \--partitions 3 \--replication-factor 3 \--bootstrap-server localhost:9092# 生产消息
kafka-console-producer.sh \--topic test-topic \--bootstrap-server localhost:9092# 消费消息  
kafka-console-consumer.sh \--topic test-topic \--from-beginning \--bootstrap-server localhost:9092

【五】Spring Boot整合Kafka配置详解

【1】application.yml配置

spring:kafka:# 集群配置bootstrap-servers: localhost:9092,localhost:9093,localhost:9094# 生产者配置producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allretries: 3properties:linger.ms: 5batch.size: 16384compression.type: snappy# 消费者配置  consumer:group-id: my-spring-groupauto-offset-resit: earliestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:max.poll.records: 500session.timeout.ms: 30000# 监听器配置listener:ack-mode: manualconcurrency: 3

【2】配置项详解

(1)​生产者关键配置​:

(1)acks:消息确认机制(0:不等待,1:Leader确认,all:所有ISR确认)
(2)retries:发送失败时的重试次数
(3)batch.size和linger.ms:控制批量发送行为
(4)compression.type:消息压缩算法(none, gzip, snappy, lz4)

(2)​消费者关键配置​:

(1)auto-offset-reset:无偏移量时策略(earliest, latest, none)
(2)enable-auto-commit:是否自动提交偏移量
(3)max-poll-records:单次拉取最大消息数
(4)session.timeout.ms:消费者会话超时时间

(3)​监听器关键配置​:

(1)ack-mode:确认模式(manual, manual_immediate, etc)
(2)concurrency:并发消费者数量,通常设置为分区数

【六】注意事项

【1】生产环境注意事项

(1)​分区数规划​:提前评估吞吐量需求,分区数后期只能增加不能减少
(2)副本配置​:生产环境至少3个副本,min.insync.replicas=2
(3)监控告警​:监控Lag、吞吐量、错误率等关键指标
(4)容量规划​:预留足够的磁盘空间和网络带宽

【2】消费者注意事项

(1)避免重复消费​:确保消息处理是幂等的
(2)合理设置poll间隔​:避免频繁poll导致吞吐量下降
(3)处理Rebalance​:在消费者加入或离开时正确处理状态
(4)偏移量管理​:根据业务需求选择自动或手动提交

【3】生产者注意事项

(1)​错误处理​:实现重试逻辑和死信队列机制
(2)​顺序性保证​:需要严格顺序时设置max.in.flight.requests.per.connection=1
(3)​批量优化​:根据延迟要求调整linger.ms和batch.size
(4)​压缩选择​:根据CPU和网络带宽权衡选择压缩算法

【4】Spring集成注意事项

(1)​序列化错误​:配置ErrorHandler处理序列化失败的消息
(2)​并发设置​:消费者并发数不要超过分区数
(3)​事务支持​:需要Exactly-Once语义时启用事务支持
(4)​健康检查​:集成Spring Actuator监控Kafka健康状态

【5】常见问题处理

(1)​消息积压​:增加消费者实例或提高处理能力
(2)​数据丢失​:检查acks配置和副本同步机制
(3)​性能瓶颈​:分析网络、磁盘IO或CPU瓶颈
(4)​连接问题​:确保防火墙和网络配置正确

【七】常见问题解决方案

【1】Kafka 如何保证消息的消费顺序?

我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:

(1)更改用户会员等级。
(2)根据会员等级计算订单价格。

假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。

在这里插入图片描述
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。

所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。

总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
(1) 1 个 Topic 只对应一个 Partition。
(2)(推荐)发送消息的时候指定 key/Partition。

【2】Kafka 如何保证消息不丢失?

(1)生产者丢失消息的情况

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendResult.getProducerRecord().value().toString());
}

但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

如果消息发送失败的话,我们检查失败的原因之后重新发送即可!

另外,这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。

(2)消费者丢失消息的情况

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

(3)Kafka 弄丢了消息

Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

1-设置 acks = all

解决办法就是我们设置 acks = all。

acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。

当我们配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息. 该模式的延迟会很高

2-设置 replication.factor >= 3

为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

3-设置 min.insync.replicas > 1

一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。

4-设置 unclean.leader.election.enable = false

Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false

我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

【3】Kafka 如何保证消息不重复消费?

(1)kafka 出现消息重复消费的原因

服务端侧已经消费的数据没有成功提交 offset(根本原因)。Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

(2)解决方案

(1)消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。

(2)将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交 offset 合适?
1-处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
2-拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。

【4】Kafka 重试机制

(1)消费失败会怎么样?

在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。

(2)默认会重试多少次?

看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,判断是否重试的逻辑:

Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。

(3)如何自定义重试次数以及时间间隔?

默认错误处理器的重试次数以及时间间隔是由 FixedBackOff 控制的,FixedBackOff 是 DefaultErrorHandler 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();// 自定义重试时间间隔以及次数FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));factory.setConsumerFactory(consumerFactory);return factory;
}

(4)如何在重试失败后进行告警?

自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。

@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {public DelErrorHandler(FixedBackOff backOff) {super(null,backOff);}@Overridepublic void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {super.handleRemaining(thrownException, records, consumer, container);log.info("重试多次失败");// 自定义操作}
}

DefaultErrorHandler 只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。

(5)重试失败后的数据如何再次处理?

当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?

死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。

@RetryableTopic 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。

// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(attempts = "5",backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {log.info("kafka customer:{}", message);Integer n = Integer.parseInt(message);if (n % 5 == 0) {throw new RuntimeException();}System.out.println(n);
}

当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。

http://www.dtcms.com/a/352733.html

相关文章:

  • Unity 串口通信
  • 解开 Ansible 任务复用谜题:过滤器用法、Include/Import 本质差异与任务文件价值详解
  • Writer-你的私人内容创作助手
  • TCP并发服务器构建
  • TensorFlow 深度学习 | Layer 基础知识介绍
  • 浅谈Elasticsearch数据写入流程的refresh和flush操作
  • 智能一卡通系统通过集成身份识别、权限管理、数据联动等技术,实现多场景一体化管理。以下是多奥基于最新技术趋势和应用案例的系统解析
  • screen命令
  • AI一周事件(2025年8月20日-8月26日)
  • 74hc4094芯片点亮LED闪烁问题的解决
  • JS(面试)
  • 深度学习——激活函数
  • 碳化硅衬底 TTV 厚度不均匀性测量的特殊采样策略
  • Redis哨兵机制:高可用架构的守护神!⚔️ 主从秒级切换实战指南
  • 力扣LCP 46. 志愿者调配随笔
  • 基于Spring Boot+Vue的生活用品购物平台/在线购物系统/生活用户在线销售系统/基于javaweb的在线商城系统
  • 微生产力革命:AI解决生活小任务分享会
  • AI 解决生活小事 2——用 AI 做一回新闻播客
  • 解决mac brew4.0安装速度慢的问题
  • 卫星轨道动力学基本理论
  • 精品短剧《奔腾的心》正式开机,以匠心描绘新时代西藏故事
  • 深入解析达梦数据库:模式分类、状态管理与实操指南
  • 21款m1 max升级到macOS 13——Ventura
  • ModuleNotFoundError: No module named ‘dbgpt_app‘
  • 【开源工具】基于Flask与Socket.IO的跨平台屏幕监控系统实战(附完整源码)
  • 宠物智能手机PetPhone技术解析:AI交互与健康监测的系统级创新
  • 设计模式与设计原则简介——及其设计模式学习方法
  • 【Java】异常处理:从入门到精通
  • `open()` 系统调用详解
  • Day7--HOT100--54. 螺旋矩阵,48. 旋转图像,240. 搜索二维矩阵 II