Seata源码—5.全局事务的创建与返回处理二
大纲
1.Seata开启分布式事务的流程总结
2.Seata生成全局事务ID的雪花算法源码
3.生成xid以及对全局事务会话进行持久化的源码
4.全局事务会话数据持久化的实现源码
5.Seata Server创建全局事务与返回xid的源码
6.Client获取Server的响应与处理的源码
7.Seata与Dubbo整合的过滤器源码
5.Seata Server创建全局事务与返回xid的源码
-> ServerHandler.channelRead()接收Seata Client发送过来的请求;
-> AbstractNettyRemoting.processMessage()处理RpcMessage消息;
-> ServerOnRequestProcessor.process()处理RpcMessage消息;
-> TransactionMessageHandler.onRequest()处理RpcMessage消息;
-> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...

    /**
     * Inbound Netty handler on the server side: the entry point for requests sent by Seata clients.
     */
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        /**
         * Forwards every inbound {@code RpcMessage} to {@code processMessage()}; anything else is ignored.
         */
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            // Hand the already-decoded RpcMessage over to processMessage() for dispatching.
            processMessage(ctx, (RpcMessage) msg);
        }
    }
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...

    /**
     * Dispatches an {@code RpcMessage} to the processor registered for its message type.
     * If the registered pair carries an executor the processor runs on it, otherwise it runs
     * on the calling thread. Processor failures are logged and not rethrown.
     */
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            // Look up the Pair for this message type: it bundles the processor with the thread pool that runs it.
            // Per the original note, processorTable is filled by NettyRemotingServer during initialization
            // through registerProcessor(), so on the server a request ends up in ServerOnRequestProcessor.process().
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                // Always reset the logging MDC of the pooled thread once the task is done.
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    // No executor registered: process on the calling thread.
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    private final RemotingServer remotingServer;
    ...

    /**
     * Handles a request only if its channel is registered; otherwise the connection is closed.
     */
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }

    /**
     * Logs the request, lets the transaction message handler process it and sends the result back
     * on the same channel.
     */
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        // RpcContext that ChannelManager associates with this channel.
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        // the batch send request message
        if (message instanceof MergedWarpMessage) {
            ...
        } else {
            // the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            // Per the article's call chain this ends up in DefaultCoordinator.onRequest().
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            // Send the response back to the client.
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
    ...
}
-> TransactionMessageHandler.onRequest()处理RpcMessage消息;
-> DefaultCoordinator.onRequest()处理RpcMessage消息;
-> GlobalBeginRequest.handle()处理开启全局事务请求;
-> AbstractTCInboundHandler.handle()开启全局事务并返回包含xid的响应;
-> DefaultCoordinator.doGlobalBegin()开启全局事务;
-> DefaultCore.begin()创建全局事务会话并开启;
-> GlobalSession.createGlobalSession()创建全局事务会话;
-> GlobalSession.begin()开启全局事务会话;
-> AbstractSessionManager.onBegin()
-> AbstractSessionManager.addGlobalSession()
-> AbstractSessionManager.writeSession()
-> TransactionStoreManager.writeSession()持久化全局事务会话;
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...

    /**
     * Entry point for requests addressed to the TC: registers this coordinator as the inbound
     * handler on the request and lets the request dispatch itself.
     *
     * @throws IllegalArgumentException if the message is not an {@code AbstractTransactionRequestToTC}
     */
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
    ...

    /**
     * Delegates to the inbound handler's overload for {@code GlobalBeginRequest}.
     */
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
    ...
}

public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class);

    /**
     * Runs {@code doGlobalBegin()} inside the exception-handling template and returns the response.
     * A {@code StoreException} is rethrown as a {@code TransactionException} with code {@code FailedStore}.
     */
    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    // Begin the global transaction.
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    private final DefaultCore core;
    ...

    /**
     * Begins the global transaction through {@code DefaultCore} and stores the resulting xid in the response.
     */
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
        // This is where the begin-global-transaction business logic really starts:
        // DefaultCore begins the transaction and the xid it returns is set on the response.
        response.setXid(core.begin(
            rpcContext.getApplicationId(), // application id
            rpcContext.getTransactionServiceGroup(), // transaction service group
            request.getTransactionName(), // transaction name
            request.getTimeout()) // timeout
        );
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }
    ...
}

public class DefaultCore implements Core {
    ...

    /**
     * Creates a global session, begins it and returns its xid.
     */
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        // Create a global transaction session.
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
        // Put the xid into the logging MDC of the current thread.
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        // Register the root session manager as a lifecycle listener of the session.
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Begin the session; the listeners notified here are what persist the session.
        session.begin();
        // transaction start event
        MetricsPublisher.postSessionDoingEvent(session, false);
        // Return the xid of the global session.
        return session.getXid();
    }
    ...
}

public class GlobalSession implements SessionLifecycle, SessionStorable {
    ...

    /**
     * Factory for a new global session with {@code lazyLoadBranch} set to {@code false}.
     */
    public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
        GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
        return session;
    }

    /**
     * Initializes the session in status {@code Begin} and derives its xid from a freshly generated
     * transaction id.
     */
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
        // The global transaction id comes from UUIDGenerator.
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
        this.lazyLoadBranch = lazyLoadBranch;
        if (!lazyLoadBranch) {
            this.branchSessions = new ArrayList<>();
        }
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
        // The final xid is built by the XID utility from the generated transactionId.
        this.xid = XID.generateXID(transactionId);
    }

    /**
     * Marks the session as begun and active, then notifies every lifecycle listener.
     */
    @Override
    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
    ...
}

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
    ...

    /**
     * Lifecycle callback: a session that has begun is added to this manager.
     */
    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }

    /**
     * Writes the session with operation {@code GLOBAL_ADD}.
     */
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);
        }
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        // transactionStoreManager.writeSession() is what persists the global session.
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            ...
        }
    }
    ...
}
-> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
-> AbstractNettyRemotingServer.sendAsyncResponse()异步发送响应;
-> AbstractNettyRemoting.buildResponseMessage()构造包含xid响应;
-> AbstractNettyRemoting.sendAsync()异步发送响应;
-> Netty的Channel.writeAndFlush()发送响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...

    /**
     * Builds the response for {@code rpcMessage} and sends it asynchronously.
     * For anything but a heartbeat, the target channel is resolved through
     * {@code ChannelManager.getSameClientChannel()}.
     *
     * @throws RuntimeException if no channel could be resolved
     */
    @Override
    public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
        Channel clientChannel = channel;
        if (!(msg instanceof HeartbeatMessage)) {
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if (clientChannel != null) {
            RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
                ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
                : ProtocolConstants.MSGTYPE_RESPONSE);
            super.sendAsync(clientChannel, rpcMsg);
        } else {
            throw new RuntimeException("channel is error.");
        }
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...

    /**
     * Creates the response message; codec, compressor and id are copied from the request so the
     * client can correlate the response with its pending request.
     */
    protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {
        RpcMessage rpcMsg = new RpcMessage();
        rpcMsg.setMessageType(messageType);
        rpcMsg.setCodec(rpcMessage.getCodec()); // same with request
        rpcMsg.setCompressor(rpcMessage.getCompressor());
        rpcMsg.setBody(msg);
        rpcMsg.setId(rpcMessage.getId());
        return rpcMsg;
    }

    /**
     * rpc async request: writes the message to the channel without waiting for a reply.
     * The channel is destroyed if the write fails.
     */
    protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
        channelWritableCheck(channel, rpcMessage.getBody());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                destroyChannel(future.channel());
            }
        });
    }
    ...
}
6.Client获取Server的响应与处理的源码
-> ClientHandler.channelRead()接收Seata Server返回的响应;
-> AbstractNettyRemoting.processMessage()处理RpcMessage消息;
-> ClientOnResponseProcessor.process()会设置MessageFuture结果;
-> MessageFuture.setResultMessage()设置MessageFuture结果;
-> CompletableFuture.complete()唤醒阻塞的线程;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...

    /**
     * Inbound Netty handler on the client side: the entry point for responses returned by the Seata server.
     */
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
        /**
         * Forwards every inbound {@code RpcMessage} to {@code processMessage()}; anything else is ignored.
         */
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...

    /**
     * Dispatches an {@code RpcMessage} to the processor registered for its message type.
     * If the registered pair carries an executor the processor runs on it, otherwise it runs
     * on the calling thread. Processor failures are logged and not rethrown.
     */
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            // Look up the Pair for this message type: it bundles the processor with the thread pool that runs it.
            // This method is shared by server and client. On the client side, a server response is expected to be
            // dispatched to ClientOnResponseProcessor.process() (presumably registered through registerProcessor()
            // when the client initializes — confirm against the client init code).
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                // Always reset the logging MDC of the pooled thread once the task is done.
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    // No executor registered: process on the calling thread.
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class ClientOnResponseProcessor implements RemotingProcessor {
    ...

    /**
     * Completes the pending {@code MessageFuture} that matches the response id. If none is pending,
     * an {@code AbstractResultMessage} is passed to the transaction message handler instead.
     */
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (rpcMessage.getBody() instanceof MergeResultMessage) {
            ...
        } else if (rpcMessage.getBody() instanceof BatchResultMessage) {
            ...
        } else {
            // Handling of an ordinary (non-merged, non-batch) response.
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                    if (transactionMessageHandler != null) {
                        transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                    }
                }
            }
        }
    }
    ...
}

public class MessageFuture {
    private transient CompletableFuture<Object> origin = new CompletableFuture<>();
    ...

    /**
     * Sets result message: completes the underlying future, releasing any thread waiting on it.
     */
    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }
    ...
}
由于Seata Client发送开启全局事务的请求给Seata Server时,会通过MessageFuture的get()方法同步等待Seata Server返回响应。所以当Seata Client获取Seata Server的响应并通过complete()方法设置MessageFuture已经完成后,原来同步等待Seata Server响应的线程便会继续往下处理。
即某线程执行CompletableFuture.complete()方法后,原本阻塞在CompletableFuture.get()方法上的线程就会被唤醒,并拿到complete()设置的结果继续往下执行。
-> GlobalTransactionalInterceptor.invoke()
-> GlobalTransactionalInterceptor.handleGlobalTransaction()
-> TransactionalTemplate.execute()
-> TransactionalTemplate.beginTransaction()
-> DefaultGlobalTransaction.begin()
-> DefaultTransactionManager.begin()
-> DefaultTransactionManager.syncCall()
-> TmNettyRemotingClient.sendSyncRequest()
-> AbstractNettyRemotingClient.sendSyncRequest()发送请求;
-> AbstractNettyRemoting.sendSync()发送同步请求;
-> MessageFuture.get()会同步等待Seata Server的响应结果;
-> CompletableFuture.get()阻塞当前线程进行等待唤醒;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic Object sendSyncRequest(Object msg) throws TimeoutException {//因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡String serverAddress = loadBalance(getTransactionServiceGroup(), msg);//获取RPC调用的超时时间long timeoutMillis = this.getRpcRequestTimeout();//构建一个RPC消息RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);//send batch message//put message into basketMap, @see MergedSendRunnable//默认是不开启批量消息发送if (this.isEnableClientBatchSendRequest()) {...} else {//通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel//然后通过网络连接Channel把RpcMessage发送出去Channel channel = clientChannelManager.acquireChannel(serverAddress);return super.sendSync(channel, rpcMessage, timeoutMillis);}}...
}public abstract class AbstractNettyRemoting implements Disposable {...protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {if (timeoutMillis <= 0) {throw new FrameworkException("timeout should more than 0ms");}if (channel == null) {LOGGER.warn("sendSync nothing, caused by null channel.");return null;}//把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);channelWritableCheck(channel, rpcMessage.getBody());//获取远程地址String remoteAddr = ChannelUtil.getAddressFromChannel(channel);doBeforeRpcHooks(remoteAddr, rpcMessage);//异步化发送数据,同时对发送结果添加监听器//如果发送失败,则会对网络连接Channel进行销毁处理channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());if (messageFuture1 != null) {messageFuture1.setResultMessage(future.cause());}destroyChannel(future.channel());}});try {//然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);doAfterRpcHooks(remoteAddr, rpcMessage, result);return result;} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}}...
}public class MessageFuture {private transient CompletableFuture<Object> origin = new CompletableFuture<>();...public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {Object result = null;try {result = origin.get(timeout, unit);if (result instanceof TimeoutException) {throw (TimeoutException)result;}} catch (ExecutionException e) {throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);} catch (TimeoutException e) {throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));}if (result instanceof RuntimeException) {throw (RuntimeException)result;} else if (result instanceof Throwable) {throw new RuntimeException((Throwable)result);}return result;}...
}
7.Seata与Dubbo整合的过滤器源码
(1)调用Dubbo过滤器的入口
(2)Seata与Dubbo整合的过滤器
(1)调用Dubbo过滤器的入口
-> GlobalTransactionalInterceptor.invoke()拦截添加了@GlobalTransactional注解的方法;
-> GlobalTransactionalInterceptor.handleGlobalTransaction()进行全局事务的处理;
-> TransactionalTemplate.execute()执行全局事务
-> TransactionalTemplate.beginTransaction()开启一个全局事务
-> handleGlobalTransaction().methodInvocation.proceed()真正执行目标方法
-> ApacheDubboTransactionPropagationFilter.invoke()经过Dubbo过滤器处理
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {...//如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {//methodInvocation是一次方法调用//通过methodInvocation的getThis()方法可以获取到被调用方法的对象//通过AopUtils.getTargetClass()方法可以获取到对象对应的ClassClass<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;//通过反射,获取到目标class中被调用的method方法Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);//如果调用的目标method不为nullif (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {//尝试寻找桥接方法bridgeMethodfinal Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//通过反射,获取被调用的目标方法的@GlobalTransactional注解final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);//通过反射,获取被调用目标方法的@GlobalLock注解final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true//localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);//如果全局事务没有禁用if (!localDisable) {//全局事务注解不为空,或者是AOP切面全局事务核心配置不为空if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {AspectTransactional transactional;if (globalTransactionalAnnotation != null) {//创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来transactional = new 
AspectTransactional(globalTransactionalAnnotation.timeoutMills(),globalTransactionalAnnotation.name(),globalTransactionalAnnotation.rollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.noRollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.propagation(),globalTransactionalAnnotation.lockRetryInterval(),globalTransactionalAnnotation.lockRetryTimes());} else {transactional = this.aspectTransactional;}//真正处理全局事务的入口return handleGlobalTransaction(methodInvocation, transactional);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);}}}//直接运行目标方法return methodInvocation.proceed();}//真正进行全局事务的处理Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {boolean succeed = true;try {//基于全局事务执行模版TransactionalTemplate,来执行全局事务return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {//真正执行目标方法return methodInvocation.proceed();}...});} catch (TransactionalExecutor.ExecutionException e) {...} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}}...
}public class TransactionalTemplate {...public Object execute(TransactionalExecutor business) throws Throwable {//1.Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}//1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.//根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务//刚开始在开启一个全局事务的时候,是没有全局事务的GlobalTransaction tx = GlobalTransactionContext.getCurrent();//1.2 Handle the transaction propagation.//从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED//也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;Propagation propagation = txInfo.getPropagation();//不同的全局事务传播级别,会采取不同的处理方式//比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid//可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {...}//1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.if (tx == null) {tx = GlobalTransactionContext.createNew();}//set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {//2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, else do nothing. Of course, the hooks will still be triggered.//开启一个全局事务beginTransaction(txInfo, tx);Object rs;try {//Do Your Business//执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务rs = business.execute();} catch (Throwable ex) {//3. The needed business exception to rollback.//发生异常时需要完成的事务completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//4. everything is fine, commit.//如果一切执行正常就会在这里提交全局事务commitTransaction(tx);return rs;} finally {//5. 
clear//执行一些全局事务完成后的回调,比如清理等工作resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {//If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {//如果之前挂起了一个全局事务,此时可以恢复这个全局事务tx.resume(suspendedResourcesHolder);}}}//开启事务private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {//开启全局事务之前有一个回调的一个钩子名为triggerBeforeBegin()triggerBeforeBegin();//真正去开启一个全局事务tx.begin(txInfo.getTimeOut(), txInfo.getName());//开启全局事务之后还有一个回调钩子名为triggerAfterBegin()triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);}}...
}
(2)Seata与Dubbo整合的过滤器
如果线程本地变量副本(RootContext)里的xid不为null,对应于发起RPC调用的情形。如果RootContext里的xid为null,而RpcContext里的xid不为null,则对应于接收RPC调用的情形。
当RootContext的xid不为null时,需要设置RpcContext的xid。当RootContext的xid为null + RpcContext的xid不为null时,需要设置RootContext的xid。
@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)
public class ApacheDubboTransactionPropagationFilter implements Filter {private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class);@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {//发起Dubbo的RPC调用时,会先从线程本地变量副本里获取xidString xid = RootContext.getXID();//然后从线程本地变量副本里获取当前的分支事务类型,默认分支类型就是ATBranchType branchType = RootContext.getBranchType();//从RpcContext里获取attachments里的xid和分支类型String rpcXid = getRpcXid();String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);if (LOGGER.isDebugEnabled()) {LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);}boolean bind = false;if (xid != null) {//如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形//则把线程本地变量副本里的xid和分支类型,设置到RpcContext上下文里//RpcContext上下文里的attachment内容会随着RPC请求发送到其他系统中RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());} else {//如果线程本地变量副本里的xid为null且RpcContext里的xid不为null,对应于接收RPC调用的情形if (rpcXid != null) {//把RpcContext里的xid绑定到当前服务的线程本地变量副本里RootContext.bind(rpcXid); if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {RootContext.bindBranchType(BranchType.TCC);}bind = true;if (LOGGER.isDebugEnabled()) {LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);}}}try {return invoker.invoke(invocation);} finally {if (bind) {BranchType previousBranchType = RootContext.getBranchType();//对线程本地变量副本里的xid做解绑String unbindXid = RootContext.unbind(); if (BranchType.TCC == previousBranchType) {RootContext.unbindBranchType();}if (LOGGER.isDebugEnabled()) {LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);}if (!rpcXid.equalsIgnoreCase(unbindXid)) {LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid, rpcBranchType != null ? 
rpcBranchType : "AT", previousBranchType);if (unbindXid != null) {RootContext.bind(unbindXid);LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);if (BranchType.TCC == previousBranchType) {RootContext.bindBranchType(BranchType.TCC);LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);}}}}//对RpcContext上下文里的东西进行解绑RpcContext.getContext().removeAttachment(RootContext.KEY_XID);RpcContext.getContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);RpcContext.getServerContext().removeAttachment(RootContext.KEY_XID);RpcContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);}}private String getRpcXid() {String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);if (rpcXid == null) {rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());}return rpcXid;}
}