详解分布式事务框架DTM:子事务屏障
背景
DTM是一款分布式事务解决框架,官方文档在:https://dtm.pub
本文分析其解决空补偿,悬挂,幂等的通用方案,非常巧妙
空补偿,悬挂,幂等
一般情况下,一个TCC事务回滚时的执行顺序是,先执行完Try,再执行Cancel,但可能由于Try的网络延迟大,导致先执行Cancel,再执行Try。这种情况就引入了分布式事务中的两个难题:
- 空补偿: Cancel执行时,Try未执行
- 悬挂: Try执行时,Cancel已执行完成
分布式事务还有一类需要处理的常见问题,就是重复请求
- 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性
针对空补偿和悬挂问题,最容易想到的思路是:
- 空补偿: 先查try有没有执行过,如果执行过,就不执行cancel操作。但要记录已触发了cancel操作
- 悬挂: 先查cancel有没有执行过,如果执行过,就不执行try操作
上述的这种实现,能够在大部分情况下正常运行,但是上述做法中的先查后改在并发情况下是容易掉坑里的,我们分析以下如下场景:
- 正常执行顺序下,Try执行时,在查完没有空补偿记录之后,事务提交之前,如果发生了进程暂停P,或者事务内部进行网络请求出现了拥塞,导致本地事务等待较久
- 全局事务超时后,Cancel执行,因为没有查到要补偿的业务主键,因此判断是空补偿,返回
- Try的进程暂停结束,最后提交本地事务
- 全局事务回滚完成后,Try分支的业务操作没有被回滚,产生了悬挂
虽然这种情况发生的概率不高,但是在金融领域,一旦涉及金钱账目,那么带来的影响可能是巨大的
子事务屏障
解决上述问题的关键机制为:利用唯一索引,避免”先查后改“,而是以改代查
子事务屏障技术的原理是,在本地数据库,建立分支操作状态表dtm_barrier,唯一键为全局事务id-分支id-分支操作(try|confirm|cancel)
每次执行try,cancel操作时,流程如下:
- 开启本地事务
- 对于当前操作op(try|confirm|cancel),insert 一条数据
gid-branchid-op
,如果插入不成功,表示触发了幂等限制,返回失败 - 如果当前操作是cancel,那么在insert 一条数据
gid-branchid-try
,如果插入成功(注意是成功),则提交事务 - 在同一个事务内,调用屏障内的业务逻辑,如果业务返回成功,则提交事务返回成功;如果业务返回失败,则回滚事务返回失败
在此机制下,解决了乱序相关的问题
- 幂等控制:步骤2中任何一个操作都无法重复插入唯一键,保证了不会重复执行
- 空补偿控制:如果Try没有执行,直接执行了Cancel,那么步骤3中Cancel插入
gid-branchid-try
会成功,不走屏障内的逻辑,保证了空补偿控制、 - 防悬挂控制:Try在Cancel之后执行,那么Cancel会在步骤3中插入
gid-branchid-try
,导致Try在步骤2中不成功,就不执行屏障内的逻辑,保证了防悬挂控制
接下来分析,当try和cancel产生并发时(竞态条件),上面的控制依然生效吗?
这里明确一个mysql的背景知识:在mysql中,Cancel和Try都会插入同一条记录gid-branchid-try
,由于唯一索引冲突,那么两个操作中只有一个能够成功,而另一个则会等持有锁的事务完成后,再执行
- try和cancel没有并发:
- cancel先执行,由于第3步执行成功,不会执行空回滚
- try后执行,由于cancel执行了步骤3,try第2步失败,不会产生悬挂
- try和cancel有并发(重点)
- try在cancel之前:
- try先插入一条gid-branchid-try数据成功
- cancel操作执行insert 一条数据gid-branchid-try失败,正常执行cancel操作
- try在cancel之后:
- cancel先执行insert一条gid-branchid-try数据成功,不会cancel
- try操作后执行,也insert一条gid-branchid-try数据,阻塞等待cancel执行完毕,发现已经存在唯一索引,执行失败,不会空悬挂,如下图所示:
- try在cancel之前:
看看代码
上面理论分析得比较详细了。其实怎么实现并不是非常重要,按照理论可以快速写出对应的实现
简单看看DTM的子事务屏障落实到源码层面如下:
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {bid := bb.newBarrierID()defer dtmimp.DeferDo(&rerr, func() error {// 没有err,commitreturn tx.Commit()}, func() error {return tx.Rollback()})originOp := map[string]string{dtmimp.OpCancel: dtmimp.OpTry, // tccdtmimp.OpCompensate: dtmimp.OpAction, // sagadtmimp.OpRollback: dtmimp.OpAction, // workflow}[bb.Op]// 插入一次try的originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName)// 插入自己的currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op, bb.DBType, bb.BarrierTableName)logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)// 二阶段消息,只用检查是不是幂等if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.return ErrDuplicated}if rerr == nil {rerr = oerr}/**保证幂等性保证避免空悬挂,空回滚*/// 如果当前操作是取消、补偿或回滚操作,并且原始插入成功(originAffected > 0),或者当前插入失败(currentAffected == 0),则直接返回,避免重复处理。if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate || bb.Op == dtmimp.OpRollback) && originAffected > 0 || // null compensatecurrentAffected == 0 { // repeated request or dangled requestreturn}if rerr == nil {// 在同一个事务里执行业务逻辑rerr = busiCall(tx)}return
}