当前位置: 首页 > news >正文

Seata源码—3.全局事务注解扫描器的初始化一

大纲

1.全局事务注解扫描器继承的父类与实现的接口

2.全局事务注解扫描器的核心变量

3.Spring容器初始化后初始化Seata客户端的源码

4.TM全局事务管理器客户端初始化的源码

5.TM组件的Netty网络通信客户端初始化源码

6.Seata框架的SPI动态扩展机制源码

7.向Seata客户端注册网络请求处理器的源码

8.Seata客户端的定时调度任务源码

9.Seata客户端初始化Netty Bootstrap的源码

10.Seata客户端的寻址机制与连接服务端的源码

11.RM分支事务资源管理器客户端初始化的源码

12.全局事务注解扫描器扫描Bean是否有Seata注解

13.Seata全局事务拦截器的创建和初始化

14.基于Spring AOP创建全局事务动态代理的源码

15.全局事务注解扫描器的初始化总结

如下的代码都是位于seata-spring模块下。

1.全局事务注解扫描器继承的父类与实现的接口

在dubbo-business.xml配置文件中,会引入全局事务注解扫描器GlobalTransactionScanner。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"><dubbo:application name="dubbo-demo-app"><dubbo:parameter key="qos.enable" value="false"/><dubbo:parameter key="qos.accept.foreign.ip" value="false"/><dubbo:parameter key="qos.port" value="33333"/></dubbo:application><dubbo:registry address="zookeeper://localhost:2181" /><dubbo:reference id="orderService" check="false" interface="io.seata.samples.dubbo.service.OrderService"/><dubbo:reference id="stockService" check="false" interface="io.seata.samples.dubbo.service.StockService"/><bean id="business" class="io.seata.samples.dubbo.service.impl.BusinessServiceImpl"><property name="orderService" ref="orderService"/><property name="stockService" ref="stockService"/></bean><!-- 全局事务注解扫描器 --><bean class="io.seata.spring.annotation.GlobalTransactionScanner"><constructor-arg value="dubbo-demo-app"/><constructor-arg value="my_test_tx_group"/></bean>
</beans>

全局事务注解扫描器GlobalTransactionScanner的继承父类和实现接口:

继承父类:AbstractAutoProxyCreator——Spring的动态代理自动创建者;
实现接口:ConfigurationChangeListener——关注配置变更事件的监听器;
实现接口:InitializingBean——Spring Bean的初始化回调;
实现接口:ApplicationContextAware——让Spring Bean获取到Spring容器;
实现接口:DisposableBean——支持可抛弃Bean;
//AbstractAutoProxyCreator:Spring的动态代理自动创建者
//ConfigurationChangeListener:关注配置变更事件的监听器
//InitializingBean:Spring Bean初始化回调
//ApplicationContextAware:让Bean可以获取Spring容器
//DisposableBean:支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {......
}

2.全局事务注解扫描器的核心变量

(1)ConfigurationChangeListener接口

(2)InitializingBean接口

(3)ApplicationContextAware接口

(4)DisposableBean接口

(5)GlobalTransactionScanner核心变量

(1)ConfigurationChangeListener接口

实现了该接口的Bean,可以处理配置变更的事件。

//实现了该ConfigurationChangeListener接口的Bean:
//在发生配置变更事件时,可以进行相应的处理
public interface ConfigurationChangeListener {int CORE_LISTENER_THREAD = 1;int MAX_LISTENER_THREAD = 1;//默认的线程池ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(CORE_LISTENER_THREAD, MAX_LISTENER_THREAD,Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),new NamedThreadFactory("configListenerOperate", MAX_LISTENER_THREAD));//处理配置变更的事件void onChangeEvent(ConfigurationChangeEvent event);//配置变更事件的默认处理:获取默认的线程池来处理配置变更的事件default void onProcessEvent(ConfigurationChangeEvent event) {getExecutorService().submit(() -> {//处理配置变更事件前的回调beforeEvent();//进行具体的配置变更事件处理onChangeEvent(event);//处理配置变更事件后的回调afterEvent();});}//关闭线程池default void onShutDown() {getExecutorService().shutdownNow();}//获取线程池default ExecutorService getExecutorService() {return EXECUTOR_SERVICE;}//处理配置变更事件前的默认回调default void beforeEvent() {}//处理配置变更事件后的默认回调default void afterEvent() {}
}

(2)InitializingBean接口

实现了该接口的Bean,可以在初始化后进行回调。

//实现了该InitializingBean接口的Bean:
//它的所有properties属性被BeanFactory设置之后,
//可以通过afterPropertiesSet()这个回调方法,来处理一些特殊的初始化操作
public interface InitializingBean {void afterPropertiesSet() throws Exception;
}

(3)ApplicationContextAware接口

实现了该接口的Bean,可以获取Spring容器。

//实现了该ApplicationContextAware接口的Bean:
//可以通过setApplicationContext()方法将Spring容器注入到这个Bean里面
//注入 == set属性,代理 == wrap包装
public interface ApplicationContextAware extends Aware {void setApplicationContext(ApplicationContext applicationContext) throws BeansException;
}

(4)DisposableBean接口

实现了该接口的Bean,可以在Spring容器被销毁时进行相应的回调处理。

//实现了该DisposableBean接口的Bean:
//当Spring容器被销毁时,可以通过destroy()方法释放资源
public interface DisposableBean {void destroy() throws Exception;
}

(5)GlobalTransactionScanner核心变量

//AbstractAutoProxyCreator:Spring的动态代理自动创建者
//ConfigurationChangeListener:关注配置变更事件的监听器
//InitializingBean:Spring Bean初始化回调
//ApplicationContextAware:用来获取Spring容器
//DisposableBean:支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {private static final long serialVersionUID = 1L;private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionScanner.class);private static final int AT_MODE = 1;private static final int MT_MODE = 2;private static final int ORDER_NUM = 1024;private static final int DEFAULT_MODE = AT_MODE + MT_MODE;private static final String SPRING_TRANSACTION_INTERCEPTOR_CLASS_NAME = "org.springframework.transaction.interceptor.TransactionInterceptor";private static final Set<String> PROXYED_SET = new HashSet<>();private static final Set<String> EXCLUDE_BEAN_NAME_SET = new HashSet<>();private static final Set<ScannerChecker> SCANNER_CHECKER_SET = new LinkedHashSet<>();//Spring容器private static ConfigurableListableBeanFactory beanFactory;//Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;//对添加了@GlobalTransactional注解的方法进行拦截的AOP拦截器private MethodInterceptor globalTransactionalInterceptor;//应用程序ID,在XML里配置时注入进来的private final String applicationId;//分布式事务组private final String txServiceGroup;//分布式事务模式,默认就是AT事务private final int mode;//与阿里云整合使用的,accessKey和secretKey是进行身份认证和安全访问时需要用到private String accessKey;private String secretKey;//是否禁用全局事务,默认是falseprivate volatile boolean disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);//确保初始化方法仅仅调用一次的CAS变量//通过Atomic CAS操作可以确保多线程并发下,方法只被调用一次//只有一个线程可以成功对initialized原子变量进行CAS操作private final AtomicBoolean initialized = new AtomicBoolean(false);//全局事务失败时会有一个handler处理钩子//比如当开启全局事务失败、提交全局事务失败、回滚全局事务失败、回滚重试全局事务失败时,都会在FailureHandler有相应的回调入口private final FailureHandler failureHandlerHook;//Spring容器private ApplicationContext applicationContext;...
}

3.Spring容器初始化完触发Seata客户端初始化

Spring容器启动和初始化完毕后,会调用InitializingBean的afterPropertiesSet()方法进行回调。

GlobalTransactionScanner.afterPropertiesSet()方法会调用initClient()方法,并且会通过CAS操作确保initClient()方法仅执行一次。

initClient()方法是全局事务注解扫描器GlobalTransactionScanner的核心方法,它会负责对Seata客户端进行初始化。

对于Seata客户端来说,有两个重要的组件:一个是TM(即Transaction Manager)全局事务管理器,另一个是RM(即Resource Manager)分支事务资源管理器。

在initClient()方法中,会先调用TMClient的init()方法对TM全局事务管理器客户端进行初始化,然后调用RMClient的init()方法对RM分支事务资源管理器客户端进行初始化。

//AbstractAutoProxyCreator:Spring的动态代理自动创建者
//ConfigurationChangeListener:关注配置变更事件的监听器
//InitializingBean:Spring Bean初始化回调
//ApplicationContextAware:让Bean可以获取Spring容器
//DisposableBean:支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...//确保初始化方法仅仅调用一次的CAS变量//通过Atomic CAS操作可以确保多线程并发下,方法只被调用一次//只有一个线程可以成功对initialized原子变量进行CAS操作private final AtomicBoolean initialized = new AtomicBoolean(false);...public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) {setOrder(ORDER_NUM);//启用对目标class创建动态代理setProxyTargetClass(true);//设置应用程序IDthis.applicationId = applicationId;//设置分布式事务服务分组this.txServiceGroup = txServiceGroup;//设置分布式事务模式,默认是ATthis.mode = mode;//设置全局事务失败回调钩子this.failureHandlerHook = failureHandlerHook;}//DisposableBean接口的回调方法//当Spring容器被销毁、系统停止时,所做的一些资源销毁和释放@Overridepublic void destroy() {ShutdownHook.getInstance().destroyAll();}//InitializingBean接口的回调方法//Spring容器启动和初始化完毕后,会调用如下的afterPropertiesSet()方法进行回调@Overridepublic void afterPropertiesSet() {//是否禁用了全局事务,默认是falseif (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);return;}//通过CAS操作确保initClient()初始化动作仅仅执行一次if (initialized.compareAndSet(false, true)) {//initClient()方法会对Seata Client进行初始化,比如和Seata Server建立长连接//seata-samples的业务服务、订单服务、库存服务、账号服务的spring.xml配置文件里都配置了GlobalTransactionScanner这个Bean//而GlobalTransactionScanner这个Bean伴随着Spring容器的初始化完毕,都会回调其初始化逻辑initClient()initClient();}}//initClient()是核心方法,负责对Seata Client客户端进行初始化private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}//对于Seata Client来说,最重要的组件有两个://一个是TM,即Transaction Manager,全局事务管理器//一个是RM,即Resource Manager,分支事务资源管理器//init TM//TMClient.init()会对TM全局事务管理器客户端进行初始化TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}//init RM//RMClient.init()会对RM分支事务资源管理器客户端进行初始化RMClient.init(applicationId, txServiceGroup);if (LOGGER.isInfoEnabled()) {LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Global Transaction Clients are initialized. ");}//注册Spring容器被销毁时的回调钩子,释放TM和RM两个组件的一些资源registerSpringShutdownHook();}private void registerSpringShutdownHook() {if (applicationContext instanceof ConfigurableApplicationContext) {((ConfigurableApplicationContext) applicationContext).registerShutdownHook();ShutdownHook.removeRuntimeShutdownHook();}ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));}...
}

4.TM全局事务管理器客户端初始化的源码

TM全局事务管理器在进行初始化之前,会先通过TmNettyRemotingClient的getInstance()方法获取TM组件的Netty网络通信客户端实例,该方法使用了Double Check双重检查机制。

对TM组件的Netty网络通信客户端实例TmNettyRemotingClient进行实例化时,会传入一个创建好的Netty网络通信客户端配置实例NettyClientConfig,以及一个创建好的线程池messageExecutor。

public class TMClient {public static void init(String applicationId, String transactionServiceGroup) {init(applicationId, transactionServiceGroup, null, null);}public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {//获取TM组件的Netty网络通信客户端实例TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
}public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...public static TmNettyRemotingClient getInstance() {//Java并发编程里经典的Double Checkif (instance == null) {synchronized (TmNettyRemotingClient.class) {if (instance == null) {//创建一个NettyClientConfig,作为Netty网络通信客户端的配置NettyClientConfig nettyClientConfig = new NettyClientConfig();//创建一个线程池final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(), nettyClientConfig.getClientWorkerThreads()),RejectedPolicies.runsOldestTaskPolicy());instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);}}}return instance;}private TmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);//安全认证signer数字签名组件//EnhancedServiceLoader对一个接口进行加载,类似于Seata SPI机制this.signer = EnhancedServiceLoader.load(AuthSigner.class);}...
}public class NettyClientConfig extends NettyBaseConfig {private int connectTimeoutMillis = 10000;//连接超时时间private int clientSocketSndBufSize = 153600;//客户端Socket发送Buffer大小private int clientSocketRcvBufSize = 153600;//客户端Socket接收Buffer大小private int clientWorkerThreads = WORKER_THREAD_SIZE;//客户端工作线程private final Class<? extends Channel> clientChannelClazz = CLIENT_CHANNEL_CLAZZ;//客户端Channel类private int perHostMaxConn = 2;//每个host的最大连接数private static final int PER_HOST_MIN_CONN = 2;private int pendingConnSize = Integer.MAX_VALUE;private static final long RPC_RM_REQUEST_TIMEOUT = CONFIG.getLong(ConfigurationKeys.RPC_RM_REQUEST_TIMEOUT, DEFAULT_RPC_RM_REQUEST_TIMEOUT);private static final long RPC_TM_REQUEST_TIMEOUT = CONFIG.getLong(ConfigurationKeys.RPC_TM_REQUEST_TIMEOUT, DEFAULT_RPC_TM_REQUEST_TIMEOUT);private static String vgroup;private static String clientAppName;private static int clientType;private static int maxInactiveChannelCheck = 10;private static final int MAX_NOT_WRITEABLE_RETRY = 2000;private static final int MAX_CHECK_ALIVE_RETRY = 300;private static final int CHECK_ALIVE_INTERVAL = 10;private static final String SOCKET_ADDRESS_START_CHAR = "/";private static final long MAX_ACQUIRE_CONN_MILLS = 60 * 1000L;private static final String RPC_DISPATCH_THREAD_PREFIX = "rpcDispatch";private static final int DEFAULT_MAX_POOL_ACTIVE = 1;private static final int DEFAULT_MIN_POOL_IDLE = 0;private static final boolean DEFAULT_POOL_TEST_BORROW = true;private static final boolean DEFAULT_POOL_TEST_RETURN = true;private static final boolean DEFAULT_POOL_LIFO = true;private static final boolean ENABLE_CLIENT_BATCH_SEND_REQUEST = CONFIG.getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST, DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST);...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...private final NettyClientBootstrap clientBootstrap;private NettyClientChannelManager clientChannelManager;...public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {super(messageExecutor);this.transactionRole = transactionRole;clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);clientBootstrap.setChannelHandlers(new ClientHandler());clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}...
}

5.TM组件的Netty网络通信客户端初始化源码

TmNettyRemotingClient进行网络通信初始化时,主要是通过继承的AbstractNettyRemotingClient的构造方法来初始化的。

在AbstractNettyRemotingClient的构造方法中,首先会创建Netty网络通信客户端实例NettyClientBootstrap,然后对该Netty网络通信客户端实例设置ChannelHandler为ClientHandler,接着会创建Netty长连接管理器实例NettyClientChannelManager。

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...private final NettyClientBootstrap clientBootstrap;private NettyClientChannelManager clientChannelManager;...public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {super(messageExecutor);this.transactionRole = transactionRole;//首先创建Netty网络通信客户端实例NettyClientBootstrapclientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);//然后对该Netty网络通信客户端实例设置ChannelHandlerclientBootstrap.setChannelHandlers(new ClientHandler());//接着创建Netty长连接管理器实例NettyClientChannelManagerclientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}...
}public class NettyClientBootstrap implements RemotingBootstrap {private final NettyClientConfig nettyClientConfig;private final Bootstrap bootstrap = new Bootstrap();private final EventLoopGroup eventLoopGroupWorker;private EventExecutorGroup defaultEventExecutorGroup;private ChannelHandler[] channelHandlers;...public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, NettyPoolKey.TransactionRole transactionRole) {this.nettyClientConfig = nettyClientConfig;int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();this.transactionRole = transactionRole;this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));this.defaultEventExecutorGroup = eventExecutorGroup;}protected void setChannelHandlers(final ChannelHandler... handlers) {if (handlers != null) {channelHandlers = handlers;}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Sharableclass ClientHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}processMessage(ctx, (RpcMessage) msg);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) {synchronized (lock) {if (ctx.channel().isWritable()) {lock.notifyAll();}}ctx.fireChannelWritabilityChanged();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (messageExecutor.isShutdown()) {return;}if (LOGGER.isInfoEnabled()) {LOGGER.info("channel inactive: {}", ctx.channel());}clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));super.channelInactive(ctx);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.READER_IDLE) {if (LOGGER.isInfoEnabled()) {LOGGER.info("channel {} read idle.", ctx.channel());}try {String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());clientChannelManager.invalidateObject(serverAddress, ctx.channel());} catch (Exception exx) {LOGGER.error(exx.getMessage());} finally {clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));}}if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {try {if (LOGGER.isDebugEnabled()) {LOGGER.debug("will send ping msg,channel {}", ctx.channel());}AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);} catch (Throwable throwable) {LOGGER.error("send request error: {}", throwable.getMessage(), throwable);}}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));if (LOGGER.isInfoEnabled()) {LOGGER.info("remove exception rm channel:{}", ctx.channel());}super.exceptionCaught(ctx, cause);}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {if (LOGGER.isInfoEnabled()) {LOGGER.info(ctx + " will closed");}super.close(ctx, future);}}...
}//管理Netty客户端的网络连接
class NettyClientChannelManager {private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;private Function<String, NettyPoolKey> poolKeyFunction;NettyClientChannelManager(final NettyPoolableFactory keyPoolableFactory, final Function<String, NettyPoolKey> poolKeyFunction, final NettyClientConfig clientConfig) {nettyClientKeyPool = new GenericKeyedObjectPool<>(keyPoolableFactory);nettyClientKeyPool.setConfig(getNettyPoolConfig(clientConfig));this.poolKeyFunction = poolKeyFunction;}...
}

6.Seata框架的SPI动态扩展机制源码

Seata的SPI扩展机制和Dubbo的SPI扩展机制是一样的。很多开源框架的内核源码里的关键组件,都会定义成接口。然后在框架运行过程中,就可以根据接口去加载可能实现的动态扩展。

这些动态扩展会在如下目录进行配置,这个目录下的文件名就是可以进行动态扩展的接口名称,文件里的内容就是该接口的实现类。

src/resources/META-INF.services

比如在src/resources/META-INF.services/目录下,有一个名为如下文件名的文件,表示可动态扩展的接口是如下接口名,该文件里配置的几个类就是实现了该接口的类。

文件名:io.seata.spring.annotation.ScannerChecker
接口名:io.seata.spring.annotation.ScannerChecker

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private TmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);//安全认证signer数字签名组件//EnhancedServiceLoader对一个接口进行加载,类似于Seata SPI机制this.signer = EnhancedServiceLoader.load(AuthSigner.class);}...
}//The type Enhanced service loader.
public class EnhancedServiceLoader {...//load service providerpublic static <S> S load(Class<S> service) throws EnhancedServiceNotFoundException {return InnerEnhancedServiceLoader.getServiceLoader(service).load(findClassLoader());}private static ClassLoader findClassLoader() {return EnhancedServiceLoader.class.getClassLoader();}...private static class InnerEnhancedServiceLoader<S> {private static final Logger LOGGER = LoggerFactory.getLogger(InnerEnhancedServiceLoader.class);private static final String SERVICES_DIRECTORY = "META-INF/services/";private static final String SEATA_DIRECTORY = "META-INF/seata/";private static final ConcurrentMap<Class<?>, InnerEnhancedServiceLoader<?>> SERVICE_LOADERS = new ConcurrentHashMap<>();private final Class<S> type;private final Holder<List<ExtensionDefinition>> definitionsHolder = new Holder<>();private final ConcurrentMap<ExtensionDefinition, Holder<Object>> definitionToInstanceMap = new ConcurrentHashMap<>();private final ConcurrentMap<String, List<ExtensionDefinition>> nameToDefinitionsMap = new ConcurrentHashMap<>();private final ConcurrentMap<Class<?>, ExtensionDefinition> classToDefinitionMap = new ConcurrentHashMap<>();private InnerEnhancedServiceLoader(Class<S> type) {this.type = type;}...//Get the ServiceLoader for the specified Classprivate static <S> InnerEnhancedServiceLoader<S> getServiceLoader(Class<S> type) {if (type == null) {throw new IllegalArgumentException("Enhanced Service type is null");}return (InnerEnhancedServiceLoader<S>)CollectionUtils.computeIfAbsent(SERVICE_LOADERS, type,key -> new InnerEnhancedServiceLoader<>(type));}//Specify classLoader to load the service providerprivate S load(ClassLoader loader) throws EnhancedServiceNotFoundException {return loadExtension(loader, null, null);}private S loadExtension(ClassLoader loader, Class[] argTypes, Object[] args) {try {loadAllExtensionClass(loader);ExtensionDefinition defaultExtensionDefinition = getDefaultExtensionDefinition();return getExtensionInstance(defaultExtensionDefinition, loader, argTypes, args);} catch (Throwable e) {if (e instanceof EnhancedServiceNotFoundException) {throw (EnhancedServiceNotFoundException)e;} else {throw new EnhancedServiceNotFoundException("not found service provider for : " + type.getName() + " caused by " + ExceptionUtils.getFullStackTrace(e));}}}private ExtensionDefinition getDefaultExtensionDefinition() {List<ExtensionDefinition> currentDefinitions = definitionsHolder.get();return CollectionUtils.getLast(currentDefinitions);}       private S getExtensionInstance(ExtensionDefinition definition, ClassLoader loader, Class[] argTypes, Object[] args) {if (definition == null) {throw new EnhancedServiceNotFoundException("not found service provider for : " + type.getName());}if (Scope.SINGLETON.equals(definition.getScope())) {Holder<Object> holder = CollectionUtils.computeIfAbsent(definitionToInstanceMap, definition, key -> new Holder<>());Object instance = holder.get();if (instance == null) {synchronized (holder) {instance = holder.get();if (instance == null) {instance = createNewExtension(definition, loader, argTypes, args);holder.set(instance);}}}return (S)instance;} else {return createNewExtension(definition, loader, argTypes, args);}}private S createNewExtension(ExtensionDefinition definition, ClassLoader loader, Class[] argTypes, Object[] args) {Class<?> clazz = definition.getServiceClass();try {S newInstance = initInstance(clazz, argTypes, args);return newInstance;} catch (Throwable t) {throw new IllegalStateException("Extension instance(definition: " + definition + ", class: " + type + ")  could not be instantiated: " + t.getMessage(), t);}}private S initInstance(Class implClazz, Class[] argTypes, Object[] args) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {S s = null;if (argTypes != null && args != null) {//Constructor with argumentsConstructor<S> constructor = implClazz.getDeclaredConstructor(argTypes);s = type.cast(constructor.newInstance(args));} else {//default Constructors = type.cast(implClazz.newInstance());}if (s instanceof Initialize) {((Initialize)s).init();}return s;}private List<Class> loadAllExtensionClass(ClassLoader loader) {List<ExtensionDefinition> definitions = definitionsHolder.get();if (definitions == null) {synchronized (definitionsHolder) {definitions = definitionsHolder.get();if (definitions == null) {definitions = findAllExtensionDefinition(loader);definitionsHolder.set(definitions);}}}return definitions.stream().map(def -> def.getServiceClass()).collect(Collectors.toList());}private List<ExtensionDefinition> findAllExtensionDefinition(ClassLoader loader) {List<ExtensionDefinition> extensionDefinitions = new ArrayList<>();try {//Seata的SPI扩展机制和Dubbo的SPI扩展机制是一样的//对于开源框架的内核源码里的很多关键组件,都会定义接口//然后在开源框架的运行过程中,就可以针对这个接口去加载可能实现的动态扩展//这些动态扩展接口文件的配置位于:src/resources/META-INF.services//在该文件里对指定的接口定义自己的实现类,比如:src/resources/META-INF.services/io.seata.spring.annotation.ScannerCheckerloadFile(SERVICES_DIRECTORY, loader, extensionDefinitions);loadFile(SEATA_DIRECTORY, loader, extensionDefinitions);} catch (IOException e) {throw new EnhancedServiceNotFoundException(e);}//After loaded all the extensions,sort the caches by orderif (!nameToDefinitionsMap.isEmpty()) {for (List<ExtensionDefinition> definitions : nameToDefinitionsMap.values()) {Collections.sort(definitions, (def1, def2) -> {int o1 = def1.getOrder();int o2 = def2.getOrder();return Integer.compare(o1, o2);});}}if (!extensionDefinitions.isEmpty()) {Collections.sort(extensionDefinitions, (definition1, definition2) -> {int o1 = definition1.getOrder();int o2 = definition2.getOrder();return Integer.compare(o1, o2);});}return extensionDefinitions;}private void loadFile(String dir, ClassLoader loader, List<ExtensionDefinition> extensions) throws IOException {String fileName = dir + type.getName();Enumeration<java.net.URL> urls;if (loader != null) {urls = loader.getResources(fileName);} else {urls = ClassLoader.getSystemResources(fileName);}if (urls != null) {while (urls.hasMoreElements()) {java.net.URL url = urls.nextElement();try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), Constants.DEFAULT_CHARSET))) {String line;while ((line = reader.readLine()) != null) {final int ci = line.indexOf('#');if (ci >= 0) {line = line.substring(0, ci);}line = line.trim();if (line.length() > 0) {try {ExtensionDefinition extensionDefinition = getUnloadedExtensionDefinition(line, loader);if (extensionDefinition == null) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("The same extension {} has already been loaded, skipped", line);}continue;}extensions.add(extensionDefinition);} catch (LinkageError | ClassNotFoundException e) {LOGGER.warn("Load [{}] class fail. {}", line, e.getMessage());}}}} catch (Throwable e) {LOGGER.warn("load clazz instance error: {}", e.getMessage());}}}}private ExtensionDefinition getUnloadedExtensionDefinition(String className, ClassLoader loader) throws ClassNotFoundException {//Check whether the definition has been loadedif (!isDefinitionContainsClazz(className, loader)) {Class<?> clazz = Class.forName(className, true, loader);String serviceName = null;Integer priority = 0;Scope scope = Scope.SINGLETON;LoadLevel loadLevel = clazz.getAnnotation(LoadLevel.class);if (loadLevel != null) {serviceName = loadLevel.name();priority = loadLevel.order();scope = loadLevel.scope();}ExtensionDefinition result = new ExtensionDefinition(serviceName, priority, scope, clazz);classToDefinitionMap.put(clazz, result);if (serviceName != null) {CollectionUtils.computeIfAbsent(nameToDefinitionsMap, serviceName, e -> new ArrayList<>()).add(result);}return result;}return null;}private boolean isDefinitionContainsClazz(String className, ClassLoader loader) {for (Map.Entry<Class<?>, ExtensionDefinition> entry : classToDefinitionMap.entrySet()) {if (!entry.getKey().getName().equals(className)) {continue;}if (Objects.equals(entry.getValue().getServiceClass().getClassLoader(), loader)) {return true;}}return false;}...}
}

相关文章:

  • Ansys Zemax | 在 MATLAB 或 Python 中使用 ZOS-API 进行光线追迹的批次处理
  • Web》》url 参数 # 、 ? 、@
  • element ui 级联列表Cascader懒加载数据回显的优雅解决方案
  • LocalDateTime类型的时间在前端页面不显示或者修改数据时因为LocalDateTime导致无法修改,解决方案
  • 会计要素+借贷分录+会计科目+账户,几个银行会计的重要概念
  • 【J2】乘法逆元
  • 将b[索引]中元素按照a中元素的值进行排序
  • C++核心编程--1 内存分区模型
  • python打卡day26
  • 如何在线免费压缩PDF文档?
  • 【MySQL】多表连接查询
  • 各个历史版本mysql/tomcat/Redis/Jdk/Apache/gitlab下载地址
  • 2024年9月电子学会等级考试五级第三题——整数分解
  • 【蓝桥杯省赛真题49】python偶数 第十五届蓝桥杯青少组Python编程省赛真题解析
  • zynq嵌入式linux启动默认设置
  • 钉钉数据与金蝶云星空的无缝集成解决方案
  • 嵌入式开发学习日志(数据结构--双链表)Day21
  • C++ QT图片查看器
  • 掘金中亚货代蓝海,易境通货代系统解锁数字化制胜密码!
  • Python实战案例:打造趣味猜拳小游戏
  • 美叙领导人25年来首次会面探索关系正常化,特朗普下令解除对叙经济制裁
  • 制造四十余年血腥冲突后,库尔德工人党为何自行解散?
  • 将人工智能送上太空,我国太空计算卫星星座成功发射
  • 习近平出席中国-拉美和加勒比国家共同体论坛第四届部长级会议开幕式
  • 盖茨说对中国技术封锁起到反作用
  • 观众走入剧院空间,人艺之友一起“再造时光”