当前位置: 首页 > news >正文

007 订单支付超时自动取消订单(rabbitmq死信队列 mybatis)

文章目录

  • 死信队列
    • RabbitMQ 配置类 RabbitMQConfig.java
    • 生产者 OrderTimeoutProducer.java
    • 消费者 OrderTimeoutConsumer.java
    • 应用配置 application.yml
    • pom.xml 依赖
    • 实体类 Order.java(不变)
    • Mapper 接口 OrderMapper.java(不变)
    • 服务层 OrderService.java(不变)
    • 缓存配置 CacheConfig.java(不变)
    • 对账服务 ReconciliationTask.java(不变)
    • 控制器 OrderController.java(不变)

死信队列

在代码中,processedMsgCache的类型是Cache<String, Boolean>,键是消息ID,值是Boolean。当处理成功时,会执行processedMsgCache.put(msgId, true),将消息ID存入缓存,并设置有效期1小时,最大容量10000条。这样,在1小时内,如果同样的消息再次被处理,缓存中已经有记录,就会直接ACK并跳过处理。

不过,这里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默认可能不是唯一的,生产者发送消息时可以设置messageId。如果生产者没有明确设置,可能无法保证唯一性,这会导致幂等性检查失效。因此,需要确保生产者发送消息时设置了唯一的messageId,或者在消费者端使用其他唯一标识,比如消息内容中的orderId结合其他属性。

另外,缓存的过期时间设置为1小时,这可能与业务场景有关。如果消息的存活时间超过1小时,可能会有重复处理的风险。需要根据实际消息的存活时间来调整缓存的过期时间,确保覆盖消息可能被重新投递的时间窗口。

还有一个问题是,缓存是本地缓存,如果消费者有多个实例,每个实例的缓存是独立的。这可能导致不同的实例处理同一条消息,因为一个实例处理过,但另一个实例的缓存中没有记录。这种情况下,本地缓存的幂等性检查可能不够,需要考虑分布式缓存,比如Redis,来保证全局唯一性。但根据当前代码,在单实例或允许短暂重复的场景下使用本地缓存。

总结来说,幂等性检查的逻辑是通过缓存已处理消息的ID,在消息处理前检查是否已存在,存在则跳过处理,避免重复执行。这适用于消息队列保证至少一次投递,但业务需要确保幂等的场景。

                      +---------------------+
                      |   RabbitMQ Message  |
                      |  (携带唯一messageId)   |
                      +----------+----------+
                                 |
                                 v
+----------------+       +-------+-------+       +-----------------+
|  消息到达消费者   | ----> | 检查缓存是否存在 | ----> | 存在:直接ACK丢弃消息 |
+----------------+       +-------+-------+       +-----------------+
                                 |
                                 | 不存在
                                 v
                      +-------+-------+       +-----------------+
                      | 执行业务逻辑处理  | ----> | 成功:存入缓存并ACK |
                      +---------------+       +-----------------+

缓存过期时间(1小时)> 消息最大存活时间(30分钟+重试时间)
计算公式:缓存过期时间 = 消息TTL + 最大重试时间 * 重试次数 + 缓冲时间

缓存击穿空值缓存对不存在的key也进行缓存(需设置较短过期时间)
缓存穿透布隆过滤器在缓存前增加过滤层
消费者重启持久化存储配合数据库记录处理状态
网络分区最终一致性依赖对账服务修正状态
组件类型作用说明
processedMsgCacheCaffeine缓存存储已处理消息的唯一标识
messageId字符串消息唯一标识(需生产者保证唯一性)
deliveryTag长整型RabbitMQ消息投递标识
sequenceDiagram
    participant RabbitMQ
    participant Consumer
    participant Cache
    participant DB

    RabbitMQ->>Consumer: 投递消息(messageId=123)
    Consumer->>Cache: 查询messageId=123
    alt 存在缓存
        Cache-->>Consumer: 返回true
        Consumer->>RabbitMQ: 发送ACK
    else 无缓存
        Consumer->>DB: 执行取消操作
        alt 操作成功
            Consumer->>Cache: 写入messageId=123
            Consumer->>RabbitMQ: 发送ACK
        else 操作失败
            Consumer->>RabbitMQ: 发送NACK(requeue=true)
        end
    end

RabbitMQ 配置类 RabbitMQConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    
    // 订单超时相关配置
    public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";
    public static final String ORDER_DELAY_QUEUE = "order.delay.queue";
    public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";
    
    // 死信队列配置
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";
    public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";
    public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";

    // 声明延迟队列(设置死信参数)
    @Bean
    public Queue orderDelayQueue() {
        return QueueBuilder.durable(ORDER_DELAY_QUEUE)
                .withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY)
                .build();
    }

    // 声明延迟交换机
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }

    // 绑定延迟队列到交换机
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(orderDelayQueue())
                .to(orderDelayExchange())
                .with(ORDER_DELAY_ROUTING_KEY);
    }

    // 声明死信队列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(ORDER_DEAD_LETTER_QUEUE, true);
    }

    // 声明死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);
    }

    // 绑定死信队列到交换机
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(ORDER_DEAD_LETTER_ROUTING_KEY);
    }

    // JSON 消息转换器
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

生产者 OrderTimeoutProducer.java

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderTimeoutProducer {
    
    private final RabbitTemplate rabbitTemplate;

    public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendTimeoutMessage(String orderId) {
        // 设置消息过期时间为30分钟(单位:毫秒)
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("1800000");
                return message;
            }
        };
        
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.ORDER_DELAY_EXCHANGE,
                RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,
                orderId,
                messagePostProcessor
        );
    }
}

消费者 OrderTimeoutConsumer.java

import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Component
public class OrderTimeoutConsumer {
    
    private final OrderService orderService;
    private final Cache<String, Boolean> processedMsgCache;

    public OrderTimeoutConsumer(OrderService orderService, 
                               Cache<String, Boolean> processedMsgCache) {
        this.orderService = orderService;
        this.processedMsgCache = Caffeine.newBuilder()
                .expireAfterWrite(1, TimeUnit.HOURS)
                .maximumSize(10000)
                .build();
    }

    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
    public void processMessage(Message message, Channel channel) throws IOException {
        String orderId = new String(message.getBody(), StandardCharsets.UTF_8);
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // 幂等性检查
            if (processedMsgCache.getIfPresent(messageId) != null) {
                channel.basicAck(deliveryTag, false);
                return;
            }

            boolean success = orderService.safeCancel(orderId);
            if (success) {
                processedMsgCache.put(messageId, true);
                System.out.println("订单超时取消成功: " + orderId);
            }
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 记录错误日志,重新放回队列
            channel.basicNack(deliveryTag, false, true);
            System.err.println("处理订单超时取消失败: " + orderId);
            e.printStackTrace();
        }
    }
}

应用配置 application.yml

spring:
  rabbitmq:
    host: ${RABBITMQ_HOST:localhost}
    port: 5672
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASSWORD:guest}
    virtual-host: /
    connection-timeout: 5000
    template:
      retry:
        enabled: true
        max-attempts: 3
        initial-interval: 1000ms
    listener:
      simple:
        acknowledge-mode: manual # 手动确认模式
        prefetch: 10 # 每次预取数量
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000ms

pom.xml 依赖

<!-- 移除 RocketMQ 依赖 -->
<!-- 添加 RabbitMQ 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实体类 Order.java(不变)

public class Order {
    // 保持原有实现
}

Mapper 接口 OrderMapper.java(不变)

@Mapper
public interface OrderMapper {
    // 保持原有SQL操作
}

服务层 OrderService.java(不变)

@Service
public class OrderService {
    // 保持原有业务逻辑
}

缓存配置 CacheConfig.java(不变)

@Configuration
public class CacheConfig {
    // 保持原有缓存配置
}

对账服务 ReconciliationTask.java(不变)

@Component
public class ReconciliationTask {
    // 保持原有定时任务逻辑
}

控制器 OrderController.java(不变)

@RestController
@RequestMapping("/orders")
public class OrderController {
    // 保持原有API接口
}

关键差异对比

功能点RocketMQ 实现RabbitMQ 实现
延迟机制内置延迟级别TTL+死信队列
消息存储持久化到CommitLog内存+磁盘持久化
消费确认自动ACK手动ACK+重试机制
消息追踪原生支持消息轨迹需要额外实现
集群方案主从复制镜像队列

部署注意事项
队列初始化:确保首次启动时自动创建所需交换机和队列
消息持久化:所有队列声明时设置durable=true
监控配置:需要监控以下指标:
死信队列消息堆积量
消费者处理耗时
消息TTL过期情况
集群配置:建议至少3节点RabbitMQ集群
故障转移:配置镜像队列实现高可用

性能优化建议
批量处理:

@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {
    // 批量查询缓存
    Map<String, Boolean> existMap = cache.getAllPresent(messageIds);
    // 过滤已处理消息
    // 批量执行数据库操作
}

异步更新:

CompletableFuture.runAsync(() -> {
    processedMsgCache.put(messageId, true);
});

分级缓存:

Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();

首先看CacheConfig.java,里面有一个orderStatusCache的Bean,使用Caffeine配置,最大大小10000,过期时间10分钟。然后在OrderService.java中,有一个注入的Cache<String, OrderStatus> orderCache。看起来这里应该用的是这个Bean,因为在OrderService里用了@RequiredArgsConstructor,所以orderCache这个字段会被构造器注入,而对应的Bean应该就是CacheConfig里的orderStatusCache。所以虽然用户可能没有显式地用@Autowired,但是因为用了Lombok的@RequiredArgsConstructor,如果orderCache是final的话,Spring会自动注入同类型的Bean。这时候需要检查OrderService中的orderCache的类型是否匹配。
看OrderService的定义:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是这个类型,所以应该会被正确注入。所以这个Bean是有被使用的,可能没注意到OrderService里的注入,导致认为没有用到。
另外,在OrderTimeoutConsumer.java中,还有一个processedMsgCache,这个是用Caffeine直接new的,并没有使用Spring管理的Bean。所以这个缓存是消费者自己创建的,和CacheConfig无关。而orderStatusCache是被OrderService使用的,所以这个配置是必须的,否则OrderService中的orderCache会找不到对应的Bean,导致启动失败。
所以可能在于误解了哪里用到了这个Bean。可能在代码中没有显式地看到@Autowired或者@Resource,但实际上因为Lombok的@RequiredArgsConstructor和Spring的自动注入机制,这个Bean是被OrderService使用的。如果没有CacheConfig中的Bean,Spring启动时会报错,提示找不到符合条件的Cache类型的Bean来注入到OrderService中。

相关文章:

  • C++左值引用与右值引用区别
  • 【Transformer模型学习】第三篇:位置编码
  • NLP10-TF-IDF文本向量化
  • java+jvm笔记
  • 如何使用C#与SQL Server数据库进行交互
  • hutool Java的工具箱介绍
  • Tomcat 是什么?有什么功能和作用?为什么启动 Spring 或 Spring Boot 项目需要 Tomcat?
  • Redis的持久化-RDBAOF
  • 大白话React第九章React 前沿技术与企业级应用实战
  • Python本地下载文件的教程
  • Linux服务器部署Deepseek、Dify、RAGflow实战教程
  • 代码的解读——自用
  • Spring Boot 异步编程
  • 大语言模型学习--LangChain
  • 6. 自动关闭文件
  • 知识图谱neo4j+vue+flask课程在线学习系统
  • 怎么下载安装yarn
  • Hive-05之查询 分组、排序、case when、 什么情况下Hive可以避免进行MapReduce
  • 【计算机网络基础】-------计算机网络概念
  • postgresql源码学习(60)—— VFD的作用及机制
  • 主题猫wordpress/网站优化有哪些技巧
  • 黄村网站建设一条龙/小网站搜什么关键词
  • 西安网站建设app建设/营销推广怎么做
  • 网站值不值得做seo/嘉兴关键词优化报价
  • 响应式网站案例源码/衡阳seo排名
  • 武汉便宜的做网站公司/营销推广活动策划方案