...
大前端

一个简单的nodejs多任务线程池的实现类

不确定npm有么有类似的项目,由于本人只做一个小爬虫,所以做了这个线程池类
比较简单,记录一下,以便重复使用!

ThreadPool

new ThreadPool(name ,concurrency, progressCallback);
  • name 线程池名称,可随意设置
  • 同时执行的任务数
  • 任务进度回调
  • progressCallback 接受两个参数completed 已完成任务 total 任务数

enqueue

装载任务
示例:

for (let i = 0; i < 5; i++) {
    pool.enqueue(sampleTask, 3); // 每个任务最多重试 3 次
}

waitForAll

等待任务完成
示例:

await pool.waitForAll();

完整示例:

// 使用示例:
const pool = new ThreadPool(2, (completed, total) => {
    console.log(`进度: ${completed}/${total}`);
});

async function sampleTask() {
    // 模拟一个偶尔失败的任务
    if (Math.random() < 0.5) throw new Error('任务失败');
    console.log('任务成功完成');
}

(async () => {
    for (let i = 0; i < 5; i++) {
        pool.enqueue(sampleTask, 3); // 每个任务最多重试 3 次
    }
    await pool.waitForAll();
    console.log('所有任务完成');
})();

看代码:

class ThreadPool {
    constructor(concurrency, progressCallback) {
        this.concurrency = concurrency;
        this.running = 0;
        this.queue = [];
        this.completedTasks = 0;
        this.totalTasks = 0;
        this.progressCallback = progressCallback || (() => { });
        this.resolveDone = null;
        this.donePromise = new Promise(resolve => this.resolveDone = resolve);
    }

    async runTask(task, retries) {
        this.running++;
        let attempt = 0;
        while (attempt <= retries) {
            try {
                await task();
                break; // 任务成功,退出循环
            } catch (error) {
                attempt++;
                if (attempt > retries) {
                    console.error('任务在最大重试次数后仍然失败:', error);
                } else {
                    console.warn(`任务失败,正在重试... (第 ${attempt}/${retries} 次)`);
                }
            }
        }
        this.running--;
        this.completedTasks++;
        this.progressCallback(this.completedTasks, this.totalTasks);
        this.fillQueue();
    }

    fillQueue() {
        while (this.running < this.concurrency && this.queue.length > 0) {
            const { task, retries } = this.queue.shift();
            this.runTask(task, retries);
        }
        if (this.running === 0 && this.queue.length === 0) {
            this.resolveDone();
        }
    }

    async enqueue(task, retries = 0) {
        if (typeof task !== 'function' || task.constructor.name !== 'AsyncFunction') {
            throw new Error('任务必须是一个异步函数');
        }

        this.totalTasks++;
        const taskObject = { task, retries };
        if (this.running < this.concurrency) {
            this.runTask(task, retries);
        } else {
            this.queue.push(taskObject);
        }
    }

    async waitForAll() {
        if (this.running === 0 && this.queue.length === 0) {
            return;
        }
        await this.donePromise;
        // 重置 donePromise 以便将来使用
        this.donePromise = new Promise(resolve => this.resolveDone = resolve);
    }
}
ffmpeg将m3u8转换成mp4 Android使用Apktool反编译解包Apk文件
biu biu biu
Ubuntu20.04 设置开机自启动 github无法访问,访问速度慢解决 常用的前端JavaScript方法封装 12个有用的JavaScript数组技巧 Windows 10系统连接共享打印机报错0x00000709、0x0000007c、0x0000011b