任务并发控制
- 为什么需要任务并发控制
- 什么是任务并发控制
- 核心设计
- 拆解
- 形象总结
- 完整代码
为什么需要任务并发控制
- 我们在面试里常会被问到:“如何控制并发请求数量?”
- 有人答
Promise.all——那只能“一股脑”发出; - 有人答
async/await循环——那只能“串行”排队。 - 但是如果想“同时跑 N 个,其余排队,自然补位”,就得靠并发任务调度器
什么是任务并发控制
- 并发任务调度器是一种运行时调度机制,它依据预设的并发度(pool size)对异步任务进行排队与准入控制:在任意时刻仅允许最多 N 个任务同时进入事件循环执行,超出部分按 FIFO(先进先出) 缓存在等待队列;每当运行中任务结束,调度器立即从队列头部取出新任务补位,从而在保证资源利用率最大化的同时,防止因瞬时高并发导致线程/连接/内存等资源耗尽。
核心设计
- 等待队列:等待队列是一个普通数组,按 FIFO 顺序缓存待执行的
{ fn, resolve } 元组:新任务从尾部推进,调度器每次从头部弹出,既保证提交顺序,又通过简单的 shift/push 实现 O(1) 级入队与出队。 - 计数器:
runningTaskCount 计数器实时记录当前已占用的并发槽位数,每次任务启动前自增、任务结束后自减,成为调度循环唯一的“信号灯”,确保任意时刻运行任务量不超过预设阈值。 - 调度循环:
runTask() 采用“贪婪填充”策略:在 while 循环中持续检测槽位未满 && 队列有任务,立即弹出并启动新任务;任务完成回调里再次递归调用自身,从而把可用并发额度一次性填满,实现零延迟补位。 - 动态扩容:
setPoolSize(newSize) 允许运行期任意修改并发上限,调整后会立即触发一次 runTask(),若新上限大于当前占用数,等待队列中的任务将被即时拉取执行,无需重启调度器即可在线扩/缩容。
拆解
构造器部分
constructor({poolSize}) {this.waiting = [];this.poolSize = poolSize || 2;this.runningTaskCount = 0;}
设置阈值部分
setPoolSize(size){this.poolSize = size;this.runTask();}
增加任务方法
add(fn){return new Promise((resolve)=>{this.waiting.push({fn,resolve});this.runTask();})}
核心调度方法
- 使用
finally来处理:任务无论成功还是失败都要计数器-1 - 递归检查是否还能继续处理任务
runTask() {while (this.runningTaskCount < this.poolSize &&this.waiting.length) {const { fn, resolve } = this.waiting.shift();this.runningTaskCount++;fn().finally(() => { this.runningTaskCount--;this.runTask(); resolve(); });}}
形象总结
- 并发调度器就像一个异步红绿灯,**绿灯数(阈值)**固定,车多就排队,车走自动补位,让整条街吞吐最大又永不堵死。
完整代码
class SuperTask {constructor({poolSize}) {this.waiting = [];this.poolSize = poolSize || 2;this.runningTaskCount = 0;}setPoolSize(size){this.poolSize = size;this.runTask();}add(fn){return new Promise((resolve)=>{this.waiting.push({fn,resolve});this.runTask();})}runTask(){while(this.runningTaskCount < this.poolSize && this.waiting.length > 0){const {fn,resolve} = this.waiting.shift();this.runningTaskCount++;fn().then(()=>{resolve();this.runningTaskCount--;this.runTask();})}}
}