当前位置: 首页 > news >正文

spring声明式事务原理02-调用第1层@Transactional方法-按需创建事务createTransactionIfNecessary

文章目录

  • 【README】
    • 【复习-上文逻辑】UserAppService调用userSupport.saveNewUser()
  • 【1】概览-按需创建事务-TransactionAspectSupport#createTransactionIfNecessary()
  • 【2】方法源码及调用
    • 【2.1】TransactionAspectSupport#createTransactionIfNecessary
    • 【2.2】tm.getTransaction((TransactionDefinition)txAttr):通过事务管理器创建事务
      • 【2.2.1】AbstractPlatformTransactionManager#doGetTransaction-创建逻辑事务
      • 【2.2.2】AbstractPlatformTransactionManager#isExistingTransaction:判断事务是否已经存在
      • 【2.2.3】执行AbstractPlatformTransactionManager#suspend((Object)null);: 挂起事务
      • 【2.2.4】执行AbstractPlatformTransactionManager#startTransaction():开启事务,创建并返回事务状态(创建物理数据库连接)
        • 【2.2.4.1】AbstractPlatformTransactionManager#newTransactionStatus创建事务状态
        • 【2.2.4.2】AbstractPlatformTransactionManager#doBegin()-开启事务
        • 【2.2.4.3】AbstractPlatformTransactionManager#prepareSynchronization-装配事务的同步信息
          • 【补充】TransactionSynchronizationManager-事务同步管理器的线程级变量
    • 【2.3】TransactionAspectSupport#prepareTransactionInfo-装配事务信息
      • 【TransactionInfo】 TransactionInfo-事务信息类是TransactionAspectSupport的静态内部类
  • 【3】小结
    • 【3.1】创建spring逻辑事务与物理数据库连接
    • 【3.2】资源绑定
      • 【3.2.1】数据源与数据库连接持有器绑定
      • 【3.2.2】数据库连接持有器绑定到逻辑事务
      • 【3.2.3】装配新的数据库连接持有器conHolder

【README】

1)声明式事务代码样例:

在这里插入图片描述

public interface UserMapper {
    UserPO qryUserById(@Param("id") String id);
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    void insertUser(UserPO userPO);
}

public interface UserAccountMapper {

    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    void insertUserAccount(UserAccountPO userAccountPO);
}

【代码解说】

  • 代码调用链路:UserAppService -> userSupport.saveNewUser -> userMapper.insertUser + userAccountMapper.insertUserAccount
  • userSupport.saveNewUser 带有 @Transactional 注解;
    • userMapper.insertUser 带有 @Transactional 注解;
    • userAccountMapper.insertUserAccount 带有 @Transactional 注解;

2)@Transactional:事务注解,用于定义事务元数据,包括事务管理器名称,事务传播行为,超时时间(单位秒),是否只读,回滚的异常类型;

  • @Transactional可以标注类与方法,不管是标注类还是方法,@Transactional标注所在的类的bean都会被spring通过aop代理进行增强;

【复习-上文逻辑】UserAppService调用userSupport.saveNewUser()

1)因为 userSupport中的saveNewUser方法被@Transaction标注,所以该bean被spring增强为aop代理,所以访问aop代理的入口方法intercept(),

  • 实际调用CglibAopProxy#DynamicAdvisedInterceptor静态内部类的 intercept方法;

2)CglibAopProxy#DynamicAdvisedInterceptor静态内部类的 intercept方法()

3)接着执行 (new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed() ,调用CglibMethodInvocation#proceed方法;

【CglibAopProxy静态内部类DynamicAdvisedInterceptor】

private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable {
        private final AdvisedSupport advised;

        public DynamicAdvisedInterceptor(AdvisedSupport advised) {
            this.advised = advised;
        }

        @Nullable
        public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
            Object oldProxy = null;
            boolean setProxyContext = false;
            Object target = null;
            TargetSource targetSource = this.advised.getTargetSource();

            Object var16;
            try {
                if (this.advised.exposeProxy) {
                    oldProxy = AopContext.setCurrentProxy(proxy);
                    setProxyContext = true;
                }

                target = targetSource.getTarget();
                Class<?> targetClass = target != null ? target.getClass() : null;
                List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
                Object retVal;
                if (chain.isEmpty()) {
                    Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
                    retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
                } else {
                    // 走这个分支 
                    retVal = (new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
                }
                var16 = CglibAopProxy.processReturnType(proxy, target, method, args, retVal);
            } finally {
                if (target != null && !targetSource.isStatic()) {
                    targetSource.releaseTarget(target);
                }
                if (setProxyContext) {
                    AopContext.setCurrentProxy(oldProxy);
                }
            }
            return var16;
        }

new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy) 传入的参数如下:

在这里插入图片描述


【CglibAopProxy静态内部类CglibMethodInvocation】

private static class CglibMethodInvocation extends ReflectiveMethodInvocation {
    public CglibMethodInvocation(Object proxy, @Nullable Object target, Method method, Object[] arguments, @Nullable Class<?> targetClass, List<Object> interceptorsAndDynamicMethodMatchers, MethodProxy methodProxy) {
        super(proxy, target, method, arguments, targetClass, interceptorsAndDynamicMethodMatchers);
    }

    @Nullable
    public Object proceed() throws Throwable {
        try {
            return super.proceed();// 这里 
        } catch (RuntimeException var2) {
            throw var2;
        } catch (Exception var3) {
            if (!ReflectionUtils.declaresException(this.getMethod(), var3.getClass()) && !KotlinDetector.isKotlinType(this.getMethod().getDeclaringClass())) {
                throw new UndeclaredThrowableException(var3);
            } else {
                throw var3;
            }
        }
    }
}

4)执行ReflectiveMethodInvocation#proceed()方法

// ReflectiveMethodInvocation#proceed() 反射方法调用器#处理方法 
public Object proceed() throws Throwable {
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return this.invokeJoinpoint();
    } else {
        Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
        if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
            InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher)interceptorOrInterceptionAdvice;
            Class<?> targetClass = this.targetClass != null ? this.targetClass : this.method.getDeclaringClass();
            return dm.matcher().matches(this.method, targetClass, this.arguments) ? dm.interceptor().invoke(this) : this.proceed();
        } else {
            return ((MethodInterceptor)interceptorOrInterceptionAdvice).invoke(this);// 这个分支 
        }
    }
}

5)获取到的interceptorOrInterceptionAdvice实际是 TransactionInterceptor,所以接着执行 TransactionInterceptor#invoke(this)方法;而传入的this=ReflectiveMethodInvocation或者 CglibMethodInvocation,参见 new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy);

【 TransactionInterceptor#invoke(this)方法】

在这里插入图片描述

6)调用 TransactionInterceptor#invokeWithinTransaction(this)方法 , TransactionInterceptor父类是TransactionAspectSupport,接着执行 TransactionAspectSupport#invokeWithinTransaction方法;

【TransactionAspectSupport#invokeWithinTransaction】

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
    TransactionAttributeSource tas = this.getTransactionAttributeSource();
    TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
    TransactionManager tm = this.determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
        // ... 不走这个分支 
    } else {
        PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);
        String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
        if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager cpptm) {
	        // ... 不走这个分支 
        } else {
            // 走这个分支, 按需创建事务
            TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
            Object retVal;
            try {
                // 执行目标方法 
                retVal = invocation.proceedWithInvocation();
            } catch (Throwable var22) {
                // 异常处理
                this.completeTransactionAfterThrowing(txInfo, var22);
                throw var22;
            } finally {
                // 最后清理事务信息 
                this.cleanupTransactionInfo(txInfo);
            }

            if (retVal != null && txAttr != null) {
               // 不走这个分支 
            }
			// 返回后提交事务
            this.commitTransactionAfterReturning(txInfo);
            return retVal;
        }
    }
}

【代码解说】 TransactionAspectSupport#invokeWithinTransaction()-在事务中执行业务逻辑,具体步骤如下(跳过了分支判断):

  • 步骤1:调用createTransactionIfNecessary方法,创建事务;
  • 步骤2:调用invocation.proceedWithInvocation(),执行具体业务逻辑(执行目标方法);
    • 步骤2.1(或有) : 抛出异常,执行completeTransactionAfterThrowing
  • 步骤3:执行完成(无论是否抛出异常),调用cleanupTransactionInfo()
  • 步骤4:判断返回值是否不为null 且 事务属性是否不为null
  • 步骤5:调用commitTransactionAfterReturning(),提交事务;

下面的内容主要对TransactionAspectSupport#createTransactionIfNecessary(按需创建事务)-进行剖析;

【注意】 本文的业务场景是第1层@Transactional方法的执行



【1】概览-按需创建事务-TransactionAspectSupport#createTransactionIfNecessary()

1)按需创建事务-createTransactionIfNecessary():步骤详情;

  1. tm.getTransaction((TransactionDefinition)txAttr):通过事务管理器创建事务;
    1. AbstractPlatformTransactionManager#doGetTransaction:重要-创建逻辑事务
      1. 创建DataSourceTransactionObject-逻辑事务对象;
      2. 根据数据源从TransactionSynchronizationManager获取连接持有器;
      3. 把数据库连接持有器conHolder绑定到逻辑事务对象;
    2. AbstractPlatformTransactionManager#isExistingTransaction:判断事务是否已经存在;(第1层@Transactional方法不走这个分支,即事务不存在
      1. 若存在,则调用 handleExistingTransaction ;
    3. 执行AbstractPlatformTransactionManager#suspend((Object)null);: 挂起事务;
    4. 执行AbstractPlatformTransactionManager#startTransaction():重要-开启事务
      1. 创建事务状态;
      2. 执行开启事务前的监听器方法-listener#beforeBegin;【本文不涉及】
      3. 调用AbstractPlatformTransactionManager#doBegin(transaction, definition):开启事务
        1. 通过数据源创建物理数据库连接;
        2. 把物理数据库连接封装到连接持有器,把连接持有器绑定管道逻辑事务对象txObject;
        3. 设置逻辑事务的连接持有器conHolder对象的synchronizedWithTransaction属性为true;(synchronizedWithTransaction=与事务同步)
        4. 设置物理数据库连接为手动提交;
        5. 设置逻辑事务的连接持有器conHolder对象的transactionActive为ture; (transactionActive 事务活跃状态)
        6. 设置逻辑事务的连接持有器conHolder对象的超时时间;(由@Transactional注解定义的)
        7. 是否为新的数据库连接持有器:
          1. 若是:则把数据源(key)与逻辑事务的连接持有器conHolder对象(value)绑定到TransactionSynchronizationManager的线程级变量resource;
      4. 调用AbstractPlatformTransactionManager#prepareSynchronization(status, definition):装配事务的同步信息;
        1. 若事务状态的是否新同步为true,则通过TransactionSynchronizationManager设置5个线程级变量,如下:
          1. 设置事务是否活跃 ;
          2. 设置事务隔离级别;
          3. 设置事务是否只读;
          4. 设置事务名称;
          5. 初始化事务同步信息;
      5. 执行开启事务后的监听器方法-listener#afterBegin;【本文不涉及】
  2. TransactionAspectSupport#prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status):装配事务信息;
    1. 创建事务信息对象txInfo;
    2. 把事务状态status设置到事务信息对象txInfo;
    3. txInfo对象绑定到TransactionAspectSupport中的线程级变量transactionInfoHolder;

【2】方法源码及调用

【2.1】TransactionAspectSupport#createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
        txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 通过事务管理器获取事务
            status = tm.getTransaction((TransactionDefinition)txAttr); 
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
        }
    }
    // 准备事务信息TransactionInfo 
    return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}


【2.2】tm.getTransaction((TransactionDefinition)txAttr):通过事务管理器创建事务

【AbstractPlatformTransactionManager#getTransaction】

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();
    // 创建逻辑事务 
    Object transaction = this.doGetTransaction();
    boolean debugEnabled = this.logger.isDebugEnabled();

    // 逻辑事务是否存在 (执行第1层@Transactional方法,明显事务不存在,不走这个分支)
    if (this.isExistingTransaction(transaction)) {
        // 处理已存在的事务 
        return this.handleExistingTransaction(def, transaction, debugEnabled);
    } else if (def.getTimeout() < -1) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    } else if (def.getPropagationBehavior() == 2) {
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {    
        boolean newSynchronization = this.getTransactionSynchronization() == 0;
        return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
    } else {
        // 走这个分支 
        SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
        if (debugEnabled) {
            Log var10000 = this.logger;
            String var10001 = def.getName();
            var10000.debug("Creating new transaction with name [" + var10001 + "]: " + def);
        }
        try {
            // 开启事务 
            return this.startTransaction(def, transaction, false, debugEnabled, suspendedResources);
        } catch (Error | RuntimeException var7) {
            this.resume((Object)null, suspendedResources);
            throw var7;
        }
    }
}

【2.2.1】AbstractPlatformTransactionManager#doGetTransaction-创建逻辑事务

【DataSourceTransactionManager#doGetTransaction】创建逻辑事务

  1. 创建DataSourceTransactionObject-逻辑事务对象;
  2. 根据数据源从TransactionSynchronizationManager获取连接持有器;
  3. 把数据库连接持有器conHolder绑定到逻辑事务对象;
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
          (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}


【2.2.2】AbstractPlatformTransactionManager#isExistingTransaction:判断事务是否已经存在

第1层@Transactional方法不走这个分支,即事务不存在 ; 若存在,则调用 handleExistingTransaction ;【本文不涉及】



【2.2.3】执行AbstractPlatformTransactionManager#suspend((Object)null);: 挂起事务

传入的挂起的事务对象为null,所以本文不涉及,不挂起任何事物;



【2.2.4】执行AbstractPlatformTransactionManager#startTransaction():开启事务,创建并返回事务状态(创建物理数据库连接)

【AbstractPlatformTransactionManager#startTransaction】开启事务

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    // transactionSynchronization==0, 所以是新同步 
    boolean newSynchronization = this.getTransactionSynchronization() != 2;
    // 1 创建事务状态
    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
    this.transactionExecutionListeners.forEach((listener) -> {
        listener.beforeBegin(status);
    });

    try {
        // 2 开启事务 
        this.doBegin(transaction, definition);
    } catch (Error | RuntimeException var9) {
        this.transactionExecutionListeners.forEach((listener) -> {
            listener.afterBegin(status, var9);
        });
        throw var9;
    }
	// 3 装配事务的同步信息 
    this.prepareSynchronization(status, definition);
    this.transactionExecutionListeners.forEach((listener) -> {
        listener.afterBegin(status, (Throwable)null);
    });
    return status;
}


【2.2.4.1】AbstractPlatformTransactionManager#newTransactionStatus创建事务状态
private DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean nested, boolean debug, @Nullable Object suspendedResources) {
    boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
    return new DefaultTransactionStatus(definition.getName(), transaction, newTransaction, actualNewSynchronization, nested, definition.isReadOnly(), debug, suspendedResources);
}


【2.2.4.2】AbstractPlatformTransactionManager#doBegin()-开启事务

【DataSourceTransactionManager#doBegin()】开启事务

  1. 通过数据源创建物理数据库连接;
  2. 把物理数据库连接封装到连接持有器,把连接持有器绑定管道逻辑事务对象txObject;
  3. 设置逻辑事务的连接持有器conHolder对象的synchronizedWithTransaction属性为true;(synchronizedWithTransaction=与事务同步)
  4. 设置物理数据库连接为手动提交;
  5. 设置逻辑事务的连接持有器conHolder对象的transactionActive为ture; (transactionActive 事务活跃状态)
  6. 设置逻辑事务的连接持有器conHolder对象的超时时间;(由@Transactional注解定义的)
  7. 是否为新的数据库连接持有器:
    1. 若是:则把数据源(key)与逻辑事务的连接持有器conHolder对象(value)绑定到TransactionSynchronizationManager的线程级变量resource;
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
       if (!txObject.hasConnectionHolder() ||
             txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
           // 通过数据源创建物理数据库连接
          Connection newCon = obtainDataSource().getConnection();
          if (logger.isDebugEnabled()) {
             logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
          }
           // 把物理数据库连接封装到连接持有器,把连接持有器绑定管道逻辑事务对象txObject; 
          txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
       }
// 设置逻辑事务的连接持有器conHolder对象的synchronizedWithTransaction属性为true;(synchronizedWithTransaction=与事务同步)
       txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
       con = txObject.getConnectionHolder().getConnection();

       Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
       txObject.setPreviousIsolationLevel(previousIsolationLevel);
       txObject.setReadOnly(definition.isReadOnly());

       if (con.getAutoCommit()) {
          txObject.setMustRestoreAutoCommit(true);
          if (logger.isDebugEnabled()) {
             logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
          }
           // 设置物理数据库连接为手动提交; 
          con.setAutoCommit(false);
       }

       prepareTransactionalConnection(con, definition);
        // 设置逻辑事务的连接持有器conHolder对象的transactionActive为ture; (transactionActive 事务活跃状态)
       txObject.getConnectionHolder().setTransactionActive(true);

       int timeout = determineTimeout(definition);
       if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
           // 设置逻辑事务的连接持有器conHolder对象的超时时间;(由@Transactional注解定义的)
          txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
       }

       // Bind the connection holder to the thread.
       if (txObject.isNewConnectionHolder()) {
           // 把数据源(key)与逻辑事务的连接持有器conHolder对象(value)绑定到TransactionSynchronizationManager的线程级变量resource
          TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
       }
    }

    catch (Throwable ex) {
       if (txObject.isNewConnectionHolder()) {
          DataSourceUtils.releaseConnection(con, obtainDataSource());
          txObject.setConnectionHolder(null, false);
       }
       throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}


【2.2.4.3】AbstractPlatformTransactionManager#prepareSynchronization-装配事务的同步信息

1)调用AbstractPlatformTransactionManager#prepareSynchronization(status, definition):装配事务的同步信息;

  1. 若事务状态的是否新同步为true,则通过TransactionSynchronizationManager设置5个线程级变量,如下:
    1. 设置事务是否活跃 ;
    2. 设置事务隔离级别;
    3. 设置事务是否只读;
    4. 设置事务名称;
    5. 初始化事务同步信息;

【AbstractPlatformTransactionManager#prepareSynchronizatio】

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }

}
// TransactionSynchronizationManager#initSynchronization() 
public static void initSynchronization() throws IllegalStateException {
        if (isSynchronizationActive()) {
            throw new IllegalStateException("Cannot activate transaction synchronization - already active");
        } else {
            synchronizations.set(new LinkedHashSet());
        }
    }
【补充】TransactionSynchronizationManager-事务同步管理器的线程级变量
public abstract class TransactionSynchronizationManager {
    private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
    private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
    private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
    private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");

    public TransactionSynchronizationManager() {
    }
    ...
}

【2.3】TransactionAspectSupport#prepareTransactionInfo-装配事务信息

TransactionAspectSupport#prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status):装配事务信息;

  1. 创建事务信息对象txInfo;
  2. 把事务状态status设置到事务信息对象txInfo;
  3. txInfo对象绑定到TransactionAspectSupport中的线程级变量transactionInfoHolder;

【TransactionAspectSupport#prepareTransactionInfo】

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) {
    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }

        txInfo.newTransactionStatus(status);
    } else if (this.logger.isTraceEnabled()) {
        this.logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional.");
    }

    txInfo.bindToThread();
    return txInfo;
}

【TransactionInfo】 TransactionInfo-事务信息类是TransactionAspectSupport的静态内部类

protected static final class TransactionInfo {
    @Nullable
    private final PlatformTransactionManager transactionManager;
    @Nullable
    private final TransactionAttribute transactionAttribute;
    private final String joinpointIdentification;
    @Nullable
    private TransactionStatus transactionStatus;
    @Nullable
    private TransactionInfo oldTransactionInfo;

    public TransactionInfo(@Nullable PlatformTransactionManager transactionManager, @Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
        this.transactionManager = transactionManager;
        this.transactionAttribute = transactionAttribute;
        this.joinpointIdentification = joinpointIdentification;
    }

    public PlatformTransactionManager getTransactionManager() {
        Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
        return this.transactionManager;
    }

    @Nullable
    public TransactionAttribute getTransactionAttribute() {
        return this.transactionAttribute;
    }

    public String getJoinpointIdentification() {
        return this.joinpointIdentification;
    }

    public void newTransactionStatus(@Nullable TransactionStatus status) {
        this.transactionStatus = status;
    }

    @Nullable
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus;
    }

    public boolean hasTransaction() {
        return this.transactionStatus != null;
    }

    private void bindToThread() {
        this.oldTransactionInfo = (TransactionInfo)TransactionAspectSupport.transactionInfoHolder.get();
        TransactionAspectSupport.transactionInfoHolder.set(this);
    }

    private void restoreThreadLocalStatus() {
        TransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
    }

    public String toString() {
        return this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction";
    }
}

【3】小结

【3.1】创建spring逻辑事务与物理数据库连接

1)本文分析了调用第1层@Transactional标注的方法的细节; spirng对@Transactional标注的方法所在类的bean进行了增强,在调用目标方法前执行TransactionAspectSupport#createTransactionIfNecessary-按需创建事务;

2)创建spring逻辑事务:类型为DataSourceTransactionObject, 通过调用DataSourceTransactionManager#doGetTransaction方法创建;

【DataSourceTransactionManager#doGetTransaction】

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
          (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

也就是说spring中的事务对象是DataSourceTransactionObject,与数据库事务没有关系;我们可以这样理解:spring中的DataSourceTransactionObject是逻辑事务,而数据库事务是物理事务;逻辑事务是要寄宿(依赖)在物理事务上;还需要注意的是,这里只创建了逻辑事务DataSourceTransactionObject,还没有创建物理事务(数据库事务) ;


3)创建物理数据库连接:

创建物理数据库连接的代码: Connection newCon = obtainDataSource().getConnection() ;

【DataSourceTransactionManager#doBegin】

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
       if (!txObject.hasConnectionHolder() ||
             txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
           // 创建物理数据库连接(调用 DataSource#getConnection方法) 
          Connection newCon = obtainDataSource().getConnection();
          if (logger.isDebugEnabled()) {
             logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
          }
          txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
       }
		
        // 设置数据库连接持有器conHolder的与事务同步属性为true 
       txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
       con = txObject.getConnectionHolder().getConnection();

       Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
       txObject.setPreviousIsolationLevel(previousIsolationLevel);
       txObject.setReadOnly(definition.isReadOnly());
      
       if (con.getAutoCommit()) {
          txObject.setMustRestoreAutoCommit(true);
          if (logger.isDebugEnabled()) {
             logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
          }
           // 设置手动提交 
          con.setAutoCommit(false);
       }

       prepareTransactionalConnection(con, definition);
       txObject.getConnectionHolder().setTransactionActive(true);

       int timeout = determineTimeout(definition);
       if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
           // 设置超时时间 
          txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
       }

       // Bind the connection holder to the thread.
       if (txObject.isNewConnectionHolder()) {
           // 绑定资源
          TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
       }
    }

    catch (Throwable ex) {
       if (txObject.isNewConnectionHolder()) {
          DataSourceUtils.releaseConnection(con, obtainDataSource());
          txObject.setConnectionHolder(null, false);
       }
       throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}


【3.2】资源绑定

【3.2.1】数据源与数据库连接持有器绑定

【DataSourceTransactionManager#doBegin】

if (txObject.isNewConnectionHolder()) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
// 其中 txObjec是逻辑事务 

上述代码是把 数据源-key,连接持有器-value绑定到ThreadLocal线程级变量上;
绑定的目的在于:在同一个线程中,同一个数据源,对应的是同一个数据库连接持有器(即同一个物理数据库连接)

【补充】TransactionSynchronizationManager#bindResource, 绑定资源

public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = (Map)resources.get();
    if (map == null) {
        map = new HashMap();
        resources.set(map);
    }

    Object oldValue = ((Map)map).put(actualKey, value);
    if (oldValue instanceof ResourceHolder resourceHolder) {
        if (resourceHolder.isVoid()) {
            oldValue = null;
        }
    }

    if (oldValue != null) {
        throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
    }
}

【TransactionSynchronizationManager定义了多个线程级变量】

public abstract class TransactionSynchronizationManager {
    private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
    private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
    private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
    private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");

    public TransactionSynchronizationManager() {
    }


【3.2.2】数据库连接持有器绑定到逻辑事务

【DataSourceTransactionManager#doBegin】

txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

目的: 建立逻辑事务与物理数据库连接持有器(物理数据库连接)的绑定关系;(多个逻辑事务对象对应同一个物理数据库连接,即逻辑事务需要寄宿在物理数据库连接上)


【3.2.3】装配新的数据库连接持有器conHolder

// 设置连接持有器conHolder的与事务同步属性为true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 设置连接持有器conHolder的事务是否活跃为true
txObject.getConnectionHolder().setTransactionActive(true);
// 设置连接持有器conHolder的事务超时时间为 @Transactional的元数据
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);

相关文章:

  • 深入解析“Off-the-Shelf”——从产品到AI模型的通用概念
  • 视觉定位项目中可以任意修改拍照点位吗?
  • ElementUI 表格中插入图片缩略图,鼠标悬停显示大图
  • 图像处理篇---图像预处理
  • 【宠粉赠书】极速探索 HarmonyOS NEXT:国产操作系统的未来之光
  • tongweb信创项目线上业务添堵问题排查
  • 《Python实战进阶》No21:数据存储:Redis 与 MongoDB 的使用场景
  • Spring面试:Spring,SpringMVC,SpringBoot
  • 软考系统架构师 — 1 考点分析
  • 基于javaweb的SpringBoot精美物流管理系统设计与实现(源码+文档+部署讲解)
  • 五大基础算法——分治算法
  • QT中的宏
  • 大数据学习(64)- 大数据集群的高可用(HA)
  • 基于ydoVr算法的车辆智能防盗系统的设计与实现
  • linux:进程调度(下)
  • 单机 elasticsearch下载,安装,配置,启动,百度云链接提供
  • 【C语言】动态内存管理用realloc管理更灵活
  • CSS Table (表格)
  • windows上清理docker
  • Dify使用部署与应用实践
  • 专门做搞笑视频的网站/东莞seo整站优化
  • 网站建设移动端官网/怎样联系百度客服
  • 江门骏域网站建设/成都网站seo
  • 免费制作微网站/宁波关键词优化平台
  • 网站建设推广公司哪家权威/如何通过网络营销自己
  • 天津做网站的哪家好/seo赚钱暴利