Seata源码—4.全局事务拦截与开启事务处理一
大纲
1.Seata Server的启动入口的源码
2.Seata Server的网络服务器启动的源码
3.全局事务拦截器的核心变量
4.全局事务拦截器的初始化源码
5.全局事务拦截器的AOP切面拦截方法
6.通过全局事务执行模版来执行全局事务
7.获取xid构建全局事务实例与全局事务的传播级别
8.全局事务执行模版根据传播级别来执行业务
9.全局事务执行模版开启事务+提交事务+回滚事务
10.Seata Server集群的负载均衡机制实现源码
11.Seata Client向Seata Server发送请求的源码
12.Client将RpcMessage对象编码成字节数组
13.Server将字节数组解码成RpcMessage对象
14.Server处理已解码的RpcMessage对象的流程
15.Seata Server开启全局事务的流程源码
1.Seata Server的启动入口的源码
代码位于seata-server模块下:
@SpringBootApplication(scanBasePackages = {"io.seata"})
public class ServerApplication {public static void main(String[] args) throws IOException {//run the spring-boot applicationSpringApplication.run(ServerApplication.class, args);}
}@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);private boolean started = Boolean.FALSE;private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();public static void addDisposable(Disposable disposable) {DISPOSABLE_LIST.add(disposable);}@Overridepublic void run(String... args) {try {long start = System.currentTimeMillis();Server.start(args);started = true;long cost = System.currentTimeMillis() - start;LOGGER.info("seata server started in {} millSeconds", cost);} catch (Throwable e) {started = Boolean.FALSE;LOGGER.error("seata server start error: {} ", e.getMessage(), e);System.exit(-1);}}public boolean started() {return started;}@Overridepublic void destroy() throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug("destoryAll starting");}for (Disposable disposable : DISPOSABLE_LIST) {disposable.destroy();}if (LOGGER.isDebugEnabled()) {LOGGER.debug("destoryAll finish");}}
}public class Server {//The entry point of application.public static void start(String[] args) {//create loggerfinal Logger logger = LoggerFactory.getLogger(Server.class);//initialize the parameter parser//Note that the parameter parser should always be the first line to execute.//Because, here we need to parse the parameters needed for startup.ParameterParser parameterParser = new ParameterParser(args);//initialize the metrics//Seata Server是支持metric指标采集功能的MetricsManager.get().init();System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());//Seata Server里的Netty服务器的IO线程池,最小50个,最大500个ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(),NettyServerConfig.getKeepAliveTime(),TimeUnit.SECONDS,new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),new ThreadPoolExecutor.CallerRunsPolicy());//创建一个Netty网络通信服务器NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);UUIDGenerator.init(parameterParser.getServerNode());//log store mode : file, db, redisSessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());//启动定时调度线程DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);coordinator.init();nettyRemotingServer.setHandler(coordinator);//let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);//127.0.0.1 and 0.0.0.0 are not valid here.if (NetUtil.isValidIp(parameterParser.getHost(), false)) {XID.setIpAddress(parameterParser.getHost());} else {String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);if (StringUtils.isNotBlank(preferredNetworks)) {XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));} else {XID.setIpAddress(NetUtil.getLocalIp());}}//初始化Netty服务器nettyRemotingServer.init();}
}
2.Seata Server的网络服务器启动的源码
创建和启动Seata的网络服务器:
public class NettyRemotingServer extends AbstractNettyRemotingServer {...//Instantiates a new Rpc remoting server. 创建Seata Serverpublic NettyRemotingServer(ThreadPoolExecutor messageExecutor) {super(messageExecutor, new NettyServerConfig());}//启动Seata Server@Overridepublic void init() {//registry processorregisterProcessor();if (initialized.compareAndSet(false, true)) {super.init();}}private void registerProcessor() {//1.registry on request message processorServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);//2.registry on response message processorServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);//3.registry rm message processorRegRmProcessor regRmProcessor = new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);//4.registry tm message processorRegTmProcessor regTmProcessor = new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);//5.registry heartbeat message processorServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}...
}public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {private final NettyServerBootstrap serverBootstrap;...public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);//创建Netty ServerserverBootstrap = new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}@Overridepublic void init() {super.init();//启动Netty ServerserverBootstrap.start();}...
}public abstract class AbstractNettyRemoting implements Disposable {//The Timer executor. 由单个线程进行调度的线程池protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));//The Message executor.protected final ThreadPoolExecutor messageExecutor;...public void init() {//启动一个定时任务,每隔3秒检查发送的请求是否响应超时timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}...
}public class NettyServerBootstrap implements RemotingBootstrap {private final NettyServerConfig nettyServerConfig;private final EventLoopGroup eventLoopGroupBoss;private final EventLoopGroup eventLoopGroupWorker;private final ServerBootstrap serverBootstrap = new ServerBootstrap();private ChannelHandler[] channelHandlers;private int listenPort;private final AtomicBoolean initialized = new AtomicBoolean(false);public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {this.nettyServerConfig = nettyServerConfig;if (NettyServerConfig.enableEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads()));} else {this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads()));}}//Sets channel handlers.protected void setChannelHandlers(final ChannelHandler... handlers) {if (handlers != null) {channelHandlers = handlers;}}@Overridepublic void start() {this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker).channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())).localAddress(new InetSocketAddress(getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)).addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});try {this.serverBootstrap.bind(getListenPort()).sync();XID.setPort(getListenPort());LOGGER.info("Server started, service listen port: {}", getListenPort());RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));initialized.set(true);} catch (SocketException se) {throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);} catch (Exception exx) {throw new RuntimeException("Server start failed", exx);}}...
}public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {private RemotingServer remotingServer;private final DefaultCore core;private static volatile DefaultCoordinator instance;private final ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1));private final ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_COMMITTING, 1));private final ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(ASYNC_COMMITTING, 1));private final ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(TX_TIMEOUT_CHECK, 1));private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1));...public static DefaultCoordinator getInstance(RemotingServer remotingServer) {if (null == instance) {synchronized (DefaultCoordinator.class) {if (null == instance) {instance = new DefaultCoordinator(remotingServer);}}}return instance;}private DefaultCoordinator(RemotingServer remotingServer) {if (remotingServer == null) {throw new IllegalArgumentException("RemotingServer not allowed be null.");}this.remotingServer = remotingServer;this.core = new DefaultCore(remotingServer);}public void init() {retryRollbacking.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);retryCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);timeoutCheck.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);}...
}
Seata Client的ClientHandler和Seata Server的ServerHandler:
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@ChannelHandler.Sharableclass ServerHandler extends ChannelDuplexHandler {//Channel read.@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//此时会把解码完毕的RpcMessage来进行处理processMessage(ctx, (RpcMessage) msg);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) {synchronized (lock) {if (ctx.channel().isWritable()) {lock.notifyAll();}}ctx.fireChannelWritabilityChanged();}//Channel inactive.@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {debugLog("inactive:{}", ctx);if (messageExecutor.isShutdown()) {return;}handleDisconnect(ctx);super.channelInactive(ctx);}private void handleDisconnect(ChannelHandlerContext ctx) {final String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isInfoEnabled()) {LOGGER.info(ipAndPort + " to server channel inactive.");}if (rpcContext != null && rpcContext.getClientRole() != null) {rpcContext.release();if (LOGGER.isInfoEnabled()) {LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);}} else {if (LOGGER.isInfoEnabled()) {LOGGER.info("remove unused channel:" + ctx.channel());}}}//Exception caught.@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {try {if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) {return;}LOGGER.error("exceptionCaught:{}, channel:{}", cause.getMessage(), ctx.channel());super.exceptionCaught(ctx, cause);} finally {ChannelManager.releaseRpcContext(ctx.channel());}}//User event triggered.@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {debugLog("idle:{}", evt);IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.READER_IDLE) {if (LOGGER.isInfoEnabled()) {LOGGER.info("channel:" + ctx.channel() + " read idle.");}handleDisconnect(ctx);try {closeChannelHandlerContext(ctx);} catch (Exception e) {LOGGER.error(e.getMessage());}}}}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {if (LOGGER.isInfoEnabled()) {LOGGER.info(ctx + " will closed");}super.close(ctx, future);}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Sharableclass ClientHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}processMessage(ctx, (RpcMessage) msg);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) {synchronized (lock) {if (ctx.channel().isWritable()) {lock.notifyAll();}}ctx.fireChannelWritabilityChanged();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (messageExecutor.isShutdown()) {return;}if (LOGGER.isInfoEnabled()) {LOGGER.info("channel inactive: {}", ctx.channel());}clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));super.channelInactive(ctx);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.READER_IDLE) {if (LOGGER.isInfoEnabled()) {LOGGER.info("channel {} read idle.", ctx.channel());}try {String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());clientChannelManager.invalidateObject(serverAddress, ctx.channel());} catch (Exception exx) {LOGGER.error(exx.getMessage());} finally {clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));}}if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {try {if (LOGGER.isDebugEnabled()) {LOGGER.debug("will send ping msg,channel {}", ctx.channel());}AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);} catch (Throwable throwable) {LOGGER.error("send request error: {}", throwable.getMessage(), throwable);}}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));if (LOGGER.isInfoEnabled()) {LOGGER.info("remove exception rm channel:{}", ctx.channel());}super.exceptionCaught(ctx, cause);}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {if (LOGGER.isInfoEnabled()) {LOGGER.info(ctx + " will closed");}super.close(ctx, future);}}...
}
3.全局事务拦截器的核心变量
全局事务注解扫描器GlobalTransactionScanner的wrapIfNecessary()方法,如果发现Spring的Bean含有Seata的注解,就会为该Bean创建动态代理。
比如Spring的Bean添加了@GlobalTransactional注解,那么GlobalTransactionScanner类为这个Bean创建动态代理时,会使用全局事务拦截器GlobalTransactionalInterceptor来进行创建。
这样后续调用到这个Spring Bean的方法时,就会先调用GlobalTransactionInterceptor拦截器。
GlobalTransactionalInterceptor这个全局事务注解拦截器的核心变量如下:
一.TransactionalTemplate全局事务执行模版
二.GlobalLockTemplate全局锁管理模版
三.FailureHandler全局事务异常处理器
//全局事务注解拦截器
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);//默认的全局事务异常处理组件//如果全局事务出现开启、回滚、提交、重试异常时,就可以回调这个DefaultFailureHandlerImpl进行异常处理private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();//全局事务执行模版,用来管理全局事务的执行private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();//全局锁执行模版,用来实现不同全局事务间的写隔离private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();//真正的全局事务异常处理组件private final FailureHandler failureHandler;//是否禁用全局事务private volatile boolean disable;//全局事务拦截器的顺序private int order;//AOP切面全局事务核心配置,来自于全局事务注解protected AspectTransactional aspectTransactional;//全局事务降级检查的时间周期private static int degradeCheckPeriod;//是否开启全局事务的降级检查private static volatile boolean degradeCheck;//降级检查允许时间private static int degradeCheckAllowTimes;//降级次数private static volatile Integer degradeNum = 0;//reach达标次数private static volatile Integer reachNum = 0;//Guava提供的事件总线private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);//定时调度线程池private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));//默认的全局事务超时时间private static int defaultGlobalTransactionTimeout = 0;...
}
4.全局事务拦截器的初始化源码
全局事务拦截器GlobalTransactionalInterceptor进行初始化时,会设置全局事务的异常处理组件,设置默认的全局事务超时时间为60秒。
//全局事务注解扫描器
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {... //Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//创建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {// Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}...
}//全局事务拦截器
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {//真正的全局事务异常处理组件private final FailureHandler failureHandler;//是否禁用全局事务private volatile boolean disable;//全局事务拦截器的顺序private int order;//是否开启全局事务的降级检查private static volatile boolean degradeCheck;//全局事务降级检查的时间周期private static int degradeCheckPeriod;//降级检查允许时间private static int degradeCheckAllowTimes;//默认的全局事务超时时间private static int defaultGlobalTransactionTimeout = 0;//Guava提供的事件总线private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);...//Instantiates a new Global transactional interceptor.//实例化一个新的全局事务拦截器public GlobalTransactionalInterceptor(FailureHandler failureHandler) {this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);if (degradeCheck) {ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);EVENT_BUS.register(this);if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {startDegradeCheck();}}this.initDefaultGlobalTransactionTimeout();}//初始化默认的全局事务超时时间,60s=1minprivate void initDefaultGlobalTransactionTimeout() {if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {int defaultGlobalTransactionTimeout;try {defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);} catch (Exception e) {LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;}if (defaultGlobalTransactionTimeout <= 0) {LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;}GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;}}...
}
5.全局事务拦截器的AOP切面拦截方法
如果调用添加了@GlobalTransactional注解的方法,就会执行GlobalTransactionalInterceptor的invoke()方法。
//全局事务拦截器
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {//是否禁用全局事务private volatile boolean disable; //是否开启全局事务的降级检查private static volatile boolean degradeCheck;//降级次数private static volatile Integer degradeNum = 0;//降级检查允许时间private static int degradeCheckAllowTimes;//AOP切面全局事务核心配置,来自于全局事务注解protected AspectTransactional aspectTransactional;...//如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {//methodInvocation是一次方法调用//通过methodInvocation的getThis()方法可以获取到被调用方法的对象//通过AopUtils.getTargetClass()方法可以获取到对象对应的ClassClass<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;//通过反射,获取到目标class中被调用的method方法Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);//如果调用的目标method不为nullif (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {//尝试寻找桥接方法bridgeMethodfinal Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//通过反射,获取被调用的目标方法的@GlobalTransactional注解final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);//通过反射,获取被调用目标方法的@GlobalLock注解final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true//localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);//如果全局事务没有禁用if (!localDisable) {//全局事务注解不为空,或者是AOP切面全局事务核心配置不为空if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {AspectTransactional transactional;if (globalTransactionalAnnotation != null) {//创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),globalTransactionalAnnotation.name(),globalTransactionalAnnotation.rollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.noRollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.propagation(),globalTransactionalAnnotation.lockRetryInterval(),globalTransactionalAnnotation.lockRetryTimes());} else {transactional = this.aspectTransactional;}//真正处理全局事务的入口return handleGlobalTransaction(methodInvocation, transactional);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);}}}//直接运行目标方法return methodInvocation.proceed();}//获取注解public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass)).orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));}...
}
6.通过全局事务执行模版来执行全局事务
GlobalTransactionInterceptor全局事务拦截器中会有一个全局事务执行模版的实例变量,这个全局事务执行模版TransactionalTemplate实例就是用来执行全局事务的。执行全局事务时,就会调用TransactionalTemplate的execute()方法。
//全局事务拦截器
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {//全局事务执行模版,用来管理全局事务的执行private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();...//真正进行全局事务的处理Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {boolean succeed = true;try {//基于全局事务执行模版TransactionalTemplate,来执行全局事务return transactionalTemplate.execute(new TransactionalExecutor() {//真正执行目标方法@Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}//根据全局事务注解可以获取到一个name,可以对目标方法进行格式化public String name() {String name = aspectTransactional.getName();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}//获取全局事务的信息@Overridepublic TransactionInfo getTransactionInfo() {//reset the value of timeoutint timeout = aspectTransactional.getTimeoutMills();if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}//封装一个全局事务信息实例TransactionInfoTransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(timeout);//全局事务超时时间transactionInfo.setName(name());//全局事务名称transactionInfo.setPropagation(aspectTransactional.getPropagation());//全局事务传播级别transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//全局锁获取重试间隔transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//全局锁重试次数//全局事务回滚规则Set<RollbackRule> rollbackRules = new LinkedHashSet<>();for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : aspectTransactional.getRollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {...} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}}...
}
7.获取xid构建全局事务实例与全局事务的传播级别
(1)从RootContext获取xid来构建全局事务实例
(2)全局事务的传播级别
(1)从RootContext获取xid来构建全局事务实例
RootContext会通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore实例、ThreadLocalContextCore实例。
而xid又会通过RootContext的bind()方法被put()到ContextCore实例中,也就是xid会被put()到ThreadLocal<Map<String, Object>>中,或者被put()到FastThreadLocal<Map<String, Object>>中。因此,通过RootContext的get()方法可以从ContextCore实例中获取当前线程的xid。
//全局事务执行模版
public class TransactionalTemplate {private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);//Execute object.public Object execute(TransactionalExecutor business) throws Throwable {//1.Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}//1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.GlobalTransaction tx = GlobalTransactionContext.getCurrent();//1.2 Handle the transaction propagation.Propagation propagation = txInfo.getPropagation();...}...
}//全局事务上下文
public class GlobalTransactionContext {private GlobalTransactionContext() {}//Get GlobalTransaction instance bind on current thread.public static GlobalTransaction getCurrent() {String xid = RootContext.getXID();if (xid == null) {return null;}return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);}...
}public class RootContext {//通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore、ThreadLocalContextCore//所以可以认为,xid是存放在ThreadLocal<Map<String, Object>>中的private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();...private RootContext() {}//Gets xid.@Nullablepublic static String getXID() {return (String) CONTEXT_HOLDER.get(KEY_XID);}//Bind xid.public static void bind(@Nonnull String xid) {if (StringUtils.isBlank(xid)) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("xid is blank, switch to unbind operation!");}unbind();} else {MDC.put(MDC_KEY_XID, xid);if (LOGGER.isDebugEnabled()) {LOGGER.debug("bind {}", xid);}CONTEXT_HOLDER.put(KEY_XID, xid);}}...
}
(2)全局事务的传播级别
全局事务的传播级别分别有:REQUIRED、REQUIRES_NEW、NOT_SUPPORTED、NEVER、SUPPORTS、MANDATORY。
//Propagation level of global transactions.
//全局事务的传播级别
public enum Propagation {//如果全局事务已经存在,此时会直接在当前的全局事务里继续去运行下去,后续运行的都是全局事务里的分支事务//如果全局事务此时还不存在,就会开启一个新的全局事务来运行//这种全局事务传播级别,就是REQUIRED//The logic is similar to the following code:// if (tx == null) {// try {// tx = beginNewTransaction(); // begin new transaction, is not existing// Object rs = business.execute(); // execute with new transaction// commitTransaction(tx);// return rs;// } catch (Exception ex) {// rollbackTransaction(tx);// throw ex;// }// } else {// return business.execute(); // execute with current transaction// }REQUIRED,//如果全局事务已经存在,则先暂停该事务,然后开启一个新的全局事务来执行业务//The logic is similar to the following code:// try {// if (tx != null) {// suspendedResource = suspendTransaction(tx); // suspend current transaction// }// try {// tx = beginNewTransaction(); // begin new transaction// Object rs = business.execute(); // execute with new transaction// commitTransaction(tx);// return rs;// } catch (Exception ex) {// rollbackTransaction(tx);// throw ex;// }// } finally {// if (suspendedResource != null) {// resumeTransaction(suspendedResource); // resume transaction// }// }REQUIRES_NEW,//如果全局事务已经存在,则先暂停该事务,然后不要使用全局事务来执行业务//The logic is similar to the following code:// try {// if (tx != null) {// suspendedResource = suspendTransaction(tx); // suspend current transaction// }// return business.execute(); // execute without transaction// } finally {// if (suspendedResource != null) {// resumeTransaction(suspendedResource); // resume transaction// }// }NOT_SUPPORTED,//如果全局事务不存在,则不要使用全局事务来执行业务//如果全局事务存在,则使用全局事务来执行业务//The logic is similar to the following code:// if (tx != null) {// return business.execute(); // execute with current transaction// } else {// return business.execute(); // execute without transaction// }SUPPORTS,//如果全局事务存在,则抛异常//如果全局事务不存在,则执行业务//The logic is similar to the following code:// if (tx != null) {// throw new TransactionException("existing transaction");// }// return business.execute(); // execute without transactionNEVER,//如果全局事务不存在,则抛异常//如果全局事务存在,则使用全局事务去执行业务//The logic is similar to the following code:// if (tx == null) {// throw new TransactionException("not existing transaction");// }// return business.execute(); // execute with current transactionMANDATORY
}
8.全局事务执行模版根据传播级别来执行业务
//全局事务执行模版
public class TransactionalTemplate {...//Execute object.//通过全局事务生命周期管理组件执行全局事务public Object execute(TransactionalExecutor business) throws Throwable {//1.Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}//1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.//根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务//刚开始在开启一个全局事务的时候,是没有全局事务的GlobalTransaction tx = GlobalTransactionContext.getCurrent();//1.2 Handle the transaction propagation.//从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED//也就是如果存在一个全局事务,就直接执行业务;//如果不存在一个全局事务,就开启一个新的全局事务;Propagation propagation = txInfo.getPropagation();//不同的全局事务传播级别,会采取不同的处理方式//比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid//可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED://If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}//Execute without transaction and return.return business.execute();case REQUIRES_NEW://If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}//Continue and execute with new transactionbreak;case SUPPORTS://If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}//Continue and execute with new transactionbreak;case REQUIRED://If current transaction is existing, execute with current transaction, else continue and execute with new transaction.break;case NEVER://If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {//Execute without transaction and return.return business.execute();}case MANDATORY://If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}//Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}//1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.if (tx == null) {//如果xid为null,则会创建一个新的全局事务tx = GlobalTransactionContext.createNew();}//set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {//2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//else do nothing. Of course, the hooks will still be triggered.//开启一个全局事务beginTransaction(txInfo, tx);Object rs;try {//Do Your Business//执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并执行一个一个分支事务rs = business.execute();} catch (Throwable ex) {//3. The needed business exception to rollback.//发生异常时需要完成的事务completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//4. everything is fine, commit.//如果一切执行正常就会在这里提交全局事务commitTransaction(tx);return rs;} finally {//5. clear//执行一些全局事务完成后的回调,比如清理等工作resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {//If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {//如果之前挂起了一个全局事务,此时可以恢复这个全局事务tx.resume(suspendedResourcesHolder);}}}...
}