基于RabbitMQ实现订单超时自动处理
基于RabbitMQ实现订单超时自动处理
引言
在现代电商系统中,订单超时自动取消是一个常见的业务需求。传统的定时任务扫描数据库的方式存在性能瓶颈和实时性差的问题。本文将介绍如何使用RabbitMQ的消息队列和死信队列特性,构建一个高效可靠的订单超时自动处理系统。
一、技术方案概述
我们采用RabbitMQ的以下特性实现需求:
- TTL(Time-To-Live):设置消息的存活时间
- 死信队列(DLX):处理过期消息
- 手动确认机制:确保消息可靠消费
系统架构分为三个核心组件:
- 订单生产者(投递延迟消息)
- RabbitMQ配置(队列和交换器)
- 订单消费者(处理超时订单)
二、依赖配置
1、pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml配置
sping:rabbitmq:host: 192.168.64.100port: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual
三、RabbitMQ配置详解
package com.ruoyi.shop.component;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** @Description: rabbitmq配置类* @author: zh* @Create : 2025/4/23* @Project_name : RuoYi-Cloud* @Version :**/
@Component
public class RabbitConfig {private static final long DELAY_TIME = 1000 * 60 * 5; // 单位:毫秒// 延迟队列public static final String ORDER_QUEUE_NAME = "ORDER_QUEUE";public static final String ORDER_EXCHANGE_NAME = "ORDER_EXCHANGE";public static final String ORDER_ROUTING_KEY = "ORDER_ROUTING_KEY";// 死信队列public static final String ORDER_DLX_QUEUE_NAME = "ORDER_DLX_QUEUE";public static final String ORDER_DLX_EXCHANGE_EXCHANGE = "ORDER_DLX_EXCHANGE_EXCHANGE";public static final String ORDER_DEAD_KEY = "ORDER_DEAD_KEY";// 1. 声明延迟队列(绑定死信交换机)@Bean(value="orderOverQueue")public Queue orderOverqueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE_EXCHANGE); // 死信交换机args.put("x-dead-letter-routing-key", ORDER_DEAD_KEY); // 死信路由键args.put("x-message-ttl", DELAY_TIME); // TTL(毫秒)return new Queue(ORDER_QUEUE_NAME, true, false, false, args);}// 2. 声明死信队列@Bean(value = "orderDlxQueue")public Queue orderDlxQueue() {return new Queue(ORDER_DLX_QUEUE_NAME, true,false,false);}// 3. 声明延迟队列的交换机@Bean(value = "orderOverExchange")public DirectExchange orderOverExchange() {return new DirectExchange(ORDER_EXCHANGE_NAME, true, false);}// 4. 声明死信队列的交换机@Bean(value = "orderDeadExchange")public DirectExchange orderDeadExchange() {return new DirectExchange(ORDER_DLX_EXCHANGE_EXCHANGE, true, false);}// 5. 绑定延迟队列到交换机@Beanpublic Binding bindingOrderOverDirect(@Qualifier("orderOverQueue")Queue orderOverQueue , @Qualifier("orderOverExchange")DirectExchange orderOverExchange) {return BindingBuilder.bind(orderOverQueue).to(orderOverExchange).with(ORDER_ROUTING_KEY);}// 6. 绑定死信队列到死信交换机@Beanpublic Binding bindingOrderDeadDirect(@Qualifier("orderDlxQueue")Queue orderDlxQueue , @Qualifier("orderDeadExchange")DirectExchange orderDeadExchange) {return BindingBuilder.bind(orderDlxQueue).to(orderDeadExchange).with(ORDER_DEAD_KEY);}
}
关键点说明:
- 通过
x-message-ttl
设置消息5分钟后过期 - 配置死信交换器和路由键,过期消息将自动转发
- 所有队列和交换器都设置为持久化(durable=true)
四、生产者实现
package com.ruoyi.shop.component;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description: 订单生产者* @author: zh* @Create : 2025/4/23* @Project_name : RuoYi-Cloud* @Version :**/
@Component
@Slf4j
public class OrderProduce {@AutowiredRabbitTemplate rabbitTemplate;/*** 发送延时订单MQ** @param id*/public void sendOver(Long id) {String mqMessage = JSON.toJSONString(id);log.info("创建订单消息:{}",mqMessage);//TODO :发送创建订单消息try {rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE_NAME, RabbitConfig.ORDER_ROUTING_KEY, mqMessage);}catch (Exception e){throw new RuntimeException("发送邀请消息失败");}}
}
使用场景:
- 用户下单成功后立即调用
sendOver(orderId)
- 消息将在RabbitMQ中保留5分钟,如果未被消费则转入死信队列
五、消费者实现
package com.ruoyi.shop.component;import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.shop.controller.order.OrderController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @Description: 订单消费者* @author: zh* @Create : 2025/4/23* @Project_name : RuoYi-Cloud* @Version :**/
@Component
@Slf4j
public class OrderConsumer {@Autowiredprivate OrderController orderController;/*** 订单超时监听** @param massage* @param channel* @param tag*/@RabbitListener(bindings ={@QueueBinding(value = @Queue(value = RabbitConfig.ORDER_DLX_QUEUE_NAME, durable = "true"),exchange = @Exchange(value = RabbitConfig.ORDER_DLX_EXCHANGE_EXCHANGE), key = RabbitConfig.ORDER_DEAD_KEY)})@RabbitHandlerpublic void processOrder(Message massage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {//TODO:接收死信队列消息Long id = JSON.parseObject(new String(massage.getBody()), Long.class);log.info("获取到消息:{}",id);if (null == id) {return;}try {
// 手动确认机制Boolean data = (Boolean)orderController.UpdateOrderStatus(id).getData();if(!data){log.info("消费失败");channel.basicNack(tag, false, true);}else{channel.basicAck(tag, false);}} catch (Exception e) {e.printStackTrace();}}}
消费逻辑:
- 从死信队列获取超时订单ID
- 调用业务服务更新订单状态
- 根据处理结果手动确认或拒绝消息
六、实现效果
1、发送消息
2、接收消息
七、注意事项
1、性能调优
spring:rabbitmq:listener:simple:prefetch: 10 # 根据业务调整concurrency: 5
2、添加异常处理
- 添加死信队列监控
- 实现消息补偿机制
八、总结
本文介绍的基于RabbitMQ的订单超时处理方案,相比传统定时任务方式具有明显优势。通过合理利用消息队列的特性,我们构建了一个可靠、高效、实时的订单处理系统。该方案也可扩展应用于其他需要延迟处理的业务场景,如优惠券过期、自动确认收货等。