当前位置: 首页 > news >正文

SpringBoot系列之实现高效批量写入数据

Spring Boot 实现高效批量插入数据的实践指南

在实际开发中,我们经常会遇到需要批量插入大量数据到数据库的场景。如果使用传统的单条插入方式,不仅效率低下,还会给数据库带来巨大压力。本文将介绍如何使用 Spring Boot 实现高效

批量数据插入,并通过具体代码示例展示优化过程。

项目背景与需求

我们需要实现一个接口,能够高效地向数据库插入大量用户数据。具体要求如下:

  • 支持高并发场景下的批量插入需求

  • 保证插入效率,减少数据库压力

  • 实现异步处理,避免接口超时

技术选型

  • 框架:Spring Boot 3.3.2
  • 数据库:MySQL
  • ORM:Spring Data JPA
  • 工具类:Hutool
  • 构建工具:Maven

项目结构

springboot-batch-insert/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── example/
│   │   │           └── springboot/
│   │   │               ├── SpringbootBatchInsertApplication.java
│   │   │               ├── controller/
│   │   │               │   └── BatchInsertController.java
│   │   │               ├── service/
│   │   │               │   └── BatchInsertService.java
│   │   │               ├── repository/
│   │   │               │   └── UserRepository.java
│   │   │               ├── model/
│   │   │               │   └── User.java
│   │   │               └── configuration/
│   │   │                   └── AsyncConfig.java
│   │   └── resources/
│   │       ├── application.properties
│   │       └── application.yml
│   └── test/
│       └── java/
│           └── com/
│               └── example/
│                   └── springboot/
│                       └── SpringBatchInsertApplicationTests.java
└── pom.xml

核心代码实现

1. 配置文件

首先,我们需要配置数据库连接和 JPA 相关属性,在 application.yml 中:

server:port: 8080spring:datasource:url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=trueusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driverjpa:hibernate:ddl-auto: updateshow-sql: falseproperties:hibernate:format_sql: truedialect: org.hibernate.dialect.MySQL8Dialectjdbc:batch_size: 1000order_inserts: truebatch_versioned_data: truelogging:level:org.hibernate.SQL: WARNorg.hibernate.type.descriptor.sql.BasicBinder: WARNcom.example: INFO

关键配置说明:

  • rewriteBatchedStatements=true:开启 MySQL 的批量插入优化

  • hibernate.jdbc.batch_size=1000:设置 Hibernate 批量操作大小

  • hibernate.order_inserts=true:优化批量插入性能

2. 实体类定义

创建 User 实体类:

package com.example.springboot.model;import jakarta.persistence.*;
import lombok.Data;import java.time.LocalDateTime;/*** 用户实体类*/@Data
@Entity
@Table(name = "sys_user")
public class User {@Idprivate Long id;/*** 用户名*/@Column(nullable = false, length = 50)private String username;/*** 密码*/@Column(nullable = false, length = 100)private String password;/*** 邮箱*/@Column(nullable = false, length = 100)private String email;/*** 创建时间*/@Column(name = "create_time", nullable = false, updatable = false)@Builder.Defaultprivate LocalDateTime createTime = LocalDateTime.now();/*** 修改时间*/@Column(name = "modify_time", nullable = false)@Builder.Defaultprivate LocalDateTime modifyTime = LocalDateTime.now();}

3. 数据库访问层

创建 UserRepository 接口,定义批量插入方法:

package com.example.springboot.repository;import com.example.springboot.model.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;@Repository
public interface UserRepository extends JpaRepository<User, Long> {}

4. 异步配置

创建 AsyncConfig 配置类,定义线程池:

package com.example.springboot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** 异步线程池配置*/
// 修改 AsyncConfig.java
@Configuration
@EnableAsync
public class AsyncConfig {@Bean(name = "batchInsertExecutor")public Executor batchInsertExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();int coreThreads = Runtime.getRuntime().availableProcessors();executor.setCorePoolSize(coreThreads);executor.setMaxPoolSize(coreThreads * 2);executor.setQueueCapacity(1000);executor.setThreadNamePrefix("BatchInsert-");executor.setKeepAliveSeconds(60);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}

5. 服务层实现

创建 BatchInsertService 服务类,实现批量插入逻辑:

package com.example.springboot.service;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import com.example.springboot.model.User;
import com.example.springboot.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;@Service
public class BatchInsertService {private Logger log = LoggerFactory.getLogger(BatchInsertService.class);@Autowired@Qualifier("batchInsertExecutor")private Executor batchInsertExecutor;@Autowiredprivate UserRepository userRepository;private static final Integer BATCH_SIZE = 1000;public CompletableFuture<Void> processBatchInsert(Integer totalCount) {if (totalCount <= 0) {throw new IllegalArgumentException("插入数量必须大于0");}// 记录总任务开始时间Instant totalStart = Instant.now();log.info("批量插入开始,总数量:{} ", totalCount);// 1. 生成测试数据并记录耗时Instant dataGenStart = Instant.now();List<User> testData = generateTestData(totalCount);long dataGenCost = java.time.Duration.between(dataGenStart, Instant.now()).toMillis();log.info("测试数据生成完成,耗时:{}ms", dataGenCost);// 2. 分割数据并记录耗时Instant splitStart = Instant.now();List<List<User>> partitionedUserList = CollUtil.split(testData, BATCH_SIZE);long splitCost = java.time.Duration.between(splitStart, Instant.now()).toMillis();log.info("数据分割完成,分成 {} 批,每批 {} 条,耗时:{}ms",partitionedUserList.size(), BATCH_SIZE, splitCost);// 3. 批量插入并记录每批耗时List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> batch : partitionedUserList) {final int batchSize = batch.size();// 记录当前批次开始时间Instant batchStart = Instant.now();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {userRepository.saveAll(batch);// 计算当前批次耗时long batchCost = java.time.Duration.between(batchStart, Instant.now()).toMillis();log.info("批次插入完成,批次大小:{},耗时:{}ms", batchSize, batchCost);} catch (Exception e) {log.error("批次插入失败,批次大小:{},错误:{}", batchSize, e.getMessage(), e);throw e;}}, batchInsertExecutor);futures.add(future);}// 4. 等待所有批次完成并记录总耗时return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {long totalCost = java.time.Duration.between(totalStart, Instant.now()).toMillis();if (e != null) {log.error("批量插入失败,总耗时:{}ms ", totalCost, e);} else {log.info("批量插入完成,总数量:{},总耗时:{}ms,平均每条耗时:{}ms",totalCount, totalCost, totalCount > 0 ? (totalCost / totalCount) : 0);}});}private List<User> generateTestData(Integer count) {List<User> users = new ArrayList<>(count);for (int i = 0; i < count; i++) {User user = new User();user.setId(IdUtil.getSnowflake().nextId());user.setUsername("user" + i);user.setPassword(IdUtil.fastSimpleUUID());user.setEmail("user" + i + "@example.com");user.setCreateTime(LocalDateTimeUtil.now());user.setModifyTime(LocalDateTimeUtil.now());users.add(user);}return users;}
}

6. 控制器实现

创建 BatchInsertController 控制器,提供 API 接口:

package com.example.springboot.controller;import com.example.springboot.service.BatchInsertService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/v1")
public class BatchInsertController {private Logger log = LoggerFactory.getLogger(BatchInsertController.class);@Autowiredprivate BatchInsertService batchInsertService;/*** 提供API接口,用于触发批量插入*/@PostMapping("/batch-insert")public ResponseEntity<String> triggerBatchInsert(@RequestParam(defaultValue = "10000") int count) {if (count <= 0) {return ResponseEntity.badRequest().body("插入数量必须大于0");}try {batchInsertService.processBatchInsert(count);return ResponseEntity.accepted().body("批量插入任务已启动,将异步执行!");} catch (Exception e) {log.error("Failed to start batch insert", e);return ResponseEntity.internalServerError().body("启动批量插入失败");}}
}

7. 启动类

package com.example.springboot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class SpringbootBatchInsertApplication {public static void main(String[] args) {SpringApplication.run(SpringbootBatchInsertApplication.class, args);}}

关键技术点解析

  1. 批量插入优化
  • 数据库连接参数 rewriteBatchedStatements=true 开启 MySQL 批量插入优化

  • Hibernate 配置 batch_size=1000 实现批量操作

  • 分批次处理数据,避免一次性加载过多数据到内存

  1. 异步处理
  • 使用 @EnableAsync 开启异步功能

  • 自定义线程池,合理设置核心线程数、最大线程数等参数

  • 使用 CompletableFuture 实现异步编程,提高接口响应速度

  1. 事务管理
  • 在批量插入方法上添加 @Transactional 注解,保证数据一致性

  • 合理控制事务粒度,避免长事务

性能优化建议

  1. 调整批次大小:根据数据库性能和网络情况,调整 BATCH_SIZE 参数,通常在 500-2000 之间效果较好

  2. 线程池优化

  • 核心线程数不宜过多,一般设置为 CPU 核心数
  • 最大线程数根据系统资源和数据库连接数合理设置
  • 合理设置队列容量,避免内存溢出
  1. JVM 优化:根据数据量大小,适当调整 JVM 堆内存大小

  2. 数据库优化

  • 确保表结构合理,索引设计恰当
  • 批量插入期间可暂时关闭索引,插入完成后再重建
  • 考虑使用数据库分区表

总结

本文介绍了如何使用 Spring Boot 实现高效的批量数据插入功能,通过分批次处理、异步执行和数据库优化等手段,显著提高了大量数据插入的效率。在实际应用中,还需要根据具体业务场景和数据量大小,进一步调整和优化参数,以达到最佳性能。

http://www.dtcms.com/a/354500.html

相关文章:

  • 专项智能练习(图形图像基础)
  • 文本处理与模型对比:BERT, Prompt, Regex, TF-IDF
  • 高精度惯性导航IMU价格与供应商
  • [sys-BlueChi] docs | BluechiCtl命令行工具
  • 【C#/Cpp】CLR项目搭建的内联和托管两选项
  • IPv4和IPv6的主要区别,以及常见的过渡策略有哪些
  • OpenCV计算机视觉实战(22)——图像拼接详解
  • 机器视觉学习-day07-图像镜像旋转
  • 【开题答辩全过程】以 基于Spring Boot农产品运输服务平台为例,包含答辩的问题和答案
  • MapStruct用法和实践
  • 【笔记ing】大模型算法架构
  • android studio 同步慢问题解决
  • Logstash数据迁移之mysql-to-kafka.conf两种路由决策对比
  • WebRTC音频QoS方法五(音频变速算法之Accelerate、FastAccelerate、PreemptiveExpand算法实现)
  • Kafka、RabbitMQ 与 RocketMQ 在高并发场景下的高可用与性能对比分析
  • 游戏使用云手机在线运行怎么样?
  • 小白成长之路-k8s原理(二)
  • 【在 macOS 系统上使用 Docker 启动 Kafka 的完整指南】
  • 点评项目(Redis中间件)第二部分Redis基础
  • ArtCAM 2008安装教程
  • React 业务场景使用相关封装(hooks 使用)
  • 【AI自动化】VSCode+Playwright+codegen+nodejs自动化脚本生成
  • Git 删除文件
  • WINTRUST!_ExplodeMessag函数中的pCatAdd
  • 【大前端】React useEffect 详解:从入门到进阶
  • 响应用户:React中的事件处理机制
  • [linux仓库]透视文件IO:从C库函数的‘表象’到系统调用的‘本质’
  • RSA+AES 混合加密不复杂,但落地挺烦,我用 Vue+PHP 封装成了两个库
  • XTUOJ C++小练习(素数的判断,数字塔,字母塔)
  • 亚马逊合规风控升级:详情页排查与多账号运营安全构建