当前位置: 首页 > news >正文

Acknowledgment.nack方法重试消费kafka消息异常

文章目录

  • 问题
    • 示例
    • 异常
  • 原因
    • nack方法
    • Acknowledgment接口
    • 实现类:ConsumerAcknowledgment
    • 实现类:ConsumerBatchAcknowledgment
  • 解决方案
    • 1 批量消费指定index
      • 示例
    • 2 单条消费
      • 示例

问题

使用BatchAcknowledgingMessageListener 批量消费Kafka消息,成功则手动提交offset,失败则重试。消费成功的情况下没有问题,但消费失败情况下,调用nack方法重试时则报异常。

示例


  public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public BatchCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(List data, Acknowledgment acknowledgment) {
        try {
            messageHandler.handle(data); // 处理多条消息
            acknowledgment.acknowledge(); // 成功处理后提交偏移量
        } catch (Exception e) {
            // 消息处理失败,30min后重试
            // nack作用:将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。
            acknowledgment.nack(30 * 60 * 1000); // 这里报了异常
        }
    }
}

上边的代码乍一看没啥问题,编译,启动也都没报错。但是在执行nack的时候进到了Acknowledgment接口默认nack(sleep) 方法里边,并抛出异常。

nack(sleep) is not supported by this Acknowledgment

异常

在这里插入图片描述
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8bbf4f37af134374bf1f4e34c2687dbe.png

原因

Acknowledgment接口有两个nack方法:nack(long sleep)nack(int index, long sleep), 以及两个实现类ConsumerAcknowledgmentConsumerBatchAcknowledgment。ConsumerAcknowledgment仅实现了nack(long sleep),而ConsumerBatchAcknowledgment仅实现了nack(int index, long sleep)。

nack方法

注意:调用nack方法后,将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。

Acknowledgment接口

在这里插入图片描述

实现类:ConsumerAcknowledgment

在这里插入图片描述

实现类:ConsumerBatchAcknowledgment

在这里插入图片描述

这样的设计也很好理解。。
当BatchAcknowledgingMessageListener批量消费消息时, 使用的是ConsumerBatchAcknowledgment,重试时需要告诉ConsumerBatchAcknowledgment要从这批量消息中的哪条开始重试消费,即要指定index值。我的例子中调用的是nack(long sleep),没有指定index,所以进到了默认方法里,抛了异常。

而使用AcknowledgingMessageListener消费单条消息时,使用的是ConsumerAcknowledgment,重试时它知道重试当前的消息,因为就这一条,所以只需要指定重试时间就可以了。

也就是说批量消费时,重试要调用nack(int index, long sleep),单条消费时,重试要调用nack(long sleep),二者不搭配,就会抛不支持该方法的异常。

解决方案

1 批量消费指定index

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public BatchCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(List data, Acknowledgment acknowledgment) {
        int index = 0;
        try {
            for (; index < data.size(); index++) {
                messageHandler.handle(data.get(index)); // 处理单条消息
            }
            // 成功处理后提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 消息处理失败,30min后重试index及index之后的消息
            acknowledgment.nack(index, 30 * 60 * 1000);
        }
    }

}

2 单条消费

改成单条消费消息,调用nack(long sleep)

示例

public class SingleCustomMessageListener implements AcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public SingleCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        try {
            messageHandler.handle(data); // 处理单条消息
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(30 * 60 * 1000);
        }
    }
http://www.dtcms.com/a/63158.html

相关文章:

  • 【算法】经典排序算法介绍+代码示例
  • NAFNet:Simple Baselines for Image Restoration
  • 懒加载(Lazy Loading):原理、实现与优化策略
  • Windows系统编程项目(四)窗口管理器
  • 典型相关分析(CCA)探索多维数据间的深层关系:基于Matlab
  • 第一章:像素学徒的觉醒
  • 网络信息安全专业(710207)网络安全攻防实训室建设方案
  • HTML <head> 元素详解:网页头部的关键组成部分
  • rpmlib(SetVersions) is needed by can-uilts-v2019.00.0-alt1.aarch64
  • uniapp简单table表
  • vue项目的琐碎点
  • Vue | 开学第一课!零基础教程
  • PostgreSQL17(最新版)安装部署
  • 【三维重建】Proc-GS:使用3DGS的程序性城市建筑生成
  • c++介绍信号六
  • react基础语法视图层类组件
  • 计算机毕业设计:驾校综合信息系统
  • 基于SpringBoot实现旅游酒店平台功能八
  • MCU与SFU:实时音视频通信架构的对比
  • 使用STM32CubeMX配置定时器中断实现LED每秒闪烁一次(STM32G070CBT6)
  • 【Yonghong 企业日常问题07 】 东方通TongWeb替代Tomcat的实战指南!
  • ubuntu中用docker下载opengauss
  • 利用 ECB 加密 json并压测接口,输出测试报告
  • 计算机网络基础:PKI(公钥基础设施)
  • vue el-select 省市区三级联动 vue + element-ui使用第三方插件实现省市区三级联动
  • BambuStudio学习笔记:MinAreaBoundigBox
  • 远程手机遥控开关原理及应用
  • 如何安全处置旧设备?
  • Java 无 GUI 浏览器:HtmlUnit 入门及实战 [特殊字符]
  • Linux笔记