Skip to content

学习笔记:实现一个异步任务调度器 (控制并发数量)

1. 核心问题:场景分析

面试中遇到了一个典型的并发控制场景题:需要实现一个任务调度器类,该类可以添加异步任务,并确保同时执行的任务数量不能超过一个最大限制(例如 2 个)。

现实场景模拟:

  • 背景: 在一个电商网站的商品列表页,有成百上千个商品。
  • 目标: 为了提升用户体验,当用户点击某个商品时,可以立刻看到商品详情页,而不是等待网络请求。
  • 解决方案: 在列表页预加载(pre-fetch)商品详情页的数据。
  • 痛点/约束:
    1. 如果同时发起上百个请求,会过度占用网络带宽,影响其他关键操作(如搜索)。
    2. 浏览器本身对同域下的并发 HTTP 请求数量有限制(通常是 6 个)。
  • 要求: 设计一个调度器,限制预加载任务的并发数量,比如最多同时只预加载 2 个商品的数据。

2. 任务执行逻辑拆解

假设我们有 4 个任务,并发限制为 2,它们的执行逻辑如下:

时间点事件正在运行的任务 (并发数)等待队列输出日志
0s任务1 (耗时1s) 和 任务2 (耗时2s) 开始执行。[任务1, 任务2] (2/2)[任务3, 任务4]
1s任务1 执行完毕。[任务2] (1/2)[任务3, 任务4]1s 后输出: 商品一
"任务3 (耗时3s) 立即开始执行。[任务2, 任务3] (2/2)[任务4]
2s任务2 执行完毕。[任务3] (1/2)[任务4]2s 后输出: 商品二
"任务4 (耗时4s) 立即开始执行。[任务3, 任务4] (2/2)[]
4s任务3 执行完毕 (在1s时开始,耗时3s)。[任务4] (1/2)[]4s 后输出: 商品三
6s任务4 执行完毕 (在2s时开始,耗时4s)。[] (0/2)[]6s 后输出: 商品四

关键点:后续任务的最终完成时间,取决于它 开始执行的时间 加上它 自身的耗时。而它开始执行的时间,又取决于前面任务何时释放出并发名额。


3. 代码实现步骤

a. 类的基本结构 (Constructor)

我们需要在构造函数中初始化三个核心属性:

  1. tasks: 一个数组,作为等待队列,存放所有未开始的任务。
  2. max: 数值,表示最大并发数。
  3. runningCount: 数值,用于记录当前正在执行的任务数量。
javascript
class TaskScheduler {
    constructor(max = 2) {
        this.tasks = [];           // 任务等待队列
        this.max = max;              // 最大并发数
        this.runningCount = 0;     // 当前正在运行的任务数
    }
}
b. addTask 方法:添加任务

这个方法负责接收任务并将其放入等待队列,但 不立即执行

  1. 接收一个任务 task (它是一个返回 Promise 的函数)。
  2. 为了能在任务完成后 resolve 外层的 Promise,我们将 taskresolve/reject 函数包装在一起,推入 tasks 队列。
  3. 每次添加任务后,调用 run() 方法尝试启动任务。
  4. 返回一个 Promise,以便外部可以 await 任务的最终完成。
javascript
addTask(task) {
    return new Promise((resolve, reject) => {
        // 将任务函数和其 Promise 的控制器一起推入队列
        this.tasks.push({
            task,
            resolve,
            reject
        });
        this._run(); // 尝试执行任务
    });
}
c. _run 方法:核心调度逻辑

这是调度器的“心脏”,负责在合适的时机从队列中取出任务并执行。

  1. 执行前提检查 (Guard Clauses)
    • 如果 runningCount 大于或等于 max(并发已满),则直接返回,等待下次调用。
    • 如果 tasks 队列为空(没有待执行的任务),也直接返回。
  2. 执行任务
    • tasks 队列中取出一个任务 (shift())。
    • runningCount 加一。
    • 执行任务 task()
  3. 任务完成后的处理 (finally)
    • 无论任务成功 (then) 还是失败 (catch),它最终都会结束。我们使用 finally 来确保后续逻辑一定被执行。
    • finally 块中,将 runningCount 减一,释放一个并发名额。
    • 关键一步:再次调用 _run() 方法。这会检查是否有等待中的任务可以立即开始执行,形成一个“链式反应”。
javascript
_run() {
    // 如果并发已满或队列为空,则不执行
    if (this.runningCount >= this.max || this.tasks.length === 0) {
        return;
    }

    // 从队列头部取出一个任务
    const { task, resolve, reject } = this.tasks.shift();
    this.runningCount++;

    // 执行任务
    task()
        .then(resolve, reject) // 将任务结果传递给 addTask 返回的 Promise
        .finally(() => {
            // 任务完成后,无论成功或失败
            this.runningCount--; // 释放一个并发名额
            this._run();         // 尝试执行下一个任务
        });
}

4. 完整代码示例

javascript
class TaskScheduler {
    constructor(max = 2) {
        this.tasks = [];
        this.max = max;
        this.runningCount = 0;
    }

    /**
     * 添加一个异步任务到调度器
     * @param {() => Promise<any>} task 一个返回 Promise 的函数
     * @returns {Promise<any>}
     */
    addTask(task) {
        return new Promise((resolve, reject) => {
            this.tasks.push({
                task,
                resolve,
                reject
            });
            this._run();
        });
    }

    _run() {
        if (this.runningCount >= this.max || this.tasks.length === 0) {
            return;
        }

        const { task, resolve, reject } = this.tasks.shift();
        this.runningCount++;

        // 确保传入的 task 被当作 Promise 处理,增加健壮性
        Promise.resolve(task())
            .then(resolve, reject)
            .finally(() => {
                this.runningCount--;
                this._run();
            });
    }
}


// --- 测试代码 ---
const scheduler = new TaskScheduler(2);

const timeout = (time, name) => {
    return () => new Promise(resolve => {
        setTimeout(() => {
            console.log(`${(Date.now() - startTime)/1000}s 后输出: ${name}`);
            resolve();
        }, time);
    });
};

const addTask = (time, name) => {
    scheduler.addTask(timeout(time, name));
};

const startTime = Date.now();
addTask(1000, "商品一");
addTask(2000, "商品二");
addTask(3000, "商品三");
addTask(4000, "商品四");

// 预期输出:
// 1s 后输出: 商品一
// 2s 后输出: 商品二
// 4s 后输出: 商品三 (等待1s + 执行3s)
// 6s 后输出: 商品四 (等待2s + 执行4s)

5. 总结

实现一个异步任务调度器的核心在于:

  1. 队列(Queue):用一个数组 tasks 来缓存所有待执行的任务。
  2. 计数器(Counter):用一个变量 runningCount 来追踪当前正在执行的任务数。
  3. 调度函数(Scheduler Function)_run 方法是关键,它在每次任务完成时被递归调用,形成一个自驱动的循环,不断地从队列中取出新任务,直到队列为空。
  4. Promise 封装addTask 方法通过返回一个新的 Promise,将内部复杂的调度逻辑对外部调用者透明化,使其可以简单地使用 async/await.then() 来等待任务的最终结果。