Egg.js是阿里推出的面向Node的企業(yè)級服務框架,這里只是講一講egg進階中的插件開發(fā)會遇到多進程增強模型.
背景
Egg.js原理簡介
稍微熟悉Egg.js原理的應該都知道master / agent / worker這三個進程的職責以及agent.js / app.js 這兩個js文件,agent進程對應于agent.js,worker進程對應的是app.js,而worker進程是有多個的以集群方式進行工作的,并且最終部署的應用也是集群的方式部署在不同的機器上的,因此實際的worker是一個n x m的數(shù)量。
服務長鏈
服務端應用最典型的就是數(shù)據(jù)庫連接(如: MySQL),尤其是微服務化后出現(xiàn)了各種各樣的中間件(如:Eureka/Zookeeper/Disconf), 這樣每一個應用都需要維護各種各樣的長鏈接。
Egg的支持
對于長鏈接的創(chuàng)建方式,Egg提供了兩種支持分別是:app.addSingleton(name, creator) 和 多進程增強模型。兩種方式分別在什么時候使用? addSingleton的方式可以直接參考Egg提供的例子MySQL,它可以保證一個application的對象(一個worker)只會有一個mysql實例,但是多個worker還是會有多個,對于MySQL這種在server端有鏈接池的是沒有問題的,而且這樣實現(xiàn)也簡單易用,但是如果沒有鏈接池的中間件來講這樣是一種極大的資源浪費 (n x m), 因此就會用到了多進程增強模型,下面具體說說。
多進程增強模型
Egg中的多進程增強模型實際上完全使用的就是Cluser-Client庫(也是阿里開源),在GitHub上面有它工作原理和使用方式的介紹,只是不知道大家會不會和我一樣看了一遍之后依然不知所云和無從下手的感覺,因此才寫了這篇博客將源碼閱讀的理解記錄下來。
Egg文檔引用
首先創(chuàng)建RegistryClient代碼如下:
const Base = require('sdk-base');
class RegistryClient extends Base {
...
}
然后創(chuàng)建一個APIClient類繼承框架提供的快捷類APIClientBase, 代碼如下:
const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase {
// 返回原始的客戶端類
get DataClient() {
return RegistryClient;
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
}
這里需要注意的是:
-
DataClient方法需要返回前面定義好的RegistryClient類。 -
_client屬性是繼承自父類, 直接就可以使用。
在Egg中嵌入上面的代碼:
// app.js || agent.js
const APIClient = require('./APIClient'); // 上面那個模塊
module.exports = app => {
const config = app.config.apiClient;
app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster });
app.beforeStart(async () => {
await app.apiClient.ready();
});
};
根據(jù)上面的代碼進行下面的梳理:
- 每一個
agent / application都會有一個APIClient實例。 - 所有的
APIClient實例都會知曉RegistryClient類名。 -
APIClient里面的方法會實際的調(diào)用一個_client屬性。
理解:
這就是一個靜態(tài)代理模式,所有想要對RegistryClient類的調(diào)用都要經(jīng)過APIClient進行一次代理,所以只要保證RegistryClient的實例只有一個,其它所有的APIClient都可以通過某種方式將操作(請求)傳達給RegistryClient就可以實現(xiàn)多進程單實例模式了。
注:上面使用方式是cluster-client的最佳實踐,雖然拋開APIClient這個類也可以,這里直接跳過了是因為這樣拆解更靈活并易于擴展,實際這里是需要進行兩層的代理, RegistryClient會代理真正的業(yè)務client的調(diào)用(可以動態(tài)代理實現(xiàn))并維護業(yè)務client的鏈接和事件接收,APIClient是用來mock所有業(yè)務client的api,讓業(yè)務的使用更貼近真正業(yè)務client的調(diào)用。如(示意):
APIClient.getData() --> RegistryClient.<DynamicDispatcher> --> zkClient.getData()
源碼分析
有了上面的例子和思路,帶著下面兩個問題進行源碼的分析:
- 如何保證
RegistryClient的實例只有一個。 -
APIClient類是如何和真實的client類進行交互的。
主從模式(Leader / Follower)
將多進程分為主(Leader)進程和從(Follower)進程,Leader只有一個并負責維護實際的第三方應用的鏈接及事件處理,F(xiàn)ollower用于訂閱Leader的一些事件及主動推送數(shù)據(jù)給Leader,也可以主動調(diào)用Leader執(zhí)行一些操作,它們之間可以通過進程間通信的方式進行信息交換。在Egg中規(guī)定了agent進程是Leader,而其他worker進程作為Follower,代碼如下isLeader: this.type === 'agent':
// node_modules/egg/egg.js
class EggApplication extends EggCore {
constructor(options) {
...
...
/**
* Wrap the Client with Leader/Follower Pattern
*
* @description almost the same as Agent.cluster API, the only different is that this method create Follower.
*
* @see https://github.com/node-modules/cluster-client
* @param {Function} clientClass - client class function
* @param {Object} [options]
* - {Boolean} [autoGenerate] - whether generate delegate rule automatically, default is true
* - {Function} [formatKey] - a method to tranform the subscription info into a string,default is JSON.stringify
* - {Object} [transcode|JSON.stringify/parse]
* - {Function} encode - custom serialize method
* - {Function} decode - custom deserialize method
* - {Boolean} [isBroadcast] - whether broadcast subscrption result to all followers or just one, default is true
* - {Number} [responseTimeout] - response timeout, default is 3 seconds
* - {Number} [maxWaitTime|30000] - leader startup max time, default is 30 seconds
* @return {ClientWrapper} wrapper
*/
this.cluster = (clientClass, options) => {
options = Object.assign({}, this.config.clusterClient, options, {
// cluster need a port that can't conflict on the environment
port: this.options.clusterPort,
// agent worker is leader, app workers are follower
isLeader: this.type === 'agent',
logger: this.coreLogger,
});
const client = cluster(clientClass, options);
this._patchClusterClient(client);
return client;
};
...
...
}
...
...
}
上面??代碼是在agent 和 application對象上掛了一個名為cluser的創(chuàng)建方法,方法返回一個ClientWrapper實例。
Cluster-Client代碼結構
|--cluster-client
|--lib
|--protocol
--byte_buffer.js
--packet.js
--request.js
--response.js
--api_client.js
--client.js
--connections.js
--const.js
--default_logger.js
--default_transcode.js
--follower.js
--index.js
--leader.js
--server.js
--symbol.js
--utils.js
這里我們先重點關注api_client.js / index.js / client.js這三個源碼。回想到上面Egg文檔給我提供的創(chuàng)建apiClient的例代碼?? :
new APIClient(Object.assign({}, config, { cluster: app.cluster });
我們就來到了cluster-client/lib/api_client.js, 這里將app.cluster方法傳入,參考源碼:
1 constructor(options) {
2 options = options || {};
3 super(options);
4 const wrapper = (options.cluster || cluster)(
5 this.DataClient, this.clusterOptions
6 );
7 for (const from in this.delegates) {
8 const to = this.delegates[from];
9 wrapper.delegate(from, to);
10 }
11 this._client = wrapper.create(options);
12 utils.delegateEvents(this._client, this);
13 if (!options.initMethod) {
14 this._client.ready(err => {
15 this.ready(err ? err : true);
16 });
17 }
18 }
第4行代碼直接就調(diào)用了cluster方法創(chuàng)建了一個ClientWrapper實例,第11行調(diào)用了wrapper的create方法,這樣我們就來到了cluster-client/lib/index.js:
// 去掉不分析的代碼
...
create (...args) {
...
function createRealClient() {
return Reflect.construct(clientClass, args);
}
const client = new ClusterClient(Object.assign({
createRealClient,
descriptors: this._descriptors,
}, this._options));
...
}
...
create方法主要是做了一些方法delegate生成和方法校驗(下回分析),這里調(diào)用了包裝了一個反射創(chuàng)建真實RegistryClient實例的方法并傳入ClusterClient生成了一個實例最終返回給調(diào)用者其實就是APIClient中的_client,那么這樣就來到了重點的cluster-client/lib/client.js, 方便查看這里直接就貼出[init]部分代碼:
async [init]() {
const name = this.options.name;
const port = this.options.port;
let server;
if (this.options.isLeader === true) {
server = await ClusterServer.create(name, port);
if (!server) {
throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
}
} else if (this.options.isLeader === false) {
// wait for leader active
await ClusterServer.waitFor(port, this.options.maxWaitTime);
} else {
debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
server = await ClusterServer.create(name, port);
}
if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
}
...
}
代碼非常清晰,如果是leader就會創(chuàng)建一個server并監(jiān)聽<port>, 如果是follower就鏈接server的<port>端口(可以查看server.js代碼)。然后分別new了Leader和Follower兩個實例并賦值給[innerClient]。當我們再查看Leader.js 的代碼時,發(fā)現(xiàn)在構造函數(shù)里有 this._realClient = this.options.createRealClient();, 原來真正的client是在這個時間創(chuàng)建的,而查看Follower.js的代碼時發(fā)現(xiàn)都是發(fā)送的tcp請求。這樣上面的兩個問題我們就都有了答案。
- agent進程起來后加載agent.js的時候設置了
cluster方法,在beforeStart時通過new APIClient初始化_client屬性的同時啟動了一個tcp server并在new Leader對象時初始化正在的client。Egg的agent進程只有一個因此真正的client實例也只有一個。 - 當調(diào)用
APIClient的方法時就會通過_client屬性調(diào)用到ClusterClient,然后再調(diào)用它內(nèi)部[innerClient], 而[innerClient]分別是Leader和Follower的實例,所以如果是leader就直接調(diào)用realClient否則就發(fā)送tcp請求。
至此cluster-client的多進程增強模式的主從原理就分析完成了,在實際的實現(xiàn)過程中具體的調(diào)用還是有一些規(guī)則和約束,如:
delegates的設置以及subscribe / publish / invoke / invokeOneway分別是如何使用的還需要進一步了解。