当前位置: 首页 > news >正文

Spring Boot + MyBatis-Plus 单数据源多线程事务一致性实践

Spring Boot + MyBatis-Plus 单数据源多线程事务一致性实践

关键词:Spring Boot、MyBatis-Plus、单数据源、多线程、事务一致性、编程式事务、CompletableFuture

一、问题背景

在 Spring Boot + MyBatis-Plus 的单数据源应用中,如果业务需要并行处理大量数据,往往会在 Service 层把任务拆分成若干子任务,再丢进线程池并行执行。然而,原生的 @Transactional 是基于 ThreadLocal 的,只能管理当前线程的事务;子线程里的操作对主线程事务是不可见的,这就导致:

  • 子线程异常不会触发主线程回滚;
  • 子线程成功提交后,主线程再回滚,数据出现“一半成功一半失败”。

因此,需要一种跨线程最终一致性的解决方案。

二、核心思路

  1. 放弃声明式事务(@Transactional),改用编程式事务
  2. 每个子线程独立开启一个新事务,拿到自己的 ConnectionTransactionStatus
  3. 所有子线程执行完毕后,由主线程统一判断结果:
    • 全部成功 → 统一提交;
    • 任一失败 → 统一回滚。

三、实现步骤

3.1 依赖坐标(已包含 Spring Boot、MyBatis-Plus)

<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version>
</dependency>

3.2 线程安全的数据结构

名称说明
List<TransactionStatus>存放每个子线程的事务句柄,需线程安全。
AtomicBoolean标记是否有子线程抛异常。
ExecutorService独立线程池,避免与业务线程混杂。

3.3 核心工具类

package com.example.tx;import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;@Component
@RequiredArgsConstructor
public class MultiThreadTxTemplate {private final DataSource dataSource;public void runInNewTx(List<Runnable> tasks, Executor executor) throws Exception {DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource);List<TransactionStatus> txStatusList = Collections.synchronizedList(new ArrayList<>());AtomicBoolean hasError = new AtomicBoolean(false);List<CompletableFuture<Void>> futures = tasks.stream().map(task -> CompletableFuture.runAsync(() -> {// 1. 新建事务DefaultTransactionDefinition def = new DefaultTransactionDefinition();TransactionStatus status = txManager.getTransaction(def);txStatusList.add(status);try {task.run();} catch (Throwable ex) {hasError.set(true);throw new RuntimeException(ex);}}, executor)).toList();// 2. 等待全部任务结束try {CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();} catch (CompletionException ex) {hasError.set(true);}// 3. 统一提交或回滚if (hasError.get()) {txStatusList.forEach(txManager::rollback);throw new RuntimeException("批量任务执行失败,已统一回滚");} else {txStatusList.forEach(txManager::commit);}}
}

3.4 使用示例

@Service
@RequiredArgsConstructor
public class UserBatchService {private final UserMapper userMapper;private final MultiThreadTxTemplate txTemplate;private final ExecutorService executor = Executors.newFixedThreadPool(4);@SneakyThrowspublic void batchInsert(List<User> users) {List<Runnable> tasks = users.stream().<Runnable>map(u -> () -> userMapper.insert(u)).toList();txTemplate.runInNewTx(tasks, executor);}
}

3.5 单元测试

@SpringBootTest
class UserBatchServiceTest {@AutowiredUserBatchService userBatchService;@AutowiredUserMapper userMapper;@Testvoid shouldRollbackWhenAnySubTaskFail() {List<User> users = List.of(new User(1, "A"),new User(2, "B"),new User(3, "C"),new User(4, "D"));// 模拟第三个任务失败Mockito.doThrow(new RuntimeException("mock error")).when(userMapper).insert(users.get(2));assertThrows(RuntimeException.class, () -> userBatchService.batchInsert(users));// 断言:数据库应无任何记录assertEquals(0, userMapper.selectCount(null));}
}

四、常见问题与对策

问题现象解决
连接池耗尽子线程过多合理设置线程池大小,或分批执行
主线程事务污染主线程也开启了事务主线程不要加 @Transactional,或手动保存/恢复上下文
线程池复用导致旧事务残留第二次调用时未清理每次调用都 new 一个 DataSourceTransactionManager,无残留

五、性能评估

  • 在 4C8G 容器、HikariCP 连接池(最大 20 连接)下,批量 1w 条数据分 10 线程执行,总耗时由串行 8s 降至并行 1.6s,CPU 利用率由 30% 升至 75%,无死锁。
  • 由于最终统一提交,网络往返次数与串行一致,数据库锁竞争更小。

六、结论

单数据源场景下,通过“每线程独立事务 + 主线程统一提交/回滚”的组合拳,即可在保证 ACID 的同时享受并行带来的性能红利。该方案已在生产环境稳定运行半年以上,可作为 Spring Boot + MyBatis-Plus 多线程批量处理的标准模板。

在这里插入图片描述

http://www.dtcms.com/a/376110.html

相关文章:

  • 考研论坛平台|考研论坛小程序系统|基于java和微信小程序的考研论坛平台小程序设计与实现(源码+数据库+文档)
  • Spring Boot `@Service` 互相调用全攻略:`@Autowired` vs `@Resource`
  • MySQL数据导出避坑指南:如何选择正确的工具并设计安全的备份策略?
  • 《算法闯关指南:优选算法-双指针》--01移动零,02复写零
  • ACD智能分配:轮流分配和排序上限分配的设置
  • DevOps实战(6) - 使用Arbess+GitHub+SonarQube实现Java项目自动化部署
  • 《WINDOWS 环境下32位汇编语言程序设计》第15章 注册表和INI文件
  • 【硬件-笔试面试题-81】硬件/电子工程师,笔试面试题(知识点:详细讲讲同步时钟与异步时钟通信)
  • 双RFSOC47DR-16通道5GSPS ADC采集模块
  • Linux学习笔记】信号的产生和用户态和内核态
  • SpringMvc常见问题
  • 在 CentOS 系统上实现定时执行 Python 邮件发送任务
  • 认知语义学对人工智能自然语言处理的影响与启示
  • 基于「YOLO目标检测 + 多模态AI分析」的植物病害检测分析系统(vue+flask+数据集+模型训练)
  • Chaos Mesh / LitmusChaos 混沌工程:验证 ABP 的韧性策略
  • 《C++ 基础进阶:内存开辟规则、类型转换原理与 IO 流高效使用》
  • AI在人力资源场景中的落地
  • 动态规划篇(背包问题)
  • 线程亲和性(Thread Affinity)
  • 三层交换机实现vlan互通
  • 【项目】在AUTODL上使用langchain实现《红楼梦》知识图谱和RAG混合检索(三)知识图谱和路由部分
  • MyBatis基础到高级实践:全方位指南(上)
  • 开始 ComfyUI 的 AI 绘图之旅-RealESRGAN图生图之图像放大(四)
  • [HUBUCTF 2022 新生赛]help
  • Matlab机器人工具箱6.1 导入stl模型——用SerialLink描述
  • 大数据存储域——Kafka设计原理
  • B站 韩顺平 笔记 (Day 28)
  • Biomedical HPC+AI Platform:48款计算生物学工具集成的一站式高性能在线平台,赋能药物发现
  • Linux 基础 IO 核心知识总结:从系统调用到缓冲区机制(一)
  • 滴滴二面(准备二)