Vue3 大文件上传实战:切片上传 / 断点续传 / 秒传 / 暂停恢复 / 全局并发控制
文章目录
- 概述
- 技术架构
- 整体上传流程图
- 目录结构建议
- 类型定义(upload/types.ts)
- 并发限流器(upload/limiter.ts)
- API 层(api/index.ts)
- Web Worker:切片 + 文件 Hash(worker/hash-worker.js)
- Vue3 视图与调度(views/Uploader.vue)
- 小结
概述
本文基于 Vue3 实现一个大文件上传方案,完整覆盖以下核心能力:
- 切片上传
- 断点续传
- 秒传(存在即返回)
- 暂停 / 恢复
- 全局并发控制与实时进度
本文采用全局并发限流器
作为上传调度核心,让多个文件、多个切片
在总并发可控
的前提下高效上传。
技术架构
- 前端:Vue3(Composition API)
- 并发控制:全局 ConcurrencyLimiter(信号量/队列)
- 进度统计:切片级别累加 → 文件级别 → 全局
- 传输库:axios(支持上传进度与取消)
- 切片与 Hash:Web Worker + SparkMD5
整体上传流程图
目录结构建议
仅供参考,便于组织代码:
src/api/index.ts // axios 实例与 API 函数worker/hash-worker.js // 切片与文件Hash计算upload/limiter.ts // ConcurrencyLimitertypes.ts // 类型定义helpers.ts // 进度、重试、工具函数views/Uploader.vue // 上传页面(UI + 调度)
类型定义(upload/types.ts)
统一、清晰的命名有助于维护与协作。
/*** 文件上传状态枚举* - idle: 初始状态,未开始上传* - uploading: 上传中* - paused: 已暂停* - completed: 上传完成* - failed: 上传失败*/
export type FileState = 'idle' | 'uploading' | 'paused' | 'completed' | 'failed';/*** 分片上传状态枚举* - idle: 初始状态,未开始上传* - uploading: 上传中* - completed: 上传完成* - failed: 上传失败*/
export type ChunkState = 'idle' | 'uploading' | 'completed' | 'failed';/*** 分片上传任务信息*/
export interface UploadChunkTask {fileId: string; // 所属文件IDchunkId: string; // 分片唯一标识,格式如 `${fileId}-${chunkIndex}`chunkIndex: number; // 分片序号(从0开始)size: number; // 分片大小(字节)blob: Blob; // 分片二进制数据chunkHash: string; // 分片哈希值,格式如 `${fileHash}-${chunkIndex}`,用于断点续传识别uploadedBytes: number; // 已上传字节数attemptCount: number; // 当前重试次数maxAttempts: number; // 最大重试次数abortController: AbortController; // 用于取消分片上传的中止控制器state: ChunkState; // 当前分片状态
}/*** 文件上传任务信息*/
export interface UploadFileTask {fileId: string; // 文件唯一标识fileName: string; // 文件名fileSize: number; // 文件总大小(字节)fileHash: string; // 文件哈希值(用于整体校验)chunkSize: number; // 每个分片的大小(字节)totalChunks: number; // 总分片数chunkTasks: UploadChunkTask[]; // 所有分片任务数组state: FileState; // 文件整体上传状态isPaused: boolean; // 是否手动暂停inflightChunks: Set<UploadChunkTask>; // 正在上传中的分片集合uploadedBytes: number; // 文件级已上传字节数(累计所有分片)totalBytes: number; // 文件级总字节数(等于fileSize)percent: number; // 文件级上传进度百分比(0-100)
}/*** 全局上传进度信息*/
export interface GlobalProgress {uploadedBytes: number; // 全局已上传字节数(累计所有文件)totalBytes: number; // 全局总字节数(累计所有文件)percent: number; // 全局上传进度百分比(0-100)
}
并发限流器(upload/limiter.ts)
核心目标:严格限制“所有文件的切片总并发”不超过上限;支持暂停时移除某文件的待执行项。
/*** 并发限制器,用于控制同时执行的任务数量。* 当达到最大并发数时,新任务会进入队列等待。*/
export class ConcurrencyLimiter {// 最大并发任务数private maxConcurrent: number;// 当前正在执行的任务数private activeCount = 0;// 等待队列,存储尚未执行的任务private pendingQueue: Array<{runTask: () => Promise<unknown>; // 任务执行函数(返回 Promise)resolve: (v: unknown) => void; // 任务成功时的回调reject: (e: unknown) => void; // 任务失败时的回调fileId: string; // 任务关联的文件ID(用于取消)}> = [];constructor(maxConcurrent: number) {this.maxConcurrent = maxConcurrent;}/*** 将任务加入队列,并返回一个 Promise。* 当任务开始执行时,Promise 会根据任务结果 resolve/reject。* @param runTask 待执行的任务函数* @param fileId 关联的文件ID(用于后续取消)*/enqueue(runTask: () => Promise<unknown>, fileId: string) {return new Promise((resolve, reject) => {// 将任务推入等待队列this.pendingQueue.push({ runTask, resolve, reject, fileId });// 尝试启动下一个任务(如果未达到并发上限)this.tryStartNext();});}/*** 根据文件ID移除队列中的待执行任务(例如取消上传)。* @param fileId 要移除的任务关联的文件ID*/removePendingByFileId(fileId: string) {this.pendingQueue = this.pendingQueue.filter(item => item.fileId !== fileId);}/*** 尝试从队列中启动下一个任务(如果未达到并发上限)。*/private tryStartNext() {// 只要当前活跃任务数未达上限且队列非空,就继续启动任务while (this.activeCount < this.maxConcurrent && this.pendingQueue.length > 0) {const next = this.pendingQueue.shift()!; // 从队列头部取出任务this.activeCount++; // 增加活跃任务计数// 立即执行任务(包装在 Promise.resolve 中以捕获同步错误)Promise.resolve().then(() => next.runTask()) // 执行任务函数.then(res => {this.activeCount--; // 任务完成,减少活跃计数next.resolve(res); // 通知外部任务成功this.tryStartNext(); // 递归检查是否能启动新任务}).catch(err => {this.activeCount--; // 任务失败,减少活跃计数next.reject(err); // 通知外部任务失败this.tryStartNext(); // 递归检查是否能启动新任务});}}
}
API 层(api/index.ts)
根据你服务端接口适配。示例包含:checkFile、uploadChunk、mergeChunk。axios 支持 signal 取消。
import axios from 'axios';// 秒传/断点续传 预检查
export async function checkFile(params: {fileHash: string;fileName: string;
}) {
}// 上传切片
export async function uploadChunk(data: FormData, signal: AbortSignal) {
}// 合并切片
export async function mergeChunk(params: {fileHash: string;fileName: string;chunkSize: number;
}) {
}
Web Worker:切片 + 文件 Hash(worker/hash-worker.js)
- 切片:Blob.slice 按 chunkSize 切分
- 文件 Hash:SparkMD5(按切片依次读取并 append)
- 返回:fileHash + fileChunkList
// worker/hash-worker.js
self.importScripts('https://cdn.jsdelivr.net/npm/spark-md5@3.0.2/spark-md5.min.js');/*** 将文件切割为多个分片* @param {File} file - 待分片的文件对象* @param {number} chunkSize - 每个分片的大小(字节)* @returns {Array<{blob: Blob, size: number}>} 分片数组,包含每个分片的Blob对象和大小*/
function createFileChunks(file, chunkSize) {const chunks = [];let offset = 0; // 当前分片的起始字节位置// 循环切割直到覆盖整个文件while (offset < file.size) {// 使用slice方法切割文件(兼容大文件)const blob = file.slice(offset, offset + chunkSize);chunks.push({ blob, // 分片二进制数据size: blob.size // 记录分片实际大小(最后一片可能小于chunkSize)});offset += chunkSize; // 移动切割位置}return chunks;
}/*** 计算文件哈希(基于所有分片的增量计算)* @param {Array<{blob: Blob}>} chunks - 分片数组* @returns {Promise<string>} 文件的MD5哈希值*/
async function calcFileHash(chunks) {// 使用SparkMD5库进行增量哈希计算const spark = new self.SparkMD5.ArrayBuffer();let processedCount = 0; // 已处理分片计数for (let i = 0; i < chunks.length; i++) {const { blob } = chunks[i];// 将Blob转换为ArrayBuffer进行哈希计算const buf = await blob.arrayBuffer();spark.append(buf); // 增量更新哈希processedCount++;// 计算并上报当前进度const percentage = Math.round((processedCount / chunks.length) * 100);self.postMessage({ type: 'progress', percentage // 进度百分比(0-100)});}return spark.end(); // 返回最终哈希值
}// WebWorker消息处理器
self.onmessage = async (e) => {const { file, chunkSize } = e.data; // 从主线程接收的参数try {// 1. 文件分片const fileChunkList = createFileChunks(file, chunkSize);// 2. 计算文件哈希(包含进度上报)const fileHash = await calcFileHash(fileChunkList);// 处理成功,返回结果给主线程self.postMessage({ type: 'done',fileHash, // 文件完整哈希值fileChunkList // 分片结果数组});self.close(); // 关闭Worker} catch (err) {// 错误处理self.postMessage({ type: 'error', error: String(err) // 错误信息转字符串});self.close();}
};
Vue3 视图与调度(views/Uploader.vue)
选择文件 → 准备任务 → 秒传/断点续传 → 全局并发上传 → 合并
<template><!-- 上传器主界面 --><div class="uploader"><!-- 文件选择区域 --><label class="btn">选择文件<input type="file" multiple @change="onSelectFiles" hidden /></label><!-- 上传控制参数 --><div class="controls"><label>切片大小(MB):<input type="number" v-model.number="chunkSizeMB" min="1" max="16" /></label><label>全局并发:<input type="number" v-model.number="maxGlobalConcurrency" min="1" max="12" /></label></div><!-- 全局进度显示 --><div class="global-progress">全局进度:{{ globalProgress.percent }}%({{ formatBytes(globalProgress.uploadedBytes) }} / {{ formatBytes(globalProgress.totalBytes) }})</div><!-- 文件列表展示 --><ul class="file-list"><li v-for="f in fileTasks" :key="f.fileId"><div class="row"><div class="name">{{ f.fileName }}</div><div class="state">{{ f.state }}</div><div class="progress">{{ f.percent }}%({{ formatBytes(f.uploadedBytes) }} / {{ formatBytes(f.totalBytes) }})</div><div class="actions"><button v-if="f.state !== 'paused' && f.state !== 'completed'" @click="pauseFileUpload(f.fileId)">暂停</button><button v-if="f.state === 'paused'" @click="resumeFileUpload(f.fileId)">恢复</button></div></div></li></ul></div>
</template><script setup lang="ts">
import { reactive, ref } from 'vue';
import { checkFile, uploadChunk as apiUploadChunk, mergeChunk } from '@/api';
import { ConcurrencyLimiter } from '@/upload/limiter';
import type { UploadFileTask, UploadChunkTask, GlobalProgress } from '@/upload/types';// 配置参数
const DEFAULT_CHUNK_SIZE_MB = 1; // 默认切片大小1MB
const DEFAULT_MAX_CONCURRENCY = 6; // 默认并发数6const chunkSizeMB = ref<number>(DEFAULT_CHUNK_SIZE_MB);
const maxGlobalConcurrency = ref<number>(DEFAULT_MAX_CONCURRENCY);// 状态管理
const fileTasks = reactive<UploadFileTask[]>([]);
const globalProgress = reactive<GlobalProgress>({ uploadedBytes: 0, totalBytes: 0, percent: 0
});// 并发控制(建议使用工厂模式动态重建)
const limiter = new ConcurrencyLimiter(maxGlobalConcurrency.value);/* 核心方法 *//*** 文件选择处理* 1. 获取文件列表* 2. 为每个文件创建上传任务*/
async function onSelectFiles(e: Event) {const input = e.target as HTMLInputElement;if (!input?.files?.length) return;// 并行处理多个文件(注意浏览器并发限制)await Promise.all(Array.from(input.files).map(file => prepareAndStartFileTask(file)));input.value = ''; // 重置input
}/*** 文件上传预处理(核心流程)* 1. 创建文件任务对象* 2. 切片+计算hash(Worker线程)* 3. 检查秒传/断点续传* 4. 启动上传调度*/
async function prepareAndStartFileTask(file: File) {// 生成唯一文件ID(实际项目建议使用更可靠的生成方式)const fileId = `${file.name}-${file.size}-${Date.now()}`;const chunkSize = Math.max(1, chunkSizeMB.value) * 1024 * 1024;// 初始化文件任务(响应式对象)const task: UploadFileTask = reactive({fileId,fileName: file.name,fileSize: file.size,fileHash: '',chunkSize,totalChunks: 0,chunkTasks: [],state: 'idle',isPaused: false,inflightChunks: new Set(),uploadedBytes: 0,totalBytes: file.size,percent: 0});fileTasks.push(task); // 立即加入列表显示try {// Step 1: 文件切片+计算hash(Worker线程)const { fileHash, chunkTasks } = await createChunksAndHashInWorker(file, chunkSize, fileId);task.fileHash = fileHash;task.chunkTasks = chunkTasks.map(ct => reactive(ct)); // 转为响应式task.totalChunks = chunkTasks.length;// Step 2: 秒传/断点检查const check = await checkFile({ fileHash: `${fileHash}${file.name}`, fileName: file.name });if (check?.code === 0) {const { shouldUpload, uploadedList = [] } = check.data || {};// 秒传处理if (!shouldUpload) {completeFileTask(task);return;}// 断点续传:过滤已上传切片if (uploadedList.length > 0) {task.chunkTasks = task.chunkTasks.filter(ct => !uploadedList.includes(ct.chunkHash));// 所有切片已上传,尝试合并if (task.chunkTasks.length === 0) {await tryMerge(task);return;}}}// Step 3: 启动上传task.state = 'uploading';scheduleAllChunks(task);} catch (err) {console.error('文件预处理失败:', file.name, err);task.state = 'failed';}
}/*** Web Worker 通信封装* 职责:文件切片 + 计算MD5*/
function createChunksAndHashInWorker(file: File, chunkSize: number, fileId: string) {return new Promise<{ fileHash: string; chunkTasks: UploadChunkTask[] }>((resolve, reject) => {const worker = new Worker(new URL('@/worker/hash-worker.js', import.meta.url));worker.postMessage({ file, chunkSize });worker.onmessage = (e: MessageEvent) => {const { type } = e.data || {};if (type === 'progress') {// 可在此处更新hash计算进度} else if (type === 'done') {const { fileHash, fileChunkList } = e.data;// 构建分片任务数组const chunkTasks: UploadChunkTask[] = fileChunkList.map((c: any, index: number) => ({fileId,chunkId: `${fileId}-${index}`,chunkIndex: index,size: c.size,blob: c.blob,chunkHash: `${fileHash}-${index}`,uploadedBytes: 0,attemptCount: 0,maxAttempts: 3,abortController: new AbortController(),state: 'idle'}));resolve({ fileHash, chunkTasks });worker.terminate();} else if (type === 'error') {reject(new Error(e.data?.error || 'hash计算失败'));worker.terminate();}};});
}/*** 调度文件的所有分片上传* 注意:实际项目建议使用全局交错调度(buildInterleavedChunks)*/
function scheduleAllChunks(fileTask: UploadFileTask) {fileTask.chunkTasks.filter(ct => ct.state !== 'completed').forEach(ct => enqueueChunkUpload(fileTask, ct));updateGlobalTotals();
}/*** 分片上传任务封装* 1. 加入并发队列* 2. 处理重试逻辑* 3. 完成检查*/
function enqueueChunkUpload(fileTask: UploadFileTask, chunkTask: UploadChunkTask) {// 标记为进行中fileTask.inflightChunks.add(chunkTask);limiter.enqueue(async () => {try {await runWithRetry(() => uploadOneChunk(fileTask, chunkTask),chunkTask);} finally {fileTask.inflightChunks.delete(chunkTask);}},fileTask.fileId).then(async () => {// 检查文件是否全部完成if (isFileUploadComplete(fileTask)) {await tryMerge(fileTask);}}).catch(err => {console.error('分片上传失败:', chunkTask.chunkId, err);});
}/*** 单个分片上传实现* 关键点:* - 支持取消(AbortController)* - 进度上报(实际项目需实现)*/
async function uploadOneChunk(fileTask: UploadFileTask, chunkTask: UploadChunkTask) {chunkTask.state = 'uploading';const formData = new FormData();formData.append('fileHash', `${fileTask.fileHash}${fileTask.fileName}`);formData.append('fileName', fileTask.fileName);formData.append('index', String(chunkTask.chunkIndex));formData.append('chunkFile', chunkTask.blob);formData.append('chunkHash', chunkTask.chunkHash);formData.append('chunkSize', String(fileTask.chunkSize));formData.append('chunkNumber', String(fileTask.totalChunks));// 执行上传(带取消支持)await apiUploadChunk(formData, chunkTask.abortController.signal);// 更新状态chunkTask.uploadedBytes = chunkTask.size;chunkTask.state = 'completed';// 更新进度updateFileProgress(fileTask);updateGlobalProgress();
}/* 上传控制方法 */// 暂停上传
function pauseFileUpload(fileId: string) {const fileTask = fileTasks.find(f => f.fileId === fileId);if (!fileTask || fileTask.state === 'completed') return;fileTask.isPaused = true;fileTask.state = 'paused';// 取消队列中的任务limiter.removePendingByFileId(fileId);// 中止进行中的上传fileTask.inflightChunks.forEach(ct => {ct.abortController.abort();});
}// 恢复上传
function resumeFileUpload(fileId: string) {const fileTask = fileTasks.find(f => f.fileId === fileId);if (!fileTask) return;fileTask.isPaused = false;fileTask.state = 'uploading';// 重置未完成切片的状态fileTask.chunkTasks.filter(ct => ct.state !== 'completed').forEach(ct => {ct.abortController = new AbortController();enqueueChunkUpload(fileTask, ct);});
}/* 工具方法 */// 带重试的执行(指数退避)
async function runWithRetry(taskFn: () => Promise<unknown>,chunkTask: UploadChunkTask,maxAttempts = 3,baseDelay = 500
) {for (let attempt = 1; attempt <= maxAttempts; attempt++) {chunkTask.attemptCount = attempt;try {return await taskFn();} catch (err: any) {// 主动取消不重试if (chunkTask.abortController.signal.aborted) throw err;// 最后一次尝试失败if (attempt === maxAttempts) {chunkTask.state = 'failed';throw err;}// 延迟重试await sleep(baseDelay * Math.pow(2, attempt - 1));}}
}// 文件上传完成检查
function isFileUploadComplete(fileTask: UploadFileTask) {return fileTask.chunkTasks.every(c => c.state === 'completed') && fileTask.state !== 'paused';
}// 合并文件请求
async function tryMerge(fileTask: UploadFileTask) {const res = await mergeChunk({fileHash: fileTask.fileHash,fileName: fileTask.fileName,chunkSize: fileTask.chunkSize}).catch(() => null);if (res?.code === 0) {completeFileTask(fileTask);} else {fileTask.state = 'failed';}
}// 更新文件进度
function updateFileProgress(fileTask: UploadFileTask) {const uploaded = fileTask.chunkTasks.reduce((s, c) => s + c.uploadedBytes, 0);fileTask.uploadedBytes = uploaded;fileTask.percent = Math.round((uploaded / fileTask.totalBytes) * 100);
}// 更新全局统计
function updateGlobalTotals() {globalProgress.totalBytes = fileTasks.reduce((s, f) => s + f.totalBytes, 0);
}function updateGlobalProgress() {globalProgress.uploadedBytes = fileTasks.reduce((s, f) => s + f.uploadedBytes, 0);globalProgress.percent = Math.round((globalProgress.uploadedBytes / globalProgress.totalBytes) * 100);
}// 标记文件上传完成
function completeFileTask(fileTask: UploadFileTask) {fileTask.state = 'completed';fileTask.percent = 100;fileTask.uploadedBytes = fileTask.totalBytes;updateGlobalProgress();
}/* 辅助工具 */
function sleep(ms: number) { return new Promise(res => setTimeout(res, ms));
}function formatBytes(n: number) {if (!n) return '0 B';const units = ['B','KB','MB','GB','TB']; const i = Math.floor(Math.log(n)/Math.log(1024));return `${(n/Math.pow(1024, i)).toFixed(2)} ${units[i]}`;
}
</script>
关键点详解
- 全局并发控制优先于“按文件分配”
将所有切片视为统一资源池,由 ConcurrencyLimiter 控制启动。
防止多文件时各自“自增并发”导致总并发超标。
通过交错入队(interleaved)实现公平性,避免某个大文件占满资源。 - 进度统计
切片 onUploadProgress 事件能获得 e.loaded,但 axios 对同一请求是累积的,简化起见以“完成即视为 size”统计也可满足绝大多数 UI 需求。
若需更精确的实时数值,可在 api 层把 e.loaded 透传上层,对 chunkTask.uploadedBytes 动态赋值并触发 updateFileProgress / updateGlobalProgress。 - 断点续传与秒传
chunkHash 建议采用 f i l e H a s h − {fileHash}- fileHash−{chunkIndex},服务端据此判断某切片是否已存在。
文件 Hash 可用 SparkMD5;若需更强,考虑 SHA-256(但更耗时)。
秒传:checkFile 返回 shouldUpload=false 直接完成。 - 暂停 / 恢复
暂停:removePendingByFileId + abort inflight,一次到位。
恢复:为未完成切片重建 AbortController 并重新入队。
注意暂停后不要丢失切片状态,resume 时应按 chunkIndex 顺序重排可选。 - 错误重试
建议指数退避:500ms, 1s, 2s…
区分可重试(网络错误、5xx、超时)与不可重试(4xx 参数错误、鉴权失败)。
超过最大重试次数可标记文件 failed 并提示用户。 - 配置与兼容
HTTP/2 多路复用下“同域6并发”限制不再刚性,但客户端/服务端处理能力仍有限,maxGlobalConcurrency 保持可配置。
Safari 对 fetch 上传进度支持欠佳,推荐 axios/XHR。
chunkSize 一般 1–4MB 权衡较好;过小请求数过多,过大重试成本高。
服务端对接要点(简述)
checkFile(fileHash, fileName)
返回 shouldUpload 和已存在的 uploadedList(切片 hash 列表)。
upload 接口
校验必要字段:fileHash、fileName、index、chunkHash、chunkSize、chunkNumber。
存储到临时目录:/upload_tmp/{fileHash}/{index}
merge 接口
校验切片数量完整性与哈希一致性(可选)。
合并为最终文件并清理临时目录。
幂等:重复合并时应安全返回成功。
小结
本文给出了一个基于 Vue3 的大文件上传完整实现方案,重点在于:
- 使用 Web Worker 进行切片和文件 Hash 计算,主线程不卡顿;
- 通过全局并发限流器 ConcurrencyLimiter 严格控制总并发,支持暂停/恢复,且多文件之间相对公平;
- 利用秒传与断点续传节省带宽与时间;
- 细粒度进度统计与指数退避重试,提升稳定性与用户体验。