当前位置: 首页 > news >正文

可靠消息最终一致性分布式事务解决方案

之前文章写过主流的一些 分布式事务的解决方案,但其实工作中很少有一些高并发的业务中去使用这些方案,因为对于高并发的场景来说,引入这些方案的性能损耗太大,且对系统事务侵入性太强影响系统稳定性。

所以在高并发的业务中,如果对实时性可以容忍秒级的延迟,那么使用最终一致性事务方案是最合适的选择。

可靠消息最终一致性事务

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

可靠消息最终一致性方案要解决以下几个问题:

  1. 本地事务与消息发送的原子性问题:即实现本地事务和消息发送的原子性,要么都成功,要么都失败。这是实现可靠消息最终一致性方案的关键问题。
  2. 事务参与方接收消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消费消息失败需要重复尝试消费,即实现最终消费成功。
  3. 消息重复消费的问题:由于步骤2的存在,若某一个消费节点出现消费超时但是处理逻辑执行成功了,此时由于消息中间件会重复投递就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
     

1.RocketMQ事务消息实现

RocketMQ独有的事务回调扩展可以比较轻松的实现最终一致性事务。

假设有两个本地事务组成当前的全局事务,实现流程如下:

  1. 先发送half消息到MQ,MQ服务端收到后保存消息,但是half是对消费端不可见状态。
  2. MQ回调发送者的事务事件回调接口,这时候在这个接口中我们执行本地事务。
  3. 如果本地事务执行成功,就提交MQ的事务,此时MQ会把消息设置为可消费状态,否则执行事务回滚,本地事务失败且消息也会被删除。
  4. 如果长时间未响应事务提交,MQ服务端会回查发送者的事务状态,可以做补偿提交。

上述流程保证了第一个本地事务与消息发送的一致性,即本地事务发送成功后消息才可消费。基于MQ的分布式事务实现的是最终一致性并不保证实时性,所以对于消费者而言只要确保收到消息完成第二个本地事务的提交就可以了。 

在这里插入图片描述 

 1.发送事务消息

@RestController
@Slf4j
public class AccountInfoController {@Autowiredprivate AccountInfoService accountInfoService;@GetMapping(value = "/transfer")public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){//创建一个事务id,作为消息内容发到mqString tx_no = UUID.randomUUID().toString();AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);//发送消息accountInfoService.sendUpdateAccountBalance(accountChangeEvent);return "转账成功";}
}

2.RocketMQLocalTransactionListener 接口(本地事务执行和消息事务提交,消息事务回查补偿提交)


@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@AutowiredAccountInfoService accountInfoService;@AutowiredAccountInfoDao accountInfoDao;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("accountChange");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额accountInfoService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//MQ回调事务状态回查接口,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("accountChange");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();log.info("事务状态回查");int existTx = accountInfoDao.isExistTx(txNo);if(existTx>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}

3.本地事务类

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {@AutowiredAccountInfoDao accountInfoDao;@Autowired(required = false)RocketMQTemplate rocketMQTemplate;//向mq发送转账消息@Overridepublic void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("accountChange",accountChangeEvent);String jsonString = jsonObject.toJSONString();log.info(jsonString);//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){return ;}//扣减金额accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志accountInfoDao.addTx(accountChangeEvent.getTxNo());if(accountChangeEvent.getAmount() == 3){throw new RuntimeException("人为制造异常");}}
}

2.本地消息表实现

由于有些项目或者公司架构中不使用RocketMQ,无法通过事务消息机制来实现。那么使用 本地消息表+普通MQ中间件 也可以实现可靠消息最终一致性事务。

本地消息表实现方案的核心是新建一个本地消息数据库表,通过本地数据库事务把业务操作和消息数据绑定在一起,然后通过异步(定时任务重试)将消息发送至消息中间件,最终待确认消息发送给消费方被成功消费。

CREATE TABLE transaction_messages (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL UNIQUE,topic VARCHAR(128) NOT NULL,body TEXT NOT NULL,status TINYINT NOT NULL COMMENT '0-待发送 1-已发送 2-发送失败',retry_count INT DEFAULT 0,created_at DATETIME NOT NULL,updated_at DATETIME NOT NULL,INDEX idx_status_retry (status, retry_count)
);

该方案的流程其实就是模拟RocketMQ事务消息的流程,通过本地消息数据的多个状态来实现本地事务与消息发送的原子性。具体如下:

  1. 在本地事务中完成业务操作后,插入一条状态为 待发送的 的消息记录
  2. 异步或定时任务拿到 待发送的消息(能拿到说明本地事务提交成功),处理消息发送,发送完成后更新状态为已发送(这里无论是发送失败还是更新失败都会重试,最终保证成功)
  3. 事务参与方接收到消息并完成消费,保证幂等和最终消费成功

方案瑕疵

我觉得这个方案是跟RocketMQ方案相比,无法保证发送成功后的消息一定能投放给消费者。

因为高并发系统建设中,出于性能考虑大部分场景在使用消息中间件时都是设置异步复制和刷盘,这就意味着如果出现MQ服务宕机的情况,就可能会出现未复制或未落盘的数据丢失的情况。


如果要解决这个问题则需要基于上面的流程增加核对流程,事务参与方消费完成后记录消费记录,定期核对发送和消费记录,对发送未消费的消息进行补偿发送处理。

 

http://www.dtcms.com/a/286053.html

相关文章:

  • 补贴退坡、平价上网,数字隔离器如何守护更高功率的光伏逆变器?
  • 门控线性单元GLU (Gated Linear Unit)
  • ApplicationContext 事件发布与监听机制详解
  • 反射机制的登录系统
  • PHP 8.0 升级到 PHP 8.1
  • 创建型模式
  • 基于 HT 的 3D 可视化智慧矿山开发实现
  • 从一开始的网络攻防(四):XSS
  • hadoop(服务器伪分布式搭建)
  • FastAdmin后台登录地址变更原理与手动修改方法-后台入口机制原理解析-优雅草卓伊凡
  • Hadoop安全机制深度剖析:Kerberos认证与HDFS ACL细粒度权限控制
  • 《Web安全之深度学习实战》读书笔记总结
  • AI赋能轮胎安全:基于YOLO11的智能裂纹检测系统
  • 基于springboot+vue+mysql的智慧社区设计与实现(源码+论文+开题报告)
  • Docker Swarm 集群使用记录
  • Matlab打开慢、加载慢的解决办法
  • 免费的一些工具收集
  • 【Oracle】centos7离线静默安装oracle11g(p13390677_112040)
  • Hive 向量化执行引擎 Vectorized Execution 常见 NPE 报错分析及解决
  • 全球天气预报5天(经纬度版)免费API接口教程
  • Python绘制数据(二)
  • JAVA面试宝典 -《微服务治理:从链路追踪到熔断》
  • 某邮生活旋转验证码识别
  • 算法竞赛备赛——【图论】求最短路径——小结
  • 前端之CSS
  • MyBatis之关联查询
  • WEB安全架构
  • Tomcat及Nginx部署使用
  • DevExpress WinForms v25.1 亮点:AI驱动的语义搜索、模板库更新
  • RPC 与 Feign 的区别笔记