013-2 订单支付超时自动取消订单(rocketmq jpa)
文章目录
- 完整实现代码(基于Spring Boot + RocketMQ + MySQL)
- 1. 项目依赖(pom.xml)
- 2. 应用配置(application.yml)
- 3. 实体类(Order.java)
- 4. 数据库仓库(OrderRepository.java)
- 5. 服务层(OrderService.java)
- 6. RocketMQ生产者(OrderTimeoutProducer.java)
- 7. RocketMQ消费者(OrderTimeoutConsumer.java)
- 8. 缓存配置(CacheConfig.java)
- 9. 订单控制器(OrderController.java)
- 10. 对账服务(每日补偿)
- 系统架构图
- 关键特性说明
- 使用说明
- 其他
-
实体类(Entity):定义订单的数据结构。
-
数据库访问层(Repository):使用JPA或JDBC进行数据库操作。
-
服务层(Service):处理业务逻辑,包括创建订单、支付订单、取消订单等。
-
消息生产者(Producer):发送延迟消息到RocketMQ。
-
消息消费者(Consumer):消费延迟消息,处理订单超时取消。
-
配置类(Configuration):配置RocketMQ、数据源等。
-
异常处理:处理可能出现的异常,保证系统稳定性。
-
测试类:验证各个功能是否正常。
订单实体需要包含订单ID、状态、创建时间等字段。服务层需要处理订单的创建、支付和取消,其中取消订单需要结合数据库的条件更新和内存检查。消息生产者需要在订单创建时发送延迟消息,消费者在消息到达时处理超时取消。
在并发处理方面,要确保在支付和取消同时发生时,数据库的条件更新能够保证原子性,避免状态覆盖。此外,消息消费者需要处理幂等性,防止重复消费导致错误。
还需要考虑性能优化,比如使用缓存(如Caffeine)来减少数据库查询的压力,同时保证缓存与数据库的一致性。可能还需要引入分布式锁(如Redisson)来处理高并发下的资源竞争问题。
异常处理方面,需要处理RocketMQ消息发送失败、数据库操作失败等情况,使用重试机制或事务管理来保证数据一致性。
测试部分需要模拟并发场景,验证在高并发下订单状态是否正确变更,延迟消息是否按时触发,以及系统的稳定性和性能表现。
最后,需要验证整个流程是否顺畅,各个组件是否协同工作,没有遗漏任何关键点。例如,订单创建后是否成功发送延迟消息,消息消费者是否能正确处理超时订单,支付和取消的并发操作是否不会导致状态错误等。
完整实现代码(基于Spring Boot + RocketMQ + MySQL)
1. 项目依赖(pom.xml)
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Caffeine Cache -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.0</version>
</dependency>
</dependencies>
2. 应用配置(application.yml)
spring:
datasource:
url: jdbc:mysql://localhost:3306/order_db?useSSL=false
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update
show-sql: true
rocketmq:
name-server: localhost:9876
producer:
group: order-producer-group
caffeine:
spec: maximumSize=10000,expireAfterWrite=5m
redisson:
config: classpath:redisson.yaml
3. 实体类(Order.java)
@Entity
@Table(name = "orders")
public class Order {
@Id
private String orderId;
@Enumerated(EnumType.STRING)
private OrderStatus status;
private Long createTime;
@Version
private Integer version;
// Getters and Setters
}
public enum OrderStatus {
UNPAID, PAID, CANCELLED
}
4. 数据库仓库(OrderRepository.java)
public interface OrderRepository extends JpaRepository<Order, String> {
@Modifying
@Query("UPDATE Order o SET o.status = 'CANCELLED', o.version = o.version + 1 " +
"WHERE o.orderId = :orderId AND o.status = 'UNPAID'")
int cancelUnpaidOrder(@Param("orderId") String orderId);
@Modifying
@Query("UPDATE Order o SET o.status = 'PAID', o.version = o.version + 1 " +
"WHERE o.orderId = :orderId AND o.status = 'UNPAID'")
int payOrder(@Param("orderId") String orderId);
}
5. 服务层(OrderService.java)
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final Cache<String, OrderStatus> orderCache;
private final RedissonClient redisson;
@Transactional
public String createOrder() {
String orderId = UUID.randomUUID().toString();
Order order = new Order();
order.setOrderId(orderId);
order.setStatus(OrderStatus.UNPAID);
order.setCreateTime(System.currentTimeMillis());
orderRepository.save(order);
orderCache.put(orderId, OrderStatus.UNPAID);
return orderId;
}
public boolean safePay(String orderId) {
// 分布式锁保证同一订单的支付原子性
RLock lock = redisson.getLock("order:pay:" + orderId);
try {
if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
// 双重检查
if (orderCache.getIfPresent(orderId) != OrderStatus.UNPAID) {
return false;
}
int affected = orderRepository.payOrder(orderId);
if (affected > 0) {
orderCache.put(orderId, OrderStatus.PAID);
return true;
}
}
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} finally {
lock.unlock();
}
}
@Transactional
public boolean safeCancel(String orderId) {
// 内存缓存快速过滤
if (orderCache.getIfPresent(orderId) != OrderStatus.UNPAID) {
return false;
}
// 数据库条件更新
int affected = orderRepository.cancelUnpaidOrder(orderId);
if (affected > 0) {
orderCache.put(orderId, OrderStatus.CANCELLED);
return true;
}
return false;
}
}
6. RocketMQ生产者(OrderTimeoutProducer.java)
@Component
@RequiredArgsConstructor
public class OrderTimeoutProducer {
private final RocketMQTemplate rocketMQTemplate;
public void sendTimeoutMessage(String orderId) {
Message<String> message = MessageBuilder.withPayload(orderId)
.setHeader(RocketMQHeaders.KEYS, orderId)
.build();
// 使用延迟等级4对应30分钟(需提前配置Broker)
rocketMQTemplate.syncSend("ORDER_TIMEOUT_TOPIC", message, 3000, 4);
}
}
7. RocketMQ消费者(OrderTimeoutConsumer.java)
@Service
@RocketMQMessageListener(
topic = "ORDER_TIMEOUT_TOPIC",
consumerGroup = "order-timeout-consumer",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING
)
public class OrderTimeoutConsumer implements RocketMQListener<MessageExt> {
private final OrderService orderService;
private final Cache<String, Boolean> processedMsgCache =
Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
.maximumSize(10000)
.build();
@Override
public void onMessage(MessageExt message) {
String msgId = message.getMsgId();
if (processedMsgCache.getIfPresent(msgId) != null) {
return;
}
String orderId = new String(message.getBody());
try {
boolean success = orderService.safeCancel(orderId);
if (success) {
processedMsgCache.put(msgId, true);
log.info("订单超时取消成功:{}", orderId);
}
} catch (Exception e) {
log.error("订单取消处理失败:{}", orderId, e);
throw new RuntimeException(e);
}
}
}
8. 缓存配置(CacheConfig.java)
@Configuration
public class CacheConfig {
@Bean
public Cache<String, OrderStatus> orderStatusCache() {
return Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}
}
9. 订单控制器(OrderController.java)
@RestController
@RequestMapping("/orders")
@RequiredArgsConstructor
public class OrderController {
private final OrderService orderService;
private final OrderTimeoutProducer timeoutProducer;
@PostMapping
public String createOrder() {
String orderId = orderService.createOrder();
timeoutProducer.sendTimeoutMessage(orderId);
return orderId;
}
@PostMapping("/{orderId}/pay")
public ResponseEntity<?> payOrder(@PathVariable String orderId) {
return orderService.safePay(orderId) ?
ResponseEntity.ok().build() :
ResponseEntity.status(HttpStatus.CONFLICT).build();
}
}
10. 对账服务(每日补偿)
@Component
@RequiredArgsConstructor
public class ReconciliationTask {
private final OrderRepository orderRepository;
private final RocketMQTemplate rocketMQTemplate;
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
public void checkUnpaidOrders() {
LocalDateTime threshold = LocalDateTime.now().minusMinutes(30);
orderRepository.findUnpaidExpiredOrders(threshold)
.forEach(order -> {
rocketMQTemplate.syncSend("ORDER_TIMEOUT_TOPIC", order.getOrderId());
});
}
}
系统架构图
graph TD
A[用户] --> B[创建订单]
B --> C[发送延迟消息]
C --> D[RocketMQ Broker]
D -->|30分钟后| E[消费者处理]
E --> F[条件更新取消订单]
G[支付请求] --> H[分布式锁]
H --> I[状态校验]
I --> J[数据库更新]
K[定时对账] --> L[补偿未处理订单]
classDef service fill:#e1f5fe,stroke:#039be5;
classDef mq fill:#f0f4c3,stroke:#afb42b;
classDef db fill:#dcedc8,stroke:#689f38;
class B,H,I,J,K,L service
class C,D,E mq
class F,J db
关键特性说明
多层防御体系
内存缓存快速过滤(Caffeine)
数据库条件更新保证原子性
分布式锁(Redisson)协调支付操作
消息幂等处理(Caffeine缓存)
异常处理机制
消息消费失败自动重试
每日对账补偿异常订单
分布式锁自动释放保证
性能优化
缓存命中率监控(Micrometer)
批量消息处理优化
数据库连接池配置(HikariCP)
监控指标
@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {
return registry -> {
CaffeineCacheMetrics.monitor(
registry,
orderStatusCache,
"order_status_cache"
);
};
}
使用说明
启动前准备
# 创建MySQL数据库
CREATE DATABASE order_db CHARACTER SET utf8mb4;
# 配置RocketMQ延迟等级(broker.conf)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
API调用示例
# 创建订单
POST /orders
Response: {"orderId": "5d8f8c7e-1234-5678-90ab-cdef12345678"}
# 支付订单
POST /orders/5d8f8c7e-1234-5678-90ab-cdef12345678/pay
# 查询订单
GET /orders/5d8f8c7e-1234-5678-90ab-cdef12345678
压力测试
# 使用wrk模拟并发创建订单
wrk -t4 -c100 -d30s http://localhost:8080/orders
# 同时发起支付请求
ab -n 1000 -c 100 http://localhost:8080/orders/{orderId}/pay
其他
JPA的全称是Java Persistence API,它是Java EE(现在Jakarta EE)的一部分,用于对象关系映射(ORM),让开发者可以通过面向对象的方式来操作数据库。JPA本身是一个规范,不是具体的实现,常见的实现有Hibernate(最流行的实现,功能丰富)、EclipseLink(Oracle 官方参考实现)、OpenJPA(Apache 项目)等。
考虑不同的场景和需求。可能的替代方案包括:
-
MyBatis:与JPA不同,MyBatis更偏向于SQL映射,允许开发者直接编写SQL语句,提供了更大的灵活性,特别是对于复杂查询或需要优化SQL性能的情况。
-
JDBC:这是最基础的数据库连接方式,直接使用SQL语句,没有ORM功能。虽然灵活,但需要大量样板代码,容易出错。
-
Spring Data JDBC:Spring提供的另一种数据访问方式,比纯JDBC更简单,支持一些简单的ORM功能,但不提供JPA的复杂特性如缓存、延迟加载等。
-
NoSQL数据库的驱动程序:如果项目使用MongoDB、Redis等NoSQL数据库,通常会使用对应的客户端库,如MongoTemplate、Lettuce等,这些不属于JPA的范畴。
-
其他ORM框架:比如JOOQ,它基于SQL的模式生成类型安全的SQL查询,适合喜欢SQL但希望有类型安全的开发者。
-
新兴技术:如Micronaut Data、Quarkus的Hibernate Panache,它们可能在简化数据访问层方面提供不同的方法。
MyBatis适合需要高度控制SQL的场景,而JOOQ适合喜欢类型安全SQL的开发者。
如果用户在处理复杂的领域模型,JPA的级联操作和声明式事务可能更有优势;如果项目需要高性能的定制SQL,MyBatis或JOOQ可能更合适。
JPA 的核心特点
特性 | 说明 |
---|---|
对象关系映射 | 将 Java 对象(Entity)映射到数据库表,自动处理增删改查操作 |
JPQL | 类似 SQL 的查询语言,但面向对象(如 SELECT u FROM User u WHERE u.age > 18) |
事务管理 | 通过 @Transactional 注解管理数据库事务 |
延迟加载 | 按需加载关联数据(如 @OneToMany(fetch = FetchType.LAZY)) |
缓存机制 | 一级缓存(Session 级别)和二级缓存(应用级别)提升性能 |
JPA 的常见替代方案
MyBatis
类型:SQL 映射框架(半自动化 ORM)
特点:
开发者需手动编写 SQL,但通过 XML 或注解映射到 Java 对象。
灵活性强,适合复杂查询和高度定制的 SQL 优化。
学习曲线低于 JPA,但需维护更多 SQL 代码。
适用场景:
需要精细控制 SQL 语句。
遗留数据库结构复杂,难以通过 JPA 自动映射。
Spring Data JDBC
类型:轻量级 ORM(Spring 生态)
特点:
简化 JDBC 操作,支持基本的 CRUD 和简单查询。
无 JPA 的复杂特性(如延迟加载、缓存),性能更高。
适合对 SQL 有一定控制需求但不想写纯 JDBC 代码的场景。
JOOQ(Java Object Oriented Querying)
类型:类型安全的 SQL 构建工具
特点:
通过代码生成器将数据库表结构映射为 Java 类。
支持编写类型安全的 SQL 查询,避免 SQL 注入。
适合喜欢 SQL 但希望有编译时检查的开发者。
纯 JDBC
类型:Java 数据库连接基础 API
特点:
直接通过 SQL 操作数据库,无任何 ORM 抽象。
灵活性最高,但需要手动处理资源(Connection、ResultSet)和异常。
适合简单应用或需要极致性能的场景。
NoSQL 数据库驱动
类型:非关系型数据库专用客户端
示例:
MongoDB:使用 MongoTemplate 或 Spring Data MongoDB。
Redis:使用 Lettuce 或 Jedis 客户端。
Cassandra:使用 Spring Data Cassandra。
如何选择替代方案?
场景 | 推荐方案 | 理由 |
---|---|---|
需要快速开发简单 CRUD | JPA(Hibernate) | 自动化程度高,减少样板代码 |
复杂 SQL 或性能优化需求 | MyBatis 或 JOOQ | 直接控制 SQL,灵活优化 |
轻量级应用,避免 JPA 复杂特性 | Spring Data JDBC | 简单易用,无延迟加载等复杂机制 |
非关系型数据库(如 MongoDB) | Spring Data MongoDB | 专为 NoSQL 设计,语法与 Spring Data JPA 相似 |
遗留系统或高度定制 SQL | 纯 JDBC | 完全控制 SQL 执行流程 |
总结
JPA 适合:快速开发、对象关系映射需求明确、团队熟悉 ORM 概念。
替代方案适用场景:
需要精细控制 SQL → MyBatis/JOOQ
轻量级 ORM → Spring Data JDBC
非关系型数据库 → 专用 NoSQL 客户端
极致性能或遗留系统 → 纯 JDBC