当前位置: 首页 > wzjs >正文

做视频网站犯法么微信引流推广怎么找平台

做视频网站犯法么,微信引流推广怎么找平台,xampp做网站可以吗,上饶做网站要多少钱大纲 1.Seata的Resource资源接口源码 2.Seata数据源连接池代理的实现源码 3.Client向Server发起注册RM的源码 4.Client向Server注册RM时的交互源码 5.数据源连接代理与SQL句柄代理的初始化源码 6.Seata基于SQL句柄代理执行SQL的源码 7.执行SQL语句前取消自动提交事务的源…

大纲

1.Seata的Resource资源接口源码

2.Seata数据源连接池代理的实现源码

3.Client向Server发起注册RM的源码

4.Client向Server注册RM时的交互源码

5.数据源连接代理与SQL句柄代理的初始化源码

6.Seata基于SQL句柄代理执行SQL的源码

7.执行SQL语句前取消自动提交事务的源码

8.执行SQL语句前后构建数据镜像的源码

9.构建全局锁的key和UndoLog数据的源码

10.Seata Client发起分支事务注册的源码

11.Seata Server处理分支事务注册请求的源码

12.将UndoLog写入到数据库与提交事务的源码

13.通过全局锁重试策略组件执行事务的提交

14.注册分支事务时获取全局锁的入口源码

15.Seata Server获取全局锁的具体逻辑源码

16.全局锁和分支事务及本地事务总结

17.提交全局事务以及提交各分支事务的源码

18.全局事务回滚的过程源码

1.Seata的Resource资源接口源码

数据源代理DataSourceProxy不仅实现了Seata的Resource资源接口,同时还继承了实现了SeataDataSourceProxy接口的抽象类AbstractDataSourceProxy。

由于SeataDataSourceProxy接口又继承自JDK提供的DataSource接口,所以通过数据源连接池DataSource接口的方法,可以获取数据源的连接。

注意:这里的数据源==数据库。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {...
}public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {...
}public interface SeataDataSourceProxy extends DataSource {...
}public interface DataSource extends CommonDataSource, Wrapper {//获取数据源连接Connection getConnection() throws SQLException;Connection getConnection(String username, String password) throws SQLException;
}

Seata的Resource资源接口有三个方法:

一.getResourceGroupId()方法用来获取资源分组

比如主从节点同属一个分组。

二.getResourceId()方法用来获取数据源ID

比如数据源连接URL可作为数据源ID。

三.getBranchType()方法用来获取分支事务类型

比如类型有:AT、TCC、SAGA、XA。

//Resource that can be managed by Resource Manager and involved into global transaction.
//资源是由RM资源管理组件来负责管理的
//RM资源管理器组件会负责把一个个的资源纳入到全局事务里去
//比如RM可以管理数据库资源,把一个数据库本地事务纳入到全局事务里去
public interface Resource {//Get the resource group id.//e.g. master and slave data-source should be with the same resource group id.//获取到资源分组ID//主从架构的数据源关联到同一个资源分组ID//比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组IDString getResourceGroupId();//Get the resource id.//e.g. url of a data-source could be the id of the db data-source resource.//比如数据源连接URL可以作为数据源的IDString getResourceId();//get resource type, AT, TCC, SAGA and XA//branchType表示分支事务类型:AT、TCC、SAGA、XABranchType getBranchType();
}

2.Seata数据源连接池代理的实现源码

(1)Seata的数据源连接池代理接口SeataDataSourceProxy

(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy

(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化

(1)Seata的数据源连接池代理接口SeataDataSourceProxy

SeataDataSourceProxy数据源代理在继承DataSource数据源连接池的基础上,增加了两个方法:一个是获取代理的目标数据源连接池的方法,一个是获取代理的目标数据源连接池对应的分支事务类型的方法。

public interface SeataDataSourceProxy extends DataSource {//Gets target data source. //获取代理的目标数据源连接池DataSource getTargetDataSource();//Gets branch type. //获取代理的目标数据源连接池对应的分支事务类型BranchType getBranchType();
}

(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy

AbstractDataSourceProxy抽象类的主要工作是封装代理的目标数据源连接池targetDataSource。

//The type Abstract data source proxy.
//AbstractDataSourceProxy主要的工作就是:
//封装了代理的目标数据源连接池targetDataSource
public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {//The Target data source.//代理目标的连接池,可以通过targetDataSource来获取连接protected DataSource targetDataSource;//Instantiates a new Abstract data source proxy.public AbstractDataSourceProxy(){ }//Instantiates a new Abstract data source proxy.public AbstractDataSourceProxy(DataSource targetDataSource) {this.targetDataSource = targetDataSource;}//Gets target data source.@Overridepublic DataSource getTargetDataSource() {return targetDataSource;}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {return targetDataSource.unwrap(iface);}//判断目标连接池targetDataSource是否包装了指定的接口iface@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return targetDataSource.isWrapperFor(iface);}@Overridepublic PrintWriter getLogWriter() throws SQLException {return targetDataSource.getLogWriter();}@Overridepublic void setLogWriter(PrintWriter out) throws SQLException {targetDataSource.setLogWriter(out);}@Overridepublic void setLoginTimeout(int seconds) throws SQLException {targetDataSource.setLoginTimeout(seconds);}@Overridepublic int getLoginTimeout() throws SQLException {return targetDataSource.getLoginTimeout();}@Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return targetDataSource.getParentLogger();}
}

(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化

初始化数据源连接池代理DataSourceProxy的具体逻辑是:首先从目标数据库连接池dataSource中获取一个数据库连接,然后根据这个数据库连接Connection去初始化jdbcUrl和dbType,接着根据数据库连接地址jdbcUrl初始化resourceId,然后把当前数据库连接池代理DataSourceProxy作为一个资源注册到默认的RM即DefaultResourceManager里去,最后设置RootContext上下文即线程本地变量副本中的分支事务类型。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxy.class);//默认资源分组IDprivate static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";//Enable the table meta checker,默认是不启用的private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);//Table meta checker interval,默认是60sprivate static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);//资源组ID,比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组IDprivate String resourceGroupId;//代理的目标数据源连接url,这个数据源连接url也可以作为resourceIdprivate String jdbcUrl;//数据源ID,比如数据库连接url就可以作为一个数据源IDprivate String resourceId;//数据源类型private String dbType;//数据源连接用户名private String userName;//定时调度的线程池,定时检查表里的元数据private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("tableMetaChecker", 1, true));//Instantiates a new Data source proxy.public DataSourceProxy(DataSource targetDataSource) {this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);}//Instantiates a new Data source proxy.//@param targetDataSource the target data source//@param resourceGroupId  the resource group idpublic DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {if (targetDataSource instanceof SeataDataSourceProxy) {LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();}this.targetDataSource = targetDataSource;init(targetDataSource, resourceGroupId);}//初始化数据源连接池代理DataSourceProxyprivate void init(DataSource dataSource, String resourceGroupId) {//资源分组IDthis.resourceGroupId = resourceGroupId;//从目标数据库连接池dataSource中,获取一个数据库连接try (Connection connection = dataSource.getConnection()) {//获取数据库连接connection里的元数据的连接urljdbcUrl = connection.getMetaData().getURL();//根据连接url获取到数据库类型dbType = JdbcUtils.getDbType(jdbcUrl);if (JdbcConstants.ORACLE.equals(dbType)) {//如果数据库类型等于oracle,则需要获取数据库连接connection的元数据的用户名userName = connection.getMetaData().getUserName();} else if (JdbcConstants.MARIADB.equals(dbType)) {//如果数据库类型等于mariadb,则需要对数据库类型进行赋值为MySQLdbType = JdbcConstants.MYSQL;}} catch (SQLException e) {throw new IllegalStateException("can not init dataSource", e);}//初始化资源ID,也就是获取数据库连接url来初始化resourceIDinitResourceId();//把当前数据库连接池代理,作为一个资源,注册到默认的RM里,也就是DefaultResourceManagerDefaultResourceManager.get().registerResource(this);if (ENABLE_TABLE_META_CHECKER_ENABLE) {tableMetaExecutor.scheduleAtFixedRate(() -> {try (Connection connection = dataSource.getConnection()) {TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());} catch (Exception ignore) {}}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);}//Set the default branch type to 'AT' in the RootContext.//设置RootContext上下文,即线程本地变量副本中的分支事务类型RootContext.setDefaultBranchType(this.getBranchType());}private void initResourceId() {if (JdbcConstants.POSTGRESQL.equals(dbType)) {initPGResourceId();} else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {initDefaultResourceId();resourceId = resourceId + "/" + userName;} else if (JdbcConstants.MYSQL.equals(dbType)) {initMysqlResourceId();} else {initDefaultResourceId();}}private void initMysqlResourceId() {String startsWith = "jdbc:mysql:loadbalance://";if (jdbcUrl.startsWith(startsWith)) {String url;if (jdbcUrl.contains("?")) {url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));} else {url = jdbcUrl;}resourceId = url.replace(",", "|");} else {initDefaultResourceId();}}...
}

3.Client向Server发起注册RM的源码

初始化数据源连接池代理DataSourceProxy时,会将数据库连接池代理作为资源,注册到DefaultResourceManager资源管理器中。

而初始化DefaultResourceManager时,会通过SPI机制加载所有的ResourceManager。

因此在执行DataSourceProxy的init()方法进行初始化时,由于会调用DefaultResourceManager的registerResource()方法,所以最后会执行到DataSourceManager的registerResource()方法。

在DataSourceManager的registerResource()方法中,首先会把数据源连接池代理DataSourceProxy放入一个Map中进行缓存,然后通过RmNettyRemotingClient构造一个注册RM的请求把数据源连接池代理DataSourceProxy作为资源注册到Seata Server中。

public class DefaultResourceManager implements ResourceManager {//all resource managersprotected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}//Get resource manager.public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//init all resource managers//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);}}}@Overridepublic void registerResource(Resource resource) {getResourceManager(resource.getBranchType()).registerResource(resource);}public ResourceManager getResourceManager(BranchType branchType) {ResourceManager rm = resourceManagers.get(branchType);if (rm == null) {throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());}return rm;}...
}//The type Data source manager.
//DataSourceManager是AT模式下的资源管理器
public class DataSourceManager extends AbstractResourceManager {//异步化workerprivate final AsyncWorker asyncWorker = new AsyncWorker(this);//RM负责管理的一些resource资源private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();...@Overridepublic void registerResource(Resource resource) {DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;//根据资源ID和数据源代理,把数据源连接池代理DataSourceProxy放入到map里去dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);super.registerResource(dataSourceProxy);}...
}public abstract class AbstractResourceManager implements ResourceManager {...@Overridepublic void registerResource(Resource resource) {//通过RmNettyRemotingClient把RM注册到Seata Server中RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());}...
}

4.Client向Server注册RM时的交互源码

(1)Client异步发送注册RM的请求给Server

(2)Server收到注册RM的请求后的处理及异步响应

(1)Client异步发送注册RM的请求给Server

public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {...//Register new db key.public void registerResource(String resourceGroupId, String resourceId) {//Resource registration cannot be performed until the RM client is initializedif (StringUtils.isBlank(transactionServiceGroup)) {return;}if (getClientChannelManager().getChannels().isEmpty()) {getClientChannelManager().reconnect(transactionServiceGroup);return;}synchronized (getClientChannelManager().getChannels()) {//向每一个Server发起注册for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {String serverAddress = entry.getKey();Channel rmChannel = entry.getValue();if (LOGGER.isInfoEnabled()) {LOGGER.info("will register resourceId:{}", resourceId);}sendRegisterMessage(serverAddress, rmChannel, resourceId);}}}public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);message.setResourceIds(resourceId);try {//异步发送注册RM的请求super.sendAsyncRequest(channel, message);} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {getClientChannelManager().releaseChannel(channel, serverAddress);if (LOGGER.isInfoEnabled()) {LOGGER.info("remove not writable channel:{}", channel);}} else {LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);}}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic void sendAsyncRequest(Channel channel, Object msg) {if (channel == null) {LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");return;}RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);if (rpcMessage.getBody() instanceof MergeMessage) {mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());}super.sendAsync(channel, rpcMessage);}...
}public abstract class AbstractNettyRemoting implements Disposable {...//rpc async request.protected void sendAsync(Channel channel, RpcMessage rpcMessage) {channelWritableCheck(channel, rpcMessage.getBody());if (LOGGER.isDebugEnabled()) {LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());}doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {destroyChannel(future.channel());}});}...
}

(2)Server收到注册RM的请求后的处理及异步响应

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@ChannelHandler.Sharableclass ServerHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理processMessage(ctx, (RpcMessage) msg);}...}...
}public abstract class AbstractNettyRemoting implements Disposable {...//Rpc message processing.protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的//processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {...}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}...
}public class RegRmProcessor implements RemotingProcessor {...@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {onRegRmMessage(ctx, rpcMessage);}private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();//获取请求的发送地址String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());boolean isSuccess = false;String errorInfo = StringUtils.EMPTY;try {if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {//通过Channel管理组件ChannelManager,注册RM网络连接ChannelManager.registerRMChannel(message, ctx.channel());Version.putChannelVersion(ctx.channel(), message.getVersion());isSuccess = true;if (LOGGER.isDebugEnabled()) {LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());}} else {if (LOGGER.isWarnEnabled()) {LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());}}} catch (Exception exx) {isSuccess = false;errorInfo = exx.getMessage();LOGGER.error("RM register fail, error message:{}", errorInfo);}RegisterRMResponse response = new RegisterRMResponse(isSuccess);if (StringUtils.isNotEmpty(errorInfo)) {response.setMsg(errorInfo);}//返回响应给客户端remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);if (isSuccess && LOGGER.isInfoEnabled()) {LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(), message.getVersion());}}...
}public class ChannelManager {...public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel) throws IncompatibleVersionException {Version.checkVersion(resourceManagerRequest.getVersion());Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());RpcContext rpcContext;if (!IDENTIFIED_CHANNELS.containsKey(channel)) {rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),resourceManagerRequest.getResourceIds(), channel);rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);} else {rpcContext = IDENTIFIED_CHANNELS.get(channel);rpcContext.addResources(dbkeySet);}if (dbkeySet == null || dbkeySet.isEmpty()) { return; }for (String resourceId : dbkeySet) {String clientIp;ConcurrentMap<Integer, RpcContext> portMap =CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>()).computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>()).computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());rpcContext.holdInResourceManagerChannels(resourceId, portMap);updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());}}...
}public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@Overridepublic void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {Channel clientChannel = channel;if (!(msg instanceof HeartbeatMessage)) {clientChannel = ChannelManager.getSameClientChannel(channel);}if (clientChannel != null) {RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE);super.sendAsync(clientChannel, rpcMsg);} else {throw new RuntimeException("channel is error.");}}...
}public abstract class AbstractNettyRemoting implements Disposable {...//rpc async request.protected void sendAsync(Channel channel, RpcMessage rpcMessage) {channelWritableCheck(channel, rpcMessage.getBody());if (LOGGER.isDebugEnabled()) {LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());}doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {destroyChannel(future.channel());}});}...
}

5.数据源连接代理与SQL句柄代理的初始化源码

(1)数据库操作的三剑客之连接、句柄和结果

(2)数据源连接代理的初始化

(3)数据源连接代理对SQL进行预编译

(4)SQL句柄代理的初始化

(5)SQL句柄代理执行SQL

(1)数据库操作的三剑客之连接、句柄和结果

Seata Client或者Seata Server进行数据库操作的大致流程如下所示:

public class LogStoreDataBaseDAO implements LogStore {//The Log store data source. 数据源连接池protected DataSource logStoreDataSource = null;...@Overridepublic GlobalTransactionDO queryGlobalTransactionDO(long transactionId) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByTransactionId(globalTable);Connection conn = null;//连接PreparedStatement ps = null;//句柄ResultSet rs = null;//结果try {//1.从数据源连接池中获取数据源连接conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);//2.对sql语句进行预编译ps = conn.prepareStatement(sql);ps.setLong(1, transactionId);//3.执行sql语句rs = ps.executeQuery();if (rs.next()) {return convertGlobalTransactionDO(rs);} else {return null;}} catch (SQLException e) {throw new DataAccessException(e);} finally {IOUtil.close(rs, ps, conn);}}...
}

(2)数据源连接代理的初始化

Seata Client或者Seata Server进行数据库操作时,首先会通过数据库连接池代理DataSourceProxy获取数据库连接,也就是会通过DataSourceProxy的getConnection()方法获取数据源连接代理ConnectionProxy,其中就会根据获取到的一个数据源连接Connection初始化一个数据源连接代理ConnectionProxy。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {...@Overridepublic ConnectionProxy getConnection() throws SQLException {//从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回Connection targetConnection = targetDataSource.getConnection();return new ConnectionProxy(this, targetConnection);}@Overridepublic ConnectionProxy getConnection(String username, String password) throws SQLException {//从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回Connection targetConnection = targetDataSource.getConnection(username, password);return new ConnectionProxy(this, targetConnection);}...
}public class ConnectionProxy extends AbstractConnectionProxy {//Instantiates a new Connection proxy.public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {super(dataSourceProxy, targetConnection);}...
}public abstract class AbstractConnectionProxy implements Connection {//The Data source proxy. 数据源连接池代理protected DataSourceProxy dataSourceProxy;//The Target connection. 目标数据源连接protected Connection targetConnection;//Instantiates a new Abstract connection proxy.public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {this.dataSourceProxy = dataSourceProxy;this.targetConnection = targetConnection;}...
}

(3)数据源连接代理对SQL进行预编译

数据源连接代理ConnectionProxy在进行数据库操作时,获取到数据库连接Connection之后,就需要对要执行的SQL进行预编译,也就是会调用AbstractConnectionProxy的prepareStatement()方法。

public abstract class AbstractConnectionProxy implements Connection {...//对SQL进行预编译@Overridepublic PreparedStatement prepareStatement(String sql) throws SQLException {String dbType = getDbType();//support oracle 10.2+PreparedStatement targetPreparedStatement = null;//如果是AT模式if (BranchType.AT == RootContext.getBranchType()) {List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);if (sqlRecognizers != null && sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);targetPreparedStatement = getTargetConnection().prepareStatement(sql, pkNameArray);}}}if (targetPreparedStatement == null) {targetPreparedStatement = getTargetConnection().prepareStatement(sql);}//返回一个SQL句柄代理return new PreparedStatementProxy(this, targetPreparedStatement, sql);}...
}

(4)SQL句柄代理的初始化

SQL句柄代理PreparedStatementProxy的初始化主要是设置目标SQL、目标句柄和数据源连接代理。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {//Instantiates a new Prepared statement proxy.public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {super(connectionProxy, targetStatement, targetSQL);}...
}public abstract class AbstractPreparedStatementProxy extends StatementProxy<PreparedStatement> implements PreparedStatement {protected Map<Integer, ArrayList<Object>> parameters;private void initParameterHolder() {this.parameters = new HashMap<>();}//Instantiates a new Abstract prepared statement proxy.public AbstractPreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {super(connectionProxy, targetStatement, targetSQL);initParameterHolder();}...
}public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {//Instantiates a new Statement proxy.public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL) throws SQLException {super(connectionWrapper, targetStatement, targetSQL);}...
}public abstract class AbstractStatementProxy<T extends Statement> implements Statement {//The Connection proxy.protected AbstractConnectionProxy connectionProxy;//The Target statement.protected T targetStatement;//The Target sql.protected String targetSQL;...//Instantiates a new Abstract statement proxy.public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL) throws SQLException {this.connectionProxy = connectionProxy;this.targetStatement = targetStatement;this.targetSQL = targetSQL;}...
}

(5)SQL句柄代理执行SQL

从数据源连接池中获取到数据源连接,以及对SQL语句进行预编译后,就可以调用SQL句柄代理PreparedStatementProxy的executeQuery()等方法执行SQL语句。

6.Seata基于SQL句柄代理执行SQL的源码

(1)Spring的JdbcTemplate操作数据库的三剑客

(2)基于SQL句柄代理执行SQL的流程

(1)Spring的JdbcTemplate操作数据库的三剑客

连接、句柄和结果。

@Disabled
public class LocalTransactionWithGlobalLockDataSourceBasicTest {private static ClassPathXmlApplicationContext context;private static JdbcTemplate jdbcTemplate;@BeforeAllpublic static void before() {context = new ClassPathXmlApplicationContext("basic-test-context.xml");jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");}@Testpublic void testInsert() {RootContext.bindGlobalLockFlag();jdbcTemplate.update("insert into user0 (id, name, gmt) values (?, ?, ?)", new Object[]{2, "xxx", new Date()});}...
}public class JdbcTemplate extends JdbcAccessor implements JdbcOperations {...@Overridepublic int update(String sql, @Nullable Object... args) throws DataAccessException {return update(sql, newArgPreparedStatementSetter(args));}@Overridepublic int update(String sql, @Nullable PreparedStatementSetter pss) throws DataAccessException {return update(new SimplePreparedStatementCreator(sql), pss);}protected int update(final PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss) throws DataAccessException {logger.debug("Executing prepared SQL update");return updateCount(execute(psc, ps -> {try {if (pss != null) {pss.setValues(ps);}//PreparedStatement执行SQLint rows = ps.executeUpdate();if (logger.isTraceEnabled()) {logger.trace("SQL update affected " + rows + " rows");}return rows;} finally {if (pss instanceof ParameterDisposer) {((ParameterDisposer) pss).cleanupParameters();}}}, true));}@Nullableprivate <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources) throws DataAccessException {Assert.notNull(psc, "PreparedStatementCreator must not be null");Assert.notNull(action, "Callback object must not be null");if (logger.isDebugEnabled()) {String sql = getSql(psc);logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));}//1.获取连接Connection con = DataSourceUtils.getConnection(obtainDataSource());PreparedStatement ps = null;try {//2.创建句柄ps = psc.createPreparedStatement(con);applyStatementSettings(ps);//3.执行SQL的结果T result = action.doInPreparedStatement(ps);handleWarnings(ps);return result;} catch (SQLException ex) {if (psc instanceof ParameterDisposer) {((ParameterDisposer) psc).cleanupParameters();}String sql = getSql(psc);psc = null;JdbcUtils.closeStatement(ps);ps = null;DataSourceUtils.releaseConnection(con, getDataSource());con = null;throw translateException("PreparedStatementCallback", sql, ex);} finally {if (closeResources) {if (psc instanceof ParameterDisposer) {((ParameterDisposer) psc).cleanupParameters();}JdbcUtils.closeStatement(ps);DataSourceUtils.releaseConnection(con, getDataSource());}}}...
}

(2)基于SQL句柄代理执行SQL的流程

SQL句柄代理PreparedStatementProxy在调用execute()方法执行SQL时,就会调用到ExecuteTemplate执行模版的execute()方法。

而ExecuteTemplate执行模版的execute()方法,如果发现不需要全局锁 + 没有开启全局事务,那么就普通执行本地事务。否则,最终就会调用到BaseTransactionalExecutor的excute()方法。

在BaseTransactionalExecutor的excute()方法中,首先会从线程本地变量副本中获取xid,然后再执行SQL。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {...@Overridepublic boolean execute() throws SQLException {return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());}@Overridepublic ResultSet executeQuery() throws SQLException {return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());}@Overridepublic int executeUpdate() throws SQLException {return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());}...
}public class ExecuteTemplate {...public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {return execute(null, statementProxy, statementCallback, args);}public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {//如果发现不需要全局锁,而且没有开启AT模式下的全局事务,那么就普通执行本地事务if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {//Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}//获取到DB的类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);}Executor<T> executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor<>(statementProxy, statementCallback);} else {if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);switch (sqlRecognizer.getSQLType()) {case INSERT://通过SPI机制加载InsertExecutorexecutor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:switch (dbType) {case JdbcConstants.MYSQL:case JdbcConstants.MARIADB:executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}break;default:executor = new PlainExecutor<>(statementProxy, statementCallback);break;}} else {executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try {//比如下面最终会调用BaseTransactionalExecutor.excute()方法rs = executor.execute(args);} catch (Throwable ex) {if (!(ex instanceof SQLException)) {// Turn other exception into SQLExceptionex = new SQLException(ex);}throw (SQLException) ex;}return rs;}...
}@LoadLevel(name = JdbcConstants.MYSQL, scope = Scope.PROTOTYPE)
public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultable {...//Instantiates a new Abstract dml base executor.public MySQLInsertExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}...
}public abstract class BaseInsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> implements InsertExecutor<T> {...public BaseInsertExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}...
}public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}@Overridepublic T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交if (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}...
}public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {//The Statement proxy.protected StatementProxy<S> statementProxy;//The Statement callback.protected StatementCallback<T, S> statementCallback;//The Sql recognizer.protected SQLRecognizer sqlRecognizer;...public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,SQLRecognizer sqlRecognizer) {this.statementProxy = statementProxy;this.statementCallback = statementCallback;this.sqlRecognizer = sqlRecognizer;}...@Overridepublic T execute(Object... args) throws Throwable {//获取xidString xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return doExecute(args);}//Do execute object.protected abstract T doExecute(Object... args) throws Throwable;...
}

7.执行SQL语句前取消自动提交事务的源码

执行ExecuteTemplate执行模版的execute()方法时,最终会调用到BaseTransactionalExecutor基础事务执行器的excute()方法。

执行BaseTransactionalExecutor的execute()方法时,又会执行到AbstractDMLBaseExecutor的doExecute()方法。该方法会判断目标数据库连接是否会自动提交本地事务,默认情况下本地事务都是自动提交的。如果是,则取消自动提交本地事务。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {//The Statement proxy.protected StatementProxy<S> statementProxy;//The Statement callback.protected StatementCallback<T, S> statementCallback;//The Sql recognizer.protected SQLRecognizer sqlRecognizer;...public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,SQLRecognizer sqlRecognizer) {this.statementProxy = statementProxy;this.statementCallback = statementCallback;this.sqlRecognizer = sqlRecognizer;}...@Overridepublic T execute(Object... args) throws Throwable {//获取xidString xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return doExecute(args);}//Do execute object.protected abstract T doExecute(Object... args) throws Throwable;...
}public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}@Overridepublic T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交if (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}...
}public abstract class AbstractConnectionProxy implements Connection {...@Overridepublic boolean getAutoCommit() throws SQLException {//判断目标数据库连接是否是自动提交,默认情况是都是自动提交的return targetConnection.getAutoCommit();}...
}public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...//Execute auto commit true t.protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//修改自动提交事务的设置,此时需要阻止自动提交事务connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {T result = executeAutoCommitFalse(args);//执行SQL语句connectionProxy.commit();//手动提交本地事务return result;});} catch (Exception e) {//when exception occur in finally,this exception will lost, so just print it hereLOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {connectionProxy.getTargetConnection().rollback();}throw e;} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}}...
}public class ConnectionProxy extends AbstractConnectionProxy {private final ConnectionContext context = new ConnectionContext();...//change connection autoCommit to false by seatapublic void changeAutoCommit() throws SQLException {getContext().setAutoCommitChanged(true);setAutoCommit(false);}//Gets context.public ConnectionContext getContext() {return context;}@Overridepublic void setAutoCommit(boolean autoCommit) throws SQLException {if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {//change autocommit from false to true, we should commit() first according to JDBC spec.doCommit();}//把目标数据源连接的自动提交事务设置为falsetargetConnection.setAutoCommit(autoCommit);}...
}

8.执行SQL语句前后构建数据镜像的源码

(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程

(2)以UpdateExecuto为例构建前后镜像

(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程

一.首先设置数据源连接阻止其自动提交事务

二.根据目标SQL语句构建beforeImage前镜像

三.执行目标SQL语句(但还没提交其对应的事务)

四.根据beforeImage前镜像构建afterImage后镜像

五.根据前镜像和后镜像构建UndoLog数据

六.手动提交数据源连接代理的事务

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...//Execute auto commit true t.protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//修改数据源连接的自动提交事务的设置,此时需要阻止自动提交事务connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {T result = executeAutoCommitFalse(args);//执行SQL语句connectionProxy.commit();//手动提交本地事务return result;});} catch (Exception e) {// when exception occur in finally,this exception will lost, so just print it hereLOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {connectionProxy.getTargetConnection().rollback();}throw e;} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}}//Execute auto commit false t.protected T executeAutoCommitFalse(Object[] args) throws Exception {if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");}//根据目标SQL语句构建beforeImage,表示目标SQL执行前的数据镜像TableRecords beforeImage = beforeImage();//接下来真正去执行这条SQL语句,但是此时本地事务还不会提交T result = statementCallback.execute(statementProxy.getTargetStatement(), args);int updateCount = statementProxy.getUpdateCount();if (updateCount > 0) {//根据beforeImage构建afterImage,表示目标SQL执行后的数据镜像TableRecords afterImage = afterImage(beforeImage);//根据beforeImage和afterImage准备undoLog数据到数据源连接代理中prepareUndoLog(beforeImage, afterImage);}return result;}...
}

(2)以UpdateExecutor为例构建前后镜像

public class TableRecords implements java.io.Serializable {//表的元数据private transient TableMeta tableMeta;//表的名称private String tableName;//表的多行数据private List<Row> rows = new ArrayList<Row>();...
}public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {private static final Configuration CONFIG = ConfigurationFactory.getInstance();private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);//Instantiates a new Update executor.public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}@Overrideprotected TableRecords beforeImage() throws SQLException {ArrayList<List<Object>> paramAppenderList = new ArrayList<>();TableMeta tmeta = getTableMeta();//根据主键ID值拼接一个SQL语句,查询这条数据更新前的镜像String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);return buildTableRecords(tmeta, selectSQL, paramAppenderList);}private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;List<String> updateColumns = recognizer.getUpdateColumns();StringBuilder prefix = new StringBuilder("SELECT ");StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());String whereCondition = buildWhereCondition(recognizer, paramAppenderList);String orderByCondition = buildOrderCondition(recognizer, paramAppenderList);String limitCondition = buildLimitCondition(recognizer, paramAppenderList);if (StringUtils.isNotBlank(whereCondition)) {suffix.append(WHERE).append(whereCondition);}if (StringUtils.isNotBlank(orderByCondition)) {suffix.append(" ").append(orderByCondition);}if (StringUtils.isNotBlank(limitCondition)) {suffix.append(" ").append(limitCondition);}suffix.append(" FOR UPDATE");StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());if (ONLY_CARE_UPDATE_COLUMNS) {if (!containsPK(updateColumns)) {selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));}for (String columnName : updateColumns) {selectSQLJoin.add(columnName);}//The on update xxx columns will be auto update by db, so it's also the actually updated columnsList<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();onUpdateColumns.removeAll(updateColumns);for (String onUpdateColumn : onUpdateColumns) {selectSQLJoin.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));}} else {for (String columnName : tableMeta.getAllColumns().keySet()) {selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));}}return selectSQLJoin.toString();}@Overrideprotected TableRecords afterImage(TableRecords beforeImage) throws SQLException {TableMeta tmeta = getTableMeta();if (beforeImage == null || beforeImage.size() == 0) {return TableRecords.empty(getTableMeta());}String selectSQL = buildAfterImageSQL(tmeta, beforeImage);ResultSet rs = null;try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);rs = pst.executeQuery();return TableRecords.buildRecords(tmeta, rs);} finally {IOUtil.close(rs);}}private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {StringBuilder prefix = new StringBuilder("SELECT ");String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);if (ONLY_CARE_UPDATE_COLUMNS) {SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;List<String> updateColumns = recognizer.getUpdateColumns();if (!containsPK(updateColumns)) {selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));}for (String columnName : updateColumns) {selectSQLJoiner.add(columnName);}//The on update xxx columns will be auto update by db, so it's also the actually updated columnsList<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();onUpdateColumns.removeAll(updateColumns);for (String onUpdateColumn : onUpdateColumns) {selectSQLJoiner.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));}} else {for (String columnName : tableMeta.getAllColumns().keySet()) {selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));}}return selectSQLJoiner.toString();}
}

9.构建全局锁的key和UndoLog数据的源码

(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据

(2)构建全局锁的key的源码

(3)构建UndoLog数据的源码

(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据

在基础事务执行器BaseTransactionalExecutor的prepareUndoLog()方法中,会构建全局锁的key和构建UndoLog数据,并把它们设置到数据源连接代理ConnectionProxy中。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {...//prepare undo log.//@param beforeImage the before image//@param afterImage  the after imageprotected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {return;}if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {if (beforeImage.getRows().size() != afterImage.getRows().size()) {throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");}}ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;//构建全局锁的key//比如更新了一批数据,那么需要针对这批数据的主键ID,来构建这批数据的全局锁的keyString lockKeys = buildLockKey(lockKeyRecords);if (null != lockKeys) {//将全局锁key设置到数据源连接代理ConnectionProxy中connectionProxy.appendLockKey(lockKeys);//构建UndoLogSQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);//将UndoLog设置到数据源连接代理ConnectionProxy中connectionProxy.appendUndoLog(sqlUndoLog);}}...
}

(2)构建全局锁的key的源码

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {...//build lockKey//@param rowsIncludingPK the records//@return the string as local key. the local key example(multi pk): "t_user:1_a,2_b"protected String buildLockKey(TableRecords rowsIncludingPK) {if (rowsIncludingPK.size() == 0) {return null;}//构建出来的全局锁的key形式为:table_name:id_11001StringBuilder sb = new StringBuilder();sb.append(rowsIncludingPK.getTableMeta().getTableName());sb.append(":");int filedSequence = 0;//pksRows指的是,更新的每一行数据主键字段和主键的值List<Map<String, Field>> pksRows = rowsIncludingPK.pkRows();//获取到主键字段名称,主键可能是联合主键,主键字段的名称可能有多个List<String> primaryKeysOnlyName = getTableMeta().getPrimaryKeyOnlyName();//rowMap就是一行数据,rowMap中的key是字段名称,value是字段值for (Map<String, Field> rowMap : pksRows) {int pkSplitIndex = 0;//遍历和提取这行数据里多个主键字段的名称for (String pkName : primaryKeysOnlyName) {if (pkSplitIndex > 0) {sb.append("_");}//获取到多个主键字段的value,然后拼接在一起sb.append(rowMap.get(pkName).getValue());pkSplitIndex++;}filedSequence++;if (filedSequence < pksRows.size()) {sb.append(",");}}//最终拼成的key形如:table_name:1101_aadd,table_name:xxxx_xxxreturn sb.toString();}...
}

(3)构建UndoLog数据的源码

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {...//build a SQLUndoLog//@param beforeImage the before image//@param afterImage  the after imageprotected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {SQLType sqlType = sqlRecognizer.getSQLType();String tableName = sqlRecognizer.getTableName();SQLUndoLog sqlUndoLog = new SQLUndoLog();sqlUndoLog.setSqlType(sqlType);//SQL的类型可能为insert、update、deletesqlUndoLog.setTableName(tableName);//表的名称sqlUndoLog.setBeforeImage(beforeImage);//SQL执行前的数据镜像sqlUndoLog.setAfterImage(afterImage);//SQL执行后的数据镜像return sqlUndoLog;}...
}public class SQLUndoLog implements java.io.Serializable {private SQLType sqlType;private String tableName;private TableRecords beforeImage;private TableRecords afterImage;...
}

10.Seata Client发起分支事务注册的源码

(1)ConnectionProxy.commit()提交事务

(2)ConnectionProxy.register()注册分支事务

(1)ConnectionProxy.commit()提交事务

执行数据源连接代理ConnectionProxy的commit()方法提交事务的时候,首先会先调用数据源连接代理ConnectionProxy的register()方法注册分支事务。

public class ConnectionProxy extends AbstractConnectionProxy {private final ConnectionContext context = new ConnectionContext();...@Overridepublic void commit() throws SQLException {try {//通过全局锁重试策略组件来执行本地事务的提交lockRetryPolicy.execute(() -> {doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {rollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}private void doCommit() throws SQLException {if (context.inGlobalTransaction()) {processGlobalTransactionCommit();} else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();} else {targetConnection.commit();}}private void processLocalCommitWithGlobalLocks() throws SQLException {//检查全局锁keyscheckLock(context.buildLockKeys());try {//目标数据源连接提交事务targetConnection.commit();} catch (Throwable ex) {throw new SQLException(ex);}context.reset();}private void processGlobalTransactionCommit() throws SQLException {try {//注册分支事务register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//目标数据源连接提交事务targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {report(true);}context.reset();}...
}

(2)ConnectionProxy.register()注册分支事务

执行数据源连接代理ConnectionProxy的register()方法注册分支事务的时候,会调用资源管理器DefaultResourceManager的branchRegister()方法,然后会继续调用AbstractResourceManager的branchRegister()方法来注册分支事务。

在AbstractResourceManager的branchRegister()方法中,首先会构造分支事务注册请求,然后通过RmNettyRemotingClient将分支事务注册请求发送给Seata Server。

//The type Connection proxy.
//数据源连接代理
public class ConnectionProxy extends AbstractConnectionProxy {private final ConnectionContext context = new ConnectionContext();...private void register() throws TransactionException {if (!context.hasUndoLog() || !context.hasLockKey()) {return;}//分支事务注册Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT,//事务类型getDataSourceProxy().getResourceId(),//资源id,资源是已经注册过了的null,context.getXid(),context.getApplicationData(),context.buildLockKeys()//注册分支事物时带上全局锁keys);context.setBranchId(branchId);}...
}public class DefaultResourceManager implements ResourceManager {protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);}}}//注册分支事务@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);}public ResourceManager getResourceManager(BranchType branchType) {ResourceManager rm = resourceManagers.get(branchType);if (rm == null) {throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());}return rm;}...
}public abstract class AbstractResourceManager implements ResourceManager {...@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);//xid是全局事务idrequest.setLockKey(lockKeys);//这次分支事务要更新数据全局锁keyrequest.setResourceId(resourceId);//分支事务对应的资源idrequest.setBranchType(branchType);//分支事务类型request.setApplicationData(applicationData);//应用数据BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}...
}

http://www.dtcms.com/wzjs/183030.html

相关文章:

  • 乐清网站建设lonwap巨量千川广告投放平台
  • html5网站提示网络平台怎么创建需要多少钱
  • 兰州网站建设网站建设中国突然宣布大消息
  • 中山网站建设文化报价无锡百度竞价公司
  • 鹤岗哈尔滨网站建设长沙网络营销学校
  • 杭州建设网站设计的公司全国疫情突然又严重了
  • 三星网上商城打不开北京seo费用是多少
  • 构建一个网站十大网络推广公司
  • 南京哪家公司做企业网站 做得比较好关键词搜索排名软件
  • 网站建设外包公司万网是什么网站
  • 网站建设 专项资金变更有创意的网络广告案例
  • 赣州网站开发梅州网络推广
  • sae安装WordPress4.4成都网站seo技术
  • 企业网站建设英文网站制作的费用
  • 成都网站设计报价什么推广方式能快速引流
  • 塘沽网站制作百度怎么发帖子
  • 江都区城乡建设局网站四川seo推广
  • 广饶网站设计找平台推广
  • 网站顶级域名证书网络推广员工作内容
  • 网站审查元素 可做代码营销网络是什么意思
  • 语音识别程序代做网站想做网络推广如何去做
  • 外贸网站推广 雅虎问答有用吗网络推广图片
  • java 做直播网站有哪些软件100种找客户的方法
  • 专门做win7系统的网站自己做一个网站
  • 鹤壁网站建设seo是哪个英文的缩写
  • 西安做网站的公司客服制作一个小型网站
  • ac86u做网站服务器seo网站推广杭州
  • 手机网站建设模板福州seo快速排名软件
  • 网易疫情数据搜索引擎优化包括哪些
  • 广告联盟上怎么做网站色盲能治好吗