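Tackling Large Excel Uploads: An End-to-End Solution from Asynchronous Processing to Concurrent Deduplication
Bulk Excel import is a frequent requirement in enterprise applications. When files reach hundreds of megabytes or more, traditional synchronous processing tends to cause request timeouts, out-of-memory errors, or even a crash of the whole application. On top of that, the import has to validate data, detect duplicates, cope with several users uploading at the same time, and finally produce a detailed result report for users to check.
Starting from a practical business scenario, this article builds a complete processing pipeline for large Excel uploads. It covers chunked upload on the front end, asynchronous processing on the back end, data validation, concurrent deduplication, and result generation, together with the code for each step.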
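I. Requirements Analysis and Architecture Design
1.1 Core Business Requirements
- Support uploading large Excel files of several hundred megabytes
- Process the import asynchronously so that users are not blocked
- Validate every row against the business rules
- Check whether each record already exists in the database to avoid duplicate imports
- Handle concurrent uploads from multiple users
- Generate an Excel file containing the import result of every row (success or failure, with the reason)
- Provide a way to query import progress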
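1.2 Technical Challenges
- Out-of-memory errors caused by large file uploads
- Request timeouts caused by long-running processing
- Performance bottlenecks when validating large volumes of data
- Data consistency under concurrency
- Efficient detection of duplicate data
- Accurate recording and reporting of import results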
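1.3 Overall Architecture
To address these requirements and challenges, the architecture is built from the following components:
- Chunked upload on the front end: splits a large file into small chunks so that no single request is too large
- Back-end API service: receives file chunks, merges them, and creates import tasks
- File storage service: stores the original Excel files and the generated result files
- Message queue: decouples the API service from the asynchronous processing service and smooths out load peaks
- Asynchronous processing service: parses the Excel file, validates and deduplicates the data, imports it, and generates the result file
- Validation rule engine: handles all data validation logic in one place
- Database: stores business data, import task status, and result information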
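To make the decoupling between the API service and the asynchronous processing service concrete, the following is a minimal in-process sketch. It is not the article's implementation: a `BlockingQueue` stands in for RabbitMQ, a map stands in for the task table, and the class `ImportPipelineSketch` with all of its method names is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** In-process stand-in for the API service -> queue -> worker flow (illustrative only). */
final class ImportPipelineSketch {
    // Sentinel task ID that tells the worker to stop.
    private static final long STOP = -1L;

    // Stand-in for the message queue.
    private final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    // Stand-in for the import_task table: task ID -> status.
    private final Map<Long, String> statusByTask = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::consume, "import-worker");

    ImportPipelineSketch() {
        worker.start();
    }

    /** "API service": record the task and return immediately without processing it. */
    void submit(long taskId) {
        statusByTask.put(taskId, "INIT");
        queue.add(taskId);
    }

    /** "Async processing service": take tasks one by one and process them. */
    private void consume() {
        try {
            while (true) {
                long taskId = queue.take();
                if (taskId == STOP) {
                    return;
                }
                statusByTask.put(taskId, "PROCESSING");
                // Parsing, validation, deduplication and result generation would run here.
                statusByTask.put(taskId, "DONE");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Lets the worker finish all queued tasks, then stops it and waits for it to exit. */
    void shutdown() {
        queue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the current status of a task, or null if the task is unknown. */
    String status(long taskId) {
        return statusByTask.get(taskId);
    }
}
```

The point of the sketch is that `submit` returns as soon as the task is queued, which is what keeps the upload request short no matter how long the import itself takes.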
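II. Technology Selection and Environment Setup
2.1 Core Technology Stack
- Back-end framework: Spring Boot 3.2.0
- File processing: Alibaba EasyExcel 3.3.0
- Message queue: RabbitMQ 3.13.0
- Database: MySQL 8.0.35
- ORM framework: MyBatis-Plus 3.5.5
- Cache: Redis 7.2.3 (used for distributed locks and temporary storage)
- API documentation: SpringDoc OpenAPI 2.2.0 (OpenAPI 3 with Swagger UI)
- Utilities: Lombok 1.18.30, Fastjson2 2.0.32, Guava 32.1.3-jre
- Front end: Vue 3 + Element Plus (front-end implementation details are out of scope for this article)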
2.2 Maven Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>large-excel-import</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>large-excel-import</name>
    <description>Large Excel Import Solution</description>
    <properties>
        <java.version>17</java.version>
        <easyexcel.version>3.3.0</easyexcel.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
        <springdoc.version>2.2.0</springdoc.version>
        <fastjson2.version>2.0.32</fastjson2.version>
        <guava.version>32.1.3-jre</guava.version>
        <lombok.version>1.18.30</lombok.version>
        <hutool.version>5.8.23</hutool.version>
    </properties>
    <dependencies>
        <!-- Spring Boot Core -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <!-- Database -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- MyBatis-Plus: Spring Boot 3 needs the spring-boot3 variant of the starter -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-extension</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <!-- Excel Processing -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>${easyexcel.version}</version>
        </dependency>
        <!-- API Documentation -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <!-- Utilities -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <!-- Hutool: FileUtil and IdUtil are used by the file upload service -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2.3 Core Configuration Files
# application.yml
spring:
  profiles:
    active: dev
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB

# Logging
logging:
  level:
    root: INFO
    com.example: DEBUG
    # EasyExcel classes live in the com.alibaba.excel package
    com.alibaba.excel: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"

# application-dev.yml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/excel_import?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  # Spring Boot 3 moved the Redis properties from spring.redis to spring.data.redis
  data:
    redis:
      host: localhost
      port: 6379
      password:
      database: 0
      timeout: 3000ms
      # Connection pooling only takes effect when commons-pool2 is on the classpath
      lettuce:
        pool:
          max-active: 16
          max-idle: 8
          min-idle: 4
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        concurrency: 2
        max-concurrency: 8
        prefetch: 1
    template:
      retry:
        enabled: true
        initial-interval: 1000ms
        max-attempts: 3
        max-interval: 3000ms

# File storage
file:
  storage:
    path: ./uploadFiles
    temp-path: ./tempFiles
    result-path: ./resultFiles
    max-size: 524288000 # 500MB

# MyBatis-Plus
mybatis-plus:
  mapper-locations: classpath*:mapper/**/*.xml
  type-aliases-package: com.example.excelimport.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
  global-config:
    db-config:
      id-type: ASSIGN_ID
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0

# Asynchronous task executor
async:
  executor:
    core-pool-size: 4
    max-pool-size: 16
    queue-capacity: 100
    keep-alive-seconds: 60
    thread-name-prefix: ExcelImport-

# Swagger
springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method
  packages-to-scan: com.example.excelimport.controller
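The configuration above caps a single multipart request at 10 MB and the total file size at 500 MB, so the front end has to choose a chunk size below the request limit and derive the chunk count from it. The following sketch shows that arithmetic. The 5 MB chunk size and the class name `ChunkMath` are assumptions made for illustration, not values taken from the article.

```java
/** Chunk arithmetic for a chunked upload (illustrative; the 5 MB chunk size is an assumption). */
final class ChunkMath {
    /** Chosen below the 10 MB request limit to leave room for the other form fields. */
    static final long CHUNK_SIZE = 5L * 1024 * 1024;

    /** Number of chunks needed for a file of the given size (ceiling division). */
    static int totalChunks(long fileSize) {
        if (fileSize <= 0) {
            throw new IllegalArgumentException("fileSize must be positive");
        }
        return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE);
    }

    /** Size in bytes of the chunk at the given zero-based index; only the last chunk may be shorter. */
    static long chunkLength(long fileSize, int chunkIndex) {
        int total = totalChunks(fileSize);
        if (chunkIndex < 0 || chunkIndex >= total) {
            throw new IndexOutOfBoundsException("chunkIndex out of range: " + chunkIndex);
        }
        long offset = chunkIndex * CHUNK_SIZE;
        return Math.min(CHUNK_SIZE, fileSize - offset);
    }
}
```

With these assumptions, a file at the 500 MB limit (524288000 bytes) is split into exactly 100 chunks, and one extra byte adds a 101st chunk of length 1.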
III. Database Design
3.1 Core Table Structures
3.1.1 Import Task Table (import_task)
CREATE TABLE `import_task` (
  `id` bigint NOT NULL COMMENT 'Primary key ID',
  `task_no` varchar(64) NOT NULL COMMENT 'Task number',
  `file_name` varchar(255) NOT NULL COMMENT 'File name',
  `file_path` varchar(512) NOT NULL COMMENT 'File path',
  `result_file_path` varchar(512) DEFAULT NULL COMMENT 'Result file path',
  `file_size` bigint NOT NULL COMMENT 'File size (bytes)',
  `total_rows` int DEFAULT 0 COMMENT 'Total number of rows',
  `success_rows` int DEFAULT 0 COMMENT 'Number of successful rows',
  `fail_rows` int DEFAULT 0 COMMENT 'Number of failed rows',
  `status` tinyint NOT NULL COMMENT 'Status: 0-initialized, 1-processing, 2-completed, 3-failed',
  `progress` int NOT NULL DEFAULT 0 COMMENT 'Progress (%)',
  `user_id` bigint NOT NULL COMMENT 'Operator ID',
  `user_name` varchar(64) NOT NULL COMMENT 'Operator name',
  `error_msg` varchar(1024) DEFAULT NULL COMMENT 'Error message',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_task_no` (`task_no`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Import task table';
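The `status` column stores bare integer codes. One way to keep those codes readable in Java is an enum, sketched below. The codes 0 to 3 come from the table comment. The enum `TaskStatus`, its method names, and the allowed transitions are assumptions made for illustration.

```java
/** Status codes of import_task.status (codes from the table comment; the enum is hypothetical). */
enum TaskStatus {
    INIT(0, "Initialized"),
    PROCESSING(1, "Processing"),
    COMPLETED(2, "Completed"),
    FAILED(3, "Failed");

    private final int code;
    private final String description;

    TaskStatus(int code, String description) {
        this.code = code;
        this.description = description;
    }

    int code() {
        return code;
    }

    String description() {
        return description;
    }

    /** Looks up the enum constant for a database code. */
    static TaskStatus fromCode(int code) {
        for (TaskStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown task status code: " + code);
    }

    /** Assumed lifecycle: INIT -> PROCESSING -> COMPLETED, with FAILED reachable from both non-terminal states. */
    boolean canTransitionTo(TaskStatus next) {
        switch (this) {
            case INIT:
                return next == PROCESSING || next == FAILED;
            case PROCESSING:
                return next == COMPLETED || next == FAILED;
            default:
                return false;
        }
    }
}
```

Checking `canTransitionTo` before each status update is one way to stop a late or duplicated message from moving a finished task back to "processing".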
3.1.2 Business Data Table (business_data)
A product information table serves as the example here. Adjust it to your actual business needs:
CREATE TABLE `business_data` (
  `id` bigint NOT NULL COMMENT 'Primary key ID',
  `product_code` varchar(64) NOT NULL COMMENT 'Product code',
  `product_name` varchar(255) NOT NULL COMMENT 'Product name',
  `category_id` bigint NOT NULL COMMENT 'Category ID',
  `price` decimal(10,2) NOT NULL COMMENT 'Price',
  `stock` int NOT NULL COMMENT 'Stock',
  `status` tinyint NOT NULL DEFAULT 1 COMMENT 'Status: 0-disabled, 1-enabled',
  `create_user` bigint NOT NULL COMMENT 'Created by',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_user` bigint DEFAULT NULL COMMENT 'Updated by',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
  `deleted` tinyint NOT NULL DEFAULT 0 COMMENT 'Deletion flag: 0-not deleted, 1-deleted',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_product_code` (`product_code`) COMMENT 'Product code must be unique',
  KEY `idx_category_id` (`category_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Product information table';
3.1.3 Import Log Table (optional, for detailed tracing)
CREATE TABLE `import_log` (
  `id` bigint NOT NULL COMMENT 'Primary key ID',
  `task_id` bigint NOT NULL COMMENT 'Task ID',
  `row_num` int NOT NULL COMMENT 'Row number',
  `data_content` text COMMENT 'Row content',
  `success` tinyint NOT NULL COMMENT 'Success flag: 0-failed, 1-succeeded',
  `message` varchar(1024) DEFAULT NULL COMMENT 'Message',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  PRIMARY KEY (`id`),
  KEY `idx_task_id` (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Import log table';
3.2 MyBatis-Plus Entity Classes
3.2.1 Import Task Entity
package com.example.excelimport.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * Import task entity.
 *
 * @author ken
 */
@Data
@TableName("import_task")
public class ImportTask {

    /** Primary key ID */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /** Task number */
    @TableField("task_no")
    private String taskNo;

    /** File name */
    @TableField("file_name")
    private String fileName;

    /** File path */
    @TableField("file_path")
    private String filePath;

    /** Result file path */
    @TableField("result_file_path")
    private String resultFilePath;

    /** File size (bytes) */
    @TableField("file_size")
    private Long fileSize;

    /** Total number of rows */
    @TableField("total_rows")
    private Integer totalRows;

    /** Number of successful rows */
    @TableField("success_rows")
    private Integer successRows;

    /** Number of failed rows */
    @TableField("fail_rows")
    private Integer failRows;

    /** Status: 0-initialized, 1-processing, 2-completed, 3-failed */
    @TableField("status")
    private Integer status;

    /** Progress (%) */
    @TableField("progress")
    private Integer progress;

    /** Operator ID */
    @TableField("user_id")
    private Long userId;

    /** Operator name */
    @TableField("user_name")
    private String userName;

    /** Error message */
    @TableField("error_msg")
    private String errorMsg;

    /** Creation time */
    @TableField(value = "create_time", fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /** Update time */
    @TableField(value = "update_time", fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updateTime;
}
3.2.2 Product Information Entity
Note that the `fill` attributes below only take effect when a MyBatis-Plus `MetaObjectHandler` bean is registered. Without one, `create_user` stays null on insert and violates the NOT NULL constraint of the table.
package com.example.excelimport.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * Product information entity.
 *
 * @author ken
 */
@Data
@TableName("business_data")
public class BusinessData {

    /** Primary key ID */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /** Product code */
    @TableField("product_code")
    private String productCode;

    /** Product name */
    @TableField("product_name")
    private String productName;

    /** Category ID */
    @TableField("category_id")
    private Long categoryId;

    /** Price */
    @TableField("price")
    private BigDecimal price;

    /** Stock */
    @TableField("stock")
    private Integer stock;

    /** Status: 0-disabled, 1-enabled */
    @TableField("status")
    private Integer status;

    /** Created by */
    @TableField(value = "create_user", fill = FieldFill.INSERT)
    private Long createUser;

    /** Creation time */
    @TableField(value = "create_time", fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /** Updated by */
    @TableField(value = "update_user", fill = FieldFill.UPDATE)
    private Long updateUser;

    /** Update time */
    @TableField(value = "update_time", fill = FieldFill.UPDATE)
    private LocalDateTime updateTime;

    /** Deletion flag: 0-not deleted, 1-deleted */
    @TableField("deleted")
    @TableLogic
    private Integer deleted;
}
3.2.3 Mapper Interfaces
package com.example.excelimport.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.excelimport.entity.ImportTask;
import org.apache.ibatis.annotations.Mapper;

/**
 * Import task mapper.
 *
 * @author ken
 */
@Mapper
public interface ImportTaskMapper extends BaseMapper<ImportTask> {
}

package com.example.excelimport.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.excelimport.entity.BusinessData;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * Product data mapper.
 * The two custom methods are not provided by BaseMapper, so their SQL has to be
 * defined separately, for example in a mapper XML file under classpath*:mapper/.
 *
 * @author ken
 */
@Mapper
public interface BusinessDataMapper extends BaseMapper<BusinessData> {

    /**
     * Inserts product data in batch.
     *
     * @param list product data list
     * @return number of rows inserted
     */
    int batchInsert(@Param("list") List<BusinessData> list);

    /**
     * Queries products by a list of product codes.
     *
     * @param codeList product code list
     * @return matching product data
     */
    List<BusinessData> selectByCodes(@Param("codeList") List<String> codeList);
}
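`selectByCodes` takes a list of product codes, which suggests an `IN (...)` lookup for duplicate detection. With hundreds of thousands of rows, passing every code in one call would produce a very large statement, so callers would normally deduplicate the codes and query in batches. The helper below sketches that preparation step. The class `CodeBatcher`, its method, and the suggested batch size are hypothetical and not part of the article's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Prepares product codes for batched lookups (illustrative helper, not from the article). */
final class CodeBatcher {

    /**
     * Drops null or blank codes, trims the rest, removes duplicates while keeping first-seen order,
     * and splits the result into batches of at most batchSize codes.
     */
    static List<List<String>> toBatches(List<String> codes, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Set<String> unique = new LinkedHashSet<>();
        for (String code : codes) {
            if (code != null && !code.isBlank()) {
                unique.add(code.trim());
            }
        }
        List<String> ordered = new ArrayList<>(unique);
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < ordered.size(); start += batchSize) {
            int end = Math.min(start + batchSize, ordered.size());
            batches.add(new ArrayList<>(ordered.subList(start, end)));
        }
        return batches;
    }
}
```

Each batch could then be passed to `selectByCodes` in turn, for example with a batch size of around 1000, and the returned codes collected into a set of already existing products.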
IV. File Upload and Task Management
4.1 Core Components for Chunked Upload
4.1.1 File Chunk DTO
package com.example.excelimport.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.web.multipart.MultipartFile;

/**
 * File chunk upload DTO.
 *
 * @author ken
 */
@Data
@Schema(description = "File chunk upload parameters")
public class FileChunkDTO {

    /** File name */
    @Schema(description = "File name", requiredMode = Schema.RequiredMode.REQUIRED)
    private String fileName;

    /** Unique file identifier */
    @Schema(description = "Unique file identifier", requiredMode = Schema.RequiredMode.REQUIRED)
    private String fileId;

    /** Chunk index */
    @Schema(description = "Chunk index, starting from 0", requiredMode = Schema.RequiredMode.REQUIRED)
    private Integer chunkIndex;

    /** Total number of chunks */
    @Schema(description = "Total number of chunks", requiredMode = Schema.RequiredMode.REQUIRED)
    private Integer totalChunks;

    /** Chunk size (bytes) */
    @Schema(description = "Chunk size (bytes)", requiredMode = Schema.RequiredMode.REQUIRED)
    private Long chunkSize;

    /** Total file size (bytes) */
    @Schema(description = "Total file size (bytes)", requiredMode = Schema.RequiredMode.REQUIRED)
    private Long totalSize;

    /** Chunk data */
    @Schema(description = "Chunk data", requiredMode = Schema.RequiredMode.REQUIRED)
    private MultipartFile chunkFile;
}
4.1.2 File Upload Service Interface
package com.example.excelimport.service;

import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.vo.FileUploadResultVO;

/**
 * File upload service.
 *
 * @author ken
 */
public interface FileUploadService {

    /**
     * Uploads a file chunk.
     *
     * @param chunkDTO chunk upload parameters
     * @param userId   user ID
     * @return upload result
     */
    FileUploadResultVO uploadChunk(FileChunkDTO chunkDTO, Long userId);

    /**
     * Merges the uploaded chunks into one file.
     *
     * @param fileId      unique file identifier
     * @param fileName    file name
     * @param totalChunks total number of chunks
     * @param userId      user ID
     * @return path of the merged file
     */
    String mergeChunks(String fileId, String fileName, Integer totalChunks, Long userId);

    /**
     * Checks whether a chunk has already been uploaded.
     *
     * @param fileId     unique file identifier
     * @param chunkIndex chunk index
     * @return true if the chunk exists, false otherwise
     */
    boolean checkChunkExists(String fileId, Integer chunkIndex);
}
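Every method in this interface receives `fileId` from the client, and the implementation uses it as a directory name. A value such as `../../etc` would then point outside the temporary directory, so it is worth validating the identifier before building any path from it. The following sketch shows one way to do that. The class `ChunkPaths` and the allowed character set are assumptions; the article does not specify a format for `fileId`.

```java
import java.nio.file.Path;
import java.util.regex.Pattern;

/** Builds the chunk directory for a fileId while rejecting unsafe identifiers (illustrative). */
final class ChunkPaths {
    // Assumed fileId format: letters, digits, '-' and '_' only, 1 to 64 characters.
    private static final Pattern SAFE_ID = Pattern.compile("[A-Za-z0-9_-]{1,64}");

    /** Returns tempRoot/chunks/fileId as an absolute path, or throws if fileId is not acceptable. */
    static Path chunkDir(Path tempRoot, String fileId) {
        if (fileId == null || !SAFE_ID.matcher(fileId).matches()) {
            throw new IllegalArgumentException("Invalid fileId");
        }
        Path root = tempRoot.toAbsolutePath().normalize();
        Path dir = root.resolve("chunks").resolve(fileId).normalize();
        // Defense in depth: the resolved directory must still be inside the temp root.
        if (!dir.startsWith(root)) {
            throw new IllegalArgumentException("fileId escapes the temp directory");
        }
        return dir;
    }
}
```

A UUID without dashes, or with dashes, passes this check, while anything containing a path separator or a dot is rejected before it reaches the file system.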
4.1.3 File Upload Service Implementation
This implementation uses Hutool's FileUtil and IdUtil, so the cn.hutool:hutool-all dependency must be on the classpath.
package com.example.excelimport.service.impl;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.IdUtil;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.FileUploadResultVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/**
 * File upload service implementation.
 *
 * @author ken
 */
@Slf4j
@Service
public class FileUploadServiceImpl implements FileUploadService {

    /** Temporary storage path for chunks */
    @Value("${file.storage.temp-path}")
    private String tempPath;

    /** Permanent storage path for merged files */
    @Value("${file.storage.path}")
    private String storagePath;

    /** Maximum file size (bytes) */
    @Value("${file.storage.max-size}")
    private Long maxFileSize;

    /** Date formatter for the per-day storage directory */
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public FileUploadResultVO uploadChunk(FileChunkDTO chunkDTO, Long userId) {
        // Validate parameters. Assert.hasText throws IllegalArgumentException on failure;
        // StringUtils.hasText only returns a boolean and has no message overload.
        Assert.hasText(chunkDTO.getFileId(), "File ID must not be empty");
        Assert.hasText(chunkDTO.getFileName(), "File name must not be empty");
        if (chunkDTO.getChunkIndex() == null || chunkDTO.getChunkIndex() < 0) {
            throw new IllegalArgumentException("Chunk index must be greater than or equal to 0");
        }
        if (chunkDTO.getTotalChunks() == null || chunkDTO.getTotalChunks() <= 0) {
            throw new IllegalArgumentException("Total chunk count must be greater than 0");
        }
        if (chunkDTO.getChunkFile() == null || chunkDTO.getChunkFile().isEmpty()) {
            throw new IllegalArgumentException("Chunk file must not be empty");
        }
        if (chunkDTO.getTotalSize() == null || chunkDTO.getTotalSize() <= 0) {
            throw new IllegalArgumentException("Total file size must be greater than 0");
        }
        if (chunkDTO.getTotalSize() > maxFileSize) {
            throw new IllegalArgumentException(
                    "File size exceeds the limit, maximum is " + maxFileSize / (1024 * 1024) + "MB");
        }

        // Create the chunk directory. isDirectory() is re-checked after mkdirs() because
        // another chunk of the same file may create the directory concurrently.
        String chunkDir = getChunkDir(chunkDTO.getFileId());
        File dirFile = new File(chunkDir);
        if (!dirFile.isDirectory() && !dirFile.mkdirs() && !dirFile.isDirectory()) {
            log.error("Failed to create chunk directory: {}", chunkDir);
            throw new RuntimeException("Upload failed: cannot create temporary directory");
        }

        // Save the chunk. transferTo needs an absolute path, otherwise the servlet
        // container resolves a relative path against its own temporary directory.
        String chunkFileName = chunkDTO.getChunkIndex().toString();
        File chunkFile = new File(chunkDir, chunkFileName).getAbsoluteFile();
        try {
            chunkDTO.getChunkFile().transferTo(chunkFile);
            log.info("Chunk uploaded, fileId: {}, chunkIndex: {}, userId: {}",
                    chunkDTO.getFileId(), chunkDTO.getChunkIndex(), userId);
        } catch (IOException e) {
            log.error("Chunk upload failed, fileId: {}, chunkIndex: {}",
                    chunkDTO.getFileId(), chunkDTO.getChunkIndex(), e);
            // Remove any partially written file
            if (chunkFile.exists() && !chunkFile.delete()) {
                log.warn("Failed to delete incomplete chunk file: {}", chunkFile.getAbsolutePath());
            }
            throw new RuntimeException("Upload failed: " + e.getMessage());
        }

        // Check whether all chunks have been uploaded
        boolean allUploaded = checkAllChunksUploaded(chunkDTO.getFileId(), chunkDTO.getTotalChunks());
        FileUploadResultVO result = new FileUploadResultVO();
        result.setFileId(chunkDTO.getFileId());
        result.setFileName(chunkDTO.getFileName());
        result.setChunkIndex(chunkDTO.getChunkIndex());
        result.setTotalChunks(chunkDTO.getTotalChunks());
        result.setAllUploaded(allUploaded);
        result.setMessage("Chunk uploaded successfully");
        return result;
    }

    @Override
    public String mergeChunks(String fileId, String fileName, Integer totalChunks, Long userId) {
        Assert.hasText(fileId, "File ID must not be empty");
        Assert.hasText(fileName, "File name must not be empty");
        if (totalChunks == null || totalChunks <= 0) {
            throw new IllegalArgumentException("Total chunk count must be greater than 0");
        }
        // Verify that all chunks have been uploaded
        if (!checkAllChunksUploaded(fileId, totalChunks)) {
            throw new RuntimeException("Some chunks have not been uploaded, cannot merge");
        }
        try {
            // Resolve the per-day storage directory
            String dateDir = DATE_FORMATTER.format(LocalDate.now());
            String saveDir = storagePath + File.separator + dateDir;
            File saveDirFile = new File(saveDir);
            if (!saveDirFile.isDirectory() && !saveDirFile.mkdirs() && !saveDirFile.isDirectory()) {
                log.error("Failed to create storage directory: {}", saveDir);
                throw new RuntimeException("Merge failed: cannot create storage directory");
            }

            // Generate a unique file name, keeping the original extension
            String ext = FileUtil.extName(fileName);
            String uniqueFileName = IdUtil.fastSimpleUUID() + (StringUtils.hasText(ext) ? "." + ext : "");
            String filePath = saveDir + File.separator + uniqueFileName;

            // Collect the chunks in ascending index order
            Path targetPath = Paths.get(filePath);
            String chunkDir = getChunkDir(fileId);
            List<File> chunkFiles = new ArrayList<>();
            for (int i = 0; i < totalChunks; i++) {
                File chunkFile = new File(chunkDir, String.valueOf(i));
                if (!chunkFile.exists()) {
                    throw new RuntimeException("Missing chunk file: " + i);
                }
                chunkFiles.add(chunkFile);
            }

            // Append every chunk to the target file
            for (File chunkFile : chunkFiles) {
                Files.write(targetPath, Files.readAllBytes(chunkFile.toPath()),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
            log.info("File merged, fileId: {}, fileName: {}, savePath: {}, userId: {}",
                    fileId, fileName, filePath, userId);

            // Clean up the temporary chunks
            deleteChunkDir(fileId);
            return filePath;
        } catch (IOException e) {
            log.error("File merge failed, fileId: {}", fileId, e);
            throw new RuntimeException("Merge failed: " + e.getMessage());
        }
    }

    @Override
    public boolean checkChunkExists(String fileId, Integer chunkIndex) {
        Assert.hasText(fileId, "File ID must not be empty");
        if (chunkIndex == null || chunkIndex < 0) {
            throw new IllegalArgumentException("Chunk index must be greater than or equal to 0");
        }
        String chunkDir = getChunkDir(fileId);
        File chunkFile = new File(chunkDir, chunkIndex.toString());
        return chunkFile.exists() && chunkFile.length() > 0;
    }

    /** Returns the directory in which the chunks of a file are stored. */
    private String getChunkDir(String fileId) {
        return tempPath + File.separator + "chunks" + File.separator + fileId;
    }

    /** Checks whether all chunks of a file have been uploaded. */
    private boolean checkAllChunksUploaded(String fileId, int totalChunks) {
        String chunkDir = getChunkDir(fileId);
        File dirFile = new File(chunkDir);
        if (!dirFile.exists()) {
            return false;
        }
        // The number of files in the directory must match the chunk count
        File[] chunkFiles = dirFile.listFiles();
        if (chunkFiles == null || chunkFiles.length != totalChunks) {
            return false;
        }
        // Every chunk must exist and be non-empty
        for (int i = 0; i < totalChunks; i++) {
            File chunkFile = new File(dirFile, String.valueOf(i));
            if (!chunkFile.exists() || chunkFile.length() == 0) {
                return false;
            }
        }
        return true;
    }

    /** Deletes the chunk directory and everything in it. */
    private void deleteChunkDir(String fileId) {
        String chunkDir = getChunkDir(fileId);
        File dirFile = new File(chunkDir);
        if (dirFile.exists()) {
            try {
                // Recursive delete
                FileUtil.del(dirFile);
                log.info("Chunk directory deleted: {}", chunkDir);
            } catch (Exception e) {
                log.warn("Failed to delete chunk directory: {}", chunkDir, e);
            }
        }
    }
}
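The merge step above reads each chunk fully into memory before appending it, which is acceptable while chunks stay under the 10 MB request limit. If the chunk size grows, a streaming variant avoids holding a whole chunk in memory and keeps the target file open only once. The sketch below is an alternative under those assumptions, not the article's implementation; the class `ChunkMerger` is hypothetical, while the chunk naming (`0`, `1`, ...) follows the service above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Streaming merge of chunk files named 0..totalChunks-1 (illustrative alternative). */
final class ChunkMerger {

    /**
     * Copies the chunks in index order into target, replacing any existing content,
     * and returns the number of bytes written. Fails if a chunk is missing.
     */
    static long merge(Path chunkDir, int totalChunks, Path target) throws IOException {
        long written = 0;
        try (OutputStream out = Files.newOutputStream(target)) {
            for (int i = 0; i < totalChunks; i++) {
                Path chunk = chunkDir.resolve(String.valueOf(i));
                if (!Files.isRegularFile(chunk)) {
                    throw new IOException("Missing chunk: " + i);
                }
                // Files.copy streams the chunk into the open output stream.
                written += Files.copy(chunk, out);
            }
        }
        return written;
    }
}
```

Comparing the returned byte count with the `totalSize` reported by the client is a cheap consistency check before the import task is created.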
4.2 Import Task Management
4.2.1 Task DTO and VO
package com.example.excelimport.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
 * DTO for creating an import task.
 *
 * @author ken
 */
@Data
@Schema(description = "Parameters for creating an import task")
public class CreateImportTaskDTO {

    /** Unique file identifier */
    @Schema(description = "Unique file identifier", requiredMode = Schema.RequiredMode.REQUIRED)
    private String fileId;

    /** File name */
    @Schema(description = "File name", requiredMode = Schema.RequiredMode.REQUIRED)
    private String fileName;

    /** Total number of chunks */
    @Schema(description = "Total number of chunks", requiredMode = Schema.RequiredMode.REQUIRED)
    private Integer totalChunks;
}

package com.example.excelimport.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * Import task VO.
 *
 * @author ken
 */
@Data
@Schema(description = "Import task information")
public class ImportTaskVO {

    /** Task ID */
    @Schema(description = "Task ID")
    private Long id;

    /** Task number */
    @Schema(description = "Task number")
    private String taskNo;

    /** File name */
    @Schema(description = "File name")
    private String fileName;

    /** File size (MB) */
    @Schema(description = "File size (MB)")
    private Double fileSizeMB;

    /** Total number of rows */
    @Schema(description = "Total number of rows")
    private Integer totalRows;

    /** Number of successful rows */
    @Schema(description = "Number of successful rows")
    private Integer successRows;

    /** Number of failed rows */
    @Schema(description = "Number of failed rows")
    private Integer failRows;

    /** Status: 0-initialized, 1-processing, 2-completed, 3-failed */
    @Schema(description = "Status: 0-initialized, 1-processing, 2-completed, 3-failed")
    private Integer status;

    /** Status description */
    @Schema(description = "Status description")
    private String statusDesc;

    /** Progress (%) */
    @Schema(description = "Progress (%)")
    private Integer progress;

    /** Error message */
    @Schema(description = "Error message")
    private String errorMsg;

    /** Download URL of the result file */
    @Schema(description = "Download URL of the result file")
    private String resultFileUrl;

    /** Creation time */
    @Schema(description = "Creation time")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /** Update time */
    @Schema(description = "Update time")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}
4.2.2 Task Service Interface
package com.example.excelimport.service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.vo.ImportTaskVO;

/**
 * Import task service.
 *
 * @author ken
 */
public interface ImportTaskService {

    /**
     * Creates an import task.
     *
     * @param dto      task creation parameters
     * @param userId   user ID
     * @param userName user name
     * @return task information
     */
    ImportTaskVO createTask(CreateImportTaskDTO dto, Long userId, String userName);

    /**
     * Returns the details of a task.
     *
     * @param taskId task ID
     * @param userId user ID
     * @return task details
     */
    ImportTaskVO getTaskDetail(Long taskId, Long userId);

    /**
     * Queries a user's import tasks page by page.
     *
     * @param page   paging parameters
     * @param userId user ID
     * @param status task status, null to query all statuses
     * @return one page of tasks
     */
    IPage<ImportTaskVO> queryUserTasks(Page<ImportTask> page, Long userId, Integer status);

    /**
     * Updates the status of a task.
     *
     * @param taskId         task ID
     * @param status         new status
     * @param progress       progress (%)
     * @param successRows    number of successful rows
     * @param failRows       number of failed rows
     * @param errorMsg       error message
     * @param resultFilePath result file path
     * @return true if the update succeeded
     */
    boolean updateTaskStatus(Long taskId, Integer status, Integer progress,
                             Integer successRows, Integer failRows,
                             String errorMsg, String resultFilePath);

    /**
     * Returns the task entity.
     *
     * @param taskId task ID
     * @param taskId task ID
     * @return task entity
     */
    ImportTask getTaskById(Long taskId);
}
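`updateTaskStatus` takes the progress as a ready-made percentage, so the caller has to derive it from the row counts. A minimal sketch of that calculation follows. The class `ProgressCalc` and the rule of capping the value at 99 until the task is finished are assumptions made for illustration; the article does not state how progress is computed.

```java
/** Derives the progress percentage passed to updateTaskStatus (illustrative helper). */
final class ProgressCalc {

    /**
     * Returns the percentage of processed rows. The value stays at 99 at most until the
     * task is explicitly finished, so that 100 always means the result file is ready.
     */
    static int percent(long processedRows, long totalRows, boolean finished) {
        if (finished) {
            return 100;
        }
        if (totalRows <= 0 || processedRows <= 0) {
            return 0;
        }
        long pct = processedRows * 100 / totalRows;
        return (int) Math.min(pct, 99);
    }
}
```

For example, 250 processed rows out of 1000 give 25, and a fully processed file still reports 99 until the final status update marks the task as finished.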
4.2.3 Task Service Implementation
package com.example.excelimport.service.impl;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.mapper.ImportTaskMapper;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.service.ImportTaskService;
import com.example.excelimport.vo.ImportTaskVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.io.File;
import java.util.Map;
import java.util.UUID;

/**
 * Import task service implementation.
 *
 * @author ken
 */
@Slf4j
@Service
public class ImportTaskServiceImpl extends ServiceImpl<ImportTaskMapper, ImportTask> implements ImportTaskService {

    /** Mapping from status code to description */
    private static final Map<Integer, String> STATUS_DESC_MAP = Maps.newHashMap();

    static {
        STATUS_DESC_MAP.put(0, "Initialized");
        STATUS_DESC_MAP.put(1, "Processing");
        STATUS_DESC_MAP.put(2, "Completed");
        STATUS_DESC_MAP.put(3, "Failed");
    }

    @Autowired
    private FileUploadService fileUploadService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Context path used as the prefix of the result file URL */
    @Value("${server.servlet.context-path:}")
    private String contextPath;

    /** Exchange for import tasks */
    @Value("${rabbitmq.exchange.import-task:import.task.exchange}")
    private String importTaskExchange;

    /** Routing key for import tasks */
    @Value("${rabbitmq.routing-key.import-task:import.task.process}")
    private String importTaskRoutingKey;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public ImportTaskVO createTask(CreateImportTaskDTO dto, Long userId, String userName) {
        // Validate parameters. Spring's Assert throws IllegalArgumentException on failure;
        // StringUtils.hasText and ObjectUtils.isEmpty only return booleans.
        Assert.hasText(dto.getFileId(), "File ID must not be empty");
        Assert.hasText(dto.getFileName(), "File name must not be empty");
        if (dto.getTotalChunks() == null || dto.getTotalChunks() <= 0) {
            throw new IllegalArgumentException("Total chunk count must be greater than 0");
        }
        Assert.notNull(userId, "User ID must not be null");
        Assert.hasText(userName, "User name must not be empty");

        // Merge the file chunks
        String filePath = fileUploadService.mergeChunks(
                dto.getFileId(), dto.getFileName(), dto.getTotalChunks(), userId);
        if (!StringUtils.hasText(filePath)) {
            throw new RuntimeException("File merge failed");
        }

        // Determine the file size
        File file = new File(filePath);
        if (!file.exists() || !file.isFile()) {
            throw new RuntimeException("Merged file does not exist");
        }
        long fileSize = file.length();

        // Create the task record
        ImportTask task = new ImportTask();
        task.setTaskNo(generateTaskNo());
        task.setFileName(dto.getFileName());
        task.setFilePath(filePath);
        task.setFileSize(fileSize);
        task.setStatus(0); // initialized
        task.setProgress(0);
        task.setUserId(userId);
        task.setUserName(userName);
        int insert = baseMapper.insert(task);
        if (insert <= 0) {
            log.error("Failed to create import task, userId: {}, fileName: {}", userId, dto.getFileName());
            throw new RuntimeException("Failed to create import task");
        }
        log.info("Import task created, taskId: {}, taskNo: {}, fileName: {}, userId: {}",
                task.getId(), task.getTaskNo(), dto.getFileName(), userId);

        // Send a message to the queue so that the import is processed asynchronously.
        // Caveat: the message is sent before this transaction commits, so a fast consumer
        // may not find the task row yet and should be prepared to retry.
        try {
            rabbitTemplate.convertAndSend(importTaskExchange, importTaskRoutingKey, task.getId());
            log.info("Import task sent to queue, taskId: {}", task.getId());
        } catch (Exception e) {
            log.error("Failed to send import task to queue, taskId: {}", task.getId(), e);
            // Mark the task as failed. Caveat: the exception thrown below rolls the
            // transaction back, so this update and the insert above are both undone.
            task.setStatus(3);
            task.setErrorMsg("Task submission failed: " + e.getMessage());
            baseMapper.updateById(task);
            throw new RuntimeException("Task was created but could not be submitted for processing, please retry later");
        }
        return convertToVO(task);
    }

    @Override
    public ImportTaskVO getTaskDetail(Long taskId, Long userId) {
        Assert.notNull(taskId, "Task ID must not be null");
        Assert.notNull(userId, "User ID must not be null");
        ImportTask task = baseMapper.selectOne(Wrappers.<ImportTask>lambdaQuery()
                .eq(ImportTask::getId, taskId)
                .eq(ImportTask::getUserId, userId));
        if (task == null) {
            return null;
        }
        return convertToVO(task);
    }

    @Override
    public IPage<ImportTaskVO> queryUserTasks(Page<ImportTask> page, Long userId, Integer status) {
        Assert.notNull(userId, "User ID must not be null");
        IPage<ImportTask> taskPage = baseMapper.selectPage(page, Wrappers.<ImportTask>lambdaQuery()
                .eq(ImportTask::getUserId, userId)
                .eq(status != null, ImportTask::getStatus, status)
                .orderByDesc(ImportTask::getCreateTime));
        // IPage.convert maps the records and keeps the paging metadata
        return taskPage.convert(this::convertToVO);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean updateTaskStatus(Long taskId, Integer status, Integer progress,
                                    Integer successRows, Integer failRows,
                                    String errorMsg, String resultFilePath) {
        Assert.notNull(taskId, "Task ID must not be null");
        Assert.notNull(status, "Status must not be null");
        ImportTask task = new ImportTask();
        task.setId(taskId);
        task.setStatus(status);
        if (progress != null) {
            task.setProgress(progress);
        }
        if (successRows != null) {
            task.setSuccessRows(successRows);
        }
        if (failRows != null) {
            task.setFailRows(failRows);
        }
        // totalRows is not a parameter of this method, so it is not updated here
        task.setErrorMsg(errorMsg);
        task.setResultFilePath(resultFilePath);
        int update = baseMapper.updateById(task);
        return update > 0;
    }

    @Override
    public ImportTask getTaskById(Long taskId) {
        if (taskId == null) {
            return null;
        }
        return baseMapper.selectById(taskId);
    }

    /** Generates a task number. */
    private String generateTaskNo() {
        return "IMPT" + System.currentTimeMillis()
                + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
    }

    /** Converts a task entity to its VO. */
    private ImportTaskVO convertToVO(ImportTask task) {
        ImportTaskVO vo = new ImportTaskVO();
        BeanUtils.copyProperties(task, vo);
        // File size in MB
        if (task.getFileSize() != null) {
            vo.setFileSizeMB(task.getFileSize() / (1024.0 * 1024.0));
        }
        // Status description
        vo.setStatusDesc(STATUS_DESC_MAP.getOrDefault(task.getStatus(), "Unknown status"));
        // Download URL of the result file
        if (StringUtils.hasText(task.getResultFilePath())) {
            String relativePath = task.getResultFilePath().replace(File.separator, "/");
            vo.setResultFileUrl(contextPath + "/api/v1/files/download?path=" + relativePath);
        }
        return vo;
    }
}
4.3 控制器实现
package com.example.excelimport.controller;import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.service.ImportTaskService;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.FileUploadResultVO;
import com.example.excelimport.vo.ImportTaskVO;
import com.example.excelimport.vo.ResponseVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;/*** 文件导入控制器** @author ken*/
@Slf4j
@RestController
@RequestMapping("/api/v1/import")
@Tag(name = "文件导入接口", description = "大文件分片上传及异步导入处理")
public class ImportController {

    @Autowired
    private FileUploadService fileUploadService;

    @Autowired
    private ImportTaskService importTaskService;

    @Operation(summary = "上传文件分片", description = "将大文件拆分为分片上传")
    @PostMapping("/file/chunk")
    public ResponseVO<FileUploadResultVO> uploadChunk(
            @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
            @Parameter(description = "文件分片信息", required = true) @ModelAttribute FileChunkDTO chunkDTO) {
        log.info("接收文件分片上传请求,fileId: {}, chunkIndex: {}, userId: {}",
                chunkDTO.getFileId(), chunkDTO.getChunkIndex(), userId);
        FileUploadResultVO result = fileUploadService.uploadChunk(chunkDTO, userId);
        return ResponseVO.success(result);
    }

    @Operation(summary = "检查分片是否已上传", description = "用于断点续传时检查分片状态")
    @GetMapping("/file/chunk/exists")
    public ResponseVO<Boolean> checkChunkExists(
            @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
            @Parameter(description = "文件唯一标识", required = true) @RequestParam String fileId,
            @Parameter(description = "分片索引", required = true) @RequestParam Integer chunkIndex) {
        log.info("检查分片是否已上传,fileId: {}, chunkIndex: {}, userId: {}", fileId, chunkIndex, userId);
        boolean exists = fileUploadService.checkChunkExists(fileId, chunkIndex);
        return ResponseVO.success(exists);
    }

    @Operation(summary = "创建导入任务", description = "合并文件分片并创建导入任务")
    @PostMapping("/task")
    public ResponseVO<ImportTaskVO> createImportTask(
            @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
            @Parameter(description = "用户名", required = true) @RequestHeader("X-User-Name") String userName,
            @Parameter(description = "创建任务参数", required = true) @RequestBody CreateImportTaskDTO dto) {
        log.info("创建导入任务,fileId: {}, fileName: {}, userId: {}", dto.getFileId(), dto.getFileName(), userId);
        ImportTaskVO taskVO = importTaskService.createTask(dto, userId, userName);
        return ResponseVO.success(taskVO);
    }

    @Operation(summary = "查询任务详情", description = "获取导入任务的详细信息和进度")
    @GetMapping("/task/{taskId}")
    public ResponseVO<ImportTaskVO> getTaskDetail(
            @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
            @Parameter(description = "任务ID", required = true) @PathVariable Long taskId) {
        log.info("查询任务详情,taskId: {}, userId: {}", taskId, userId);
        ImportTaskVO taskVO = importTaskService.getTaskDetail(taskId, userId);
        return ResponseVO.success(taskVO);
    }

    @Operation(summary = "查询用户导入任务列表", description = "分页查询当前用户的导入任务")
    @GetMapping("/tasks")
    public ResponseVO<IPage<ImportTaskVO>> queryUserTasks(
            @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
            @Parameter(description = "页码,从1开始") @RequestParam(defaultValue = "1") Integer pageNum,
            @Parameter(description = "每页条数") @RequestParam(defaultValue = "10") Integer pageSize,
            @Parameter(description = "任务状态:0-初始化,1-处理中,2-完成,3-失败") @RequestParam(required = false) Integer status) {
        log.info("查询用户导入任务列表,userId: {}, pageNum: {}, pageSize: {}, status: {}",
                userId, pageNum, pageSize, status);
        Page<ImportTask> page = new Page<>(pageNum, pageSize);
        IPage<ImportTaskVO> taskPage = importTaskService.queryUserTasks(page, userId, status);
        return ResponseVO.success(taskPage);
    }

    @Operation(summary = "下载导入结果文件", description = "下载包含导入结果的Excel文件")
    @GetMapping("/file/result")
    public void downloadResultFile(
            @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
            @Parameter(description = "任务ID", required = true) @RequestParam Long taskId,
            HttpServletResponse response) throws IOException {
        log.info("下载导入结果文件,taskId: {}, userId: {}", taskId, userId);

        // Load the task
        ImportTask task = importTaskService.getTaskById(taskId);
        if (task == null) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "任务不存在");
            return;
        }

        // Only the owner of the task may download its result
        if (!task.getUserId().equals(userId)) {
            response.sendError(HttpServletResponse.SC_FORBIDDEN, "没有权限下载该文件");
            return;
        }

        // Make sure the result file exists
        String resultFilePath = task.getResultFilePath();
        if (!org.springframework.util.StringUtils.hasText(resultFilePath)) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "结果文件不存在");
            return;
        }
        File file = new File(resultFilePath);
        if (!file.exists() || !file.isFile()) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "结果文件不存在");
            return;
        }

        // Response headers. URLEncoder produces form encoding, where a space becomes "+";
        // the filename* parameter expects percent-encoding, so "+" is rewritten to "%20".
        response.setContentType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
        String fileName = "导入结果_" + task.getFileName();
        String encodedFileName = URLEncoder.encode(fileName, StandardCharsets.UTF_8.name()).replace("+", "%20");
        response.setHeader("Content-Disposition", "attachment; filename*=UTF-8''" + encodedFileName);
        response.setContentLengthLong(file.length());

        // Stream the file to the client
        try (FileInputStream fis = new FileInputStream(file);
             OutputStream os = response.getOutputStream()) {
            byte[] buffer = new byte[1024 * 1024]; // 1 MB buffer
            int len;
            while ((len = fis.read(buffer)) != -1) {
                os.write(buffer, 0, len);
            }
            os.flush();
        } catch (IOException e) {
            log.error("下载结果文件失败,taskId: {}", taskId, e);
            throw e;
        }
    }
}
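The download endpoint sends the file name through the `filename*` parameter of `Content-Disposition`. A minimal sketch of building that header value with only the JDK follows; `ContentDispositionSketch` and `attachmentHeader` are hypothetical names used for illustration, not part of the project. The point it shows is that `URLEncoder` emits form encoding (space becomes `+`), so the `+` has to be rewritten to `%20`. A literal `+` in the name is already encoded as `%2B` at that stage, so the rewrite only touches spaces.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ContentDispositionSketch {

    /**
     * Builds a Content-Disposition value with a percent-encoded UTF-8 file name.
     * Hypothetical helper for illustration only.
     */
    public static String attachmentHeader(String fileName) {
        // Form-encode first, then turn the "+" that stands for a space into "%20".
        String encoded = URLEncoder.encode(fileName, StandardCharsets.UTF_8).replace("+", "%20");
        return "attachment; filename*=UTF-8''" + encoded;
    }

    public static void main(String[] args) {
        System.out.println(attachmentHeader("导入结果_product list.xlsx"));
    }
}
```

Without the replacement, some browsers save the file with literal plus signs in its name.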
5. Asynchronous Processing and Data Import
5.1 Message Queue Configuration
package com.example.excelimport.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ configuration.
 *
 * @author ken
 */
@Configuration
public class RabbitMQConfig {

    /** Import task exchange */
    @Value("${rabbitmq.exchange.import-task:import.task.exchange}")
    private String importTaskExchange;

    /** Import task queue */
    @Value("${rabbitmq.queue.import-task:import.task.queue}")
    private String importTaskQueue;

    /** Import task routing key */
    @Value("${rabbitmq.routing-key.import-task:import.task.process}")
    private String importTaskRoutingKey;

    /** Dead-letter exchange */
    @Value("${rabbitmq.exchange.dlq:import.dlq.exchange}")
    private String dlqExchange;

    /** Dead-letter queue */
    @Value("${rabbitmq.queue.dlq:import.dlq.queue}")
    private String dlqQueue;

    /** Dead-letter routing key */
    @Value("${rabbitmq.routing-key.dlq:import.dlq.routing}")
    private String dlqRoutingKey;

    /** Dead-letter exchange (durable, not auto-deleted) */
    @Bean
    public DirectExchange dlqExchange() {
        return new DirectExchange(dlqExchange, true, false);
    }

    /** Dead-letter queue */
    @Bean
    public Queue dlqQueue() {
        return QueueBuilder.durable(dlqQueue).build();
    }

    /** Binds the dead-letter queue to the dead-letter exchange */
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlqQueue()).to(dlqExchange()).with(dlqRoutingKey);
    }

    /** Import task exchange (durable, not auto-deleted) */
    @Bean
    public DirectExchange importTaskExchange() {
        return new DirectExchange(importTaskExchange, true, false);
    }

    /**
     * Import task queue.
     * Messages that are rejected or that wait longer than the TTL are routed to the dead-letter exchange.
     */
    @Bean
    public Queue importTaskQueue() {
        Map<String, Object> arguments = new HashMap<>(3);
        // Dead-letter exchange
        arguments.put("x-dead-letter-exchange", dlqExchange);
        // Dead-letter routing key
        arguments.put("x-dead-letter-routing-key", dlqRoutingKey);
        // Message TTL: 30 minutes spent waiting in the queue
        arguments.put("x-message-ttl", 30 * 60 * 1000);
        return QueueBuilder.durable(importTaskQueue).withArguments(arguments).build();
    }

    /** Binds the import task queue to the import task exchange */
    @Bean
    public Binding importTaskBinding() {
        return BindingBuilder.bind(importTaskQueue()).to(importTaskExchange()).with(importTaskRoutingKey);
    }
}
5.2 Excel Data Model and Parsing
5.2.1 Import Data Model
package com.example.excelimport.excel;

import com.alibaba.excel.annotation.ExcelIgnore;
import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import com.alibaba.excel.annotation.write.style.ContentRowHeight;
import com.alibaba.excel.annotation.write.style.HeadRowHeight;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Excel model for product import.
 *
 * @author ken
 */
@Data
@HeadRowHeight(20)
@ContentRowHeight(18)
@ColumnWidth(20)
public class ProductImportModel {

    /** Row number of this record in the source sheet; set by the read listener, not mapped to a column */
    @ExcelIgnore
    private Integer rowNum;

    @ExcelProperty(value = "商品编码", index = 0)
    private String productCode;

    @ExcelProperty(value = "商品名称", index = 1)
    private String productName;

    @ExcelProperty(value = "分类ID", index = 2)
    private Long categoryId;

    @ExcelProperty(value = "价格", index = 3)
    private BigDecimal price;

    @ExcelProperty(value = "库存", index = 4)
    private Integer stock;

    @ExcelProperty(value = "状态(0-禁用 1-启用)", index = 5)
    private Integer status;

    /** Import result: 成功 (success) or 失败 (failure) */
    @ExcelProperty(value = "导入结果", index = 6)
    private String importStatus;

    /** Reason for the failure, if any */
    @ExcelProperty(value = "失败原因", index = 7)
    private String errorMsg;
}
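5.2.2 Excel Read Listener
package com.example.excelimport.excel;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.alibaba.excel.util.ListUtils;
import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.service.BusinessDataService;
import com.example.excelimport.service.ImportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Read listener for the product import sheet.
 * EasyExcel delivers header rows through invokeHead(), so invoke() only receives data rows
 * and no header row has to be skipped here.
 *
 * @author ken
 */
@Slf4j
public class ProductImportListener implements ReadListener<ProductImportModel> {

    /** Number of rows processed per batch */
    private static final int BATCH_COUNT = 1000;

    /** Rows of the batch currently being collected */
    private List<ProductImportModel> cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);

    /**
     * Every row read so far, kept for the result file.
     * Note: this holds the whole sheet in memory; for very large files, write result rows out per batch instead.
     */
    private final List<ProductImportModel> allDataList = new ArrayList<>();

    private final Long taskId;
    private final Long userId;
    private final BusinessDataService businessDataService;
    private final ImportTaskService importTaskService;

    public ProductImportListener(Long taskId, Long userId,
                                 BusinessDataService businessDataService,
                                 ImportTaskService importTaskService) {
        this.taskId = taskId;
        this.userId = userId;
        this.businessDataService = businessDataService;
        this.importTaskService = importTaskService;
    }

    /** Called once for every data row */
    @Override
    public void invoke(ProductImportModel data, AnalysisContext context) {
        // The sheet row index is 0-based; with a single header row it equals the 1-based data row number
        data.setRowNum(context.readRowHolder().getRowIndex());
        cachedDataList.add(data);
        allDataList.add(data);

        // Process a full batch, then start a new one and report progress
        if (cachedDataList.size() >= BATCH_COUNT) {
            processBatchData();
            cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
            updateTaskProgress(context);
        }
    }

    /** Called after the whole sheet has been read */
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        // Process the remaining partial batch
        if (!CollectionUtils.isEmpty(cachedDataList)) {
            processBatchData();
        }
        log.info("Excel解析完成,taskId: {}, 总记录数: {}", taskId, allDataList.size());
        // Record the total row count
        importTaskService.updateTaskStatus(taskId, null, null, null, null, null, null, allDataList.size());
    }

    /** Validates, de-duplicates and imports the current batch, then records a result on every row */
    private void processBatchData() {
        log.info("开始处理批量数据,taskId: {}, 数量: {}", taskId, cachedDataList.size());
        try {
            // 1. Validate
            businessDataService.validateData(cachedDataList);
            // 2. Check duplicates
            businessDataService.checkDuplicateData(cachedDataList);
            // 3. Keep the rows that passed both checks
            List<BusinessData> validDataList = businessDataService.filterValidData(cachedDataList, userId);
            // 4. Import
            int inserted = 0;
            if (!CollectionUtils.isEmpty(validDataList)) {
                inserted = businessDataService.batchImportData(validDataList);
            }
            // 5. Mark the rows that are still undecided
            markImportResult(validDataList, inserted);
        } catch (Exception e) {
            log.error("处理批量数据失败,taskId: {}", taskId, e);
            // Mark every undecided row of this batch as failed
            cachedDataList.forEach(data -> {
                if (data.getImportStatus() == null) {
                    data.setImportStatus("失败");
                    data.setErrorMsg("系统处理错误: " + e.getMessage());
                }
            });
        }
    }

    /**
     * Sets 成功 or 失败 on rows without a status. If fewer rows were inserted than submitted
     * (concurrent import of the same codes), the codes are looked up again. A code inserted by
     * another task in the meantime is also reported as present, which is a known limitation.
     */
    private void markImportResult(List<BusinessData> validDataList, int inserted) {
        Set<String> persistedCodes = null;
        if (!CollectionUtils.isEmpty(validDataList) && inserted != validDataList.size()) {
            List<String> codes = validDataList.stream()
                    .map(BusinessData::getProductCode).collect(Collectors.toList());
            persistedCodes = businessDataService.getByCodes(codes).stream()
                    .map(BusinessData::getProductCode).collect(Collectors.toSet());
        }
        for (ProductImportModel data : cachedDataList) {
            if (data.getImportStatus() != null) {
                continue;
            }
            if (persistedCodes == null || persistedCodes.contains(data.getProductCode())) {
                data.setImportStatus("成功");
            } else {
                data.setImportStatus("失败");
                data.setErrorMsg("存在并发导入冲突,请稍后重试");
            }
        }
    }

    /** Reports progress based on the sheet's approximate row count; capped at 90 until the task completes */
    private void updateTaskProgress(AnalysisContext context) {
        try {
            Integer approxTotal = context.readSheetHolder().getApproximateTotalRowNumber();
            if (approxTotal == null || approxTotal <= 0) {
                return;
            }
            int progress = (int) Math.min(90L, allDataList.size() * 100L / approxTotal);
            importTaskService.updateTaskStatus(taskId, 1, progress, null, null, null, null, null);
        } catch (Exception e) {
            log.error("更新任务进度失败,taskId: {}", taskId, e);
        }
    }

    /** Returns every row read, including its import result */
    public List<ProductImportModel> getAllDataList() {
        return allDataList;
    }
}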
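Progress reporting needs a denominator, and a streaming reader does not know the exact row count until it reaches the end of the sheet. A minimal sketch of the arithmetic, assuming some approximate total is available up front (for example from sheet metadata); `ImportProgressSketch` and `estimate` are hypothetical names, and the cap of 90 mirrors the listener's convention of reserving the last 10% for result-file generation.

```java
public class ImportProgressSketch {

    /**
     * Returns a progress percentage in [0, 90].
     * processedRows: rows handled so far; approxTotalRows: estimated rows in the sheet.
     */
    public static int estimate(long processedRows, long approxTotalRows) {
        if (approxTotalRows <= 0) {
            return 0; // no usable estimate, report nothing rather than guess
        }
        long pct = processedRows * 100 / approxTotalRows;
        return (int) Math.max(0, Math.min(90, pct));
    }

    public static void main(String[] args) {
        System.out.println(estimate(1_000, 10_000));
        System.out.println(estimate(10_000, 10_000));
    }
}
```

Estimating the total from the rows already read would not work: the processed count and the estimate would be nearly equal, so the reported value would sit at the cap from the first batch onward.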
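5.3 Data Validation and Deduplication Service
5.3.1 Business Service Interface
package com.example.excelimport.service;

import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.excel.ProductImportModel;

import java.util.List;

/**
 * Business data service.
 *
 * @author ken
 */
public interface BusinessDataService {

    /**
     * Validates the imported rows and records failures on each row.
     *
     * @param dataList imported rows
     */
    void validateData(List<ProductImportModel> dataList);

    /**
     * Checks for duplicates, both inside the list and against the database.
     *
     * @param dataList imported rows
     */
    void checkDuplicateData(List<ProductImportModel> dataList);

    /**
     * Keeps the rows that passed all checks and converts them to business entities.
     *
     * @param dataList imported rows
     * @param userId   user ID
     * @return valid business entities
     */
    List<BusinessData> filterValidData(List<ProductImportModel> dataList, Long userId);

    /**
     * Inserts the entities in one batch.
     *
     * @param dataList business entities
     * @return number of rows inserted
     */
    int batchImportData(List<BusinessData> dataList);

    /**
     * Looks up entities by product code.
     *
     * @param codeList product codes
     * @return matching business entities
     */
    List<BusinessData> getByCodes(List<String> codeList);
}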
5.3.2 Business Service Implementation
package com.example.excelimport.service.impl;

import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.excel.ProductImportModel;
import com.example.excelimport.mapper.BusinessDataMapper;
import com.example.excelimport.service.BusinessDataService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Business data service implementation.
 *
 * @author ken
 */
@Slf4j
@Service
public class BusinessDataServiceImpl implements BusinessDataService {

    /** Redis lock key prefix */
    private static final String LOCK_PREFIX = "import:lock:product:";

    /** Lock expiry in seconds; must exceed the time one batch insert takes */
    private static final int LOCK_EXPIRE_SECONDS = 60;

    @Autowired
    private BusinessDataMapper businessDataMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void validateData(List<ProductImportModel> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        for (ProductImportModel data : dataList) {
            // Reset any earlier result
            data.setImportStatus(null);
            data.setErrorMsg(null);
            List<String> errors = Lists.newArrayList();

            // Product code
            if (!StringUtils.hasText(data.getProductCode())) {
                errors.add("商品编码不能为空");
            } else if (data.getProductCode().length() > 64) {
                errors.add("商品编码长度不能超过64个字符");
            }
            // Product name
            if (!StringUtils.hasText(data.getProductName())) {
                errors.add("商品名称不能为空");
            } else if (data.getProductName().length() > 255) {
                errors.add("商品名称长度不能超过255个字符");
            }
            // Category ID
            if (data.getCategoryId() == null || data.getCategoryId() <= 0) {
                errors.add("分类ID必须大于0");
            }
            // Price
            if (data.getPrice() == null) {
                errors.add("价格不能为空");
            } else if (data.getPrice().compareTo(BigDecimal.ZERO) <= 0) {
                errors.add("价格必须大于0");
            } else if (data.getPrice().scale() > 2) {
                errors.add("价格最多保留两位小数");
            }
            // Stock
            if (data.getStock() == null) {
                errors.add("库存不能为空");
            } else if (data.getStock() < 0) {
                errors.add("库存不能为负数");
            }
            // Status
            if (data.getStatus() == null) {
                errors.add("状态不能为空");
            } else if (data.getStatus() != 0 && data.getStatus() != 1) {
                errors.add("状态必须为0(禁用)或1(启用)");
            }

            // Record the outcome
            if (!errors.isEmpty()) {
                data.setImportStatus("失败");
                data.setErrorMsg(String.join(";", errors));
            }
        }
    }

    @Override
    public void checkDuplicateData(List<ProductImportModel> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        // 1. Skip rows that already failed validation
        List<ProductImportModel> validDataList = dataList.stream()
                .filter(data -> data.getImportStatus() == null)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(validDataList)) {
            return;
        }

        // 2. Detect duplicate codes inside the current batch. Grouping into lists keeps
        //    fully identical rows apart, which a set-based multimap would merge.
        Map<String, List<ProductImportModel>> byCode = validDataList.stream()
                .collect(Collectors.groupingBy(ProductImportModel::getProductCode));
        for (List<ProductImportModel> sameCode : byCode.values()) {
            if (sameCode.size() > 1) {
                for (ProductImportModel data : sameCode) {
                    data.setImportStatus("失败");
                    data.setErrorMsg("商品编码在导入文件中重复");
                }
            }
        }

        // 3. Of the remaining rows, find codes that already exist in the database
        List<ProductImportModel> uniqueDataList = validDataList.stream()
                .filter(data -> data.getImportStatus() == null)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(uniqueDataList)) {
            return;
        }

        // 4. One batched lookup instead of one query per row
        List<String> codeList = uniqueDataList.stream()
                .map(ProductImportModel::getProductCode)
                .collect(Collectors.toList());
        List<BusinessData> existDataList = businessDataMapper.selectByCodes(codeList);
        if (!CollectionUtils.isEmpty(existDataList)) {
            Set<String> existCodeSet = existDataList.stream()
                    .map(BusinessData::getProductCode)
                    .collect(Collectors.toSet());
            for (ProductImportModel data : uniqueDataList) {
                if (existCodeSet.contains(data.getProductCode())) {
                    data.setImportStatus("失败");
                    data.setErrorMsg("商品编码在系统中已存在");
                }
            }
        }
    }

    @Override
    public List<BusinessData> filterValidData(List<ProductImportModel> dataList, Long userId) {
        if (CollectionUtils.isEmpty(dataList)) {
            return Lists.newArrayList();
        }
        // Keep the rows that passed every check
        List<ProductImportModel> validDataList = dataList.stream()
                .filter(data -> data.getImportStatus() == null)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(validDataList)) {
            return Lists.newArrayList();
        }
        // Convert to business entities
        List<BusinessData> businessDataList = Lists.newArrayListWithExpectedSize(validDataList.size());
        for (ProductImportModel data : validDataList) {
            BusinessData businessData = new BusinessData();
            BeanUtils.copyProperties(data, businessData);
            businessData.setCreateUser(userId);
            businessData.setCreateTime(LocalDateTime.now());
            businessDataList.add(businessData);
        }
        return businessDataList;
    }

    /**
     * Locks stay held until the insert has run; releasing them right after the existence check
     * would let a concurrent import pass the same check. They are still released before the
     * surrounding transaction commits, so the unique index on the product code remains the final guard.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int batchImportData(List<BusinessData> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return 0;
        }
        log.info("开始批量导入数据,数量: {}", dataList.size());

        List<String> heldLockKeys = Lists.newArrayList();
        try {
            // 1. Lock each product code; skip rows whose code is locked by another import
            List<BusinessData> lockedDataList = Lists.newArrayList();
            for (BusinessData data : dataList) {
                String lockKey = LOCK_PREFIX + data.getProductCode();
                Boolean locked = redisTemplate.opsForValue()
                        .setIfAbsent(lockKey, "1", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);
                if (Boolean.TRUE.equals(locked)) {
                    heldLockKeys.add(lockKey);
                    lockedDataList.add(data);
                } else {
                    log.warn("获取分布式锁失败,可能有并发操作,productCode: {}", data.getProductCode());
                }
            }
            if (CollectionUtils.isEmpty(lockedDataList)) {
                log.info("没有可导入的数据(可能被并发操作拦截)");
                return 0;
            }

            // 2. Re-check the database under the locks, in one batched query
            List<String> codes = lockedDataList.stream()
                    .map(BusinessData::getProductCode).collect(Collectors.toList());
            Set<String> existCodeSet = businessDataMapper.selectByCodes(codes).stream()
                    .map(BusinessData::getProductCode).collect(Collectors.toSet());
            List<BusinessData> finalDataList = lockedDataList.stream()
                    .filter(data -> !existCodeSet.contains(data.getProductCode()))
                    .collect(Collectors.toList());
            if (CollectionUtils.isEmpty(finalDataList)) {
                log.info("没有可导入的数据(可能被并发操作拦截)");
                return 0;
            }

            // 3. Batch insert
            int insertCount = businessDataMapper.batchInsert(finalDataList);
            log.info("批量导入完成,计划导入: {}, 实际导入: {}", finalDataList.size(), insertCount);
            return insertCount;
        } finally {
            // 4. Release every lock taken above
            if (!heldLockKeys.isEmpty()) {
                redisTemplate.delete(heldLockKeys);
            }
        }
    }

    @Override
    public List<BusinessData> getByCodes(List<String> codeList) {
        if (CollectionUtils.isEmpty(codeList)) {
            return Lists.newArrayList();
        }
        return businessDataMapper.selectByCodes(codeList);
    }
}
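The price rule in `validateData` compares `BigDecimal.scale()` against 2. One caveat worth illustrating: `scale()` counts trailing zeros, so a cell parsed as `19.900` has scale 3 and would be rejected even though it is numerically a two-decimal price. The sketch below shows one possible way to make the check value-based; `PriceScaleSketch` and `hasAtMostTwoDecimals` are hypothetical names, and whether trailing zeros actually reach the validator depends on how the spreadsheet cells are formatted and parsed.

```java
import java.math.BigDecimal;

public class PriceScaleSketch {

    /** True when the value needs at most two decimal places, ignoring trailing zeros. */
    public static boolean hasAtMostTwoDecimals(BigDecimal price) {
        // stripTrailingZeros() turns 19.900 into 19.9 (scale 1) and 100 into 1E+2 (scale -2)
        return price.stripTrailingZeros().scale() <= 2;
    }

    public static void main(String[] args) {
        System.out.println(hasAtMostTwoDecimals(new BigDecimal("19.900")));
        System.out.println(hasAtMostTwoDecimals(new BigDecimal("19.999")));
    }
}
```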
5.4 Message Consumer (Asynchronous Processing Service)
package com.example.excelimport.consumer;

import com.alibaba.excel.EasyExcel;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.excel.ProductImportModel;
import com.example.excelimport.excel.ProductImportListener;
import com.example.excelimport.service.BusinessDataService;
import com.example.excelimport.service.ImportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.File;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.UUID;

/**
 * Import task consumer.
 *
 * @author ken
 */
@Slf4j
@Component
public class ImportTaskConsumer {

    @Autowired
    private ImportTaskService importTaskService;

    @Autowired
    private BusinessDataService businessDataService;

    /** Directory where result files are stored */
    @Value("${file.storage.result-path}")
    private String resultPath;

    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    /**
     * Processes one import task. Exceptions are caught and recorded on the task, so the message
     * is acknowledged either way; only messages that expire in the queue reach the dead-letter queue.
     */
    @RabbitListener(queues = "${rabbitmq.queue.import-task:import.task.queue}")
    public void processImportTask(Long taskId) {
        log.info("开始处理导入任务,taskId: {}", taskId);
        if (taskId == null) {
            log.error("导入任务ID为空,跳过处理");
            return;
        }
        ImportTask task = null;
        try {
            // 1. Load the task
            task = importTaskService.getTaskById(taskId);
            if (task == null) {
                log.error("导入任务不存在,taskId: {}", taskId);
                return;
            }
            log.info("开始处理文件导入,taskId: {}, fileName: {}, filePath: {}",
                    taskId, task.getFileName(), task.getFilePath());

            // 2. Mark the task as processing
            importTaskService.updateTaskStatus(taskId, 1, 5, 0, 0, null, null, 0);

            // 3. Make sure the uploaded file exists
            String filePath = task.getFilePath();
            if (!StringUtils.hasText(filePath)) {
                throw new RuntimeException("文件路径为空");
            }
            File file = new File(filePath);
            if (!file.exists() || !file.isFile()) {
                throw new RuntimeException("文件不存在或不是有效文件: " + filePath);
            }

            // 4. Read the sheet through the listener
            ProductImportListener listener = new ProductImportListener(
                    taskId, task.getUserId(), businessDataService, importTaskService);
            EasyExcel.read(file, ProductImportModel.class, listener).sheet().doRead();

            // 5. Count the results
            List<ProductImportModel> allDataList = listener.getAllDataList();
            int totalCount = allDataList.size();
            int successCount = (int) allDataList.stream()
                    .filter(data -> "成功".equals(data.getImportStatus()))
                    .count();
            int failCount = totalCount - successCount;

            // 6. Write the result file
            String resultFilePath = generateResultFile(task, allDataList);

            // 7. Mark the task as completed
            importTaskService.updateTaskStatus(taskId, 2, 100, successCount, failCount, null, resultFilePath, totalCount);
            log.info("导入任务处理完成,taskId: {}, 总记录数: {}, 成功: {}, 失败: {}",
                    taskId, totalCount, successCount, failCount);
        } catch (Exception e) {
            log.error("导入任务处理失败,taskId: {}", taskId, e);
            // Mark the task as failed, truncating very long messages
            String errorMsg = e.getMessage();
            if (errorMsg != null && errorMsg.length() > 1000) {
                errorMsg = errorMsg.substring(0, 1000) + "...";
            }
            if (task != null) {
                importTaskService.updateTaskStatus(taskId, 3, null, null, null, errorMsg, null, null);
            }
        }
    }

    /** Handles tasks that ended up in the dead-letter queue */
    @RabbitListener(queues = "${rabbitmq.queue.dlq:import.dlq.queue}")
    public void processFailedTask(Long taskId) {
        log.warn("接收到死信队列中的失败任务,taskId: {}", taskId);
        // Alerting or retry logic can go here, for example recording the task
        // in a failed-task table and notifying an administrator
        ImportTask task = importTaskService.getTaskById(taskId);
        if (task != null) {
            log.warn("失败任务详情:taskNo: {}, fileName: {}, status: {}",
                    task.getTaskNo(), task.getFileName(), task.getStatus());
            // TODO: send an alert
        }
    }

    /** Writes all rows with their import result to a new Excel file and returns its path */
    private String generateResultFile(ImportTask task, List<ProductImportModel> dataList) {
        try {
            // Create the dated storage directory
            String dateDir = DATE_FORMATTER.format(LocalDate.now());
            String saveDir = resultPath + File.separator + dateDir;
            File saveDirFile = new File(saveDir);
            if (!saveDirFile.exists() && !saveDirFile.mkdirs()) {
                log.error("创建结果文件目录失败: {}", saveDir);
                throw new RuntimeException("生成结果文件失败,无法创建存储目录");
            }

            // Build the result file name, reusing the original extension
            String originalFileName = task.getFileName();
            String ext = originalFileName.contains(".")
                    ? originalFileName.substring(originalFileName.lastIndexOf("."))
                    : ".xlsx";
            String resultFileName = "result_" + System.currentTimeMillis() + "_"
                    + UUID.randomUUID().toString().substring(0, 8) + ext;
            String resultFilePath = saveDir + File.separator + resultFileName;

            // Write the result file
            EasyExcel.write(resultFilePath, ProductImportModel.class).sheet("导入结果").doWrite(dataList);
            log.info("结果文件生成成功,taskId: {}, filePath: {}", task.getId(), resultFilePath);
            return resultFilePath;
        } catch (Exception e) {
            log.error("生成结果文件失败,taskId: {}", task.getId(), e);
            throw new RuntimeException("生成结果文件失败: " + e.getMessage());
        }
    }
}
6. Concurrency Control and Performance Optimization
6.1 Distributed Lock Implementation
When several users upload at the same time, the same product code can arrive in two imports at once. To keep the data unique and avoid duplicate inserts, we use a Redis-based distributed lock to serialize access per key. The lock is created per key with `new`, so it is a plain class rather than a Spring bean:
package com.example.excelimport.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Redis-based distributed lock. Not reentrant; the lock expires after expireMillis
 * even if the holder is still working.
 *
 * @author ken
 */
@Slf4j
public class RedisDistributedLock implements Lock {

    /** Default lock expiry in milliseconds */
    private static final long DEFAULT_EXPIRE_MILLIS = 30000;

    /** Retry interval while waiting for the lock, in milliseconds */
    private static final long RETRY_INTERVAL_MILLIS = 50;

    /** Deletes the key only if it still holds this owner's token; the token is passed as ARGV[1] */
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
            Long.class);

    private final RedisTemplate<String, Object> redisTemplate;

    /** Lock key */
    private final String lockKey;

    /** Token identifying the holder */
    private final String lockValue;

    /** Lock expiry in milliseconds */
    private final long expireMillis;

    public RedisDistributedLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
        this(redisTemplate, lockKey, UUID.randomUUID().toString(), DEFAULT_EXPIRE_MILLIS);
    }

    public RedisDistributedLock(RedisTemplate<String, Object> redisTemplate, String lockKey,
                                String lockValue, long expireMillis) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.expireMillis = expireMillis;
    }

    /** Blocks until the lock is acquired; an interrupt is remembered and restored afterwards */
    @Override
    public void lock() {
        boolean interrupted = false;
        try {
            while (!tryLock()) {
                try {
                    Thread.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Thread.sleep throws InterruptedException if the thread is interrupted while waiting
        while (!tryLock()) {
            Thread.sleep(RETRY_INTERVAL_MILLIS);
        }
    }

    /** Makes a single attempt and returns immediately, as the Lock contract requires */
    @Override
    public boolean tryLock() {
        Boolean success = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, expireMillis, TimeUnit.MILLISECONDS);
        boolean acquired = Boolean.TRUE.equals(success);
        if (acquired) {
            log.debug("获取分布式锁成功,lockKey: {}, lockValue: {}", lockKey, lockValue);
        }
        return acquired;
    }

    /** Retries until the lock is acquired or the waiting time has elapsed */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        long deadline = System.currentTimeMillis() + unit.toMillis(time);
        while (true) {
            if (tryLock()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                log.debug("获取分布式锁超时,lockKey: {}, lockValue: {}", lockKey, lockValue);
                return false;
            }
            Thread.sleep(RETRY_INTERVAL_MILLIS);
        }
    }

    @Override
    public void unlock() {
        // The Lua script makes compare-and-delete atomic
        Long result = redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(lockKey), lockValue);
        if (result != null && result > 0) {
            log.debug("释放分布式锁成功,lockKey: {}, lockValue: {}", lockKey, lockValue);
        } else {
            log.warn("释放分布式锁失败,可能锁已过期或被其他线程持有,lockKey: {}, lockValue: {}", lockKey, lockValue);
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("RedisDistributedLock不支持Condition");
    }

    public String getLockKey() {
        return lockKey;
    }

    public String getLockValue() {
        return lockValue;
    }
}
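The reason the lock stores a per-holder token, and only deletes the key when the token still matches, is easiest to see in a small in-memory model. The sketch below is not Redis code: it assumes a `ConcurrentHashMap` standing in for the Redis key space, with `putIfAbsent` playing the role of `SET NX` and `remove(key, value)` playing the role of the compare-and-delete script. `OwnerTokenLockSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OwnerTokenLockSketch {

    // Stand-in for the Redis key space: lock key -> owner token
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Like SET key token NX: succeeds only when nobody holds the key. */
    public boolean tryLock(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    /** Like the compare-and-delete script: removes the key only if the token still matches. */
    public boolean unlock(String key, String token) {
        return store.remove(key, token);
    }

    /** Simulates the key's TTL running out. */
    public void expire(String key) {
        store.remove(key);
    }

    public static void main(String[] args) {
        OwnerTokenLockSketch locks = new OwnerTokenLockSketch();
        locks.tryLock("product:A", "holder-1");
        locks.expire("product:A");                 // holder-1 ran too long, lock timed out
        locks.tryLock("product:A", "holder-2");    // holder-2 acquires it
        // holder-1 finishes late; its unlock must not release holder-2's lock
        System.out.println(locks.unlock("product:A", "holder-1"));
    }
}
```

With a plain delete instead of the token comparison, the late `unlock` from the first holder would release the second holder's lock and let a third writer in.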
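6.2 Performance Optimization Strategies
- Batch processing: split the data into small batches so that memory use stays bounded
- Asynchronous processing: use the message queue and asynchronous tasks so that the request thread is never blocked
- Caching hot data: cache frequently read data (such as category information) in Redis
- Batch operations: insert rows in batches to reduce database round trips
- Index optimization: create a unique index on the product code and indexes on other lookup columns
- File I/O optimization: read and write files through buffered streams with a reasonably sized buffer
- Concurrency control: use distributed locks to serialize concurrent writes and avoid conflicts
- JVM tuning: adjust JVM parameters, heap size in particular, to the actual workload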
package com.example.excelimport.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Asynchronous task configuration.
 *
 * @author ken
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /** Core pool size */
    @Value("${async.executor.core-pool-size:4}")
    private int corePoolSize;

    /** Maximum pool size */
    @Value("${async.executor.max-pool-size:16}")
    private int maxPoolSize;

    /** Queue capacity */
    @Value("${async.executor.queue-capacity:100}")
    private int queueCapacity;

    /** Keep-alive time for idle threads, in seconds */
    @Value("${async.executor.keep-alive-seconds:60}")
    private int keepAliveSeconds;

    /** Thread name prefix */
    @Value("${async.executor.thread-name-prefix:ExcelImport-}")
    private String threadNamePrefix;

    /** Executor for asynchronous tasks */
    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        // Rejection policy: when both the pool and the queue are full, the submitting thread runs the task
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
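The batching strategy listed in 6.2 comes down to cutting a large list of rows into fixed-size slices and handling one slice at a time. A minimal, JDK-only sketch follows; `BatchPartitionSketch` and `partition` are hypothetical names, and the batch size of 1000 used by the read listener is only one reasonable choice.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPartitionSketch {

    /** Splits rows into consecutive batches of at most batchSize elements. */
    public static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            // Copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = partition(List.of(1, 2, 3, 4, 5), 2);
        System.out.println(batches);
    }
}
```

Smaller batches lower peak memory and shorten transactions, at the cost of more database round trips, so the size is worth measuring against the real workload.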
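7. Summary and Extensions
This article walked through a complete solution for uploading and importing large Excel files asynchronously: chunked upload on the front end, asynchronous processing on the back end, then data validation, concurrency control, and result generation.
The main strengths of the approach are:
- Performance: chunked upload, asynchronous processing, and batch operations make it practical to handle Excel files of several hundred megabytes
- Reliability: the message queue decouples upload from processing, and the dead-letter queue captures messages that expire or are rejected so that failed tasks can be followed up
- Data consistency: distributed locks combined with a unique database index guard against duplicate rows from concurrent imports
- User experience: asynchronous processing does not block the user, and progress queries plus a per-row result file give detailed feedback
Possible extensions
- Better resumable upload: combine chunking with file hash verification for more efficient resumption
- Distributed deployment: move file storage to a distributed object store (such as MinIO) to support multiple nodes
- Task priority: assign priorities to tasks by user or business scenario
- Monitoring and alerting: add detailed metrics and alerts so that problems are found and handled early
- Import template management: support multiple import templates and dynamically configured validation rules
- Very large data sets: for tens of millions of rows or more, consider a big-data framework such as Spark
With the design and code presented here, developers can build a stable and efficient large Excel import feature for enterprise applications, and then adapt and extend it to fit their own business scenarios.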