node 使用 async/await 手寫錯誤重試與并發(fā)數量控制

很多人都曾經聽說過,async / await 是 node 異步的終極解決方案,這句話確實沒錯,但是僅僅掌握最基本的 async 語法并不能讓我們應付所有的異步需求,這次我們就來聊一下異步編程中兩個非常常見的功能:錯誤重試并發(fā)數量控制。

本文包含 async / await、Promise、迭代器、Set、閉包等知識點,絕對夠你一次看個爽。

錯誤重試

在 node 開發(fā)中,很多異步請求都是有可能報錯的,而錯誤重試則是解決這個問題最簡單的方法,下面直接上代碼:

/**
 * 錯誤重試包裝器
 * 
 * @param {async function} asyncFunc 要包裝的異步函數
 * @param {number} defaultRetryTime 默認的重試次數
 * @param {number} retryInterval 重試間隔時常
 * @returns 會自動進行錯誤重試的異步函數
 */
const retryWarpper = function (asyncFunc, defaultRetryTime = 3, retryInterval = 1000) {
    // 內部重試計數器
    let retryTime = defaultRetryTime

    const retryCallback = async function(...args) {
        try {
            return await asyncFunc(...args)
        }
        catch (e) {
            if (retryTime <= 0) throw e

            console.log(`${args} 查詢失敗,將在 ${retryInterval} 毫秒后重試,剩余重試次數 ${retryTime}`)
            retryTime -= 1
            await new Promise(reslove => setTimeout(reslove, retryInterval))
            return await retryCallback(...args)
        }
    }

    return retryCallback
}

通過這個 warpper 函數就可以使任意異步函數包裝上錯誤重試功能,大概二十行,思路也非常簡單,這個包裝器里通過返回一個和原異步函數入參返回值保持一致的函數來閉包保存剩余重試次數 retryTime,在執(zhí)行異步方法時通過 try / catch 來檢查有沒有報錯,一旦報錯則減少剩余重試次數并尾遞歸調用自身來達到重試的目的。

這里需要注意的一點是,哪怕內部的異步調用報錯并重試了,對于外部調用者來說,這個異步 promsie 的狀態(tài)依舊是 pending,直到內部調用成功 resolve 或者達到重試上限后變?yōu)?rejected。之所以會這樣,是因為外部調用者看到的是由 async 函數包裝過的 promise(即 retryCallback 的返回值),而不是實際異步方法(asyncFunc 參數)返回的 promise。

下面上代碼測試一下:

/** mock 接口,100% 返回調用失敗 */
const fetchData = function () {
    return new Promise((reslove, reject) => setTimeout(reject, 1000))
}

// 包裝上錯誤重試
const fetchDataWithRetry = retryWarpper(fetchData)

const run = async function () {
    const resp = await fetchDataWithRetry()
    console.log('獲取完成!', resp)
}

run()

可以看到,由于我們的 mock 函數一直報錯并失敗,所以將會一直重試,直到重試達到上限后報錯退出。

然后再來測試下是否能正確返回:

/** mock 接口,10% 幾率調用成功 */
const fetchData = function () {
    return new Promise((reslove, reject) => {
        setTimeout(() => Math.random() < 0.1 ? reslove(true) : reject(false), 1000)
    })
}

// 包裝上錯誤重試,注意這里指定了無限次重試
const fetchDataWithRetry = retryWarpper(fetchData, Infinity)

const run = async function () {
    const resp = await fetchDataWithRetry()
    console.log('獲取完成!', resp)
}

run()

在下圖里可以看到,在指定了無限次重試后,我們的接口會一直重試直到返回成功(當然不太推薦在實際業(yè)務里這么搞):

并發(fā)數量控制

雖然我們可以通過 Promise.all 來實現同時運行多個異步任務,但是有個問題,一旦 new 出來一個 promise 之后,這個任務就已經開始了。如果我們想從一個網站爬取一千張圖片該怎么辦呢,同時并發(fā)一千個 promise?這么做于人于己都不太好,這就引出了我們接下來要了解的并發(fā)數量控制,通過控制同時執(zhí)行的異步任務數量,從而在減輕服務器壓力的情況下最大程度的利用 node 的異步能力:

下面上代碼:

/**
 * 控制并發(fā)數量
 * 
 * @param {any[]} collection 待執(zhí)行的任務數組
 * @param {number} limit 最大并發(fā)數量
 * @param {async function} asyncCallback 要執(zhí)行的異步回調
 */
const concurrent = async function (collection, limit, asyncCallback) {
    const taskIterator = collection.entries();
    const pool = new Set();

    do {
        const { done, value: [index, task] = [] } = taskIterator.next();
        if (done) {
            await Promise.allSettled(pool);
            break;
        };

        const promise = asyncCallback(task).finally(() => pool.delete(promise))

        // 同時并發(fā)的數量已經到了上限,等待其中的任務完成
        if (pool.add(promise).size >= limit) {
            await Promise.race(pool);
        }
    } while (true)
}

這個函數的入參可能有些難以理解,但是很簡單,你可以將其想象成 lodash 的 map 函數,第一個參數 collection 是要迭代的數組,第二個參數就是我們要控制的最大并發(fā)數量,第三個參數就是每次迭代所要執(zhí)行的回調,和 map 的區(qū)別就在于這個回調是異步的。

這么實現的原因就是剛才我們提到的 promise 一旦實例化之后任務就開始執(zhí)行了,所以我們不能傳入一個 promise 數組然后對其進行控制,這樣是沒有意義的。因此,我們可以通過迭代器的形式,由函數內部控制該何時開始新的異步調用。

接下來我們來看一下內部實現,主要有以下幾個要點:

  1. 由 do / while 實現整體流程,由于 await 可以“暫停”同步代碼的執(zhí)行,所以我們可以通過在 do / while 內部 await 的方法暫停循環(huán),并且使用 await Promise.race 的形式等待線程池中是否有任務完成,一旦有任務完成,我們就可以進行下一個 while 循環(huán)取出新的任務執(zhí)行并加入線程池。

  2. 使用迭代器來彈出下一個任務:由于每個 while 循環(huán)都會執(zhí)行一個新任務,所以我們要記錄當前已經執(zhí)行到了哪一個任務,最簡單的方式就是新建一個 index 變量并保存當前執(zhí)行到的任務索引,取出后再給這個變量 += 1。但是我們也可以通過 array.entries 方法生成該數組的迭代器,這樣只需要執(zhí)行 next 方法就可以取出下一個任務,并且還可以通過返回的 done 屬性來識別是否已經取完了所有的任務。

  3. 使用 Set 來作為線程池:一開始新建的 Set 對象 pool 可以說是整個函數的核心,可以看到 while 循環(huán)的中間部分,我們執(zhí)行異步回調來獲取該任務的 promise,并在其結束時將自身從池子中移除,隨后我們將這個 promise 加入到池子中,并根據池子的當前存放數量來判斷是否超過了設定的并發(fā)上限。

    這里需要注意的是,我們直接給 Promise.race 傳入了這個 Set,這其實是沒有問題的,我們可以在 MDN - Promise.race 中看到他的入參是一個可迭代對象而不僅僅是個數組。而 Set - JavaScript | MDN (mozilla.org) 告訴我們其實現了 @@iterator 方法,所以 Set 實例其實就是一個可迭代對象,可以直接傳遞給 Promise.race。

  4. done 時使用 Promise.allSettled 進行特判:這里主要有兩個疑問,為什么不繼續(xù)使用 Promise.race 了?因為 race 是傳入的 promise 里只要有一個結束了,那 race 返回的 promise 就會結束。而當迭代結束時,即所有的任務都已經執(zhí)行并放到任務池里了,那么我們接下來的任務就是等待 所有 剩下的任務完成,自然就不能再用 race 了。

    另一個疑問是為什么不用 Promise.all?答案是:all 會等待所有任務執(zhí)行是沒錯,但是他是等待所有任務 成功,一旦其中有一個 promise 狀態(tài)變?yōu)?rejected 了,那 Promise.all 就會直接失敗并退出。而比較少見的 MDN - Promise.allSettled() 則可以很好的契合這個需求。

光分析可能會有些干巴巴的,下面我們來結合例子實踐一下:計算從 0-9 的 +1 結果,首先請出我們的測試素材:

/**
 * 異步任務,接受一個數字,并在隨機時間后計算數字 + 1
 */
const plusNum = function (num) {
    console.log('正在獲取', num, '的計算結果')
    const calcTime = Math.ceil(Math.random() * 4000)
    return new Promise(resolve => {
        setTimeout(() => {
            console.log(`${num} + 1 的計算結果為 ${num + 1},用時 ${calcTime}ms`)
            resolve()
        }, calcTime);
    });
}

/**
 * 生成一個從 0 到 9 的步進數組
 */
const numArray = Array.from({ length: 10 }).map((_, index) => index)

注意其中的 mock 函數 plusNum 的異步時間是隨機的(最大 4 秒),因為如果這個時間是固定的話就會產生同時開始同時結束的情況,不方便我們檢查。現在我們加入并發(fā)數量控制,如下:

const run = async function () {
    await concurrent(numArray, 3, plusNum)
    console.log('任務執(zhí)行完畢')
}

run()

很簡單對吧,傳入要迭代的數組,指定并發(fā)數量,最后傳入要執(zhí)行的異步回調,下面我們執(zhí)行一下:

可以看到,由于我們設置的并發(fā)上限為 3,所以一開始直接啟動了前三個任務,隨后每完成一個任務就會取出新的任務并執(zhí)行,最后等待所有任務都結束后,并發(fā)結束。

收集并發(fā)結果并結合錯誤重試

實際上上面這個例子并不完整,為什么呢?因為我們并沒有收集到異步任務的執(zhí)行結果并返回給調用者,就想我們更喜歡用 map 而不是 forEach 一樣,接下來我們就實現這個功能,并順便加上上面實現的錯誤重試(畢竟這兄弟倆基本都是同時出現的)。

老規(guī)矩先上代碼再介紹:

const concurrent = async function (collection, limit, asyncCallback) {
    const taskIterator = collection.entries();
    const pool = new Set();
    // 存放異步任務的執(zhí)行結果或錯誤
    const finalResult = [];
    const finalError = [];

    do {
        const { done, value: [index, task] = [] } = taskIterator.next();
        if (done) {
            await Promise.allSettled(pool);
            break;
        };

        // 完成或失敗時將結果存起來
        // 注意這里包裝上了錯誤重試功能
        const promise = retryWarpper(asyncCallback)(task)
            .then(data => finalResult[index] = data)
            .catch(error => finalError[index] = { error, task })
            .finally(() => pool.delete(promise))

        if (pool.add(promise).size >= limit) {
            await Promise.race(pool);
        }
    } while (true)

    // 將結果和異常一起返回出去
    return [finalError, Array.from(finalResult)];
}

大致就是新建兩個數組來存放最終的結果和異常,在異步回調執(zhí)行之前先包裝上我們的錯誤重試,然后在用 .then 和 .catch 來捕獲并存放執(zhí)行結果。注意這里要在 while 循環(huán)里對每次異步回調執(zhí)行都執(zhí)行一下錯誤重試的包裝。

如果我們在一開始就包裝了的話,在 while 循環(huán)里調用的所有異步回調都將會訪問到同一個閉包中的錯誤重試計時器,就導致了所有的異步任務總共只有那么多重試次數,而非每個異步任務享有自己單獨的重試次數。

這里還有一個小問題,有的同學可能會問:在 while 里執(zhí)行異步回調的時候這么寫行不行:

const promise = retryWarpper(asyncCallback)(task);

// 先加入任務池,再綁定回調
pool.add(promise);

promise.then(data => finalResult[index] = data)
    .catch(error => finalError[index] = { error, task })
    .finally(() => pool.delete(promise));

答案是不行的,雖然看起來只是流程上變了一下,但是兩者是有本質上的區(qū)別的,問題在于 Promise 的 then / catch / finally 都會返回一個 新的 Promise。也就是說,一開始我們存放到任務池里的 promise 是 finally 返回的,所以在最后的 Promise.all 的等待時,他會等待這個 promise 綁定的 then / catch / finally 回調都執(zhí)行完之后才結束。

而如果我們先將異步任務返回的 promise 加入任務池,然后再綁定回調,會發(fā)生什么呢:

  • 迭代器返回 done,開始 await Promise.all
  • 異步任務完成,Promise.all 結束退出
  • while 循環(huán)結束,返回結果數組

是不是漏了什么,是的,then / catch 回調表示:

由于我們 Promise.all 等待的是異步任務的 promise 而非 finally 回調返回 promise,所以 Promise.all 并不會等你把結果塞到 finalResult / finalError 里,然后就會導致我們最后的返回數組里少了某一個或某幾個任務的結果。


除此之外,還有個小知識點就是最后返回的時候將 finalResult 重建了一遍,而 finalError 卻沒有執(zhí)行操作,這個的原因在于我們異步回調執(zhí)行完存放結果的時候是直接使用 index 插入到數組里的,所以沒有被插入的位置是空的(empty 而不是 undefind),這兩者的區(qū)別在于 undefined 會被數組迭代器迭代到,而 empty 不會。

所以我們就可以通過這種方式,讓我們對結果進行后續(xù)處理的時候得以發(fā)現有那些數據沒有取到,而且對 error 結果進行遍歷的時候也不需要再剔除掉結果為空的情況了。


ok,到這里本文的所有分享就結束了,下面放一下包含錯誤重試和并發(fā)數量控制的完整例子,你可以自己親手試一下:

/**
 * 計算入參數字 +1 的結果,50% 幾率成功,50% 幾率失敗
 * @param {number} num 要計算的數字
 */
const plusNum = async function (num) {
    console.log('正在獲取', num, '的計算結果')
    return new Promise((resolve, reject) => {
        setTimeout(() => {

            if (Math.random() < 0.5) resolve(num + 1);
            else reject(`無法計算 ${num} + 1`);
    
        }, Math.floor(Math.random() * 5000));
    });
}

/**
 * 0-10 的步進數組
 */
const numArray = Array.from({ length: 10 }).map((_, index) => index)

/**
 * 錯誤重試包裝器
 * 
 * @param {async function} asyncFunc 要包裝的異步函數
 * @param {number} defaultRetryTime 默認的重試次數
 * @param {number} retryInterval 重試間隔時常
 * @returns 會自動進行錯誤重試的異步函數
 */
const retryWarpper = function (asyncFunc, defaultRetryTime = 3, retryInterval = 1000) {
    let retryTime = defaultRetryTime

    const retryCallback = async function(...args) {
        try {
            return await asyncFunc(...args)
        }
        catch (e) {
            if (retryTime <= 0) throw e

            console.log(`${args} 查詢失敗,將在 ${retryInterval} 毫秒后重試,剩余重試次數 ${retryTime}`)
            retryTime -= 1
            await new Promise(reslove => setTimeout(reslove, retryInterval))
            return await retryCallback(...args)
        }
    }

    return retryCallback
}

/**
 * 控制并發(fā)數量
 * 
 * @param {any[]} collection 待執(zhí)行的任務數組
 * @param {number} limit 最大并發(fā)數量
 * @param {async function} asyncCallback 要執(zhí)行的異步回調
 */
const concurrent = async function (collection, limit, asyncCallback) {
    // 用于在 while 循環(huán)中取出任務的迭代器
    const taskIterator = collection.entries();
    // 任務池
    const pool = new Set();
    // 最終返回的結果數組
    const finalResult = [];
    // 最終返回的異常數組
    const finalError = [];

    do {
        const { done, value: [index, task] = [] } = taskIterator.next();
        // 任務都已執(zhí)行,等待最后的剩下的任務執(zhí)行完畢
        if (done) {
            await Promise.allSettled(pool);
            break;
        };

        // 將結果存入結果數組,并從任務池中移除自己
        const promise = retryWarpper(asyncCallback)(task)
            .then(data => finalResult[index] = data)
            .catch(error => finalError[index] = { error, task })
            .finally(() => pool.delete(promise))

        // 達到上限后就等待某個任務完成
        if (pool.add(promise).size >= limit) {
            await Promise.race(pool);
        }
    } while (true)

    return [finalError, Array.from(finalResult)];
}

const run = async function() {
    const [errors, results] = await concurrent(numArray, 3, plusNum);
    console.log('所有報錯', errors)
    console.log('所有結果', results)
}

run()

寫在最后

其實實現類似功能的包在 npm 上已經有很多了,例如 es6-promise-pool - npmasync.mapLimit - github.io,你可以發(fā)現我上面例子里并發(fā)數量控制的入參和 async.mapLimit 是一致的,其實本來也是在研究 async 的時候產生的這個想法。這兩個例子基本上就是對 Promise 和 async / await 的綜合運用,認真思考一下還是對 js 異步編程的理解非常有幫助的,特別是 promise 回調鏈和 async 作為 promise 的語法糖究竟甜在哪里。

上面僅僅是我對這兩個需求的基本實現,才疏學淺也沒有考慮兼容性之類的問題,如果發(fā)現了什么問題歡迎評論區(qū)里指正。

參考

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容