一个简单的nodejs多任务线程池的实现类
不确定npm有么有类似的项目,由于本人只做一个小爬虫,所以做了这个线程池类
比较简单,记录一下,以便重复使用!
ThreadPool
new ThreadPool(name ,concurrency, progressCallback);
- name 线程池名称,可随意设置
- 同时执行的任务数
- 任务进度回调
- progressCallback 接受两个参数completed 已完成任务 total 任务数
enqueue
装载任务
示例:
for (let i = 0; i < 5; i++) {
pool.enqueue(sampleTask, 3); // 每个任务最多重试 3 次
}
waitForAll
等待任务完成
示例:
await pool.waitForAll();
完整示例:
// 使用示例:
const pool = new ThreadPool(2, (completed, total) => {
console.log(`进度: ${completed}/${total}`);
});
async function sampleTask() {
// 模拟一个偶尔失败的任务
if (Math.random() < 0.5) throw new Error('任务失败');
console.log('任务成功完成');
}
(async () => {
for (let i = 0; i < 5; i++) {
pool.enqueue(sampleTask, 3); // 每个任务最多重试 3 次
}
await pool.waitForAll();
console.log('所有任务完成');
})();
看代码:
class ThreadPool {
constructor(concurrency, progressCallback) {
this.concurrency = concurrency;
this.running = 0;
this.queue = [];
this.completedTasks = 0;
this.totalTasks = 0;
this.progressCallback = progressCallback || (() => { });
this.resolveDone = null;
this.donePromise = new Promise(resolve => this.resolveDone = resolve);
}
async runTask(task, retries) {
this.running++;
let attempt = 0;
while (attempt <= retries) {
try {
await task();
break; // 任务成功,退出循环
} catch (error) {
attempt++;
if (attempt > retries) {
console.error('任务在最大重试次数后仍然失败:', error);
} else {
console.warn(`任务失败,正在重试... (第 ${attempt}/${retries} 次)`);
}
}
}
this.running--;
this.completedTasks++;
this.progressCallback(this.completedTasks, this.totalTasks);
this.fillQueue();
}
fillQueue() {
while (this.running < this.concurrency && this.queue.length > 0) {
const { task, retries } = this.queue.shift();
this.runTask(task, retries);
}
if (this.running === 0 && this.queue.length === 0) {
this.resolveDone();
}
}
async enqueue(task, retries = 0) {
if (typeof task !== 'function' || task.constructor.name !== 'AsyncFunction') {
throw new Error('任务必须是一个异步函数');
}
this.totalTasks++;
const taskObject = { task, retries };
if (this.running < this.concurrency) {
this.runTask(task, retries);
} else {
this.queue.push(taskObject);
}
}
async waitForAll() {
if (this.running === 0 && this.queue.length === 0) {
return;
}
await this.donePromise;
// 重置 donePromise 以便将来使用
this.donePromise = new Promise(resolve => this.resolveDone = resolve);
}
}