当前位置: 首页 > news >正文

各大网站新闻热门软件排行榜

各大网站新闻,热门软件排行榜,织梦自定义表单做网站在线留言,wordpress 商店插件上传视频 需求分析 教学机构人员进入媒资管理列表查询自己上传的媒资文件。 点击“媒资管理” 进入媒资管理列表页面查询本机构上传的媒资文件。 教育机构用户在"媒资管理"页面中点击 "上传视频" 按钮。 点击“上传视频”打开上传页面 选择要上传的文件…

上传视频

需求分析

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件。

点击“媒资管理”

进入媒资管理列表页面查询本机构上传的媒资文件。

  1. 教育机构用户在"媒资管理"页面中点击 "上传视频" 按钮。

点击“上传视频”打开上传页面

  1. 选择要上传的文件,自动执行文件上传。

  1. 视频上传成功会自动处理,处理完成可以预览视频。

断点续传

概念介绍

需求背景

通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。

什么是断点续传

引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。

断点续传流程如下图:

流程如下:

1、前端上传前先把文件分成块

2、一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传

3、各分块上传完成最后在服务端合并文件

分块与合并原理

为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。

文件分块的流程如下:

1、获取源文件长度

2、根据设定的分块文件的大小计算出块数

3、从源文件读数据依次向每一个块文件写数据。

package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件处理测试* @date 2022/9/13 9:21*/public class BigFileTest {//测试文件分块方法@Testpublic void testChunk() throws IOException {File sourceFile = new File("C:\\Users\\Lenovo\\Desktop\\学成在线项目—视频\\day06\\Day6-01.上传视频-什么是断点续传.mp4"); // 原始文件String chunkPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\"; // 分块文件存储目录File chunkFolder = new File(chunkPath);if (!chunkFolder.exists()) {chunkFolder.mkdirs();}//分块大小long chunkSize = 1024 * 1024 * 1;//分块数量long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);System.out.println("分块总数:"+chunkNum);//缓冲区大小byte[] b = new byte[1024];//使用RandomAccessFile访问文件(变化流), "r"表示读取流, "rw"表示写入流RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");//分块for (int i = 0; i < chunkNum; i++) {//创建分块文件File file = new File(chunkPath + i);if(file.exists()){file.delete();}boolean newFile = file.createNewFile();if (newFile) {//向分块文件中写数据RandomAccessFile raf_write = new RandomAccessFile(file, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);if (file.length() >= chunkSize) {break;}}raf_write.close();System.out.println("完成分块"+i);}}raf_read.close();}
}

执行结果: 目标文件被分片的储存在文件夹中

]文件合并流程:

1、找到要合并的文件并按文件合并的先后进行排序。

2、创建合并文件

3、依次从合并的文件中读取数据向合并文件写入数

package com.xuecheng.media;
/*** @author Mr.M* @version 1.0* @description 大文件处理测试* @date 2022/9/13 9:21*/public class BigFileTest {//测试文件合并方法@Testpublic void testMerge() throws IOException {//块文件目录File chunkFolder = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\");//原始文件File originalFile = new File("C:\\Users\\Lenovo\\Desktop\\学成在线项目—视频\\day06\\Day6-01.上传视频-什么是断点续传.mp4");//合并文件File mergeFile = new File("D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\Day6-01.上传视频-什么是断点续传.mp4");if (mergeFile.exists()) {mergeFile.delete();}//创建新的合并文件mergeFile.createNewFile();//用于写文件RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");//指针指向文件顶端raf_write.seek(0);//缓冲区byte[] b = new byte[1024];//分块列表File[] fileArray = chunkFolder.listFiles();// 转成集合,便于排序List<File> fileList = Arrays.asList(fileArray);// 从小到大排序Collections.sort(fileList, new Comparator<File>() {@Overridepublic int compare(File o1, File o2) {return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());}});//开始合并文件for (File chunkFile : fileList) {RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "rw");int len = -1;while ((len = raf_read.read(b)) != -1) {raf_write.write(b, 0, len);}raf_read.close();}raf_write.close();//校验文件try (FileInputStream fileInputStream = new FileInputStream(originalFile);FileInputStream mergeFileStream = new FileInputStream(mergeFile);) {//取出原始文件的md5String originalMd5 = DigestUtils.md5Hex(fileInputStream);//取出合并文件的md5进行比较String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);if (originalMd5.equals(mergeFileMd5)) {System.out.println("合并文件成功");} else {System.out.println("合并文件失败");}}}
}

执行结果: 分片文件被合并为正常文件

视频上传流程

下图是上传视频的整体流程:

1、前端对文件进行分块。

2、前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。

3、如果分块文件不存在则前端开始上传

4、前端请求媒资服务上传分块。

5、媒资服务将分块上传至MinIO。

6、前端将分块上传完毕请求媒资服务合并分块。

7、媒资服务判断分块上传完成则请求MinIO合并文件。

8、合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。

minio合并文件测试

1、将分块文件上传至minio, minio限制每个分片文件不小于5M

/*** @description 测试MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();// 将分块文件上传至minio@Testpublic void uploadChunk() {String chunkFolderPath = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\minio\\chunk\\";File chunkFolder = new File(chunkFolderPath);//分块文件File[] files = chunkFolder.listFiles();//将分块文件上传至miniofor (int i = 0; i < files.length; i++) {try {UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder().bucket("testbucket").object("chunk/" + i).filename(files[i].getAbsolutePath()).build();minioClient.uploadObject(uploadObjectArgs);System.out.println("上传分块成功" + i);} catch (Exception e) {e.printStackTrace();}}}}

2、通过minio的合并文件

/*** @description 测试MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//合并文件,要求分块文件最小5M@Testpublic void test_merge() throws Exception {// 分块文件集合(传统方式)
//        List<ComposeSource> sources = new ArrayList<>();
//        for (int i = 0; i < 10; i++) {
//            // 构建文件信息
//            ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build();
//            sources.add(composeSource);
//        }// 分块文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(10).map(i -> ComposeSource.builder().bucket("testbucket").object("chunk/".concat(Integer.toString(i))).build()).collect(Collectors.toList());// 指定合并后的文件名// 通过sources指定源文件ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket("testbucket").object("01.上传视频-什么是断点续传.mp4").sources(sources).build();// 合并文件minioClient.composeObject(composeObjectArgs);}
}

3、分块文件使用后就没用了, 清除分块文件

/*** @description 测试MinIO*/
public class MinioTest {static MinioClient minioClient =MinioClient.builder().endpoint("http://192.168.101.65:9000").credentials("minioadmin", "minioadmin").build();//清除分块文件@Testpublic void test_removeObjects() {//合并分块完成将分块文件清除List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(10).map(i -> new DeleteObject("chunk/".concat(Integer.toString(i)))).collect(Collectors.toList());//构建参数RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("testbucket").objects(deleteObjects).build();//执行删除Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();}});}}

接口定义

根据上传视频流程,定义接口,与前端的约定是操作成功返回{code:0}否则返回{code:-1}

从课程资料中拷贝RestResponse.java类到base工程下的model包下。

/*** @author Mr.M* @version 1.0* @description 通用结果类型* @date 2022/9/13 14:44*/@Data
@ToString
public class RestResponse<T> {/*** 响应编码,0为正常,-1错误*/private int code;/*** 响应提示信息*/private String msg;/*** 响应内容*/private T result;public RestResponse() {this(0, "success");}public RestResponse(int code, String msg) {this.code = code;this.msg = msg;}/*** 错误信息的封装** @param msg* @param <T>* @return*/public static <T> RestResponse<T> validfail(String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setMsg(msg);return response;}public static <T> RestResponse<T> validfail(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setCode(-1);response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常响应数据(包含响应内容)** @return RestResponse Rest服务封装相应数据*/public static <T> RestResponse<T> success(T result) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);return response;}public static <T> RestResponse<T> success(T result, String msg) {RestResponse<T> response = new RestResponse<T>();response.setResult(result);response.setMsg(msg);return response;}/*** 添加正常响应数据(不包含响应内容)** @return RestResponse Rest服务封装相应数据*/public static <T> RestResponse<T> success() {return new RestResponse<T>();}public Boolean isSuccessful() {return this.code == 0;}}

定义接口如下:

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return null;}@ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "上传分块文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}

service开发

校验方法

首先实现检查文件方法和检查分块方法, 在MediaFileService中定义service接口如下

/*** @author Mr.M* @version 1.0* @description 媒资文件管理业务类* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 检查文件是否存在* @param fileMd5 文件的md5* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:38*/public RestResponse<Boolean> checkFile(String fileMd5);/*** @description 检查分块是否存在* @param fileMd5  文件的md5* @param chunkIndex  分块序号* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在* @author Mr.M* @date 2022/9/13 15:39*/public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);}

service接口实现方法

package com.xuecheng.media.api;import com.xuecheng.base.model.RestResponse;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.service.MediaFileService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}@ApiOperation(value = "上传分块文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {return null;}@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {return null;}}

在接口中调用service提供的检查文件方法和检查分块方法

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "文件上传前检查文件")@PostMapping("/upload/checkfile")public RestResponse<Boolean> checkfile(@RequestParam("fileMd5") String fileMd5) throws Exception {return mediaFileService.checkFile(fileMd5);}@ApiOperation(value = "分块文件上传前的检测")@PostMapping("/upload/checkchunk")public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);return booleanRestResponse;}
}
上传方法

定义service接口

/*** @author Mr.M* @version 1.0* @description 媒资文件管理业务类* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 上传分块* @param fileMd5  文件md5* @param chunk  分块序号* @param localChunkFilePath  分块文件本地路径* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:50*/public RestResponse uploadChunk(String fileMd5,int chunk,String localChunkFilePath);}

接口实现:

/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//视频文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;/*** @param localFilePath 文件地址* @param bucket        桶* @param objectName    对象名称* @return void* @description 将文件写入minIO* @author Mr.M* @date 2022/10/12 21:22*/public boolean addMediaFilesToMinIO(String localFilePath, String mimeType, String bucket, String objectName) {try {// 构建文件参数UploadObjectArgs testbucket = UploadObjectArgs.builder().bucket(bucket).object(objectName).filename(localFilePath).contentType(mimeType).build();// 执行上传操作minioClient.uploadObject(testbucket);log.debug("上传文件到minio成功,bucket:{},objectName:{}", bucket, objectName);System.out.println("上传成功");return true;} catch (Exception e) {e.printStackTrace();log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}", bucket, objectName, e.getMessage(), e);XueChengPlusException.cast("上传文件到文件系统失败");}return false;}// 根据文件扩展名取出mimeTypeprivate String getMimeType(String extension) {if (extension == null)extension = "";//根据扩展名取出mimeTypeContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);//通用mimeType,字节流String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;if (extensionMatch != null) {mimeType = extensionMatch.getMimeType();}return mimeType;}//得到分块文件的目录private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}@Overridepublic RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {//得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);//得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;//mimeTypeString mimeType = getMimeType(null);//将文件存储至minIOboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_video, chunkFilePath);if (!b) {log.debug("上传分块文件失败:{}", chunkFilePath);return RestResponse.validfail(false, "上传分块失败");}log.debug("上传分块文件成功:{}",chunkFilePath);return RestResponse.success(true);}}

完善接口

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "上传分块文件")@PostMapping("/upload/uploadchunk")public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,@RequestParam("fileMd5") String fileMd5,@RequestParam("chunk") int chunk) throws Exception {// 创建一个临时文件File tempFile = File.createTempFile("minio", "temp");file.transferTo(tempFile);// 获取文件路径String localFilePath = tempFile.getAbsolutePath();RestResponse restResponse = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath);return restResponse;}}

接口测试

  1. 更新前端文件
  • 将uploadtools.ts文件覆盖前端工程src/utils 目录下的同名文件,
  • 把前端切换文件增大到10M

  • 将 media-add-dialog.vue文件覆盖前端工程src\module-organization\pages\media-manage\components目录下的同名文件
  1. 修改后端配置
  • 前端对文件分块的大小为5MB,SpringBoot web默认上传文件的大小限制为1MB,这里需要在media-api工程修改配置如下:
spring:servlet:multipart:max-file-size: 50MBmax-request-size: 50MB

  • max-file-size: 单个文件的大小限制
  • Max-request-size: 单次请求的大小限制
  1. 启动前后端服务, 联调

合并方法

定义service接口

/*** @author Mr.M* @version 1.0* @description 媒资文件管理业务类* @date 2022/9/10 8:55*/
public interface MediaFileService {/*** @description 合并分块* @param companyId  机构id* @param fileMd5  文件md5* @param chunkTotal 分块总和* @param uploadFileParamsDto 文件信息* @return com.xuecheng.base.model.RestResponse* @author Mr.M* @date 2022/9/13 15:56*/public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
}

接口实现:

/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;//普通文件桶@Value("${minio.bucket.files}")private String bucket_mediafiles;//视频文件桶@Value("${minio.bucket.videofiles}")private String bucket_video;@AutowiredMediaFileService currentProxy;/*** 合并分块** @param companyId           机构id* @param fileMd5             文件md5* @param chunkTotal          分块总和* @param uploadFileParamsDto 文件信息* @return*/@Overridepublic RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {// 1.找到分块文件, 调用minio的sdk进行文件合并// 分块文件目录String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 1.1分块文件集合(steam流)List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath.concat(Integer.toString(i))).build()).collect(Collectors.toList());//源文件名称String fileName = uploadFileParamsDto.getFilename();//文件扩展名String extension = fileName.substring(fileName.lastIndexOf("."));//合并后文件的objectNameString objectName = getFilePathByMd5(fileMd5, extension);// 1.2指定合并后的文件信息ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder().bucket(bucket_video).object(objectName) // 合并后的文件objectName.sources(sources)   // 通过sources指定源文件.build();// 1.3合并文件try {minioClient.composeObject(composeObjectArgs);} catch (Exception e) {e.printStackTrace();log.debug("合并文件失败,fileMd5:{},异常:{}", fileMd5, e.getMessage(), e);return RestResponse.validfail(false, "合并文件失败。");}// 2.校验合并后的文件和源文件是否一致// 先下载合并后的文件File file = downloadFileFromMinIO(bucket_video, objectName);try (FileInputStream fileInputStream = new FileInputStream(file)) {//计算合并后文件的md5值String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);//比较原始文件和合并后文件的MD5值if (!fileMd5.equals(mergeFile_md5)) {log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{}", fileMd5, mergeFile_md5);return RestResponse.validfail(false, "文件合并校验失败");}//保存文件大小uploadFileParamsDto.setFileSize(file.length());} catch (Exception e) {e.printStackTrace();return RestResponse.validfail(false, "文件合并校验失败");}// 3.将文件信息入库MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName);if (mediaFiles == null) {return RestResponse.validfail(false, "文件入库失败");}// 4.清理分块文件clearChunkFiles(chunkFileFolderPath, chunkTotal);return RestResponse.success(true);}//得到分块文件的目录private String getChunkFileFolderPath(String fileMd5) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";}/*** 得到合并后的文件的地址** @param fileMd5 文件id即md5值* @param fileExt 文件扩展名* @return*/private String getFilePathByMd5(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}/*** 从minio下载文件** @param bucket     桶* @param objectName 对象名称* @return 下载后的文件*/public File downloadFileFromMinIO(String bucket, String objectName) {//临时文件File minioFile = null;FileOutputStream outputStream = null;try {InputStream stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());//创建临时文件minioFile = File.createTempFile("minio", ".merge");outputStream = new FileOutputStream(minioFile);IOUtils.copy(stream, outputStream);return minioFile;} catch (Exception e) {e.printStackTrace();} finally {if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}return null;}/*** 清除分块文件** @param chunkFileFolderPath 分块文件路径* @param chunkTotal          分块文件总数*/private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {try {//待删除分块文件列表List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i)))).collect(Collectors.toList());// 分块文件信息RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();// 清除分块文件Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);// 真正删除分块文件results.forEach(r -> {DeleteError deleteError = null;try {deleteError = r.get();} catch (Exception e) {e.printStackTrace();log.error("清除分块文件失败,objectname:{}", deleteError.objectName(), e);}});} catch (Exception e) {e.printStackTrace();log.error("清除分块文件失败,chunkFileFolderPath:{}", chunkFileFolderPath, e);}}}

controller完善

/*** @author Mr.M* @version 1.0* @description 大文件上传接口* @date 2022/9/6 11:29*/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {@AutowiredMediaFileService mediaFileService;@ApiOperation(value = "合并文件")@PostMapping("/upload/mergechunks")public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,@RequestParam("fileName") String fileName,@RequestParam("chunkTotal") int chunkTotal) throws Exception {// todo: 机构idLong companyId = 1232141425L;UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();uploadFileParamsDto.setFileType("001002");uploadFileParamsDto.setTags("课程视频");uploadFileParamsDto.setRemark("");uploadFileParamsDto.setFilename(fileName);return mediaFileService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);}}

功能测试: 下边进行前后端联调

  1. 上传一个视频测试合并分块的执行逻辑

进入service方法逐行跟踪。

  1. 断点续传测试

上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传

  1. 文件分片上传后合并分片, 合并完成后删除分片文件

update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?

多个线程同时执行上边的sql只会有一个线程执行成功。

  1. 什么是乐观锁、悲观锁?
  • synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
  • 乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。

定义mapper

package com.xuecheng.media.mapper;/*** <p>*  Mapper 接口* </p>** @author itcast*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** 开启一个任务* @param id 任务id* @return 更新记录数*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);}

service方法

package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒资文件处理业务方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/***  开启一个任务* @param id 任务id* @return true开启任务成功,false开启任务失败*/public boolean startTask(long id);}
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;//实现如下public boolean startTask(long id) {int result = mediaProcessMapper.startTask(id);return result<=0?false:true;}}

更新任务状态

任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。

在MediaFileProcessService接口添加方法

package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒资文件处理业务方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** @description 保存任务结果* @param taskId  任务id* @param status 任务状态* @param fileId  文件id* @param url url* @param errorMsg 错误信息* @return void* @author Mr.M* @date 2022/10/15 11:29*/void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);}

service接口方法实现如下:

package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任务,如果不存在则直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}//处理失败,更新任务处理结果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);//处理失败if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任务处理状态为失败,任务信息:{}",mediaProcess_u);return ;}//任务处理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){//更新媒资文件中的访问urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//处理成功,更新url和状态mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到历史记录MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//删除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}}

视频处理

视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。

所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。

定义任务类VideoTask 如下:

package com.xuecheng.media.jobhandler;/*** 视频处理任务类** @author Mr.M* @version 1.0* @description TODO* @date 2022/10/15 11:58*/
@Slf4j
@Component
public class VideoTask {@AutowiredMediaFileProcessService mediaFileProcessService;@AutowiredMediaFileService mediaFileService;@Value("${videoprocess.ffmpegpath}")private String ffmpeg_path;@XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex(); // 执行器序号,从0开始int shardTotal = XxlJobHelper.getShardTotal();  // 执行器总数//取出cpu核心数作为一次处理数据的条数int processors = Runtime.getRuntime().availableProcessors();//查询待处理的任务List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);//实际查到的任务数int size = mediaProcessList.size();log.debug("取到视频处理任务数:" + size);if (size <= 0) {return;}//创建线程池ExecutorService executorService = Executors.newFixedThreadPool(size);// 使用的计数器CountDownLatch countDownLatch = new CountDownLatch(size);mediaProcessList.forEach(mediaProcess -> {// 将任务加入线程池executorService.execute(() -> {try {Long taskId = mediaProcess.getId(); // 任务id// 开启任务boolean b = mediaFileProcessService.startTask(taskId);if (!b) {log.debug("抢占任务失败, 任务id: {}", taskId);return;}// 准备参数String bucket = mediaProcess.getBucket();  //桶String filePath = mediaProcess.getFilePath();  //存储路径String fileId = mediaProcess.getFileId(); //原始视频的md5值//将要处理的文件下载到服务器上File originalFile = mediaFileService.downloadFileFromMinIO(bucket, filePath);if (originalFile == null) {log.debug("下载待处理文件失败,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");return;}//处理结束的视频文件File mp4File = null;try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("创建mp4临时文件失败");mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "创建mp4临时文件失败");return;}//视频处理结果String result = "";try {//开始处理视频Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());//开始视频转换,成功将返回successresult = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("处理视频文件:{},出错:{}", mediaProcess.getFilePath(), e.getMessage());}if (!result.equals("success")) {//记录错误信息log.error("处理视频失败,视频地址:{},错误信息:{}", bucket + filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}//将mp4上传至minio//mp4在minio的存储路径String objectName = getFilePath(fileId, ".mp4");//访问urlString url = "/" + bucket + "/" + objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);//将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上传视频失败或入库失败,视频地址:{},错误信息:{}", bucket + objectName, e.getMessage());//最终还是失败了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "处理后视频上传或入库失败");}} finally {// 计数器减1countDownLatch.countDown();}});});//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}}

测试

基本测试

进入xxl-job调度中心添加执行器和视频处理任务

  1. 添加执行器

  1. 视频处理任务

  • 在xxl-job配置任务调度策略:
    • 1)配置阻塞处理策略为:丢弃后续调度。
    • 2)配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。

  1. 配置完成开始测试视频处理:
  • 首先上传至少4个视频,非mp4格式。

  • 在xxl-job启动视频处理任务

  • 观察媒资管理服务后台日志

失败测试

1、先停止调度中心的视频处理任务。

2、上传视频,手动修改待处理任务表中file_path字段为一个不存在的文件地址

3、启动任务

观察任务处理失败后是否会重试,并记录失败次数。

抢占任务测试

1、修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”

2、在抢占任务代码处打断点并选择支持多线程方式

3、在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。

4、启动任务

此时多个线程执行都停留在断点处

依次放行,观察同一个任务只会被一个线程抢占成功。

http://www.dtcms.com/a/488380.html

相关文章:

  • 追觅的想象空间:以技术为翼,向生态无垠
  • wap网站搭建品牌注册查询系统
  • 在线购物网站怎么做承德公司做网站
  • ptmalloc原理(简)
  • 优选算法之双指针:从原理到实战,解决数组与链表
  • 泰安网站建设企业门户网站静态模板
  • 做招聘网站的背景图片手机建网站 优帮云
  • 湖州网站建设湖州网站建设网站建设运营计划书
  • Spring的核心思想与注解
  • 洛阳响应式建站wordpress无法搜索插件
  • 幻灯片在什么网站做dw做的静态网站怎么分享链接
  • 做网站如何推销小公司做网站的好处
  • 力扣Hot100--106.对称二叉树
  • 胡恩全10.15作业
  • 网站推广需要多少钱教学网站开发源码
  • ACSM-CPT 8周冲刺每日学习计划(10/16–12/15)
  • 公司备案网站名称代理ip注册网站都通不过
  • 杭州建设银行网站智慧团建登录不上
  • 大型商城网站开发wordpress侧边栏加图片
  • Linux内核架构浅谈37-深入理解Linux页帧标志:从PG_locked到PG_dirty的核心原理与实践
  • 建设网站的功能及目的是什么wordpress好用的地图
  • 佛山找企业的网站WORDPRESS添加前台会员注册
  • 【完整源码+数据集+部署教程】 【零售和消费品&存货】条形码检测系统源码&数据集全套:改进yolo11-TADDH
  • 上海建网站公司排名常用的设计软件有哪些
  • 10、一个简易 vector:C++ 模板与 STL
  • 营销网站设计网站获取访客qq 原理
  • Aosp14桌面壁纸和锁屏壁纸的设置和加载分析
  • 作业1.1
  • 减肥养生网站建设360极速浏览器网站开发缓存
  • 网站开发人员资质北京汽车网站建设