学习笔记:实现一个异步任务调度器 (控制并发数量)
1. 核心问题:场景分析
面试中遇到了一个典型的并发控制场景题:需要实现一个任务调度器类,该类可以添加异步任务,并确保同时执行的任务数量不能超过一个最大限制(例如 2 个)。
现实场景模拟:
- 背景: 在一个电商网站的商品列表页,有成百上千个商品。
- 目标: 为了提升用户体验,当用户点击某个商品时,可以立刻看到商品详情页,而不是等待网络请求。
- 解决方案: 在列表页预加载(pre-fetch)商品详情页的数据。
- 痛点/约束:
- 如果同时发起上百个请求,会过度占用网络带宽,影响其他关键操作(如搜索)。
- 浏览器本身对同域下的并发 HTTP 请求数量有限制(通常是 6 个)。
- 要求: 设计一个调度器,限制预加载任务的并发数量,比如最多同时只预加载 2 个商品的数据。
2. 任务执行逻辑拆解
假设我们有 4 个任务,并发限制为 2,它们的执行逻辑如下:
| 时间点 | 事件 | 正在运行的任务 (并发数) | 等待队列 | 输出日志 |
|---|---|---|---|---|
| 0s | 任务1 (耗时1s) 和 任务2 (耗时2s) 开始执行。 | [任务1, 任务2] (2/2) | [任务3, 任务4] | |
| 1s | 任务1 执行完毕。 | [任务2] (1/2) | [任务3, 任务4] | 1s 后输出: 商品一 |
| " | 任务3 (耗时3s) 立即开始执行。 | [任务2, 任务3] (2/2) | [任务4] | |
| 2s | 任务2 执行完毕。 | [任务3] (1/2) | [任务4] | 2s 后输出: 商品二 |
| " | 任务4 (耗时4s) 立即开始执行。 | [任务3, 任务4] (2/2) | [] | |
| 4s | 任务3 执行完毕 (在1s时开始,耗时3s)。 | [任务4] (1/2) | [] | 4s 后输出: 商品三 |
| 6s | 任务4 执行完毕 (在2s时开始,耗时4s)。 | [] (0/2) | [] | 6s 后输出: 商品四 |
关键点:后续任务的最终完成时间,取决于它 开始执行的时间 加上它 自身的耗时。而它开始执行的时间,又取决于前面任务何时释放出并发名额。
3. 代码实现步骤
a. 类的基本结构 (Constructor)
我们需要在构造函数中初始化三个核心属性:
tasks: 一个数组,作为等待队列,存放所有未开始的任务。max: 数值,表示最大并发数。runningCount: 数值,用于记录当前正在执行的任务数量。
javascript
class TaskScheduler {
constructor(max = 2) {
this.tasks = []; // 任务等待队列
this.max = max; // 最大并发数
this.runningCount = 0; // 当前正在运行的任务数
}
}b. addTask 方法:添加任务
这个方法负责接收任务并将其放入等待队列,但 不立即执行。
- 接收一个任务
task(它是一个返回 Promise 的函数)。 - 为了能在任务完成后
resolve外层的Promise,我们将task和resolve/reject函数包装在一起,推入tasks队列。 - 每次添加任务后,调用
run()方法尝试启动任务。 - 返回一个
Promise,以便外部可以await任务的最终完成。
javascript
addTask(task) {
return new Promise((resolve, reject) => {
// 将任务函数和其 Promise 的控制器一起推入队列
this.tasks.push({
task,
resolve,
reject
});
this._run(); // 尝试执行任务
});
}c. _run 方法:核心调度逻辑
这是调度器的“心脏”,负责在合适的时机从队列中取出任务并执行。
- 执行前提检查 (Guard Clauses):
- 如果
runningCount大于或等于max(并发已满),则直接返回,等待下次调用。 - 如果
tasks队列为空(没有待执行的任务),也直接返回。
- 如果
- 执行任务:
- 从
tasks队列中取出一个任务 (shift())。 - 将
runningCount加一。 - 执行任务
task()。
- 从
- 任务完成后的处理 (
finally):- 无论任务成功 (
then) 还是失败 (catch),它最终都会结束。我们使用finally来确保后续逻辑一定被执行。 - 在
finally块中,将runningCount减一,释放一个并发名额。 - 关键一步:再次调用
_run()方法。这会检查是否有等待中的任务可以立即开始执行,形成一个“链式反应”。
- 无论任务成功 (
javascript
_run() {
// 如果并发已满或队列为空,则不执行
if (this.runningCount >= this.max || this.tasks.length === 0) {
return;
}
// 从队列头部取出一个任务
const { task, resolve, reject } = this.tasks.shift();
this.runningCount++;
// 执行任务
task()
.then(resolve, reject) // 将任务结果传递给 addTask 返回的 Promise
.finally(() => {
// 任务完成后,无论成功或失败
this.runningCount--; // 释放一个并发名额
this._run(); // 尝试执行下一个任务
});
}4. 完整代码示例
javascript
class TaskScheduler {
constructor(max = 2) {
this.tasks = [];
this.max = max;
this.runningCount = 0;
}
/**
* 添加一个异步任务到调度器
* @param {() => Promise<any>} task 一个返回 Promise 的函数
* @returns {Promise<any>}
*/
addTask(task) {
return new Promise((resolve, reject) => {
this.tasks.push({
task,
resolve,
reject
});
this._run();
});
}
_run() {
if (this.runningCount >= this.max || this.tasks.length === 0) {
return;
}
const { task, resolve, reject } = this.tasks.shift();
this.runningCount++;
// 确保传入的 task 被当作 Promise 处理,增加健壮性
Promise.resolve(task())
.then(resolve, reject)
.finally(() => {
this.runningCount--;
this._run();
});
}
}
// --- 测试代码 ---
const scheduler = new TaskScheduler(2);
const timeout = (time, name) => {
return () => new Promise(resolve => {
setTimeout(() => {
console.log(`${(Date.now() - startTime)/1000}s 后输出: ${name}`);
resolve();
}, time);
});
};
const addTask = (time, name) => {
scheduler.addTask(timeout(time, name));
};
const startTime = Date.now();
addTask(1000, "商品一");
addTask(2000, "商品二");
addTask(3000, "商品三");
addTask(4000, "商品四");
// 预期输出:
// 1s 后输出: 商品一
// 2s 后输出: 商品二
// 4s 后输出: 商品三 (等待1s + 执行3s)
// 6s 后输出: 商品四 (等待2s + 执行4s)5. 总结
实现一个异步任务调度器的核心在于:
- 队列(Queue):用一个数组
tasks来缓存所有待执行的任务。 - 计数器(Counter):用一个变量
runningCount来追踪当前正在执行的任务数。 - 调度函数(Scheduler Function):
_run方法是关键,它在每次任务完成时被递归调用,形成一个自驱动的循环,不断地从队列中取出新任务,直到队列为空。 - Promise 封装:
addTask方法通过返回一个新的Promise,将内部复杂的调度逻辑对外部调用者透明化,使其可以简单地使用async/await或.then()来等待任务的最终结果。