spring声明式事务原理02-调用第1层@Transactional方法-按需创建事务createTransactionIfNecessary
文章目录
- 【README】
- 【复习-上文逻辑】UserAppService调用userSupport.saveNewUser()
- 【1】概览-按需创建事务-TransactionAspectSupport#createTransactionIfNecessary()
- 【2】方法源码及调用
- 【2.1】TransactionAspectSupport#createTransactionIfNecessary
- 【2.2】tm.getTransaction((TransactionDefinition)txAttr):通过事务管理器创建事务
- 【2.2.1】AbstractPlatformTransactionManager#doGetTransaction-创建逻辑事务
- 【2.2.2】AbstractPlatformTransactionManager#isExistingTransaction:判断事务是否已经存在
- 【2.2.3】执行AbstractPlatformTransactionManager#suspend((Object)null);: 挂起事务
- 【2.2.4】执行AbstractPlatformTransactionManager#startTransaction():开启事务,创建并返回事务状态(创建物理数据库连接)
- 【2.2.4.1】AbstractPlatformTransactionManager#newTransactionStatus创建事务状态
- 【2.2.4.2】AbstractPlatformTransactionManager#doBegin()-开启事务
- 【2.2.4.3】AbstractPlatformTransactionManager#prepareSynchronization-装配事务的同步信息
- 【补充】TransactionSynchronizationManager-事务同步管理器的线程级变量
- 【2.3】TransactionAspectSupport#prepareTransactionInfo-装配事务信息
- 【TransactionInfo】 TransactionInfo-事务信息类是TransactionAspectSupport的静态内部类
- 【3】小结
- 【3.1】创建spring逻辑事务与物理数据库连接
- 【3.2】资源绑定
- 【3.2.1】数据源与数据库连接持有器绑定
- 【3.2.2】数据库连接持有器绑定到逻辑事务
- 【3.2.3】装配新的数据库连接持有器conHolder
【README】
1)声明式事务代码样例:
public interface UserMapper {
UserPO qryUserById(@Param("id") String id);
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
void insertUser(UserPO userPO);
}
public interface UserAccountMapper {
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
void insertUserAccount(UserAccountPO userAccountPO);
}
【代码解说】
- 代码调用链路:UserAppService -> userSupport.saveNewUser -> userMapper.insertUser + userAccountMapper.insertUserAccount
- userSupport.saveNewUser 带有 @Transactional 注解;
- userMapper.insertUser 带有 @Transactional 注解;
- userAccountMapper.insertUserAccount 带有 @Transactional 注解;
2)@Transactional:事务注解,用于定义事务元数据,包括事务管理器名称,事务传播行为,超时时间(单位秒),是否只读,回滚的异常类型;
- @Transactional可以标注类与方法,不管是标注类还是方法,@Transactional标注所在的类的bean都会被spring通过aop代理进行增强;
【复习-上文逻辑】UserAppService调用userSupport.saveNewUser()
1)因为 userSupport中的saveNewUser方法被@Transaction标注,所以该bean被spring增强为aop代理,所以访问aop代理的入口方法intercept(),
- 实际调用CglibAopProxy#DynamicAdvisedInterceptor静态内部类的 intercept方法;
2)CglibAopProxy#DynamicAdvisedInterceptor静态内部类的 intercept方法()
3)接着执行 (new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed() ,调用CglibMethodInvocation#proceed方法;
【CglibAopProxy静态内部类DynamicAdvisedInterceptor】
private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable {
private final AdvisedSupport advised;
public DynamicAdvisedInterceptor(AdvisedSupport advised) {
this.advised = advised;
}
@Nullable
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
Object target = null;
TargetSource targetSource = this.advised.getTargetSource();
Object var16;
try {
if (this.advised.exposeProxy) {
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
Class<?> targetClass = target != null ? target.getClass() : null;
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
Object retVal;
if (chain.isEmpty()) {
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
} else {
// 走这个分支
retVal = (new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
}
var16 = CglibAopProxy.processReturnType(proxy, target, method, args, retVal);
} finally {
if (target != null && !targetSource.isStatic()) {
targetSource.releaseTarget(target);
}
if (setProxyContext) {
AopContext.setCurrentProxy(oldProxy);
}
}
return var16;
}
new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy) 传入的参数如下:
【CglibAopProxy静态内部类CglibMethodInvocation】
private static class CglibMethodInvocation extends ReflectiveMethodInvocation {
public CglibMethodInvocation(Object proxy, @Nullable Object target, Method method, Object[] arguments, @Nullable Class<?> targetClass, List<Object> interceptorsAndDynamicMethodMatchers, MethodProxy methodProxy) {
super(proxy, target, method, arguments, targetClass, interceptorsAndDynamicMethodMatchers);
}
@Nullable
public Object proceed() throws Throwable {
try {
return super.proceed();// 这里
} catch (RuntimeException var2) {
throw var2;
} catch (Exception var3) {
if (!ReflectionUtils.declaresException(this.getMethod(), var3.getClass()) && !KotlinDetector.isKotlinType(this.getMethod().getDeclaringClass())) {
throw new UndeclaredThrowableException(var3);
} else {
throw var3;
}
}
}
}
4)执行ReflectiveMethodInvocation#proceed()方法
// ReflectiveMethodInvocation#proceed() 反射方法调用器#处理方法
public Object proceed() throws Throwable {
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return this.invokeJoinpoint();
} else {
Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher)interceptorOrInterceptionAdvice;
Class<?> targetClass = this.targetClass != null ? this.targetClass : this.method.getDeclaringClass();
return dm.matcher().matches(this.method, targetClass, this.arguments) ? dm.interceptor().invoke(this) : this.proceed();
} else {
return ((MethodInterceptor)interceptorOrInterceptionAdvice).invoke(this);// 这个分支
}
}
}
5)获取到的interceptorOrInterceptionAdvice实际是 TransactionInterceptor,所以接着执行 TransactionInterceptor#invoke(this)方法;而传入的this=ReflectiveMethodInvocation或者 CglibMethodInvocation,参见 new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy);
【 TransactionInterceptor#invoke(this)方法】
6)调用 TransactionInterceptor#invokeWithinTransaction(this)方法 , TransactionInterceptor父类是TransactionAspectSupport,接着执行 TransactionAspectSupport#invokeWithinTransaction方法;
【TransactionAspectSupport#invokeWithinTransaction】
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
TransactionAttributeSource tas = this.getTransactionAttributeSource();
TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
TransactionManager tm = this.determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
// ... 不走这个分支
} else {
PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);
String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager cpptm) {
// ... 不走这个分支
} else {
// 走这个分支, 按需创建事务
TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行目标方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable var22) {
// 异常处理
this.completeTransactionAfterThrowing(txInfo, var22);
throw var22;
} finally {
// 最后清理事务信息
this.cleanupTransactionInfo(txInfo);
}
if (retVal != null && txAttr != null) {
// 不走这个分支
}
// 返回后提交事务
this.commitTransactionAfterReturning(txInfo);
return retVal;
}
}
}
【代码解说】 TransactionAspectSupport#invokeWithinTransaction()-在事务中执行业务逻辑,具体步骤如下(跳过了分支判断):
- 步骤1:调用createTransactionIfNecessary方法,创建事务;
- 步骤2:调用invocation.proceedWithInvocation(),执行具体业务逻辑(执行目标方法);
- 步骤2.1(或有) : 抛出异常,执行completeTransactionAfterThrowing
- 步骤3:执行完成(无论是否抛出异常),调用cleanupTransactionInfo()
- 步骤4:判断返回值是否不为null 且 事务属性是否不为null
- 步骤5:调用commitTransactionAfterReturning(),提交事务;
下面的内容主要对TransactionAspectSupport#createTransactionIfNecessary(按需创建事务)-进行剖析;
【注意】 本文的业务场景是第1层@Transactional方法的执行 ;
【1】概览-按需创建事务-TransactionAspectSupport#createTransactionIfNecessary()
1)按需创建事务-createTransactionIfNecessary():步骤详情;
- tm.getTransaction((TransactionDefinition)txAttr):通过事务管理器创建事务;
- AbstractPlatformTransactionManager#doGetTransaction:重要-创建逻辑事务;
- 创建DataSourceTransactionObject-逻辑事务对象;
- 根据数据源从TransactionSynchronizationManager获取连接持有器;
- 把数据库连接持有器conHolder绑定到逻辑事务对象;
- AbstractPlatformTransactionManager#isExistingTransaction:判断事务是否已经存在;(第1层@Transactional方法不走这个分支,即事务不存在)
- 若存在,则调用 handleExistingTransaction ;
- 执行AbstractPlatformTransactionManager#suspend((Object)null);: 挂起事务;
- 执行AbstractPlatformTransactionManager#startTransaction():重要-开启事务;
- 创建事务状态;
- 执行开启事务前的监听器方法-listener#beforeBegin;【本文不涉及】
- 调用AbstractPlatformTransactionManager#doBegin(transaction, definition):开启事务;
- 通过数据源创建物理数据库连接;
- 把物理数据库连接封装到连接持有器,把连接持有器绑定管道逻辑事务对象txObject;
- 设置逻辑事务的连接持有器conHolder对象的synchronizedWithTransaction属性为true;(synchronizedWithTransaction=与事务同步)
- 设置物理数据库连接为手动提交;
- 设置逻辑事务的连接持有器conHolder对象的transactionActive为ture; (transactionActive 事务活跃状态)
- 设置逻辑事务的连接持有器conHolder对象的超时时间;(由@Transactional注解定义的)
- 是否为新的数据库连接持有器:
- 若是:则把数据源(key)与逻辑事务的连接持有器conHolder对象(value)绑定到TransactionSynchronizationManager的线程级变量resource;
- 调用AbstractPlatformTransactionManager#prepareSynchronization(status, definition):装配事务的同步信息;
- 若事务状态的是否新同步为true,则通过TransactionSynchronizationManager设置5个线程级变量,如下:
- 设置事务是否活跃 ;
- 设置事务隔离级别;
- 设置事务是否只读;
- 设置事务名称;
- 初始化事务同步信息;
- 若事务状态的是否新同步为true,则通过TransactionSynchronizationManager设置5个线程级变量,如下:
- 执行开启事务后的监听器方法-listener#afterBegin;【本文不涉及】
- AbstractPlatformTransactionManager#doGetTransaction:重要-创建逻辑事务;
- TransactionAspectSupport#prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status):装配事务信息;
- 创建事务信息对象txInfo;
- 把事务状态status设置到事务信息对象txInfo;
- txInfo对象绑定到TransactionAspectSupport中的线程级变量transactionInfoHolder;
【2】方法源码及调用
【2.1】TransactionAspectSupport#createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 通过事务管理器获取事务
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
// 准备事务信息TransactionInfo
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}
【2.2】tm.getTransaction((TransactionDefinition)txAttr):通过事务管理器创建事务
【AbstractPlatformTransactionManager#getTransaction】
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();
// 创建逻辑事务
Object transaction = this.doGetTransaction();
boolean debugEnabled = this.logger.isDebugEnabled();
// 逻辑事务是否存在 (执行第1层@Transactional方法,明显事务不存在,不走这个分支)
if (this.isExistingTransaction(transaction)) {
// 处理已存在的事务
return this.handleExistingTransaction(def, transaction, debugEnabled);
} else if (def.getTimeout() < -1) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
} else if (def.getPropagationBehavior() == 2) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {
boolean newSynchronization = this.getTransactionSynchronization() == 0;
return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
} else {
// 走这个分支
SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
if (debugEnabled) {
Log var10000 = this.logger;
String var10001 = def.getName();
var10000.debug("Creating new transaction with name [" + var10001 + "]: " + def);
}
try {
// 开启事务
return this.startTransaction(def, transaction, false, debugEnabled, suspendedResources);
} catch (Error | RuntimeException var7) {
this.resume((Object)null, suspendedResources);
throw var7;
}
}
}
【2.2.1】AbstractPlatformTransactionManager#doGetTransaction-创建逻辑事务
【DataSourceTransactionManager#doGetTransaction】创建逻辑事务
- 创建DataSourceTransactionObject-逻辑事务对象;
- 根据数据源从TransactionSynchronizationManager获取连接持有器;
- 把数据库连接持有器conHolder绑定到逻辑事务对象;
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
【2.2.2】AbstractPlatformTransactionManager#isExistingTransaction:判断事务是否已经存在
第1层@Transactional方法不走这个分支,即事务不存在 ; 若存在,则调用 handleExistingTransaction ;【本文不涉及】
【2.2.3】执行AbstractPlatformTransactionManager#suspend((Object)null);: 挂起事务
传入的挂起的事务对象为null,所以本文不涉及,不挂起任何事物;
【2.2.4】执行AbstractPlatformTransactionManager#startTransaction():开启事务,创建并返回事务状态(创建物理数据库连接)
【AbstractPlatformTransactionManager#startTransaction】开启事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// transactionSynchronization==0, 所以是新同步
boolean newSynchronization = this.getTransactionSynchronization() != 2;
// 1 创建事务状态
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
this.transactionExecutionListeners.forEach((listener) -> {
listener.beforeBegin(status);
});
try {
// 2 开启事务
this.doBegin(transaction, definition);
} catch (Error | RuntimeException var9) {
this.transactionExecutionListeners.forEach((listener) -> {
listener.afterBegin(status, var9);
});
throw var9;
}
// 3 装配事务的同步信息
this.prepareSynchronization(status, definition);
this.transactionExecutionListeners.forEach((listener) -> {
listener.afterBegin(status, (Throwable)null);
});
return status;
}
【2.2.4.1】AbstractPlatformTransactionManager#newTransactionStatus创建事务状态
private DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean nested, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(definition.getName(), transaction, newTransaction, actualNewSynchronization, nested, definition.isReadOnly(), debug, suspendedResources);
}
【2.2.4.2】AbstractPlatformTransactionManager#doBegin()-开启事务
【DataSourceTransactionManager#doBegin()】开启事务
- 通过数据源创建物理数据库连接;
- 把物理数据库连接封装到连接持有器,把连接持有器绑定管道逻辑事务对象txObject;
- 设置逻辑事务的连接持有器conHolder对象的synchronizedWithTransaction属性为true;(synchronizedWithTransaction=与事务同步)
- 设置物理数据库连接为手动提交;
- 设置逻辑事务的连接持有器conHolder对象的transactionActive为ture; (transactionActive 事务活跃状态)
- 设置逻辑事务的连接持有器conHolder对象的超时时间;(由@Transactional注解定义的)
- 是否为新的数据库连接持有器:
- 若是:则把数据源(key)与逻辑事务的连接持有器conHolder对象(value)绑定到TransactionSynchronizationManager的线程级变量resource;
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 通过数据源创建物理数据库连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 把物理数据库连接封装到连接持有器,把连接持有器绑定管道逻辑事务对象txObject;
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 设置逻辑事务的连接持有器conHolder对象的synchronizedWithTransaction属性为true;(synchronizedWithTransaction=与事务同步)
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 设置物理数据库连接为手动提交;
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
// 设置逻辑事务的连接持有器conHolder对象的transactionActive为ture; (transactionActive 事务活跃状态)
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// 设置逻辑事务的连接持有器conHolder对象的超时时间;(由@Transactional注解定义的)
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 把数据源(key)与逻辑事务的连接持有器conHolder对象(value)绑定到TransactionSynchronizationManager的线程级变量resource
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
【2.2.4.3】AbstractPlatformTransactionManager#prepareSynchronization-装配事务的同步信息
1)调用AbstractPlatformTransactionManager#prepareSynchronization(status, definition):装配事务的同步信息;
- 若事务状态的是否新同步为true,则通过TransactionSynchronizationManager设置5个线程级变量,如下:
- 设置事务是否活跃 ;
- 设置事务隔离级别;
- 设置事务是否只读;
- 设置事务名称;
- 初始化事务同步信息;
【AbstractPlatformTransactionManager#prepareSynchronizatio】
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
// TransactionSynchronizationManager#initSynchronization()
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
} else {
synchronizations.set(new LinkedHashSet());
}
}
【补充】TransactionSynchronizationManager-事务同步管理器的线程级变量
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");
public TransactionSynchronizationManager() {
}
...
}
【2.3】TransactionAspectSupport#prepareTransactionInfo-装配事务信息
TransactionAspectSupport#prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status):装配事务信息;
- 创建事务信息对象txInfo;
- 把事务状态status设置到事务信息对象txInfo;
- txInfo对象绑定到TransactionAspectSupport中的线程级变量transactionInfoHolder;
【TransactionAspectSupport#prepareTransactionInfo】
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.newTransactionStatus(status);
} else if (this.logger.isTraceEnabled()) {
this.logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional.");
}
txInfo.bindToThread();
return txInfo;
}
【TransactionInfo】 TransactionInfo-事务信息类是TransactionAspectSupport的静态内部类
protected static final class TransactionInfo {
@Nullable
private final PlatformTransactionManager transactionManager;
@Nullable
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
public TransactionInfo(@Nullable PlatformTransactionManager transactionManager, @Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
this.transactionManager = transactionManager;
this.transactionAttribute = transactionAttribute;
this.joinpointIdentification = joinpointIdentification;
}
public PlatformTransactionManager getTransactionManager() {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
return this.transactionManager;
}
@Nullable
public TransactionAttribute getTransactionAttribute() {
return this.transactionAttribute;
}
public String getJoinpointIdentification() {
return this.joinpointIdentification;
}
public void newTransactionStatus(@Nullable TransactionStatus status) {
this.transactionStatus = status;
}
@Nullable
public TransactionStatus getTransactionStatus() {
return this.transactionStatus;
}
public boolean hasTransaction() {
return this.transactionStatus != null;
}
private void bindToThread() {
this.oldTransactionInfo = (TransactionInfo)TransactionAspectSupport.transactionInfoHolder.get();
TransactionAspectSupport.transactionInfoHolder.set(this);
}
private void restoreThreadLocalStatus() {
TransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
}
public String toString() {
return this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction";
}
}
【3】小结
【3.1】创建spring逻辑事务与物理数据库连接
1)本文分析了调用第1层@Transactional标注的方法的细节; spirng对@Transactional标注的方法所在类的bean进行了增强,在调用目标方法前执行TransactionAspectSupport#createTransactionIfNecessary-按需创建事务;
2)创建spring逻辑事务:类型为DataSourceTransactionObject, 通过调用DataSourceTransactionManager#doGetTransaction方法创建;
【DataSourceTransactionManager#doGetTransaction】
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
也就是说spring中的事务对象是DataSourceTransactionObject,与数据库事务没有关系;我们可以这样理解:spring中的DataSourceTransactionObject是逻辑事务,而数据库事务是物理事务;逻辑事务是要寄宿(依赖)在物理事务上;还需要注意的是,这里只创建了逻辑事务DataSourceTransactionObject,还没有创建物理事务(数据库事务) ;
3)创建物理数据库连接:
创建物理数据库连接的代码: Connection newCon = obtainDataSource().getConnection() ;
【DataSourceTransactionManager#doBegin】
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 创建物理数据库连接(调用 DataSource#getConnection方法)
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 设置数据库连接持有器conHolder的与事务同步属性为true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 设置手动提交
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// 设置超时时间
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 绑定资源
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
【3.2】资源绑定
【3.2.1】数据源与数据库连接持有器绑定
【DataSourceTransactionManager#doBegin】
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
// 其中 txObjec是逻辑事务
上述代码是把 数据源-key,连接持有器-value绑定到ThreadLocal线程级变量上;
绑定的目的在于:在同一个线程中,同一个数据源,对应的是同一个数据库连接持有器(即同一个物理数据库连接)
【补充】TransactionSynchronizationManager#bindResource, 绑定资源
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = (Map)resources.get();
if (map == null) {
map = new HashMap();
resources.set(map);
}
Object oldValue = ((Map)map).put(actualKey, value);
if (oldValue instanceof ResourceHolder resourceHolder) {
if (resourceHolder.isVoid()) {
oldValue = null;
}
}
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
}
}
【TransactionSynchronizationManager定义了多个线程级变量】
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");
public TransactionSynchronizationManager() {
}
【3.2.2】数据库连接持有器绑定到逻辑事务
【DataSourceTransactionManager#doBegin】
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
目的: 建立逻辑事务与物理数据库连接持有器(物理数据库连接)的绑定关系;(多个逻辑事务对象对应同一个物理数据库连接,即逻辑事务需要寄宿在物理数据库连接上) ;
【3.2.3】装配新的数据库连接持有器conHolder
// 设置连接持有器conHolder的与事务同步属性为true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 设置连接持有器conHolder的事务是否活跃为true
txObject.getConnectionHolder().setTransactionActive(true);
// 设置连接持有器conHolder的事务超时时间为 @Transactional的元数据
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);