Spring MVC Distributed Transactions and Data Consistency Tutorial
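Contents
- Distributed Transaction Overview
- Theoretical Foundations of Distributed Transactions
- Spring Transaction Management
- Two-Phase Commit (2PC) Implementation
- Applying the Saga Pattern
- TCC Pattern Implementation
- Ensuring Eventual Consistency
- Distributed Locking
- Event-Driven Architecture
- Performance Optimization and Monitoring
- Best Practices
- Troubleshooting Common Problems
- Summary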
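Distributed Transaction Overview
What Is a Distributed Transaction
A distributed transaction is a transaction whose operations span multiple independent systems or services. All of those operations must either succeed together or fail together so that the data stays consistent and complete.
Challenges of Distributed Transactions
1. Network partitions
- Network latency and unreliability
- Lost and duplicated messages
- Split-brain caused by network partitions
2. Concurrency control
- Concurrent access from multiple nodes
- Deadlock detection and prevention
- Lock contention and its performance cost
3. Failure handling
- Recovering from node failures
- Repairing inconsistent data
- Managing transaction state
Properties of Distributed Transactions
How the ACID properties become harder to provide in a distributed environment:
Property | Local transaction | Distributed transaction |
---|---|---|
Atomicity | Easy to guarantee | Requires a coordination protocol |
Consistency | Database constraints | Business-rule validation |
Isolation | Database locks | Distributed locks |
Durability | Local storage | Replicated storage |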
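Theoretical Foundations of Distributed Transactions
CAP Theorem
The three CAP properties:
- Consistency: every node sees the same data at the same time
- Availability: every request to a non-failing node receives a response
- Partition tolerance: the system keeps working when the network is partitioned
CAP trade-offs:
CP systems: strong consistency + partition tolerance
AP systems: high availability + partition tolerance
CA systems: strong consistency + high availability (not achievable in a distributed environment, because network partitions cannot be ruled out)
BASE Theory
The BASE properties:
- Basically Available: the system stays mostly available, possibly with degraded functionality
- Soft State: intermediate states are allowed
- Eventually Consistent: the data converges to a consistent state over time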
Consistency Models
Strong consistency
// Strong consistency: an update is visible immediately
@Transactional
public void transferMoney(Long fromAccount, Long toAccount, BigDecimal amount) {
    // Both operations succeed together or fail together
    accountService.debit(fromAccount, amount);
    accountService.credit(toAccount, amount);
}
Eventual consistency
// Eventual consistency: brief inconsistency is tolerated
@Async
public void processOrder(Order order) {
    // Processed asynchronously; the system reaches a consistent state later
    inventoryService.reserve(order.getItems());
    paymentService.charge(order.getPayment());
    shippingService.schedule(order.getShipping());
}
Spring Transaction Management
Local Transaction Management
Declarative transactions
@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private PaymentService paymentService;

    // rollbackFor = Exception.class also rolls back on checked exceptions
    @Transactional(rollbackFor = Exception.class)
    public Order createOrder(OrderRequest request) {
        // Create the order
        Order order = new Order();
        order.setCustomerId(request.getCustomerId());
        order.setAmount(request.getAmount());
        order = orderRepository.save(order);

        // Process the payment
        paymentService.processPayment(order.getId(), request.getPaymentInfo());
        return order;
    }
}
Programmatic transactions
@Service
public class OrderService {

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Autowired
    private PaymentService paymentService;

    public Order createOrder(OrderRequest request) {
        return transactionTemplate.execute(status -> {
            try {
                // Run the business logic
                Order order = createOrderInternal(request);
                paymentService.processPayment(order.getId(), request.getPaymentInfo());
                return order;
            } catch (RuntimeException e) {
                // Mark the transaction for rollback, then propagate the failure
                status.setRollbackOnly();
                throw e;
            }
        });
    }
}
Distributed Transaction Configuration
Maven dependencies
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- Spring Cloud -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter</artifactId>
    </dependency>
    <!-- Seata distributed transactions -->
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.5.2</version>
    </dependency>
    <!-- RocketMQ transactional messages -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>
Seata configuration
seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
  client:
    rm:
      async-commit-buffer-limit: 10000
      report-retry-count: 5
      table-meta-check-enable: false
      report-success-enable: false
    tm:
      commit-retry-count: 5
      rollback-retry-count: 5
Two-Phase Commit (2PC) Implementation
How the 2PC Protocol Works
Phase one: prepare
@Component
public class TwoPhaseCommitCoordinator {

    @Autowired
    private List<TransactionParticipant> participants;

    public boolean prepare() {
        List<Boolean> results = new ArrayList<>();
        // Ask every participant whether it can commit
        for (TransactionParticipant participant : participants) {
            try {
                boolean canCommit = participant.prepare();
                results.add(canCommit);
            } catch (Exception e) {
                // A participant that fails to answer counts as a "no" vote
                results.add(false);
            }
        }
        // Commit is allowed only if every participant voted yes
        return results.stream().allMatch(Boolean::booleanValue);
    }

    public void commit() {
        if (prepare()) {
            // Phase two: commit
            participants.forEach(TransactionParticipant::commit);
        } else {
            // Phase two: roll back
            participants.forEach(TransactionParticipant::rollback);
        }
    }
}
Transaction participant interface
public interface TransactionParticipant {

    /** Prepare phase: check whether the participant can commit. */
    boolean prepare();

    /** Commit phase: apply the change. */
    void commit();

    /** Rollback phase: undo the change. */
    void rollback();
}
2PC Implementation for the Order Service
Order service participant
@Service
public class OrderServiceParticipant implements TransactionParticipant {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private AccountService accountService;

    // Held in a field for brevity; a singleton bean shared between requests
    // would need per-transaction state instead.
    private Order pendingOrder;

    @Override
    public boolean prepare() {
        try {
            // Check stock
            if (!inventoryService.checkStock(pendingOrder.getItems())) {
                return false;
            }
            // Check the account balance
            if (!accountService.checkBalance(pendingOrder.getCustomerId(), pendingOrder.getAmount())) {
                return false;
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void commit() {
        // Create the order
        orderRepository.save(pendingOrder);
        // Reserve stock
        inventoryService.reserve(pendingOrder.getItems());
        // Debit the account
        accountService.debit(pendingOrder.getCustomerId(), pendingOrder.getAmount());
    }

    @Override
    public void rollback() {
        // Release stock
        inventoryService.release(pendingOrder.getItems());
        // Restore the account balance
        accountService.credit(pendingOrder.getCustomerId(), pendingOrder.getAmount());
    }
}
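Problems with 2PC and Improvements
Problems with 2PC:
- Synchronous blocking: the coordinator waits for every participant to respond
- Single point of failure: a coordinator failure leaves the transaction blocked
- Data inconsistency: a network partition can leave participants in different states
An improvement:
@Component
public class Improved2PCCoordinator {

    @Autowired
    private List<TransactionParticipant> participants;

    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Collect the prepare votes in parallel instead of one by one
    public CompletableFuture<Boolean> prepareAsync() {
        List<CompletableFuture<Boolean>> futures = participants.stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return participant.prepare();
                    } catch (Exception e) {
                        return false;
                    }
                }, executor))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .allMatch(Boolean::booleanValue));
    }
}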
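Collecting votes in parallel shortens the prepare phase, but a participant that never answers still blocks the coordinator. One possible way to bound the wait is to treat a late vote as a "no". The sketch below is a minimal, framework-free illustration of that idea; `TimedPrepare`, `prepareAll`, and the use of `BooleanSupplier` as a stand-in for a participant's `prepare()` are assumptions made for this example, not part of the tutorial's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

/** Hypothetical helper: gathers prepare votes in parallel with a deadline per vote. */
final class TimedPrepare {

    /** Returns true only if every participant votes yes before the deadline. */
    static boolean prepareAll(List<BooleanSupplier> participants, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<Boolean>> votes = participants.stream()
                    .map(p -> CompletableFuture
                            .<Boolean>supplyAsync(p::getAsBoolean, executor)
                            // A vote that arrives after the deadline counts as "no".
                            .completeOnTimeout(false, timeoutMillis, TimeUnit.MILLISECONDS)
                            // A participant that throws also counts as "no".
                            .exceptionally(ex -> false))
                    .collect(Collectors.toList());
            return votes.stream().allMatch(CompletableFuture::join);
        } finally {
            // Interrupt participants that are still running after the decision.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean decision = prepareAll(List.of(() -> true, () -> false), 500);
        System.out.println("commit allowed: " + decision);
    }
}
```

A participant whose vote timed out may still have prepared successfully on its side, so a coordinator built this way would still need to send it a rollback.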
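Applying the Saga Pattern
Saga Pattern Overview
The Saga pattern breaks a long-running transaction into a sequence of local transactions. Each local transaction has a matching compensating operation, and running those compensations after a failure gives the distributed transaction eventual consistency.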
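The core mechanism described above can be sketched without any framework: run the steps in order, remember which ones completed, and undo them in reverse order when a step fails. `SimpleSaga` and its `step`/`run` methods are hypothetical names chosen for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical minimal saga runner: each step pairs an action with its compensation. */
final class SimpleSaga {

    private static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    private final List<Step> steps = new ArrayList<>();

    SimpleSaga step(Runnable action, Runnable compensation) {
        steps.add(new Step(action, compensation));
        return this;
    }

    /** Runs the steps in order; on failure, compensates completed steps in reverse. */
    boolean run() {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step);
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = new SimpleSaga()
                .step(() -> log.add("create order"), () -> log.add("cancel order"))
                .step(() -> log.add("charge"), () -> log.add("refund"))
                .step(() -> { throw new IllegalStateException("out of stock"); },
                      () -> log.add("release stock"))
                .run();
        // prints: false [create order, charge, refund, cancel order]
        System.out.println(ok + " " + log);
    }
}
```

The failed step itself is not compensated because it never completed. A production saga would also persist its progress so that compensation can resume after a crash, which this sketch leaves out.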
Orchestrated Saga
Saga orchestrator
@Component
public class OrderSagaOrchestrator {

    @Autowired
    private OrderService orderService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private ShippingService shippingService;

    public void processOrder(OrderRequest request) {
        SagaContext context = new SagaContext();
        try {
            // Step 1: create the order
            Order order = orderService.createOrder(request);
            context.setOrder(order);

            // Step 2: process the payment
            PaymentResult payment = paymentService.processPayment(order.getId(), request.getPaymentInfo());
            context.setPayment(payment);

            // Step 3: reserve stock
            inventoryService.reserve(order.getItems());
            context.setInventoryReserved(true);

            // Step 4: schedule shipping
            shippingService.schedule(order.getId(), request.getShippingInfo());
            context.setShippingScheduled(true);
        } catch (Exception e) {
            // Run the compensating operations
            compensate(context);
            throw new OrderProcessingException("Order processing failed", e);
        }
    }

    private void compensate(SagaContext context) {
        // Compensate in the reverse order of execution
        if (context.isShippingScheduled()) {
            shippingService.cancel(context.getOrder().getId());
        }
        if (context.isInventoryReserved()) {
            inventoryService.release(context.getOrder().getItems());
        }
        if (context.getPayment() != null) {
            paymentService.refund(context.getPayment().getId());
        }
        if (context.getOrder() != null) {
            orderService.cancel(context.getOrder().getId());
        }
    }
}
Saga context
public class SagaContext {
    private Order order;
    private PaymentResult payment;
    private boolean inventoryReserved;
    private boolean shippingScheduled;
    // constructors, getters, setters
}
Event-Driven Saga
Saga events
// Since Spring 4.2 any object can be published as an application event,
// so the event classes need no marker annotation.

// Order created event
public class OrderCreatedEvent {
    private Long orderId;
    private Long customerId;
    private BigDecimal amount;
    private List<OrderItem> items;
    // constructors, getters, setters
}

// Payment processed event
public class PaymentProcessedEvent {
    private Long orderId;
    private String paymentId;
    private PaymentStatus status;
    // constructors, getters, setters
}

// Inventory reserved event
public class InventoryReservedEvent {
    private Long orderId;
    private List<OrderItem> items;
    // constructors, getters, setters
}
Saga event handler
@Component
public class OrderSagaEventHandler {

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private ShippingService shippingService;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @EventListener
    public void handle(OrderCreatedEvent event) {
        try {
            // Process the payment (getPaymentInfo() is assumed to exist on the
            // event; the class above does not declare it)
            PaymentResult payment = paymentService.processPayment(event.getOrderId(), event.getPaymentInfo());
            // Publish the payment processed event
            eventPublisher.publishEvent(
                    new PaymentProcessedEvent(event.getOrderId(), payment.getId(), payment.getStatus()));
        } catch (Exception e) {
            // Publish the compensating event
            eventPublisher.publishEvent(new OrderCancelledEvent(event.getOrderId()));
        }
    }

    @EventListener
    public void handle(PaymentProcessedEvent event) {
        if (event.getStatus() == PaymentStatus.SUCCESS) {
            try {
                // Reserve stock
                inventoryService.reserve(event.getOrderId());
                // Publish the inventory reserved event
                eventPublisher.publishEvent(new InventoryReservedEvent(event.getOrderId()));
            } catch (Exception e) {
                // Publish the payment refund event
                eventPublisher.publishEvent(new PaymentRefundEvent(event.getPaymentId()));
            }
        }
    }
}
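TCC Pattern Implementation
TCC Pattern Overview
The TCC (Try-Confirm-Cancel) pattern implements a distributed transaction with three operations:
- Try: attempt the business operation and reserve the resources it needs
- Confirm: complete the business operation and consume the reserved resources
- Cancel: abandon the business operation and release the reserved resources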
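To make the three operations concrete, the following sketch models an account whose balance is split into an available part and a frozen part. It is an in-memory illustration only; `TccAccount`, `tryReserve`, and the per-transaction `pending` map are hypothetical names, and a real service would keep this state in a database.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory account showing the Try/Confirm/Cancel resource states. */
final class TccAccount {

    private BigDecimal available;
    private BigDecimal frozen = BigDecimal.ZERO;
    // Reservations still waiting for Confirm or Cancel, keyed by transaction id.
    private final Map<String, BigDecimal> pending = new HashMap<>();

    TccAccount(BigDecimal initialBalance) {
        this.available = initialBalance;
    }

    /** Try: move the amount from available to frozen if funds suffice. */
    synchronized boolean tryReserve(String txId, BigDecimal amount) {
        if (pending.containsKey(txId)) {
            return true; // a repeated Try for the same transaction is a no-op
        }
        if (available.compareTo(amount) < 0) {
            return false;
        }
        available = available.subtract(amount);
        frozen = frozen.add(amount);
        pending.put(txId, amount);
        return true;
    }

    /** Confirm: the frozen amount leaves the account for good. */
    synchronized void confirm(String txId) {
        BigDecimal amount = pending.remove(txId);
        if (amount != null) { // null: already confirmed or cancelled
            frozen = frozen.subtract(amount);
        }
    }

    /** Cancel: the frozen amount returns to the available balance. */
    synchronized void cancel(String txId) {
        BigDecimal amount = pending.remove(txId);
        if (amount != null) { // null also covers a Cancel with no prior Try
            frozen = frozen.subtract(amount);
            available = available.add(amount);
        }
    }

    synchronized BigDecimal available() { return available; }

    synchronized BigDecimal frozen() { return frozen; }

    public static void main(String[] args) {
        TccAccount account = new TccAccount(new BigDecimal("100"));
        account.tryReserve("tx1", new BigDecimal("30"));
        account.confirm("tx1");
        System.out.println(account.available() + " available, " + account.frozen() + " frozen");
    }
}
```

Confirm and Cancel are written so that a repeated call, or a Cancel for a transaction that never reserved anything, changes nothing. Coordinators typically retry these calls, so this tolerance is usually worth designing in.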
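TCC Interface Definition
TCC service interface
public interface TccService {

    /** Try phase: attempt the business operation. */
    boolean tryExecute(TccContext context);

    /** Confirm phase: complete the business operation. */
    void confirm(TccContext context);

    /** Cancel phase: abandon the business operation. */
    void cancel(TccContext context);
}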
TCC Implementation for the Account Service
Account TCC service
@Service
public class AccountTccService implements TccService {

    private static final String SERVICE_NAME = "AccountTccService";

    @Autowired
    private AccountRepository accountRepository;

    @Autowired
    private TccTransactionRepository tccTransactionRepository;

    // Each phase updates the account and the TCC record in one local transaction
    @Override
    @Transactional
    public boolean tryExecute(TccContext context) {
        try {
            Long accountId = context.getAccountId();
            BigDecimal amount = context.getAmount();

            // Check the account balance
            Account account = accountRepository.findById(accountId)
                    .orElseThrow(() -> new AccountNotFoundException(accountId));
            if (account.getBalance().compareTo(amount) < 0) {
                return false;
            }

            // Freeze the funds
            account.setFrozenAmount(account.getFrozenAmount().add(amount));
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account);

            // Record the TCC transaction
            TccTransaction tccTransaction = new TccTransaction();
            tccTransaction.setTransactionId(context.getTransactionId());
            tccTransaction.setServiceName(SERVICE_NAME);
            tccTransaction.setStatus(TccStatus.TRY);
            tccTransaction.setContext(context);
            tccTransactionRepository.save(tccTransaction);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    @Transactional
    public void confirm(TccContext context) {
        TccTransaction tccTransaction = tccTransactionRepository
                .findByTransactionIdAndServiceName(context.getTransactionId(), SERVICE_NAME)
                .orElseThrow(() -> new TccTransactionNotFoundException(context.getTransactionId()));

        // The status check makes a repeated Confirm a no-op
        if (tccTransaction.getStatus() == TccStatus.TRY) {
            // Finalize the deduction by clearing the frozen amount
            Account account = accountRepository.findById(context.getAccountId())
                    .orElseThrow(() -> new AccountNotFoundException(context.getAccountId()));
            account.setFrozenAmount(account.getFrozenAmount().subtract(context.getAmount()));
            accountRepository.save(account);

            tccTransaction.setStatus(TccStatus.CONFIRM);
            tccTransactionRepository.save(tccTransaction);
        }
    }

    @Override
    @Transactional
    public void cancel(TccContext context) {
        TccTransaction tccTransaction = tccTransactionRepository
                .findByTransactionIdAndServiceName(context.getTransactionId(), SERVICE_NAME)
                .orElseThrow(() -> new TccTransactionNotFoundException(context.getTransactionId()));

        // The status check makes a repeated Cancel a no-op
        if (tccTransaction.getStatus() == TccStatus.TRY) {
            // Return the frozen funds to the balance
            Account account = accountRepository.findById(context.getAccountId())
                    .orElseThrow(() -> new AccountNotFoundException(context.getAccountId()));
            account.setFrozenAmount(account.getFrozenAmount().subtract(context.getAmount()));
            account.setBalance(account.getBalance().add(context.getAmount()));
            accountRepository.save(account);

            tccTransaction.setStatus(TccStatus.CANCEL);
            tccTransactionRepository.save(tccTransaction);
        }
    }
}
TCC Transaction Manager
TCC transaction coordinator
@Component
public class TccTransactionManager {

    private static final Logger log = LoggerFactory.getLogger(TccTransactionManager.class);

    @Autowired
    private List<TccService> tccServices;

    @Autowired
    private TccTransactionRepository tccTransactionRepository;

    // contexts.get(i) is the context for tccServices.get(i)
    public void executeTccTransaction(String transactionId, List<TccContext> contexts) {
        try {
            // Phase 1: Try on every service
            List<Boolean> tryResults = new ArrayList<>();
            for (int i = 0; i < tccServices.size(); i++) {
                tryResults.add(tccServices.get(i).tryExecute(contexts.get(i)));
            }

            if (tryResults.stream().allMatch(Boolean::booleanValue)) {
                // Phase 2: Confirm on every service
                for (int i = 0; i < tccServices.size(); i++) {
                    tccServices.get(i).confirm(contexts.get(i));
                }
            } else {
                // Phase 2: Cancel on every service. A service whose Try failed has
                // no TCC record, so its cancel() must tolerate that case.
                for (int i = 0; i < tccServices.size(); i++) {
                    tccServices.get(i).cancel(contexts.get(i));
                }
            }
        } catch (Exception e) {
            // On any exception, Cancel every service
            for (int i = 0; i < tccServices.size(); i++) {
                try {
                    tccServices.get(i).cancel(contexts.get(i));
                } catch (Exception cancelException) {
                    // Log the failed Cancel and keep cancelling the others
                    log.error("Failed to cancel TCC transaction", cancelException);
                }
            }
            throw new TccTransactionException("TCC transaction failed", e);
        }
    }
}
Ensuring Eventual Consistency
Event Sourcing Pattern
Event storage
@Entity
@Table(name = "domain_events")
public class DomainEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;

    @Column(name = "event_type", nullable = false)
    private String eventType;

    @Column(name = "event_data", columnDefinition = "TEXT")
    private String eventData;

    @Column(name = "event_version", nullable = false)
    private Long eventVersion;

    @Column(name = "created_at", nullable = false)
    private LocalDateTime createdAt;
    // constructors, getters, setters
}
Event store service
@Service
public class EventStoreService {

    @Autowired
    private DomainEventRepository eventRepository;

    public void saveEvent(String aggregateId, String eventType, Object eventData) {
        DomainEvent event = new DomainEvent();
        event.setAggregateId(aggregateId);
        event.setEventType(eventType);
        event.setEventData(JSON.toJSONString(eventData));
        event.setEventVersion(getNextVersion(aggregateId));
        event.setCreatedAt(LocalDateTime.now());
        eventRepository.save(event);
    }

    public List<DomainEvent> getEvents(String aggregateId) {
        return eventRepository.findByAggregateIdOrderByEventVersion(aggregateId);
    }

    private Long getNextVersion(String aggregateId) {
        // The max is null when the aggregate has no events yet
        Long maxVersion = eventRepository.findMaxVersionByAggregateId(aggregateId);
        return maxVersion == null ? 1L : maxVersion + 1;
    }
}
Compensating Transaction Pattern
Compensable transaction definition
public interface CompensatableTransaction {

    /** Run the business operation. */
    void execute(TransactionContext context);

    /** Run the compensating operation. */
    void compensate(TransactionContext context);
}
Order compensable transaction
@Service
public class OrderCompensatableTransaction implements CompensatableTransaction {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Override
    public void execute(TransactionContext context) {
        Order order = (Order) context.getData("order");
        // Create the order
        orderRepository.save(order);
        // Reserve stock
        inventoryService.reserve(order.getItems());
        // Process the payment
        paymentService.processPayment(order.getId(), order.getPaymentInfo());
    }

    @Override
    public void compensate(TransactionContext context) {
        Order order = (Order) context.getData("order");
        // Cancel the order
        order.setStatus(OrderStatus.CANCELLED);
        orderRepository.save(order);
        // Release stock
        inventoryService.release(order.getItems());
        // Refund the payment
        paymentService.refund(order.getPaymentId());
    }
}
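Message-Driven Eventual Consistency
Message publisher
@Component
public class DomainEventPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private EventStoreService eventStoreService;

    public void publishEvent(DomainEvent event) {
        // Publish to the message queue
        rabbitTemplate.convertAndSend("domain.events", event);
        // Store the event locally. The send and the save are two separate writes,
        // so a failure between them leaves the queue and the store out of step.
        eventStoreService.saveEvent(event.getAggregateId(), event.getEventType(), event.getEventData());
    }
}
Message consumer
@Component
@RabbitListener(queues = "domain.events")
public class DomainEventConsumer {

    private static final Logger log = LoggerFactory.getLogger(DomainEventConsumer.class);

    @Autowired
    private List<DomainEventHandler> eventHandlers;

    @RabbitHandler
    public void handleEvent(DomainEvent event) {
        for (DomainEventHandler handler : eventHandlers) {
            if (handler.canHandle(event.getEventType())) {
                try {
                    handler.handle(event);
                } catch (Exception e) {
                    // Handling failed; this is where the message would be routed
                    // to a dead-letter queue. Here the failure is only logged.
                    log.error("Failed to handle domain event", e);
                }
            }
        }
    }
}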
Distributed Locking
Redis Distributed Lock
Redis lock implementation
@Component
public class RedisDistributedLock {

    // Assumes the template uses String serializers (as StringRedisTemplate does),
    // so that the keys written here match the raw bytes passed to the script.
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_PREFIX = "lock:";
    private static final int DEFAULT_EXPIRE_TIME = 30; // 30 seconds

    public boolean tryLock(String lockKey, String lockValue, int expireTime) {
        String key = LOCK_PREFIX + lockKey;
        // Sets the key only if it does not exist yet, with an expiry
        Boolean success = redisTemplate.opsForValue()
                .setIfAbsent(key, lockValue, Duration.ofSeconds(expireTime));
        return Boolean.TRUE.equals(success);
    }

    public boolean releaseLock(String lockKey, String lockValue) {
        String key = LOCK_PREFIX + lockKey;
        // A Lua script makes the compare-and-delete atomic
        String script = """
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('del', KEYS[1])
                else
                    return 0
                end
                """;
        Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
                connection.eval(script.getBytes(), ReturnType.INTEGER, 1,
                        key.getBytes(), lockValue.getBytes()));
        return result != null && result == 1;
    }
}
Distributed lock service
@Service
public class DistributedLockService {

    @Autowired
    private RedisDistributedLock distributedLock;

    public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
        // A random value identifies this holder so that only it can release the lock
        String lockValue = UUID.randomUUID().toString();
        if (distributedLock.tryLock(lockKey, lockValue, 30)) {
            try {
                return supplier.get();
            } finally {
                distributedLock.releaseLock(lockKey, lockValue);
            }
        } else {
            throw new LockAcquisitionException("Failed to acquire lock: " + lockKey);
        }
    }

    public void executeWithLock(String lockKey, Runnable runnable) {
        String lockValue = UUID.randomUUID().toString();
        if (distributedLock.tryLock(lockKey, lockValue, 30)) {
            try {
                runnable.run();
            } finally {
                distributedLock.releaseLock(lockKey, lockValue);
            }
        } else {
            throw new LockAcquisitionException("Failed to acquire lock: " + lockKey);
        }
    }
}
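The reason the lock stores a per-holder value, and the release compares it before deleting, is easier to see in a small simulation. The sketch below is an in-memory stand-in that mirrors the same rules (acquire only if free or expired, release only if the value matches). `InMemoryTokenLock` is a hypothetical class for illustration, and the clock is passed in explicitly so the expiry scenario can be replayed deterministically.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for the Redis lock, for illustration only. */
final class InMemoryTokenLock {

    private static final class Entry {
        final String token;
        final long expiresAtMillis;

        Entry(String token, long expiresAtMillis) {
            this.token = token;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> locks = new ConcurrentHashMap<>();

    /** Acquires the lock if it is free or its previous holder's lease has expired. */
    boolean tryLock(String key, String token, long ttlMillis, long nowMillis) {
        Entry fresh = new Entry(token, nowMillis + ttlMillis);
        // compute() runs atomically per key.
        Entry winner = locks.compute(key, (k, current) ->
                (current == null || current.expiresAtMillis <= nowMillis) ? fresh : current);
        return winner == fresh;
    }

    /** Releases the lock only if the stored token belongs to the caller. */
    boolean release(String key, String token) {
        boolean[] removed = {false};
        locks.computeIfPresent(key, (k, current) -> {
            if (current.token.equals(token)) {
                removed[0] = true;
                return null; // returning null removes the entry
            }
            return current;
        });
        return removed[0];
    }

    public static void main(String[] args) {
        InMemoryTokenLock lock = new InMemoryTokenLock();
        lock.tryLock("order:1", "A", 1000, 0);      // A holds the lock until t=1000
        lock.tryLock("order:1", "B", 1000, 1500);   // A's lease expired, B takes over
        // A finishes late; without the token check it would delete B's lock.
        System.out.println("A released B's lock: " + lock.release("order:1", "A"));
    }
}
```

The scenario in `main` also shows the limit of a fixed expiry: holder A kept working after its lease ran out, so the protected operation still ran concurrently with B. The token check prevents A from making things worse, but it does not prevent that overlap.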
Database Distributed Lock
Database lock entity
@Entity
@Table(name = "distributed_locks")
public class DistributedLock {

    // The primary key enforces that at most one row exists per lock key
    @Id
    @Column(name = "lock_key", nullable = false)
    private String lockKey;

    @Column(name = "lock_value", nullable = false)
    private String lockValue;

    @Column(name = "expire_time", nullable = false)
    private LocalDateTime expireTime;

    @Column(name = "created_at", nullable = false)
    private LocalDateTime createdAt;
    // constructors, getters, setters
}
Database lock service
@Service
public class DatabaseDistributedLockService {

    @Autowired
    private DistributedLockRepository lockRepository;

    public boolean tryLock(String lockKey, String lockValue, int expireSeconds) {
        try {
            // Remove expired locks
            lockRepository.deleteExpiredLocks(LocalDateTime.now());

            // Try to acquire the lock
            DistributedLock lock = new DistributedLock();
            lock.setLockKey(lockKey);
            lock.setLockValue(lockValue);
            lock.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
            lock.setCreatedAt(LocalDateTime.now());
            // Caveat: this only works if save() issues a plain INSERT. With Spring
            // Data JPA, save() on an entity with an assigned id merges instead and
            // can overwrite an existing row, so an explicit insert may be needed.
            lockRepository.save(lock);
            return true;
        } catch (DataIntegrityViolationException e) {
            // The lock already exists
            return false;
        }
    }

    public boolean releaseLock(String lockKey, String lockValue) {
        return lockRepository.deleteByLockKeyAndLockValue(lockKey, lockValue) > 0;
    }
}
Event-Driven Architecture
Domain Events
Domain event base class
public abstract class DomainEvent {
    private String eventId;
    private LocalDateTime occurredOn;
    private String aggregateId;
    private Long aggregateVersion;

    public DomainEvent() {
        this.eventId = UUID.randomUUID().toString();
        this.occurredOn = LocalDateTime.now();
    }
    // getters, setters
}
Order domain events
public class OrderCreatedEvent extends DomainEvent {
    private Long orderId;
    private Long customerId;
    private BigDecimal amount;
    private List<OrderItem> items;

    public OrderCreatedEvent(Long orderId, Long customerId, BigDecimal amount, List<OrderItem> items) {
        super();
        this.orderId = orderId;
        this.customerId = customerId;
        this.amount = amount;
        this.items = items;
    }
    // getters, setters
}

public class OrderCancelledEvent extends DomainEvent {
    private Long orderId;
    private String reason;

    public OrderCancelledEvent(Long orderId, String reason) {
        super();
        this.orderId = orderId;
        this.reason = reason;
    }
    // getters, setters
}
Event Handlers
Event handler interface
public interface DomainEventHandler<T extends DomainEvent> {
    void handle(T event);

    Class<T> getEventType();
}
Inventory event handler
@Component
public class InventoryEventHandler implements DomainEventHandler<OrderCreatedEvent> {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Override
    public void handle(OrderCreatedEvent event) {
        try {
            // Reserve stock
            inventoryService.reserve(event.getItems());
            // Publish the inventory reserved event
            eventPublisher.publishEvent(new InventoryReservedEvent(event.getOrderId()));
        } catch (InsufficientStockException e) {
            // Not enough stock: publish the order cancelled event
            eventPublisher.publishEvent(new OrderCancelledEvent(event.getOrderId(), "Insufficient stock"));
        }
    }

    @Override
    public Class<OrderCreatedEvent> getEventType() {
        return OrderCreatedEvent.class;
    }
}
Event Storage and Replay
Event store service
@Service
public class EventStoreService {

    @Autowired
    private EventRepository eventRepository;

    public void saveEvent(DomainEvent event) {
        StoredEvent storedEvent = new StoredEvent();
        storedEvent.setEventId(event.getEventId());
        // The class name is stored so the event can be deserialized later
        storedEvent.setEventType(event.getClass().getName());
        storedEvent.setEventData(JSON.toJSONString(event));
        storedEvent.setOccurredOn(event.getOccurredOn());
        storedEvent.setAggregateId(event.getAggregateId());
        storedEvent.setAggregateVersion(event.getAggregateVersion());
        eventRepository.save(storedEvent);
    }

    public List<StoredEvent> getEvents(String aggregateId) {
        return eventRepository.findByAggregateIdOrderByOccurredOn(aggregateId);
    }

    // Replays an aggregate's events, in order, through the matching handlers
    public void replayEvents(String aggregateId, List<DomainEventHandler> handlers) {
        List<StoredEvent> events = getEvents(aggregateId);
        for (StoredEvent storedEvent : events) {
            DomainEvent event = deserializeEvent(storedEvent);
            for (DomainEventHandler handler : handlers) {
                if (handler.getEventType().isAssignableFrom(event.getClass())) {
                    handler.handle(event);
                }
            }
        }
    }

    private DomainEvent deserializeEvent(StoredEvent storedEvent) {
        try {
            Class<?> eventClass = Class.forName(storedEvent.getEventType());
            return (DomainEvent) JSON.parseObject(storedEvent.getEventData(), eventClass);
        } catch (Exception e) {
            throw new EventDeserializationException("Failed to deserialize event", e);
        }
    }
}
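Replay is what lets an event store act as the source of truth: the current state of an aggregate is whatever results from applying its events in order. The sketch below shows that fold for a simple account balance. The event types `Deposited` and `Withdrawn` and the `AccountProjection` class are hypothetical and exist only for this example.

```java
import java.math.BigDecimal;
import java.util.List;

/** Hypothetical event types for an account aggregate. */
interface AccountEvent {}

final class Deposited implements AccountEvent {
    final BigDecimal amount;

    Deposited(BigDecimal amount) {
        this.amount = amount;
    }
}

final class Withdrawn implements AccountEvent {
    final BigDecimal amount;

    Withdrawn(BigDecimal amount) {
        this.amount = amount;
    }
}

final class AccountProjection {

    /** Folds the ordered event history into the current balance. */
    static BigDecimal replay(List<AccountEvent> history) {
        BigDecimal balance = BigDecimal.ZERO;
        for (AccountEvent event : history) {
            if (event instanceof Deposited) {
                balance = balance.add(((Deposited) event).amount);
            } else if (event instanceof Withdrawn) {
                balance = balance.subtract(((Withdrawn) event).amount);
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        List<AccountEvent> history = List.of(
                new Deposited(new BigDecimal("100")),
                new Withdrawn(new BigDecimal("30")),
                new Deposited(new BigDecimal("5")));
        System.out.println(replay(history)); // prints: 75
    }
}
```

Because the result depends on the order of events, the store needs a reliable ordering key. A per-aggregate version number is usually a safer choice for that than a timestamp, since two events can share the same timestamp.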
Performance Optimization and Monitoring
Transaction Performance Monitoring
Transaction metrics
@Component
public class TransactionMetrics {

    private final MeterRegistry meterRegistry;
    private final Timer transactionTimer;

    public TransactionMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.transactionTimer = Timer.builder("transaction.duration").register(meterRegistry);
    }

    public MeterRegistry getMeterRegistry() {
        return meterRegistry;
    }

    public void recordTransaction(String transactionType, Duration duration, boolean success) {
        transactionTimer.record(duration);
        // Counter.increment() does not accept tags, so the counter for this
        // transaction type is looked up from the registry instead
        String name = success ? "transaction.success" : "transaction.failure";
        meterRegistry.counter(name, "type", transactionType).increment();
    }
}
Transaction performance analysis
@Service
public class TransactionPerformanceAnalyzer {

    @Autowired
    private TransactionMetrics metrics;

    public void analyzeTransactionPerformance() {
        // Measure how long the transaction takes
        Timer.Sample sample = Timer.start(metrics.getMeterRegistry());
        try {
            // Run the transaction
            executeTransaction();
            sample.stop(Timer.builder("transaction.analysis").register(metrics.getMeterRegistry()));
        } catch (Exception e) {
            sample.stop(Timer.builder("transaction.analysis.failed").register(metrics.getMeterRegistry()));
            throw e;
        }
    }
}
Connection Pool Tuning
Data source configuration
spring:
  datasource:
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
      leak-detection-threshold: 60000
Connection pool monitoring
@Component
public class ConnectionPoolMonitor {

    private static final Logger log = LoggerFactory.getLogger(ConnectionPoolMonitor.class);

    @Autowired
    private DataSource dataSource;

    @Scheduled(fixedRate = 60000) // check once a minute
    public void monitorConnectionPool() {
        if (dataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
            HikariPoolMXBean poolBean = hikariDataSource.getHikariPoolMXBean();
            log.info("Active connections: {}", poolBean.getActiveConnections());
            log.info("Idle connections: {}", poolBean.getIdleConnections());
            log.info("Total connections: {}", poolBean.getTotalConnections());
            log.info("Threads awaiting connection: {}", poolBean.getThreadsAwaitingConnection());
        }
    }
}
Best Practices
1. Transaction Design Best Practices
Designing transaction boundaries
// ✅ Good practice: a tight transaction boundary
@Service
@Transactional
public class OrderService {

    @Transactional(rollbackFor = Exception.class)
    public Order createOrder(OrderRequest request) {
        // Contains only the operations that must be atomic
        Order order = new Order();
        order.setCustomerId(request.getCustomerId());
        order.setAmount(request.getAmount());
        return orderRepository.save(order);
    }
}

// ❌ Bad practice: a transaction boundary that is too wide
@Service
@Transactional
public class BadOrderService {

    @Transactional(rollbackFor = Exception.class)
    public Order createOrder(OrderRequest request) {
        // Too many operations in one transaction: locks are held longer
        // and contention increases
        Order order = createOrderInternal(request);
        sendEmailNotification(request.getCustomerId());
        updateCustomerStatistics(request.getCustomerId());
        logAuditTrail(request);
        return order;
    }
}
2. Distributed Lock Best Practices
Controlling lock granularity
// ✅ Good practice: a fine-grained lock
public void updateUserProfile(Long userId, UserProfile profile) {
    String lockKey = "user:" + userId;
    distributedLockService.executeWithLock(lockKey, () -> {
        // Locks only this user
        userService.updateProfile(userId, profile);
    });
}

// ❌ Bad practice: a coarse-grained lock
public void updateUserProfile(Long userId, UserProfile profile) {
    String lockKey = "user_update"; // serializes updates for all users
    distributedLockService.executeWithLock(lockKey, () -> {
        userService.updateProfile(userId, profile);
    });
}
3. Event Handling Best Practices
Guaranteeing idempotency
@Component
public class IdempotentEventHandler {

    @Autowired
    private EventProcessedRepository processedRepository;

    public void handleEvent(DomainEvent event) {
        String eventId = event.getEventId();
        // Skip events that were already processed
        if (processedRepository.existsByEventId(eventId)) {
            return;
        }
        try {
            // Process the event
            processEvent(event);
            // Record the event as processed
            EventProcessed processed = new EventProcessed();
            processed.setEventId(eventId);
            processed.setProcessedAt(LocalDateTime.now());
            processedRepository.save(processed);
        } catch (Exception e) {
            // Processing failed: leave the event unrecorded so it can be retried
            throw e;
        }
    }
}
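The handler above checks for the event id and records it in two separate steps, so two deliveries of the same event that arrive at the same moment can both pass the check. One way to close that gap is to make the claim itself atomic. The sketch below shows the idea with an in-memory set; `DedupingHandler` is a hypothetical class for illustration, and a database-backed version would rely on something like a unique constraint on the event id instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical in-memory deduplicating wrapper around an event handler. */
final class DedupingHandler<E> {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<E> delegate;

    DedupingHandler(Consumer<E> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the event was processed, false if it was a duplicate. */
    boolean handle(String eventId, E event) {
        // add() is atomic: for a given id exactly one caller gets true.
        if (!processedIds.add(eventId)) {
            return false;
        }
        try {
            delegate.accept(event);
            return true;
        } catch (RuntimeException e) {
            // Release the claim so that a redelivery can retry the event.
            processedIds.remove(eventId);
            throw e;
        }
    }

    public static void main(String[] args) {
        DedupingHandler<String> handler =
                new DedupingHandler<>(payload -> System.out.println("processing " + payload));
        System.out.println(handler.handle("evt-1", "order created")); // processed
        System.out.println(handler.handle("evt-1", "order created")); // duplicate
    }
}
```

While one delivery is still being processed, a concurrent duplicate is reported as already handled. That is acceptable only if the broker redelivers the message when the first attempt fails, which is an assumption of this sketch.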
Troubleshooting Common Problems
1. Distributed Transaction Timeouts
Problem: a distributed transaction runs for too long and times out.
Solution:
@Service
public class TimeoutAwareTransactionService {

    @Transactional(timeout = 30) // transaction timeout in seconds
    public void executeWithTimeout() {
        try {
            // Run the business logic
            doBusinessLogic();
        } catch (TransactionTimedOutException e) {
            // Handle the timeout
            handleTimeout();
        }
    }

    private void handleTimeout() {
        // Log the timeout
        log.warn("Transaction timed out, initiating compensation");
        // Run the compensating operations
        compensationService.compensate();
    }
}
2. Deadlock Detection and Handling
Deadlock detector
@Component
public class DeadlockDetector {

    @Autowired
    private DistributedLockService lockService;

    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void detectDeadlocks() {
        // Locks that have been held for more than 60 seconds
        List<String> longRunningLocks = lockService.getLongRunningLocks(60);
        for (String lockKey : longRunningLocks) {
            if (isDeadlock(lockKey)) {
                log.warn("Deadlock detected for lock: {}", lockKey);
                handleDeadlock(lockKey);
            }
        }
    }

    private boolean isDeadlock(String lockKey) {
        // Deadlock detection logic goes here
        return lockService.isLockStuck(lockKey);
    }

    private void handleDeadlock(String lockKey) {
        // Force the lock to be released
        lockService.forceReleaseLock(lockKey);
        // Send an alert
        alertService.sendDeadlockAlert(lockKey);
    }
}
3. Repairing Data Inconsistencies
Consistency checker
@Component
public class ConsistencyChecker {

    @Autowired
    private List<ConsistencyCheckService> checkServices;

    @Scheduled(cron = "0 0 2 * * ?") // runs every day at 02:00
    public void checkConsistency() {
        for (ConsistencyCheckService checkService : checkServices) {
            try {
                List<Inconsistency> inconsistencies = checkService.check();
                if (!inconsistencies.isEmpty()) {
                    log.warn("Found {} inconsistencies", inconsistencies.size());
                    repairInconsistencies(inconsistencies);
                }
            } catch (Exception e) {
                log.error("Failed to check consistency", e);
            }
        }
    }

    private void repairInconsistencies(List<Inconsistency> inconsistencies) {
        for (Inconsistency inconsistency : inconsistencies) {
            try {
                repairService.repair(inconsistency);
            } catch (Exception e) {
                // Keep repairing the rest even if one repair fails
                log.error("Failed to repair inconsistency: {}", inconsistency, e);
            }
        }
    }
}
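Summary
Distributed transactions and data consistency are core concerns of a microservice architecture built on Spring MVC. With careful design and implementation, data can be kept consistent across a distributed environment.
Key techniques:
- Transaction management: coordinating local and distributed transactions
- Consistency models: choosing between strong and eventual consistency
- Compensation: applying compensating patterns such as Saga and TCC
- Event-driven design: event sourcing and event-driven architecture
- Performance: tuning connection pools and locks, and monitoring transactions
Implementation advice:
- Choose the consistency model that fits the business requirements
- Design sensible transaction boundaries and compensation mechanisms
- Put thorough monitoring and alerting in place
- Build mechanisms for checking and repairing data consistency
- Keep improving performance and reliability over time
After working through this tutorial, you should be able to design and implement distributed transactions in Spring MVC applications, which is a solid foundation for building highly available distributed systems.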