当前位置: 首页 > wzjs >正文

怎么做二十八页美食网站网站制作出名的公司

怎么做二十八页美食网站,网站制作出名的公司,担保交易网站开发,深圳app开发公司哪家靠谱大纲 1.全局事务注解扫描器继承的父类与实现的接口 2.全局事务注解扫描器的核心变量 3.Spring容器初始化后初始化Seata客户端的源码 4.TM全局事务管理器客户端初始化的源码 5.TM组件的Netty网络通信客户端初始化源码 6.Seata框架的SPI动态扩展机制源码 7.向Seata客户端注…

大纲

1.全局事务注解扫描器继承的父类与实现的接口

2.全局事务注解扫描器的核心变量

3.Spring容器初始化后初始化Seata客户端的源码

4.TM全局事务管理器客户端初始化的源码

5.TM组件的Netty网络通信客户端初始化源码

6.Seata框架的SPI动态扩展机制源码

7.向Seata客户端注册网络请求处理器的源码

8.Seata客户端的定时调度任务源码

9.Seata客户端初始化Netty Bootstrap的源码

10.Seata客户端的寻址机制与连接服务端的源码

11.RM分支事务资源管理器客户端初始化的源码

12.全局事务注解扫描器扫描Bean是否有Seata注解

13.Seata全局事务拦截器的创建和初始化

14.基于Spring AOP创建全局事务动态代理的源码

15.全局事务注解扫描器的初始化总结

7.向Seata客户端注册网络请求处理器的源码

(1)向Seata客户端注册网络请求处理器

(2)初始化Seata客户端的Netty网络服务器

(1)向Seata客户端注册网络请求处理器

这些网络请求处理器主要就是:对事务协调者进行响应的处理器和心跳消息处理器。

public class TMClient {public static void init(String applicationId, String transactionServiceGroup) {init(applicationId, transactionServiceGroup, null, null);}public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
}public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private final AtomicBoolean initialized = new AtomicBoolean(false);@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {getClientChannelManager().reconnect(transactionServiceGroup);}}}private void registerProcessor() {//1.registry TC response processor,对事务协调者进行响应的处理器ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);//2.registry heartbeat message processor,心跳消息处理器ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}...
}public class ClientOnResponseProcessor implements RemotingProcessor {//The Merge msg map from io.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMapprivate Map<Integer, MergeMessage> mergeMsgMap;//The Futures from io.seata.core.rpc.netty.AbstractNettyRemoting#futuresprivate final ConcurrentMap<Integer, MessageFuture> futures;//To handle the received RPC message on upper levelprivate final TransactionMessageHandler transactionMessageHandler;public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap, ConcurrentHashMap<Integer, MessageFuture> futures, TransactionMessageHandler transactionMessageHandler) {this.mergeMsgMap = mergeMsgMap;this.futures = futures;this.transactionMessageHandler = transactionMessageHandler;}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}...
}public abstract class AbstractNettyRemoting implements Disposable {...//This container holds all processors.protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);...
}

(2)初始化Seata客户端的Netty网络服务器

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;...@Overridepublic void init() {//启动一个定时任务,每隔10s对tx分组发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}

8.Seata客户端的定时调度任务源码

Seata客户端在初始化时会启动两个定时任务:

一.每隔10s对Seata服务端发起一个重连接

二.每隔3秒检查发送的请求是否响应超时

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;...@Overridepublic void init() {//启动一个定时任务,每隔10s对Seata服务端发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}public abstract class AbstractNettyRemoting implements Disposable {//The Timer executor. 由单个线程进行调度的线程池protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));//Obtain the return result through MessageFuture blocking.protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();protected volatile long nowMills = 0;private static final int TIMEOUT_CHECK_INTERVAL = 3000;...public void init() {//启动一个定时任务,每隔3秒检查发送的请求是否响应超时timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}
}

9.Seata客户端初始化Netty Bootstrap的源码

基于Netty的API构建一个Bootstrap:

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private final AtomicBoolean initialized = new AtomicBoolean(false);@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//找到长连接管理器,对事务服务分组发起连接请求getClientChannelManager().reconnect(transactionServiceGroup);}}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;...@Overridepublic void init() {//启动一个定时任务,每隔10s对Seata服务端发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}public class NettyClientBootstrap implements RemotingBootstrap {private final NettyClientConfig nettyClientConfig;private final Bootstrap bootstrap = new Bootstrap();private final EventLoopGroup eventLoopGroupWorker;private EventExecutorGroup defaultEventExecutorGroup;private ChannelHandler[] channelHandlers;...public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, NettyPoolKey.TransactionRole transactionRole) {this.nettyClientConfig = nettyClientConfig;int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();this.transactionRole = transactionRole;this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));this.defaultEventExecutorGroup = eventExecutorGroup;}@Overridepublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));}//基于Netty的API构建一个Bootstrap//设置好对应的NioEventLoopGroup线程池组,默认1个线程就够了this.bootstrap.group(this.eventLoopGroupWorker).channel(nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());if (nettyClientConfig.enableNative()) {if (PlatformDependent.isOsx()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("client run on macOS");}} else {bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);}}//对Netty网络通信数据处理组件pipeline进行初始化bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler,空闲状态检查Handler//如果有数据通过就记录一下时间//如果超过很长时间没有数据通过,即处于空闲状态,那么就会触发一个user triggered event出去给ClientHandler来进行处理pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds()))//基于Seata通信协议的编码器.addLast(new ProtocolV1Decoder())//基于Seata通信协议的解码器.addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");}}...
}

10.Seata客户端的寻址机制与连接服务端的源码

(1)获取服务端地址的寻址机制

(2)Seata客户端发起与服务端的连接

(1)获取服务端地址的寻址机制

Seata客户端获取Seata服务端地址的方法是Netty长连接管理器NettyClientChannelManager的getAvailServerList()方法。

在getAvailServerList()方法中,首先会通过SPI机制获取注册中心服务实例,也就是注册中心工厂RegistryFactory会根据SPI机制构建出Seata的注册中心服务RegistryService的实例,然后再通过注册中心服务实例RegistryService的lookup()方法获取地址。

比如SPI获取到的注册中心服务实例是FileRegistryServiceImpl。那么其lookup()方法就会根据事务服务分组名称到file.conf里去找,找到映射的名字如default,然后根据default找到Seata服务端的地址列表。

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//找到长连接管理器,对事务服务分组发起连接请求getClientChannelManager().reconnect(transactionServiceGroup);}}}
}//Netty client pool manager. Netty的网络连接管理器
class NettyClientChannelManager {...//Reconnect to remote server of current transaction service group.void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务服务分组获取到Seata Server的地址列表//比如根据事务服务分组名称到file.conf里去找,找到映射的名字如default//然后根据default找到Seata Server的地址列表availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}...}...private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);if (CollectionUtils.isEmpty(availInetSocketAddressList)) {return Collections.emptyList();}return availInetSocketAddressList.stream().map(NetUtil::toStringAddress).collect(Collectors.toList());}
}public class RegistryFactory {public static RegistryService getInstance() {return RegistryFactoryHolder.INSTANCE;}private static class RegistryFactoryHolder {private static final RegistryService INSTANCE = buildRegistryService();}private static RegistryService buildRegistryService() {//接下来构建Seata注册中心服务RegistryServiceRegistryType registryType;String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR + ConfigurationKeys.FILE_ROOT_TYPE);try {registryType = RegistryType.getType(registryTypeName);} catch (Exception exx) {throw new NotSupportYetException("not support registry type: " + registryTypeName);}//通过SPI机制进行加载,比如加载到FileRegistryServiceImpl实现类return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();}
}public class FileRegistryServiceImpl implements RegistryService<ConfigChangeListener> {...@Overridepublic List<InetSocketAddress> lookup(String key) throws Exception {String clusterName = getServiceGroup(key);if (clusterName == null) {return null;}String endpointStr = CONFIG.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);if (StringUtils.isNullOrEmpty(endpointStr)) {throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");}String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();for (String endpoint : endpoints) {String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);if (ipAndPort.length != 2) {throw new IllegalArgumentException("endpoint format should like ip:port");}inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));}return inetSocketAddresses;}...
}

(2)Seata客户端发起与服务端的连接

Netty长连接管理器NettyClientChannelManager的acquireChannel()方法会尝试获取连接。如果没有存活的连接,则会在获取到锁之后通过NettyClientChannelManager的doConnect()方法来发起连接。注意:使用到了Apache的Common Pool公共对象池来管理发起的连接。

//Netty client pool manager. Netty的网络连接管理器
class NettyClientChannelManager {...//Reconnect to remote server of current transaction service group.void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务服务分组获取到Seata Server的地址列表//比如根据事务服务分组名称到file.conf里去找,找到映射的名字如default//然后根据default找到Seata Server的地址列表availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}//availList一般都不会为空if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct", ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX, transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}Set<String> channelAddress = new HashSet<>(availList.size());try {//尝试和每个Seata Server去建立一个长连接for (String serverAddress : availList) {try {acquireChannel(serverAddress);channelAddress.add(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}} finally {if (CollectionUtils.isNotEmpty(channelAddress)) {List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());for (String address : channelAddress) {String[] array = address.split(":");aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));}RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);} else {RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());}}}//Acquire netty client channel connected to remote server.Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);if (channelToServer != null) {channelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to {}", serverAddress);}Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());//获取锁之后发起连接synchronized (lockObj) {return doConnect(serverAddress);}}...private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();private Function<String, NettyPoolKey> poolKeyFunction;private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;...private Channel doConnect(String serverAddress) {Channel channelToServer = channels.get(serverAddress);if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);if (currentPoolKey.getMessage() instanceof RegisterTMRequest) {poolKeyMap.put(serverAddress, currentPoolKey);} else {NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}}//发起连接,最终会调用到NettyPoolableFactory的makeObject()方法channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;}...
}public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {private final AbstractNettyRemotingClient rpcRemotingClient;private final NettyClientBootstrap clientBootstrap;public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {this.rpcRemotingClient = rpcRemotingClient;this.clientBootstrap = clientBootstrap;}@Overridepublic Channel makeObject(NettyPoolKey key) {InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());if (LOGGER.isInfoEnabled()) {LOGGER.info("NettyPool create channel to " + key);}Channel tmpChannel = clientBootstrap.getNewChannel(address);long start = System.currentTimeMillis();Object response;Channel channelToServer = null;if (key.getMessage() == null) {throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());}try {response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());if (!isRegisterSuccess(response, key.getTransactionRole())) {rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());} else {channelToServer = tmpChannel;rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());}} catch (Exception exx) {if (tmpChannel != null) {tmpChannel.close();}throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:" + channelToServer);}return channelToServer;}...
}

11.RM分支事务资源管理器客户端初始化的源码

RmNettyRemotingClient初始化时,会注入一个DefaultResourceManager实例以便可以获取根据SPI机制加载的资源管理器,以及注入一个DefaultRMHandler实例以便可以获取根据SPI机制加载的事务消息处理器。

public class RMClient {public static void init(String applicationId, String transactionServiceGroup) {RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());rmNettyRemotingClient.init();}
}public class DefaultResourceManager implements ResourceManager {protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);  }}}...
}public class DefaultRMHandler extends AbstractRMHandler {protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap = new ConcurrentHashMap<>();private static class SingletonHolder {private static AbstractRMHandler INSTANCE = new DefaultRMHandler();}public static AbstractRMHandler get() {return DefaultRMHandler.SingletonHolder.INSTANCE;}protected DefaultRMHandler() {initRMHandlers();}protected void initRMHandlers() {//通过SPI加载所有的RMHandler事务消息处理器//比如:RMHandlerAT、RMHandlerTCC、RMHandlerSaga、RMHandlerXAList<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);if (CollectionUtils.isNotEmpty(allRMHandlers)) {for (AbstractRMHandler rmHandler : allRMHandlers) {allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);}}}...
}public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {private final AtomicBoolean initialized = new AtomicBoolean(false);private ResourceManager resourceManager;...@Overridepublic void init() {//registry processor,注册一些请求处理器registerProcessor();if (initialized.compareAndSet(false, true)) {//和TmNettyRemotingClient.init()的一样super.init();if (resourceManager != null && !resourceManager.getManagedResources().isEmpty() && StringUtils.isNotBlank(transactionServiceGroup)) {//和TmNettyRemotingClient.init()的一样getClientChannelManager().reconnect(transactionServiceGroup);}}}private void registerProcessor() {//1.registry rm client handle branch commit processorRmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);//2.registry rm client handle branch rollback processorRmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);//3.registry rm handler undo log processorRmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);//4.registry TC response processorClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);//5.registry heartbeat message processorClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}...
}

12.全局事务注解扫描器扫描Bean是否有Seata注解

由于GlobalTransactionScanner继承自Spring的AbstractAutoProxyCreator,所以Spring会把Spring Bean传递给GlobalTransactionScanner进行判断,也就是让GlobalTransactionScanner重写的wrapIfNecessary()方法进行判断。

重写的wrapIfNecessary()方法会判断传递过来的Bean的Class或方法上是否添加了Seata的注解,从而决定是否需要针对Bean的Class创建动态代理,从而实现对添加了Seata的注解的方法进行拦截。

对传入的Bean创建动态代理时,是通过调用其继承的父类Spring的AbstractAutoProxyCreator的wrapIfNecessary()方法进行创建的。

这些Seata的注解包括:@GlobalTransactional、@GlobalLock、@TwoPhaseBusinessAction、@LocalTCC。

//AbstractAutoProxyCreator:Spring的动态代理自动创建者
//ConfigurationChangeListener:关注配置变更事件的监听器
//InitializingBean:Spring Bean初始化回调
//ApplicationContextAware:用来获取Spring容器
//DisposableBean:支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...//Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;//对添加了@GlobalTransactional注解的方法进行拦截的AOP拦截器private MethodInterceptor globalTransactionalInterceptor;...//The following will be scanned, and added corresponding interceptor://添加了如下注解的方法会被扫描到,然后方法会添加相应的拦截器进行拦截//TM://@see io.seata.spring.annotation.GlobalTransactional // TM annotation//Corresponding interceptor://@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler//GlobalLock://@see io.seata.spring.annotation.GlobalLock // GlobalLock annotation//Corresponding interceptor://@see io.seata.spring.annotation.GlobalTransactionalInterceptor# handleGlobalLock(MethodInvocation, GlobalLock)  // GlobalLock handler//TCC mode://@see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface//@see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method//@see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser//Corresponding interceptor://@see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode@Override//由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,//所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;//让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,//从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {//do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxy//判断传递进来的Bean是否是TCC动态代理//服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//创建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {//Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}...private boolean existsAnnotation(Class<?>[] classes) {if (CollectionUtils.isNotEmpty(classes)) {for (Class<?> clazz : classes) {if (clazz == null) {continue;}//目标class是否被打了@GlobalTransactional注解GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}//检查目标Spring Bean的各个方法,通过反射拿到添加了注解的一个方法Method[] methods = clazz.getMethods();for (Method method : methods) {//如果方法上被加了如@GlobalTransactional注解,则返回truetrxAnno = method.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);if (lockAnno != null) {return true;}}}}return false;}...
}

13.Seata全局事务拦截器的创建和初始化

如果传入GlobalTransactionScanner全局事务注解扫描器的wrapIfNecessary()方法的Bean,添加了比如@GlobalTransactional的全局事务注解,那么wrapIfNecessary()方法就会创建一个全局事务注解拦截器GlobalTransactionalInterceptor。

这个全局事务注解拦截器会被存放在GlobalTransactionScanner实例里的两个变量中:interceptor和globalTransactionalInterceptor。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {...public GlobalTransactionalInterceptor(FailureHandler failureHandler) {this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);if (degradeCheck) {ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);EVENT_BUS.register(this);if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {startDegradeCheck();}}this.initDefaultGlobalTransactionTimeout();}...
}

14.基于Spring AOP创建全局事务动态代理的源码

全局事务注解扫描器GlobalTransactionScanner的wrapIfNecessary()方法,发现传入的Bean含有Seata的注解,需要为该Bean创建动态代理时,会调用父类Spring的AbstractAutoProxyCreator的wrapIfNecessary()方法来创建。

AbstractAutoProxyCreator的wrapIfNecessary()方法,会通过子类GlobalTransactionScanner的getAdvicesAndAdvisorsForBean()方法,获取在GlobalTransactionScanner的wrapIfNecessary()方法中构建的拦截器(也就是全局事务注解的拦截器GlobalTransactionalInterceptor),然后创建传入的Bean的动态代理。

这样后续调用到传入Bean的方法时,就会先调用GlobalTransactionInterceptor拦截器。

//关注配置变更事件的监听器、Spring Bean初始化回调、感知到Spring容器、支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...    //Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;@Override//由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,//所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;//让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,//从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxy//判断传递进来的Bean是否是TCC动态代理//服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//构建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {// Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}//获取指定的拦截器@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException {return new Object[]{interceptor};}...
}public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupportimplements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {...protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {return bean;}if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {return bean;}if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}// Create proxy if we have advice. 获取指定的拦截器Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);if (specificInterceptors != DO_NOT_PROXY) {this.advisedBeans.put(cacheKey, Boolean.TRUE);//创建动态代理Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));this.proxyTypes.put(cacheKey, proxy.getClass());return proxy;}this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}//获取指定的拦截器protected abstract Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, @Nullable TargetSource customTargetSource) throws BeansException;...
}

15.全局事务注解扫描器的初始化总结

全局事务注解扫描器GlobalTransactionScanner的初始化主要做了如下三项工作:

一.初始化TM全局事务管理器客户端

二.初始化RM分支事务资源管理器客户端

三.对添加了Seata相关注解的Bean创建全局事务动态代理

http://www.dtcms.com/wzjs/250131.html

相关文章:

  • 网站的备案要求吗深圳搜索引擎优化推广便宜
  • 自己做网站最新视频教程网站建设模板
  • 网站优化网络公司男生和女生在一起探讨人生软件
  • 建设通是政府认可网站吗如何投放网络广告
  • 做网站为什么每年都要续费营销策略包括哪些方面
  • 江苏做网站价格整站seo怎么做
  • 丹东建设安全监督网站外贸谷歌推广怎么样
  • 怎样设置默认网站优化怎么做
  • 油价格今日价厦门百度推广排名优化
  • wordpress插件残留青岛百度推广优化
  • 做网站的要素天津优化加盟
  • 京市保障性住房建设投资中心网站百度下载并安装
  • 做3d ppt模板下载网站电脑培训班一般多少钱
  • 新建网站外链怎么做百度ai搜索引擎
  • 一个网站怎么做pc和移动端福州网站制作推广
  • 做项目搭建网站 构建数据库seo收费
  • 随州市网站建设公司宣传推广方式
  • 360建站营销排名seo
  • 怎么做淘宝代购网站国外网站谷歌seo推广
  • 室内装修设计软件免费版下载安卓优化大师手机版
  • 网站建设开发公司有哪些个人网站制作源代码
  • 织梦手机网站怎么安装教程视频教程什么软件可以推广自己的产品
  • 网站建设费用主要包括那几项网站如何宣传推广
  • 深圳网站设计 三把火科技网络营销方案例文
  • 电商网站建设推荐广东网络推广运营
  • 女孩和狗做网站下载百度网盘
  • ashu wordpress昆明网络推广优化
  • 产品通过网站做营销中国搜索
  • 论坛网站开发 go关键词排名怎么做上首页
  • 怎样做婚恋网站如何去除痘痘效果好