js實(shí)現(xiàn)并發(fā)請(qǐng)求控制

在項(xiàng)目開(kāi)發(fā)中,會(huì)經(jīng)常遇到需要異步請(qǐng)求多個(gè)接口,獲取到數(shù)據(jù)結(jié)果,但是一個(gè)個(gè)串行調(diào)太慢,思索著可以并發(fā)同時(shí)幾個(gè)一起調(diào),做個(gè)并發(fā)池,提高請(qǐng)求效率。
這個(gè)場(chǎng)景很經(jīng)典,在日常開(kāi)發(fā)中也很實(shí)用。3年前,面試字節(jié)的時(shí)候,也被問(wèn)到類似的現(xiàn)場(chǎng)編程題。
2020年寫(xiě)了此文章,時(shí)隔3年,已經(jīng)是個(gè)老鳥(niǎo)了,把之前的豆腐渣工程翻新一下。


功能說(shuō)明

支持多個(gè)異步任務(wù)同時(shí)執(zhí)行,等待全部執(zhí)行完成后,直接返回執(zhí)行結(jié)果

  • 支持限制并發(fā)池的大小
  • 支持全部任務(wù)執(zhí)行成功后,進(jìn)行回調(diào)處理
  • 支持執(zhí)行過(guò)程中繼續(xù)追加任務(wù)
  • 返回結(jié)果支持【按順序取用】或者【按特定對(duì)象取用】

實(shí)現(xiàn)圖解

方式一【推薦】

先取用任務(wù)進(jìn)行執(zhí)行,直到工作區(qū)占滿了,當(dāng)某個(gè)任務(wù)執(zhí)行結(jié)束后,繼續(xù)取用任務(wù)執(zhí)行,直到任務(wù)區(qū)滿了,則暫停取用。一直等待某個(gè)任務(wù)結(jié)束,重新取用任務(wù)執(zhí)行。
按此循環(huán)操作,直到工作區(qū)正在執(zhí)行的任務(wù)數(shù)為0,表示全部執(zhí)行完畢。然后返回全部執(zhí)行結(jié)果,執(zhí)行回調(diào)函數(shù)。
期間,支持不斷地往任務(wù)池追加任務(wù)。

interface ITask {
  fn: () => Promise<any>;
  key?: string | number;
}

interface IResults {
  [key: string | number]: any;
}
class TaskQueue {
  maxNum: number;
  running: number;
  queue: Array<ITask>;
  queueIndex: number;
  results: IResults;
  callback: null | Function;
  isPushOver: boolean;

  constructor(maxNum: number) {
    this.maxNum = maxNum; // 并發(fā)池?cái)?shù)量大小
    this.running = 0; // 當(dāng)前正在執(zhí)行的池?cái)?shù)
    this.queue = []; // 任務(wù)隊(duì)列
    this.queueIndex = 0; // 當(dāng)前進(jìn)入執(zhí)行的任務(wù)索引順序
    this.results = {}; // 存儲(chǔ)任務(wù)執(zhí)行結(jié)果
    this.callback = null; // 回調(diào)函數(shù)
    this.isPushOver = false; // 任務(wù)是否追加完畢
  }

  // 追加任務(wù),并執(zhí)行
  pushTasks(tasks: Array<ITask>) {
    this.queue.push(...tasks);
    this.next();
  }

  // 通知任務(wù)追加完畢
  pushOver() {
    this.isPushOver = true;
    this.runOver();
  }

  // 任務(wù)全部執(zhí)行完畢
  runOver() {
    if (
      typeof this.callback === "function" &&
      this.running == 0 &&
      this.isPushOver
    ) {
      this.callback.call(null, this.results);
    }
  }

  next() {
    while (this.running < this.maxNum && this.queue.length) {
      const task = this.queue.shift();
      // 標(biāo)識(shí)當(dāng)前任務(wù)索引,方便從 results 取用值
      const key = task?.key || this.queueIndex;
      this.queueIndex++;
      // 任務(wù)池被占用數(shù)量+1
      this.running++;
      // 任務(wù)臨界判斷合法性
      if (!task) {
        this.results[key] = null;
        this.running--;
        continue;
      }
      // 執(zhí)行任務(wù)
      task
        .fn()
        .then((res: any) => {
          this.results[key] = res;
        })
        .catch((err: any) => {
          this.results[key] = err;
        })
        .finally(() => {
          this.running--;
          this.next();
        });
    }

    this.runOver();
  }
}

/**
 * 測(cè)試用例
 */

function run() {
  // 創(chuàng)建實(shí)例
  const queue = new TaskQueue(5);
  queue.callback = (result: any) => {
    console.log("asyncPool_1 ~ result:", result);
    console.log(result[1]); // 支持按順序取用結(jié)果
    console.log(result.four); // 支持針對(duì)特殊任務(wù)取用結(jié)果
  };

  function buildTask(result: any, delay = 0) {
    return () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log("正在執(zhí)行任務(wù)", result);
          resolve(result);
        }, delay)
      );
  }

  const tasks = [
    { fn: buildTask(1, 100) },
    { fn: buildTask(2, 200) },
    { fn: buildTask(3, 300) },
    { fn: buildTask(4, 100), key: "four" }, // key
  ];
  queue.pushTasks(tasks);
  setTimeout(() => {
    console.log("再追加一個(gè)任務(wù)");
    queue.pushTasks([{ fn: buildTask(5, 100), key: 5 }]);
  }, 500);

  setTimeout(() => {
    console.log("通知追加結(jié)束");
    queue.pushOver();
  }, 700);
}

run();
測(cè)試執(zhí)行結(jié)果
  1. 支持中途追加任務(wù)
  2. 支持全部執(zhí)行完成后,返回執(zhí)行結(jié)果,并執(zhí)行回調(diào)函數(shù)
  3. 支持對(duì)執(zhí)行結(jié)果按順序取用 或 按需取用

方式二

假設(shè)支持最大并發(fā)執(zhí)行的任務(wù)數(shù)是5個(gè),先創(chuàng)建5個(gè)執(zhí)行工作線,每個(gè)工作線開(kāi)始取任務(wù)并執(zhí)行。當(dāng)某個(gè)工作線的任務(wù)執(zhí)行完畢,則再次從任務(wù)池中取任務(wù)。直到某個(gè)工作線取不到任務(wù)時(shí),表示全部執(zhí)行完畢,進(jìn)行執(zhí)行回調(diào)方法。

interface ITask {
  fn: () => Promise<any>;
  key?: string | number;
}

interface IResults {
  [key: string | number]: any;
}
function asyncPool_2(tasks: Array<ITask>, max: number, callback: Function) {
  let result: IResults = {};
  let taskIndex = 0;
  Promise.all(
    Array.from({ length: max }).map(() => {
      return new Promise((resolve) => {
        function runTask() {
          if (tasks.length <= 0) {
            resolve(null);
            return;
          }
          const task = tasks.shift();
          const key = task?.key || taskIndex;
          if (!task?.fn) {
            result[key] = null;
            taskIndex++;
            runTask();
            return;
          }
          task.fn().then((res) => {
            result[key] = res;
            runTask();
          });
          taskIndex++;
        }
        runTask();
      });
    })
  ).then(() => callback(result));
}
/**
 * 測(cè)試用例
 */
function run() {
  function buildTask(result: any, delay = 0) {
    return () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log("正在執(zhí)行任務(wù)", result);
          resolve(result);
        }, delay)
      );
  }

  const tasks = [
    { fn: buildTask(1, 100) },
    { fn: buildTask(2, 200) },
    { fn: buildTask(3, 300) },
    { fn: buildTask(4, 100), key: "four" }, // key
  ];
  asyncPool_2(tasks, 5, (result: any) => {
    console.log("asyncPool_2 ~ result:", result);
    console.log(result[1]); // 支持按順序取用結(jié)果
    console.log(result.four); // 支持針對(duì)特殊任務(wù)取用結(jié)果
  });
}

run();
測(cè)試用例執(zhí)行結(jié)果

代碼邏輯簡(jiǎn)潔一些,適用于一些相對(duì)簡(jiǎn)單的應(yīng)用場(chǎng)景,需要一開(kāi)始確定好執(zhí)行哪些任務(wù)。

  1. 支持傳入任務(wù)數(shù)組,雖能中途追加任務(wù),但是可能存在時(shí)機(jī)問(wèn)題
  2. 支持全部執(zhí)行完成后,返回結(jié)果,并執(zhí)行回調(diào)函數(shù)
  3. 支持對(duì)執(zhí)行結(jié)果按順序取用 或 按需取用

方式三

npm中有挺多第三方包,比如 async-pool、es-promise-pool、p-limit等,但是實(shí)際使用起來(lái)還挺麻煩,挑了使用比較多的async-pool進(jìn)行重寫(xiě)。
其中,具體實(shí)現(xiàn)原理可以查看Promise.all并發(fā)限制文章(這邊文章提供的代碼是存在問(wèn)題的,但是原理講得挺清楚的)。
基于這篇文章提供的思路,對(duì)代碼進(jìn)行改寫(xiě),具體如下

/**
 * promise并發(fā)限制調(diào)用
 * @param {object[]} data - 調(diào)用的數(shù)據(jù)列表
 * @param {number} maxLimit - 并發(fā)調(diào)用限制個(gè)數(shù)
 * @param {function} iteratee - 處理單個(gè)節(jié)點(diǎn)的方法
 * @returns {promise}
 */
export function promiseLimitPool({ data = [], maxLimit = 3, iteratee = () => {} } = {}, callback=()=>{}) {
  const executing = [];
  const enqueue = (index = 0) => {
    // 邊界處理
    if (index === data.length) {
      return Promise.all(executing);
    }
    // 每次調(diào)用enqueue, 初始化一個(gè)promise
    const item = data[index];

    function itemPromise(index) {
      const promise = new Promise(async (resolve) => {
        // 處理單個(gè)節(jié)點(diǎn)
        await iteratee({ index, item: cloneDeep(item), data: cloneDeep(data) });
        resolve(index);
      }).then(() => {
        // 執(zhí)行結(jié)束,從executing刪除自身
        const delIndex = executing.indexOf(promise);
        delIndex > -1 && executing.splice(delIndex, 1);
      });
      return promise;
    }
    // 插入executing數(shù)字,表示正在執(zhí)行的promise
    executing.push(itemPromise(index));

    // 使用Promise.rece,每當(dāng)executing數(shù)組中promise數(shù)量低于maxLimit,就實(shí)例化新的promise并執(zhí)行
    let race = Promise.resolve();

    if (executing.length >= maxLimit) {
      race = Promise.race(executing);
    }

    // 遞歸,直到遍歷完
    return race.then(() => enqueue(index + 1));
  };

  return enqueue();
}

// 示例
 promiseLimitPool({
      data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
      maxLimit: 2,
      iteratee: async ({ item }) => {
        console.log('onClick -> item', item);
        await Axios({
          method: 'get',
          url: `API接口地址`,
          params: { page: 0, size: 9 },
        });
      },
 });

缺點(diǎn):沒(méi)有提供全部成功后的回調(diào)函數(shù)(當(dāng)然,這個(gè)也支持?jǐn)U展);代碼邏輯不是很簡(jiǎn)約,代碼有點(diǎn)繞...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容