当前位置: 首页 > wzjs >正文

厦门网站建设 软件园济南网络seo公司

厦门网站建设 软件园,济南网络seo公司,西安网页公司,高端定制网站开发文章目录 问题示例异常 原因nack方法Acknowledgment接口实现类:ConsumerAcknowledgment实现类:ConsumerBatchAcknowledgment 解决方案1 批量消费指定index示例 2 单条消费示例 问题 使用BatchAcknowledgingMessageListener 批量消费Kafka消息&#xff0…

文章目录

  • 问题
    • 示例
    • 异常
  • 原因
    • nack方法
    • Acknowledgment接口
    • 实现类:ConsumerAcknowledgment
    • 实现类:ConsumerBatchAcknowledgment
  • 解决方案
    • 1 批量消费指定index
      • 示例
    • 2 单条消费
      • 示例

问题

使用BatchAcknowledgingMessageListener 批量消费Kafka消息,成功则手动提交offset,失败则重试。消费成功的情况下没有问题,但消费失败情况下,调用nack方法重试时则报异常。

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 处理多条消息acknowledgment.acknowledge(); // 成功处理后提交偏移量} catch (Exception e) {// 消息处理失败,30min后重试// nack作用:将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。acknowledgment.nack(30 * 60 * 1000); // 这里报了异常}}
}

上边的代码乍一看没啥问题,编译,启动也都没报错。但是在执行nack的时候进到了Acknowledgment接口默认nack(sleep) 方法里边,并抛出异常。

nack(sleep) is not supported by this Acknowledgment

异常

在这里插入图片描述
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8bbf4f37af134374bf1f4e34c2687dbe.png

原因

Acknowledgment接口有两个nack方法:nack(long sleep)nack(int index, long sleep), 以及两个实现类ConsumerAcknowledgmentConsumerBatchAcknowledgment。ConsumerAcknowledgment仅实现了nack(long sleep),而ConsumerBatchAcknowledgment仅实现了nack(int index, long sleep)。

nack方法

注意:调用nack方法后,将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。

Acknowledgment接口

在这里插入图片描述

实现类:ConsumerAcknowledgment

在这里插入图片描述

实现类:ConsumerBatchAcknowledgment

在这里插入图片描述

这样的设计也很好理解。。
当BatchAcknowledgingMessageListener批量消费消息时, 使用的是ConsumerBatchAcknowledgment,重试时需要告诉ConsumerBatchAcknowledgment要从这批量消息中的哪条开始重试消费,即要指定index值。我的例子中调用的是nack(long sleep),没有指定index,所以进到了默认方法里,抛了异常。

而使用AcknowledgingMessageListener消费单条消息时,使用的是ConsumerAcknowledgment,重试时它知道重试当前的消息,因为就这一条,所以只需要指定重试时间就可以了。

也就是说批量消费时,重试要调用nack(int index, long sleep),单条消费时,重试要调用nack(long sleep),二者不搭配,就会抛不支持该方法的异常。

解决方案

1 批量消费指定index

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {int index = 0;try {for (; index < data.size(); index++) {messageHandler.handle(data.get(index)); // 处理单条消息}// 成功处理后提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 消息处理失败,30min后重试index及index之后的消息acknowledgment.nack(index, 30 * 60 * 1000);}}}

2 单条消费

改成单条消费消息,调用nack(long sleep)

示例

public class SingleCustomMessageListener implements AcknowledgingMessageListener {private MessageHandler messageHandler;public SingleCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 处理单条消息acknowledgment.acknowledge();} catch (Exception e) {acknowledgment.nack(30 * 60 * 1000);}}
http://www.dtcms.com/wzjs/383404.html

相关文章:

  • 大连市人民政府门户网站seo排名系统
  • 百度一直不收录网站建立自己的网站
  • 用自己主机做网站视频站长统计入口
  • 网站建设都需学哪些百度学术论文官网入口
  • 网站建设条件学企业管理培训班
  • 做网站要买什么空间广州网站建设方案维护
  • 打开网站8秒原则保定seo网络推广
  • wordpress文章更新seo外包公司兴田德润
  • 东莞市做网站的公司seo课程哪个好
  • 辽宁省建设工程信息网官网电话百度怎么做关键词优化
  • 网站集成微信登陆c++线上培训机构哪个好
  • php网站验证码错误重庆seo排名软件
  • 个人网站模板html下载友情链接交换的作用在于
  • 做网站拍摄照片用什么佳能相机好网络营销创意案例
  • 绍兴做团购的网站qq群推广平台
  • 建wiki网站产品互联网营销推广
  • 小说网站有源码了该怎么做今天的新闻最新消息
  • 提交图片的网站要怎么做网页搭建
  • 江津区做网站友情链接推广平台
  • 怎么做app下载网站什么软件可以发布推广信息
  • 网站推广方法有几种百度大数据官网入口
  • wordpress全屏主题指定关键词seo报价
  • 付费下载网站源码360信息流广告平台
  • 网络营销的网站建设报告怎么做属于自己的网站
  • 做神马网站优化快邀请推广app
  • 什么是seo搜索西安网站排名优化培训
  • 唐山住房城乡建设局门户网站什么是网络营销
  • 系统网站怎么做的开发小程序
  • 网站开发工程师社交可以放友情链接的网站
  • 广州网站建设 seo在线网站流量查询