当前位置: 首页 > wzjs >正文

怎么做二十八页美食网站网站seo推广公司靠谱吗

怎么做二十八页美食网站,网站seo推广公司靠谱吗,电影网站建设教程下载,河南企业网站建设公司大纲 1.全局事务注解扫描器继承的父类与实现的接口 2.全局事务注解扫描器的核心变量 3.Spring容器初始化后初始化Seata客户端的源码 4.TM全局事务管理器客户端初始化的源码 5.TM组件的Netty网络通信客户端初始化源码 6.Seata框架的SPI动态扩展机制源码 7.向Seata客户端注…

大纲

1.全局事务注解扫描器继承的父类与实现的接口

2.全局事务注解扫描器的核心变量

3.Spring容器初始化后初始化Seata客户端的源码

4.TM全局事务管理器客户端初始化的源码

5.TM组件的Netty网络通信客户端初始化源码

6.Seata框架的SPI动态扩展机制源码

7.向Seata客户端注册网络请求处理器的源码

8.Seata客户端的定时调度任务源码

9.Seata客户端初始化Netty Bootstrap的源码

10.Seata客户端的寻址机制与连接服务端的源码

11.RM分支事务资源管理器客户端初始化的源码

12.全局事务注解扫描器扫描Bean是否有Seata注解

13.Seata全局事务拦截器的创建和初始化

14.基于Spring AOP创建全局事务动态代理的源码

15.全局事务注解扫描器的初始化总结

7.向Seata客户端注册网络请求处理器的源码

(1)向Seata客户端注册网络请求处理器

(2)初始化Seata客户端的Netty网络服务器

(1)向Seata客户端注册网络请求处理器

这些网络请求处理器主要就是:对事务协调者进行响应的处理器和心跳消息处理器。

public class TMClient {public static void init(String applicationId, String transactionServiceGroup) {init(applicationId, transactionServiceGroup, null, null);}public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
}public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private final AtomicBoolean initialized = new AtomicBoolean(false);@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {getClientChannelManager().reconnect(transactionServiceGroup);}}}private void registerProcessor() {//1.registry TC response processor,对事务协调者进行响应的处理器ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);//2.registry heartbeat message processor,心跳消息处理器ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}...
}public class ClientOnResponseProcessor implements RemotingProcessor {//The Merge msg map from io.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMapprivate Map<Integer, MergeMessage> mergeMsgMap;//The Futures from io.seata.core.rpc.netty.AbstractNettyRemoting#futuresprivate final ConcurrentMap<Integer, MessageFuture> futures;//To handle the received RPC message on upper levelprivate final TransactionMessageHandler transactionMessageHandler;public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap, ConcurrentHashMap<Integer, MessageFuture> futures, TransactionMessageHandler transactionMessageHandler) {this.mergeMsgMap = mergeMsgMap;this.futures = futures;this.transactionMessageHandler = transactionMessageHandler;}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}...
}public abstract class AbstractNettyRemoting implements Disposable {...//This container holds all processors.protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);...
}

(2)初始化Seata客户端的Netty网络服务器

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;...@Overridepublic void init() {//启动一个定时任务,每隔10s对tx分组发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}

8.Seata客户端的定时调度任务源码

Seata客户端在初始化时会启动两个定时任务:

一.每隔10s对Seata服务端发起一个重连接

二.每隔3秒检查发送的请求是否响应超时

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;...@Overridepublic void init() {//启动一个定时任务,每隔10s对Seata服务端发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}public abstract class AbstractNettyRemoting implements Disposable {//The Timer executor. 由单个线程进行调度的线程池protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));//Obtain the return result through MessageFuture blocking.protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();protected volatile long nowMills = 0;private static final int TIMEOUT_CHECK_INTERVAL = 3000;...public void init() {//启动一个定时任务,每隔3秒检查发送的请求是否响应超时timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}
}

9.Seata客户端初始化Netty Bootstrap的源码

基于Netty的API构建一个Bootstrap:

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private final AtomicBoolean initialized = new AtomicBoolean(false);@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//找到长连接管理器,对事务服务分组发起连接请求getClientChannelManager().reconnect(transactionServiceGroup);}}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;...@Overridepublic void init() {//启动一个定时任务,每隔10s对Seata服务端发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}public class NettyClientBootstrap implements RemotingBootstrap {private final NettyClientConfig nettyClientConfig;private final Bootstrap bootstrap = new Bootstrap();private final EventLoopGroup eventLoopGroupWorker;private EventExecutorGroup defaultEventExecutorGroup;private ChannelHandler[] channelHandlers;...public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, NettyPoolKey.TransactionRole transactionRole) {this.nettyClientConfig = nettyClientConfig;int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();this.transactionRole = transactionRole;this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));this.defaultEventExecutorGroup = eventExecutorGroup;}@Overridepublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));}//基于Netty的API构建一个Bootstrap//设置好对应的NioEventLoopGroup线程池组,默认1个线程就够了this.bootstrap.group(this.eventLoopGroupWorker).channel(nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());if (nettyClientConfig.enableNative()) {if (PlatformDependent.isOsx()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("client run on macOS");}} else {bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);}}//对Netty网络通信数据处理组件pipeline进行初始化bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler,空闲状态检查Handler//如果有数据通过就记录一下时间//如果超过很长时间没有数据通过,即处于空闲状态,那么就会触发一个user triggered event出去给ClientHandler来进行处理pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds()))//基于Seata通信协议的编码器.addLast(new ProtocolV1Decoder())//基于Seata通信协议的解码器.addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");}}...
}

10.Seata客户端的寻址机制与连接服务端的源码

(1)获取服务端地址的寻址机制

(2)Seata客户端发起与服务端的连接

(1)获取服务端地址的寻址机制

Seata客户端获取Seata服务端地址的方法是Netty长连接管理器NettyClientChannelManager的getAvailServerList()方法。

在getAvailServerList()方法中,首先会通过SPI机制获取注册中心服务实例,也就是注册中心工厂RegistryFactory会根据SPI机制构建出Seata的注册中心服务RegistryService的实例,然后再通过注册中心服务实例RegistryService的lookup()方法获取地址。

比如SPI获取到的注册中心服务实例是FileRegistryServiceImpl。那么其lookup()方法就会根据事务服务分组名称到file.conf里去找,找到映射的名字如default,然后根据default找到Seata服务端的地址列表。

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//找到长连接管理器,对事务服务分组发起连接请求getClientChannelManager().reconnect(transactionServiceGroup);}}}
}//Netty client pool manager. Netty的网络连接管理器
class NettyClientChannelManager {...//Reconnect to remote server of current transaction service group.void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务服务分组获取到Seata Server的地址列表//比如根据事务服务分组名称到file.conf里去找,找到映射的名字如default//然后根据default找到Seata Server的地址列表availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}...}...private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);if (CollectionUtils.isEmpty(availInetSocketAddressList)) {return Collections.emptyList();}return availInetSocketAddressList.stream().map(NetUtil::toStringAddress).collect(Collectors.toList());}
}public class RegistryFactory {public static RegistryService getInstance() {return RegistryFactoryHolder.INSTANCE;}private static class RegistryFactoryHolder {private static final RegistryService INSTANCE = buildRegistryService();}private static RegistryService buildRegistryService() {//接下来构建Seata注册中心服务RegistryServiceRegistryType registryType;String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR + ConfigurationKeys.FILE_ROOT_TYPE);try {registryType = RegistryType.getType(registryTypeName);} catch (Exception exx) {throw new NotSupportYetException("not support registry type: " + registryTypeName);}//通过SPI机制进行加载,比如加载到FileRegistryServiceImpl实现类return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();}
}public class FileRegistryServiceImpl implements RegistryService<ConfigChangeListener> {...@Overridepublic List<InetSocketAddress> lookup(String key) throws Exception {String clusterName = getServiceGroup(key);if (clusterName == null) {return null;}String endpointStr = CONFIG.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);if (StringUtils.isNullOrEmpty(endpointStr)) {throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");}String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();for (String endpoint : endpoints) {String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);if (ipAndPort.length != 2) {throw new IllegalArgumentException("endpoint format should like ip:port");}inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));}return inetSocketAddresses;}...
}

(2)Seata客户端发起与服务端的连接

Netty长连接管理器NettyClientChannelManager的acquireChannel()方法会尝试获取连接。如果没有存活的连接,则会在获取到锁之后通过NettyClientChannelManager的doConnect()方法来发起连接。注意:使用到了Apache的Common Pool公共对象池来管理发起的连接。

//Netty client pool manager. Netty的网络连接管理器
class NettyClientChannelManager {...//Reconnect to remote server of current transaction service group.void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务服务分组获取到Seata Server的地址列表//比如根据事务服务分组名称到file.conf里去找,找到映射的名字如default//然后根据default找到Seata Server的地址列表availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}//availList一般都不会为空if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct", ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX, transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}Set<String> channelAddress = new HashSet<>(availList.size());try {//尝试和每个Seata Server去建立一个长连接for (String serverAddress : availList) {try {acquireChannel(serverAddress);channelAddress.add(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}} finally {if (CollectionUtils.isNotEmpty(channelAddress)) {List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());for (String address : channelAddress) {String[] array = address.split(":");aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));}RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);} else {RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());}}}//Acquire netty client channel connected to remote server.Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);if (channelToServer != null) {channelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to {}", serverAddress);}Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());//获取锁之后发起连接synchronized (lockObj) {return doConnect(serverAddress);}}...private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();private Function<String, NettyPoolKey> poolKeyFunction;private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;...private Channel doConnect(String serverAddress) {Channel channelToServer = channels.get(serverAddress);if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);if (currentPoolKey.getMessage() instanceof RegisterTMRequest) {poolKeyMap.put(serverAddress, currentPoolKey);} else {NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}}//发起连接,最终会调用到NettyPoolableFactory的makeObject()方法channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;}...
}public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {private final AbstractNettyRemotingClient rpcRemotingClient;private final NettyClientBootstrap clientBootstrap;public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {this.rpcRemotingClient = rpcRemotingClient;this.clientBootstrap = clientBootstrap;}@Overridepublic Channel makeObject(NettyPoolKey key) {InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());if (LOGGER.isInfoEnabled()) {LOGGER.info("NettyPool create channel to " + key);}Channel tmpChannel = clientBootstrap.getNewChannel(address);long start = System.currentTimeMillis();Object response;Channel channelToServer = null;if (key.getMessage() == null) {throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());}try {response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());if (!isRegisterSuccess(response, key.getTransactionRole())) {rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());} else {channelToServer = tmpChannel;rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());}} catch (Exception exx) {if (tmpChannel != null) {tmpChannel.close();}throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:" + channelToServer);}return channelToServer;}...
}

11.RM分支事务资源管理器客户端初始化的源码

RmNettyRemotingClient初始化时,会注入一个DefaultResourceManager实例以便可以获取根据SPI机制加载的资源管理器,以及注入一个DefaultRMHandler实例以便可以获取根据SPI机制加载的事务消息处理器。

public class RMClient {public static void init(String applicationId, String transactionServiceGroup) {RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());rmNettyRemotingClient.init();}
}public class DefaultResourceManager implements ResourceManager {protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);  }}}...
}public class DefaultRMHandler extends AbstractRMHandler {protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap = new ConcurrentHashMap<>();private static class SingletonHolder {private static AbstractRMHandler INSTANCE = new DefaultRMHandler();}public static AbstractRMHandler get() {return DefaultRMHandler.SingletonHolder.INSTANCE;}protected DefaultRMHandler() {initRMHandlers();}protected void initRMHandlers() {//通过SPI加载所有的RMHandler事务消息处理器//比如:RMHandlerAT、RMHandlerTCC、RMHandlerSaga、RMHandlerXAList<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);if (CollectionUtils.isNotEmpty(allRMHandlers)) {for (AbstractRMHandler rmHandler : allRMHandlers) {allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);}}}...
}public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {private final AtomicBoolean initialized = new AtomicBoolean(false);private ResourceManager resourceManager;...@Overridepublic void init() {//registry processor,注册一些请求处理器registerProcessor();if (initialized.compareAndSet(false, true)) {//和TmNettyRemotingClient.init()的一样super.init();if (resourceManager != null && !resourceManager.getManagedResources().isEmpty() && StringUtils.isNotBlank(transactionServiceGroup)) {//和TmNettyRemotingClient.init()的一样getClientChannelManager().reconnect(transactionServiceGroup);}}}private void registerProcessor() {//1.registry rm client handle branch commit processorRmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);//2.registry rm client handle branch rollback processorRmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);//3.registry rm handler undo log processorRmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);//4.registry TC response processorClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);//5.registry heartbeat message processorClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}...
}

12.全局事务注解扫描器扫描Bean是否有Seata注解

由于GlobalTransactionScanner继承自Spring的AbstractAutoProxyCreator,所以Spring会把Spring Bean传递给GlobalTransactionScanner进行判断,也就是让GlobalTransactionScanner重写的wrapIfNecessary()方法进行判断。

重写的wrapIfNecessary()方法会判断传递过来的Bean的Class或方法上是否添加了Seata的注解,从而决定是否需要针对Bean的Class创建动态代理,从而实现对添加了Seata的注解的方法进行拦截。

对传入的Bean创建动态代理时,是通过调用其继承的父类Spring的AbstractAutoProxyCreator的wrapIfNecessary()方法进行创建的。

这些Seata的注解包括:@GlobalTransactional、@GlobalLock、@TwoPhaseBusinessAction、@LocalTCC。

//AbstractAutoProxyCreator:Spring的动态代理自动创建者
//ConfigurationChangeListener:关注配置变更事件的监听器
//InitializingBean:Spring Bean初始化回调
//ApplicationContextAware:用来获取Spring容器
//DisposableBean:支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...//Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;//对添加了@GlobalTransactional注解的方法进行拦截的AOP拦截器private MethodInterceptor globalTransactionalInterceptor;...//The following will be scanned, and added corresponding interceptor://添加了如下注解的方法会被扫描到,然后方法会添加相应的拦截器进行拦截//TM://@see io.seata.spring.annotation.GlobalTransactional // TM annotation//Corresponding interceptor://@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler//GlobalLock://@see io.seata.spring.annotation.GlobalLock // GlobalLock annotation//Corresponding interceptor://@see io.seata.spring.annotation.GlobalTransactionalInterceptor# handleGlobalLock(MethodInvocation, GlobalLock)  // GlobalLock handler//TCC mode://@see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface//@see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method//@see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser//Corresponding interceptor://@see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode@Override//由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,//所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;//让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,//从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {//do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxy//判断传递进来的Bean是否是TCC动态代理//服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//创建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {//Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}...private boolean existsAnnotation(Class<?>[] classes) {if (CollectionUtils.isNotEmpty(classes)) {for (Class<?> clazz : classes) {if (clazz == null) {continue;}//目标class是否被打了@GlobalTransactional注解GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}//检查目标Spring Bean的各个方法,通过反射拿到添加了注解的一个方法Method[] methods = clazz.getMethods();for (Method method : methods) {//如果方法上被加了如@GlobalTransactional注解,则返回truetrxAnno = method.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);if (lockAnno != null) {return true;}}}}return false;}...
}

13.Seata全局事务拦截器的创建和初始化

如果传入GlobalTransactionScanner全局事务注解扫描器的wrapIfNecessary()方法的Bean,添加了比如@GlobalTransactional的全局事务注解,那么wrapIfNecessary()方法就会创建一个全局事务注解拦截器GlobalTransactionalInterceptor。

这个全局事务注解拦截器会被存放在GlobalTransactionScanner实例里的两个变量中:interceptor和globalTransactionalInterceptor。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {...public GlobalTransactionalInterceptor(FailureHandler failureHandler) {this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);if (degradeCheck) {ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);EVENT_BUS.register(this);if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {startDegradeCheck();}}this.initDefaultGlobalTransactionTimeout();}...
}

14.基于Spring AOP创建全局事务动态代理的源码

全局事务注解扫描器GlobalTransactionScanner的wrapIfNecessary()方法,发现传入的Bean含有Seata的注解,需要为该Bean创建动态代理时,会调用父类Spring的AbstractAutoProxyCreator的wrapIfNecessary()方法来创建。

AbstractAutoProxyCreator的wrapIfNecessary()方法,会通过子类GlobalTransactionScanner的getAdvicesAndAdvisorsForBean()方法,获取在GlobalTransactionScanner的wrapIfNecessary()方法中构建的拦截器(也就是全局事务注解的拦截器GlobalTransactionalInterceptor),然后创建传入的Bean的动态代理。

这样后续调用到传入Bean的方法时,就会先调用GlobalTransactionInterceptor拦截器。

//关注配置变更事件的监听器、Spring Bean初始化回调、感知到Spring容器、支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...    //Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;@Override//由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,//所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;//让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,//从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxy//判断传递进来的Bean是否是TCC动态代理//服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//构建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {// Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}//获取指定的拦截器@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException {return new Object[]{interceptor};}...
}public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupportimplements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {...protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {return bean;}if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {return bean;}if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}// Create proxy if we have advice. 获取指定的拦截器Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);if (specificInterceptors != DO_NOT_PROXY) {this.advisedBeans.put(cacheKey, Boolean.TRUE);//创建动态代理Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));this.proxyTypes.put(cacheKey, proxy.getClass());return proxy;}this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}//获取指定的拦截器protected abstract Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, @Nullable TargetSource customTargetSource) throws BeansException;...
}

15.全局事务注解扫描器的初始化总结

全局事务注解扫描器GlobalTransactionScanner的初始化主要做了如下三项工作:

一.初始化TM全局事务管理器客户端

二.初始化RM分支事务资源管理器客户端

三.对添加了Seata相关注解的Bean创建全局事务动态代理

http://www.dtcms.com/wzjs/511555.html

相关文章:

  • txt怎么做pdf电子书下载网站b2b b2c c2c o2o区别
  • 建设网站需要学习什么淘宝指数
  • php购物网站开发设计与实现青岛seo推广公司
  • 网络公司网站样本杭州排名优化公司电话
  • 服装定制网站的设计与实现seo 视频
  • 网帆网站建设网站排名顾问
  • 做视频up主视频网站百度排名怎么做
  • 织梦网站访问量统计代码郑州百度快照优化排名
  • html网站建设基本流程图在哪里打广告效果最好
  • 标志设计作业上海优化价格
  • 网页设计界面图seo推广是什么意怿
  • 做商业网站赚钱吗优化公司哪家好
  • 江苏省住房和城乡建设厅 官方网站企业新闻营销
  • 襄阳做网站公司西安seo学院
  • 微信如何做网站成人技术培训学校
  • 企业营销网站建设费用预算360seo排名优化服务
  • 丰县建设网站产品推广方式有哪些
  • 成都旅游攻略五日游怎么做好网站搜索引擎优化
  • 做网站和程序员哪个好点百度账号登陆
  • 做外贸翻译用哪个网站seo店铺描述
  • 台州网站建设哪家公司好seo网页优化培训
  • 武汉做网站公司hlbzx小米的推广软文
  • wordpress页面设计外贸东莞seo优化推广
  • 做业务在那几个网站上找客户端郑州seo
  • 同步wordpress站点百度网盘人工申诉电话
  • 百姓网免费招聘信息广西seo
  • 织梦个人网站企业网站seo案例分析
  • 杭州建设局网站品牌推广方案思维导图
  • 查看网站是否做百度推广自助建站
  • 天津电商网站建设杭州专业seo公司