当前位置: 首页 > news >正文

如何利用RabbitMQ延迟消息优化电商支付

业务痛点分析

在电商核心交易链路中,库存扣减时机始终是业务设计的难点。比如即时扣库存:影院选座/高铁购票场景中,用户体验优先原则要求下单即锁定资源。

延迟任务是什么?

核心需求:精准实现“下单后第30分钟检查支付状态”的定时操作
技术本质:延迟任务(Delayed Task)—— 在指定时间后触发执行的任务

RabbitMQ延迟消息方案对比

方案1:死信交换机(DLX) + TTL

// 死信队列声明
@Bean
public Queue ttlQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "hmall.direct"); // 死信交换机args.put("x-dead-letter-routing-key", "blue");      // 路由Keyreturn new Queue("ttl.queue", true, false, false, args);
}

缺点:

  • 队首阻塞问题:仅当消息到达队首时才检查TTL

  • 精度不足:队列堆积时延迟误差可达分钟级

  • 配置繁琐:需声明多组交换机/队列

方案2:延迟消息插件(rabbitmq_delayed_message_exchange)

操作流程

1.安装插件

# 将插件放入卷目录
cp rabbitmq_delayed_message_exchange-3.8.17.ez /var/lib/docker/volumes/mq-plugins/_data# 启用插件
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2.声明延迟交换机

@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed()  // 关键!启用延迟特性.durable(true).build();
}

3.发送延迟消息

rabbitTemplate.convertAndSend("delay.direct", "delay", message, msg -> {msg.getMessageProperties().setDelay(5000); // 5秒延迟return msg;
});

电商支付超时检测练习

业务优化:渐进式延迟检测

传统方案:30分钟后单次检测 → 资源浪费
创新方案:多级延迟检测(10s,20s,30s,45s,60s...30min)

核心数据结构

@Data
public class MultiDelayMessage<T> {private T data;              // 业务数据private List<Long> delayMillis; // 延迟时间序列// 获取并移除下一个延迟public Long removeNextDelay() {return delayMillis.remove(0);}// 构造示例public static <T> MultiDelayMessage<T> of(T data, Long... delays) {return new MultiDelayMessage<>(data, Arrays.asList(delays));}
}

消息监听实现

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY
))
public void listenOrderCheck(MultiDelayMessage<Long> msg) {Long orderId = msg.getData();Order order = orderService.getById(orderId);// 订单不存在或已结束if (order == null || order.getStatus() > 1) return;  // 查询支付服务PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {orderService.markOrderPaySuccess(orderId);return;}// 继续延迟检测if (msg.hasNextDelay()) {Long nextDelay = msg.removeNextDelay();rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, message -> {message.getMessageProperties().setDelay(nextDelay.intValue());return message;});return;}// 最终取消订单orderService.cancelOrder(orderId);
}

性能优化点

  1. 提前终止机制:任一阶段检测到支付成功立即终止流程

  2. 渐进时间设计:前密后疏的检测节奏

  3. 消息体复用:MultiDelayMessage对象全程复用减少序列化开销

订单取消业务实现

@Override
@Transactional
public void cancelOrder(Long orderId) {// 幂等性检查Order order = getById(orderId);if (order == null || order.getStatus() == 5) return; // 更新订单状态boolean updated = lambdaUpdate().setSql("status = 5") // 已关闭.eq(Order::getId, orderId).eq(Order::getStatus, 1) // 仅未支付订单可取消.update();if (!updated) return;// 恢复库存(消息通知商品服务)List<OrderDetail> details = orderDetailService.query().eq("order_id", orderId).list();List<ItemStockDTO> stockList = details.stream().map(d -> new ItemStockDTO(d.getItemId(), d.getNum())).collect(Collectors.toList());// 发送库存恢复消息rabbitMqHelper.sendMessage("stock.exchange", "stock.restore", stockList);
}

MQ工具类封装

// 自动配置类
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled")
@Configuration
public class MqConsumeErrorAutoConfiguration {@Value("${spring.application.name}")private String appName;@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue(appName + ".error.queue");}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(appName);}@Beanpublic MessageRecoverer republishMessageRecoverer(AmqpTemplate amqpTemplate) {return new RepublishMessageRecoverer(amqpTemplate, "error.direct", appName);}
}// MQ工具类
public class RabbitMqHelper {private final RabbitTemplate rabbitTemplate;// 基础发送public void sendMessage(String exchange, String routingKey, Object msg) {rabbitTemplate.convertAndSend(exchange, routingKey, msg);}// 延迟发送public void sendDelayMessage(String exchange, String routingKey, Object msg, int delay) {rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {message.getMessageProperties().setDelay(delay);return message;});}// 带确认机制的发送public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, int maxRetries) {RetryTemplate retryTemplate = new RetryTemplate();retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxRetries));retryTemplate.execute(context -> {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);// 等待Broker确认correlationData.getFuture().get(3, TimeUnit.SECONDS);return null;});}
}

生产环境注意事项

1.延迟插件监控项

  • rabbitmq_delayed_message_exchange 进程内存占用

  • 延迟消息积压数量(x-delayed-messages队列)

  • MQ节点磁盘空间

2.死信队列防护

// 在声明队列时添加限制
@Bean
public Queue dlxQueue() {Map<String, Object> args = new HashMap<>();args.put("x-max-length", 10000);  // 最大消息数args.put("x-overflow", "reject-publish"); // 超限拒绝return new Queue("dlx.queue", true, false, false, args);
}

3.多级延迟配置建议

// 推荐的时间梯度(单位:毫秒)
Long[] delays = {10_000,    // 10秒20_000,    // 20秒30_000,    // 30秒45_000,    // 45秒60_000,    // 1分钟120_000,   // 2分钟300_000,   // 5分钟1800_000   // 30分钟
};

http://www.dtcms.com/a/323848.html

相关文章:

  • MPLS特性之PHP(Penultimate Hop Popping)
  • Android的事件分发流程、Kotlin协程、4大组件、Handler机制、架构设计、性能优化、内存泄漏
  • 从神经网络语言模型(NNLM)到Word2Vec:自然语言处理中的词向量学习
  • NLP——TF-IDF算法
  • WebAssembly技术详解:从浏览器到云原生的高性能革命
  • 麒麟系统播放 pptx
  • Spring MVC 九大组件源码深度剖析(二):LocaleResolver - 国际化背后的调度者
  • 集成电路学习:什么是Parameter Server参数服务器
  • 【软件测试】BUG篇 — 详解
  • 从 `unittest` 到 `pytest`:探寻 Python 测试框架的优雅进化与社区选择*
  • Java 后端性能优化实战:从 SQL 到 JVM 调优
  • Spring 依赖注入、AOP代理
  • GC如何判断对象可以被回收?
  • 分享一个基于Python和Hadoop的的电信客户特征可视化分析平台 基于Spark平台的电信客服数据存储与处理系统源码
  • Django @login_required实现登陆认证
  • 十、Linux Shell脚本:流程控制语句
  • Hadoop MapReduce过程
  • K8s DaemonSet 详解
  • K8s四层负载均衡-service
  • NLP学习开始-02逻辑回归
  • DevOps:从GitLab .gitlab-ci.yml 配置文件到CI/CD
  • LeetCode - 搜索插入位置 / 排序链表
  • win11(RTX5060)下进行nanodetplus训练
  • Kafka消费者相关原理
  • 第4章 程序段的反复执行4 多重循环练习(题及答案)
  • Audio Flamingo
  • 网站升级https地址方法
  • LeetCode每日一题,2025-8-10
  • jmeter常规压测【读取csv文件】
  • BGP HCIP