当前位置: 首页 > news >正文

rabbitMQ延时队列实现,怎么保证消息的幂等

一、RabbitMQ 延时队列实现方式

  1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)
    这是最常用的实现方式,核心思路是:
    (1)消息设置过期时间(TTL)
    (2)消息过期后进入绑定的死信队列
    (3)消费者监听死信队列,实现延时消费
// 1. 配置交换机和队列
@Configuration
public class DelayQueueConfig {// 普通交换机(用于接收原始消息)public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列(消息过期后会进入死信队列)public static final String NORMAL_QUEUE = "normal_queue";// 死信队列(实际消费的队列)public static final String DEAD_QUEUE = "dead_queue";// 声明普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}// 声明死信交换机@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE);}// 声明普通队列(设置死信相关参数)@Beanpublic Queue normalQueue() {Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信路由键args.put("x-dead-letter-routing-key", "dead_routing_key");// 队列消息统一过期时间(可选,也可在发送消息时单独设置)// args.put("x-message-ttl", 10000); return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();}// 声明死信队列@Beanpublic Queue deadQueue() {return QueueBuilder.durable(DEAD_QUEUE).build();}// 绑定普通队列和普通交换机@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_key");}// 绑定死信队列和死信交换机@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_routing_key");}
}// 2. 发送延时消息(设置消息级别的TTL)
@Service
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(String message, long delayMillis) {// 设置消息过期时间rabbitTemplate.convertAndSend(DelayQueueConfig.NORMAL_EXCHANGE,"normal_routing_key",message,correlationData -> {correlationData.getMessageProperties().setExpiration(String.valueOf(delayMillis));return correlationData;});}
}// 3. 消费死信队列消息(延时后的消息)
@Service
public class DelayMessageReceiver {@RabbitListener(queues = DelayQueueConfig.DEAD_QUEUE)public void receiveDelayMessage(String message) {System.out.println("收到延时消息:" + message + ",时间:" + new Date());}
}
  1. 基于 RabbitMQ 插件(rabbitmq_delayed_message_exchange)
    更推荐的方式,需先安装插件:
    (1)下载对应版本的 rabbitmq_delayed_message_exchange 插件
    (2)启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
@Configuration
public class DelayedExchangeConfig {// 延时交换机(类型必须是 x-delayed-message)@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定转发类型return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed_queue").build();}@Beanpublic Binding delayedBinding() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key").noargs();}
}// 发送延时消息
@Service
public class DelayedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayedMessage(String message, long delayMillis) {rabbitTemplate.convertAndSend("delayed_exchange","delayed_routing_key",message,correlationData -> {// 设置延时时间(毫秒)correlationData.getMessageProperties().setHeader("x-delay", delayMillis);return correlationData;});}
}

二、保证消息幂等性的方案
消息幂等性指:同一条消息被多次消费时,结果是一致的,不会重复处理。常见实现方式:

  1. 基于唯一 ID + Redis / 数据库去重
    (1)发送消息时生成唯一 ID(如 UUID)
    (2)消费前检查该 ID 是否已处理
    (3)处理完成后标记该 ID 为已处理
@Service
public class IdempotentMessageReceiver {@Autowiredprivate StringRedisTemplate redisTemplate;@RabbitListener(queues = "delayed_queue")public void receiveMessage(Message message) {// 1. 获取消息唯一ID(假设放在消息头)String messageId = message.getMessageProperties().getMessageId();if (StringUtils.isEmpty(messageId)) {// 非法消息,直接拒绝throw new AmqpRejectAndDontRequeueException("消息ID为空");}// 2. 检查是否已处理(Redis分布式锁保证原子性)String key = "message:processed:" + messageId;Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);if (Boolean.FALSE.equals(isFirst)) {// 已处理过,直接返回System.out.println("消息已处理,ID:" + messageId);return;}// 3. 处理消息业务逻辑String content = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("处理消息:" + content);}
}
  1. 基于业务唯一标识去重
    如果消息没有全局 ID,可使用业务字段组合作为唯一标识(如订单号):
// 例如处理订单支付消息,用订单号作为唯一标识
String orderNo = extractOrderNo(content); // 从消息中提取订单号
String key = "order:processed:" + orderNo;
// 后续逻辑同上(检查Redis -> 处理业务)
  1. 数据库唯一约束
    通过数据库唯一索引实现幂等:
@Transactional
public void processOrder(String orderNo) {// 插入记录前检查,或直接插入(利用唯一索引报错)try {orderMapper.insert(new Order(orderNo)); // 假设orderNo有唯一索引// 处理订单逻辑} catch (DuplicateKeyException e) {// 已处理过,忽略log.info("订单已处理:{}", orderNo);}
}

总结
1.延时队列实现:
(1)简单场景用 TTL + 死信队列
(2)生产环境推荐用 rabbitmq_delayed_message_exchange 插件(更可靠)
2.幂等性保证核心:
(1)为消息生成唯一标识(全局 ID 或业务唯一键)
(2)消费前检查标识是否已处理(Redis / 数据库)
(3)确保检查和标记操作的原子性(分布式锁 / 事务)
这两种机制结合,可实现可靠的延时任务处理。

http://www.dtcms.com/a/357888.html

相关文章:

  • HTML 核心元素实战:超链接、iframe 框架与 form 表单全面解析
  • 【WDG协议栈】AUTOSAR架构下WDG模块软硬件功能详解
  • 基于单片机指纹考勤系统/智能考勤
  • ⸢ 叁 ⸥ ⤳ 默认安全:概述与建设思路
  • 【Day 33】Linux-MySQL 备份与恢复详解
  • 从分子工具到技术革新:链霉亲和素 - 生物素系统与 M13 噬菌体展示的交叉应用解析
  • 针对 “TCP 数据传输机制” 的攻击
  • vue2下拉菜单
  • 服务器托管多少钱一年?服务器托管收费标准
  • C++day2作业
  • TuringComplete游戏攻略(2.2存储器)
  • 【C++】类和对象(终章)
  • 数值分析——误差的来源与分类、误差的基本概念(绝对误差、相对误差、有效数字)
  • 世界模型的典型框架与分类
  • react性能优化有哪些
  • 卷积神经网络项目:基于CNN实现心律失常(ECG)的小颗粒度分类系统
  • 拆解《AUTOSAR Adaptive Platform Core》(Core.pdf)—— 汽车电子的 “基础技术说明书”
  • 开发指南136-设置零值不显示
  • Java中使用JSONUtil处理JSON数据:从前端到后端的完美转换
  • docker命令(二)
  • vue+Django 双推荐算法旅游大数据可视化系统Echarts mysql数据库 带爬虫
  • 指纹云手机网络环境隔离技术:筑牢海外社媒多账号运营安全屏障
  • Git与DevOps实战:从版本控制到自动化部署
  • jsqlparser(六):TablesNamesFinder 深度解析与 SQL 格式化实现
  • 基于单片机商用电子计价秤电子秤系统Proteus仿真(含全部资料)
  • 从零开始学习单片机18
  • 谷歌NotebookLM升级视频概述功能:为AI音频播客注入视觉元素,重塑教学与学习体验
  • 基于单片机温控风扇ds18b20系统Proteus仿真(含全部资料)
  • GD32入门到实战21--输入捕获
  • FOC开环控制代码解读