异步并发控制代码详细分析
异步并发控制代码详细分析
完整代码
/*** 控制异步请求并发数的简单函数式实现* @param {Array<Function>} tasks - 异步任务数组,每个任务都是返回Promise的函数* @param {number} limit - 最大并发数* @returns {Promise<Array>} - 所有任务的结果数组(按照添加顺序)*/
function limitConcurrency(tasks, limit) {// 任务结果数组(保持与tasks相同顺序)const results = new Array(tasks.length);// 记录已完成的任务数量let completedCount = 0;// 当前正在执行的任务数量let runningCount = 0;// 下一个要执行的任务索引let nextIndex = 0;return new Promise((resolve) => {// 定义执行下一批任务的函数function runNextTasks() {// 当所有任务都已完成时,返回结果if (completedCount === tasks.length) {resolve(results);return;}// 尝试启动新任务,直到达到并发上限或任务都已分配while (runningCount < limit && nextIndex < tasks.length) {const taskIndex = nextIndex++;const task = tasks[taskIndex];// 增加运行计数runningCount++;// 执行任务并处理结果Promise.resolve(task()).then(result => {// 保存结果到对应位置results[taskIndex] = result;console.log(taskIndex,'taskIndextaskIndex');completedCount++;runningCount--;// 尝试执行更多任务runNextTasks();}).catch(error => {// 错误处理:记录错误并继续results[taskIndex] = { error };completedCount++;runningCount--;// 尝试执行更多任务runNextTasks();});}}// 开始执行任务runNextTasks();});
}// 使用示例:控制5个setTimeout的并发执行
const tasks = [() => new Promise(resolve => setTimeout(() => {console.log('任务1完成');resolve('结果1');}, 5000)),() => new Promise(resolve => setTimeout(() => {console.log('任务2完成');resolve('结果2');}, 1000)),() => new Promise(resolve => setTimeout(() => {console.log('任务3完成');resolve('结果3');}, 1000)),() => new Promise(resolve => setTimeout(() => {console.log('任务4完成');resolve('结果4');}, 1800)),() => new Promise(resolve => setTimeout(() => {console.log('任务5完成');resolve('结果5');}, 1200))
];// 限制并发数为2
limitConcurrency(tasks, 2).then(results => {console.log('所有任务完成,结果:', results);
});
详细执行流程分析
初始状态
tasks.length = 5
(索引: 0, 1, 2, 3, 4)limit = 2
results = [empty × 5]
completedCount = 0
runningCount = 0
nextIndex = 0
第一次调用 runNextTasks()
检查完成条件
if (completedCount === tasks.length) // 0 === 5 → false,继续执行
while循环 - 第一轮
while (runningCount < limit && nextIndex < tasks.length)
// 0 < 2 && 0 < 5 → true,进入循环// 第一次循环:
const taskIndex = nextIndex++; // taskIndex = 0, nextIndex = 1
const task = tasks[taskIndex]; // task = tasks[0](5秒任务)
runningCount++; // runningCount = 1// 启动 taskIndex=0 的任务(5秒延时)
Promise.resolve(task()).then(...)
while循环 - 第二轮
while (runningCount < limit && nextIndex < tasks.length)
// 1 < 2 && 1 < 5 → true,继续循环// 第二次循环:
const taskIndex = nextIndex++; // taskIndex = 1, nextIndex = 2
const task = tasks[taskIndex]; // task = tasks[1](1秒任务)
runningCount++; // runningCount = 2// 启动 taskIndex=1 的任务(1秒延时)
Promise.resolve(task()).then(...)
while循环 - 第三轮
while (runningCount < limit && nextIndex < tasks.length)
// 2 < 2 && 2 < 5 → false,退出while循环
此时状态:
- 正在执行:taskIndex=0(5秒)、taskIndex=1(1秒)
- 等待队列:taskIndex=2、taskIndex=3、taskIndex=4
runningCount = 2
(已达上限)nextIndex = 2
1秒后 - taskIndex=1 完成
Promise.then 回调执行
.then(result => {results[taskIndex] = result; // results[1] = '结果2'console.log(taskIndex,'taskIndextaskIndex'); // 输出: 1 taskIndextaskIndexcompletedCount++; // completedCount = 1runningCount--; // runningCount = 1(释放一个槽位)runNextTasks(); // 递归调用
})
递归调用 runNextTasks()
// 检查完成条件
if (completedCount === tasks.length) // 1 === 5 → false// while循环
while (runningCount < limit && nextIndex < tasks.length)
// 1 < 2 && 2 < 5 → trueconst taskIndex = nextIndex++; // taskIndex = 2, nextIndex = 3
const task = tasks[taskIndex]; // task = tasks[2](1秒任务)
runningCount++; // runningCount = 2// 启动 taskIndex=2 的任务
此时状态:
- 正在执行:taskIndex=0(还剩4秒)、taskIndex=2(1秒)
- 等待队列:taskIndex=3、taskIndex=4
completedCount = 1
runningCount = 2
nextIndex = 3
2秒后 - taskIndex=2 完成
类似地,taskIndex=2 完成后会:
results[2] = '结果3'
completedCount = 2
runningCount = 1
- 递归调用启动 taskIndex=3
3.8秒后 - taskIndex=3 完成
taskIndex=3(1800ms)完成后:
results[3] = '结果4'
completedCount = 3
runningCount = 1
- 递归调用启动 taskIndex=4
5秒后 - taskIndex=0 完成
taskIndex=0(5000ms)完成后:
results[0] = '结果1'
completedCount = 4
runningCount = 1
- 递归调用,但 nextIndex=5,无新任务启动
5.2秒后 - taskIndex=4 完成
taskIndex=4(1200ms,从3.8秒开始)完成后:
results[4] = '结果5'
completedCount = 5
runningCount = 0
- 递归调用检查完成条件:
completedCount === tasks.length
→5 === 5
→ true - 调用
resolve(results)
时间轴图示
时间 | 正在执行的任务 | 完成的任务 | 等待队列
-----|---------------------------|-----------|------------------
T0 | taskIndex=0, taskIndex=1 | | 2,3,4
T1 | taskIndex=0, taskIndex=2 | 1 | 3,4
T2 | taskIndex=0, taskIndex=3 | 1,2 | 4
T3.8 | taskIndex=0, taskIndex=4 | 1,2,3 |
T5 | taskIndex=4 | 1,2,3,0 |
T5.2 | 无 | 1,2,3,0,4 | → resolve
关键变量状态变化
时间点 | nextIndex | runningCount | completedCount | 说明 |
---|---|---|---|---|
初始 | 0 | 0 | 0 | 开始状态 |
启动后 | 2 | 2 | 0 | 启动了taskIndex=0,1 |
T1 | 3 | 2 | 1 | taskIndex=1完成,启动taskIndex=2 |
T2 | 4 | 2 | 2 | taskIndex=2完成,启动taskIndex=3 |
T3.8 | 5 | 2 | 3 | taskIndex=3完成,启动taskIndex=4 |
T5 | 5 | 1 | 4 | taskIndex=0完成 |
T5.2 | 5 | 0 | 5 | taskIndex=4完成,resolve |
控制台输出顺序
1 taskIndextaskIndex // T1: taskIndex=1完成
任务2完成
2 taskIndextaskIndex // T2: taskIndex=2完成
任务3完成
3 taskIndextaskIndex // T3.8: taskIndex=3完成
任务4完成
0 taskIndextaskIndex // T5: taskIndex=0完成
任务1完成
4 taskIndextaskIndex // T5.2: taskIndex=4完成
任务5完成
所有任务完成,结果: ['结果1', '结果2', '结果3', '结果4', '结果5']
核心机制总结
- 并发槽位管理:通过
runningCount
和limit
控制同时执行的任务数 - 动态调度:每个任务完成时立即尝试启动新任务(递归调用
runNextTasks()
) - 顺序保证:使用闭包中的
taskIndex
确保结果按原数组顺序存储 - 流水线执行:不等待批次完成,而是任务完成即补充,最大化并发效率
这种设计实现了受限并发的流水线处理,既控制了资源使用,又保证了高效执行和结果有序性。