mysql-Java手写分布式事物提交流程
准备
innodb存储引擎开启支持分布式事务
set global innodb_support_ax=on
分布式的流程
详细流程:
- XA START ‘a’;
作用:开始一个新的XA事务,并分配一个唯一的事务ID ‘a’。
说明:在这个命令之后,所有后续的SQL操作都会被包含在这个XA事务中,直到遇到XA END命令。 - insert into z(a,b,c) select 100,2,100;
作用:执行一条插入语句,将值(100, 2, 100)插入到表z中。
说明:这条语句是在XA事务上下文中执行的,这意味着如果最终XA事务没有成功提交,这个插入操作也不会对数据库产生实际影响。 - XA END ‘a’;
作用:标记XA事务’a’的操作结束。
说明:这并不意味着事务已经完成或提交,它只是表明当前事务不再接受新的操作。在XA END之后,不能再对该事务进行任何修改操作。 - 进行二阶段
4. 1 XA PREPARE ‘a’;
作用:准备XA事务’a’,使其进入预备状态。
说明:这是两阶段提交(2PC, Two-Phase Commit)的第一阶段。在这个阶段,所有参与的资源管理器会投票决定是否可以安全地提交该事务。如果所有参与者都准备好提交,则可以进入下一阶段;如果有任何一个参与者不能准备好,则整个事务会被回滚。
4.2 XA COMMIT ‘a’;
作用:提交XA事务’a’。
说明:这是两阶段提交的第二阶段。只有当所有参与的资源管理器都已准备好(通过XA PREPARE),并且没有 任何错误发生时,才会执行此命令。提交后,所有更改将永久保存到数据库中。
代码实例
引入依赖:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 请使用最新的版本 --></dependency>
1.事务ID标识类
package org.example.xa;import javax.transaction.xa.Xid;public class MyXid implements Xid {private int formatId;private byte gtrid[];private byte bqual[];public MyXid(int formatId, byte gtrid[], byte bqual[]){this.formatId = formatId;this.gtrid = gtrid;this.bqual = bqual;}@Overridepublic int getFormatId() {return formatId;}@Overridepublic byte[] getGlobalTransactionId() {return gtrid;}@Overridepublic byte[] getBranchQualifier() {return bqual;}
}
2.定义获取分布式事务数据源类
package org.example.xa;import com.mysql.cj.jdbc.MysqlXADataSource;public class GetDataSource {private String connString;private String user;private String password;public GetDataSource(String connString, String user, String password) {this.connString = connString;this.user = user;this.password = password;}public MysqlXADataSource getDataSource(){MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setURL(connString);mysqlXADataSource.setUser(user);mysqlXADataSource.setPassword(password);return mysqlXADataSource;}
}
- XATest测试类
package org.example.xa;import com.mysql.cj.jdbc.MysqlXADataSource;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;public class XATest {public static void main(String[] args) throws SQLException, XAException {GetDataSource getDataSource1 = new GetDataSource("jdbc:mysql://localhost:3306/test", "root", "123456");MysqlXADataSource dataSource1 = getDataSource1.getDataSource();GetDataSource getDataSource2 = new GetDataSource("jdbc:mysql://localhost:3306/test", "root", "123456");MysqlXADataSource dataSource2 = getDataSource2.getDataSource();//创建分布式事务的资源管理器XAConnection xaConnection1 = dataSource1.getXAConnection();XAResource xaResource1 = xaConnection1.getXAResource();Connection connection1 = xaConnection1.getConnection();Statement statement1 = connection1.createStatement();MyXid myXid1 = new MyXid(100, new byte[]{0x61,0x61,0x61}, new byte[]{0x61,0x61,0x61});//创建分布式事务的资源管理器XAConnection xaConnection2 = dataSource2.getXAConnection();XAResource xaResource2 = xaConnection2.getXAResource();Connection connection2 = xaConnection2.getConnection();Statement statement2 = connection2.createStatement();MyXid myXid2 = new MyXid(100, new byte[]{0x62,0x62,0x62}, new byte[]{0x62,0x62,0x62});// xaResource2.rollback(new MyXid(100, new byte[]{0x11}, new byte[]{0x12}));//1.开始一个新的XA事务 在这个命令之后,所有后续的SQL操作都会被包含在这个XA事务中,直到遇到XA END命令xaResource1.start(myXid1, XAResource.TMNOFLAGS);//2.执行DMLstatement1.execute("insert into z(`a`,`b`,`c`) select 2800,2,2800");//3.分布式事务的end,这并不意味着事务已经完成或提交,它只是表明当前事务不再接受新的操作。在XA END之后,不能再对该事务进行任何修改操作。xaResource1.end(myXid1, XAResource.TMSUCCESS);xaResource2.start(myXid2, XAResource.TMNOFLAGS);statement2.execute("insert into z(`a`,`b`,`c`) select 2900,2,2900");xaResource2.end(myXid2, XAResource.TMSUCCESS);//1.分布式的二阶段步骤try{//1.1分布式事务的prepare阶段int re1 = xaResource1.prepare(myXid1);int re2 = xaResource2.prepare(myXid2);//1.2分布式事务的commit或者rollback阶段if(XAResource.XA_OK != re1 || XAResource.XA_OK != re2){xaResource1.rollback(myXid1);xaResource2.rollback(myXid2);}xaResource1.commit(myXid1, false);xaResource2.commit(myXid2, false);}catch (Throwable throwable){xaResource1.rollback(myXid1);xaResource2.rollback(myXid2);}}
}