在項(xiàng)目開(kāi)發(fā)中,會(huì)經(jīng)常遇到需要異步請(qǐng)求多個(gè)接口,獲取到數(shù)據(jù)結(jié)果,但是一個(gè)個(gè)串行調(diào)太慢,思索著可以并發(fā)同時(shí)幾個(gè)一起調(diào),做個(gè)并發(fā)池,提高請(qǐng)求效率。
這個(gè)場(chǎng)景很經(jīng)典,在日常開(kāi)發(fā)中也很實(shí)用。3年前,面試字節(jié)的時(shí)候,也被問(wèn)到類似的現(xiàn)場(chǎng)編程題。
2020年寫(xiě)了此文章,時(shí)隔3年,已經(jīng)是個(gè)老鳥(niǎo)了,把之前的豆腐渣工程翻新一下。

功能說(shuō)明
支持多個(gè)異步任務(wù)同時(shí)執(zhí)行,等待全部執(zhí)行完成后,直接返回執(zhí)行結(jié)果
- 支持限制并發(fā)池的大小
- 支持全部任務(wù)執(zhí)行成功后,進(jìn)行回調(diào)處理
- 支持執(zhí)行過(guò)程中繼續(xù)追加任務(wù)
- 返回結(jié)果支持【按順序取用】或者【按特定對(duì)象取用】
實(shí)現(xiàn)圖解

方式一【推薦】
先取用任務(wù)進(jìn)行執(zhí)行,直到工作區(qū)占滿了,當(dāng)某個(gè)任務(wù)執(zhí)行結(jié)束后,繼續(xù)取用任務(wù)執(zhí)行,直到任務(wù)區(qū)滿了,則暫停取用。一直等待某個(gè)任務(wù)結(jié)束,重新取用任務(wù)執(zhí)行。
按此循環(huán)操作,直到工作區(qū)正在執(zhí)行的任務(wù)數(shù)為0,表示全部執(zhí)行完畢。然后返回全部執(zhí)行結(jié)果,執(zhí)行回調(diào)函數(shù)。
期間,支持不斷地往任務(wù)池追加任務(wù)。
interface ITask {
fn: () => Promise<any>;
key?: string | number;
}
interface IResults {
[key: string | number]: any;
}
class TaskQueue {
maxNum: number;
running: number;
queue: Array<ITask>;
queueIndex: number;
results: IResults;
callback: null | Function;
isPushOver: boolean;
constructor(maxNum: number) {
this.maxNum = maxNum; // 并發(fā)池?cái)?shù)量大小
this.running = 0; // 當(dāng)前正在執(zhí)行的池?cái)?shù)
this.queue = []; // 任務(wù)隊(duì)列
this.queueIndex = 0; // 當(dāng)前進(jìn)入執(zhí)行的任務(wù)索引順序
this.results = {}; // 存儲(chǔ)任務(wù)執(zhí)行結(jié)果
this.callback = null; // 回調(diào)函數(shù)
this.isPushOver = false; // 任務(wù)是否追加完畢
}
// 追加任務(wù),并執(zhí)行
pushTasks(tasks: Array<ITask>) {
this.queue.push(...tasks);
this.next();
}
// 通知任務(wù)追加完畢
pushOver() {
this.isPushOver = true;
this.runOver();
}
// 任務(wù)全部執(zhí)行完畢
runOver() {
if (
typeof this.callback === "function" &&
this.running == 0 &&
this.isPushOver
) {
this.callback.call(null, this.results);
}
}
next() {
while (this.running < this.maxNum && this.queue.length) {
const task = this.queue.shift();
// 標(biāo)識(shí)當(dāng)前任務(wù)索引,方便從 results 取用值
const key = task?.key || this.queueIndex;
this.queueIndex++;
// 任務(wù)池被占用數(shù)量+1
this.running++;
// 任務(wù)臨界判斷合法性
if (!task) {
this.results[key] = null;
this.running--;
continue;
}
// 執(zhí)行任務(wù)
task
.fn()
.then((res: any) => {
this.results[key] = res;
})
.catch((err: any) => {
this.results[key] = err;
})
.finally(() => {
this.running--;
this.next();
});
}
this.runOver();
}
}
/**
* 測(cè)試用例
*/
function run() {
// 創(chuàng)建實(shí)例
const queue = new TaskQueue(5);
queue.callback = (result: any) => {
console.log("asyncPool_1 ~ result:", result);
console.log(result[1]); // 支持按順序取用結(jié)果
console.log(result.four); // 支持針對(duì)特殊任務(wù)取用結(jié)果
};
function buildTask(result: any, delay = 0) {
return () =>
new Promise((resolve) =>
setTimeout(() => {
console.log("正在執(zhí)行任務(wù)", result);
resolve(result);
}, delay)
);
}
const tasks = [
{ fn: buildTask(1, 100) },
{ fn: buildTask(2, 200) },
{ fn: buildTask(3, 300) },
{ fn: buildTask(4, 100), key: "four" }, // key
];
queue.pushTasks(tasks);
setTimeout(() => {
console.log("再追加一個(gè)任務(wù)");
queue.pushTasks([{ fn: buildTask(5, 100), key: 5 }]);
}, 500);
setTimeout(() => {
console.log("通知追加結(jié)束");
queue.pushOver();
}, 700);
}
run();

- 支持中途追加任務(wù)
- 支持全部執(zhí)行完成后,返回執(zhí)行結(jié)果,并執(zhí)行回調(diào)函數(shù)
- 支持對(duì)執(zhí)行結(jié)果按順序取用 或 按需取用
方式二
假設(shè)支持最大并發(fā)執(zhí)行的任務(wù)數(shù)是5個(gè),先創(chuàng)建5個(gè)執(zhí)行工作線,每個(gè)工作線開(kāi)始取任務(wù)并執(zhí)行。當(dāng)某個(gè)工作線的任務(wù)執(zhí)行完畢,則再次從任務(wù)池中取任務(wù)。直到某個(gè)工作線取不到任務(wù)時(shí),表示全部執(zhí)行完畢,進(jìn)行執(zhí)行回調(diào)方法。
interface ITask {
fn: () => Promise<any>;
key?: string | number;
}
interface IResults {
[key: string | number]: any;
}
function asyncPool_2(tasks: Array<ITask>, max: number, callback: Function) {
let result: IResults = {};
let taskIndex = 0;
Promise.all(
Array.from({ length: max }).map(() => {
return new Promise((resolve) => {
function runTask() {
if (tasks.length <= 0) {
resolve(null);
return;
}
const task = tasks.shift();
const key = task?.key || taskIndex;
if (!task?.fn) {
result[key] = null;
taskIndex++;
runTask();
return;
}
task.fn().then((res) => {
result[key] = res;
runTask();
});
taskIndex++;
}
runTask();
});
})
).then(() => callback(result));
}
/**
* 測(cè)試用例
*/
function run() {
function buildTask(result: any, delay = 0) {
return () =>
new Promise((resolve) =>
setTimeout(() => {
console.log("正在執(zhí)行任務(wù)", result);
resolve(result);
}, delay)
);
}
const tasks = [
{ fn: buildTask(1, 100) },
{ fn: buildTask(2, 200) },
{ fn: buildTask(3, 300) },
{ fn: buildTask(4, 100), key: "four" }, // key
];
asyncPool_2(tasks, 5, (result: any) => {
console.log("asyncPool_2 ~ result:", result);
console.log(result[1]); // 支持按順序取用結(jié)果
console.log(result.four); // 支持針對(duì)特殊任務(wù)取用結(jié)果
});
}
run();

代碼邏輯簡(jiǎn)潔一些,適用于一些相對(duì)簡(jiǎn)單的應(yīng)用場(chǎng)景,需要一開(kāi)始確定好執(zhí)行哪些任務(wù)。
- 支持傳入任務(wù)數(shù)組,雖能中途追加任務(wù),但是可能存在時(shí)機(jī)問(wèn)題
- 支持全部執(zhí)行完成后,返回結(jié)果,并執(zhí)行回調(diào)函數(shù)
- 支持對(duì)執(zhí)行結(jié)果按順序取用 或 按需取用
方式三
npm中有挺多第三方包,比如 async-pool、es-promise-pool、p-limit等,但是實(shí)際使用起來(lái)還挺麻煩,挑了使用比較多的async-pool進(jìn)行重寫(xiě)。
其中,具體實(shí)現(xiàn)原理可以查看Promise.all并發(fā)限制文章(這邊文章提供的代碼是存在問(wèn)題的,但是原理講得挺清楚的)。
基于這篇文章提供的思路,對(duì)代碼進(jìn)行改寫(xiě),具體如下
/**
* promise并發(fā)限制調(diào)用
* @param {object[]} data - 調(diào)用的數(shù)據(jù)列表
* @param {number} maxLimit - 并發(fā)調(diào)用限制個(gè)數(shù)
* @param {function} iteratee - 處理單個(gè)節(jié)點(diǎn)的方法
* @returns {promise}
*/
export function promiseLimitPool({ data = [], maxLimit = 3, iteratee = () => {} } = {}, callback=()=>{}) {
const executing = [];
const enqueue = (index = 0) => {
// 邊界處理
if (index === data.length) {
return Promise.all(executing);
}
// 每次調(diào)用enqueue, 初始化一個(gè)promise
const item = data[index];
function itemPromise(index) {
const promise = new Promise(async (resolve) => {
// 處理單個(gè)節(jié)點(diǎn)
await iteratee({ index, item: cloneDeep(item), data: cloneDeep(data) });
resolve(index);
}).then(() => {
// 執(zhí)行結(jié)束,從executing刪除自身
const delIndex = executing.indexOf(promise);
delIndex > -1 && executing.splice(delIndex, 1);
});
return promise;
}
// 插入executing數(shù)字,表示正在執(zhí)行的promise
executing.push(itemPromise(index));
// 使用Promise.rece,每當(dāng)executing數(shù)組中promise數(shù)量低于maxLimit,就實(shí)例化新的promise并執(zhí)行
let race = Promise.resolve();
if (executing.length >= maxLimit) {
race = Promise.race(executing);
}
// 遞歸,直到遍歷完
return race.then(() => enqueue(index + 1));
};
return enqueue();
}
// 示例
promiseLimitPool({
data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
maxLimit: 2,
iteratee: async ({ item }) => {
console.log('onClick -> item', item);
await Axios({
method: 'get',
url: `API接口地址`,
params: { page: 0, size: 9 },
});
},
});
缺點(diǎn):沒(méi)有提供全部成功后的回調(diào)函數(shù)(當(dāng)然,這個(gè)也支持?jǐn)U展);代碼邏輯不是很簡(jiǎn)約,代碼有點(diǎn)繞...