文中實(shí)現(xiàn)的部分工具方法正處于早期/測試階段,仍在持續(xù)優(yōu)化中,僅供參考...
在 Ubuntu20.04 上進(jìn)行開發(fā)/測試,可用于 Electron 項(xiàng)目,測試版本:Electron@8.2.0 / 9.3.5
Contents
├── Contents (you are here!)
│
├── I. 前言
├── II. 架構(gòu)圖
│
├── III.electron-re 可以用來做什么?
│ ├── 1) 用于 Electron 應(yīng)用
│ └── 2) 用于 Electron/Nodejs 應(yīng)用
│
├── IV. UI 功能介紹
│ ├── 主界面
│ ├── 功能1:Kill 進(jìn)程
│ ├── 功能2:一鍵開啟 DevTools
│ ├── 功能3:查看進(jìn)程日志
│ ├── 功能4:查看進(jìn)程 CPU/Memory 占用趨勢
│ └── 功能5:查看 MessageChannel 請求發(fā)送日志
│
├── V. 新特性:進(jìn)程池負(fù)載均衡
│ ├── 關(guān)于負(fù)載均衡
│ ├── 負(fù)載均衡策略說明
│ ├── 負(fù)載均衡策略的簡易實(shí)現(xiàn)
│ ├── 負(fù)載均衡器的實(shí)現(xiàn)
│ └── 進(jìn)程池配合 LoadBalancer 來實(shí)現(xiàn)負(fù)載均衡
│
├── VI. 新特性:子進(jìn)程智能啟停
│ ├── 使進(jìn)程休眠的各種方式
│ ├── 生命周期 LifeCycle 的實(shí)現(xiàn)
│ └── 進(jìn)程互斥鎖的雛形
│
├── VII. 存在的已知問題
├── VIII. Next To Do
│
├── IX. 幾個(gè)實(shí)際使用示例
│ ├── 1) Service/MessageChannel 使用示例
│ ├── 2) 一個(gè)實(shí)際用于生產(chǎn)項(xiàng)目的例子
│ ├── 3) ChildProcessPool/ProcessHost 使用示例
│ ├── 3) test 測試目錄示例
│ └── 4) github README 說明
│
I. 前言
之前在做 Electron 應(yīng)用開發(fā)的時(shí)候,寫了個(gè) Electron 進(jìn)程管理工具 electron-re,支持 Electron/Node 多進(jìn)程管理、service 模擬、進(jìn)程實(shí)時(shí)監(jiān)控(UI功能)、Node.js 進(jìn)程池等特性。已經(jīng)發(fā)布為npm組件,可以直接安裝(最新特性還沒發(fā)布到線上,需要再進(jìn)行測試):
$: npm install electron-re --save
# or
$: yarn add electron-re
本主題前面兩篇文章:
-
《Electron/Node多進(jìn)程工具開發(fā)日記》 描述了
electron-re的開發(fā)背景、針對的問題場景以及詳細(xì)的使用方法。 -
《Electron多進(jìn)程工具開發(fā)日記2》 介紹了新特性 "多進(jìn)程管理 UI" 的開發(fā)和使用相關(guān)。UI 界面基于
electron-re已有的BrowserService/MessageChannel和ChildProcessPool/ProcessHost基礎(chǔ)架構(gòu)驅(qū)動,使用 React17 / Babel7 開發(fā)。
這篇文章主要是描述最近支持的進(jìn)程池模塊新特性 - "進(jìn)程池負(fù)載均衡" 和 "子進(jìn)程智能啟停",以及相關(guān)的基本實(shí)現(xiàn)原理。同時(shí)提出自己遇到的一些問題,以及對這些問題的思考、解決方案,對之后版本迭代的一些想法等等。
II. electron-re 架構(gòu)圖

- Electron Core:Electron 應(yīng)用的一系列核心功能,包含了應(yīng)用的主進(jìn)程、渲染進(jìn)程、窗口等等(Electron 自帶)。
- BrowserWindow:渲染窗口進(jìn)程,一般用于UI渲染 (Electron 自帶)。
- ProcessManager:進(jìn)程管理器,負(fù)責(zé)進(jìn)程占用資源采集、異步刷新UI、響應(yīng)和發(fā)出各種進(jìn)程管理信號,作為一個(gè)觀察者對象給其它模塊和UI提供服務(wù) (electron-re 引入)。
- MessageChannel:適用于主進(jìn)程、渲染進(jìn)程、Service 進(jìn)程的消息發(fā)送工具,基于原生 IPC 封裝,主要服務(wù)于 BrowserService,也可替代原生的 IPC 通信方法 (electron-re 引入)。
-
ChildProcess:由
child_process.fork方法生成的子進(jìn)程,不過以裝飾器的方式為其添加了簡單的進(jìn)程休眠和喚醒邏輯 (electron-re 引入)。 -
ProcessHost:配合進(jìn)程池使用的工具,我稱它為 "進(jìn)程事務(wù)中心",封裝了
process.send / process.on基本邏輯,提供了 Promise 的調(diào)用方式讓 主進(jìn)程/子進(jìn)程 之間 IPC 消息通信更簡單 (electron-re 引入)。 - LoadBalancer:服務(wù)于進(jìn)程池的負(fù)載均衡器 (electron-re 引入)。
- LifeCycle:服務(wù)于進(jìn)程池的生命周期 (electron-re 引入)。
-
ChildProcessPool:基于 Node.js -
child_process.fork方法實(shí)現(xiàn)的進(jìn)程池,內(nèi)部管理多個(gè) ChildProcess 實(shí)例對象,支持自定義負(fù)載均衡策略、子進(jìn)程智能啟停、子進(jìn)程異常退出后自動重啟等特性 (electron-re 引入)。 -
BrowserService:基于 BrowserWindow 實(shí)現(xiàn)的 Service 進(jìn)程,可以看成是一個(gè)運(yùn)行在后臺的隱藏渲染窗口進(jìn)程,允許 Node 注入,不過僅支持
CommonJs規(guī)范 (electron-re 引入)。
III. electron-re 可以用來做什么?
1. 用于 Electron 應(yīng)用
BrowserServiceMessageChannel
在 Electron 的一些“最佳實(shí)踐”中,建議將占用cpu的代碼放到渲染過程中而不是直接放在主過程中,這里先看下 chromium 的架構(gòu)圖:

每個(gè)渲染進(jìn)程都有一個(gè)全局對象 RenderProcess,用來管理與父瀏覽器進(jìn)程的通信,同時(shí)維護(hù)著一份全局狀態(tài)。瀏覽器進(jìn)程為每個(gè)渲染進(jìn)程維護(hù)一個(gè) RenderProcessHost 對象,用來管理瀏覽器狀態(tài)和與渲染進(jìn)程的通信。瀏覽器進(jìn)程和渲染進(jìn)程使用 Chromium 的 IPC 系統(tǒng)進(jìn)行通信。在 chromium 中,頁面渲染時(shí),UI進(jìn)程需要和 main process 不斷的進(jìn)行 IPC 同步,若此時(shí) main process 忙,則 UIprocess 就會在 IPC 時(shí)阻塞。所以如果主進(jìn)程持續(xù)進(jìn)行消耗 CPU 時(shí)間的任務(wù)或阻塞同步 IO 的任務(wù)的話,就會在一定程度上阻塞,從而影響主進(jìn)程和各個(gè)渲染進(jìn)程之間的 IPC 通信,IPC 通信有延遲或是受阻,渲染進(jìn)程窗口就會卡頓掉幀,嚴(yán)重的話甚至?xí)ㄗ〔粍印?/p>
因此 electron-re 在 Electron 已有的 Main Process 主進(jìn)程 和 Renderer Process 渲染進(jìn)程邏輯的基礎(chǔ)上獨(dú)立出一個(gè)單獨(dú)的 Service 概念。Service即不需要顯示界面的后臺進(jìn)程,它不參與 UI 交互,單獨(dú)為主進(jìn)程或其它渲染進(jìn)程提供服務(wù),它的底層實(shí)現(xiàn)為一個(gè)允許 node注入 和 remote調(diào)用 的 隱藏渲染窗口進(jìn)程。
這樣就可以將代碼中耗費(fèi) cpu 的操作(比如文件上傳中維護(hù)一個(gè)數(shù)千個(gè)上傳任務(wù)的隊(duì)列)編寫成一個(gè)單獨(dú)的js文件,然后使用 BrowserService 構(gòu)造函數(shù)以這個(gè) js 文件的地址 path 為參數(shù)構(gòu)造一個(gè) Service 實(shí)例,從而將他們從主進(jìn)程中分離。如果你說那這部分耗費(fèi) cpu 的操作直接放到渲染窗口進(jìn)程可以嘛?這其實(shí)取決于項(xiàng)目自身的架構(gòu)設(shè)計(jì),以及對進(jìn)程之間數(shù)據(jù)傳輸性能損耗和傳輸時(shí)間等各方面的權(quán)衡,創(chuàng)建一個(gè) Service 的簡單示例:
const { BrowserService } = require('electron-re');
const myServcie = new BrowserService('app', path.join(__dirname, 'path/to/app.service.js'));
如果使用了 BrowserService 的話,要想在主進(jìn)程、渲染進(jìn)程、service 進(jìn)程之間相互發(fā)送消息就要使用 electron-re 提供的 MessageChannel 通信工具,它的接口設(shè)計(jì)跟 Electron 內(nèi)建的IPC基本一致,底層也是基于原生的 IPC 異步通信原理來實(shí)現(xiàn)的,簡單示例如下:
/* ---- main.js ---- */
const { BrowserService } = require('electron-re');
// 主進(jìn)程中向一個(gè) service 'app' 發(fā)送消息
MessageChannel.send('app', 'channel1', { value: 'test1' });
2. 用于 Electron/Nodejs 應(yīng)用
ChildProcessPoolProcessHost
此外,如果要?jiǎng)?chuàng)建一些不依賴于 Electron 運(yùn)行時(shí)的子進(jìn)程(相關(guān)參考nodejs child_process),可以使用 electron-re 提供的專門為 nodejs 運(yùn)行時(shí)編寫的進(jìn)程池 ChildProcessPool 。因?yàn)閯?chuàng)建進(jìn)程本身所需的開銷很大,使用進(jìn)程池來重復(fù)利用已經(jīng)創(chuàng)建了的子進(jìn)程,將多進(jìn)程架構(gòu)帶來的性能效益最大化,簡單示例如下:
/* --- 主進(jìn)程中 --- */
const { ChildProcessPool, LoadBalancer } = require('electron-re');
const pool = new ChildProcessPool({
path: path.join(app.getAppPath(), 'app/services/child.js'), // 子進(jìn)程執(zhí)行文件路徑
max: 3, // 最大進(jìn)程數(shù)
strategy: LoadBalancer.ALGORITHM.WEIGHTS, // 負(fù)載均衡策略 - 權(quán)重
weights: [1, 2, 3], // 權(quán)重分配
});
pool
.send('sync-work', params)
.then(rsp => console.log(rsp));
一般情況下,在我們的子進(jìn)程執(zhí)行文件中,為了在主進(jìn)程和子進(jìn)程之間同步數(shù)據(jù),可以使用 process.send('channel', params) 和 process.on('channel', function) 的方式實(shí)現(xiàn)(前提是進(jìn)程以以 fork 方式創(chuàng)建或者手動開啟了 IPC 通信)。但是這樣在處理業(yè)務(wù)邏輯的同時(shí)也強(qiáng)迫我們?nèi)リP(guān)注進(jìn)程之間的通信,你需要知道子進(jìn)程什么時(shí)候能處理完畢,然后再使用process.send再將數(shù)據(jù)返回主進(jìn)程,使用方式繁瑣。
electron-re 引入了 ProcessHost 的概念,我稱之為"進(jìn)程事務(wù)中心"。實(shí)際使用時(shí)在子進(jìn)程執(zhí)行文件中只需要將各個(gè)任務(wù)函數(shù)通過 ProcessHost.registry('task-name', function) 注冊成多個(gè)被監(jiān)聽的事務(wù),然后配合進(jìn)程池的 ChildProcessPool.send('task-name', params) 來觸發(fā)子進(jìn)程事務(wù)邏輯的調(diào)用即可,ChildProcessPool.send() 同時(shí)會返回一個(gè) Promise 實(shí)例以便獲取回調(diào)數(shù)據(jù),簡單示例如下:
/* --- 子進(jìn)程中 --- */
const { ProcessHost } = require('electron-re');
ProcessHost
.registry('sync-work', (params) => {
return { value: 'task-value' };
})
.registry('async-work', (params) => {
return fetch(params.url);
});
IV. UI 功能介紹
UI 功能基于 electron-re 基礎(chǔ)架構(gòu)開發(fā),它通過異步 IPC 和主進(jìn)程的 ProcessManager 進(jìn)行通信,實(shí)時(shí)刷新進(jìn)程狀態(tài)。操作者可以通過 UI 手動 Kill 進(jìn)程、查看進(jìn)程 console 數(shù)據(jù)、查看進(jìn)程數(shù) CPU/Memory 占用趨勢以及查看 MessageChannel 工具的請求發(fā)送記錄。
主界面
UI參考 electron-process-manager 設(shè)計(jì)
預(yù)覽圖:

主要功能如下:
展示 Electron 應(yīng)用中所有開啟的進(jìn)程,包括主進(jìn)程、普通的渲染進(jìn)程、Service 進(jìn)程(electron-re 引入)、ChildProcessPool 創(chuàng)建的子進(jìn)程(electron-re 引入)。
進(jìn)程列表中顯示各個(gè)進(jìn)程進(jìn)程號、進(jìn)程標(biāo)識、父進(jìn)程號、內(nèi)存占用大小、CPU 占用百分比等,所有進(jìn)程標(biāo)識分為:main(主進(jìn)程)、service(服務(wù)進(jìn)程)、renderer(渲染進(jìn)程)、node(進(jìn)程池子進(jìn)程),點(diǎn)擊表格頭可以針對對某項(xiàng)進(jìn)行遞增/遞減排序。
選中某個(gè)進(jìn)程后可以 Kill 此進(jìn)程、查看進(jìn)程控制臺 Console 數(shù)據(jù)、查看1分鐘內(nèi)進(jìn)程 CPU/Memory 占用趨勢,如果此進(jìn)程是渲染進(jìn)程的話還可以通過
DevTools按鈕一鍵打開內(nèi)置調(diào)試工具。ChildProcessPool 創(chuàng)建的子進(jìn)程暫不支持直接打開 DevTools 進(jìn)行調(diào)試,不過由于創(chuàng)建子進(jìn)程時(shí)添加了
--inspect參數(shù),可以使用 chrome 的chrome://inspect進(jìn)行遠(yuǎn)程調(diào)試。點(diǎn)擊
Signals按鈕可以查看MessageChannel工具的請求發(fā)送日志,包括簡單的請求參數(shù)、請求名、請求返回?cái)?shù)據(jù)等。
功能:Kill 進(jìn)程

功能:一鍵開啟 DevTools

功能:查看進(jìn)程日志

功能:查看進(jìn)程 CPU/Memory 占用趨勢

功能:查看 MessageChannel 請求發(fā)送日志

V. 新特性:進(jìn)程池負(fù)載均衡
簡化的初版實(shí)現(xiàn)
? 關(guān)于負(fù)載均衡
“ 負(fù)載均衡,英文名稱為 Load Balance,其含義就是指將負(fù)載(工作任務(wù))進(jìn)行平衡、分?jǐn)偟蕉鄠€(gè)操作單元上進(jìn)行運(yùn)行,例如 FTP 服務(wù)器、Web服務(wù)器、企業(yè)核心應(yīng)用服務(wù)器和其它主要任務(wù)服務(wù)器等,從而協(xié)同完成工作任務(wù)。
負(fù)載均衡構(gòu)建在原有網(wǎng)絡(luò)結(jié)構(gòu)之上,它提供了一種透明且廉價(jià)有效的方法擴(kuò)展服務(wù)器和網(wǎng)絡(luò)設(shè)備的帶寬、加強(qiáng)網(wǎng)絡(luò)數(shù)據(jù)處理能力、增加吞吐量、提高網(wǎng)絡(luò)的可用性和靈活性?!?-- 《百度百科》
? 負(fù)載均衡策略說明
之前的實(shí)現(xiàn)中,進(jìn)程池創(chuàng)建好后,當(dāng)使用 pool 發(fā)送請求時(shí),采用兩種方式處理請求發(fā)送策略:
默認(rèn)使用輪詢策略選擇一個(gè)子進(jìn)程處理請求,只能保證基本的請求平均分配。
另一種使用情況是通過手動指定發(fā)送請求時(shí)的額外參數(shù) id:
pool.send(channel, params, id),這樣子讓id相同的請求發(fā)送到同一個(gè)子進(jìn)程上。一個(gè)適用情景就是:第一次我們向某個(gè)子進(jìn)程發(fā)送請求,該子進(jìn)程處理請求后在其運(yùn)行時(shí)內(nèi)存空間中存儲了一些處理結(jié)果,之后某個(gè)情況下需要將之前那次請求產(chǎn)生的處理結(jié)果再次拿回主進(jìn)程,這時(shí)候就需要使用id來區(qū)分請求。
新版本引入了一些負(fù)載均衡策略,包括:
- POLLING - 輪詢:子進(jìn)程輪流處理請求
- WEIGHTS - 權(quán)重:子進(jìn)程根據(jù)設(shè)置的權(quán)重來處理請求
- RANDOM - 隨機(jī):子進(jìn)程隨機(jī)處理請求
- SPECIFY - 指定:子進(jìn)程根據(jù)指定的進(jìn)程 id 處理請求
- WEIGHTS_POLLING - 權(quán)重輪詢:權(quán)重輪詢策略與輪詢策略類似,但是權(quán)重輪詢策略會根據(jù)權(quán)重來計(jì)算子進(jìn)程的輪詢次數(shù),從而穩(wěn)定每個(gè)子進(jìn)程的平均處理請求數(shù)量。
- WEIGHTS_RANDOM - 權(quán)重隨機(jī):權(quán)重隨機(jī)策略與隨機(jī)策略類似,但是權(quán)重隨機(jī)策略會根據(jù)權(quán)重來計(jì)算子進(jìn)程的隨機(jī)次數(shù),從而穩(wěn)定每個(gè)子進(jìn)程的平均處理請求數(shù)量。
- MINIMUM_CONNECTION - 最小連接數(shù):選擇子進(jìn)程上具有最小連接活動數(shù)量的子進(jìn)程處理請求。
- WEIGHTS_MINIMUM_CONNECTION - 權(quán)重最小連接數(shù):權(quán)重最小連接數(shù)策略與最小連接數(shù)策略類似,不過各個(gè)子進(jìn)程被選中的概率由連接數(shù)和權(quán)重共同決定。
? 負(fù)載均衡策略的簡易實(shí)現(xiàn)
參數(shù)說明:
- tasks:任務(wù)數(shù)組,一個(gè)示例:
[{id: 11101, weight: 2}, {id: 11102, weight: 1}]。 - currentIndex: 目前所處的任務(wù)索引,默認(rèn)為 0,每次調(diào)用時(shí)會自動加 1,超出任務(wù)數(shù)組長度時(shí)會自動取模。
- context:主進(jìn)程參數(shù)上下文,用于動態(tài)更新當(dāng)前任務(wù)索引和權(quán)重索引。
- weightIndex:權(quán)重索引,用于權(quán)重策略,默認(rèn)為 0,每次調(diào)用時(shí)會自動加 1,超出權(quán)重總和時(shí)會自動取模。
- weightTotal:權(quán)重總和,用于權(quán)重策略相關(guān)計(jì)算。
- connectionsMap:各個(gè)進(jìn)程活動連接數(shù)的映射,用于最小連接數(shù)策略相關(guān)計(jì)算。
1. 輪詢策略(POLLING)
原理:索引值遞增,每次調(diào)用時(shí)會自動加 1,超出任務(wù)數(shù)組長度時(shí)會自動取模,保證平均調(diào)用。
時(shí)間復(fù)雜度 O(n) = 1
/* polling algorithm */
module.exports = function (tasks, currentIndex, context) {
if (!tasks.length) return null;
const task = tasks[currentIndex];
context.currentIndex ++;
context.currentIndex %= tasks.length;
return task || null;
};
2. 權(quán)重策略(WEIGHTS)
原理:每個(gè)進(jìn)程根據(jù) (權(quán)重值 + (權(quán)重總和 * 隨機(jī)因子)) 生成最終計(jì)算值,最終計(jì)算值中的最大值被命中。
時(shí)間復(fù)雜度 O(n) = n
/* weight algorithm */
module.exports = function (tasks, weightTotal, context) {
if (!tasks.length) return null;
let max = tasks[0].weight, maxIndex = 0, sum;
for (let i = 0; i < tasks.length; i++) {
sum = (tasks[i].weight || 0) + Math.random() * weightTotal;
if (sum >= max) {
max = sum;
maxIndex = i;
}
}
context.weightIndex += 1;
context.weightIndex %= (weightTotal + 1);
return tasks[maxIndex];
};
3. 隨機(jī)策略(RANDOM)
原理:隨機(jī)函數(shù)在 [0, length) 中任意選取一個(gè)索引即可
時(shí)間復(fù)雜度 O(n) = 1
/* random algorithm */
module.exports = function (tasks) {
const length = tasks.length;
const target = tasks[Math.floor(Math.random() * length)];
return target || null;
};
4. 權(quán)重輪詢策略(WEIGHTS_POLLING)
原理:類似輪詢策略,不過輪詢的區(qū)間為:[最小權(quán)重值, 權(quán)重總和],根據(jù)各項(xiàng)權(quán)重累加值進(jìn)行命中區(qū)間計(jì)算。每次調(diào)用時(shí)權(quán)重索引會自動加 1,超出權(quán)重總和時(shí)會自動取模。
時(shí)間復(fù)雜度 O(n) = n
/* weights polling */
module.exports = function (tasks, weightIndex, weightTotal, context) {
if (!tasks.length) return null;
let weight = 0;
let task;
for (let i = 0; i < tasks.length; i++) {
weight += tasks[i].weight || 0;
if (weight >= weightIndex) {
task = tasks[i];
break;
}
}
context.weightIndex += 1;
context.weightIndex %= (weightTotal + 1);
return task;
};
5. 權(quán)重隨機(jī)策略(WEIGHTS_RANDOM)
原理:由 (權(quán)重總和 * 隨機(jī)因子) 產(chǎn)生計(jì)算值,將各項(xiàng)權(quán)重值與其相減,第一個(gè)不大于零的最終值即被命中。
時(shí)間復(fù)雜度 O(n) = n
/* weights random algorithm */
module.exports = function (tasks, weightTotal) {
let task;
let weight = Math.ceil(Math.random() * weightTotal);
for (let i = 0; i < tasks.length; i++) {
weight -= tasks[i].weight || 0;
if (weight <= 0) {
task = tasks[i];
break;
}
}
return task || null;
};
6. 最小連接數(shù)策略(MINIMUM_CONNECTION)
原理:直接選擇當(dāng)前連接數(shù)最小的項(xiàng)即可。
時(shí)間復(fù)雜度 O(n) = n
/* minimum connections algorithm */
module.exports = function (tasks, connectionsMap={}) {
if (tasks.length < 2) return tasks[0] || null;
let min = connectionsMap[tasks[0].id];
let minIndex = 0;
for (let i = 1; i < tasks.length; i++) {
const con = connectionsMap[tasks[i].id] || 0;
if (con <= min) {
min = con;
minIndex = i;
}
}
return tasks[minIndex] || null;
};
7. 權(quán)重最小連接數(shù)(WEIGHTS_MINIMUM_CONNECTION)
原理:權(quán)重 + ( 隨機(jī)因子 * 權(quán)重總和 ) + ( 連接數(shù)占比 * 權(quán)重總和 ) 三個(gè)因子,計(jì)算出最終值,根據(jù)最終值的大小進(jìn)行比較,最小值所代表項(xiàng)即被命中。
時(shí)間復(fù)雜度 O(n) = n
/* weights minimum connections algorithm */
module.exports = function (tasks, weightTotal, connectionsMap, context) {
if (!tasks.length) return null;
let min = tasks[0].weight, minIndex = 0, sum;
const connectionsTotal = tasks.reduce((total, cur) => {
total += (connectionsMap[cur.id] || 0);
return total;
}, 0);
// algorithm: (weight + connections'weight) + random factor
for (let i = 0; i < tasks.length; i++) {
sum =
(tasks[i].weight || 0) + (Math.random() * weightTotal) +
(( (connectionsMap[tasks[i].id] || 0) * weightTotal ) / connectionsTotal);
if (sum <= min) {
min = sum;
minIndex = i;
}
}
context.weightIndex += 1;
context.weightIndex %= (weightTotal + 1);
return tasks[minIndex];
};
? 負(fù)載均衡器的實(shí)現(xiàn)
代碼都不復(fù)雜,有幾點(diǎn)需要說明:
- params 對象保存了用于各種策略計(jì)算的一些參數(shù),比如權(quán)重索引、權(quán)重總和、連接數(shù)、CPU/Memory占用等等。
-
scheduler 對象用于調(diào)用各種策略進(jìn)行計(jì)算,
scheduler.calculate()會返回一個(gè)命中的進(jìn)程 id。 -
targets 即所有用于計(jì)算的目標(biāo)進(jìn)程,不過其中僅存放了目標(biāo)進(jìn)程 pid 和 其權(quán)重 weight:
[{id: [pid], weight: [number]}, ...]。 - algorithm 為特定的負(fù)載均衡策略,默認(rèn)值為輪詢策略。
-
ProcessManager.on('refresh', this.refreshParams),負(fù)載均衡器通過監(jiān)聽
ProcessManager的 refresh 事件來定時(shí)更新各個(gè)進(jìn)程的計(jì)算參數(shù)。ProcessManager中有一個(gè)定時(shí)器,每隔一段時(shí)間就會采集一次各個(gè)被監(jiān)聽的進(jìn)程的資源占用情況,并攜帶采集數(shù)據(jù)觸發(fā)一次 refresh 事件。
const CONSTS = require("./consts");
const Scheduler = require("./scheduler");
const {
RANDOM,
POLLING,
WEIGHTS,
SPECIFY,
WEIGHTS_RANDOM,
WEIGHTS_POLLING,
MINIMUM_CONNECTION,
WEIGHTS_MINIMUM_CONNECTION,
} = CONSTS;
const ProcessManager = require('../ProcessManager');
/* Load Balance Instance */
class LoadBalancer {
/**
* @param {Object} options [ options object ]
* @param {Array } options.targets [ targets for load balancing calculation: [{id: 1, weight: 1}, {id: 2, weight: 2}] ]
* @param {String} options.algorithm [ strategies for load balancing calculation : RANDOM | POLLING | WEIGHTS | SPECIFY | WEIGHTS_RANDOM | WEIGHTS_POLLING | MINIMUM_CONNECTION | WEIGHTS_MINIMUM_CONNECTION]
*/
constructor(options) {
this.targets = options.targets;
this.algorithm = options.algorithm || POLLING;
this.params = { // data for algorithm
currentIndex: 0, // index
weightIndex: 0, // index for weight alogrithm
weightTotal: 0, // total weight
connectionsMap: {}, // connections of each target
cpuOccupancyMap: {}, // cpu occupancy of each target
memoryOccupancyMap: {}, // cpu occupancy of each target
};
this.scheduler = new Scheduler(this.algorithm);
this.memoParams = this.memorizedParams();
this.calculateWeightIndex();
ProcessManager.on('refresh', this.refreshParams);
}
/* params formatter */
memorizedParams = () => {
return {
[RANDOM]: () => [],
[POLLING]: () => [this.params.currentIndex, this.params],
[WEIGHTS]: () => [this.params.weightTotal, this.params],
[SPECIFY]: (id) => [id],
[WEIGHTS_RANDOM]: () => [this.params.weightTotal],
[WEIGHTS_POLLING]: () => [this.params.weightIndex, this.params.weightTotal, this.params],
[MINIMUM_CONNECTION]: () => [this.params.connectionsMap],
[WEIGHTS_MINIMUM_CONNECTION]: () => [this.params.weightTotal, this.params.connectionsMap, this.params],
};
}
/* refresh params data */
refreshParams = (pidMap) => { ... }
/* pick one task from queue */
pickOne = (...params) => {
return this.scheduler.calculate(
this.targets, this.memoParams[this.algorithm](...params)
);
}
/* pick multi task from queue */
pickMulti = (count = 1, ...params) => {
return new Array(count).fill().map(
() => this.pickOne(...params)
);
}
/* calculate weight */
calculateWeightIndex = () => {
this.params.weightTotal = this.targets.reduce((total, cur) => total + (cur.weight || 0), 0);
if (this.params.weightIndex > this.params.weightTotal) {
this.params.weightIndex = this.params.weightTotal;
}
}
/* calculate index */
calculateIndex = () => {
if (this.params.currentIndex >= this.targets.length) {
this.params.currentIndex = (ths.params.currentIndex - 1 >= 0) ? (this.params.currentIndex - 1) : 0;
}
}
/* clean data of a task or all task */
clean = (id) => { ... }
/* add a task */
add = (task) => {...}
/* remove target from queue */
del = (target) => {...}
/* wipe queue and data */
wipe = () => {...}
/* update calculate params */
updateParams = (object) => {
Object.entries(object).map(([key, value]) => {
if (key in this.params) {
this.params[key] = value;
}
});
}
/* reset targets */
setTargets = (targets) => {...}
/* change algorithm strategy */
setAlgorithm = (algorithm) => {...}
}
module.exports = Object.assign(LoadBalancer, { ALGORITHM: CONSTS });
? 進(jìn)程池配合 LoadBalancer 來實(shí)現(xiàn)負(fù)載均衡
有幾點(diǎn)需要說明:
- 當(dāng)我們使用
pool.send('channel', params)時(shí),pool 內(nèi)部getForkedFromPool()函數(shù)會被調(diào)用,函數(shù)從進(jìn)程池中選擇一個(gè)進(jìn)程來執(zhí)行任務(wù),如果子進(jìn)程數(shù)未達(dá)到最大設(shè)定數(shù),則優(yōu)先創(chuàng)建一個(gè)子進(jìn)程來處理請求。 - 子進(jìn)程 創(chuàng)建/銷毀/退出 時(shí)需要同步更新
LoadBalancer中監(jiān)聽的targets,否則已被銷毀的進(jìn)程 pid 可能會在執(zhí)行負(fù)載均衡策略計(jì)算后被返回。 -
ForkedProcess是一個(gè)裝飾器類,封裝了child_process.fork邏輯,為其增加了一些額外功能,如:進(jìn)程睡眠、喚醒、綁定事件、發(fā)送請求等基本方法。
const _path = require('path');
const EventEmitter = require('events');
const ForkedProcess = require('./ForkedProcess');
const ProcessLifeCycle = require('../ProcessLifeCycle.class');
const ProcessManager = require('../ProcessManager/index');
const { defaultLifecycle } = require('../ProcessLifeCycle.class');
const LoadBalancer = require('../LoadBalancer');
let { inspectStartIndex } = require('../../conf/global.json');
const { getRandomString, removeForkedFromPool, convertForkedToMap, isValidValue } = require('../utils');
const { UPDATE_CONNECTIONS_SIGNAL } = require('../consts');
const defaultStrategy = LoadBalancer.ALGORITHM.POLLING;
class ChildProcessPool extends EventEmitter {
constructor({
path, max=6, cwd, env={},
weights=[], // weights of processes, the length is equal to max
strategy=defaultStrategy,
...
}) {
super();
this.cwd = cwd || _path.dirname(path);
this.env = {
...process.env,
...env
};
this.callbacks = {};
this.pidMap = new Map();
this.callbacksMap = new Map();
this.connectionsMap={};
this.forked = [];
this.connectionsTimer = null;
this.forkedMap = {};
this.forkedPath = path;
this.forkIndex = 0;
this.maxInstance = max;
this.weights = new Array(max).fill().map(
(_, i) => (isValidValue(weights[i]) ? weights[i] : 1)
);
this.LB = new LoadBalancer({
algorithm: strategy,
targets: [],
});
this.initEvents();
}
/* -------------- internal -------------- */
/* init events */
initEvents = () => {
// process exit
this.on('forked_exit', (pid) => {
this.onForkedDisconnect(pid);
});
...
}
/**
* onForkedCreate [triggered when a process instance created]
* @param {[String]} pid [process pid]
*/
onForkedCreate = (forked) => {
const pidsValue = this.forked.map(f => f.pid);
const length = this.forked.length;
this.LB.add({
id: forked.pid,
weight: this.weights[length - 1],
});
ProcessManager.listen(pidsValue, 'node', this.forkedPath);
...
}
/**
* onForkedDisconnect [triggered when a process instance disconnect]
* @param {[String]} pid [process pid]
*/
onForkedDisconnect = (pid) => {
const length = this.forked.length;
removeForkedFromPool(this.forked, pid, this.pidMap);
this.LB.del({
id: pid,
weight: this.weights[length - 1],
});
ProcessManager.unlisten([pid]);
...
}
/* Get a process instance from the pool */
getForkedFromPool = (id="default") => {
let forked;
if (!this.pidMap.get(id)) {
// create new process and put it into the pool
if (this.forked.length < this.maxInstance) {
inspectStartIndex ++;
forked = new ForkedProcess(
this,
this.forkedPath,
this.env.NODE_ENV === "development" ? [`--inspect=${inspectStartIndex}`] : [],
{ cwd: this.cwd, env: { ...this.env, id }, stdio: 'pipe' }
);
this.forked.push(forked);
this.onForkedCreate(forked);
} else {
// get a process from the pool based on load balancing strategy
forked = this.forkedMap[this.LB.pickOne().id];
}
if (id !== 'default') {
this.pidMap.set(id, forked.pid);
}
} else {
// pick a special process from the pool
forked = this.forkedMap[this.pidMap.get(id)];
}
if (!forked) throw new Error(`Get forked process from pool failed! the process pid: ${this.pidMap.get(id)}.`);
return forked;
}
/* -------------- caller -------------- */
/**
* send [Send request to a process]
* @param {[String]} taskName [task name - necessary]
* @param {[Any]} params [data passed to process - necessary]
* @param {[String]} id [the unique id bound to a process instance - not necessary]
* @return {[Promise]} [return a Promise instance]
*/
send = (taskName, params, givenId) => {
if (givenId === 'default') throw new Error('ChildProcessPool: Prohibit the use of this id value: [default] !')
const id = getRandomString();
const forked = this.getForkedFromPool(givenId);
this.lifecycle.refresh([forked.pid]);
return new Promise(resolve => {
this.callbacks[id] = resolve;
forked.send({action: taskName, params, id });
});
}
...
}
module.exports = ChildProcessPool;
VI. 新特性:子進(jìn)程智能啟停
這個(gè)特性我也將其稱為 進(jìn)程生命周期 (lifecycle)。
主要作用是:當(dāng)子進(jìn)程一段時(shí)間未被調(diào)用,則自動進(jìn)入休眠狀態(tài),減少 CPU 占用 (減少內(nèi)存占用很難)。進(jìn)入休眠狀態(tài)的時(shí)間可以和由創(chuàng)建者控制,默認(rèn)為 10 min。當(dāng)子進(jìn)程進(jìn)入休眠后,如果有新的請求到來并分發(fā)到該休眠的進(jìn)程上,則會自動喚醒該進(jìn)程并繼續(xù)處理當(dāng)前請求。一段時(shí)間閑置后,將會再次進(jìn)入休眠狀態(tài)。
? 使進(jìn)程休眠的各種方式
1)如果是讓進(jìn)程暫停的話,可以向進(jìn)程發(fā)送 SIGSTOP 信號,發(fā)送 SIGCONT 信號可以恢復(fù)進(jìn)程。
Node.js:
process.kill([pid], "SIGSTOP");
process.kill([pid], "SIGCONT");
Unix System (Windows 暫未測試):
kill -STOP [pid]
kill -CONT [pid]
2)Node.js 新的 Atomic.wait API 也可以做到編程控制。該方法會監(jiān)聽一個(gè) Int32Array 對象的給定下標(biāo)下的值,若值未發(fā)生改變,則一直等待(阻塞 event loop),直到發(fā)生超時(shí)(由 ms 參數(shù)決定)??梢栽谥鬟M(jìn)程中操作這塊共享數(shù)據(jù),然后為子進(jìn)程解除休眠鎖定。
const nil = new Int32Array(new SharedArrayBuffer(4));
const array = new Array(100000).fill(0);
setInterval(() => {
console.log(1);
}, 1e3);
Atomics.wait(nil, 0, 0, Number(600e3));
? 生命周期 LifeCycle 的實(shí)現(xiàn)
代碼同樣很簡單,有幾點(diǎn)需要說明:
采用了
標(biāo)記清除法,子進(jìn)程觸發(fā)請求時(shí)更新調(diào)用時(shí)間,同時(shí)使用定時(shí)器循環(huán)計(jì)算各個(gè)被監(jiān)聽子進(jìn)程的 ( 當(dāng)前時(shí)間 - 上次調(diào)用時(shí)間) 差值。如果有超過設(shè)定的時(shí)間的進(jìn)程則發(fā)送sleep信號,同時(shí)攜帶所有進(jìn)程 pid。每個(gè)
ChildProcessPool進(jìn)程池實(shí)例都會擁有一個(gè)ProcessLifeCycle實(shí)例對象用于控制當(dāng)前進(jìn)程池中的進(jìn)程的 休眠/喚醒。ChildProcessPool會監(jiān)聽ProcessLifeCycle對象的sleep事件,拿到需要 sleep 的進(jìn)程 pid 后調(diào)用ForkedProcess的sleep()方法使其睡眠。下個(gè)請求分發(fā)到該進(jìn)程時(shí),會自動喚醒該進(jìn)程。
const EventEmitter = require('events');
const defaultLifecycle = {
expect: 600e3, // default timeout 10 minutes
internal: 30e3 // default loop check interval 30 seconds
};
class ProcessLifeCycle extends EventEmitter {
constructor(options) {
super();
const {
expect=defaultLifecycle.expect,
internal=defaultLifecycle.internal
} = options;
this.timer = null;
this.internal = internal;
this.expect = expect;
this.params = {
activities: new Map()
};
}
/* task check loop */
taskLoop = () => {
if (this.timer) return console.warn('ProcessLifeCycle: the task loop is already running');
this.timer = setInterval(() => {
const sleepTasks = [];
const date = new Date();
const { activities } = this.params;
([...activities.entries()]).map(([key, value]) => {
if (date - value > this.expect) {
sleepTasks.push(key);
}
});
if (sleepTasks.length) {
// this.unwatch(sleepTasks);
this.emit('sleep', sleepTasks);
}
}, this.internal);
}
/* watch processes */
watch = (ids=[]) => {
ids.forEach(id => {
this.params.activities.set(id, new Date());
});
}
/* unwatch processes */
unwatch = (ids=[]) => {
ids.forEach(id => {
this.params.activities.delete(id);
});
}
/* stop task check loop */
stop = () => {
clearInterval(this.timer);
this.timer = null;
}
/* start task check loop */
start = () => {
this.taskLoop();
}
/* refresh tasks */
refresh = (ids=[]) => {
ids.forEach(id => {
if (this.params.activities.has(id)) {
this.params.activities.set(id, new Date());
} else {
console.warn(`The task with id ${id} is not being watched.`);
}
});
}
}
module.exports = Object.assign(ProcessLifeCycle, { defaultLifecycle });
? 進(jìn)程互斥鎖的雛形
之前看文章時(shí)看到關(guān)于 API - Atomic.wait 的一篇文章,Atomic 除了用于實(shí)現(xiàn)進(jìn)程睡眠,也能基于它來理解進(jìn)程互斥鎖的實(shí)現(xiàn)原理。這里有個(gè)基本雛形可以作為參考,相關(guān)文檔可以參閱 MDN。
AsyncLock 對象需要在子進(jìn)程中引入,創(chuàng)建 AsyncLock 的構(gòu)造函數(shù)中有一個(gè)參數(shù) sab 需要注意。這個(gè)參數(shù)是一個(gè) SharedArrayBuffer 共享數(shù)據(jù)塊,這個(gè)共享數(shù)據(jù)快需要在主進(jìn)程創(chuàng)建,然后通過 IPC 通信發(fā)送到各個(gè)子進(jìn)程,通常 IPC 通信會序列化一般的諸如 Object / Array 等數(shù)據(jù),導(dǎo)致消息接受者和消息發(fā)送者拿到的不是同一個(gè)對象,但是經(jīng)由 IPC 發(fā)送的 SharedArrayBuffer 對象卻會指向同一個(gè)內(nèi)存塊。
在子進(jìn)程中使用 SharedArrayBuffer 數(shù)據(jù)創(chuàng)建 AsyncLock 實(shí)例后,任意一個(gè)子進(jìn)程對共享數(shù)據(jù)的修改都會導(dǎo)致其它進(jìn)程內(nèi)指向這塊內(nèi)存的 SharedArrayBuffer 數(shù)據(jù)內(nèi)容變化,這就是我們使用它實(shí)現(xiàn)進(jìn)程鎖的基本要點(diǎn)。
先對 Atomic API 做個(gè)簡單說明:
- Atomics.compareExchange(typedArray, index, expectedValue, newValue):Atomics.compareExchange() 靜態(tài)方法會在數(shù)組的值與期望值相等的時(shí)候,將給定的替換值替換掉數(shù)組上的值,然后返回舊值。此原子操作保證在寫上修改的值之前不會發(fā)生其他寫操作。
- Atomics.waitAsync(typedArray, index, value[, timeout]):靜態(tài)方法 Atomics.wait() 確保了一個(gè)在 Int32Array 數(shù)組中給定位置的值沒有發(fā)生變化且仍然是給定的值時(shí)進(jìn)程將會睡眠,直到被喚醒或超時(shí)。該方法返回一個(gè)字符串,值為"ok", "not-equal", 或 "timed-out" 之一。
- Atomics.notify(typedArray, index[, count]):靜態(tài)方法 Atomics.notify() 喚醒指定數(shù)量的在等待隊(duì)列中休眠的進(jìn)程,不指定 count 時(shí)默認(rèn)喚醒所有。
AsyncLock 即異步鎖,等待鎖釋放的時(shí)候不會阻塞主線程。主要關(guān)注 executeAfterLocked() 這個(gè)方法,調(diào)用該方法并傳入回調(diào)函數(shù),該回調(diào)函數(shù)會在鎖被獲取后執(zhí)行,并且在執(zhí)行完畢后自動釋放鎖。其中一步的關(guān)鍵就是 tryGetLock() 函數(shù),它返回了一個(gè) Promise 對象,因此我們等待鎖釋放的邏輯在微任務(wù)隊(duì)列中執(zhí)行而并不阻塞主線程。
/**
* @name AsyncLock
* @description
* Use it in child processes, mutex lock logic.
* First create SharedArrayBuffer in main process and transfer it to all child processes to control the lock.
*/
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab; // data like this: const sab = new SharedArrayBuffer(16);
this.i32a = new Int32Array(sab);
}
lock() {
while (true) {
const oldValue = Atomics.compareExchange(
this.i32a, AsyncLock.INDEX,
AsyncLock.UNLOCKED, // old
AsyncLock.LOCKED // new
);
if (oldValue == AsyncLock.UNLOCKED) { // success
return;
}
Atomics.wait( // wait
this.i32a,
AsyncLock.INDEX,
AsyncLock.LOCKED // expect
);
}
}
unlock() {
const oldValue = Atomics.compareExchange(
this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED,
AsyncLock.UNLOCKED
);
if (oldValue != AsyncLock.LOCKED) { // failed
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
/**
* executeLocked [async function to acquired the lock and execute callback]
* @param {Function} callback [callback function]
*/
executeAfterLocked(callback) {
const tryGetLock = async () => {
while (true) {
const oldValue = Atomics.compareExchange(
this.i32a,
AsyncLock.INDEX,
AsyncLock.UNLOCKED,
AsyncLock.LOCKED
);
if (oldValue == AsyncLock.UNLOCKED) { // success if AsyncLock.UNLOCKED
callback();
this.unlock();
return;
}
const result = Atomics.waitAsync( // wait when AsyncLock.LOCKED
this.i32a,
AsyncLock.INDEX,
AsyncLock.LOCKED
);
await result.value; // return a Promise, will not block the main thread
}
}
tryGetLock();
}
}
VII. 存在的已知問題
由于使用了 Electron 原生的
remoteAPI,因此electron-re部分特性(Service 相關(guān))不支持 Electron 14 以及以上版本(已經(jīng)移除 remote),正考慮近期使用第三方remote庫進(jìn)行替代兼容。容錯(cuò)處理做的不夠好,這一塊會成為之后的重要優(yōu)化點(diǎn)。
采集進(jìn)程池中活動連接數(shù)時(shí)采用了"調(diào)用計(jì)數(shù)"的方式。這個(gè)處理方法不太好,準(zhǔn)確性也不夠高,但是目前還未想到更好的解決方法用于統(tǒng)計(jì)子進(jìn)程中活躍的連接數(shù)。我覺得還是要從底層進(jìn)行解決,比如:宏任務(wù)和微任務(wù)隊(duì)列、V8 虛擬機(jī)、垃圾回收、Libuv 底層原理、Node 進(jìn)程和線程原理...
暫時(shí)沒在 windows 平臺測試進(jìn)程休眠功能,win 平臺本身不支持進(jìn)程信號,但是 Node 提供了模擬支持,但是具體表現(xiàn)還需測試。
VIII. Next To Do
- 讓 Service 支持代碼更新后自動重啟
- 添加 ChildProcessPool 子進(jìn)程調(diào)度邏輯
- 優(yōu)化 ChildProcessPool 多進(jìn)程console輸出
- 添加可視化進(jìn)程管理界面
- 增強(qiáng) ChildProcessPool 進(jìn)程池功能
- 增強(qiáng) ProcessHost 事務(wù)中心功能
- 子進(jìn)程之間互斥鎖邏輯的實(shí)現(xiàn)
- 使用外部 remote 庫以支持最新版本的 Electron
- Kill Bugs ??
IX. 幾個(gè)實(shí)際使用示例
electronux - 我的一個(gè)Electron項(xiàng)目,使用了
BrowserService/MessageChannel,并且附帶了ChildProcessPool/ProcessHost使用demo。暗影襪子-electron - 我的另一個(gè)Electron 跨平臺桌面應(yīng)用項(xiàng)目(不提供鏈接,可以點(diǎn)擊上面的查看原文),使用
electron-re進(jìn)行調(diào)試開發(fā),并且在生產(chǎn)環(huán)境下可以打開ProcessManagerUI 用于 CPU/Memory 資源占用監(jiān)控和請求日志查看。file-slice-upload - 一個(gè)關(guān)于多文件分片并行上傳的demo,使用了
ChildProcessPoolandProcessHost,基于 Electron@9.3.5開發(fā)。也可直接查看
index.test.js和test目錄下的測試樣例文件,包含了一些使用示例。當(dāng)然 github - README 也有相關(guān)說明項(xiàng)。