当前位置: 首页 > news >正文

记录一次Kafka重复消费的问题

不讲道理,先抛问题

日志发现Kafka同一个消费者在一段时间内对同一条消息多次消费。
请添加图片描述
在这里插入图片描述
在这里插入图片描述

原理不讲,先上配置

# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
spring.kafka.consumer.auto-offset-reset=latest
# Rebalance 超时
spring.kafka.consumer.properties.max.poll.interval.ms=300000  # 5 分钟 

三言两语,背景简介

Kafka一个生产者,一个消费者,消费同一个Topic,但是其中的某些消息处理耗时超过5分钟。

捕获问题,深度剖析

Kafka自动提交offset后因默认max.poll.interval.ms设置5分钟没有调用poll()从而发生Reblance重复消费的问题。

解决方案,横向对比

  1. offset自动提交改为手动提交
    spring.kafka.consumer.enable-auto-commit=false
@KafkaListener(topics = "generateYyVoucher-topic", groupId = "defaultConsumerGroup")
public void generateYyVoucher(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        ...
        // 提交 Offset
        ack.acknowledge();
    } catch (Exception e) {
        log.error("Message processing failed: ", e);
        // 如果处理失败,Offset 不会被提交
    }
}

2.增加max.poll.interval.ms延迟
spring.kafka.consumer.properties.max.poll.interval.ms=900000 # 设置为15分钟

总结:但是上述两种方案均不能从根本上解决消费者重复消费的问题!根本问题是在于Reblance消费重组原因导致!
第1种只能解决offset偏移量不会重发消费当前消息,但可能会消费上一个消息;
第2种增加延迟,当业务逻辑超过设置时间时仍然会重复消费。

重复消费,最佳解决方案

幂等消费标识(唯一标识)

        // 幂等消费标识(唯一标识),以解决Kafka自动提交offset后因默认max.poll.interval.ms设置5分钟没有调用poll()从而发生Reblance重复消费的问题
        String redisKey = "voucher:processing:" + ledgerId;
        // 检查 Redis 中是否已存在该幂等标识
        if (stringRedisTemplate.hasKey(redisKey)) {
            return;
        }
        // 设置 Redis 中的标识为正在处理中(可以设置一个有效期,比如 30 分钟)
        stringRedisTemplate.opsForValue().set(redisKey, "processing", 30, TimeUnit.MINUTES);

氪肝提示,不是温馨

面试八股文准备再多的中间件问题,也不如真正项目上实际遇到的问题。中间件也不是每个都要用,只关注项目上使用的就行。只有真正经历解决过一两个实际问题,才能了解中间件。

相关文章:

  • Mysql并发事务带来哪些问题?
  • Windows 10 系统下配置Flutter开发环境,保姆级教程冢铖2023-02-17 09:56广东
  • 26考研——图_图的基本概念(6)
  • VSCode中操作gitee
  • R语言ggplot2散点形状和填充
  • C++语法学习的主要内容
  • Spring 循环依赖
  • python并发爬虫
  • 基于Spring Boot的个性化商铺系统的设计与实现(LW+源码+讲解)
  • 数据结构day04
  • 爱普生VG3225EFN压控晶振5G基站低噪声的解决方案
  • windows下面nginx配置及测试
  • 网络安全之vlan实验
  • 接口/UI自动化面试题
  • Springboot整合elasticsearch详解 封装模版 仓库方法 如何在linux里安装elasticsearch
  • 八股——Mysql篇
  • WebAssembly实践,性能也有局限性
  • 小白工具PDF转换 PDF转图片 超便捷软件 文件格式转换 简单好用效率高
  • 新手村:逻辑回归-理解04:熵是什么?
  • 第五天 开始Unity Shader的学习之旅之Unity中的基础光照之漫反射光照模型
  • 怎么申请公司网址/宁波seo排名外包
  • 衡水注册公司流程和费用/独立站seo
  • 青海网站制作的公司/东莞做网站排名优化推广
  • 电子商务网站建设与运营/推广普通话的意义50字
  • 素马杭州网站设计介绍/google chrome 网络浏览器
  • 珠海华中建设工程有限公司网站/百度推广代理公司广州