当前位置: 首页 > news >正文

RabbitMQ:消费者可靠性(消费者确认、消费失败处理、业务幂等性)

目录

  • 一、消费者确认机制
  • 二、消息失败处理
  • 三、业务幂等性


一、消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息。
  • nack:处理消息失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ack处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立即ack,消息会立刻从RabbitMQ中删除,非常不安全,不建议使用。
  • manual:手动模式。需要自己在业务代码中调用API,发送ack或reject,存在业务入侵,但更灵活。
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时自动返回ack。当业务出现异常时,根据异常判断返回不同的结果:
    • 如果是业务异常,会自动返回nack。
    • 如果是消息处理或校验异常,自动返回reject。
spring:rabbitmq:listener:simple:acknowledge-mode: auto

二、消息失败处理

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者。然后再次异常,再次requeue,无限循环,导致MQ的消息处理飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列。

spring:rabbitmq:listener:simple:retry:enabled: true  # 开启消费者失败重试initial-interval: 1000  # 初始失败等待时长multiplier: 1  # 下次失败等待时长倍数=multiplier * last-intervalmax-attempts: 3  # 最大重试次数stateless: true  # true无状态 false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

在这里插入图片描述
这里推荐的是第三种方案RepublishMessageRecoverer

  1. 首先,定义接收失败消息的交换机、队列及其绑定关系。
  2. 然后,定义RepublishMessageRecoverer
package com.ming.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息处理失败策略*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")  // 当配置文件中配置了该属性,才能生效
public class ErrorConfiguration {private static final String ERROR_EXCHANGE_NAME = "mt.error.direct";private static final String ERROR_QUEUE_NAME = "error.queue";private static final String ERROR_ROUTING_KEY = "error";@Beanpublic DirectExchange errorExchange() {return ExchangeBuilder.directExchange(ERROR_EXCHANGE_NAME).durable(true).build();}@Beanpublic Queue errorQueue() {return QueueBuilder.durable(ERROR_QUEUE_NAME).build();}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(ERROR_ROUTING_KEY);}/*** 失败消息处理策略* @return MessageRecoverer*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE_NAME, ERROR_ROUTING_KEY);}
}

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常发挥nack。
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,并交由人工处理。

三、业务幂等性

幂等:是一个数学概念。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。比如:根据ID查询数据、根据ID删除数据。

唯一消息ID,是给每个消息都设置一个唯一ID,利用ID区分是否是重复消息:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
package com.ming.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ 基础配置*/
@Configuration
public class RabbitmqConfig {/*** 序列化RabbitMQ的消息* @return MessageConverter*/@Beanpublic MessageConverter messageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();messageConverter.setCreateMessageIds(true);return messageConverter;}
}
http://www.dtcms.com/a/342491.html

相关文章:

  • RabbitMQ面试精讲 Day 26:RabbitMQ监控体系建设
  • 1. 准备工作---数据分析编程 - 从入门到精通
  • uniapp 自定义组件封装、easycom匹配规则
  • Go语言变量声明与初始化详解
  • TDengine IDMP 运维指南(管理策略)
  • CRII-Net
  • 【领码课堂】让Java数据检索更智能——Bean Searcher全景解读
  • 从”0“开始学JAVA——第九节下 泛型和集合框架
  • #运维 | 前端 # Linux http.server 实践:隐藏长文件名,简短路径 (http://IP:port/别名 ) 访问
  • AI研究引擎的简单技术实现步骤
  • Web 安全之 HTTP 响应截断攻击详解
  • JavaScript 系列之:图片压缩
  • 微信小程序设计的请求封装方案(request.js)
  • NPM模块化总结
  • DINOv3 重磅发布
  • 计算机网络技术学习-day6《三层交换机配置》
  • python发布文章和同步文章到社区的工具小脚本
  • 第三阶段数据库-6:sql中函数,多表查询,运算符,索引,约束
  • 智慧城管云平台源码,微服务vue+element+springboot+uniapp技术架构,数字化综合执法办案系统
  • 数据结构之排序大全(4)
  • 苷类成分通过 PI3K/AKT 信号通路促进内皮祖细胞来源外泌体修复受损血管内皮
  • 基于YOLO11的茶叶病害智能检测系统
  • 组态软件——工业监控“大脑”
  • leetcode-python-242有效的字母异位词
  • 代码随线录刷题Day39
  • 【uni-app】自定义导航栏以及状态栏,胶囊按钮位置信息的获取
  • Java的运行时数据区
  • Notepad++换行符替换
  • 机器学习——AdaBoost算法
  • 基于YOLO11的水稻叶片病害检测项目