minio大文件断点续传
1. 功能要点概述
将大文件按分片直传至 MinIO,支持断点续传、进度可查、校验合并与随时取消。
功能展开:
- 分片与断点续传
- 客户端按固定
chunkSize
切片;每片独立上传,失败可重传 - 通过
uploadId + partNumber
唯一定位分片;已传分片可直接跳过
- 客户端按固定
- 直传 MinIO
- 服务端生成预签名 URL;浏览器使用 HTTP PUT 将分片直接上传到 MinIO
- 后端仅负责发号、校验、合并与进度管理
- 状态与进度管理
- 以 Redis 记录任务元数据(
uploadId/totalChunks/status
)与已传分片集合 - 查询接口返回已完成分片数与百分比,用于前端 UI 渲染
- 以 Redis 记录任务元数据(
- 完整性校验与合并
- 合并前通过 MinIO
listMultipart
校验分片数量、连续性与 ETag 合法性 - 校验通过后触发
mergeMultipartUpload
生成最终对象
- 合并前通过 MinIO
- 失败重试与幂等
- 前端对单片设置重试(指数退避);服务端以“是否已存在分片”保障幂等
- 支持随时取消任务并清理残留分片/进度
2. 功能时间序列流程图
说明:
- 泳道分为 客户端浏览器 前端JS 控制器 服务层 MinIO 五个层面 展示全链路交互
- 初始化阶段 获取
uploadId
并在 Redis 存储任务与分片元数据 - 上传阶段 每片生成预签名 URL 通过 HTTP PUT 直传到 MinIO 成功后更新 Redis
- 进度阶段 通过 Redis 统计已传分片并返回给前端渲染
- 完成阶段 校验分片完整性与连续性 后向 MinIO 发送合并分片请求
- 取消阶段 请求 MinIO 取消上传并清理 Redis 记录
3. 关键代码与详细解释
3.1 初始化断点续传上传
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("初始化断点续传上传")
@PostMapping("/initResumableUpload")
public ResponseUtils initResumableUpload(@RequestParam String fileName,@RequestParam Long fileSize,@RequestParam(required = false, defaultValue = "5242880") Long chunkSize,@RequestParam String bucketName) {try {ResumableUploadDto uploadDto = minioUtil.initResumableUpload(fileName, fileSize, chunkSize, bucketName);uploadProgressService.saveUploadProgress(uploadDto);List<Integer> existingChunks = minioUtil.getExistingChunks(uploadDto.getUploadId(), fileName, bucketName);uploadDto.setUploadedChunks(existingChunks);for (Integer chunkNumber : existingChunks) {uploadProgressService.updateUploadedChunk(uploadDto.getUploadId(), chunkNumber);}return ResponseUtils.success(uploadDto);} catch (Exception e) {return ResponseUtils.error("初始化断点续传上传失败: " + e.getMessage());}
}
要点:
- 计算
totalChunks
并生成uploadId
保存到 Redis - 返回当前任务的
uploadId
以及已存在的分片编号 用于断点续传续点
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public ResumableUploadDto initResumableUpload(String fileName, Long fileSize, Long chunkSize, String bucketName) throws Exception {createBucket(bucketName);String uploadId = getUploadId(fileName, bucketName);int totalChunks = (int) Math.ceil((double) fileSize / chunkSize);ResumableUploadDto uploadDto = new ResumableUploadDto();uploadDto.setUploadId(uploadId);uploadDto.setFileName(fileName);uploadDto.setBucketName(bucketName);uploadDto.setFileSize(fileSize);uploadDto.setChunkSize(chunkSize);uploadDto.setTotalChunks(totalChunks);uploadDto.setStatus("INIT");uploadDto.setCreateTime(System.currentTimeMillis());uploadDto.setUpdateTime(System.currentTimeMillis());return uploadDto;
}
解释:
getUploadId
内部通过 MinIOinitMultiPartUpload
获取上传任务 IDtotalChunks
用于前端 UI 与后续完整性校验
3.2 分片上传与断点续传
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("上传分片")
@PostMapping("/uploadChunk")
public ResponseUtils uploadChunk(@RequestParam String uploadId,@RequestParam Integer chunkNumber,@RequestParam MultipartFile chunk,@RequestParam String fileName,@RequestParam String bucketName) {try {if (uploadProgressService.isChunkUploaded(uploadId, chunkNumber)) {return ResponseUtils.success("分片已存在,跳过上传");}ChunkUploadDto dto = new ChunkUploadDto();dto.setUploadId(uploadId);dto.setChunkNumber(chunkNumber);dto.setChunk(chunk);dto.setFileName(fileName);dto.setBucketName(bucketName);boolean success = minioUtil.uploadChunk(dto);if (success) {uploadProgressService.updateUploadedChunk(uploadId, chunkNumber);return ResponseUtils.success("分片上传成功");} else {return ResponseUtils.error("分片上传失败");}} catch (Exception e) {return ResponseUtils.error("分片上传失败: " + e.getMessage());}
}
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public boolean uploadChunk(ChunkUploadDto chunkUploadDto) throws Exception {CloseableHttpClient httpClient = null;try {byte[] chunkData = toByteArray(chunkUploadDto.getChunk().getInputStream());Map<String, String> reqParams = new HashMap<>();reqParams.put("uploadId", chunkUploadDto.getUploadId());reqParams.put("partNumber", String.valueOf(chunkUploadDto.getChunkNumber()));String uploadUrl = getPresignedObjectUrl(chunkUploadDto.getFileName(), reqParams, chunkUploadDto.getBucketName());httpClient = HttpClients.createDefault();HttpPut httpPut = new HttpPut(uploadUrl);httpPut.setHeader("Content-Type", "application/octet-stream");httpPut.setEntity(new ByteArrayEntity(chunkData));CloseableHttpResponse response = httpClient.execute(httpPut);int statusCode = response.getStatusLine().getStatusCode();if (statusCode >= 200 && statusCode < 300) {String etag = response.getFirstHeader("ETag") != null ? response.getFirstHeader("ETag").getValue() : null;if (etag != null) { etag = etag.replace("\"", ""); }return true;} else {return false;}} finally {if (httpClient != null) { try { httpClient.close(); } catch (IOException ignore) {} }}
}
解释:
- 分片上传直接 PUT 到 MinIO 对应对象的分段地址 上行链路不经由服务端转存
- 使用
uploadId
与partNumber
绑定具体分片 - 成功后在 Redis 标记该分片完成 便于断点续传跳过
3.3 进度查询
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("获取上传进度")
@GetMapping("/getUploadProgress")
public ResponseUtils getUploadProgress(@RequestParam String uploadId) {try {UploadProgressDto progress = uploadProgressService.calculateProgress(uploadId);if (progress == null) { return ResponseUtils.error("未找到上传进度信息"); }return ResponseUtils.success(progress);} catch (Exception e) {return ResponseUtils.error("获取上传进度失败: " + e.getMessage());}
}
说明:
calculateProgress
统计已上传分片数量 与totalChunks
计算百分比 并返回前端
3.4 完成上传与合并
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("完成断点续传上传")
@PostMapping("/completeResumableUpload")
public ResponseUtils completeResumableUpload(@RequestParam String uploadId,@RequestParam String fileName,@RequestParam String bucketName) {try {uploadProgressService.updateUploadStatus(uploadId, "UPLOADING");ResumableUploadDto uploadDto = uploadProgressService.getUploadProgress(uploadId);if (uploadDto == null) { return ResponseUtils.error("未找到上传进度信息"); }boolean isValid = minioUtil.validateChunks(uploadId, fileName, bucketName, uploadDto.getTotalChunks());if (!isValid) { uploadProgressService.updateUploadStatus(uploadId, "FAILED");return ResponseUtils.error("分片验证失败,请重新上传"); }ObjectWriteResponse result = minioUtil.completeResumableUpload(uploadId, fileName, bucketName);uploadProgressService.updateUploadStatus(uploadId, "COMPLETED");JSONObject response = new JSONObject();response.put("objectName", result.object());response.put("bucketName", result.bucket());response.put("uploadId", uploadId);return ResponseUtils.success(response);} catch (Exception e) {uploadProgressService.updateUploadStatus(uploadId, "FAILED");return ResponseUtils.error("完成断点续传上传失败: " + e.getMessage());}
}
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public boolean validateChunks(String uploadId, String fileName, String bucketName, int expectedChunks) throws Exception {ListPartsResponse partResult = minioClient.listMultipart(bucketName, null, fileName, 1000, 0, uploadId, null, null);List<Part> partList = partResult.result().partList();if (partList.size() != expectedChunks) { return false; }List<Part> sortedParts = new ArrayList<>(partList);sortedParts.sort((p1, p2) -> Integer.compare(p1.partNumber(), p2.partNumber()));for (int i = 0; i < sortedParts.size(); i++) {Part part = sortedParts.get(i);if (part.partNumber() != i + 1) { return false; }if (part.etag() == null || part.etag().isEmpty()) { return false; }}return true;
}public ObjectWriteResponse completeResumableUpload(String uploadId, String fileName, String bucketName) throws Exception {return mergeMultipartUpload(fileName, uploadId, bucketName);
}
解释:
- 完成前强制校验数量 连续性 以及 ETag 的存在性 保证合并正确
- 调用 MinIO merge 完成分段拼接 返回最终对象信息
3.5 取消上传
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public void cancelResumableUpload(String uploadId, String fileName, String bucketName) throws Exception {HashMultimap<String, String> headers = HashMultimap.create();minioClient.cancelMultipartUpload(bucketName, null, fileName, uploadId, headers, null);
}
解释:
- 调用 MinIO 的取消接口并清理服务端 Redis 侧的任务记录 即可释放未完成的上传资源
3.6 前端断点续传关键代码
// File: src/main/resources/static/resumable-upload.html (片段)
let currentFile = null;
let uploadId = null;
let chunkSize = 5 * 1024 * 1024; // 5MB
let totalChunks = 0;
let uploadedChunks = new Set(); // 已上传分片集合
let isUploading = false;
let isPaused = false;// 1. 文件选择与分片计算
function selectFile() {const fileInput = document.getElementById('fileInput');if (fileInput.files.length > 0) {currentFile = fileInput.files[0];totalChunks = Math.ceil(currentFile.size / chunkSize);showMessage(`已选择文件: ${currentFile.name} (${formatFileSize(currentFile.size)})`, 'success');showMessage(`总分片数: ${totalChunks}`, 'success');document.getElementById('startBtn').disabled = false;renderChunks(); // 渲染分片UI}
}// 2. 初始化上传
async function startUpload() {if (!currentFile) {showMessage('请先选择文件', 'error');return;}try {const response = await fetch('/file/initResumableUpload', {method: 'POST',headers: { 'Content-Type': 'application/x-www-form-urlencoded' },body: new URLSearchParams({fileName: currentFile.name,fileSize: currentFile.size,chunkSize: chunkSize,bucketName: 'test-bucket'})});const result = await response.json();if (result.code === "0000") {uploadId = result.data.uploadId;// 断点续传:设置已上传分片if (result.data.uploadedChunks) {result.data.uploadedChunks.forEach(chunkNum => {uploadedChunks.add(chunkNum);});}showMessage('上传初始化成功', 'success');await uploadChunks(); // 开始上传分片}} catch (error) {showMessage('上传初始化失败: ' + error.message, 'error');}
}// 3. 分片上传核心逻辑
async function uploadChunks() {isUploading = true;isPaused = false;document.getElementById('statusText').textContent = '正在上传...';for (let i = 1; i <= totalChunks; i++) {// 暂停检查if (isPaused) {document.getElementById('statusText').textContent = '已暂停';return;}// 断点续传:跳过已上传分片if (uploadedChunks.has(i)) {updateChunkStatus(i, 'uploaded');continue;}// 重试机制let retryCount = 0;const maxRetries = 3;let success = false;while (retryCount < maxRetries && !success) {try {updateChunkStatus(i, 'uploading');// 文件切片const start = (i - 1) * chunkSize;const end = Math.min(start + chunkSize, currentFile.size);const chunk = currentFile.slice(start, end);// 构建表单数据const formData = new FormData();formData.append('uploadId', uploadId);formData.append('chunkNumber', i);formData.append('chunk', chunk);formData.append('fileName', currentFile.name);formData.append('bucketName', 'test-bucket');// 上传分片const response = await fetch('/file/uploadChunk', {method: 'POST',body: formData});const result = await response.json();if (result.code === "0000") {uploadedChunks.add(i);updateChunkStatus(i, 'uploaded');updateProgress();success = true;showMessage(`分片 ${i} 上传成功`, 'success');} else {throw new Error(result.msg || '上传失败');}} catch (error) {retryCount++;if (retryCount < maxRetries) {showMessage(`分片 ${i} 上传失败,正在重试 (${retryCount}/${maxRetries}): ${error.message}`, 'error');// 指数退避延迟await new Promise(resolve => setTimeout(resolve, 1000 * retryCount));} else {updateChunkStatus(i, 'failed');showMessage(`分片 ${i} 上传失败,已达到最大重试次数: ${error.message}`, 'error');}}}}// 检查是否全部完成if (uploadedChunks.size === totalChunks) {await completeUpload();} else {showMessage(`上传完成,但存在失败的分片。成功: ${uploadedChunks.size}/${totalChunks}`, 'error');}
}// 4. 完成上传
async function completeUpload() {try {const response = await fetch('/file/completeResumableUpload', {method: 'POST',headers: { 'Content-Type': 'application/x-www-form-urlencoded' },body: new URLSearchParams({uploadId: uploadId,fileName: currentFile.name,bucketName: 'test-bucket'})});const result = await response.json();if (result.code === "0000") {showMessage('文件上传完成!', 'success');document.getElementById('statusText').textContent = '上传完成';} else {showMessage('完成上传失败: ' + result.msg, 'error');}} catch (error) {showMessage('完成上传失败: ' + error.message, 'error');}
}// 5. 暂停与恢复
function pauseUpload() {isPaused = true;isUploading = false;document.getElementById('statusText').textContent = '已暂停';document.getElementById('pauseBtn').disabled = true;document.getElementById('resumeBtn').disabled = false;
}async function resumeUpload() {document.getElementById('resumeBtn').disabled = true;await uploadChunks(); // 从断点继续
}// 6. 取消上传
function cancelUpload() {if (uploadId) {fetch('/file/cancelResumableUpload', {method: 'POST',headers: { 'Content-Type': 'application/x-www-form-urlencoded' },body: new URLSearchParams({uploadId: uploadId,fileName: currentFile.name,bucketName: 'test-bucket'})});}resetUpload();showMessage('上传已取消', 'error');
}
前端关键点说明:
- 文件切片:使用
File.slice(start, end)
按固定大小切分文件,最后一片可能小于chunkSize
- 断点续传:通过
uploadedChunks
Set 记录已成功分片,初始化时从服务端获取已存在分片列表 - 重试机制:每片失败后最多重试 3 次,采用指数退避延迟(1s、2s、3s)
- 状态管理:通过
isUploading
、isPaused
控制上传流程,支持暂停/恢复/取消 - 进度更新:实时更新分片状态 UI 与整体进度百分比
- 错误处理:区分网络错误、服务端错误,提供用户友好的错误提示