当前位置: 首页 > wzjs >正文

服装网站建设策划书的基本结构在线网站流量查询

服装网站建设策划书的基本结构,在线网站流量查询,傻瓜式wordpress,用.net做视频网站的案例Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 Seata AT 基于两阶段提交协议的演变: 一阶段:业…

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata AT

基于两阶段提交协议的演变:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。

AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。

通过日志观察客户端的全局事务处理流程

当微服务B的方法开启了本地事务,在微服务中先后执行了 insert a , update b , 业务逻辑c ,update d。微服务A的方法通过声明式注解@GlobalTransactional开启全局事务,然后通过feign调用微服务B的方法。

微服务A
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [172.17.0.7:8091:72680625000142901]
[  XNIO-1 task-4] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction 172.17.0.8:8091:72680625000142901 will be commit
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : [172.17.0.8:8091:72680625000142901] commit status: Committed
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction end, xid = 172.17.0.8:8091:72680625000142901[_RMROLE_1_26_32] io.seata.rm.AbstractRMHandler            : Branch committing: 172.17.0.8:8091:72680625000142901 72680625000142905 jdbc:postgresql://ip:port/schema {"autoCommit":false}
[_RMROLE_1_26_32] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72680625000142901', branchId=72680625000142905, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema?currentSchema=schema', applicationData='{"autoCommit":false}'}微服务B
[  XNIO-1 task-7] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142907, lockKeys:tableName:pkid

注意观察分支事务提交时线程的不同,证明是tc异步通知分支事务异步提交的。

1.分支事务内遇到异常,但全局事务不回滚?

当微服务B的方法开启了本地事务,在微服务中先后执行了 insert a , update b , 业务逻辑c ,update d。当某段逻辑(如业务c)执行中出现异常时,本地事务会触发回滚。(视rollbackFor、隔离级别、事务代理、事务上下文等诸多原因决定是否真的会回滚。)

此时,假设微服务A的方法通过声明式注解@GlobalTransactional开启全局事务,然后通过feign调用微服务B的方法,同样的当业务c执行中出现异常时,全局事务却未必回滚。

异常时全局事务处理流程

通过日志观察,分支事务遇到异常后,并没有提交到tc。即tc不会感知到分支事务的异常。
而微服务A没有输出微服务B的异常,正常提交了全局事务。

为什么微服务A没有捕获到微服务B的异常呢?是由于微服务B中配置的RestControllerAdvice捕获了svc的异常。

如下

@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.OK)
public Response handleGlobalException(Exception e) {String serviceName = this.environment.getProperty("spring.application.name");String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());log.error(errorMsg, e);return Response.failed(e.getMessage());
}

ResponseStatus为200,feign感知不到异常不会报错,微服务A也就无从感知到异常。

调整如下:

@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Response handleGlobalException(Exception e) {String serviceName = this.environment.getProperty("spring.application.name");String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());log.error(errorMsg, e);return Response.failed(e.getMessage());
}

微服务A的报错如下

feign.FeignException$InternalServerError: [500 Internal Server Error] during [POST] to [http://serviceName/url] [Client#method(List)]: [{"code":xxxx,"msg":"插入重复数据","data":null,"success":false}]at feign.FeignException.serverErrorStatus(FeignException.java:259)at feign.FeignException.errorStatus(FeignException.java:206)at feign.FeignException.errorStatus(FeignException.java:194)at feign.codec.ErrorDecoder$Default.decode(ErrorDecoder.java:103)at feign.InvocationContext.decodeError(InvocationContext.java:126)at feign.InvocationContext.proceed(InvocationContext.java:72)at feign.ResponseHandler.handleResponse(ResponseHandler.java:63)at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:114)at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:70)at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:99)at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory$1.proceed(FeignCachingInvocationHandlerFactory.java:66)at org.springframework.cache.interceptor.CacheInterceptor.lambda$invoke$0(CacheInterceptor.java:64)at org.springframework.cache.interceptor.CacheAspectSupport.invokeOperation(CacheAspectSupport.java:416)at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401)at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:74)at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory.lambda$create$1(FeignCachingInvocationHandlerFactory.java:53)

feign.InvocationContext#proceed
在这里插入图片描述
除了调整httpStatus200,也可以通过在微服务A中判断返回对象的状态来决定是否报错,并由全局事务捕获。

异常时的处理逻辑:

全局事务回滚:
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction 172.17.0.7:8091:72652747696657983 will be rollback
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction end, xid = 172.17.0.7:8091:72652747696657983
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : [172.17.0.7:8091:72652747696657983] rollback status: Rollbacked分支事务回滚:
[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

注意:回滚也是异步执行的。

源码分析
全局事务的处理逻辑
io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction
io.seata.tm.api.TransactionalTemplate#execute// 1. Get transactionInfo 获取注解上的配置TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.GlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.  隔离级别Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:// If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend(false);}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// 当前事务存在,则先挂起;然后创建新事务;// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend(false);}tx = GlobalTransactionContext.createNew();// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,else create// 同spring @transactional 隔离级别,默认特性;当前事务存在直接返回,没有则创建;tx = GlobalTransactionContext.getCurrentOrCreate();break;case NEVER:// If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 全局锁配置// set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());}try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.// 开启新事务beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 处理业务代码,上文的场景即在这里处理;rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 捕获异常并完成事务;// 注意是完成,而不是回滚,例如不是所有的异常都要回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.commitTransaction(tx, txInfo);return rs;} finally {//5. clearresumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)throws TransactionalExecutor.ExecutionException, TransactionException {//roll back 判断是回滚还是提交if (txInfo != null && txInfo.rollbackOn(originalException)) {rollbackTransaction(tx, originalException);} else {// not roll back on this exception, so commitcommitTransaction(tx, txInfo);}
}
分支事务的处理逻辑

提交阶段
io.seata.rm.datasource.ConnectionProxy#commit
io.seata.rm.datasource.ConnectionProxy#doCommit
io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
io.seata.rm.AbstractResourceManager#branchRegister

BranchRegisterRequest request = new BranchRegisterRequest();
request.setXid(xid);
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(),String.format("branch register failed, xid: %s, errMsg: %s ", xid, response.getMsg()));
}
if (LOGGER.isInfoEnabled()) {LOGGER.info("branch register success, xid:{}, branchId:{}, lockKeys:{}", xid, response.getBranchId(), lockKeys);
}
return response.getBranchId();

回滚阶段,tm触发全局回滚后,tc会通过netty异步告知各rm进行回滚
io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler
io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#process // rm回滚
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback
io.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {DataSourceProxy dataSourceProxy = get(resourceId);if (dataSourceProxy == null) {throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));}try {// 基于undo_log回滚UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);if (LOGGER.isInfoEnabled()) {LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);}} catch (TransactionException te) {StackTraceLogger.error(LOGGER, te,"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;} else {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}}return BranchStatus.PhaseTwo_Rollbacked;
}

2.传播特性Propagation

spring @transactional 传播特性失效?

Spring在TransactionDefinition接口中规定了7种类型的事务传播行为。REQUIRES_NEW 用于在当前事务存在时挂起当前事务,并创建一个新的事务来执行标注了REQUIRES_NEW的方法。这种机制确保了内部事务的独立性,不受外部事务的影响。

然而在全局事务回滚时,REQUIRES_NEW注解的事务也将回滚,因为新开启的新分支事务也被同一全局事务管理。
日志如下:

[  XNIO-1 task-2] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138, lockKeys:tableName:pkName
-- REQUIRES_NEW 新开启的分支事务
[  XNIO-1 task-2] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141, lockKeys:tableName:pkName[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615141, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615141 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615141, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615138, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615138 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_2_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615138, undo_log deleted with GlobalFinished
[h_RMROLE_1_2_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

解决方式,在spring REQUIRES_NEW 之上添加全局事务的传播特性,开启新的全局事务

@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW)
@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW)
// 全局事务A回滚,另一个全局事务并不会回滚
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : suspending current transaction, xid = 172.17.0.8:8091:72652747696657983[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [172.17.0.8:8091:72652747696657984][h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72652747696657984', branchId=72652747696657985, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch committing: 172.17.0.8:8091:72652747696657984 72652747696657985 jdbc:postgresql://ip:port/schema null
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

再次看io.seata.tm.api.TransactionalTemplate#execute方法里全局事务传播特性的行为

case REQUIRES_NEW:// 区分传播特性// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend(false);}tx = GlobalTransactionContext.createNew();
ShouldNeverHappenException

在最初使用全局事务的隔离级别时,方法上只有@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW),而没有@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW),抛出过如下异常,绑定的是新分布式事务id,但当前全局事务id是原id

Cause: java.sql.SQLException: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141Caused by: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141at io.seata.rm.datasource.ConnectionContext.bind(ConnectionContext.java:198)at io.seata.rm.datasource.ConnectionProxy.bind(ConnectionProxy.java:85)at io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute(BaseTransactionalExecutor.java:120)at io.seata.rm.datasource.exec.ExecuteTemplate.execute(ExecuteTemplate.java:145)at io.seata.rm.datasource.PreparedStatementProxy.execute(PreparedStatementProxy.java:55)at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:48)at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:75)at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)at com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor.intercept(MybatisPlusInterceptor.java:106)at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)

seata 通过代理执行sql语句时,检查是否已有xid
org.apache.seata.rm.datasource.ConnectionContext#bind

void bind(String xid) {if (xid == null) {throw new IllegalArgumentException("xid should not be null");}if (!inGlobalTransaction()) {setXid(xid);} else {if (!this.xid.equals(xid)) {throw new ShouldNeverHappenException(String.format("bind xid: %s, while current xid: %s", xid, this.xid));}}
}

跟进org.apache.seata.rm.datasource.ConnectionContext#reset()方法,只有在本地事务提交或回滚后才会清空xid。即同一分支事务不能跨不同的分布式事务。

在执行不同的sql语句时,会根据不同类型的数据库及dml语句来实现抽象类AbstractDMLBaseExecutor,比如insert、update、delete的beforeImage,afterImage有不同的处理逻辑。

3. 全局事务读未提交?

回滚时的undolog 检测

在某次分支事务执行回滚时,曾遇到过如下异常:

 [_RMROLE_1_13_32] i.seata.rm.datasource.DataSourceManager  : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508]
// 执行回滚
org.apache.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback
// 基于undo log进行回滚
org.apache.seata.rm.datasource.undo.AbstractUndoLogManager#undo
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn
// 回滚前校验, 如校验当前信息与修改前、修改后信息
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#dataValidationAndGoOn// 如果当前数据与undo log 中的修改后、修改前日志都不一样,则抛出异常// 这些数据可能被其它并发事务修改了,需要人工维护io.seata.rm.datasource.undo.SQLUndoDirtyExceptionprotected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {TableRecords beforeRecords = sqlUndoLog.getBeforeImage();TableRecords afterRecords = sqlUndoLog.getAfterImage();// Compare current data with before data// No need undo if the before data snapshot is equivalent to the after data snapshot.Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);if (beforeEqualsAfterResult.getResult()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Stop rollback because there is no data change " +"between the before data snapshot and the after data snapshot.");}// no need continue undo.return false;}// Validate if data is dirty.TableRecords currentRecords = queryCurrentRecords(conn);// compare with current data and after image.Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);if (!afterEqualsCurrentResult.getResult()) {// If current data is not equivalent to the after data, then compare the current data with the before// data, too. No need continue to undo if current data is equivalent to the before data snapshotResult<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);if (beforeEqualsCurrentResult.getResult()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Stop rollback because there is no data change " +"between the before data snapshot and the current data snapshot.");}// no need continue undo.return false;} else {if (LOGGER.isInfoEnabled()) {if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());}}if (LOGGER.isDebugEnabled()) {LOGGER.debug("check dirty data failed, old and new data are not equal, " +"tableName:[" + sqlUndoLog.getTableName() + "]," +"oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +"newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");}throw new SQLUndoDirtyException("Has dirty records when undo.");}}return true;
}	

回顾下seata at 模式的执行流程,在一阶段分支事务注册时已经完成本地事务的提交。

// 本地事务提交
org.apache.seata.rm.datasource.ConnectionProxy#commit
org.apache.seata.rm.datasource.ConnectionProxy#doCommit 	
org.apache.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit  private void processGlobalTransactionCommit() throws SQLException {try {// 向tc注册分支事务 register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);// 提交本地事务targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {// 向tc上报本地提交结果report(true);}// 重试context;避免检查xid抛出shouldNotHappenExpcontext.reset();
}			

而回滚时是收到tc通知异步进行的,在这个时间差内如果有其他事务读到了全局事务未提交的数据,即出现了全局事务的读未提交,出现了脏读的情况。

如何实现全局事务的读已提交?
脏写

回顾上文的日志中,曾出现过

[  XNIO-1 task-4] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid

即分支事务提交时,会加锁。
但是如果针对同一张表的改动不在此全局事务的方法中,如何处理呢?
可以在其他方法上也增加全局事务注解@GlobalTransactional 。
如果不需要全局事务,则需要使用@GlobalLock。

无论是@GlobalTransactional还是@GlobalLock,都是在分支事务提交时才能知道锁是否存在,然后决定下一步操作。

脏读

曾有同事遇到过一个场景,在MQ因异常重试时,读到了回滚前的数据认为业务已执行完成而停止了重试逻辑。

因此需要在执行过程中有必要检查全局锁是否存在。
如果执行 SQL 是 select for update,则会使用 SelectForUpdateExecutor 类,如果执行方法中带有 @GlobalTransactional or @GlobalLock注解,则会检查是否有全局锁,如果当前存在全局锁,则会回滚本地事务,通过 while 循环不断地重新竞争获取本地锁和全局锁。

io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute

public T doExecute(Object... args) throws Throwable {Connection conn = statementProxy.getConnection();// ... ...try {// ... ...while (true) {try {// ... ...if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {// Do the same thing under either @GlobalTransactional or @GlobalLock, // that only check the global lock  here.statementProxy.getConnectionProxy().checkLock(lockKeys);} else {throw new RuntimeException("Unknown situation!");}break;} catch (LockConflictException lce) {if (sp != null) {conn.rollback(sp);} else {conn.rollback();}// trigger retrylockRetryController.sleep(lce);}}} finally {// ...}

其他的一些问题

时间类型的字段时区不要带时区
[_RMROLE_1_10_32] i.seata.rm.datasource.DataSourceManager  : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed and try again later xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508 Cannot cast an instance of java.sql.Timestamp to type Types.TIMESTAMP_WITH_TIMEZONE]

参考

https://seata.apache.org/zh-cn/docs/dev/mode/at-mode/
https://seata.apache.org/zh-cn/blog/seata-datasource-proxy/
https://seata.apache.org/zh-cn/docs/overview/faq/#42
https://seata.apache.org/zh-cn/blog/seata-at-lock/

org.apache.seata.tm.api.TransactionalTemplate
io.seata.rm.AbstractResourceManager

org.apache.seata.rm.datasource.exec.BaseTransactionalExecutor
子类 AbstractDMLBaseExecutor、SelectForUpdateExecutor

org.apache.seata.server.coordinator.DefaultCore

http://www.dtcms.com/wzjs/452370.html

相关文章:

  • 郑州做网站哪里好免费大数据查询平台
  • 沈阳市城乡建设局网站首页保定seo博客
  • bbin接口网站开发域名注册多少钱
  • 欧力虎网站建设企业查询官网
  • 表白网页生成器手机版网站建设优化推广系统
  • 做外贸c2c网站有哪些博客seo怎么做
  • 搜索网站的软件有哪些网店营销策略有哪些
  • 网页设计个人简历代码seo网站优化方案案例
  • 网站+做内容分发资格网站建设 全网营销
  • 邹城网站建设zczwxx搜狗引擎
  • 网页设计教学网站东莞优化排名公司
  • 网站详情怎么做的搜索引擎优化的目的是对用户友好
  • 哪个网站专业做商铺网站推广方案策划
  • 提供定制型网站建设百度网站推广排名
  • 搜索引擎优化方法有哪几种杭州seo服务公司
  • 阿里云建站数据库用什么seo职位具体做什么
  • win2008 网站服务器网站制作的费用
  • 东营建设信息网(东营市住房和城乡如何做一个网站的seo
  • 潍坊中小型网站建设公司好用的搜索引擎有哪些
  • 网站建设讲师招聘营销方式有哪些
  • 做商品网站的教学视频全渠道营销成功案例
  • 玉溪做网站建设的公司推广拉新app哪几个靠谱
  • 软件工程就业岗位电脑系统优化软件十大排名
  • 广州微网站建设机构矿坛器材友情交换
  • 网站怎么申请支付宝可以免费领取会员的软件
  • 网站建设公司怎么盈利社区推广
  • 吉林省建设监理协会网站搜索引擎优化技术
  • 用wp系统做网站手游推广渠道
  • 怎么做网站建设作业百度推广关键词匹配模式
  • 网站后台html5模板谷歌chrome官网