当前位置: 首页 > news >正文

013-2 订单支付超时自动取消订单(rocketmq jpa)

文章目录

  • 完整实现代码(基于Spring Boot + RocketMQ + MySQL)
    • 1. 项目依赖(pom.xml)
    • 2. 应用配置(application.yml)
    • 3. 实体类(Order.java)
    • 4. 数据库仓库(OrderRepository.java)
    • 5. 服务层(OrderService.java)
    • 6. RocketMQ生产者(OrderTimeoutProducer.java)
    • 7. RocketMQ消费者(OrderTimeoutConsumer.java)
    • 8. 缓存配置(CacheConfig.java)
    • 9. 订单控制器(OrderController.java)
    • 10. 对账服务(每日补偿)
    • 系统架构图
    • 关键特性说明
    • 使用说明
  • 其他

  1. 实体类(Entity):定义订单的数据结构。

  2. 数据库访问层(Repository):使用JPA或JDBC进行数据库操作。

  3. 服务层(Service):处理业务逻辑,包括创建订单、支付订单、取消订单等。

  4. 消息生产者(Producer):发送延迟消息到RocketMQ。

  5. 消息消费者(Consumer):消费延迟消息,处理订单超时取消。

  6. 配置类(Configuration):配置RocketMQ、数据源等。

  7. 异常处理:处理可能出现的异常,保证系统稳定性。

  8. 测试类:验证各个功能是否正常。

订单实体需要包含订单ID、状态、创建时间等字段。服务层需要处理订单的创建、支付和取消,其中取消订单需要结合数据库的条件更新和内存检查。消息生产者需要在订单创建时发送延迟消息,消费者在消息到达时处理超时取消。

在并发处理方面,要确保在支付和取消同时发生时,数据库的条件更新能够保证原子性,避免状态覆盖。此外,消息消费者需要处理幂等性,防止重复消费导致错误。

还需要考虑性能优化,比如使用缓存(如Caffeine)来减少数据库查询的压力,同时保证缓存与数据库的一致性。可能还需要引入分布式锁(如Redisson)来处理高并发下的资源竞争问题。

异常处理方面,需要处理RocketMQ消息发送失败、数据库操作失败等情况,使用重试机制或事务管理来保证数据一致性。

测试部分需要模拟并发场景,验证在高并发下订单状态是否正确变更,延迟消息是否按时触发,以及系统的稳定性和性能表现。

最后,需要验证整个流程是否顺畅,各个组件是否协同工作,没有遗漏任何关键点。例如,订单创建后是否成功发送延迟消息,消息消费者是否能正确处理超时订单,支付和取消的并发操作是否不会导致状态错误等。

完整实现代码(基于Spring Boot + RocketMQ + MySQL)

1. 项目依赖(pom.xml)

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    
    <!-- RocketMQ -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    
    <!-- MySQL -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    
    <!-- Caffeine Cache -->
    <dependency>
        <groupId>com.github.ben-manes.caffeine</groupId>
        <artifactId>caffeine</artifactId>
    </dependency>
    
    <!-- Redisson -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.17.0</version>
    </dependency>
</dependencies>

2. 应用配置(application.yml)

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/order_db?useSSL=false
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true

rocketmq:
  name-server: localhost:9876
  producer:
    group: order-producer-group

caffeine:
  spec: maximumSize=10000,expireAfterWrite=5m

redisson:
  config: classpath:redisson.yaml

3. 实体类(Order.java)

@Entity
@Table(name = "orders")
public class Order {
    @Id
    private String orderId;
    
    @Enumerated(EnumType.STRING)
    private OrderStatus status;
    
    private Long createTime;
    
    @Version
    private Integer version;
    
    // Getters and Setters
}

public enum OrderStatus {
    UNPAID, PAID, CANCELLED
}

4. 数据库仓库(OrderRepository.java)

public interface OrderRepository extends JpaRepository<Order, String> {
    
    @Modifying
    @Query("UPDATE Order o SET o.status = 'CANCELLED', o.version = o.version + 1 " +
           "WHERE o.orderId = :orderId AND o.status = 'UNPAID'")
    int cancelUnpaidOrder(@Param("orderId") String orderId);
    
    @Modifying
    @Query("UPDATE Order o SET o.status = 'PAID', o.version = o.version + 1 " +
           "WHERE o.orderId = :orderId AND o.status = 'UNPAID'")
    int payOrder(@Param("orderId") String orderId);
}

5. 服务层(OrderService.java)

@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final Cache<String, OrderStatus> orderCache;
    private final RedissonClient redisson;
    
    @Transactional
    public String createOrder() {
        String orderId = UUID.randomUUID().toString();
        Order order = new Order();
        order.setOrderId(orderId);
        order.setStatus(OrderStatus.UNPAID);
        order.setCreateTime(System.currentTimeMillis());
        orderRepository.save(order);
        
        orderCache.put(orderId, OrderStatus.UNPAID);
        return orderId;
    }
    
    public boolean safePay(String orderId) {
        // 分布式锁保证同一订单的支付原子性
        RLock lock = redisson.getLock("order:pay:" + orderId);
        try {
            if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
                // 双重检查
                if (orderCache.getIfPresent(orderId) != OrderStatus.UNPAID) {
                    return false;
                }
                
                int affected = orderRepository.payOrder(orderId);
                if (affected > 0) {
                    orderCache.put(orderId, OrderStatus.PAID);
                    return true;
                }
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    @Transactional
    public boolean safeCancel(String orderId) {
        // 内存缓存快速过滤
        if (orderCache.getIfPresent(orderId) != OrderStatus.UNPAID) {
            return false;
        }
        
        // 数据库条件更新
        int affected = orderRepository.cancelUnpaidOrder(orderId);
        if (affected > 0) {
            orderCache.put(orderId, OrderStatus.CANCELLED);
            return true;
        }
        return false;
    }
}

6. RocketMQ生产者(OrderTimeoutProducer.java)

@Component
@RequiredArgsConstructor
public class OrderTimeoutProducer {
    private final RocketMQTemplate rocketMQTemplate;
    
    public void sendTimeoutMessage(String orderId) {
        Message<String> message = MessageBuilder.withPayload(orderId)
                .setHeader(RocketMQHeaders.KEYS, orderId)
                .build();
                
        // 使用延迟等级4对应30分钟(需提前配置Broker)
        rocketMQTemplate.syncSend("ORDER_TIMEOUT_TOPIC", message, 3000, 4);
    }
}

7. RocketMQ消费者(OrderTimeoutConsumer.java)

@Service
@RocketMQMessageListener(
    topic = "ORDER_TIMEOUT_TOPIC",
    consumerGroup = "order-timeout-consumer",
    selectorType = SelectorType.TAG,
    messageModel = MessageModel.CLUSTERING
)
public class OrderTimeoutConsumer implements RocketMQListener<MessageExt> {
    private final OrderService orderService;
    private final Cache<String, Boolean> processedMsgCache = 
        Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.HOURS)
            .maximumSize(10000)
            .build();
    
    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        if (processedMsgCache.getIfPresent(msgId) != null) {
            return;
        }
        
        String orderId = new String(message.getBody());
        try {
            boolean success = orderService.safeCancel(orderId);
            if (success) {
                processedMsgCache.put(msgId, true);
                log.info("订单超时取消成功:{}", orderId);
            }
        } catch (Exception e) {
            log.error("订单取消处理失败:{}", orderId, e);
            throw new RuntimeException(e);
        }
    }
}

8. 缓存配置(CacheConfig.java)

@Configuration
public class CacheConfig {
    @Bean
    public Cache<String, OrderStatus> orderStatusCache() {
        return Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();
    }
}

9. 订单控制器(OrderController.java)

@RestController
@RequestMapping("/orders")
@RequiredArgsConstructor
public class OrderController {
    private final OrderService orderService;
    private final OrderTimeoutProducer timeoutProducer;
    
    @PostMapping
    public String createOrder() {
        String orderId = orderService.createOrder();
        timeoutProducer.sendTimeoutMessage(orderId);
        return orderId;
    }
    
    @PostMapping("/{orderId}/pay")
    public ResponseEntity<?> payOrder(@PathVariable String orderId) {
        return orderService.safePay(orderId) ?
            ResponseEntity.ok().build() :
            ResponseEntity.status(HttpStatus.CONFLICT).build();
    }
}

10. 对账服务(每日补偿)

@Component
@RequiredArgsConstructor
public class ReconciliationTask {
    private final OrderRepository orderRepository;
    private final RocketMQTemplate rocketMQTemplate;
    
    @Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
    public void checkUnpaidOrders() {
        LocalDateTime threshold = LocalDateTime.now().minusMinutes(30);
        orderRepository.findUnpaidExpiredOrders(threshold)
            .forEach(order -> {
                rocketMQTemplate.syncSend("ORDER_TIMEOUT_TOPIC", order.getOrderId());
            });
    }
}

系统架构图

graph TD
    A[用户] --> B[创建订单]
    B --> C[发送延迟消息]
    C --> D[RocketMQ Broker]
    D -->|30分钟后| E[消费者处理]
    E --> F[条件更新取消订单]
    
    G[支付请求] --> H[分布式锁]
    H --> I[状态校验]
    I --> J[数据库更新]
    
    K[定时对账] --> L[补偿未处理订单]
    
    classDef service fill:#e1f5fe,stroke:#039be5;
    classDef mq fill:#f0f4c3,stroke:#afb42b;
    classDef db fill:#dcedc8,stroke:#689f38;
    
    class B,H,I,J,K,L service
    class C,D,E mq
    class F,J db

关键特性说明

多层防御体系
内存缓存快速过滤(Caffeine)
数据库条件更新保证原子性
分布式锁(Redisson)协调支付操作
消息幂等处理(Caffeine缓存)

异常处理机制
消息消费失败自动重试
每日对账补偿异常订单
分布式锁自动释放保证

性能优化
缓存命中率监控(Micrometer)
批量消息处理优化
数据库连接池配置(HikariCP)

监控指标

@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {
    return registry -> {
        CaffeineCacheMetrics.monitor(
            registry, 
            orderStatusCache, 
            "order_status_cache"
        );
    };
}

使用说明

启动前准备

# 创建MySQL数据库
CREATE DATABASE order_db CHARACTER SET utf8mb4;

# 配置RocketMQ延迟等级(broker.conf)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

API调用示例

# 创建订单
POST /orders
Response: {"orderId": "5d8f8c7e-1234-5678-90ab-cdef12345678"}

# 支付订单
POST /orders/5d8f8c7e-1234-5678-90ab-cdef12345678/pay

# 查询订单
GET /orders/5d8f8c7e-1234-5678-90ab-cdef12345678

压力测试

# 使用wrk模拟并发创建订单
wrk -t4 -c100 -d30s http://localhost:8080/orders

# 同时发起支付请求
ab -n 1000 -c 100 http://localhost:8080/orders/{orderId}/pay

其他

JPA的全称是Java Persistence API,它是Java EE(现在Jakarta EE)的一部分,用于对象关系映射(ORM),让开发者可以通过面向对象的方式来操作数据库。JPA本身是一个规范,不是具体的实现,常见的实现有Hibernate(最流行的实现,功能丰富)、EclipseLink(Oracle 官方参考实现)、OpenJPA(Apache 项目)等。

考虑不同的场景和需求。可能的替代方案包括:

  1. MyBatis:与JPA不同,MyBatis更偏向于SQL映射,允许开发者直接编写SQL语句,提供了更大的灵活性,特别是对于复杂查询或需要优化SQL性能的情况。

  2. JDBC:这是最基础的数据库连接方式,直接使用SQL语句,没有ORM功能。虽然灵活,但需要大量样板代码,容易出错。

  3. Spring Data JDBC:Spring提供的另一种数据访问方式,比纯JDBC更简单,支持一些简单的ORM功能,但不提供JPA的复杂特性如缓存、延迟加载等。

  4. NoSQL数据库的驱动程序:如果项目使用MongoDB、Redis等NoSQL数据库,通常会使用对应的客户端库,如MongoTemplate、Lettuce等,这些不属于JPA的范畴。

  5. 其他ORM框架:比如JOOQ,它基于SQL的模式生成类型安全的SQL查询,适合喜欢SQL但希望有类型安全的开发者。

  6. 新兴技术:如Micronaut Data、Quarkus的Hibernate Panache,它们可能在简化数据访问层方面提供不同的方法。

MyBatis适合需要高度控制SQL的场景,而JOOQ适合喜欢类型安全SQL的开发者。
如果用户在处理复杂的领域模型,JPA的级联操作和声明式事务可能更有优势;如果项目需要高性能的定制SQL,MyBatis或JOOQ可能更合适。

JPA 的核心特点

特性说明
对象关系映射将 Java 对象(Entity)映射到数据库表,自动处理增删改查操作
JPQL类似 SQL 的查询语言,但面向对象(如 SELECT u FROM User u WHERE u.age > 18)
事务管理通过 @Transactional 注解管理数据库事务
延迟加载按需加载关联数据(如 @OneToMany(fetch = FetchType.LAZY))
缓存机制一级缓存(Session 级别)和二级缓存(应用级别)提升性能

JPA 的常见替代方案
MyBatis
类型:SQL 映射框架(半自动化 ORM)
特点:
开发者需手动编写 SQL,但通过 XML 或注解映射到 Java 对象。
灵活性强,适合复杂查询和高度定制的 SQL 优化。
学习曲线低于 JPA,但需维护更多 SQL 代码。

适用场景:
需要精细控制 SQL 语句。
遗留数据库结构复杂,难以通过 JPA 自动映射。

Spring Data JDBC
类型:轻量级 ORM(Spring 生态)

特点:
简化 JDBC 操作,支持基本的 CRUD 和简单查询。
无 JPA 的复杂特性(如延迟加载、缓存),性能更高。
适合对 SQL 有一定控制需求但不想写纯 JDBC 代码的场景。

JOOQ(Java Object Oriented Querying)
类型:类型安全的 SQL 构建工具

特点:
通过代码生成器将数据库表结构映射为 Java 类。
支持编写类型安全的 SQL 查询,避免 SQL 注入。
适合喜欢 SQL 但希望有编译时检查的开发者。

纯 JDBC
类型:Java 数据库连接基础 API

特点:
直接通过 SQL 操作数据库,无任何 ORM 抽象。
灵活性最高,但需要手动处理资源(Connection、ResultSet)和异常。
适合简单应用或需要极致性能的场景。

NoSQL 数据库驱动
类型:非关系型数据库专用客户端

示例:
MongoDB:使用 MongoTemplate 或 Spring Data MongoDB。
Redis:使用 Lettuce 或 Jedis 客户端。
Cassandra:使用 Spring Data Cassandra。

如何选择替代方案?

场景推荐方案理由
需要快速开发简单 CRUDJPA(Hibernate)自动化程度高,减少样板代码
复杂 SQL 或性能优化需求MyBatis 或 JOOQ直接控制 SQL,灵活优化
轻量级应用,避免 JPA 复杂特性Spring Data JDBC简单易用,无延迟加载等复杂机制
非关系型数据库(如 MongoDB)Spring Data MongoDB专为 NoSQL 设计,语法与 Spring Data JPA 相似
遗留系统或高度定制 SQL纯 JDBC完全控制 SQL 执行流程

总结
JPA 适合:快速开发、对象关系映射需求明确、团队熟悉 ORM 概念。
替代方案适用场景:
需要精细控制 SQL → MyBatis/JOOQ
轻量级 ORM → Spring Data JDBC
非关系型数据库 → 专用 NoSQL 客户端
极致性能或遗留系统 → 纯 JDBC

相关文章:

  • 迷你世界脚本玩家接口:Player
  • 蓝桥杯2025模拟三(01字符串)
  • Python Tornado 框架面试题及参考答案
  • 【leetcode hot 100 76】最小覆盖子串
  • TypeScript系列01-类型系统全解析
  • 【中值滤波器(Median Filter)详解】
  • go routine 并发和同步
  • 虚拟机的IP配置
  • 网络安全月度报告
  • LLM 对话框组件 | 字节青训营前端开发项目
  • 计算机网络-实验四子网划分
  • 拒绝被假量薅羊毛,游戏渠道反作弊解决方案发布
  • go类型转换
  • Docker 模拟 kubernetes 的 pod
  • 【C++】:STL详解 —— priority_queue类
  • docker关闭mysql端口映射的使用
  • 计算机基础面试(数据结构)
  • Laravel从入门到精通:开启高效开发之旅
  • C++:多态与虚函数
  • Leetcode 刷题记录 01 —— 哈希
  • 网站建设ningqueseo/站长统计幸福宝下载
  • 如何用dw制作简单网页/seo还能赚钱吗
  • 做淘宝网站需要多大空间/济南百度竞价开户
  • 网站qq号获取/网络推广seo教程
  • 千博企业网站管理系统完整版 2014/搜索引擎营销的特征
  • 西昌市网站建设公司/一媒体app软件下载老版本