当前位置: 首页 > news >正文

Acknowledgment.nack方法重试消费kafka消息异常

文章目录

  • 问题
    • 示例
    • 异常
  • 原因
    • nack方法
    • Acknowledgment接口
    • 实现类:ConsumerAcknowledgment
    • 实现类:ConsumerBatchAcknowledgment
  • 解决方案
    • 1 批量消费指定index
      • 示例
    • 2 单条消费
      • 示例

问题

使用BatchAcknowledgingMessageListener 批量消费Kafka消息,成功则手动提交offset,失败则重试。消费成功的情况下没有问题,但消费失败情况下,调用nack方法重试时则报异常。

示例


  public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public BatchCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(List data, Acknowledgment acknowledgment) {
        try {
            messageHandler.handle(data); // 处理多条消息
            acknowledgment.acknowledge(); // 成功处理后提交偏移量
        } catch (Exception e) {
            // 消息处理失败,30min后重试
            // nack作用:将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。
            acknowledgment.nack(30 * 60 * 1000); // 这里报了异常
        }
    }
}

上边的代码乍一看没啥问题,编译,启动也都没报错。但是在执行nack的时候进到了Acknowledgment接口默认nack(sleep) 方法里边,并抛出异常。

nack(sleep) is not supported by this Acknowledgment

异常

在这里插入图片描述
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8bbf4f37af134374bf1f4e34c2687dbe.png

原因

Acknowledgment接口有两个nack方法:nack(long sleep)nack(int index, long sleep), 以及两个实现类ConsumerAcknowledgmentConsumerBatchAcknowledgment。ConsumerAcknowledgment仅实现了nack(long sleep),而ConsumerBatchAcknowledgment仅实现了nack(int index, long sleep)。

nack方法

注意:调用nack方法后,将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。

Acknowledgment接口

在这里插入图片描述

实现类:ConsumerAcknowledgment

在这里插入图片描述

实现类:ConsumerBatchAcknowledgment

在这里插入图片描述

这样的设计也很好理解。。
当BatchAcknowledgingMessageListener批量消费消息时, 使用的是ConsumerBatchAcknowledgment,重试时需要告诉ConsumerBatchAcknowledgment要从这批量消息中的哪条开始重试消费,即要指定index值。我的例子中调用的是nack(long sleep),没有指定index,所以进到了默认方法里,抛了异常。

而使用AcknowledgingMessageListener消费单条消息时,使用的是ConsumerAcknowledgment,重试时它知道重试当前的消息,因为就这一条,所以只需要指定重试时间就可以了。

也就是说批量消费时,重试要调用nack(int index, long sleep),单条消费时,重试要调用nack(long sleep),二者不搭配,就会抛不支持该方法的异常。

解决方案

1 批量消费指定index

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public BatchCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(List data, Acknowledgment acknowledgment) {
        int index = 0;
        try {
            for (; index < data.size(); index++) {
                messageHandler.handle(data.get(index)); // 处理单条消息
            }
            // 成功处理后提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 消息处理失败,30min后重试index及index之后的消息
            acknowledgment.nack(index, 30 * 60 * 1000);
        }
    }

}

2 单条消费

改成单条消费消息,调用nack(long sleep)

示例

public class SingleCustomMessageListener implements AcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public SingleCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        try {
            messageHandler.handle(data); // 处理单条消息
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(30 * 60 * 1000);
        }
    }

相关文章:

  • 【算法】经典排序算法介绍+代码示例
  • NAFNet:Simple Baselines for Image Restoration
  • 懒加载(Lazy Loading):原理、实现与优化策略
  • Windows系统编程项目(四)窗口管理器
  • 典型相关分析(CCA)探索多维数据间的深层关系:基于Matlab
  • 第一章:像素学徒的觉醒
  • 网络信息安全专业(710207)网络安全攻防实训室建设方案
  • HTML <head> 元素详解:网页头部的关键组成部分
  • rpmlib(SetVersions) is needed by can-uilts-v2019.00.0-alt1.aarch64
  • uniapp简单table表
  • vue项目的琐碎点
  • Vue | 开学第一课!零基础教程
  • PostgreSQL17(最新版)安装部署
  • 【三维重建】Proc-GS:使用3DGS的程序性城市建筑生成
  • c++介绍信号六
  • react基础语法视图层类组件
  • 计算机毕业设计:驾校综合信息系统
  • 基于SpringBoot实现旅游酒店平台功能八
  • MCU与SFU:实时音视频通信架构的对比
  • 使用STM32CubeMX配置定时器中断实现LED每秒闪烁一次(STM32G070CBT6)
  • 北京建设协会网站/市场营销毕业后做什么工作
  • 建设银行开县支行 网站/网站seo平台
  • 把网站内的文本保存到txt怎么做/超级外链工具有用吗
  • 网站布局优化策略/百度营销登录
  • 电子商务营销理论/南宁seo咨询
  • 定安住房和城乡建设局网站/外贸网站大全