当前位置: 首页 > news >正文

异步并发控制代码详细分析

异步并发控制代码详细分析

完整代码

/*** 控制异步请求并发数的简单函数式实现* @param {Array<Function>} tasks - 异步任务数组,每个任务都是返回Promise的函数* @param {number} limit - 最大并发数* @returns {Promise<Array>} - 所有任务的结果数组(按照添加顺序)*/
function limitConcurrency(tasks, limit) {// 任务结果数组(保持与tasks相同顺序)const results = new Array(tasks.length);// 记录已完成的任务数量let completedCount = 0;// 当前正在执行的任务数量let runningCount = 0;// 下一个要执行的任务索引let nextIndex = 0;return new Promise((resolve) => {// 定义执行下一批任务的函数function runNextTasks() {// 当所有任务都已完成时,返回结果if (completedCount === tasks.length) {resolve(results);return;}// 尝试启动新任务,直到达到并发上限或任务都已分配while (runningCount < limit && nextIndex < tasks.length) {const taskIndex = nextIndex++;const task = tasks[taskIndex];// 增加运行计数runningCount++;// 执行任务并处理结果Promise.resolve(task()).then(result => {// 保存结果到对应位置results[taskIndex] = result;console.log(taskIndex,'taskIndextaskIndex');completedCount++;runningCount--;// 尝试执行更多任务runNextTasks();}).catch(error => {// 错误处理:记录错误并继续results[taskIndex] = { error };completedCount++;runningCount--;// 尝试执行更多任务runNextTasks();});}}// 开始执行任务runNextTasks();});
}// 使用示例:控制5个setTimeout的并发执行
const tasks = [() => new Promise(resolve => setTimeout(() => {console.log('任务1完成');resolve('结果1');}, 5000)),() => new Promise(resolve => setTimeout(() => {console.log('任务2完成');resolve('结果2');}, 1000)),() => new Promise(resolve => setTimeout(() => {console.log('任务3完成');resolve('结果3');}, 1000)),() => new Promise(resolve => setTimeout(() => {console.log('任务4完成');resolve('结果4');}, 1800)),() => new Promise(resolve => setTimeout(() => {console.log('任务5完成');resolve('结果5');}, 1200))
];// 限制并发数为2
limitConcurrency(tasks, 2).then(results => {console.log('所有任务完成,结果:', results);
});

详细执行流程分析

初始状态

  • tasks.length = 5(索引: 0, 1, 2, 3, 4)
  • limit = 2
  • results = [empty × 5]
  • completedCount = 0
  • runningCount = 0
  • nextIndex = 0

第一次调用 runNextTasks()

检查完成条件
if (completedCount === tasks.length) // 0 === 5 → false,继续执行
while循环 - 第一轮
while (runningCount < limit && nextIndex < tasks.length)
// 0 < 2 && 0 < 5 → true,进入循环// 第一次循环:
const taskIndex = nextIndex++; // taskIndex = 0, nextIndex = 1
const task = tasks[taskIndex]; // task = tasks[0](5秒任务)
runningCount++; // runningCount = 1// 启动 taskIndex=0 的任务(5秒延时)
Promise.resolve(task()).then(...)
while循环 - 第二轮
while (runningCount < limit && nextIndex < tasks.length)
// 1 < 2 && 1 < 5 → true,继续循环// 第二次循环:
const taskIndex = nextIndex++; // taskIndex = 1, nextIndex = 2
const task = tasks[taskIndex]; // task = tasks[1](1秒任务)
runningCount++; // runningCount = 2// 启动 taskIndex=1 的任务(1秒延时)
Promise.resolve(task()).then(...)
while循环 - 第三轮
while (runningCount < limit && nextIndex < tasks.length)
// 2 < 2 && 2 < 5 → false,退出while循环

此时状态:

  • 正在执行:taskIndex=0(5秒)、taskIndex=1(1秒)
  • 等待队列:taskIndex=2、taskIndex=3、taskIndex=4
  • runningCount = 2(已达上限)
  • nextIndex = 2

1秒后 - taskIndex=1 完成

Promise.then 回调执行
.then(result => {results[taskIndex] = result; // results[1] = '结果2'console.log(taskIndex,'taskIndextaskIndex'); // 输出: 1 taskIndextaskIndexcompletedCount++; // completedCount = 1runningCount--;   // runningCount = 1(释放一个槽位)runNextTasks();   // 递归调用
})
递归调用 runNextTasks()
// 检查完成条件
if (completedCount === tasks.length) // 1 === 5 → false// while循环
while (runningCount < limit && nextIndex < tasks.length)
// 1 < 2 && 2 < 5 → trueconst taskIndex = nextIndex++; // taskIndex = 2, nextIndex = 3
const task = tasks[taskIndex]; // task = tasks[2](1秒任务)
runningCount++; // runningCount = 2// 启动 taskIndex=2 的任务

此时状态:

  • 正在执行:taskIndex=0(还剩4秒)、taskIndex=2(1秒)
  • 等待队列:taskIndex=3、taskIndex=4
  • completedCount = 1
  • runningCount = 2
  • nextIndex = 3

2秒后 - taskIndex=2 完成

类似地,taskIndex=2 完成后会:

  1. results[2] = '结果3'
  2. completedCount = 2
  3. runningCount = 1
  4. 递归调用启动 taskIndex=3

3.8秒后 - taskIndex=3 完成

taskIndex=3(1800ms)完成后:

  1. results[3] = '结果4'
  2. completedCount = 3
  3. runningCount = 1
  4. 递归调用启动 taskIndex=4

5秒后 - taskIndex=0 完成

taskIndex=0(5000ms)完成后:

  1. results[0] = '结果1'
  2. completedCount = 4
  3. runningCount = 1
  4. 递归调用,但 nextIndex=5,无新任务启动

5.2秒后 - taskIndex=4 完成

taskIndex=4(1200ms,从3.8秒开始)完成后:

  1. results[4] = '结果5'
  2. completedCount = 5
  3. runningCount = 0
  4. 递归调用检查完成条件:completedCount === tasks.length5 === 5 → true
  5. 调用 resolve(results)

时间轴图示

时间 | 正在执行的任务              | 完成的任务 | 等待队列
-----|---------------------------|-----------|------------------
T0   | taskIndex=0, taskIndex=1  |           | 2,3,4
T1   | taskIndex=0, taskIndex=2  | 1         | 3,4
T2   | taskIndex=0, taskIndex=3  | 1,2       | 4
T3.8 | taskIndex=0, taskIndex=4  | 1,2,3     |
T5   | taskIndex=4               | 1,2,3,0   |
T5.2 | 无                        | 1,2,3,0,4 | → resolve

关键变量状态变化

时间点nextIndexrunningCountcompletedCount说明
初始000开始状态
启动后220启动了taskIndex=0,1
T1321taskIndex=1完成,启动taskIndex=2
T2422taskIndex=2完成,启动taskIndex=3
T3.8523taskIndex=3完成,启动taskIndex=4
T5514taskIndex=0完成
T5.2505taskIndex=4完成,resolve

控制台输出顺序

1 taskIndextaskIndex     // T1: taskIndex=1完成
任务2完成
2 taskIndextaskIndex     // T2: taskIndex=2完成
任务3完成
3 taskIndextaskIndex     // T3.8: taskIndex=3完成
任务4完成
0 taskIndextaskIndex     // T5: taskIndex=0完成
任务1完成
4 taskIndextaskIndex     // T5.2: taskIndex=4完成
任务5完成
所有任务完成,结果: ['结果1', '结果2', '结果3', '结果4', '结果5']

核心机制总结

  1. 并发槽位管理:通过 runningCountlimit 控制同时执行的任务数
  2. 动态调度:每个任务完成时立即尝试启动新任务(递归调用 runNextTasks()
  3. 顺序保证:使用闭包中的 taskIndex 确保结果按原数组顺序存储
  4. 流水线执行:不等待批次完成,而是任务完成即补充,最大化并发效率

这种设计实现了受限并发的流水线处理,既控制了资源使用,又保证了高效执行和结果有序性。

相关文章:

  • WEB3——什么是ABI
  • 《TCP/IP 详解 卷1:协议》第2章:Internet 地址结构
  • <PLC><socket><西门子>基于西门子S7-1200PLC,实现手机与PLC通讯(通过websocket转接)
  • 云原生微服务架构演进之路:理念、挑战与实践
  • 小型图书管理系统案例(用于spring mvc 实践)
  • MicroPython+L298N+ESP32控制电机转速
  • Wi-Fi 切换 5G 的时机
  • 公链地址生成曲线和算法
  • 【NLP入门系列一】NLP概述和独热编码
  • c/c++的opencv霍夫变换
  • LLM 使用 MCP 协议及其原理详解
  • Glide NoResultEncoderAvailableException异常解决
  • 安装启动Mosquitto以及问题error: cjson/cJSON.h: No such file or directory解决
  • leetcode:7. 整数反转(python3解法,数学相关算法题)
  • Python学习(5) ----- Python的JSON处理
  • IDEA 在公司内网配置gitlab
  • 室内VR全景助力房产营销及装修
  • 敏捷开发在AI团队的适配研究
  • Android 开发 Kotlin 全局大喇叭与广播机制
  • STM32G4 电机外设篇(二) VOFA + ADC + OPAMP
  • wordpress 加载速度优化/什么是seo营销
  • 怎样弄网站的导航栏/关键词筛选工具
  • 公司做网站推广/重庆seo顾问
  • 申请注册公司需要哪些条件/杭州seo网站排名
  • 电子政务与网站建设工作总结/百度关键词排名优化
  • 武汉市网站开发公司/seo关键词排名优化