大文件断点续传
文章目录
- 1 什么是断点续传
- 2 分块与合并测试
- 3 大文件上传流程
- 4 minio合并文件测试
- 5 媒资服务搭建
- 1_数据库模型设计
- 2_接口定义
- 3_DAO开发
- 4_上传分块开发
- 5_合并分块开发
- 6 其他问题
1 什么是断点续传
HTTP 协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。
什么是断点续传(引用百度百科):
断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。
断点续传流程如下图:
流程如下:
- 前端上传前先把文件分成块
- 一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传
- 各分块上传完成最后在服务端合并文件
2 分块与合并测试
为了更好的理解文件分块上传的原理,下边用 java 代码测试文件的分块与合并。
文件分块的流程如下:
- 获取源文件长度
- 根据设定的分块文件的大小计算出块数
- 从源文件读数据依次向每一个块文件写数据。
测试代码如下:
public void testChunk() throws IOException {
File sourceFile = new File("d:/develop/bigfile_test/nacos.mp4");
String chunkPath = "d:/develop/bigfile_test/chunk/";
File chunkFolder = new File(chunkPath);
if (!chunkFolder.exists()) {
boolean isMkdir = chunkFolder.mkdirs();
}
//分块大小
long chunkSize = 1024 * 1024;
//分块数量
long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
System.out.println("分块总数:" + chunkNum);
//缓冲区大小
byte[] b = new byte[1024];
//使用RandomAccessFile访问文件
RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");
//分块
for (int i = 0; i < chunkNum; i++) {
//创建分块文件
File file = new File(chunkPath + i);
if (file.exists()) {
boolean isDelete = file.delete();
}
boolean newFile = file.createNewFile();
if (newFile) {
//向分块文件中写数据
RandomAccessFile raf_write = new RandomAccessFile(file, "rw");
int len = -1;
while ((len = raf_read.read(b)) != -1) {
raf_write.write(b, 0, len);
if (file.length() >= chunkSize) {
break;
}
}
raf_write.close();
System.out.println("完成分块" + i);
}
}
raf_read.close();
}
文件合并流程:
- 找到要合并的文件并按文件合并的先后进行排序。
- 创建合并文件
- 依次从合并的文件中读取数据向合并文件写入数
文件合并的测试代码 :
public void testMerge() throws IOException {
//块文件目录
File chunkFolder = new File("d:/develop/bigfile_test/chunk/");
//原始文件
File originalFile = new File("d:/develop/bigfile_test/nacos.mp4");
//合并文件
File mergeFile = new File("d:/develop/bigfile_test/nacos01.mp4");
if (mergeFile.exists()) {
boolean isDelete = mergeFile.delete();
}
//创建新的合并文件
boolean isCreated = mergeFile.createNewFile();
if (!isCreated) {
return;
}
//用于写文件
RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");
//指针指向文件顶端
raf_write.seek(0);
//缓冲区
byte[] b = new byte[1024];
//分块列表
File[] fileArray = chunkFolder.listFiles();
if (fileArray == null || fileArray.length == 0) {
return;
}
// 转成集合,便于排序
List<File> fileList = Arrays.asList(fileArray);
// 从小到大排序
fileList.sort(new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
}
});
//合并文件
for (File chunkFile : fileList) {
RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "rw");
int len = -1;
while ((len = raf_read.read(b)) != -1) {
raf_write.write(b, 0, len);
}
raf_read.close();
}
raf_write.close();
//校验文件
try (
FileInputStream fileInputStream = new FileInputStream(originalFile);
FileInputStream mergeFileStream = new FileInputStream(mergeFile);
) {
//取出原始文件的md5
String originalMd5 = DigestUtils.md5Hex(fileInputStream);
//取出合并文件的md5进行比较
String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);
if (originalMd5.equals(mergeFileMd5)) {
System.out.println("合并文件成功");
} else {
System.out.println("合并文件失败");
}
}
}
3 大文件上传流程
下图是上传大文件的整体流程:
上传视频的整体流程说明:
- 前端对文件进行分块。
- 前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。
- 如果分块文件不存在则前端开始上传
- 前端请求媒资服务上传分块。
- 媒资服务将分块上传至MinIO。
- 前端将分块上传完毕请求媒资服务合并分块。
- 媒资服务判断分块上传完成则请求MinIO合并文件。
- 合并完成校验合并后的文件是否完整,如果不完整则删除文件。
4 minio合并文件测试
1、将分块文件上传至minio
//将分块文件上传至minio
public void uploadChunk() {
String chunkFolderPath = "D:\\develop\\upload\\chunk\\";
File chunkFolder = new File(chunkFolderPath);
//分块文件
File[] files = chunkFolder.listFiles();
if (files == null || files.length == 0){
return;
}
//将分块文件上传至minio
for (int i = 0; i < files.length; i++) {
try {
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder().bucket("test").object("chunk/" + i).filename(files[i].getAbsolutePath()).build();
minioClient.uploadObject(uploadObjectArgs);
System.out.println("上传分块成功" + i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、通过minio的合并文件
//合并文件,要求分块文件最小5M, 否则 java.lang.IllegalArgumentException: source testbucket/chunk/0: size 1048576 must be greater than 5242880
public void test_merge() throws Exception {
List<ComposeSource> sources = Stream.iterate(0, i -> ++i)
.limit(6)
.map(i -> ComposeSource.builder()
.bucket("test")
.object("chunk/".concat(Integer.toString(i)))
.build())
.collect(Collectors.toList());
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket("test").object("merge01.mp4").sources(sources).build();
minioClient.composeObject(composeObjectArgs);
}
3、清除minio中的分块文件
public void test_removeObjects() {
//合并分块完成将分块文件清除
List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
.limit(6)
.map(i -> new DeleteObject("chunk/".concat(Integer.toString(i))))
.collect(Collectors.toList());
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("test").objects(deleteObjects).build();
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
results.forEach(r -> {
DeleteError deleteError = null;
try {
deleteError = r.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
5 媒资服务搭建
搭建媒资服务,创建通用文件管理工程,应该将大文件和普通文件进行区分,这里仅仅描述大文件的开发过程。
1_数据库模型设计
数据库表结构如下,参考表结构设计实体对象
create table media_files
(
id bigint not null comment '主键,与文件id一致'
primary key,
company_id bigint null comment '机构ID',
company_name varchar(255) null comment '机构名称',
file_name varchar(255) null comment '文件名称',
file_type varchar(12) null comment '文件类型(图片、文档、视频)',
tags varchar(120) null comment '标签(便于文件分类、搜索)',
bucket varchar(255) null comment '存储目录',
file_path varchar(512) null comment '文件存储路径',
file_id varchar(120) not null comment '文件Id,唯一标识,md5值',
url varchar(1024) null comment '资源文件访问地址',
username varchar(60) null comment '上传人',
create_date datetime null comment '上传时间',
change_date datetime null comment '修改时间',
status varchar(12) null comment '状态(1:正常,0:不展示,审核未通过或更新中)',
remark varchar(32) null comment '备注',
audit_status varchar(12) null comment '审核状态',
audit_mind varchar(255) null comment '审核意见',
file_size varchar(12) null comment '文件大小',
constraint media_files_pk
unique (file_id)
) engine = InnoDB default charset = utf8mb4 comment '文件信息表';
2_接口定义
根据上传视频流程,定义接口,与前端的约定是操作成功返回{code:0}
否则返回{code:-1}
。
定义接口如下:
package org.duration.media.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.duration.media.model.RestResponse;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/check/file")
public RestResponse<Boolean> checkFile(@RequestParam("fileMd5") String fileMd5) throws Exception {
return null;
}
@ApiOperation(value = "分块文件上传前的检测")
@PostMapping("/upload/check/chunk")
public RestResponse<Boolean> checkChunk(@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
return null;
}
@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/upload/chunk")
public RestResponse<Boolean> uploadChunk(@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
return null;
}
@ApiOperation(value = "合并文件")
@PostMapping("/upload/merge/chunks")
public RestResponse<Boolean> mergeChunks(@RequestParam("fileMd5") String fileMd5,
@RequestParam("fileName") String fileName,
@RequestParam("chunkTotal") int chunkTotal) throws Exception {
return null;
}
}
3_DAO开发
向媒资数据库的文件表插入记录,使用自动生成的Mapper接口或MP即可满足要求。
4_上传分块开发
4.1 检查文件和分块
接口完成进行接口实现,首先实现检查文件方法和检查分块方法。
在IMediaFileService中定义service接口如下:
/**
* 检查文件是否存在
* @param fileMd5 文件的md5
* @return false不存在,true存在
*/
RestResponse<Boolean> checkFile(String fileMd5);
/**
* 检查分块是否存在
* @param fileMd5 文件的md5
* @param chunkIndex 分块序号
* @return false不存在,true存在
*/
RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);
检查文件是否存在接口的实现方法:
public RestResponse<Boolean> checkFile(String fileMd5) {
//查询文件信息
MediaFiles mediaFiles = this.selectByFileId(fileMd5);
if (mediaFiles != null) {
//桶
String bucket = mediaFiles.getBucket();
//存储目录
String filePath = mediaFiles.getFilePath();
//文件流
InputStream stream = null;
try {
stream = minioClient.getObject(GetObjectArgs.builder()
.bucket(bucket)
.object(filePath)
.build());
if (stream != null) {
//文件已存在
return RestResponse.success(true);
}
} catch (Exception e) {
log.error("error: ", e);
}
}
//文件不存在
return RestResponse.success(false);
}
//根据md5值查询文件
private MediaFiles selectByFileId(String fileMd5) {
return lambdaQuery().eq(MediaFiles::getFileId, fileMd5).one();
}
检查分块是否存在接口实现
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {
//得到分块文件目录
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunkIndex;
//文件流
InputStream fileInputStream;
try {
fileInputStream = minioClient.getObject(GetObjectArgs.builder()
.bucket(bucket_big) // 大文件和普通文件使用不同的桶存储
.object(chunkFilePath)
.build());
if (fileInputStream != null) {
//分块已存在
return RestResponse.success(true);
}
} catch (Exception e) {
log.error("error: ", e);
}
//分块未存在
return RestResponse.success(false);
}
//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
// MD5的前两位作为子路径 /a/5/md5
return fileMd5.charAt(0) + "/" + fileMd5.charAt(1) + "/" + fileMd5 + "/" + "chunk" + "/";
}
4.2 上传分块
定义service接口
/**
* 上传分块
* @param fileMd5 文件的md5
* @param chunk 分块序号
* @param localChunkFilePath 分块文件本地路径
* @return 上传失败/成功消息 true->成功
*/
RestResponse<Boolean> uploadChunk(String fileMd5, int chunk, String localChunkFilePath);
接口实现:
public RestResponse<Boolean> uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {
//得到分块文件的目录路径
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunk;
String mimeType = getMimeType(null);
try {
//将文件存储至minIO
boolean isSuccess = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_big, chunkFilePath);
return RestResponse.success(isSuccess);
} catch (Exception ex) {
log.error("error: ", ex);
log.debug("上传分块文件:{},失败:{}", chunkFilePath, ex.getMessage());
}
return RestResponse.validFail(false, "上传分块失败");
}
//根据扩展名获取mimeType
private String getMimeType(String extension) {
if (extension == null) {
extension = "";
}
//根据扩展名取出mimeType
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;//通用mimeType,字节流
if (extensionMatch != null) {
mimeType = extensionMatch.getMimeType();
}
return mimeType;
}
//上传文件到Minio中
public boolean addMediaFilesToMinIO(String localFilePath, String mimeType, String bucket, String objectName) {
try {
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket(bucket)//桶
.filename(localFilePath)//指定本地文件路径
.object(objectName)//对象名 放在子目录下
.contentType(mimeType)//设置媒体文件类型
.build();
//上传文件
minioClient.uploadObject(uploadObjectArgs);
log.debug("上传文件到minio成功,bucket:{},objectName:{}", bucket, objectName);
return true;
} catch (Exception e) {
log.error("error: ", e);
log.error("上传文件出错,bucket:{},objectName:{},错误信息:{}", bucket, objectName, e.getMessage());
}
return false;
}
完善接口层(其他接口直接调用Service即可,只有上传接口中含有部分逻辑):
@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/upload/chunk")
public RestResponse<Boolean> uploadChunk(@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) throws Exception {
//创建临时文件(使用临时文件而不是使用流,方便调用其他工具类——云存储、Utils)
File tempFile = File.createTempFile("minio", "temp");
//上传的文件拷贝到临时文件
file.transferTo(tempFile);
//文件路径
String absolutePath = tempFile.getAbsolutePath();
return mediaFileService.uploadChunk(fileMd5, chunk, absolutePath);
}
启动前端工程,进入上传文件界面进行前后端联调测试,也可以选择使用swagger文档。
前端对文件分块的大小为5MB,SpringBoot web默认上传文件的大小限制为1MB,这里需要在media-api工程修改配置如下:
spring:
servlet:
multipart:
max-file-size: 50MB
max-request-size: 50MB
max-file-size:单个文件的大小限制
Max-request-size:单次请求的大小限制
5_合并分块开发
定义service接口
/**
* 合并分块
*
* @param companyId 机构id
* @param fileMd5 文件md5
* @param chunkTotal 分块总和
* @param uploadFileParamsDto 文件信息
* @return 是否合并成功
*/
RestResponse<Boolean> mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto);
/**
* 将文件信息添加到文件表
*
* @param companyId 机构id
* @param fileMd5 文件md5值
* @param uploadFileParamsDto 上传文件的信息
* @param bucket 桶
* @param objectName 对象名称
* @return 媒资对象
*/
MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName);
service接口实现:
@Override
public RestResponse<Boolean> mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
//=====获取分块文件路径=====
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//组成将分块文件路径组成 List<ComposeSource>
List<ComposeSource> sourceObjectList = Stream.iterate(0, i -> ++i)
.limit(chunkTotal)
.map(i -> ComposeSource.builder()
.bucket(bucket_big)
.object(chunkFileFolderPath.concat(Integer.toString(i)))
.build())
.collect(Collectors.toList());
//=====合并=====
//文件名称
String fileName = uploadFileParamsDto.getFileName();
//文件扩展名
String extName = fileName.substring(fileName.lastIndexOf("."));
//合并文件路径
String mergeFilePath = getFilePathByMd5(fileMd5, extName);
try {
//合并文件
ObjectWriteResponse response = minioClient.composeObject(ComposeObjectArgs.builder()
.bucket(bucket_big)
.object(mergeFilePath)
.sources(sourceObjectList)
.build());
log.debug("合并文件成功:{}", mergeFilePath);
} catch (Exception e) {
log.debug("合并文件失败,fileMd5:{},异常:{}", fileMd5, e.getMessage(), e);
return RestResponse.validFail(false, "合并文件失败。");
}
// ====验证md5====
File minioFile = downloadFileFromMinIO(bucket_big, mergeFilePath);
if (minioFile == null) {
log.debug("下载合并后文件失败,mergeFilePath:{}", mergeFilePath);
return RestResponse.validFail(false, "下载合并后文件失败。");
}
try (InputStream newFileInputStream = new FileInputStream(minioFile)) {
//minio上文件的md5值
String md5Hex = DigestUtils.md5Hex(newFileInputStream);
//比较md5值,不一致则说明文件不完整
if (!fileMd5.equals(md5Hex)) {
return RestResponse.validFail(false, "文件合并校验失败,最终上传失败。");
}
//文件大小
uploadFileParamsDto.setFileSize(minioFile.length());
} catch (Exception e) {
log.debug("校验文件失败,fileMd5:{},异常:{}", fileMd5, e.getMessage(), e);
return RestResponse.validFail(false, "文件合并校验失败,最终上传失败。");
} finally {
boolean isDeleted = minioFile.delete();
}
//文件入库
MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_big, mergeFilePath);
//=====清除分块文件=====
clearChunkFiles(chunkFileFolderPath, chunkTotal);
return RestResponse.success(true);
}
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {
//将文件信息保存到数据库
MediaFiles mediaFiles = this.selectByFileId(fileMd5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
//机构id
mediaFiles.setCompanyId(companyId);
//桶
mediaFiles.setBucket(bucket);
//file_path
mediaFiles.setFilePath(objectName);
//file_id
mediaFiles.setFileId(fileMd5);
//url
mediaFiles.setUrl("/" + bucket + "/" + objectName);
//上传时间
mediaFiles.setCreateDate(LocalDateTime.now());
//状态
mediaFiles.setStatus("1");
//审核状态
mediaFiles.setAuditStatus("002003");
//插入数据库
int insert = baseMapper.insert(mediaFiles);
if (insert <= 0) {
log.debug("向数据库保存文件失败,bucket:{},objectName:{}", bucket, objectName);
return null;
}
return mediaFiles;
}
return mediaFiles;
}
/**
* 从minio下载文件
*
* @param bucket 桶
* @param objectName 对象名称
* @return 下载后的文件
*/
public File downloadFileFromMinIO(String bucket, String objectName) {
//临时文件
File minioFile = null;
FileOutputStream outputStream = null;
try {
InputStream stream = minioClient.getObject(GetObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.build());
//创建临时文件
minioFile = File.createTempFile("minio", ".merge");
outputStream = new FileOutputStream(minioFile);
IOUtils.copy(stream, outputStream);
return minioFile;
} catch (Exception e) {
log.error("error: ", e);
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
log.error("error: ", e);
}
}
}
return null;
}
/**
* 得到合并后的文件的地址
*
* @param fileMd5 文件id即md5值
* @param fileExt 文件扩展名
* @return 合并后的文件的地址
*/
private String getFilePathByMd5(String fileMd5, String fileExt) {
return fileMd5.charAt(0) + "/" + fileMd5.charAt(1) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}
/**
* 清除分块文件
*
* @param chunkFileFolderPath 分块文件路径
* @param chunkTotal 分块文件总数
*/
private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {
try {
List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
.limit(chunkTotal)
.map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i))))
.collect(Collectors.toList());
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
results.forEach(r -> {
DeleteError deleteError = null;
try {
deleteError = r.get();
} catch (Exception e) {
log.error("清楚分块文件失败,objectName:{}", deleteError.objectName(), e);
}
});
} catch (Exception e) {
log.error("清楚分块文件失败,chunkFileFolderPath:{}", chunkFileFolderPath, e);
}
}
接口层完善
@ApiOperation(value = "合并文件")
@PostMapping("/upload/merge/chunks")
public RestResponse<Boolean> mergeChunks(@RequestParam("fileMd5") String fileMd5,
@RequestParam("fileName") String fileName,
@RequestParam("chunkTotal") int chunkTotal) throws Exception {
Long companyId = 1232141425L;
UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
uploadFileParamsDto.setFileType("001002");
uploadFileParamsDto.setTags("大型视频");
uploadFileParamsDto.setRemark("");
uploadFileParamsDto.setFileName(fileName);
return mediaFileService.mergeChunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);
}
上传文件进行测试,并且进行前后端联调:
-
上传一个视频测试合并分块的执行逻辑,进入service方法逐行跟踪。
-
断点续传测试:上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块
6 其他问题
分块文件清理问题
上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?
1、在数据库中有一张文件表记录minio中存储的文件信息。
2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。
3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。