当前位置: 首页 > news >正文

分布式事务的两种解决方案

说明:在微服务架构中,一个方法内存在多个数据库操作,且这些操作有跨服务调用,在这种情况下的事务叫做分布式事务。阿里巴巴开源的分布式组件 seata,通过引入独立于微服务之外的事务协调者,协调事务,统一执行或者回滚事务,是目前很完善的解决方案。

本文介绍在不引入分布式事务组件的情况下,两种解决分布式事务的方案。Seata 的使用参考下面这两篇文章。

  • Seata安装和使用

  • 使用Seata解决分布式事务问题

场景

假设有一个根据用户 ID 删除用户的接口,删除用户记录的同时,也删除用户所拥有的角色记录,如下:

    @Overridepublic void delete(Long id) {// 1.删除用户userMapper.deleteById(id);// 2.删除用户对应的角色roleApi.deleteRoleByUserId(id);}

为了演示,我将删除用户对应的角色放到其他服务中,使用 rpc-api 调用其他服务的方法实现

(InfraRoleApi 接口)

import cn.iocoder.yudao.module.infra.enums.ApiConstants;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.RequestParam;@FeignClient(name = ApiConstants.NAME)
@Tag(name = "RPC 服务 - 角色")
public interface InfraRoleApi {String PREFIX = ApiConstants.PREFIX + "/role";@DeleteMapping(PREFIX)@Operation(summary = "根据参数键查询参数值")void deleteRoleByUserId(@RequestParam Long id);
}

(InfraRoleApi 接口实现类)

import cn.iocoder.yudao.module.infra.dal.dataobject.role.RoleMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;@RestController
public class InfraRoleApiImpl implements InfraRoleApi {@Autowiredprivate RoleMapper roleMapper;@Overridepublic void deleteRoleByUserId(Long id) {roleMapper.deleteRoleByUserId(id);}
}

显然,这个接口需要保证事务,就是说,删除用户和删除用户对应的角色要么都成功,要么都失败。加个事务注解。

    @Transactional(rollbackFor = Exception.class)@Overridepublic void delete(Long id) {// 1.删除用户userMapper.deleteById(id);// 2.删除用户对应的角色roleApi.deleteRoleByUserId(id);}

手动在 InfraRoleApi 这边写一个异常

    @Overridepublic void deleteRoleByUserId(Long id) {// 模拟异常int i = 1 / 0;roleMapper.deleteRoleByUserId(id);}

启动项目,调用接口,删除用户 ID=142 的记录

在这里插入图片描述

用户表逻辑删除成功

在这里插入图片描述

用户角色表,对应用户 ID 的记录没有删除成功

在这里插入图片描述

控制台有报错,显然,前面的声明式注解没有生效。这没办法生效,不同服务运行在不同的 JVM 上,数据库连的都可能不是同一个。

在这里插入图片描述

针对这个场景,我想到下面两种解决方案。

方案一:手写TCC

TCC,是 Try(尝试)-Confirm(确认)-Cancel(取消) 三个单词的首字母,具体实现是,针对业务中的数据库操作,对应写一套回滚操作,把正常的业务操作分为两阶段执行——尝试提交和确认提交,当两阶段发生异常时,执行对应的回滚操作,来保证整体事务的一致性。

针对上面场景的代码,开始改造。

写一个 TCC 类,将两个删除数据库操作封装进来,根据操作的返回结果和 try-catch 判断操作是否成功,看是否需要回滚

import cn.iocoder.yudao.module.infra.api.role.InfraRoleApi;
import cn.iocoder.yudao.module.system.controller.admin.demo.tcc.UserTccService;
import cn.iocoder.yudao.module.system.dal.mysql.user.AdminUserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class UserTccServiceImpl implements UserTccService {@Autowiredprivate InfraRoleApi roleApi;@Autowiredprivate AdminUserMapper userMapper;@Overridepublic void deleteUserByUserId(Long id) {boolean tryResult = tryCommit(id);if (tryResult) {if (!commit(id)) {rollback(id);}} else {boolean rollback = rollback(id);// 如果回滚失败,重试三次if (!rollback) {for (int i = 0; i < 3; i++) {rollback(id);}}}}/*** 尝试提交*/private boolean tryCommit(Long id) {try {// 1.删除用户int i = userMapper.deleteById(id);if (i == 0) {return false;}// 2.删除用户对应的角色boolean result = roleApi.deleteRoleByUserId(id);if (!result) {return false;}} catch (Exception e) {return false;}return true;}/*** 确认提交*/private boolean commit(Long id) {return true;}/*** 回滚*/private boolean rollback(Long id) {try {// 1.删除用户(回滚)userMapper.undeleteById(id);// 2.删除用户对应的角色(回滚)boolean result2 = roleApi.undeleteRoleByUserId(id);if (!result2) {return false;}} catch (Exception e) {return false;}return true;}
}

写两个回滚操作,根据用户 ID 回滚删除用户、回滚删除用户角色

    @Update("update system_users set deleted = 0 where id = #{userId}")void undeleteById(Long userId);
    @Update("update system_user_role set deleted = 0 where user_id = #{id}")void undeleteRoleByUserId(Long id);

好的,重启项目,调用删除接口,报错依旧

在这里插入图片描述

哎,这回用户就没有被删除,事务控制住了。

在这里插入图片描述

这是一种参考了 TCC 设计思想的实现,并非是完整意义上的 TCC,这种方式需要注意以下两点:

  • (1)回滚方法需要幂等,即回滚操作执行一次和多次效果相同;

  • (2)尝试提交和回滚操作的方法顺序要一致,即尝试提交是先删除用户,再删除用户角色,回滚也要按这个顺序回滚;

方案二:消息队列

第二种方案是使用消息队列,当本地服务需要调用其他服务的更新操作时,发布一个消息,让这个消息去完成其他服务的更新操作。

首先,创建一张消息表,里面包括消息对象的全限定名和消息的内容(即消息对象实例化后的字符串)

在这里插入图片描述

记录如下

在这里插入图片描述

改造代码,更新其他服务的操作,换成生成一个消息。这里并不直接发消息,而是把消息先存数据库里。这里可以加声明式注解。

    @Transactional(rollbackFor = Exception.class)@Overridepublic void delete2(Long id) {// 1.删除用户int i = userMapper.deleteById(id);// 2.发一个MQ消息DeleteUserRoleMessage deleteUserRoleMessage = new DeleteUserRoleMessage();deleteUserRoleMessage.setUserId(id);MessageDO messageDO = new MessageDO();messageDO.setClassName("cn.iocoder.yudao.module.system.dal.dataobject.mq.DeleteUserRoleMessage");messageDO.setPayload(JSON.toJSONString(deleteUserRoleMessage));messageMapper.insert(messageDO);}

写个定时器,每隔 10 秒把消息取出来实例化,发送到消息队列里

/*** 消息定时器*/
@Component
public class MessageScheduled {@Autowiredprivate MessageMapper messageMapper;@Autowiredprivate RedisMQTemplate redisMQTemplate;@Scheduled(fixedRate = 10000)public void send() throws ClassNotFoundException {List<MessageDO> messageDOS = messageMapper.selectList();for (MessageDO messageDO : messageDOS) {// 1. 加载类Class<DeleteUserRoleMessage> clazz = (Class<DeleteUserRoleMessage>) Class.forName(messageDO.getClassName());// 2. 使用 Fastjson 反序列化DeleteUserRoleMessage deleteUserRoleMessage = JSON.parseObject(messageDO.getPayload(), clazz);// 3.发送消息redisMQTemplate.send(deleteUserRoleMessage);// 4.删除消息messageMapper.deleteById(messageDO.getId());}}
}

消息对象类,因为只需用到用户 ID,所以只定义一个用户 ID属性

import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 删除用户角色ID*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DeleteUserRoleMessage extends AbstractRedisChannelMessage {private Long userId;
}

这里是用 Redis 作为消息队列,关于 Redis 如何实现消息队列,可参考下面这篇文章:

  • 如何用Redis作为消息队列

回到 infra 服务里,写一个消息消费者,这里接收来自消息队列的消息,获取消息后调用本服务方法,执行删除用户角色操作

import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import cn.iocoder.yudao.module.infra.api.role.InfraRoleApiImpl;
import cn.iocoder.yudao.module.infra.mq.message.DeleteUserRoleMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** 删除用户角色消息消费者*/
@Service
public class DeleteUserRoleMessageConsumer extends AbstractRedisChannelMessageListener<DeleteUserRoleMessage> {@Autowiredprivate InfraRoleApiImpl infraRoleApi;@Overridepublic void onMessage(DeleteUserRoleMessage message) {infraRoleApi.deleteRoleByUserId(message.getUserId());}
}

重启项目,试一下,调用接口

在这里插入图片描述

消息表写入一条记录

在这里插入图片描述

消息定时器,读取消息,并发送到消息队列

在这里插入图片描述

另一个服务的消息消费者监听到消息,执行本服务方法

在这里插入图片描述

这套流程解决分布式事务的思路是:不回滚本服务方法,而是向前推进事务,通过将其他服务的操作用 MQ 消息封装,让其他服务监听,执行本服务的数据库操作,完成全部事务。

而其他服务消费消息失败、事务的问题则依靠消息组件保证消息不丢失,和本服务的本地事务实现。

也就是说,如果消费消息失败,则不让消息组件移除消息,而是进行重试,或者在消费者这边判断重试次数,超出重试次数存入到数据库中或者什么地方,等程序员排查消费失败的原因后,再写定时任务来消费消息,保证最终一致性。

当然,这种方法需要考虑重复消费消息的问题,所以也需考虑消费者操作的幂等性。

总结

本文介绍了分布式事务的两种解决方案,方案一实现逻辑简单,但代码侵入高,每个事务方法都要写一套,方案二借助了消息组件,技术门槛较高。

http://www.dtcms.com/a/348602.html

相关文章:

  • MYSQL(DDL)
  • 前端 vs 后端请求:核心差异与实战对比
  • Qt——网络通信(UDP/TCP/HTTP)
  • 【Unity开发】Unity核心学习(二)
  • PAT 1081 Rational Sum
  • 【机器学习】8 Logistic regression
  • Power BI切片器自定义顺序
  • 智能油脂润滑系统:给设备一份 “私人定制” 的保养方案
  • Linux 学习笔记 - 集群管理篇
  • 【大模型LLM学习】Data Agent学习笔记
  • C++算法学习专题:二分查找
  • Kubernetes部署Prometheus+Grafana 监控系统NFS存储方案
  • Socket some functions
  • 让机器人“想象”未来?VLN导航迎来“理解力”新升级
  • 每日算法刷题Day64:8.24:leetcode 堆6道题,用时2h30min
  • 解密 Spring Boot 自动配置:原理、流程与核心组件协同
  • 人形机器人——电子皮肤技术路线:压电式电子皮肤及一种超越现有电子皮肤NeuroDerm的设计
  • 深度学习:CUDA、PyTorch下载安装
  • Leetcode 3659. Partition Array Into K-Distinct Groups
  • sqlite创建数据库,创建表,插入数据,查询数据的C++ demo
  • 商密保护迷思:经营秘密到底需不需要鉴定?
  • 对称二叉树
  • 机械学习综合练习项目
  • jar包项目自启动设置ubuntu
  • [论文阅读] 软件工程 | GPS算法:用“路径摘要”当向导,软件模型检测从此告别“瞎找bug”
  • 服务器硬件电路设计之 SPI 问答(四):3 线 SPI、Dual SPI 与 Qual SPI 的奥秘
  • 春秋云镜 Hospital
  • Vue 3多语言应用开发实战:vue-i18n深度解析与最佳实践
  • 线程包括哪些状态?线程状态之间是如何变化的?
  • yggjs_rlayout框架v0.1.2使用教程 02 TechLayout 布局组件