An In-Depth Look at How Spring Transactions Work: From Source Code to Architecture

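Abstract

Spring builds its transaction management on three cooperating pieces: AOP proxies, transaction managers, and a thread-bound synchronization mechanism. This article focuses on how transaction propagation is implemented, covering the transaction manager architecture, the AOP proxy machinery, transaction synchronization, and the implementation details of each propagation behavior.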

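1. Spring Transaction Architecture Overview

1.1 Core Transaction Management Interfaces

Spring's transaction management rests on three core interfaces, which together form the abstraction layer for the whole subsystem: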

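// Transaction manager: defines the basic transaction operations
public interface PlatformTransactionManager {

    // Return the current transaction or start a new one, according to the definition
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

    // Commit the transaction
    void commit(TransactionStatus status) throws TransactionException;

    // Roll back the transaction
    void rollback(TransactionStatus status) throws TransactionException;
}

// Transaction definition: the basic attributes of a transaction
public interface TransactionDefinition {

    // Propagation behavior constants
    int PROPAGATION_REQUIRED = 0;
    int PROPAGATION_SUPPORTS = 1;
    int PROPAGATION_MANDATORY = 2;
    int PROPAGATION_REQUIRES_NEW = 3;
    int PROPAGATION_NOT_SUPPORTED = 4;
    int PROPAGATION_NEVER = 5;
    int PROPAGATION_NESTED = 6;

    // Isolation level constants (same values as the constants in java.sql.Connection)
    int ISOLATION_DEFAULT = -1;
    int ISOLATION_READ_UNCOMMITTED = 1;
    int ISOLATION_READ_COMMITTED = 2;
    int ISOLATION_REPEATABLE_READ = 4;
    int ISOLATION_SERIALIZABLE = 8;

    // Timeout constant
    int TIMEOUT_DEFAULT = -1;

    // Propagation behavior
    int getPropagationBehavior();

    // Isolation level
    int getIsolationLevel();

    // Timeout in seconds
    int getTimeout();

    // Whether the transaction is read-only
    boolean isReadOnly();

    // Transaction name
    String getName();
}

// Transaction status: represents the current state of a transaction
public interface TransactionStatus extends SavepointManager, Flushable {

    // Whether this is a new transaction
    boolean isNewTransaction();

    // Whether the transaction carries a savepoint
    boolean hasSavepoint();

    // Mark the transaction rollback-only
    void setRollbackOnly();

    // Whether the transaction is marked rollback-only
    boolean isRollbackOnly();

    // Whether the transaction has completed
    boolean isCompleted();
}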
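To make the contract between these interfaces concrete, here is a minimal, JDK-only sketch of the "get transaction, run work, commit or roll back" cycle. It is a toy model, not Spring code: MiniTxManager, MiniTxStatus, RecordingTxManager and execute are hypothetical names invented for this illustration, and the manager only records the calls it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of the getTransaction / commit / rollback contract (not Spring code). */
final class MiniTxTemplate {

    /** Hypothetical stand-in for PlatformTransactionManager. */
    interface MiniTxManager {
        MiniTxStatus getTransaction();
        void commit(MiniTxStatus status);
        void rollback(MiniTxStatus status);
    }

    /** Hypothetical stand-in for TransactionStatus. */
    static final class MiniTxStatus {
        boolean rollbackOnly;
        boolean completed;
    }

    /** In-memory manager that only records what it was asked to do. */
    static final class RecordingTxManager implements MiniTxManager {
        final List<String> events = new ArrayList<>();

        @Override
        public MiniTxStatus getTransaction() {
            events.add("begin");
            return new MiniTxStatus();
        }

        @Override
        public void commit(MiniTxStatus status) {
            // A status marked rollback-only is rolled back instead of committed.
            if (status.rollbackOnly) {
                rollback(status);
                return;
            }
            status.completed = true;
            events.add("commit");
        }

        @Override
        public void rollback(MiniTxStatus status) {
            status.completed = true;
            events.add("rollback");
        }
    }

    /** Runs the work inside a transaction: roll back on failure, commit otherwise. */
    static <T> T execute(MiniTxManager tm, Supplier<T> work) {
        MiniTxStatus status = tm.getTransaction();
        T result;
        try {
            result = work.get();
        } catch (RuntimeException | Error ex) {
            tm.rollback(status);
            throw ex;
        }
        tm.commit(status);
        return result;
    }

    public static void main(String[] args) {
        RecordingTxManager tm = new RecordingTxManager();
        execute(tm, () -> "ok");
        System.out.println(tm.events); // [begin, commit]
    }
}
```

The same shape reappears later in the interceptor: the caller never touches the underlying resource, only the manager and the status object.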

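1.2 Transaction Manager Implementation Hierarchy

Spring ships several transaction manager implementations. They share a common base class, AbstractPlatformTransactionManager, which implements the generic workflow and leaves the resource-specific steps to subclasses: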

// Abstract transaction manager: implements the generic transaction workflow
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager {

    // Transaction synchronization modes
    public static final int SYNCHRONIZATION_ALWAYS = 0;
    public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
    public static final int SYNCHRONIZATION_NEVER = 2;

    // Active synchronization mode
    private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;

    // Default timeout
    private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;

    // Whether a failed participating transaction marks the whole transaction rollback-only
    private boolean globalRollbackOnParticipationFailure = true;

    // Whether to roll back when the commit call fails
    private boolean rollbackOnCommitFailure = false;

    // Whether to validate an existing transaction before participating in it
    private boolean validateExistingTransaction = false;

    // Core method for obtaining a transaction
    @Override
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        // 1. Get the underlying transaction object
        Object transaction = doGetTransaction();

        // 2. Cache the debug flag
        boolean debugEnabled = logger.isDebugEnabled();

        // 3. Fall back to the default definition if none was given
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }

        // 4. If a transaction already exists, let the propagation behavior decide
        if (isExistingTransaction(transaction)) {
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // 5. No existing transaction: check the propagation behavior
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // There is no transaction to suspend, but active synchronizations are suspended
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // Begin the transaction
                doBegin(transaction, definition);
                // Initialize synchronization
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error ex) {
                // Begin failed: restore whatever was suspended
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // PROPAGATION_SUPPORTS, PROPAGATION_NOT_SUPPORTED or PROPAGATION_NEVER:
            // create an "empty" transaction, with no actual transaction but possibly synchronization
            if (debugEnabled) {
                logger.debug("Non-transactional execution with propagation '" +
                        definition.getPropagationBehavior() + "' on: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

    // Handle the case where a transaction already exists.
    // (Shown as a switch for readability; the Spring source uses a chain of if statements.)
    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        switch (definition.getPropagationBehavior()) {
            case TransactionDefinition.PROPAGATION_NEVER:
                throw new IllegalTransactionStateException(
                        "Existing transaction found for transaction marked with propagation 'never'");

            case TransactionDefinition.PROPAGATION_NOT_SUPPORTED: {
                // Suspend the existing transaction and run non-transactionally
                if (debugEnabled) {
                    logger.debug("Suspending current transaction");
                }
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(
                        definition, null, false, newSynchronization, debugEnabled, suspendedResources);
            }

            case TransactionDefinition.PROPAGATION_REQUIRES_NEW: {
                // Suspend the existing transaction and start a new one
                if (debugEnabled) {
                    logger.debug("Suspending current transaction, creating new transaction with name [" +
                            definition.getName() + "]");
                }
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    doBegin(transaction, definition);
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException | Error beginEx) {
                    resumeAfterBeginException(transaction, suspendedResources, beginEx);
                    throw beginEx;
                }
            }

            case TransactionDefinition.PROPAGATION_NESTED: {
                // Nested transaction
                if (!isNestedTransactionAllowed()) {
                    throw new NestedTransactionNotSupportedException(
                            "Transaction manager does not allow nested transactions by default - " +
                            "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                if (useSavepointForNestedTransaction()) {
                    // Create a savepoint inside the existing transaction
                    DefaultTransactionStatus status =
                            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                    status.createAndHoldSavepoint();
                    return status;
                }
                else {
                    // Genuinely nested transaction through a nested begin (for example with JTA)
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, null);
                    doBegin(transaction, definition);
                    prepareSynchronization(status, definition);
                    return status;
                }
            }

            case TransactionDefinition.PROPAGATION_SUPPORTS:
            case TransactionDefinition.PROPAGATION_REQUIRED:
            case TransactionDefinition.PROPAGATION_MANDATORY: {
                // Default case: participate in the existing transaction
                if (debugEnabled) {
                    logger.debug("Participating in existing transaction");
                }
                if (isValidateExistingTransaction()) {
                    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                        Integer currentIsolationLevel =
                                TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                        if (currentIsolationLevel == null ||
                                currentIsolationLevel != definition.getIsolationLevel()) {
                            Constants isoConstants = DefaultTransactionDefinition.constants;
                            throw new IllegalTransactionStateException(
                                    "Participating transaction with definition [" + definition +
                                    "] specifies isolation level which is incompatible with existing transaction: " +
                                    (currentIsolationLevel != null ?
                                            isoConstants.toCode(currentIsolationLevel,
                                                    DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                            "(unknown)"));
                        }
                    }
                    if (!definition.isReadOnly()) {
                        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                            throw new IllegalTransactionStateException(
                                    "Participating transaction with definition [" + definition +
                                    "] is not marked as read-only but existing transaction is");
                        }
                    }
                }
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                return prepareTransactionStatus(
                        definition, transaction, false, newSynchronization, debugEnabled, null);
            }

            default:
                throw new IllegalTransactionStateException(
                        "Unknown propagation behavior: " + definition.getPropagationBehavior());
        }
    }
}
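The listing computes a newSynchronization flag in two different ways: with "!= SYNCHRONIZATION_NEVER" when an actual transaction is involved, and with "== SYNCHRONIZATION_ALWAYS" when the method runs without one. The following sketch isolates that rule as a pure function. The class and method names are hypothetical; only the three constants and the two comparisons come from the code above.

```java
/** Toy model of how the synchronization mode maps to the newSynchronization flag. */
final class SyncModeDemo {

    static final int SYNCHRONIZATION_ALWAYS = 0;
    static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
    static final int SYNCHRONIZATION_NEVER = 2;

    /**
     * @param mode              one of the three constants above
     * @param actualTransaction true when a real transaction backs the call,
     *                          false for an "empty" (non-transactional) execution
     */
    static boolean newSynchronization(int mode, boolean actualTransaction) {
        return actualTransaction
                ? mode != SYNCHRONIZATION_NEVER
                : mode == SYNCHRONIZATION_ALWAYS;
    }

    public static void main(String[] args) {
        int[] modes = {SYNCHRONIZATION_ALWAYS, SYNCHRONIZATION_ON_ACTUAL_TRANSACTION, SYNCHRONIZATION_NEVER};
        for (int mode : modes) {
            System.out.println("mode=" + mode
                    + " withTx=" + newSynchronization(mode, true)
                    + " withoutTx=" + newSynchronization(mode, false));
        }
    }
}
```

Read this way, SYNCHRONIZATION_ON_ACTUAL_TRANSACTION is the mode that activates synchronization callbacks only when there is a transaction for them to observe.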

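1.3 The DataSource Transaction Manager

DataSourceTransactionManager is one of the most commonly used transaction manager implementations in Spring. It manages transactions on a JDBC connection obtained from a single DataSource: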

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    private DataSource dataSource;

    // Transaction object wrapping the connection holder
    private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

        private boolean newConnectionHolder;
        private boolean mustRestoreAutoCommit;
        private Integer previousIsolationLevel;

        public void setConnectionHolder(ConnectionHolder connectionHolder, boolean newConnectionHolder) {
            super.setConnectionHolder(connectionHolder);
            this.newConnectionHolder = newConnectionHolder;
        }

        public boolean isNewConnectionHolder() {
            return this.newConnectionHolder;
        }

        public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
            this.mustRestoreAutoCommit = mustRestoreAutoCommit;
        }

        public boolean isMustRestoreAutoCommit() {
            return this.mustRestoreAutoCommit;
        }

        public void setPreviousIsolationLevel(Integer previousIsolationLevel) {
            this.previousIsolationLevel = previousIsolationLevel;
        }

        public Integer getPreviousIsolationLevel() {
            return this.previousIsolationLevel;
        }

        public boolean hasConnectionHolder() {
            return (getConnectionHolder() != null);
        }
    }

    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // Look up the connection holder bound to the current thread, if any
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

    @Override
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
        try {
            // 1. Without a usable connection holder, fetch a new connection from the DataSource
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            // 2. Mark the holder as synchronized with the transaction
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            // 3. Apply the isolation level and read-only flag, remembering the previous isolation level
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // 4. Switch auto-commit off, which is what starts the JDBC transaction
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            // 5. Prepare the transactional connection and mark the transaction active
            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);

            // 6. Apply the timeout, if one is configured
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // 7. Bind a newly created connection holder to the current thread
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }
        catch (Throwable ex) {
            // On failure, release the connection this call acquired
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (status.isDebug()) {
                logger.debug("Committing JDBC transaction on Connection [" + con + "]");
            }
            // Commit the transaction
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (status.isDebug()) {
                logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
            }
            // Roll back the transaction
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }

    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
                    "] rollback-only");
        }
        txObject.getConnectionHolder().setRollbackOnly();
    }

    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // 1. Unbind the connection holder from the thread
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(obtainDataSource());
        }

        // 2. Reset the connection state
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }
            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        // 3. Release the connection if this transaction acquired it
        if (txObject.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
            }
            DataSourceUtils.releaseConnection(con, obtainDataSource());
        }

        // 4. Clear the connection holder
        txObject.getConnectionHolder().clear();
    }
}
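Both doGetTransaction and doBegin depend on resources being bound to the current thread, keyed by the DataSource. The sketch below models that idea with a plain ThreadLocal map. It is a simplified stand-in written for this article, not the real TransactionSynchronizationManager: the class name, the string key and the string "connection" are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of thread-bound resources keyed by an arbitrary object (for example a DataSource). */
final class ThreadBoundResources {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        Object value = RESOURCES.get().remove(key);
        if (value == null) {
            throw new IllegalStateException("No value bound for key " + key);
        }
        return value;
    }

    /** Looks the key up from a freshly started thread, to show that bindings are per thread. */
    static Object lookupFromNewThread(Object key) {
        Object[] seen = new Object[1];
        Thread other = new Thread(() -> seen[0] = getResource(key));
        other.start();
        try {
            other.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object dataSource = "dataSource";             // hypothetical key
        bindResource(dataSource, "connection-1");     // hypothetical connection holder
        System.out.println(getResource(dataSource));          // connection-1
        System.out.println(lookupFromNewThread(dataSource));  // null
        unbindResource(dataSource);
    }
}
```

This is why a transaction started on one thread is invisible to work handed off to another thread: the second thread simply finds no connection holder under that key.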

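2. How Spring Implements Transactions with AOP

2.1 Creating the Transaction Proxy

Spring creates transaction proxies automatically through InfrastructureAdvisorAutoProxyCreator, which is part of its infrastructure auto-proxy mechanism. The advisor that the proxy creator applies is registered by a configuration class: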

// Imported by @EnableTransactionManagement in proxy mode.
// The enableTx and txManager fields are inherited from AbstractTransactionManagementConfiguration.
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        advisor.setTransactionAttributeSource(transactionAttributeSource());
        advisor.setAdvice(transactionInterceptor());
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor() {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource());
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }
}

BeanFactoryTransactionAttributeSourceAdvisor is a Spring AOP Advisor. It pairs a pointcut driven by the transaction attribute source with the transaction interceptor as its advice:

public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

    private TransactionAttributeSource transactionAttributeSource;

    private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
        @Override
        protected TransactionAttributeSource getTransactionAttributeSource() {
            return transactionAttributeSource;
        }
    };

    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
        this.transactionAttributeSource = transactionAttributeSource;
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

    // Pointcut that matches methods for which the attribute source yields a transaction attribute
    private abstract static class TransactionAttributeSourcePointcut
            extends StaticMethodMatcherPointcut implements Serializable {

        @Override
        public boolean matches(Method method, Class<?> targetClass) {
            TransactionAttributeSource tas = getTransactionAttributeSource();
            return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
        }

        @Override
        public boolean equals(Object other) {
            return (this == other || (other instanceof TransactionAttributeSourcePointcut &&
                    Objects.equals(getTransactionAttributeSource(),
                            ((TransactionAttributeSourcePointcut) other).getTransactionAttributeSource())));
        }

        @Override
        public int hashCode() {
            return TransactionAttributeSourcePointcut.class.hashCode();
        }

        @Override
        public String toString() {
            return getClass().getName() + ": " + getTransactionAttributeSource();
        }

        protected abstract TransactionAttributeSource getTransactionAttributeSource();
    }
}
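The division of labor between pointcut ("does this method carry a transaction attribute?") and advice ("wrap the call") can be illustrated with a JDK dynamic proxy. The sketch below is a deliberately reduced model under several assumptions: the @Tx annotation, AccountService, SimpleAccountService and the EVENTS list are hypothetical, and matching looks only at the interface method, whereas Spring's attribute source also inspects the target class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Toy model of "pointcut + advice" using a JDK dynamic proxy (not Spring AOP). */
final class TxProxyDemo {

    /** Hypothetical marker annotation playing the role of @Transactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Tx {}

    interface AccountService {
        @Tx
        void transfer();

        void ping();
    }

    static final class SimpleAccountService implements AccountService {
        @Override public void transfer() { }
        @Override public void ping() { }
    }

    static final List<String> EVENTS = new ArrayList<>();

    /** The "pointcut": only annotated methods are advised. */
    static boolean matches(Method method) {
        return method.isAnnotationPresent(Tx.class);
    }

    /** Wraps the target in a proxy whose handler plays the role of the transaction advice. */
    @SuppressWarnings("unchecked")
    static <T> T proxy(Class<T> type, T target) {
        InvocationHandler handler = (proxyInstance, method, args) -> {
            if (!matches(method)) {
                return invoke(method, target, args);
            }
            EVENTS.add("begin " + method.getName());
            try {
                Object result = invoke(method, target, args);
                EVENTS.add("commit " + method.getName());
                return result;
            } catch (Throwable ex) {
                EVENTS.add("rollback " + method.getName());
                throw ex;
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    /** Invokes the target method and unwraps the reflection wrapper exception. */
    private static Object invoke(Method method, Object target, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException ex) {
            throw ex.getCause();
        }
    }

    public static void main(String[] args) {
        AccountService service = proxy(AccountService.class, new SimpleAccountService());
        service.transfer();
        service.ping();
        System.out.println(EVENTS); // [begin transfer, commit transfer]
    }
}
```

Only calls that pass through the proxy are advised, which is also why a method that calls another annotated method on "this" bypasses the advice in proxy-based AOP.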

2.2 Core Logic of the Transaction Interceptor

TransactionInterceptor is the central interceptor for Spring transactions. It implements the MethodInterceptor interface:

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    public TransactionInterceptor() {
        setTransactionAttributeSource(new AnnotationTransactionAttributeSource());
    }

    public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
        setTransactionManager(ptm);
        setTransactionAttributeSource(tas);
    }

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to the transactional invocation in TransactionAspectSupport
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
}

TransactionAspectSupport provides the core transaction interception logic:

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // 1. Look up the transaction attribute
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

        // 2. Determine the transaction manager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);

        // 3. Build the joinpoint identification (used for logging and as the default transaction name)
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        // 4. Standard demarcation with getTransaction and commit/rollback calls
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal;
            try {
                // Invoke the target method
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // Roll back or commit depending on the rollback rules, then rethrow
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                // Restore the previous TransactionInfo on the thread
                cleanupTransactionInfo(txInfo);
            }
            // Commit the transaction
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        else {
            // Callback-style demarcation for a CallbackPreferringPlatformTransactionManager
            final ThrowableHolder throwableHolder = new ThrowableHolder();
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                    try {
                        return invocation.proceedWithInvocation();
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException triggers the rollback, even if declared as TransactionException
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // No rollback for this exception: commit, then rethrow it afterwards
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });

                // Rethrow an application exception that did not trigger a rollback
                if (throwableHolder.throwable != null) {
                    throw throwableHolder.throwable;
                }
                return result;
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }
        }
    }

    // Create a transaction if the attribute calls for one
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // 1. If no name is specified, use the method identification as the transaction name
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                // 2. Obtain the transaction
                status = tm.getTransaction(txAttr);
            }
            else {
                // 3. No transaction manager configured: skip
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }

        // 4. Create the TransactionInfo and bind it to the thread
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

    // Prepare the TransactionInfo
    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, String joinpointIdentification,
            @Nullable TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.newTransactionStatus(status);
        }
        else {
            if (logger.isTraceEnabled()) {
                logger.trace("No Spring transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
        }

        // Always bind to the thread, so the TransactionInfo stack stays consistent
        txInfo.bindToThread();
        return txInfo;
    }

    // Complete the transaction after an exception
    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
            }
            else {
                // The exception does not trigger a rollback, so commit
                // (this still rolls back if the status is marked rollback-only)
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
            }
        }
    }

    // Commit after a normal return
    protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

    // Restore the previous TransactionInfo
    protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
        if (txInfo != null) {
            txInfo.restoreThreadLocalStatus();
        }
    }

    // Holder for per-invocation transaction information
    protected static final class TransactionInfo {

        @Nullable
        private final PlatformTransactionManager transactionManager;

        @Nullable
        private final TransactionAttribute transactionAttribute;

        private final String joinpointIdentification;

        @Nullable
        private TransactionStatus transactionStatus;

        @Nullable
        private TransactionInfo oldTransactionInfo;

        public TransactionInfo(@Nullable PlatformTransactionManager transactionManager,
                @Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
            this.transactionManager = transactionManager;
            this.transactionAttribute = transactionAttribute;
            this.joinpointIdentification = joinpointIdentification;
        }

        public PlatformTransactionManager getTransactionManager() {
            Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
            return this.transactionManager;
        }

        @Nullable
        public TransactionAttribute getTransactionAttribute() {
            return this.transactionAttribute;
        }

        public String getJoinpointIdentification() {
            return this.joinpointIdentification;
        }

        public void newTransactionStatus(@Nullable TransactionStatus status) {
            this.transactionStatus = status;
        }

        @Nullable
        public TransactionStatus getTransactionStatus() {
            return this.transactionStatus;
        }

        // Push this info onto the thread, remembering the previous one
        public void bindToThread() {
            this.oldTransactionInfo = transactionInfoHolder.get();
            transactionInfoHolder.set(this);
        }

        // Pop this info and restore the previous one
        public void restoreThreadLocalStatus() {
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

        @Override
        public String toString() {
            return (this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction");
        }
    }

    private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
            new NamedThreadLocal<>("Current aspect-driven transaction");
}
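The standard branch of invokeWithinTransaction boils down to: begin, invoke, then either commit on normal return or consult rollbackOn(ex) when an exception escapes. The sketch below models just that decision. It is a toy, with hypothetical names (RollbackRuleDemo, EVENTS, run); the rollbackOn rule shown mirrors Spring's default of rolling back on RuntimeException and Error while committing on checked exceptions.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Toy model of the commit-or-rollback decision made around a target invocation. */
final class RollbackRuleDemo {

    static final List<String> EVENTS = new ArrayList<>();

    /** Default rule: roll back on unchecked exceptions and errors only. */
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    static <T> T invokeWithinTransaction(Callable<T> target) throws Exception {
        EVENTS.add("begin");
        T result;
        try {
            result = target.call();
        } catch (Throwable ex) {
            // An exception that does not match the rollback rule still completes with a commit.
            EVENTS.add(rollbackOn(ex) ? "rollback" : "commit");
            throw ex;
        }
        EVENTS.add("commit");
        return result;
    }

    /** Helper for the demo: runs the target and returns the recorded events. */
    static List<String> run(Callable<?> target) {
        EVENTS.clear();
        try {
            invokeWithinTransaction(target);
        } catch (Exception ignored) {
            // The demo only inspects the recorded events.
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "ok"));                                         // [begin, commit]
        System.out.println(run(() -> { throw new IllegalStateException("boom"); })); // [begin, rollback]
        System.out.println(run(() -> { throw new IOException("disk"); }));           // [begin, commit]
    }
}
```

The third case is the one that surprises people most often: a checked exception propagates to the caller, yet the transaction commits unless a rollback rule says otherwise.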

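3. Transaction Propagation in Depth

3.1 Overview of Propagation Behaviors

Propagation behavior defines how transaction boundaries are handled when a transactional method is called. Spring defines seven propagation behaviors:

  1. PROPAGATION_REQUIRED (the default): join the current transaction if one exists; otherwise start a new one.
  2. PROPAGATION_SUPPORTS: join the current transaction if one exists; otherwise execute non-transactionally.
  3. PROPAGATION_MANDATORY: join the current transaction if one exists; otherwise throw an exception.
  4. PROPAGATION_REQUIRES_NEW: always start a new transaction, suspending the current transaction if one exists.
  5. PROPAGATION_NOT_SUPPORTED: execute non-transactionally, suspending the current transaction if one exists.
  6. PROPAGATION_NEVER: execute non-transactionally; throw an exception if a transaction exists.
  7. PROPAGATION_NESTED: execute within a nested transaction if a transaction exists; otherwise start a new one, as PROPAGATION_REQUIRED does.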
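The seven rules above can be restated as a small decision function over two inputs: the requested propagation and whether a transaction already exists. The sketch below is only a restatement of the list. The enum names and the decide method are hypothetical and are not part of Spring's API.

```java
/** Toy decision table for the seven propagation behaviors (names are illustrative). */
final class PropagationDecision {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        JOIN_EXISTING,
        START_NEW,
        SUSPEND_AND_START_NEW,
        SUSPEND_AND_RUN_WITHOUT_TX,
        RUN_WITHOUT_TX,
        NESTED_IN_EXISTING,
        FAIL
    }

    static Action decide(Propagation propagation, boolean transactionExists) {
        switch (propagation) {
            case REQUIRED:
                return transactionExists ? Action.JOIN_EXISTING : Action.START_NEW;
            case SUPPORTS:
                return transactionExists ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return transactionExists ? Action.JOIN_EXISTING : Action.FAIL;
            case REQUIRES_NEW:
                return transactionExists ? Action.SUSPEND_AND_START_NEW : Action.START_NEW;
            case NOT_SUPPORTED:
                return transactionExists ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case NEVER:
                return transactionExists ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case NESTED:
                return transactionExists ? Action.NESTED_IN_EXISTING : Action.START_NEW;
            default:
                throw new IllegalArgumentException("Unknown propagation: " + propagation);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": existing=" + decide(p, true) + ", none=" + decide(p, false));
        }
    }
}
```

Laid out like this, the behaviors differ in only two respects: what they do to an existing transaction (join, suspend, nest, or reject) and whether they start one when none exists.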

3.2 How Propagation Behavior Is Implemented

The core of the propagation implementation is the method AbstractPlatformTransactionManager.handleExistingTransaction:

// Shown as a switch for readability; the Spring source uses a chain of if statements.
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    switch (definition.getPropagationBehavior()) {
        case TransactionDefinition.PROPAGATION_NEVER:
            // An existing transaction is not allowed
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");

        case TransactionDefinition.PROPAGATION_NOT_SUPPORTED: {
            // Suspend the existing transaction and run non-transactionally
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        case TransactionDefinition.PROPAGATION_REQUIRES_NEW: {
            // Suspend the existing transaction and start a new one
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        case TransactionDefinition.PROPAGATION_NESTED: {
            // Nested transaction
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // Create a savepoint inside the existing transaction
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Genuinely nested transaction through a nested begin (for example with JTA)
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        case TransactionDefinition.PROPAGATION_SUPPORTS:
        case TransactionDefinition.PROPAGATION_REQUIRED:
        case TransactionDefinition.PROPAGATION_MANDATORY: {
            // Default case: participate in the existing transaction
            if (debugEnabled) {
                logger.debug("Participating in existing transaction");
            }
            if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                    Integer currentIsolationLevel =
                            TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                    if (currentIsolationLevel == null ||
                            currentIsolationLevel != definition.getIsolationLevel()) {
                        Constants isoConstants = DefaultTransactionDefinition.constants;
                        throw new IllegalTransactionStateException(
                                "Participating transaction with definition [" + definition +
                                "] specifies isolation level which is incompatible with existing transaction: " +
                                (currentIsolationLevel != null ?
                                        isoConstants.toCode(currentIsolationLevel,
                                                DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                        "(unknown)"));
                    }
                }
                if (!definition.isReadOnly()) {
                    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                        throw new IllegalTransactionStateException(
                                "Participating transaction with definition [" + definition +
                                "] is not marked as read-only but existing transaction is");
                    }
                }
            }
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            return prepareTransactionStatus(
                    definition, transaction, false, newSynchronization, debugEnabled, null);
        }

        default:
            throw new IllegalTransactionStateException(
                    "Unknown propagation behavior: " + definition.getPropagationBehavior());
    }
}
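The savepoint branch of PROPAGATION_NESTED is easiest to understand with a model in which a transaction is simply a list of pending writes and a savepoint is a position in that list. The sketch below is such a model. ToyTransaction and its methods are hypothetical and do not talk to a database; with JDBC the same role is played by Connection.setSavepoint and Connection.rollback(Savepoint).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a nested transaction implemented with a savepoint. */
final class SavepointDemo {

    /** Pending writes plus savepoints expressed as positions in the write list. */
    static final class ToyTransaction {
        private final List<String> pending = new ArrayList<>();

        void write(String row) {
            pending.add(row);
        }

        int createSavepoint() {
            return pending.size();
        }

        void rollbackToSavepoint(int savepoint) {
            // Discard everything written after the savepoint, keep what came before.
            pending.subList(savepoint, pending.size()).clear();
        }

        List<String> commit() {
            List<String> committed = new ArrayList<>(pending);
            pending.clear();
            return committed;
        }
    }

    /** Outer work writes "order" and "invoice"; nested work writes "audit" and may fail. */
    static List<String> run(boolean nestedFails) {
        ToyTransaction tx = new ToyTransaction();
        tx.write("order");

        int savepoint = tx.createSavepoint();   // entering the nested scope
        try {
            tx.write("audit");
            if (nestedFails) {
                throw new IllegalStateException("nested work failed");
            }
        } catch (IllegalStateException ex) {
            tx.rollbackToSavepoint(savepoint);  // only the nested work is undone
        }

        tx.write("invoice");
        return tx.commit();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [order, audit, invoice]
        System.out.println(run(true));  // [order, invoice]
    }
}
```

The contrast with PROPAGATION_REQUIRES_NEW is that the nested work here never commits on its own: it becomes durable only if the outer transaction commits.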

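3.3 Transaction Suspension and Resumption

Suspension is a key mechanism behind propagation. It lets Spring set the current transaction aside when needed, run other work, and then restore the original transaction: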

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        // Suspend the registered synchronizations first
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }
            // Capture the thread-bound transaction attributes, then reset them
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // Suspension failed: restore the synchronizations
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // No active synchronization: suspend only the transaction resources
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither a transaction nor synchronization is active
        return null;
    }
}

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {
    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            // Restore the thread-bound attributes before resuming the synchronizations
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

// Resource suspension as implemented by DataSourceTransactionManager:
// detach the ConnectionHolder from the transaction object and unbind it from the thread
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

// Resource resumption as implemented by DataSourceTransactionManager:
// bind the suspended ConnectionHolder to the thread again
@Override
protected void doResume(Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

// Suspends all synchronizations (AbstractPlatformTransactionManager)
private List<TransactionSynchronization> doSuspendSynchronization() {
    List<TransactionSynchronization> suspendedSynchronizations =
            TransactionSynchronizationManager.getSynchronizations();
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        synchronization.suspend();
    }
    TransactionSynchronizationManager.clearSynchronization();
    return suspendedSynchronizations;
}

// Resumes all synchronizations (AbstractPlatformTransactionManager)
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
    TransactionSynchronizationManager.initSynchronization();
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        TransactionSynchronizationManager.registerSynchronization(synchronization);
        synchronization.resume();
    }
}
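The essence of suspend and resume is unbinding a thread-bound resource and binding it again later. The following is a minimal sketch of that idea using only the JDK. The class ThreadBoundResources and its methods are hypothetical names for illustration, not Spring's API, and the model ignores synchronizations and transaction attributes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Simplified, hypothetical model of thread-bound resources; not Spring's actual API. */
final class ThreadBoundResources {

    // One resource map per thread
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound: " + key);
        }
    }

    static Object unbind(Object key) {
        Object value = RESOURCES.get().remove(key);
        if (value == null) {
            throw new IllegalStateException("Nothing bound for: " + key);
        }
        return value;
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    /** Detaches the resource bound under key, runs inner, then re-binds the original. */
    static <T> T runSuspended(Object key, Supplier<T> inner) {
        Object suspended = unbind(key);      // "suspend": the outer resource is no longer visible
        try {
            return inner.get();              // inner work may bind its own resource
        }
        finally {
            RESOURCES.get().remove(key);     // discard whatever the inner work left bound
            bind(key, suspended);            // "resume": restore the outer resource
        }
    }

    /** Walk-through: reports what is visible inside and after the suspended section. */
    static String demo() {
        bind("dataSource", "outer-connection");
        Object inside = runSuspended("dataSource", () -> {
            bind("dataSource", "inner-connection");
            return get("dataSource");
        });
        Object after = unbind("dataSource"); // also leaves the thread clean
        return inside + " / " + after;
    }
}
```

Calling ThreadBoundResources.demo() shows that the inner section sees only its own connection, while the outer connection is visible again once the inner section finishes. This is the same guarantee REQUIRES_NEW and NOT_SUPPORTED rely on.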

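3.4 How nested transactions are implemented

Spring can implement a nested transaction in two ways: with a savepoint inside the existing transaction, or with a true nested transaction started by the underlying transaction manager: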

// Savepoint-related methods available on DefaultTransactionStatus
public class DefaultTransactionStatus extends AbstractTransactionStatus {

    @Nullable
    private Object savepoint;

    // Create a savepoint and hold it for this transaction
    public void createAndHoldSavepoint() throws TransactionException {
        setSavepoint(getSavepointManager().createSavepoint());
    }

    // Roll back to the held savepoint and release it
    public void rollbackToHeldSavepoint() throws TransactionException {
        Object savepoint = getSavepoint();
        if (savepoint == null) {
            throw new TransactionUsageException("No savepoint associated with current transaction");
        }
        getSavepointManager().rollbackToSavepoint(savepoint);
        getSavepointManager().releaseSavepoint(savepoint);
        setSavepoint(null);
    }

    // Release the held savepoint
    public void releaseHeldSavepoint() throws TransactionException {
        Object savepoint = getSavepoint();
        if (savepoint == null) {
            throw new TransactionUsageException("No savepoint associated with current transaction");
        }
        getSavepointManager().releaseSavepoint(savepoint);
        setSavepoint(null);
    }
}

// Rollback in DataSourceTransactionManager, shown here as a simplified combined view.
// In the Spring source the savepoint and participation branches are decided by
// AbstractPlatformTransactionManager.processRollback (see section 5.2).
@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.hasSavepoint()) {
        // Nested transaction: roll back to the savepoint
        if (status.isDebug()) {
            logger.debug("Rolling back transaction to savepoint");
        }
        try {
            con.rollback((Savepoint) status.getSavepoint());
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
        }
    }
    else if (status.isNewTransaction()) {
        // New transaction: roll back the whole JDBC transaction
        if (status.isDebug()) {
            logger.debug("Initiating transaction rollback");
        }
        try {
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }
    else if (txObject.hasConnectionHolder()) {
        // Participating in an existing transaction: only mark it rollback-only
        if (status.isDebug()) {
            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
        }
        txObject.getConnectionHolder().setRollbackOnly();
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction available for rollback");
        }
    }
}
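To make the savepoint behaviour concrete, here is a small in-memory sketch. It assumes a transaction can be modelled as a list of pending writes and a savepoint as a position in that list. SavepointLog is a hypothetical class for illustration only; a real JDBC savepoint is managed by the database driver.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a transaction's pending writes. */
final class SavepointLog {

    private final List<String> pending = new ArrayList<>();

    void write(String operation) {
        pending.add(operation);
    }

    /** A savepoint is simply the current position in the pending list. */
    int createSavepoint() {
        return pending.size();
    }

    /** Discards everything written after the savepoint; earlier writes stay pending. */
    void rollbackToSavepoint(int savepoint) {
        pending.subList(savepoint, pending.size()).clear();
    }

    /** Commits whatever is still pending and returns it. */
    List<String> commit() {
        List<String> committed = new ArrayList<>(pending);
        pending.clear();
        return committed;
    }

    /** Walk-through: the nested scope fails, the outer transaction continues and commits. */
    static List<String> demo() {
        SavepointLog tx = new SavepointLog();
        tx.write("insert order");
        int savepoint = tx.createSavepoint();   // entering the NESTED scope
        tx.write("reduce stock");
        tx.rollbackToSavepoint(savepoint);      // nested scope failed
        tx.write("insert audit row");
        return tx.commit();
    }
}
```

In this model SavepointLog.demo() commits the order and the audit row but not the stock update, which mirrors how rolling back to a savepoint undoes only the nested part of the work.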

3.5 Propagation behaviors in a practical scenario

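// Repository fields, the logger, and exception types are assumed to be declared elsewhere.
// OrderCreationException, PaymentException and InventoryException are unchecked.
@Service
public class OrderService {

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private LogService logService;

    @Autowired
    private NotificationService notificationService;

    @Transactional(propagation = Propagation.REQUIRED)
    public void createOrder(Order order) {
        // 1. Save the order in the current transaction
        orderRepository.save(order);

        try {
            // 2. Process the payment (REQUIRES_NEW: independent transaction)
            paymentService.processPayment(order);
            // 3. Reduce stock (REQUIRED: joins the current transaction)
            inventoryService.reduceStock(order);
            // 4. Write the log entry (NOT_SUPPORTED: runs without a transaction)
            logService.logOrderCreation(order);
        } catch (Exception e) {
            // Rethrowing an unchecked exception rolls back the order as well.
            // A payment that already committed in its own transaction is not rolled back.
            throw new OrderCreationException("Order creation failed", e);
        }

        // 5. Send the notification (NEVER: rejects an active transaction).
        // Called from inside this transaction, the call always fails with
        // IllegalTransactionStateException. The exception is caught, so the order still commits.
        // To actually send the notification, call it after the transaction has completed.
        try {
            notificationService.sendOrderNotification(order);
        } catch (Exception e) {
            logger.error("Failed to send order notification", e);
        }
    }

    @Transactional(propagation = Propagation.NESTED)
    public void createOrderWithNested(Order order) {
        // Save the order
        orderRepository.save(order);

        try {
            // Reduce stock inside a nested transaction (savepoint)
            inventoryService.reduceStockWithNested(order);
        } catch (Exception e) {
            // Only the stock update is rolled back to the savepoint. The exception is handled
            // here instead of being rethrown, so the order itself is kept.
            logger.warn("Stock update failed; the order was still created", e);
        }
    }
}

@Service
public class PaymentService {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void processPayment(Order order) {
        // Independent transaction: once it commits, the payment record survives
        // a later rollback of the caller
        paymentRepository.save(new Payment(order));

        // Simulated payment failure: this rolls back only the payment transaction,
        // then the exception propagates to the caller
        if (order.getAmount() > 1000) {
            throw new PaymentException("Payment amount exceeds the limit");
        }
    }
}

@Service
public class InventoryService {

    @Transactional(propagation = Propagation.REQUIRED)
    public void reduceStock(Order order) {
        // Joins the caller's transaction: commits or rolls back together with the order
        inventoryRepository.reduceStock(order.getProductId(), order.getQuantity());
    }

    @Transactional(propagation = Propagation.NESTED)
    public void reduceStockWithNested(Order order) {
        // Nested transaction, implemented with a savepoint
        inventoryRepository.reduceStock(order.getProductId(), order.getQuantity());

        // Simulated stock shortage
        if (order.getQuantity() > 100) {
            throw new InventoryException("Insufficient stock");
        }
    }
}

@Service
public class LogService {

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void logOrderCreation(Order order) {
        // Runs without a transaction: the log entry is written whether or not the caller rolls back
        logRepository.save(new OrderLog("Order created", order.getId()));
    }
}

@Service
public class NotificationService {

    @Transactional(propagation = Propagation.NEVER)
    public void sendOrderNotification(Order order) {
        // Never transactional: Spring throws an exception if a transaction is active.
        // notificationClient is a hypothetical messaging gateway, injected elsewhere.
        notificationClient.sendNotification("Order created successfully", order.getUserId());
    }
}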
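The scenario above depends on what each propagation behavior does when a transaction is, or is not, already active. The sketch below encodes those documented rules as a plain lookup. PropagationDecision and its Action enum are hypothetical names introduced for illustration. The NESTED case assumes a savepoint-capable manager such as DataSourceTransactionManager.

```java
/** Hypothetical lookup of what each propagation behavior does; not part of Spring. */
final class PropagationDecision {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        JOIN_EXISTING,              // run inside the caller's transaction
        START_NEW,                  // begin a fresh transaction
        SUSPEND_AND_START_NEW,      // suspend the caller's transaction, begin a fresh one
        SUSPEND_AND_RUN_WITHOUT_TX, // suspend the caller's transaction, run non-transactionally
        RUN_WITHOUT_TX,             // run non-transactionally
        CREATE_SAVEPOINT,           // nested scope inside the caller's transaction
        FAIL                        // IllegalTransactionStateException in Spring
    }

    static Action decide(Propagation propagation, boolean transactionExists) {
        switch (propagation) {
            case REQUIRED:
                return transactionExists ? Action.JOIN_EXISTING : Action.START_NEW;
            case SUPPORTS:
                return transactionExists ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return transactionExists ? Action.JOIN_EXISTING : Action.FAIL;
            case REQUIRES_NEW:
                return transactionExists ? Action.SUSPEND_AND_START_NEW : Action.START_NEW;
            case NOT_SUPPORTED:
                return transactionExists ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case NEVER:
                return transactionExists ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case NESTED:
                return transactionExists ? Action.CREATE_SAVEPOINT : Action.START_NEW;
            default:
                throw new IllegalArgumentException("Unknown propagation: " + propagation);
        }
    }
}
```

For example, decide(Propagation.NEVER, true) yields FAIL, which is why a NEVER method cannot be called from inside a REQUIRED method without an exception.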

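4. Transaction synchronization in depth

4.1 Core mechanics of TransactionSynchronizationManager

TransactionSynchronizationManager is the central manager for Spring transaction synchronization. It keeps transactional resources and synchronization callbacks bound to the current thread: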

public abstract class TransactionSynchronizationManager {

    // Thread-local: transactional resources
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    // Thread-local: registered synchronization callbacks
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    // Thread-local: name of the current transaction
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    // Thread-local: read-only flag of the current transaction
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    // Thread-local: isolation level of the current transaction
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");

    // Thread-local: whether an actual transaction is active
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");

    // Bind a resource to the current thread
    public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = resources.get();
        if (map == null) {
            map = new HashMap<>();
            resources.set(map);
        }
        Object oldValue = map.put(actualKey, value);
        if (oldValue != null) {
            throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                    actualKey + "] bound to thread");
        }
    }

    // Unbind a resource from the current thread
    public static Object unbindResource(Object key) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doUnbindResource(actualKey);
        if (value == null) {
            throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread");
        }
        return value;
    }

    // Unbind a resource from the current thread without throwing if it is missing
    @Nullable
    public static Object unbindResourceIfPossible(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        return doUnbindResource(actualKey);
    }

    // Actually remove the resource from the thread-local map
    @Nullable
    private static Object doUnbindResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.remove(actualKey);
        if (map.isEmpty()) {
            resources.remove();
        }
        if (value instanceof ResourceHolder) {
            ((ResourceHolder) value).unbound();
        }
        return value;
    }

    // Look up a resource
    @Nullable
    public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doGetResource(actualKey);
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread");
        }
        return value;
    }

    // Actually read the resource from the thread-local map
    @Nullable
    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            value = null;
        }
        return value;
    }

    // Is synchronization active for the current thread?
    public static boolean isSynchronizationActive() {
        return (synchronizations.get() != null);
    }

    // Activate synchronization
    public static void initSynchronization() throws IllegalStateException {
        if (isSynchronizationActive()) {
            throw new IllegalStateException("Cannot activate transaction synchronization - already active");
        }
        logger.trace("Initializing transaction synchronization");
        synchronizations.set(new LinkedHashSet<>());
    }

    // Register a synchronization callback
    public static void registerSynchronization(TransactionSynchronization synchronization)
            throws IllegalStateException {
        Assert.notNull(synchronization, "TransactionSynchronization must not be null");
        if (!isSynchronizationActive()) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        synchronizations.get().add(synchronization);
    }

    // Return all registered synchronizations, sorted by order
    public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
        Set<TransactionSynchronization> synchs = synchronizations.get();
        if (synchs == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        if (synchs.isEmpty()) {
            return Collections.emptyList();
        }
        else {
            List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
            OrderComparator.sort(sortedSynchs);
            return Collections.unmodifiableList(sortedSynchs);
        }
    }

    // Deactivate synchronization
    public static void clearSynchronization() throws IllegalStateException {
        if (!isSynchronizationActive()) {
            throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
        }
        logger.trace("Clearing transaction synchronization");
        synchronizations.remove();
    }

    // Current transaction name
    public static void setCurrentTransactionName(@Nullable String name) {
        currentTransactionName.set(name);
    }

    @Nullable
    public static String getCurrentTransactionName() {
        return currentTransactionName.get();
    }

    // Read-only flag of the current transaction
    public static void setCurrentTransactionReadOnly(boolean readOnly) {
        currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null);
    }

    public static boolean isCurrentTransactionReadOnly() {
        return (currentTransactionReadOnly.get() != null);
    }

    // Isolation level of the current transaction
    public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
        currentTransactionIsolationLevel.set(isolationLevel);
    }

    @Nullable
    public static Integer getCurrentTransactionIsolationLevel() {
        return currentTransactionIsolationLevel.get();
    }

    // Whether an actual transaction is active
    public static void setActualTransactionActive(boolean active) {
        actualTransactionActive.set(active ? Boolean.TRUE : null);
    }

    public static boolean isActualTransactionActive() {
        return (actualTransactionActive.get() != null);
    }

    // Clear all thread-bound state
    public static void clear() {
        resources.remove();
        synchronizations.remove();
        currentTransactionName.remove();
        currentTransactionReadOnly.remove();
        currentTransactionIsolationLevel.remove();
        actualTransactionActive.remove();
    }
}

4.2 Transaction synchronization callbacks

The synchronization callback mechanism lets custom logic run at specific phases of a transaction:

public interface TransactionSynchronization extends Flushable {

    // Completion status constants passed to afterCompletion
    int STATUS_COMMITTED = 0;
    int STATUS_ROLLED_BACK = 1;
    int STATUS_UNKNOWN = 2;

    // Called when the transaction is suspended
    default void suspend() {}

    // Called when the transaction is resumed
    default void resume() {}

    // Flush the underlying session, if any
    @Override
    default void flush() {}

    // Called before commit
    default void beforeCommit(boolean readOnly) {}

    // Called before completion (commit or rollback)
    default void beforeCompletion() {}

    // Called after a successful commit
    default void afterCommit() {}

    // Called after completion (commit or rollback)
    default void afterCompletion(int status) {}

    // Execution order among synchronizations
    default int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

// Helper that triggers the synchronization callbacks
public abstract class TransactionSynchronizationUtils {

    // Trigger suspend callbacks (illustrative: as shown earlier, the transaction manager
    // invokes suspend() from doSuspendSynchronization)
    public static void triggerSuspend() {
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            synchronization.suspend();
        }
    }

    // Trigger resume callbacks (illustrative: see doResumeSynchronization)
    public static void triggerResume() {
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            synchronization.resume();
        }
    }

    // Trigger beforeCommit callbacks; exceptions propagate and abort the commit
    public static void triggerBeforeCommit(boolean readOnly) {
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            synchronization.beforeCommit(readOnly);
        }
    }

    // Trigger beforeCompletion callbacks; exceptions are logged, not propagated
    public static void triggerBeforeCompletion() {
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            try {
                synchronization.beforeCompletion();
            }
            catch (Throwable tsex) {
                logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
            }
        }
    }

    // Trigger afterCommit callbacks; exceptions propagate to the caller,
    // although the transaction has already been committed at this point
    public static void triggerAfterCommit() {
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            synchronization.afterCommit();
        }
    }

    // Trigger afterCompletion callbacks; exceptions are logged, not propagated
    public static void triggerAfterCompletion(int status) {
        List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
        for (TransactionSynchronization synchronization : synchronizations) {
            try {
                synchronization.afterCompletion(status);
            }
            catch (Throwable tsex) {
                logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
            }
        }
    }
}

4.3 Use cases for transaction synchronization

// SessionManager, CacheManager, MessagePublisher, userRepository and logger are
// application-level collaborators assumed to exist elsewhere.
@Component
public class TransactionSynchronizationExample {

    // Session synchronization: update the session after the transaction commits.
    // Instances are created per transaction with "new", so this is not a Spring bean.
    public static class SessionSynchronization implements TransactionSynchronization {

        private final String sessionId;
        private final Object sessionData;

        public SessionSynchronization(String sessionId, Object sessionData) {
            this.sessionId = sessionId;
            this.sessionData = sessionData;
        }

        @Override
        public void afterCommit() {
            // Update the session only after a successful commit
            SessionManager.updateSession(sessionId, sessionData);
        }

        @Override
        public void afterCompletion(int status) {
            // Clean up whether the transaction committed or rolled back
            SessionManager.cleanupSession(sessionId);
        }
    }

    // Cache synchronization: update the cache after the transaction commits
    public static class CacheSynchronization implements TransactionSynchronization {

        private final String cacheKey;
        private final Object cacheValue;
        private final CacheOperation operation;

        public CacheSynchronization(String cacheKey, Object cacheValue, CacheOperation operation) {
            this.cacheKey = cacheKey;
            this.cacheValue = cacheValue;
            this.operation = operation;
        }

        @Override
        public void afterCommit() {
            // Apply the cache operation only after a successful commit
            switch (operation) {
                case PUT:
                    CacheManager.put(cacheKey, cacheValue);
                    break;
                case EVICT:
                    CacheManager.evict(cacheKey);
                    break;
                case CLEAR:
                    CacheManager.clear();
                    break;
            }
        }

        @Override
        public void afterCompletion(int status) {
            // afterCommit was not called on rollback, so the cache was left untouched
            if (status == STATUS_ROLLED_BACK) {
                logger.debug("Transaction rolled back, skipping cache update for key: {}", cacheKey);
            }
        }

        public enum CacheOperation {
            PUT, EVICT, CLEAR
        }
    }

    // Message synchronization: publish a message after the transaction commits
    public static class MessageSynchronization implements TransactionSynchronization {

        private final String topic;
        private final Object message;

        public MessageSynchronization(String topic, Object message) {
            this.topic = topic;
            this.message = message;
        }

        @Override
        public void afterCommit() {
            // Publish only after a successful commit
            MessagePublisher.send(topic, message);
        }

        @Override
        public void afterCompletion(int status) {
            // Log when the transaction failed and the message was therefore not sent
            if (status == STATUS_ROLLED_BACK) {
                logger.warn("Transaction rolled back, message not sent: {}", message);
            }
        }
    }

    // Registering the synchronizations inside a transactional method
    @Transactional
    public void updateUserWithSync(User user) {
        // Update the user
        userRepository.save(user);

        // Register the session synchronization
        TransactionSynchronizationManager.registerSynchronization(
                new SessionSynchronization(user.getSessionId(), user));

        // Register the cache synchronization
        TransactionSynchronizationManager.registerSynchronization(
                new CacheSynchronization("user:" + user.getId(), user, CacheSynchronization.CacheOperation.PUT));

        // Register the message synchronization
        TransactionSynchronizationManager.registerSynchronization(
                new MessageSynchronization("user.updated", user));
    }
}

5. Commit and rollback mechanics

5.1 Commit flow

The commit flow is implemented in AbstractPlatformTransactionManager.processCommit:

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            boolean unexpectedRollback = false;
            // 1. Trigger beforeCommit callbacks
            triggerBeforeCommit(status);
            // 2. Trigger beforeCompletion callbacks
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            // 3. Branch on savepoint / new transaction / participation
            if (status.hasSavepoint()) {
                // Nested transaction: release the savepoint
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                // New transaction: perform the actual commit
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                // Participating in an existing transaction: check the global rollback-only marker
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // A global rollback-only marker means the commit cannot succeed
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // Trigger afterCompletion and rethrow
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // Commit failed: attempt a rollback if configured
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            // Trigger the completion callbacks and rethrow
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // 4. Trigger afterCommit callbacks
        try {
            triggerAfterCommit(status);
        }
        finally {
            // 5. Trigger afterCompletion callbacks
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        // 6. Clean up the transaction synchronization state
        cleanupAfterCompletion(status);
    }
}

5.2 Rollback flow

The rollback flow is implemented in AbstractPlatformTransactionManager.processRollback:

private void processRollback(DefaultTransactionStatus status) {
    try {
        try {
            // 1. Trigger beforeCompletion callbacks
            triggerBeforeCompletion(status);

            // 2. Branch on savepoint / new transaction / participation
            if (status.hasSavepoint()) {
                // Nested transaction: roll back to the savepoint
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                // New transaction: perform the actual rollback
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            }
            else if (status.hasTransaction()) {
                // Participating in an existing transaction: set the global rollback-only marker
                if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                    }
                    doSetRollbackOnly(status);
                }
                else {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                    }
                }
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("No transaction available for rollback");
                }
            }
        }
        catch (RuntimeException | Error ex) {
            // Trigger afterCompletion and rethrow
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // 3. Trigger afterCompletion callbacks
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    }
    finally {
        // 4. Clean up the transaction synchronization state
        cleanupAfterCompletion(status);
    }
}
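The commit and rollback flows differ mainly in which callbacks fire. The following sketch replays the ordering with a trimmed-down callback interface. SyncOrderDemo and its Sync interface are hypothetical stand-ins for illustration; they omit the exception handling and status codes of the real TransactionSynchronization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical replay of the callback order on commit versus rollback. */
final class SyncOrderDemo {

    /** Trimmed-down callback interface, assumed for this sketch only. */
    interface Sync {
        default void beforeCommit() {}
        default void beforeCompletion() {}
        default void afterCommit() {}
        default void afterCompletion(boolean committed) {}
    }

    static void complete(List<Sync> syncs, boolean commit) {
        if (commit) {
            syncs.forEach(Sync::beforeCommit);      // commit path only
        }
        syncs.forEach(Sync::beforeCompletion);      // both paths
        // The actual commit or rollback of the resource would happen here
        if (commit) {
            syncs.forEach(Sync::afterCommit);       // commit path only
        }
        syncs.forEach(s -> s.afterCompletion(commit)); // both paths
    }

    /** Records the callbacks fired for one completed transaction. */
    static List<String> trace(boolean commit) {
        List<String> events = new ArrayList<>();
        Sync recorder = new Sync() {
            @Override public void beforeCommit() { events.add("beforeCommit"); }
            @Override public void beforeCompletion() { events.add("beforeCompletion"); }
            @Override public void afterCommit() { events.add("afterCommit"); }
            @Override public void afterCompletion(boolean committed) {
                events.add(committed ? "afterCompletion(COMMITTED)" : "afterCompletion(ROLLED_BACK)");
            }
        };
        complete(Collections.singletonList(recorder), commit);
        return events;
    }
}
```

SyncOrderDemo.trace(true) lists all four callbacks, while SyncOrderDemo.trace(false) lists only beforeCompletion and afterCompletion. This is why side effects that must happen only on success belong in afterCommit.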

5.3 Cleanup after completion

Cleanup is implemented in AbstractPlatformTransactionManager.cleanupAfterCompletion:

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    // 1. Mark the transaction as completed
    status.setCompleted();

    // 2. If this transaction started the synchronization, clear the thread-bound state
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }

    // 3. If this is a new transaction, release its resources
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }

    // 4. If an outer transaction was suspended, resume it
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

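6. Exception handling in transactions

6.1 Rollback rules

By default, Spring rolls back a transaction only for RuntimeException and Error; checked exceptions do not trigger a rollback. This rule is defined in DefaultTransactionAttribute.rollbackOn: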

public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {

    @Override
    public boolean rollbackOn(Throwable ex) {
        // By default, RuntimeException and Error trigger a rollback
        return (ex instanceof RuntimeException || ex instanceof Error);
    }
}

Rollback rules can be customized through the rollbackFor and noRollbackFor attributes of the @Transactional annotation:

public class RuleBasedTransactionAttribute extends DefaultTransactionAttribute {

    private List<RollbackRuleAttribute> rollbackRules;

    public RuleBasedTransactionAttribute() {
        super();
    }

    public RuleBasedTransactionAttribute(RollbackRuleAttribute... rollbackRules) {
        this();
        this.rollbackRules = Arrays.asList(rollbackRules);
    }

    @Override
    public boolean rollbackOn(Throwable ex) {
        RollbackRuleAttribute winner = null;
        int deepest = Integer.MAX_VALUE;

        // Find the rule whose exception type is closest to the thrown exception
        if (this.rollbackRules != null) {
            for (RollbackRuleAttribute rule : this.rollbackRules) {
                int depth = rule.getDepth(ex);
                if (depth >= 0 && depth < deepest) {
                    deepest = depth;
                    winner = rule;
                }
            }
        }

        // No rule matched: fall back to the default (RuntimeException and Error roll back)
        if (winner == null) {
            return super.rollbackOn(ex);
        }

        // A matching rule decides: roll back unless it is a no-rollback rule
        return !(winner instanceof NoRollbackRuleAttribute);
    }
}

// Rollback rule: the given exception type triggers a rollback
public class RollbackRuleAttribute implements Serializable {

    private final Class<?> exceptionType;

    public RollbackRuleAttribute(Class<?> exceptionType) {
        Assert.notNull(exceptionType, "'exceptionType' cannot be null");
        if (!Throwable.class.isAssignableFrom(exceptionType)) {
            throw new IllegalArgumentException("'exceptionType' must be a Throwable type");
        }
        this.exceptionType = exceptionType;
    }

    // Distance in the class hierarchy between the thrown exception and this rule's type:
    // 0 for an exact match, -1 for no match
    public int getDepth(Throwable ex) {
        return getDepth(ex.getClass(), 0);
    }

    private int getDepth(Class<?> exceptionClass, int depth) {
        if (exceptionClass.getName().startsWith(this.exceptionType.getName())) {
            return depth;
        }
        if (exceptionClass == Throwable.class) {
            return -1;
        }
        return getDepth(exceptionClass.getSuperclass(), depth + 1);
    }

    public Class<?> getExceptionType() {
        return this.exceptionType;
    }

    @Override
    public String toString() {
        return this.exceptionType.getName();
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof RollbackRuleAttribute)) {
            return false;
        }
        RollbackRuleAttribute rhs = (RollbackRuleAttribute) other;
        return this.exceptionType.equals(rhs.exceptionType);
    }

    @Override
    public int hashCode() {
        return this.exceptionType.hashCode();
    }
}

// No-rollback rule: the given exception type must not trigger a rollback
public class NoRollbackRuleAttribute extends RollbackRuleAttribute {

    public NoRollbackRuleAttribute(Class<?> exceptionType) {
        super(exceptionType);
    }
}
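The "closest rule wins" selection is easy to misread, so here is a compact, JDK-only sketch of the same idea. RollbackRules and its nested Rule class are hypothetical names, and the sketch compares classes by identity rather than by name. It shows how a no-rollback rule for a specific exception can override a broader rollback rule.

```java
/** Hypothetical sketch of depth-based rollback rule matching; not Spring's classes. */
final class RollbackRules {

    /** One rule: an exception type and whether a match means "roll back". */
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        /** Steps up the superclass chain from ex to this rule's type, or -1 if unrelated. */
        int depth(Throwable ex) {
            int depth = 0;
            for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass()) {
                if (c == type) {
                    return depth;
                }
                depth++;
            }
            return -1;
        }
    }

    /** The matching rule with the smallest depth wins; without a match the default applies. */
    static boolean rollbackOn(Throwable ex, Rule... rules) {
        Rule winner = null;
        int deepest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex);
            if (depth >= 0 && depth < deepest) {
                deepest = depth;
                winner = rule;
            }
        }
        if (winner == null) {
            // Default rule: unchecked exceptions and errors roll back
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }
}
```

With a rollback rule for Exception and a no-rollback rule for IllegalStateException, an IOException rolls back (depth 1 on the first rule), while an IllegalStateException does not, because the exact match at depth 0 beats the broader rule at depth 2.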

6.2 Exception handling flow

Exception handling is implemented in TransactionAspectSupport.completeTransactionAfterThrowing:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    // 1. Only act if a transaction is actually in progress
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        // 2. Ask the transaction attribute whether this exception requires a rollback
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                // 3. Roll back
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
        else {
            // 4. No rollback required: commit despite the exception
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

7. Parsing the declarative transaction annotation

7.1 Attributes of @Transactional

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {

    // Transaction manager qualifier
    @AliasFor("transactionManager")
    String value() default "";

    @AliasFor("value")
    String transactionManager() default "";

    // Propagation behavior
    Propagation propagation() default Propagation.REQUIRED;

    // Isolation level
    Isolation isolation() default Isolation.DEFAULT;

    // Timeout in seconds
    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

    // Read-only transaction
    boolean readOnly() default false;

    // Exception types that trigger a rollback
    Class<? extends Throwable>[] rollbackFor() default {};

    // Exception class names that trigger a rollback
    String[] rollbackForClassName() default {};

    // Exception types that must not trigger a rollback
    Class<? extends Throwable>[] noRollbackFor() default {};

    // Exception class names that must not trigger a rollback
    String[] noRollbackForClassName() default {};
}

7.2 Transaction attribute source

The transaction attribute source parses the @Transactional annotation and converts it into a TransactionAttribute object:

public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
        implements Serializable {

    private final boolean publicMethodsOnly;

    private final Set<TransactionAnnotationParser> annotationParsers;

    public AnnotationTransactionAttributeSource() {
        this(true);
    }

    public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
        this.publicMethodsOnly = publicMethodsOnly;
        this.annotationParsers = new LinkedHashSet<>(2);
        this.annotationParsers.add(new SpringTransactionAnnotationParser());
        if (jta12Present) {
            this.annotationParsers.add(new JtaTransactionAnnotationParser());
        }
        if (ejb3Present) {
            this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
        }
    }

    // Use custom annotation parsers instead of the defaults
    public AnnotationTransactionAttributeSource(TransactionAnnotationParser... annotationParsers) {
        this.publicMethodsOnly = true;
        this.annotationParsers = new LinkedHashSet<>(Arrays.asList(annotationParsers));
    }

    @Override
    @Nullable
    protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
        return determineTransactionAttribute(clazz);
    }

    @Override
    @Nullable
    protected TransactionAttribute findTransactionAttribute(Method method) {
        return determineTransactionAttribute(method);
    }

    @Override
    protected boolean allowPublicMethodsOnly() {
        return this.publicMethodsOnly;
    }

    // Ask each parser in turn; the first one that recognizes an annotation wins
    @Nullable
    protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
        for (TransactionAnnotationParser parser : this.annotationParsers) {
            TransactionAttribute attr = parser.parseTransactionAnnotation(element);
            if (attr != null) {
                return attr;
            }
        }
        return null;
    }
}

// Parser for Spring's own @Transactional annotation
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {

    @Override
    public boolean isCandidateClass(Class<?> targetClass) {
        return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
    }

    @Override
    @Nullable
    public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
        AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
                element, Transactional.class, false, false);
        if (attributes != null) {
            return parseTransactionAnnotation(attributes);
        }
        else {
            return null;
        }
    }

    // Copy every annotation attribute into a RuleBasedTransactionAttribute
    protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
        RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

        Propagation propagation = attributes.getEnum("propagation");
        rbta.setPropagationBehavior(propagation.value());
        Isolation isolation = attributes.getEnum("isolation");
        rbta.setIsolationLevel(isolation.value());
        rbta.setTimeout(attributes.getNumber("timeout").intValue());
        rbta.setReadOnly(attributes.getBoolean("readOnly"));
        rbta.setQualifier(attributes.getString("value"));

        List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
        for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        rbta.setRollbackRules(rollbackRules);

        return rbta;
    }
}

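8. Programmatic transaction management

8.1 TransactionTemplate implementation

Spring also supports programmatic transaction management, mainly through TransactionTemplate or by using PlatformTransactionManager directly: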

public class TransactionTemplate extends DefaultTransactionDefinition
        implements TransactionOperations, InitializingBean {

    private PlatformTransactionManager transactionManager;

    public TransactionTemplate() {
    }

    public TransactionTemplate(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public TransactionTemplate(PlatformTransactionManager transactionManager, TransactionDefinition definition) {
        this.transactionManager = transactionManager;
        setPropagationBehavior(definition.getPropagationBehavior());
        setIsolationLevel(definition.getIsolationLevel());
        setTimeout(definition.getTimeout());
        setReadOnly(definition.isReadOnly());
        setName(definition.getName());
    }

    @Override
    public void afterPropertiesSet() {
        if (this.transactionManager == null) {
            throw new IllegalArgumentException("Property 'transactionManager' is required");
        }
    }

    @Override
    @Nullable
    public <T> T execute(TransactionCallback<T> action) throws TransactionException {
        if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
            return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
        }
        else {
            TransactionStatus status = this.transactionManager.getTransaction(this);
            T result;
            try {
                result = action.doInTransaction(status);
            }
            catch (RuntimeException | Error ex) {
                // Roll back on unchecked exceptions and errors
                rollbackOnException(status, ex);
                throw ex;
            }
            catch (Throwable ex) {
                // Roll back on undeclared checked exceptions as well
                rollbackOnException(status, ex);
                throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
            }
            this.transactionManager.commit(status);
            return result;
        }
    }

    private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
        try {
            this.transactionManager.rollback(status);
        }
        catch (TransactionSystemException ex2) {
            ex2.initApplicationException(ex);
            throw ex2;
        }
        catch (RuntimeException | Error ex2) {
            throw ex2;
        }
        catch (Throwable ex2) {
            throw new UndeclaredThrowableException(ex2, "Application exception overridden by rollback exception");
        }
    }
}

// Transaction callback interface
@FunctionalInterface
public interface TransactionCallback<T> {

    @Nullable
    T doInTransaction(TransactionStatus status);
}

// Convenience base class for callbacks that return no result
public abstract class TransactionCallbackWithoutResult implements TransactionCallback<Object> {

    @Override
    @Nullable
    public final Object doInTransaction(TransactionStatus status) {
        doInTransactionWithoutResult(status);
        return null;
    }

    protected abstract void doInTransactionWithoutResult(TransactionStatus status);
}
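Stripped of Spring-specific details, the template is a begin / run / commit-or-rollback wrapper around a callback. The sketch below reproduces that control flow with JDK types only. MiniTransactionTemplate and its TxManager interface are hypothetical and handle only unchecked exceptions and errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical, simplified version of the template pattern; not Spring's class. */
final class MiniTransactionTemplate {

    /** Minimal manager contract assumed for this sketch. */
    interface TxManager {
        void begin();
        void commit();
        void rollback();
    }

    private final TxManager manager;

    MiniTransactionTemplate(TxManager manager) {
        this.manager = manager;
    }

    <T> T execute(Supplier<T> action) {
        manager.begin();
        T result;
        try {
            result = action.get();
        }
        catch (RuntimeException | Error ex) {
            manager.rollback();   // the callback failed: undo and rethrow
            throw ex;
        }
        manager.commit();         // only reached when the callback returned normally
        return result;
    }

    /** Walk-through with a recording manager: one successful and one failing callback. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        TxManager recording = new TxManager() {
            @Override public void begin() { events.add("begin"); }
            @Override public void commit() { events.add("commit"); }
            @Override public void rollback() { events.add("rollback"); }
        };
        MiniTransactionTemplate template = new MiniTransactionTemplate(recording);
        events.add("result=" + template.execute(() -> "saved"));
        try {
            template.execute(() -> { throw new IllegalStateException("boom"); });
        }
        catch (IllegalStateException expected) {
            events.add("caught");
        }
        return events;
    }
}
```

Keeping commit outside the try block matters: if commit itself fails, the template does not attempt a second rollback on an already completed transaction.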

8.2 Programmatic transaction examples

// userRepository is assumed to be injected elsewhere
@Service
public class ProgrammaticTransactionService {

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Autowired
    private PlatformTransactionManager transactionManager;

    public void updateUserWithTransactionTemplate(User user) {
        // TransactionTemplate rolls back automatically when the callback throws
        // an unchecked exception, so no explicit setRollbackOnly() is needed here
        transactionTemplate.execute(status -> {
            userRepository.save(user);
            // Other business operations
            return null;
        });
    }

    public void updateUserWithTransactionManager(User user) {
        // Using PlatformTransactionManager directly
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
        def.setTimeout(30);

        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            userRepository.save(user);
            // Other business operations
        } catch (RuntimeException | Error e) {
            transactionManager.rollback(status);
            throw e;
        }
        // Commit outside the try block: a failed commit must not trigger a second rollback
        transactionManager.commit(status);
    }

    public User findUserWithReadOnly(Long userId) {
        // Read-only transaction. Use a dedicated template: changing settings on the shared,
        // injected template would affect every other caller and is not thread-safe.
        TransactionTemplate readOnlyTemplate = new TransactionTemplate(transactionManager);
        readOnlyTemplate.setReadOnly(true);
        return readOnlyTemplate.execute(status -> userRepository.findById(userId));
    }

    public void updateUserWithPropagation(User user) {
        // Custom propagation behavior, again on a dedicated template
        TransactionTemplate requiresNewTemplate = new TransactionTemplate(transactionManager);
        requiresNewTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        requiresNewTemplate.execute(status -> {
            userRepository.save(user);
            return null;
        });
    }
}

9. Transactions and the Spring ecosystem

9.1 Integration with JPA

Spring Data JPA integrates with Spring transactions mainly through JpaTransactionManager:

// Simplified excerpt for illustration; the actual Spring source differs in detail.
public class JpaTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    @Nullable
    private EntityManagerFactory entityManagerFactory;

    @Nullable
    private DataSource dataSource;

    private JpaDialect jpaDialect;

    private boolean lazyDatabaseTransaction = false;

    public JpaTransactionManager() {
        setNestedTransactionAllowed(true);
    }

    public JpaTransactionManager(EntityManagerFactory emf) {
        this();
        this.entityManagerFactory = emf;
        afterPropertiesSet();
    }

    @Override
    public void afterPropertiesSet() {
        if (getEntityManagerFactory() == null) {
            throw new IllegalArgumentException("'entityManagerFactory' or 'persistenceUnitName' is required");
        }
        if (this.jpaDialect == null) {
            EntityManagerFactory emf = getEntityManagerFactory();
            if (emf instanceof EntityManagerFactoryInfo) {
                this.jpaDialect = ((EntityManagerFactoryInfo) emf).getJpaDialect();
            }
            else {
                this.jpaDialect = new DefaultJpaDialect();
            }
        }
    }

    @Override
    protected Object doGetTransaction() {
        JpaTransactionObject txObject = new JpaTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // Reuse an EntityManager that is already bound to the current thread, if any
        EntityManagerHolder emHolder = (EntityManagerHolder)
                TransactionSynchronizationManager.getResource(getEntityManagerFactory());
        if (emHolder != null) {
            txObject.setEntityManagerHolder(emHolder, false);
        }
        return txObject;
    }

    @Override
    protected boolean isExistingTransaction(Object transaction) {
        JpaTransactionObject txObject = (JpaTransactionObject) transaction;
        return (txObject.hasEntityManagerHolder() && txObject.getEntityManagerHolder().isTransactionActive());
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        JpaTransactionObject txObject = (JpaTransactionObject) transaction;
        if (txObject.getEntityManagerHolder() == null ||
                txObject.getEntityManagerHolder().isSynchronizedWithTransaction()) {
            EntityManager em = null;
            try {
                em = createEntityManagerForTransaction();
                if (logger.isDebugEnabled()) {
                    logger.debug("Opened new EntityManager [" + em + "] for JPA transaction");
                }
                txObject.setEntityManagerHolder(new EntityManagerHolder(em), true);
            }
            catch (PersistenceException ex) {
                throw new CannotCreateTransactionException("Could not open JPA EntityManager for transaction", ex);
            }
        }

        EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
        try {
            // Always delegate to the JpaDialect to begin the transaction; a read-only
            // transaction still needs an active EntityTransaction for doCommit to succeed.
            Object transactionData = getJpaDialect().beginTransaction(em, definition);
            txObject.setTransactionData(transactionData);

            EntityManagerHolder emHolder = txObject.getEntityManagerHolder();
            emHolder.setSynchronizedWithTransaction(true);
            emHolder.setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                emHolder.setTimeoutInSeconds(timeout);
            }

            if (getDataSource() != null) {
                ConnectionHolder conHolder = (ConnectionHolder)
                        TransactionSynchronizationManager.getResource(getDataSource());
                emHolder.setConnectionHolder(conHolder);
            }

            // Bind the holder to the thread only if this transaction created it
            if (txObject.isNewEntityManagerHolder()) {
                TransactionSynchronizationManager.bindResource(
                        getEntityManagerFactory(), txObject.getEntityManagerHolder());
            }
        }
        catch (TransactionException ex) {
            closeEntityManagerAfterFailedBegin(txObject);
            throw ex;
        }
        catch (RuntimeException ex) {
            closeEntityManagerAfterFailedBegin(txObject);
            throw ex;
        }
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Committing JPA transaction on EntityManager [" +
                    txObject.getEntityManagerHolder().getEntityManager() + "]");
        }
        try {
            EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
            tx.commit();
        }
        catch (RuntimeException ex) {
            if (getJpaDialect() != null) {
                ex = getJpaDialect().translateExceptionIfPossible(ex);
            }
            throw new TransactionSystemException("Could not commit JPA transaction", ex);
        }
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Rolling back JPA transaction on EntityManager [" +
                    txObject.getEntityManagerHolder().getEntityManager() + "]");
        }
        try {
            EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
            if (tx.isActive()) {
                tx.rollback();
            }
        }
        catch (RuntimeException ex) {
            if (getJpaDialect() != null) {
                ex = getJpaDialect().translateExceptionIfPossible(ex);
            }
            throw new TransactionSystemException("Could not roll back JPA transaction", ex);
        }
    }

    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        JpaTransactionObject txObject = (JpaTransactionObject) transaction;
        // Release resources
        if (txObject.isNewEntityManagerHolder()) {
            TransactionSynchronizationManager.unbindResource(getEntityManagerFactory());
        }
        EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
        if (txObject.isNewEntityManagerHolder()) {
            EntityManagerFactoryUtils.closeEntityManager(em);
        }
        else {
            try {
                if (em.getTransaction().isActive()) {
                    em.getTransaction().rollback();
                }
            }
            catch (RuntimeException ex) {
                logger.debug("Could not roll back JPA transaction after completion", ex);
            }
        }
        txObject.getEntityManagerHolder().clear();
    }

    protected EntityManager createEntityManagerForTransaction() {
        // Simplified: open a fresh EntityManager from the factory
        return getEntityManagerFactory().createEntityManager();
    }
}

9.2 Transactions and MyBatis Integration

Spring integrates MyBatis with its transaction support mainly through DataSourceTransactionManager, because MyBatis works on plain JDBC connections underneath:

@Configuration
@EnableTransactionManagement // needed when Spring Boot auto-configuration is not in use
@MapperScan("com.example.mapper")
public class MyBatisConfig {

    @Bean
    public DataSource dataSource() {
        // Configure the data source here (JDBC URL, credentials, pool settings)
        return new HikariDataSource();
    }

    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        // Must use the same DataSource as the SqlSessionFactory below
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        return sessionFactory.getObject();
    }
}

@Service
public class MyBatisUserService {

    @Autowired
    private UserMapper userMapper;

    @Transactional
    public void updateUser(User user) {
        // Both mapper calls automatically take part in the same Spring transaction
        userMapper.updateUser(user);
        userMapper.updateUserDetail(user.getDetail());
    }
}
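Mapper calls join the Spring transaction because the transaction manager binds the connection to the current thread, and the data-access layer looks it up under the same key. The following is a minimal plain-Java sketch of that idea, not Spring's actual implementation; `ResourceRegistry`, `FakeConnection`, and `ThreadBoundDemo` are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a JDBC connection. */
final class FakeConnection {
}

/** Illustrative analogue of a thread-bound resource registry (assumption, not Spring code). */
final class ResourceRegistry {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key " + key);
        }
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }
}

final class ThreadBoundDemo {
    /** Mimics a mapper call: reuse the bound connection, otherwise open a new one. */
    static FakeConnection connectionFor(Object dataSourceKey) {
        Object bound = ResourceRegistry.get(dataSourceKey);
        return bound != null ? (FakeConnection) bound : new FakeConnection();
    }

    /** Two "mapper calls" inside a transaction see the same connection. */
    static boolean sharesConnectionInsideTransaction() {
        Object key = "dataSource";
        ResourceRegistry.bind(key, new FakeConnection()); // what "begin" does
        try {
            return connectionFor(key) == connectionFor(key);
        } finally {
            ResourceRegistry.unbind(key); // what cleanup after completion does
        }
    }

    /** Without a bound resource, every call gets its own connection. */
    static boolean sharesConnectionOutsideTransaction() {
        Object key = "dataSource";
        return connectionFor(key) == connectionFor(key);
    }
}
```

The key is the important design point: if the transaction manager and the SqlSessionFactory were configured with different DataSource instances, the lookup would miss and the mapper calls would run outside the transaction.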

9.3 Transactions and Hibernate Integration

Spring integrates Hibernate with its transaction support through HibernateTransactionManager:

// Simplified excerpt for illustration; the actual Spring source differs in detail.
public class HibernateTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    @Nullable
    private SessionFactory sessionFactory;

    @Nullable
    private DataSource dataSource;

    private boolean autodetectDataSource = true;

    private boolean allowResultAccessAfterCompletion = false;

    public HibernateTransactionManager() {
        setNestedTransactionAllowed(true);
    }

    public HibernateTransactionManager(SessionFactory sessionFactory) {
        this();
        this.sessionFactory = sessionFactory;
        afterPropertiesSet();
    }

    @Override
    public void afterPropertiesSet() {
        if (getSessionFactory() == null) {
            throw new IllegalArgumentException("'sessionFactory' is required");
        }
        // Try to detect the DataSource that the SessionFactory uses
        if (this.dataSource == null) {
            if (this.autodetectDataSource && getSessionFactory() instanceof SessionFactoryImplementor) {
                ServiceRegistry serviceRegistry =
                        ((SessionFactoryImplementor) getSessionFactory()).getServiceRegistry();
                ConnectionProvider connectionProvider = serviceRegistry.getService(ConnectionProvider.class);
                if (connectionProvider instanceof DatasourceConnectionProviderImpl) {
                    this.dataSource = ((DatasourceConnectionProviderImpl) connectionProvider).getDataSource();
                }
            }
        }
    }

    @Override
    protected Object doGetTransaction() {
        HibernateTransactionObject txObject = new HibernateTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        SessionHolder sessionHolder =
                (SessionHolder) TransactionSynchronizationManager.getResource(getSessionFactory());
        if (sessionHolder != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Found thread-bound Session [" +
                        sessionHolder.getSession() + "] for Hibernate transaction");
            }
            txObject.setSessionHolder(sessionHolder, false);
        }
        if (getDataSource() != null) {
            ConnectionHolder conHolder = (ConnectionHolder)
                    TransactionSynchronizationManager.getResource(getDataSource());
            txObject.setConnectionHolder(conHolder);
        }
        return txObject;
    }

    @Override
    protected boolean isExistingTransaction(Object transaction) {
        HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
        return (txObject.hasSessionHolder() && txObject.getSessionHolder().isTransactionActive());
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
        if (txObject.hasSessionHolder() && !txObject.getSessionHolder().isSynchronizedWithTransaction()) {
            throw new IllegalTransactionStateException(
                    "Pre-bound JDBC Session found and HibernateTransactionManager does not support " +
                    "running within DataSourceTransactionManager: either set up HibernateTransactionManager " +
                    "explicitly or switch to DataSourceTransactionManager for JDBC-based JTA transactions");
        }

        Session session = null;
        try {
            if (!txObject.hasSessionHolder() || txObject.getSessionHolder().isSynchronizedWithTransaction()) {
                Session newSession = obtainSessionFactory().openSession();
                if (logger.isDebugEnabled()) {
                    logger.debug("Opened new Session [" + newSession + "] for Hibernate transaction");
                }
                txObject.setSessionHolder(new SessionHolder(newSession), true);
            }

            SessionHolder sessionHolder = txObject.getSessionHolder();
            sessionHolder.setSynchronizedWithTransaction(true);
            sessionHolder.setTransactionActive(true);
            session = sessionHolder.getSession();

            // Expose the Session's JDBC connection so plain JDBC code can join the transaction
            if (this.autodetectDataSource && getDataSource() != null) {
                Connection con = ((SessionImplementor) session).connection();
                ConnectionHolder conHolder = new ConnectionHolder(con);
                if (logger.isDebugEnabled()) {
                    logger.debug("Exposing Hibernate transaction as JDBC [" + con + "]");
                }
                TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
                txObject.setConnectionHolder(conHolder);
            }

            // Begin the Hibernate transaction
            session.beginTransaction();

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                sessionHolder.setTimeoutInSeconds(timeout);
            }

            // Bind the session holder to the synchronization manager
            if (txObject.isNewSessionHolder()) {
                TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
            }
        }
        catch (Throwable ex) {
            if (txObject.isNewSessionHolder()) {
                SessionFactoryUtils.releaseSession(session, getSessionFactory());
                txObject.setSessionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex);
        }
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Committing Hibernate transaction on Session [" +
                    txObject.getSessionHolder().getSession() + "]");
        }
        try {
            Transaction tx = txObject.getSessionHolder().getSession().getTransaction();
            tx.commit();
        }
        catch (RuntimeException ex) {
            if (isHibernateFlushFailure(ex)) {
                throw convertHibernateAccessException(ex);
            }
            throw ex;
        }
        catch (Error err) {
            throw err;
        }
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Rolling back Hibernate transaction on Session [" +
                    txObject.getSessionHolder().getSession() + "]");
        }
        try {
            Transaction tx = txObject.getSessionHolder().getSession().getTransaction();
            if (tx.isActive()) {
                tx.rollback();
            }
        }
        catch (RuntimeException ex) {
            throw convertHibernateAccessException(ex);
        }
        catch (Error err) {
            throw err;
        }
    }

    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
        // Release resources
        if (txObject.isNewSessionHolder()) {
            TransactionSynchronizationManager.unbindResource(getSessionFactory());
        }
        if (getDataSource() != null && txObject.hasConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(getDataSource());
        }
        Session session = txObject.getSessionHolder().getSession();
        if (txObject.isNewSessionHolder()) {
            SessionFactoryUtils.releaseSession(session, getSessionFactory());
        }
        else {
            try {
                if (session.getTransaction().isActive()) {
                    session.getTransaction().rollback();
                }
            }
            catch (HibernateException ex) {
                logger.debug("Could not roll back Hibernate Session after completion", ex);
            }
            finally {
                closeSessionAfterCompletion(session);
            }
        }
        txObject.getSessionHolder().clear();
    }

    protected boolean isHibernateFlushFailure(RuntimeException ex) {
        return (ex instanceof QueryException || ex instanceof ConstraintViolationException);
    }

    protected RuntimeException convertHibernateAccessException(RuntimeException ex) {
        return SessionFactoryUtils.convertHibernateAccessException(ex);
    }

    protected void closeSessionAfterCompletion(Session session) {
        try {
            SessionFactoryUtils.closeSession(session);
        }
        catch (RuntimeException ex) {
            logger.debug("Could not close Hibernate Session after completion", ex);
        }
    }
}

10. Distributed Transaction Implementation

10.1 JTA Transaction Management

The Java Transaction API (JTA) is the Java EE standard API for distributed transaction management. Spring supports JTA transactions through JtaTransactionManager:

// Simplified excerpt for illustration; the actual Spring source differs in detail.
public class JtaTransactionManager extends AbstractPlatformTransactionManager
        implements TransactionFactory, InitializingBean {

    @Nullable
    private UserTransaction userTransaction;

    @Nullable
    private TransactionManager transactionManager;

    private boolean allowCustomIsolationLevels = false;

    private boolean autodetectUserTransaction = true;

    private boolean cacheUserTransaction = true;

    private boolean autodetectTransactionManager = true;

    private boolean cacheTransactionManager = true;

    public JtaTransactionManager() {
        setNestedTransactionAllowed(false);
    }

    public JtaTransactionManager(UserTransaction userTransaction) {
        this();
        this.userTransaction = userTransaction;
    }

    public JtaTransactionManager(TransactionManager transactionManager) {
        this();
        this.transactionManager = transactionManager;
    }

    public JtaTransactionManager(UserTransaction userTransaction, TransactionManager transactionManager) {
        this();
        this.userTransaction = userTransaction;
        this.transactionManager = transactionManager;
    }

    @Override
    public void afterPropertiesSet() throws TransactionSystemException {
        if (getUserTransaction() == null && this.autodetectUserTransaction) {
            setUserTransaction(lookupUserTransaction());
        }
        if (getTransactionManager() == null && this.autodetectTransactionManager) {
            setTransactionManager(lookupTransactionManager());
        }
        checkUserTransactionAndTransactionManager();
    }

    @Override
    protected Object doGetTransaction() {
        JtaTransactionObject txObject = new JtaTransactionObject();
        // Hand the UserTransaction to the transaction object; the methods below rely on it
        txObject.setUserTransaction(getUserTransaction());
        return txObject;
    }

    @Override
    protected boolean isExistingTransaction(Object transaction) throws TransactionSystemException {
        JtaTransactionObject txObject = (JtaTransactionObject) transaction;
        try {
            return (txObject.getUserTransaction().getStatus() != Status.STATUS_NO_TRANSACTION);
        }
        catch (SystemException ex) {
            throw new TransactionSystemException("JTA failure on getStatus", ex);
        }
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
        JtaTransactionObject txObject = (JtaTransactionObject) transaction;
        try {
            // Custom isolation levels are rejected unless explicitly allowed
            if (!this.allowCustomIsolationLevels &&
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                throw new InvalidIsolationLevelException("JTA does not support custom isolation levels");
            }
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getUserTransaction().setTransactionTimeout(timeout);
            }
            txObject.getUserTransaction().begin();
            if (logger.isDebugEnabled()) {
                logger.debug("Began JTA transaction");
            }
        }
        catch (NotSupportedException ex) {
            throw new CannotCreateTransactionException("JTA implementation does not support nested transactions", ex);
        }
        catch (UnsupportedOperationException ex) {
            throw new IllegalTransactionStateException("JTA implementation does not support nested transactions", ex);
        }
        catch (SystemException ex) {
            throw new TransactionSystemException("JTA failure on begin", ex);
        }
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
        JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
        try {
            if (status.isDebug()) {
                logger.debug("Committing JTA transaction");
            }
            txObject.getUserTransaction().commit();
        }
        catch (RollbackException ex) {
            throw new UnexpectedRollbackException("JTA transaction rolled back", ex);
        }
        catch (HeuristicMixedException ex) {
            throw new HeuristicCompletionException(HeuristicCompletionException.STATE_MIXED, ex);
        }
        catch (HeuristicRollbackException ex) {
            throw new HeuristicCompletionException(HeuristicCompletionException.STATE_ROLLED_BACK, ex);
        }
        catch (SystemException ex) {
            throw new TransactionSystemException("JTA failure on commit", ex);
        }
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
        JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
        try {
            if (status.isDebug()) {
                logger.debug("Rolling back JTA transaction");
            }
            txObject.getUserTransaction().rollback();
        }
        catch (SystemException ex) {
            throw new TransactionSystemException("JTA failure on rollback", ex);
        }
    }

    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
        JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Setting JTA transaction rollback-only");
        }
        try {
            txObject.getUserTransaction().setRollbackOnly();
        }
        catch (SystemException ex) {
            throw new TransactionSystemException("JTA failure on setRollbackOnly", ex);
        }
    }

    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        JtaTransactionObject txObject = (JtaTransactionObject) transaction;
        if (logger.isDebugEnabled()) {
            logger.debug("Clearing JTA transaction");
        }
        // Release resources
        txObject.reset();
    }

    protected UserTransaction lookupUserTransaction() throws TransactionSystemException {
        try {
            UserTransaction ut = new JndiTemplate().lookup("java:comp/UserTransaction", UserTransaction.class);
            if (logger.isDebugEnabled()) {
                logger.debug("JTA UserTransaction found at default JNDI location [java:comp/UserTransaction]");
            }
            return ut;
        }
        catch (NamingException ex) {
            throw new TransactionSystemException(
                    "JTA UserTransaction is not available at JNDI location [java:comp/UserTransaction]", ex);
        }
    }

    protected TransactionManager lookupTransactionManager() throws TransactionSystemException {
        try {
            TransactionManager tm = new JndiTemplate().lookup("java:comp/TransactionManager", TransactionManager.class);
            if (logger.isDebugEnabled()) {
                logger.debug("JTA TransactionManager found at default JNDI location [java:comp/TransactionManager]");
            }
            return tm;
        }
        catch (NamingException ex) {
            throw new TransactionSystemException(
                    "JTA TransactionManager is not available at JNDI location [java:comp/TransactionManager]", ex);
        }
    }

    private void checkUserTransactionAndTransactionManager() throws IllegalStateException {
        if (this.userTransaction == null) {
            throw new IllegalStateException("'userTransaction' or 'userTransactionName' is required");
        }
    }

    protected static class JtaTransactionObject implements SmartTransactionObject {

        @Nullable
        private UserTransaction userTransaction;

        @Nullable
        private TransactionManager transactionManager;

        public void setUserTransaction(@Nullable UserTransaction userTransaction) {
            this.userTransaction = userTransaction;
        }

        public UserTransaction getUserTransaction() {
            if (this.userTransaction == null) {
                throw new IllegalStateException("No JTA UserTransaction available");
            }
            return this.userTransaction;
        }

        public void setTransactionManager(@Nullable TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }

        public TransactionManager getTransactionManager() {
            return this.transactionManager;
        }

        public boolean isRollbackOnly() {
            try {
                return (getUserTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK);
            }
            catch (SystemException ex) {
                throw new TransactionSystemException("JTA failure on getStatus", ex);
            }
        }

        public void reset() {
            // Release resources
        }
    }
}

10.2 Multi-Data-Source Transaction Management

Real applications often need a transaction that spans several data sources. Spring supports more than one approach:

Option 1: use the JTA transaction manager
@Configuration
public class JtaTransactionConfig {

    // Placeholder pools: for JTA to coordinate both databases, each DataSource must be
    // XA-capable and enlisted with the JTA provider; a plain pool is not enlisted.
    @Bean(name = "dataSourceA")
    public DataSource dataSourceA() {
        return new HikariDataSource();
    }

    @Bean(name = "dataSourceB")
    public DataSource dataSourceB() {
        return new HikariDataSource();
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        return new JtaTransactionManager();
    }
}

@Service
public class MultiDataSourceService {

    @Autowired
    private JdbcTemplate jdbcTemplateA;

    @Autowired
    private JdbcTemplate jdbcTemplateB;

    @Transactional
    public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) {
        // Debit on data source A
        jdbcTemplateA.update("UPDATE account SET balance = balance - ? WHERE id = ?", amount, fromAccount);
        // Credit on data source B
        jdbcTemplateB.update("UPDATE account SET balance = balance + ? WHERE id = ?", amount, toAccount);
        // If an exception is thrown, both operations are rolled back
    }
}
Option 2: manage several data sources with programmatic transactions
@Service
public class MultiDataSourceProgrammaticService {

    @Autowired
    private PlatformTransactionManager transactionManagerA;

    @Autowired
    private PlatformTransactionManager transactionManagerB;

    @Autowired
    private JdbcTemplate jdbcTemplateA;

    @Autowired
    private JdbcTemplate jdbcTemplateB;

    public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus statusA = transactionManagerA.getTransaction(def);
        TransactionStatus statusB = transactionManagerB.getTransaction(def);
        try {
            // Debit on data source A
            jdbcTemplateA.update("UPDATE account SET balance = balance - ? WHERE id = ?", amount, fromAccount);
            // Credit on data source B
            jdbcTemplateB.update("UPDATE account SET balance = balance + ? WHERE id = ?", amount, toAccount);
            // Complete in reverse order of creation: B first
            transactionManagerB.commit(statusB);
        } catch (RuntimeException e) {
            // Roll back whatever is still open, again in reverse order
            if (!statusB.isCompleted()) {
                transactionManagerB.rollback(statusB);
            }
            transactionManagerA.rollback(statusA);
            throw e;
        }
        // Not atomic: if this commit fails, B is already committed and cannot be undone
        transactionManagerA.commit(statusA);
    }
}
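Committing two independent transactions one after the other is best-effort only: once the first commit has succeeded, a failure of the second leaves the two data sources inconsistent, and a later rollback call cannot undo the first. The plain-Java sketch below illustrates that window; `FakeTx` and `SequentialCommitDemo` are hypothetical stand-ins, not Spring types.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for one resource-local transaction. */
final class FakeTx {
    final boolean failOnCommit;
    String state = "ACTIVE";

    FakeTx(boolean failOnCommit) {
        this.failOnCommit = failOnCommit;
    }

    void commit() {
        if (failOnCommit) {
            state = "ROLLED_BACK";
            throw new IllegalStateException("commit failed");
        }
        state = "COMMITTED";
    }

    /** Rolling back only has an effect while the transaction is still active. */
    void rollback() {
        if (state.equals("ACTIVE")) {
            state = "ROLLED_BACK";
        }
    }
}

final class SequentialCommitDemo {
    /** Returns the final states of both transactions, first then second. */
    static List<String> run(boolean secondCommitFails) {
        FakeTx first = new FakeTx(false);
        FakeTx second = new FakeTx(secondCommitFails);
        try {
            first.commit();
            second.commit();
        } catch (IllegalStateException ex) {
            // Too late for the first transaction: it is already durable
            first.rollback();
            second.rollback();
        }
        return Arrays.asList(first.state, second.state);
    }
}
```

When `run(true)` is called, the first transaction ends as committed while the second ends as rolled back. Closing that gap needs a coordinating protocol such as the two-phase commit that JTA providers implement, or application-level compensation.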

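11. Transaction Performance Optimization and Monitoring

11.1 Transaction Performance Optimization Strategies

11.1.1 Optimizing transaction boundaries
@Service
public class OptimizedTransactionService {

    // Hypothetical helper bean: the transactional write lives in a separate bean so the
    // call goes through the Spring proxy (a call through `this` would bypass it)
    @Autowired
    private UserResultWriter userResultWriter;

    // Bad practice: the transaction covers far too much
    @Transactional
    public void badPractice(Long userId) {
        // Query (does not need a transaction)
        User user = userRepository.findById(userId);
        // Remote call (should not run inside a transaction)
        ExternalData data = externalService.getData(user.getId());
        // Business calculation (does not need a transaction)
        BigDecimal result = calculateResult(user, data);
        // Only this step needs a transaction
        userRepository.updateUserResult(userId, result);
    }

    // Good practice: keep the transaction as small as possible
    public void goodPractice(Long userId) {
        // Query (does not need a transaction)
        User user = userRepository.findById(userId);
        // Remote call (should not run inside a transaction)
        ExternalData data = externalService.getData(user.getId());
        // Business calculation (does not need a transaction)
        BigDecimal result = calculateResult(user, data);
        // Only this step needs a transaction
        userResultWriter.updateUserResult(userId, result);
    }
}

@Service
public class UserResultWriter {

    @Transactional
    public void updateUserResult(Long userId, BigDecimal result) {
        userRepository.updateUserResult(userId, result);
    }
}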
11.1.2 Using read-only transactions
@Service
public class ReadOnlyTransactionService {

    // Read-only transaction: lets the persistence layer and database apply optimizations
    @Transactional(readOnly = true)
    public List<User> findActiveUsers() {
        return userRepository.findByStatus("ACTIVE");
    }

    // Complex query: read-only with a longer timeout (in seconds)
    @Transactional(readOnly = true, timeout = 60)
    public UserStatistics generateUserStatistics() {
        // Complex statistics query
        return userRepository.generateStatistics();
    }
}
11.1.3 Choosing a suitable isolation level
@Service
public class IsolationLevelService {

    // Read committed: suitable for most scenarios
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public void updateUser(User user) {
        userRepository.save(user);
    }

    // Repeatable read: for scenarios that need consistent reads
    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public User getUserWithConsistentRead(Long userId) {
        User user = userRepository.findById(userId);
        // Several reads in the same transaction see consistent data
        List<Order> orders = orderRepository.findByUserId(userId);
        user.setOrders(orders);
        return user;
    }

    // Serializable: the strictest level, for operations that cannot tolerate any anomaly.
    // It lowers concurrency, so reserve it for cases that really need it.
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) {
        accountRepository.debit(fromAccount, amount);
        accountRepository.credit(toAccount, amount);
    }
}
11.1.4 Setting a suitable timeout
@Service
public class TimeoutTransactionService {

    // Short transaction: use a short timeout (in seconds)
    @Transactional(timeout = 5)
    public void quickUpdate(User user) {
        userRepository.save(user);
    }

    // Long transaction: use a longer timeout (in seconds)
    @Transactional(timeout = 60)
    public void batchUpdate(List<User> users) {
        for (User user : users) {
            userRepository.save(user);
        }
    }
}

11.2 Transaction Monitoring and Debugging

11.2.1 Transaction monitoring with AOP
@Aspect
@Component
public class TransactionMonitorAspect {

    private static final Logger logger = LoggerFactory.getLogger(TransactionMonitorAspect.class);

    @Autowired
    private MeterRegistry meterRegistry;

    @Around("@annotation(transactional)")
    public Object monitorTransaction(ProceedingJoinPoint joinPoint, Transactional transactional) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        String className = joinPoint.getTarget().getClass().getSimpleName();
        String transactionName = className + "." + methodName;
        long startTime = System.currentTimeMillis();
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            Object result = joinPoint.proceed();
            long duration = System.currentTimeMillis() - startTime;
            sample.stop(Timer.builder("transaction.duration")
                    .tag("method", transactionName)
                    .tag("status", "success")
                    .register(meterRegistry));
            logger.info("Transaction executed successfully: {}.{} took {} ms", className, methodName, duration);
            return result;
        }
        catch (Exception e) {
            long duration = System.currentTimeMillis() - startTime;
            sample.stop(Timer.builder("transaction.duration")
                    .tag("method", transactionName)
                    .tag("status", "failure")
                    .register(meterRegistry));
            meterRegistry.counter("transaction.failure",
                    "method", transactionName,
                    "exception", e.getClass().getSimpleName()).increment();
            logger.error("Transaction failed: {}.{} took {} ms", className, methodName, duration, e);
            throw e;
        }
    }
}
11.2.2 Transaction logging
// TransactionStartedEvent, TransactionCommittedEvent and TransactionRolledBackEvent are
// custom application events that the application defines itself.
@Component
public class TransactionLogger {

    private static final Logger logger = LoggerFactory.getLogger(TransactionLogger.class);

    @EventListener
    public void handleTransactionStarted(TransactionStartedEvent event) {
        logger.info("Transaction started: {}", event.getTransactionName());
    }

    @EventListener
    public void handleTransactionCommitted(TransactionCommittedEvent event) {
        logger.info("Transaction committed: {}", event.getTransactionName());
    }

    @EventListener
    public void handleTransactionRolledBack(TransactionRolledBackEvent event) {
        logger.warn("Transaction rolled back: {}", event.getTransactionName());
    }
}

// Transaction event publisher
@Component
public class TransactionEventPublisher {

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void publishTransactionStarted(String transactionName) {
        eventPublisher.publishEvent(new TransactionStartedEvent(transactionName));
    }

    public void publishTransactionCommitted(String transactionName) {
        eventPublisher.publishEvent(new TransactionCommittedEvent(transactionName));
    }

    public void publishTransactionRolledBack(String transactionName) {
        eventPublisher.publishEvent(new TransactionRolledBackEvent(transactionName));
    }
}
11.2.3 Transaction synchronization monitoring
// Not a Spring bean: one instance is created per transaction by the aspect below
public class TransactionSyncMonitor implements TransactionSynchronization {

    private static final Logger logger = LoggerFactory.getLogger(TransactionSyncMonitor.class);

    private final String transactionName;

    private final long startTime;

    public TransactionSyncMonitor(String transactionName) {
        this.transactionName = transactionName;
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void beforeCommit(boolean readOnly) {
        logger.info("Transaction {} is about to commit, readOnly: {}", transactionName, readOnly);
    }

    @Override
    public void afterCommit() {
        long duration = System.currentTimeMillis() - startTime;
        logger.info("Transaction {} committed successfully in {} ms", transactionName, duration);
    }

    @Override
    public void beforeCompletion() {
        logger.debug("Transaction {} is about to complete", transactionName);
    }

    @Override
    public void afterCompletion(int status) {
        long duration = System.currentTimeMillis() - startTime;
        if (status == STATUS_ROLLED_BACK) {
            logger.warn("Transaction {} rolled back in {} ms", transactionName, duration);
        } else if (status == STATUS_COMMITTED) {
            logger.info("Transaction {} completed successfully in {} ms", transactionName, duration);
        } else {
            logger.warn("Transaction {} completed with unknown status in {} ms", transactionName, duration);
        }
    }
}

// Registers the synchronization monitor
@Aspect
@Component
public class TransactionSyncMonitorAspect {

    @Autowired
    private ApplicationContext applicationContext;

    @Around("@annotation(transactional)")
    public Object monitorTransactionSync(ProceedingJoinPoint joinPoint, Transactional transactional) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        String className = joinPoint.getTarget().getClass().getSimpleName();
        String transactionName = className + "." + methodName;
        // Register the monitor. Synchronization is only active once the transaction has
        // started, so this aspect must be ordered to run inside the transaction interceptor.
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSyncMonitor(transactionName));
        }
        return joinPoint.proceed();
    }
}
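The order in which these callbacks fire differs between commit and rollback, which matters when deciding where to put monitoring logic. The plain-Java sketch below records the sequence under the assumption that it mirrors Spring's commit and rollback processing; `SyncCallback`, `RecordingCallback`, and `CallbackOrderSim` are hypothetical names, and the status is modeled as a string for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced version of a synchronization callback interface. */
interface SyncCallback {
    void beforeCommit();
    void beforeCompletion();
    void afterCommit();
    void afterCompletion(String status);
}

/** Records each callback name in the order it was invoked. */
final class RecordingCallback implements SyncCallback {
    final List<String> events = new ArrayList<>();

    @Override public void beforeCommit() { events.add("beforeCommit"); }
    @Override public void beforeCompletion() { events.add("beforeCompletion"); }
    @Override public void afterCommit() { events.add("afterCommit"); }
    @Override public void afterCompletion(String status) { events.add("afterCompletion:" + status); }
}

final class CallbackOrderSim {
    /** Callback sequence on a successful commit. */
    static List<String> commit() {
        RecordingCallback callback = new RecordingCallback();
        callback.beforeCommit();
        callback.beforeCompletion();
        // the resource commit itself would happen here
        callback.afterCommit();
        callback.afterCompletion("COMMITTED");
        return callback.events;
    }

    /** Callback sequence on a rollback: the commit-specific callbacks are skipped. */
    static List<String> rollback() {
        RecordingCallback callback = new RecordingCallback();
        callback.beforeCompletion();
        // the resource rollback itself would happen here
        callback.afterCompletion("ROLLED_BACK");
        return callback.events;
    }
}
```

Because `afterCompletion` is the only callback that runs in both paths, it is the natural place for duration metrics, while `afterCommit` suits actions that must only happen once the data is durable.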

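12. Common Problems and Solutions

12.1 When Transactions Silently Do Not Apply, and How to Fix It

12.1.1 Self-invocation inside a class
@Service
public class InternalCallService {

    // Problem: a call through `this` bypasses the proxy, so no transaction is started
    public void methodA() {
        this.methodB();
    }

    @Transactional
    public void methodB() {
        // No transaction is active when this is reached from methodA()
    }

    // Solution 1: obtain the proxy from AopContext
    // (requires @EnableAspectJAutoProxy(exposeProxy = true))
    public void methodAFixed() {
        ((InternalCallService) AopContext.currentProxy()).methodB();
    }

    // Solution 2: inject the bean into itself
    @Autowired
    private InternalCallService self;

    public void methodAFixed2() {
        self.methodB();
    }

    // Solution 3: move the transactional method to another service
    @Autowired
    private TransactionalService transactionalService;

    public void methodAFixed3() {
        transactionalService.methodB();
    }
}

@Service
public class TransactionalService {

    @Transactional
    public void methodB() {
        // The transaction applies here
    }
}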
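The self-invocation problem follows directly from how proxies work: only calls that enter through the proxy object are intercepted. The sketch below reproduces the effect with a plain JDK dynamic proxy and no Spring at all; `Worker`, `WorkerImpl`, and `SelfInvocationDemo` are hypothetical names, and the recording handler merely stands in for a transaction interceptor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface Worker {
    void methodA();
    void methodB();
}

final class WorkerImpl implements Worker {
    @Override
    public void methodA() {
        this.methodB(); // self-invocation: goes straight to the target, not the proxy
    }

    @Override
    public void methodB() {
        // business logic would go here
    }
}

final class SelfInvocationDemo {
    /** Wraps the target in a JDK proxy that records every intercepted method name. */
    static Worker proxyOf(Worker target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        return (Worker) Proxy.newProxyInstance(
                Worker.class.getClassLoader(), new Class<?>[] {Worker.class}, handler);
    }

    /** Calls methodA through the proxy and returns what the interceptor saw. */
    static List<String> interceptedWhenCallingA() {
        List<String> intercepted = new ArrayList<>();
        proxyOf(new WorkerImpl(), intercepted).methodA();
        return intercepted;
    }

    /** Calls methodB through the proxy and returns what the interceptor saw. */
    static List<String> interceptedWhenCallingB() {
        List<String> intercepted = new ArrayList<>();
        proxyOf(new WorkerImpl(), intercepted).methodB();
        return intercepted;
    }
}
```

Calling `methodA` through the proxy records only `methodA`; the nested call to `methodB` never reaches the handler, which is exactly why an annotation on `methodB` has no effect in that path.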
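12.1.2 Private methods
@Service
public class PrivateMethodService {

    // Problem: @Transactional on a private method has no effect
    @Transactional
    private void privateMethod() {
        // No transaction is applied
    }

    // Solution: make the method public
    @Transactional
    public void publicMethod() {
        // The transaction applies
    }

    // If the logic must stay private, call it from a public transactional method
    @Transactional
    public void wrapperMethod() {
        privateHelper();
    }

    private void privateHelper() {
        // Runs inside the transaction started by wrapperMethod()
    }
}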
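12.1.3 Caught exceptions prevent rollback
@Service
public class ExceptionHandlingService {

    // Problem: the exception is swallowed, so the transaction is not rolled back
    @Transactional
    public void methodWithCaughtException() {
        try {
            // Business logic
            throw new RuntimeException("test exception");
        }
        catch (Exception e) {
            // The interceptor never sees the exception, so the transaction commits
            logger.error("An exception occurred", e);
        }
    }

    // Solution 1: rethrow the exception
    @Transactional
    public void methodWithRethrownException() {
        try {
            // Business logic
            throw new RuntimeException("test exception");
        }
        catch (Exception e) {
            logger.error("An exception occurred", e);
            // Rethrow so the interceptor can roll back
            throw e;
        }
    }

    // Solution 2: mark the transaction rollback-only by hand
    @Transactional
    public void methodWithManualRollback() {
        try {
            // Business logic
            throw new RuntimeException("test exception");
        }
        catch (Exception e) {
            logger.error("An exception occurred", e);
            // Mark for rollback manually
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }
    }

    // Solution 3: declare which exceptions trigger a rollback
    @Transactional(rollbackFor = Exception.class)
    public void methodWithSpecifiedRollback() throws Exception {
        try {
            // Business logic
            throw new Exception("test exception");
        }
        catch (Exception e) {
            logger.error("An exception occurred", e);
            // Rolls back even though this is a checked exception
            throw e;
        }
    }
}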
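The decision behind these cases can be reduced to a small predicate: by default only unchecked exceptions and errors trigger a rollback, and `rollbackFor` widens that set. The sketch below is a deliberately simplified model of that rule, assuming no `noRollbackFor` entries and no "closest match wins" resolution as the real rule evaluation has; `RollbackRules` is a hypothetical name.

```java
final class RollbackRules {

    /** Default rule: roll back on RuntimeException and Error, not on checked exceptions. */
    static boolean rollbackByDefault(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    /**
     * Simplified rollbackFor handling: any listed type (or subtype) forces a rollback,
     * otherwise the default rule applies.
     */
    @SafeVarargs
    static boolean rollbackOn(Throwable ex, Class<? extends Throwable>... rollbackFor) {
        for (Class<? extends Throwable> type : rollbackFor) {
            if (type.isInstance(ex)) {
                return true;
            }
        }
        return rollbackByDefault(ex);
    }
}
```

For example, `RollbackRules.rollbackByDefault(new java.io.IOException())` is false, while `RollbackRules.rollbackOn(new java.io.IOException(), Exception.class)` is true. An exception that is caught inside the method never reaches this check at all, which is the root of the first case above.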

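12.2 Transaction Propagation Pitfalls

12.2.1 Inner rollback affecting the outer transaction
@Service
public class NestedTransactionService {

    // The inner methods live in a separate bean (hypothetical NestedInnerService) so that
    // the calls go through the proxy and their propagation settings take effect
    @Autowired
    private NestedInnerService innerService;

    // Problem: with REQUIRED the inner method joins the outer transaction, so its failure
    // marks the whole transaction rollback-only even though the exception is caught here
    @Transactional
    public void outerMethod() {
        try {
            // Outer transaction work
            userRepository.save(new User("outer user"));
            // Call the inner method
            innerService.innerMethod();
        } catch (Exception e) {
            logger.error("Inner method failed", e);
        }
    }

    // Solution: use a nested transaction
    @Transactional
    public void outerMethodFixed() {
        try {
            // Outer transaction work
            userRepository.save(new User("outer user"));
            // Call the inner method
            innerService.innerMethodFixed();
        } catch (Exception e) {
            logger.error("Inner method failed", e);
        }
        // The outer transaction commits successfully
    }
}

@Service
public class NestedInnerService {

    @Transactional(propagation = Propagation.REQUIRED)
    public void innerMethod() {
        // Inner transaction work
        userRepository.save(new User("inner user"));
        throw new RuntimeException("inner method failure");
    }

    // NESTED relies on savepoints, which not every transaction manager supports
    @Transactional(propagation = Propagation.NESTED)
    public void innerMethodFixed() {
        // Inner transaction work
        userRepository.save(new User("inner user"));
        // Only the inner work is rolled back; the outer work is unaffected
        throw new RuntimeException("inner method failure");
    }
}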
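The difference between joining a transaction and nesting inside it can be modeled without a database. In the sketch below a transaction is just a list of pending writes plus a rollback-only flag, and a savepoint is the list size at a point in time. This is an illustrative simulation under those assumptions; `SimTx` and `PropagationSim` are hypothetical, and where the real framework would raise an unexpected-rollback error on commit, the model simply returns no rows.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of one physical transaction. */
final class SimTx {
    final List<String> writes = new ArrayList<>();
    boolean rollbackOnly;

    int savepoint() {
        return writes.size();
    }

    /** Discards every write made after the savepoint was taken. */
    void rollbackTo(int savepoint) {
        writes.subList(savepoint, writes.size()).clear();
    }

    /** Returns the rows that become durable; none if the transaction is rollback-only. */
    List<String> commit() {
        return rollbackOnly ? new ArrayList<>() : new ArrayList<>(writes);
    }
}

final class PropagationSim {
    /** Inner work joins the outer transaction; its failure poisons the whole transaction. */
    static List<String> outerWithRequiredInner() {
        SimTx tx = new SimTx();
        tx.writes.add("outer user");
        try {
            tx.writes.add("inner user");
            throw new IllegalStateException("inner failure");
        } catch (IllegalStateException ex) {
            tx.rollbackOnly = true; // set when the exception leaves the inner boundary
        }
        return tx.commit();
    }

    /** Inner work runs behind a savepoint; only that part is undone on failure. */
    static List<String> outerWithNestedInner() {
        SimTx tx = new SimTx();
        tx.writes.add("outer user");
        int savepoint = tx.savepoint();
        try {
            tx.writes.add("inner user");
            throw new IllegalStateException("inner failure");
        } catch (IllegalStateException ex) {
            tx.rollbackTo(savepoint);
        }
        return tx.commit();
    }
}
```

In the joined case nothing is persisted even though the caller caught the exception, whereas in the nested case the outer write survives. Catching the exception in the outer method changes control flow only; it does not clear the rollback-only mark.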
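12.2.2 REQUIRES_NEW and the outer transaction
@Service
public class RequiresNewTransactionService {

    // The inner method lives in a separate bean (hypothetical RequiresNewInnerService) so
    // that the call goes through the proxy and REQUIRES_NEW takes effect
    @Autowired
    private RequiresNewInnerService innerService;

    // Problem: a failure in the new transaction does not affect the outer transaction
    @Transactional
    public void outerMethod() {
        try {
            // Outer transaction work
            userRepository.save(new User("outer user"));
            // Call the inner method
            innerService.innerMethod();
        } catch (Exception e) {
            logger.error("Inner method failed", e);
            // The outer transaction still commits
        }
    }

    // Solution: inspect the outcome of the inner call
    @Transactional
    public void outerMethodFixed() {
        // Outer transaction work
        userRepository.save(new User("outer user"));
        try {
            // Call the inner method
            innerService.innerMethod();
        } catch (Exception e) {
            logger.error("Inner method failed", e);
            // Decide, based on business needs, whether the outer transaction must roll back
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }
    }
}

@Service
public class RequiresNewInnerService {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void innerMethod() {
        // Inner transaction work
        userRepository.save(new User("inner user"));
        // The inner transaction rolls back without affecting the outer one
        throw new RuntimeException("inner method failure");
    }
}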

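12.3 Transactions and Concurrency

12.3.1 Optimistic locking
@Entity
public class Product {

    @Id
    private Long id;

    private String name;

    private Integer stock;

    @Version
    private Long version;

    // getters and setters
}

@Service
public class OptimisticLockService {

    @Autowired
    private ProductRepository productRepository;

    @Transactional
    public void updateProductStock(Long productId, Integer quantity) {
        Product product = productRepository.findById(productId)
                .orElseThrow(() -> new IllegalArgumentException("Product not found: " + productId));
        // Check the stock
        if (product.getStock() < quantity) {
            throw new InsufficientStockException("Insufficient stock");
        }
        // Update the stock
        product.setStock(product.getStock() - quantity);
        productRepository.save(product);
        // If the version no longer matches at flush/commit time,
        // an OptimisticLockingFailureException is thrown
    }
}

// The retry loop lives in a separate, non-transactional bean: every attempt must run in
// its own transaction and go through the proxy of OptimisticLockService
@Service
public class OptimisticLockRetryService {

    @Autowired
    private OptimisticLockService optimisticLockService;

    public void updateProductStockWithRetry(Long productId, Integer quantity) {
        int maxRetries = 3;
        int retryCount = 0;
        while (retryCount < maxRetries) {
            try {
                optimisticLockService.updateProductStock(productId, quantity);
                return;
            } catch (OptimisticLockingFailureException e) {
                retryCount++;
                if (retryCount >= maxRetries) {
                    throw new ConcurrencyException("Concurrent update failed, please retry");
                }
                // Wait briefly before retrying
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Thread was interrupted", ie);
                }
            }
        }
    }
}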
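At its core, optimistic locking is a compare-and-set on a version: an update succeeds only if the row is still at the version that was read. The sketch below models this with an in-memory row rather than a database; `VersionedStock`, `OptimisticStore`, and `OptimisticRetry` are hypothetical names used purely to show why each retry has to start from a fresh read.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Immutable snapshot of a row: stock value plus version counter. */
final class VersionedStock {
    final int stock;
    final long version;

    VersionedStock(int stock, long version) {
        this.stock = stock;
        this.version = version;
    }
}

/** Hypothetical in-memory stand-in for a table with a version column. */
final class OptimisticStore {
    private final AtomicReference<VersionedStock> row;

    OptimisticStore(int initialStock) {
        this.row = new AtomicReference<>(new VersionedStock(initialStock, 0));
    }

    VersionedStock read() {
        return row.get();
    }

    /** Succeeds only if nobody changed the row since `expected` was read. */
    boolean update(VersionedStock expected, int newStock) {
        return row.compareAndSet(expected, new VersionedStock(newStock, expected.version + 1));
    }
}

final class OptimisticRetry {
    /** Returns true once the decrement is applied, false if every attempt conflicted. */
    static boolean decrement(OptimisticStore store, int quantity, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            VersionedStock current = store.read(); // fresh read on every attempt
            if (current.stock < quantity) {
                throw new IllegalStateException("insufficient stock");
            }
            if (store.update(current, current.stock - quantity)) {
                return true;
            }
        }
        return false;
    }
}
```

An update attempted with a stale snapshot is rejected, so a retry that reused the old snapshot (or, in JPA terms, the same persistence context and transaction) would fail again every time.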
12.3.2 Pessimistic locking
@Service
public class PessimisticLockService {

    @Autowired
    private ProductRepository productRepository;

    @Transactional
    public void updateProductStockWithPessimisticLock(Long productId, Integer quantity) {
        // Load the product under a pessimistic write lock; it is held until the transaction ends
        Product product = productRepository.findByIdWithLock(productId);
        // Check the stock
        if (product.getStock() < quantity) {
            throw new InsufficientStockException("Insufficient stock");
        }
        // Update the stock
        product.setStock(product.getStock() - quantity);
        productRepository.save(product);
    }
}

@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT p FROM Product p WHERE p.id = :id")
    Product findByIdWithLock(@Param("id") Long id);
}

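13. Summary

Spring transaction management is a core capability for enterprise application development. Through AOP proxies, transaction managers, and the synchronization mechanism it offers powerful and flexible transaction handling. This article analyzed the implementation at the source level, with particular attention to how transaction propagation works.

13.1 Key Points

  1. Transaction manager architecture: Spring ships several transaction manager implementations, such as DataSourceTransactionManager, JpaTransactionManager, and HibernateTransactionManager. They all extend AbstractPlatformTransactionManager, which implements the shared transaction management logic.

  2. AOP proxy mechanism: Spring creates transactional proxies automatically through InfrastructureAdvisorAutoProxyCreator and uses TransactionInterceptor to intercept method calls, adding transaction logic before and after the method runs.

  3. Transaction propagation: Spring defines seven propagation behaviors, handled in the handleExistingTransaction method. Each has its own use cases and implementation.

  4. Transaction synchronization: TransactionSynchronizationManager is the central manager for synchronization. It tracks transactional resources and synchronization callbacks, and passes the transaction context along through thread-local variables.

  5. Commit and rollback: the commit flow is implemented in processCommit and the rollback flow in processRollback. Both trigger the corresponding synchronization callbacks.

  6. Exception handling: by default Spring rolls back only on RuntimeException and Error. The rollbackFor and noRollbackFor attributes of @Transactional customize this rule.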

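13.2 Best Practices

  1. Set transaction boundaries carefully: keep transactions as small as possible, covering only the operations that need atomicity, and avoid slow work such as remote calls or file operations inside them.

  2. Choose a suitable isolation level: pick the level the business requires, balancing data consistency against concurrency.

  3. Use propagation behaviors correctly: understand what each one means and where it fits, so that misuse does not cause transaction problems.

  4. Handle exceptions properly: make sure the transaction can roll back or commit as intended, so that careless exception handling does not leave data inconsistent.

  5. Monitor transaction performance: use AOP and transaction synchronization to observe how transactions execute, and find and fix performance problems early.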
