前言
最近維護(hù)一個老項(xiàng)目中的微信公眾號h5的新需求,項(xiàng)目是Node.js服務(wù),Node層負(fù)責(zé)前端路由、api聚合以及用戶信息校驗(yàn)等工作,項(xiàng)目較大,上一次維護(hù)已經(jīng)是3年前,現(xiàn)在不方便重構(gòu)整個項(xiàng)目,新需求也依賴Node層做路由和api管理,因此需要在原項(xiàng)目內(nèi)做開發(fā)。
原項(xiàng)目的前端資源由fis3輸出到指定目錄,其中js、css、img資源會上傳至cdn,由另一臺服務(wù)器nginx代理(文件上傳服務(wù)會由另一個Node服務(wù)處理),Node層只負(fù)責(zé)解析html的解析,以減少Node層業(yè)務(wù)的壓力。新需求基于Vue開發(fā),需要webpack打包,因此需要通過其他方法上傳靜態(tài)資源到nginx的服務(wù)器。
文件上傳請求是一個異步操作,那么是否可以通過并發(fā)請求,加快上傳的速度呢?新需求是多頁面,資源有上百個文件,同時并發(fā)上百個請求會對服務(wù)器造成過大壓力,我們需要一定的機(jī)制讓請求排隊(duì),除此以外,當(dāng)上傳失敗時,還需要一定的重試能力。面對這些問題,接下來研究一下如何實(shí)現(xiàn)上述的需求。
嘗試與學(xué)習(xí)
這里我參考了一篇并發(fā)請求的實(shí)踐,可以先了解一下原文:
不到50行代碼實(shí)現(xiàn)一個能對請求并發(fā)數(shù)做限制的通用RequestDecorator - 作者:陳紀(jì)庚
在大佬的基礎(chǔ)上做了一些簡單的改造,增加了重試的功能,同時項(xiàng)目開發(fā)中也遇到了一些小問題,以下是具體的實(shí)現(xiàn),有注釋說明:
// 任務(wù)隊(duì)列
class RequestQueue {
constructor(maxLimit = 5, retry = 2) {
// 最大并發(fā)量
this.maxLimit = maxLimit
// 重試次數(shù)
this.retry = retry
// blocking queue 若當(dāng)前請求并發(fā)量已經(jīng)超過maxLimit,則將請求延遲到下某個任務(wù)完成,再執(zhí)行該隊(duì)列任務(wù)
this.requestQueue = []
// 當(dāng)前并發(fā)量數(shù)目
this.currentConcurrent = 0
// 說明1:
// 實(shí)際請求中,可能會異步的拋出多個error
// 任務(wù)重試過程中,當(dāng)catch到 error且 重試已到上限,會執(zhí)行 next() 執(zhí)行下一個任務(wù),
// 此時,如果有異常拋出前一個異步任務(wù)的,會無法捕獲
// 因此通過全局時間捕獲剩余的異步異常
process.on("unhandledRejection", function(e){
console.log(e);
})
}
async run(request) {
// 并發(fā)限制
if (this.currentConcurrent >= this.maxLimit) {
await this.startBlocking() // 等待執(zhí)行,直到某個任務(wù)執(zhí)行this.next()
}
// 隊(duì)列+1
this.currentConcurrent++
// 設(shè)置隊(duì)列中同一個任務(wù)嘗試次數(shù)
for (let retryCount = this.retry; retryCount > 0; retryCount--) {
let done = false
console.log('[ retryCount ]:' + retryCount)
try {
// 這里與大佬的方法有不同,這里需要傳入一個包裝好請求的Promise實(shí)例,如有需要也可以用pify將請求轉(zhuǎn)成promise
const result = await request()
// 執(zhí)行成功則結(jié)束嘗試
done = true
return Promise.resolve(result)
// 如果有錯誤,會被捕獲,不會執(zhí)行resolve
} catch (error) {
console.log('[ request error ] - ' + error)
// 最后一次重試失敗時停止重試,返回報(bào)錯
if (retryCount === 1) {
done = true
return Promise.reject(error) // 錯誤只會拋出一次
}
} finally {
// 如果已經(jīng)結(jié)束重試,執(zhí)行請求隊(duì)列的下一個任務(wù)
if (done) {
this.currentConcurrent--
this.next()
break;
}
}
}
}
next() {
if (this.requestQueue.length <= 0) return
const resolve = this.requestQueue.shift()
resolve() // 取出block promise 的resolve 執(zhí)行
}
startBlocking() {
let _resolve
let promise2 = new Promise((resolve) => (_resolve = resolve))
this.requestQueue.push(_resolve)
return promise2 // 返回block promise 用于暫停隊(duì)列的執(zhí)行
}
}
使用方式:
const request = () => {
return new Promise((resolve, reject) => {
setTimeout(() => { resolve() }, 1000)
})
}
const instance = new RequestQueue()
const promises = []
for (let i = 0; i < 100; i++ ) {
promises.push(instance.run(request)
.catch(err => {
// 這里是否catch(err)取決于是否允許某個任務(wù)失敗時,其他任務(wù)繼續(xù)執(zhí)行
console.log(err)
})
)
}
Promise.all(promises)
.catch(err => console.log(err))
// 如果前面的push過程中不catch,則一旦有任務(wù)拋出錯誤,剩余的任務(wù)不再執(zhí)行
整個實(shí)現(xiàn)如上,與大佬的實(shí)踐略有不同,僅供學(xué)習(xí)。實(shí)際生產(chǎn)中,更推薦使用開源社區(qū)成熟的庫,async,這個庫提供更全面的異步流控制,便于我們進(jìn)行開發(fā)。