前言
上一章講解了node如何通過事件循環(huán)實(shí)現(xiàn)異步,包括與各種IO多路復(fù)用搭配實(shí)現(xiàn)的異步IO已經(jīng)與IO無關(guān)的異步API。
以前,之所以異步IO在應(yīng)用層面不太流行,是因?yàn)楫惒骄幊淘诹鞒炭刂浦?,業(yè)務(wù)表達(dá)并不太適合程序員開發(fā)。
函數(shù)式編程
函數(shù)式編程是js異步編程的基礎(chǔ)。
高階函數(shù)
在js中,函數(shù)的參數(shù)可以為基本數(shù)據(jù)類型、對象引用,甚至是一個(gè)函數(shù)(函數(shù)也是一種對象)。同理,函數(shù)的返回值也可以是基本數(shù)據(jù)類型、對象引用,甚至是一個(gè)函數(shù)。
function foo(x) {
return function () {
return x;
};
}
那么高階函數(shù)就是把函數(shù)作為參數(shù)或是返回值的一類函數(shù)的稱謂。這就形成了一種后續(xù)傳遞風(fēng)格(Continuation Passing Style)的結(jié)果接收方式,這種風(fēng)格將編程重點(diǎn)從關(guān)注返回值,轉(zhuǎn)移到了回調(diào)函數(shù)中。
function foo(x, bar) {
return bar(x);
}
//例如sort()
var points = [40, 100, 1, 5, 25, 10];
points.sort(function(a, b) {
return a - b;
});
// [ 1, 5, 10, 25, 40, 100 ]
通過改動(dòng)sort()方法的參數(shù),可以決定不同的排序方式,從這里就可以看出高階函數(shù)的靈活性了。
結(jié)合node提供的最基本的事件模塊可以看出,事件的處理方式正是基于高階函數(shù)的特性來完成的,在自定義事件實(shí)例中,通過為相同事件注冊不同的回調(diào)函數(shù),可以很靈活的處理業(yè)務(wù)邏輯。
var emitter = new events.EventEmitter();
emitter.on('event_foo', function () {
// TODO
});
這本書里時(shí)常提到事件可以十分方便的進(jìn)行復(fù)雜業(yè)務(wù)邏輯的解耦,它其實(shí)受益于高階函數(shù)。(ES5中提供的一些數(shù)組方法都是高階函數(shù),forEach()map()、 reduce()、 reduceRight()、 filter()、 every()、 some())
偏函數(shù)用法
書中對于偏函數(shù)的描述,十分的拗口.....“偏函數(shù)用法是指創(chuàng)建一個(gè)調(diào)用另外一個(gè)部分(參數(shù)或變量已經(jīng)預(yù)置的函數(shù))的函數(shù)的用法?!?....這簡直就是翻譯界的災(zāi)難呀,我們來看看例子:
var toString = Object.prototype.toString;
var isString = function (obj) {
return toString.call(obj) == '[object String]';
};
var isFunction = function (obj) {
return toString.call(obj) == '[object Function]';
};
上述代碼的業(yè)務(wù)邏輯很簡單,但是卻需要我們重復(fù)定義相同的部分,如果有跟多的isXXX(),就會(huì)出現(xiàn)更多的冗余代碼,為了解決重復(fù)定義的問題,我們引入新的函數(shù),這個(gè)函數(shù)可以如工廠一樣批量創(chuàng)建一些類似的函數(shù)。我們看一下改造:
var isType = function (type) {
return function (obj) {
return toString.call(obj) == '[object ' + type + ']';
};
};
var isString = isType('String');
var isFunction = isType('Function');
引入isType()函數(shù)后,創(chuàng)建方法就簡單了,這個(gè)根本就是js的工廠模式嘛,這種通過指定部分參數(shù)來產(chǎn)生一個(gè)新的定制函數(shù)的形式就是偏函數(shù)。
偏函數(shù)在異步編程中十分
異步編程的優(yōu)勢與難點(diǎn)
單線程容易阻塞服務(wù)器,多線程因?yàn)榇嬖阪i、線程間狀態(tài)同步以及多線程之間的上下文切換,用起來也是需要一定經(jīng)驗(yàn)和積累的。當(dāng)然,如果你是c/c++技術(shù)大牛,你可以通過c/c++調(diào)用操作系統(tǒng)底層接口,自己手工完成異步IO,這樣性能可以提升很多,但是調(diào)試開發(fā)門檻則十分高了,不是新入行的菜鳥小白們能玩耍的了的。(不熟不生巧。)
優(yōu)勢
node的優(yōu)勢在于基于事件驅(qū)動(dòng)的非阻塞IO模型,這個(gè)模型使得非阻塞IO可以使cpu計(jì)算與IO相互解耦,讓資源得到更好的利用。我們來看一個(gè)圖進(jìn)一步說明node的異步IO模型:

下邊是同步IO模型,在這里再次進(jìn)行對比,雖然上一章已經(jīng)反復(fù)講解了,但是在這里作者還是給大家重新描述,我想這應(yīng)該是作者希望每一章都可以獨(dú)立閱讀的緣故。

在第三章中,node的異步IO利用了事件循環(huán)方式,js線程像分配任務(wù)和處理結(jié)果的大管家,IO線程池里的各個(gè)IO都是小二,負(fù)責(zé)兢兢業(yè)業(yè)的完成分配的任務(wù),小二與管家之間沒有依賴,可以保持整體的高效率。(前端編程、ios開發(fā)等都是這樣的)
書中在此處補(bǔ)充了第一章說的如何分解任務(wù)的方法來應(yīng)對cpu密集型的程序:
由于事件循環(huán)模型需要應(yīng)對海量請求,海量請求同時(shí)作用在單線程上,就需要防止任何一個(gè)計(jì)算耗費(fèi)過多的cpu時(shí)間片。至于是計(jì)算密集型,還是IO密集型,只要計(jì)算不影響異步IO的調(diào)度,那就不構(gòu)成問題。建議對cpu的耗用不要超過10ms,或者將大量的計(jì)算分解為諸多的小量計(jì)算,通過setImmediate()進(jìn)行調(diào)度。只要合理利用node的異步模型與V8的高性能,就可以充分發(fā)揮cpu和IO資源的優(yōu)勢。
難點(diǎn)
異步編程跟傳統(tǒng)的同步編程還是有很大差異的,此處哦說的難點(diǎn)主要是針對同步編程來說的,也就是同步編程可以很好解決的問題,但是在異步編程中變成了難點(diǎn)。
| 難點(diǎn) | 描述 | 解決 |
|---|---|---|
| 異常處理 | 無法利用try/catch/final的方式捕獲異常,也就是說對于回調(diào)拋出的異常,使用傳統(tǒng)的同步異常抓取辦法是抓不到的。 | 將回調(diào)函數(shù)的第一個(gè)實(shí)參作為err回傳,如果為null則沒有異常,如果有err對象,則發(fā)生了異常。這也就要求我們在寫異步程序時(shí),第一,要有回調(diào)函數(shù),第二,要正確設(shè)置回調(diào)函數(shù)的參數(shù),并且將第一個(gè)參數(shù)設(shè)置為err,第三,要確保在回調(diào)函數(shù)內(nèi)部發(fā)生錯(cuò)誤時(shí)正確的傳遞了這個(gè)錯(cuò)誤。 |
| 函數(shù)嵌套過深 | callback hell | 最新的解決方案是使用async/await來將異步變同步 |
| 阻塞代碼 | 因?yàn)閚ode是單線程程序,因此沒有sleep()來阻塞程序 | 使用setTimeout()來阻塞程序,但是這個(gè)方案也未必就好,我們已經(jīng)從第三章了解了這個(gè)異步API的一些知識(shí),因此,阻塞代碼的做法,不要在node中出現(xiàn),盡量還是利用異步事件編程,來實(shí)現(xiàn)業(yè)務(wù)。 |
| 多線程 | node的js執(zhí)行方式是單線程的 | node沒有web workers,同時(shí),web workers雖然解決了利用cpu和減少阻塞ui渲染的問題,但是還是不能解決ui渲染效率的問題。因此,在node層面,使用了child_process作為基礎(chǔ)的解決API方案,同時(shí)還提供了cluster模塊作為更深層次的應(yīng)用解決方案。node借助了這個(gè)web worker的模式,通過多進(jìn)程的方式,調(diào)用了操作系統(tǒng)層面的多線程。 |
| 異步轉(zhuǎn)同步 | 還是那個(gè)回調(diào)問題 | 最新的解決方案是使用async/await來將異步變同步 |
異常處理的正確參數(shù)傳遞
異常處理的正確參數(shù)傳遞的示例代碼。
var async = function (callback) {
process.nextTick(function() {
var results = something;
if (error) {
return callback(error);
}
callback(null, results);
});
};
此處說一個(gè)題外話,在以前還有一種基于node核心模塊domain進(jìn)行異常處理的方式,代碼如下:
var d = require('domain').create();
d.on('error', function (err) {
logger.fatal('Domain master', { message: err.message, stack: err.stack });
process.exit(0);
});
d.run(function () {
...
...
但是這個(gè)方法經(jīng)常會(huì)造成整個(gè)程序奔潰,因此,已經(jīng)不建議使用了。
我們來看看前端的web workers
瀏覽器提高了web workers來將js執(zhí)行和ui渲染分離,并通過web workers的消息傳遞來調(diào)度多核cpu進(jìn)行運(yùn)算。

異步編程解決方案
因?yàn)?,這本書寫作時(shí)還沒有推出node v7.6和es2015規(guī)范,因此,書中介紹的解決方案并不完美,目前通過async/await的方式,已經(jīng)可以完美解決這個(gè)問題了。(目前看來是完美的)
我們來看看書中介紹的解決方案,分別是:事件發(fā)布/訂閱模式、Promise/Deferred模式、流程控制庫
事件發(fā)布/訂閱模式
事件發(fā)布/訂閱模式,其實(shí)就是回調(diào)函數(shù)的事件化。這個(gè)功能基于的是node自身提供的events模塊,換句話說這個(gè)應(yīng)該就是很多node事件回調(diào)語法的中間層實(shí)現(xiàn)的這部分了。所謂事件發(fā)布訂閱,也可以理解為是瀏覽器中發(fā)布一個(gè)按鈕,然后為瀏覽器注冊這個(gè)按鈕監(jiān)聽的相關(guān)事件,然后,點(diǎn)擊這個(gè)按鈕。這個(gè)events模塊提供了addListener/on()、once()、removeListener()、removeAllListeners()、emit()等基本的事件監(jiān)聽模式的方法實(shí)現(xiàn)。示例代碼如下:
// 訂閱
emitter.on("event1", function (message) {
console.log(message);
});
// 發(fā)布
emitter.emit('event1', "I am message!");
可以看到訂閱事件就是一個(gè)高階函數(shù)的應(yīng)用,事件發(fā)布/訂閱模式可以實(shí)現(xiàn)一個(gè)事件與多個(gè)回調(diào)函數(shù)的關(guān)聯(lián),這些回調(diào)函數(shù)又被稱為事件監(jiān)聽器。(這個(gè)類似于瀏覽器中使用onclick="doSomething();doSomethingElse();"這樣的事件關(guān)聯(lián)方式)通過emit()發(fā)布事件后,消息會(huì)立即傳遞給當(dāng)前事件的所有偵聽器執(zhí)行。偵聽器可以很靈活的添加和刪除,使得事件和具體處理邏輯之間可以很輕松的關(guān)聯(lián)和解耦。
事件發(fā)布/訂閱模式自身并無同步和異步調(diào)用的問題,但在node中,emit()調(diào)用多半是伴隨事件循環(huán)而異步觸發(fā)的,所以,事件發(fā)布/訂閱廣泛應(yīng)用于異步編程。
事件發(fā)布/訂閱模式常常用來解耦業(yè)務(wù)邏輯,事件發(fā)布者無需關(guān)注訂閱的偵聽器如何實(shí)現(xiàn)業(yè)務(wù)邏輯,甚至不用關(guān)注有多少個(gè)偵聽器存在數(shù)據(jù)通過消息的方式可以很靈活的傳遞。這個(gè)就非常像面向?qū)ο笤O(shè)計(jì)中的接口設(shè)計(jì)了,接口設(shè)計(jì)者只需要規(guī)定都要哪些接口,實(shí)現(xiàn)哪些功能,而具體的接口實(shí)現(xiàn)都在子類中由開發(fā)程序員們來實(shí)現(xiàn)。
因此,在node中,事件的設(shè)計(jì)其實(shí)就是組件的接口設(shè)計(jì)。
從另外一個(gè)角度來看,事件偵聽器模式也是一種鉤子hook機(jī)制,利用鉤子導(dǎo)出內(nèi)部數(shù)據(jù)或狀態(tài)給外部的調(diào)用者。Node中的很多對象大多具有黑盒的特點(diǎn),功能點(diǎn)較少,如果不通過事件鉤子的形式,我們就無法獲取對象在運(yùn)行期間的中間值或內(nèi)部狀態(tài)。這種通過事件鉤子的方式,可以是編程人員不用關(guān)注組件是如何啟動(dòng)和執(zhí)行的,只需要關(guān)注在需要的事件點(diǎn)上即可。因此,我們不需要了解內(nèi)部運(yùn)行的機(jī)制,只需要關(guān)注關(guān)鍵數(shù)據(jù)就行了。例如http請求,就是這樣一個(gè)場景:
var options = {
host: 'www.google.com',
port: 80,
path: '/upload',
method: 'POST'
};
var req = http.request(options, function (res) {
console.log('STATUS: ' + res.statusCode);
console.log('HEADERS: ' + JSON.stringify(res.headers));
res.setEncoding('utf8');
res.on('data', function (chunk) {
console.log('BODY: ' + chunk);
});
res.on('end', function () {
// TODO
});
});
req.on('error', function (e) {
console.log('problem with request: ' + e.message);
});
// write data to request body
req.write('data\n');
req.write('data\n');
req.end();
在這段http請求中,程序員只需要將視線放在error、data、end上即可,至于內(nèi)部流程如何,我們不需要過度關(guān)注。
注意:
1.如果對一個(gè)事件添加了超過10個(gè)偵聽器,將會(huì)得到一條警告,因?yàn)?,有可能造成?nèi)存泄漏。調(diào)用emitter.setMaxListeners(0);可以去掉這個(gè)限制。但是,設(shè)置太多的偵聽器,也可能造成占用cpu過多的情況發(fā)生。
2.為了處理異常,EventEmitter對象對error事件進(jìn)行了特殊對待,如果運(yùn)行期間的錯(cuò)誤觸發(fā)了error事件,EventEmitter會(huì)檢查是否有對error事件添加過偵聽器,如果添加了,這個(gè)錯(cuò)誤將會(huì)交由該偵聽器處理,否則這個(gè)錯(cuò)誤將會(huì)作為異常拋出。如果外部沒有捕獲這個(gè)異常,將會(huì)引起線程退出。因此,一定要對error事件進(jìn)行處理。
事件的實(shí)現(xiàn)
1.繼承events模塊:
實(shí)現(xiàn)一個(gè)繼承EventEmitter的類是十分簡單的,我們來看一下這個(gè)代碼:
var events = require('events');
function Stream() {
events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
node在util模塊中封裝了繼承的方法,所以此次可以很方便的調(diào)用。開發(fā)者可以通過這樣的方式輕松繼承EventEmitter類。在node核心模塊中,幾乎有近一半的模塊都繼承自EventEmitter。這個(gè)也是node事件驅(qū)動(dòng)的一個(gè)實(shí)現(xiàn)的基礎(chǔ)。
雪崩問題和解決方案
雪崩問題,是因?yàn)楦咴L問量和大并發(fā)的情況下,造成緩存失效,大量的請求同時(shí)涌入數(shù)據(jù)庫中,使得數(shù)據(jù)庫無法同時(shí)承受如此大的查詢需求,從而影響整個(gè)應(yīng)用的性能的一種情況。
我們可以利用事件隊(duì)列解決雪崩問題。我們利用once()方法,使得通過它添加的偵聽器只能執(zhí)行一次,在執(zhí)行之后就會(huì)將他與事件的關(guān)聯(lián)移除。這個(gè)特性可以幫助我們過濾一些重復(fù)的事件響應(yīng)。
例如如下代碼:
var select = function (callback) {
db.select("SQL", function (results) {
callback(results);
});
};
如果應(yīng)用剛剛啟動(dòng),這時(shí)緩存中沒有數(shù)據(jù),而訪問量巨大,同一句SQL會(huì)被發(fā)到數(shù)據(jù)庫中反復(fù)查詢,影響整體性能。我們可以增加一個(gè)狀態(tài)鎖:
var status = "ready";
var select = function (callback) {
if (status === "ready") {
status = "pending";
db.select("SQL", function (results) {
status = "ready";
callback(results);
});
}
};
這樣雖然解決了訪問量大的問題,但是,除了第一訪問存在數(shù)據(jù),后續(xù)的訪問都獲取不到數(shù)據(jù)了,前端的請求就得不到正確的結(jié)果了,因此,這時(shí)就需要用once()來解決了:
var proxy = new events.EventEmitter();
var status = "ready";
var select = function (callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function (results) {
proxy.emit("selected", results);
status = "ready";
});
}
};
這里我們利用了once()方法,將所有請求的回調(diào)都?jí)喝胧录?duì)列中,利用其執(zhí)行一次就將監(jiān)視移除的特點(diǎn),保證每一個(gè)回調(diào)只會(huì)被執(zhí)行一次。對于相同的SQL語句,保證在同一個(gè)查詢開始到結(jié)束的過程中永遠(yuǎn)只有一次。sql在進(jìn)行查詢時(shí),新到來的相同調(diào)用只需要在隊(duì)列中等待數(shù)據(jù)就緒即可,這樣就可以保證同一個(gè)查詢從開始到結(jié)束的過程中,只有一次。新來的相同調(diào)用,只需要在隊(duì)列中等待數(shù)據(jù)就緒即可,一旦查詢結(jié)束,得到的結(jié)果可以被這些調(diào)用共同使用。這種方式能節(jié)省重復(fù)的數(shù)據(jù)庫調(diào)用產(chǎn)生的開銷,由于node單線程執(zhí)行的原因,此處無需擔(dān)心狀態(tài)同步的問題,這種方式其實(shí)也可以應(yīng)用到其他遠(yuǎn)程調(diào)用的場景中,即使外部沒有緩存策略,也可以有效的節(jié)省重復(fù)開銷。
此處可能會(huì)存在注冊事件過多引發(fā)的警告,需要調(diào)用setMaxListeners(0),移除警告,或者設(shè)置更大的警告閾值。
多異步之間的協(xié)作方案
產(chǎn)生這個(gè)的原因是,一般情況下,事件監(jiān)聽器和回調(diào)函數(shù)是一對多的關(guān)系,也就是有一個(gè)同類型的監(jiān)聽器,可以監(jiān)聽多個(gè)相同類型事件的響應(yīng)。但是,也會(huì)存在多個(gè)事件監(jiān)聽器去響應(yīng)一個(gè)回調(diào)函數(shù)的情況,這就是callback hell產(chǎn)生的原因,因?yàn)榇_實(shí)有這種需求,因此,在這里講解一下多異步之間的協(xié)作方案。
這里,作者想要通過node的原生代碼,來解決callback hell的問題,我們來看一下下邊這個(gè)代碼,這里以渲染頁面需要的模板讀取、數(shù)據(jù)讀取和本地化資源讀取為例,簡單介紹
var count = 0;
var results = {};
var done = function (key, value) {
results[key] = value;
count++;
if (count === 3) {
// 渲染頁面
render(results);
}
};
fs.readFile(template_path, "utf8", function (err, template) {
done("template", template);
});
db.query(sql, function (err, data) {
done("data", data);
});
l10n.get(function (err, resources) {
done("resources", resources);
});
由于多個(gè)異步場景中,回調(diào)函數(shù)的執(zhí)行并不能保證順序,且回調(diào)函數(shù)間不會(huì)存在交集,因此,需要借助第三方函數(shù)和第三方變量來協(xié)助處理結(jié)果,這個(gè)變量就是用于檢測執(zhí)行次數(shù)的,它一般被稱為哨兵變量。此處需要用到偏函數(shù)的相關(guān)知識(shí)。
var after = function (times, callback) {
var count = 0, results = {};
return function (key, value) {
results[key] = value;
count++;
if (count === times) {
callback(results);
}
};
};
var done = after(times, render);
done(1,"張")
//例子
after(1, results => {
console.log('hello world ' + results[1]);
})(1, "zhangz");
//如果寫成這樣
var after = function (times, callback) {
var count = 0, results = {};
callback(results);
};
//也是可以觸發(fā)回調(diào)函數(shù)的
after(1,results=>{
console.log('hello world '+results[1]);
});
//但是就沒有地方傳遞參數(shù)了,因此,最好寫成返回函數(shù)的形式。
上述方案實(shí)現(xiàn)了多對一的目的,如果業(yè)務(wù)繼續(xù)增長,我們依然可以繼續(xù)利用發(fā)布訂閱方式,來完成多對多的方案:
var emitter = new events.Emitter();
var done = after(times, render);
emitter.on("done", done);
emitter.on("done", other);
fs.readFile(template_path, "utf8", function (err, template) {
emitter.emit("done", "template", template);
});
db.query(sql, function (err, data) {
emitter.emit("done", "data", data);
});
l10n.get(function (err, resources) {
emitter.emit("done", "resources", resources);
});
這種方案結(jié)合了前者簡單的偏函數(shù)完成多對一的收斂和事件訂閱/發(fā)布模式中一對多的發(fā)散。
另外,樸靈自己寫了一個(gè)叫做EventProxy的模塊,也是來處理這種問題的,它對應(yīng)事件發(fā)布訂閱模式是一種補(bǔ)充。我們來感受一下:
var proxy = new EventProxy();
proxy.all("template", "data", "resources", function (template, data, resources) {
// TODO
});
fs.readFile(template_path, "utf8", function (err, template) {
proxy.emit("template", template);
});
db.query(sql, function (err, data) {
proxy.emit("data", data);
});
l10n.get(function (err, resources) {
proxy.emit("resources", resources);
});
EventProxy提供了一個(gè)all()方法來訂閱多個(gè)事件,當(dāng)每個(gè)事件被觸發(fā)后,監(jiān)聽器才會(huì)被執(zhí)行。使用tail()方法,讓每個(gè)事件都順序執(zhí)行。另外,還有after,可以命令事件在多少次訪問后,執(zhí)行。
var proxy = new EventProxy();
proxy.after("data", 10, function (datas) {
// TODO
});
EventProxy的原理
EventProxy來自于Backbone的事件模型,我們來看一下相關(guān)代碼
// Trigger an event, firing all bound callbacks. Callbacks are passed the
// same arguments as `trigger` is, apart from the event name.
// Listening for `"all"` passes the true event name as the first argument
trigger: function(eventName) {
var list, calls, ev, callback, args;
var both = 2;
if (!(calls = this._callbacks)) return this;
while (both--) {
ev = both ? eventName : 'all';
if (list = calls[ev]) {
for (var i = 0, l = list.length; i < l; i++) {
if (!(callback = list[i])) {
list.splice(i, 1); i--; l--;
} else {
args = both ? Array.prototype.slice.call(arguments, 1) : arguments;
callback[0].apply(callback[1] || this, args);
}
}
}
}
return this;
}
EventProxy則是將all當(dāng)做一個(gè)事件流的攔截層,在其中注入一些業(yè)務(wù)來處理單一事件無法解決的異步處理問題。類似的擴(kuò)展方法還有all、tail、after、not、any
EventProxy的異常處理
根據(jù)commonjs的規(guī)范,異常處理都被封裝在了回調(diào)函數(shù)的第一個(gè)err中。
exports.getContent = function (callback) {
var ep = new EventProxy();
ep.all('tpl', 'data', function (tpl, data) {
// 成功回調(diào)
callback(null, {
template: tpl,
data: data
});
});
// 監(jiān)聽error事件
ep.bind('error', function (err) {
//卸載掉所有處理函數(shù)
ep.unbind();
// 異常回調(diào)
callback(err);
});
fs.readFile('template.tpl', 'utf-8', function (err, content) {
if (err) {
// 一旦發(fā)生異常,一律交給error事件的處理函數(shù)處理
return ep.emit('error', err);
}
ep.emit('tpl', content);
});
db.get('some sql', function (err, result) {
if (err) {
// 一旦發(fā)生異常,一律交給error事件的處理函數(shù)處理
return ep.emit('error', err);
}
ep.emit('data', result);
});
};
exports.getContent = function (callback) {
var ep = new EventProxy();
ep.all('tpl', 'data', function (tpl, data) {
// 成功回調(diào)
callback(null, {
template: tpl,
data: data
});
});
//綁定錯(cuò)誤處理函數(shù)
ep.fail(callback);
fs.readFile('template.tpl', 'utf-8', ep.done('tpl'));
db.get('some sql', ep.done('data'));
};
另外,此處還有一些編程技巧:例如
ep.fail(callback);
//等價(jià)于
ep.fail(function (err) {
callback(err);
});
//等價(jià)于
ep.bind('error', function (err) {
// 卸載掉所有處理函數(shù)
ep.unbind();
// 異?;卣{(diào)
callback(err);
});
done()也可以變換實(shí)現(xiàn)
ep.done('tpl');
//等價(jià)于
function (err, content) {
if (err) {
// 一旦發(fā)生異常,一律交給error事件的處理函數(shù)處理
return ep.emit('error', err);
}
ep.emit('tpl', content);
}
又或者讓done接受一個(gè)函數(shù)作為參數(shù)
ep.done(function (content) {
// TODO
ep.emit('tpl', content);
});
//等價(jià)于
function (err, content) {
if (err) {
return ep.emit('error', err);
}
(function (content) {
// TODO
ep.emit('tpl', content);
}(content));
}
ep.done('tpl', function (content) {
// content.replace('s', 'S');
// TODO
return content;
});
大家也可以不看樸靈關(guān)于eventproxy的這個(gè)介紹,把之前的原理弄明白即可。
Promise/Deferred模式
使用事件的方式時(shí),執(zhí)行流程需要被預(yù)先設(shè)定,即便是分支,也需要預(yù)先設(shè)定,這是由發(fā)布/訂閱模式的運(yùn)行機(jī)制所決定的,例如ajax:
$.get('/api', {
success: onSuccess,
error: onError,
complete: onComplete
});
我們還可以利用Promise/Deferred模式來先執(zhí)行異步調(diào)用,延遲傳遞處理內(nèi)容。jquery的作者們通過這個(gè)模式幾乎重寫了jquery 1.5 ,我們便可以這樣調(diào)用ajax了:
$.get('/api')
.success(onSuccess)
.error(onError)
.complete(onComplete);
在原始api中,一個(gè)事件只能處理一個(gè)回調(diào),而通過Derferred對象,可以對事件加入任意的業(yè)務(wù)邏輯,如:
$.get('/api')
.success(onSuccess1)
.success(onSuccess2);
Promise/Deferred模式在CommonJS下抽象出了Promises/A、Promises/B、 Promises/D等模式,接下來我們就重點(diǎn)介紹一下Promises/A。
Promises/A
Promise/Deferred模式肯定包含Promise模式和Deferred模式兩部分,Promises/A的行為就印證了這一點(diǎn):
1.Promises只會(huì)存在三種狀態(tài),未完成態(tài)、完成態(tài)、失敗態(tài)
2.狀態(tài)只會(huì)從未完成態(tài)向完成態(tài),或者從未完成態(tài)向失敗態(tài)轉(zhuǎn)化,過程不可逆,完成態(tài)和失敗態(tài)也不會(huì)相互轉(zhuǎn)化。
3.狀態(tài)一旦轉(zhuǎn)化,將不能被更改。
Promises狀態(tài)轉(zhuǎn)化
Promises/A的實(shí)現(xiàn)非常簡單,一個(gè)Promises對象只需要具備then()方法即可,這個(gè)then()有如下特點(diǎn):
1.接受完成態(tài)、錯(cuò)誤態(tài)的回調(diào)方法,在操作完成或者出現(xiàn)錯(cuò)誤時(shí),將會(huì)調(diào)用對應(yīng)方法。
2.可選的支持progress事件回調(diào)作為第三方法
3.then()方法只接受function對象,其余對象將被忽略。
4.then()方法繼續(xù)返回promise對象,以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用。
then()方法的定義
then(fulfilledHandler, errorHandler, progressHandler)
我們使用events模塊來實(shí)現(xiàn)then()
var Promise = function () {
EventEmitter.call(this);
};
util.inherits(Promise, EventEmitter);
Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) {
if (typeof fulfilledHandler === 'function') {
this.once('success', fulfilledHandler);
}
if (typeof errorHandler === 'function') {
this.once('error', errorHandler);
}
if (typeof progressHandler === 'function') {
this.on('progress', progressHandler);
}
return this;
};
在這里我們看到,實(shí)現(xiàn)then()方法所做的事情,是將回調(diào)函數(shù)存放起來,為了完成整個(gè)流程,還需要觸發(fā)執(zhí)行這些回調(diào)函數(shù)的地方,實(shí)現(xiàn)這些功能的對象通常被稱為Deferred,即延遲對象,示例代碼如下:
var Deferred = function () {
this.state = 'unfulfilled';
this.promise = new Promise();
};
Deferred.prototype.resolve = function (obj) {
this.state = 'fulfilled';
this.promise.emit('success', obj);
};
Deferred.prototype.reject = function (err) {
this.state = 'failed';
this.promise.emit('error', err);
};
Deferred.prototype.progress = function (data) {
this.promise.emit('progress', data);
};

利用promise/a提議的模式,我們可以對一個(gè)典型的響應(yīng)對象進(jìn)行封裝,代碼如下:
res.setEncoding('utf8');
res.on('data', function (chunk) {
console.log('BODY: ' + chunk);
});
res.on('end', function () {
// Done
});
res.on('error', function (err) {
// Error
});
//封裝為
res.then(function () {
// Done
}, function (err) {
// Error
}, function (chunk) {
console.log('BODY: ' + chunk);
});
因此,實(shí)現(xiàn)promise只需要簡單改造即可:
var promisify = function (res) {
var deferred = new Deferred();
var result = '';
res.on('data', function (chunk) {
result += chunk;
deferred.progress(chunk);
});
res.on('end', function () {
deferred.resolve(result);
});
res.on('error', function (err) {
deferred.reject(err);
});
return deferred.promise;
};
//執(zhí)行代碼
promisify(res).then(function () {
// Done
}, function (err) {
// Error
}, function (chunk) {
// progress
console.log('BODY: ' + chunk);
});
注意:這里返回deferred.promise的目的是為了不讓外部程序調(diào)用resolve()和reject()方法,更改內(nèi)部狀態(tài)的行為全部交由定義者處理。

deferred主要用于內(nèi)部,用于維護(hù)異步模型狀態(tài),promise則作用于外部,通過then()方法,暴露給外部已添加自定義邏輯。
因此綜上所示,promise/Deferred模式與事件發(fā)布訂閱模式相比,在api的接口和抽象上都進(jìn)行了簡化,它將業(yè)務(wù)中不可變的部分封裝在了deferred中,可變部分交給了promise。從這里可以看出設(shè)計(jì)者的思路,事件發(fā)布訂閱時(shí)直接使用了events,屬于低級(jí)接口,用戶可以自己實(shí)現(xiàn)任何業(yè)務(wù)邏輯,但是,都需要經(jīng)過一些相對復(fù)雜的過程和對于業(yè)務(wù)的抽象以及接口的封裝,promise/deferred從本質(zhì)上看是對于events模塊的抽象封裝的一種,是對于經(jīng)典場景的高度抽象,簡潔好用,能夠覆蓋大部分業(yè)務(wù)場景的需要。
第三方包:q
q是promise/a的一個(gè)實(shí)現(xiàn),通過npm install q。我們來看一下例子:
/**
* Creates a Node-style callback that will resolve or reject the deferred
* promise.
* @returns a nodeback
*/
defer.prototype.makeNodeResolver = function () {
var self = this;
return function (error, value) {
if (error) {
self.reject(error);
} else if (arguments.length > 2) {
self.resolve(array_slice(arguments, 1));
} else {
self.resolve(value);
}
};
};
//如果基于q則變?yōu)椋?
var readFile = function (file, encoding) {
var deferred = Q.defer();
fs.readFile(file, encoding, deferred.makeNodeResolver());
return deferred.promise;
};
readFile("foo.txt", "utf-8").then(function (data) {
// Success case
}, function (err) {
// Failed case
});
promise中的多異步協(xié)作
因?yàn)?,promise主要是用來解決單個(gè)異步操作而設(shè)計(jì)的,那么多個(gè)異步調(diào)用應(yīng)該如何處理呢?我們給出一個(gè)簡單的邏輯實(shí)現(xiàn)
Deferred.prototype.all = function (promises) {
var count = promises.length;
var that = this;
var results = [];
promises.forEach(function (promise, i) {
promise.then(function (data) {
count--;
results[i] = data;
if (count === 0) {
that.resolve(results);
}
}, function (err) {
that.reject(err);
});
});
return this.promise;
}
var promise1 = readFile("foo.txt", "utf-8");
var promise2 = readFile("bar.txt", "utf-8");
var deferred = new Deferred();
deferred.all([promise1, promise2]).then(function (results) {
// TODO
}, function (err) {
// TODO
});
promise進(jìn)階知識(shí)
我們先來看一個(gè)問題
obj.api1(function (value1) {
obj.api2(value1, function (value2) {
obj.api3(value2, function (value3) {
obj.api4(value3, function (value4) {
callback(value4);
});
});
});
});
//然后利用普通函數(shù),將代碼并行地展開
var handler1 = function (value1) {
obj.api2(value1, handler2);
};
var handler2 = function (value2) {
obj.api3(value2, handler3);
};
var handler3 = function (value3) {
obj.api4(value3, hander4);
};
var handler4 = function (value4) {
callback(value4);
});
obj.api1(handler1);
//并行展開后,所以代碼幾乎是同時(shí)執(zhí)行的,回調(diào)先后無法控制,因此,引入事件來控制
var emitter = new event.Emitter();
emitter.on("step1", function () {
obj.api1(function (value1) {
emitter.emit("step2", value1);
});
});
emitter.on("step2", function (value1) {
obj.api2(value1, function (value2) {
emitter.emit("step3", value2);
});
});
emitter.on("step3", function (value2) {
obj.api3(value2, function (value3) {
emitter.emit("step4", value3);
});
});
emitter.on("step4", function (value3) {
obj.api4(value3, function (value4) {
callback(value4);
});
});
emitter.emit("step1");
//其實(shí)事件控制后,反而代碼量更多了,其實(shí)這個(gè)跟promise的想法已經(jīng)差不多了,但是抽象不夠。
//因此,理想的編程體驗(yàn),應(yīng)該是前一個(gè)調(diào)用的結(jié)果作為下一個(gè)調(diào)用的開始,也就是鏈?zhǔn)秸{(diào)用或者說是隊(duì)列調(diào)用:
//這就好像是promise的then一樣
promise()
.then(obj.api1)
.then(obj.api2)
.then(obj.api3)
.then(obj.api4)
.then(function (value4) {
// Do something with value4
}, function (error) {
// Handle any error from step1 through step4
})
.done();
//于是對代碼進(jìn)行一些改造
var Deferred = function () {
this.promise = new Promise();
};
// 完成態(tài)
Deferred.prototype.resolve = function (obj) {
var promise = this.promise;
var handler;
while ((handler = promise.queue.shift())) {
if (handler && handler.fulfilled) {
var ret = handler.fulfilled(obj);
if (ret && ret.isPromise) {
ret.queue = promise.queue;
this.promise = ret;
return;
}
}
}
};
// 失敗態(tài)
Deferred.prototype.reject = function (err) {
var promise = this.promise;
var handler;
while ((handler = promise.queue.shift())) {
if (handler && handler.error) {
var ret = handler.error(err);
if (ret && ret.isPromise) {
ret.queue = promise.queue;
this.promise = ret;
return;
}
}
}
};
// 生成回調(diào)函數(shù)
Deferred.prototype.callback = function () {
var that = this;
return function (err, file) {
if (err) {
return that.reject(err);
}
that.resolve(file);
};
};
var Promise = function () {
// 隊(duì)列用于存儲(chǔ)待執(zhí)行的回調(diào)函數(shù)
this.queue = [];
this.isPromise = true;
};
Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) {
var handler = {};
if (typeof fulfilledHandler === 'function') {
handler.fulfilled = fulfilledHandler;
}
if (typeof errorHandler === 'function') {
handler.error = errorHandler;
}
this.queue.push(handler);
return this;
};
經(jīng)過這個(gè)改造,我們可以開始對原來的多個(gè)異步調(diào)用進(jìn)行操作了:
var readFile1 = function (file, encoding) {
var deferred = new Deferred();
fs.readFile(file, encoding, deferred.callback());
return deferred.promise;
};
var readFile2 = function (file, encoding) {
var deferred = new Deferred();
fs.readFile(file, encoding, deferred.callback());
return deferred.promise;
};
readFile1('file1.txt', 'utf8').then(function (file1) {
return readFile2(file1.trim(), 'utf8');
}).then(function (file2) {
console.log(file2);
})
因此,如果讓,promise支持鏈?zhǔn)綀?zhí)行,需要做兩件事
1.將回調(diào)都存入隊(duì)列
2.promise完成時(shí),逐個(gè)執(zhí)行回調(diào),一旦檢測到返回了新的promise對象,就停止執(zhí)行,然后將當(dāng)前deferred對象的promise引用改變?yōu)樾碌膒romise對象,并將隊(duì)列中余下的回調(diào)轉(zhuǎn)交給它。當(dāng)然,這只是自己研究promise原理,如果真的使用promise還是請用when和q這樣的成熟promise庫來解決問題吧。
將API promise化
// smooth(fs.readFile);
var smooth = function (method) {
return function () {
var deferred = new Deferred();
var args = Array.prototype.slice.call(arguments, 1);
args.push(deferred.callback());
method.apply(null, args);
return deferred.promise;
};
};
//于是,前面兩次文件讀取就可以簡化了
var readFile1 = function (file, encoding) {
var deferred = new Deferred();
fs.readFile(file, encoding, deferred.callback());
return deferred.promise;
};
var readFile2 = function (file, encoding) {
var deferred = new Deferred();
fs.readFile(file, encoding, deferred.callback());
return deferred.promise;
};
//簡化為
var readFile = smooth(fs.readFile);
//這樣代碼量會(huì)急劇減少
var readFile = smooth(fs.readFile);
readFile('file1.txt', 'utf8').then(function (file1) {
return readFile(file1.trim(), 'utf8');
}).then(function (file2) {
// file2 => I am file2
console.log(file2);
});
流程庫控制
事件發(fā)布訂閱和promise規(guī)范,都是模式和相關(guān)的commonjs規(guī)范,這些其實(shí)都是實(shí)現(xiàn)其他庫的底層方法和響應(yīng)規(guī)范。接下來介紹一下實(shí)際的方法,來解決異步編程的實(shí)際問題。
尾觸發(fā)與next
在用connect中間件時(shí),會(huì)有一個(gè)next對象,我們來看一下:
var app = connect();
// Middleware
app.use(connect.staticCache());
app.use(connect.static(__dirname + '/public'));
app.use(connect.cookieParser());
app.use(connect.session());
app.use(connect.query());
app.use(connect.bodyParser());
app.use(connect.csrf());
app.listen(3001);
用use注冊好中間件后,就在監(jiān)聽上存在了請求,然后就可以利用尾觸發(fā)機(jī)制了:
function (req, res, next) {
// 中間件
}
這樣就人為的設(shè)置了一個(gè)業(yè)務(wù)隊(duì)列

其實(shí),過濾呀,驗(yàn)證呀,日志呀都可以用這個(gè)機(jī)制,我們來看一下:
exports.authorize_session = function (req, res, next) {
if (req.session.user) {
if (req.session.clt_id) {
return next();
}
else {
if (req.xhr) {
res.json(
{
errorcode: "b0001",
errorinfo: "(用戶未認(rèn)證,請先進(jìn)行賬戶認(rèn)證)"
}
);
res.end();
}
else {
var loginTip;
switch (req.session.statusname) {
case "未認(rèn)證": loginTip = "<div class='status0'>該賬號(hào)尚未認(rèn)證,請先進(jìn)行:<a href='/au'>身份認(rèn)證</a></div>"; break;
case "待認(rèn)證": loginTip = "<div class='status1'>該賬號(hào)已提交認(rèn)證申請,請等待系統(tǒng)審核。</div>"; break;
case "駁回": loginTip = "<div class='status3'>該賬號(hào)認(rèn)證被駁回,請重新提交:<a href='/au'>身份認(rèn)證</a></div>"; break;
case "已認(rèn)證": loginTip = false;
default: break;
}
res.render('./login/logined', {
layout: 'admin',
title: "登錄成功",
username: req.session.user,
accMsg: req.session.accMsg,
status: req.session.statusname,//0未,1待,2已
clientid: req.session.clt_id,
login_tip: loginTip
});
}
}
}
else {
if (req.xhr) {
res.json(
{
errorcode: "b0001",
errorinfo: "(用戶未登錄,請先登錄)"
}
);
res.end();
}
else {
res.render('./login/login', {
title: "請先登錄",
usernametemp: req.body.username,
error_reason: "請先登錄",
layout: "default"
});
}
}
};
接下來,我們看一下connect的核心實(shí)現(xiàn)
function createServer() {
function app(req, res) { app.handle(req, res); }
utils.merge(app, proto);
utils.merge(app, EventEmitter.prototype);
app.route = '/';
app.stack = [];
for (var i = 0; i < arguments.length; ++i) {
app.use(arguments[i]);
}
return app;
};
這段代碼通過function app(req, res){ app.handle(req, res); }就可以創(chuàng)建http服務(wù)器的request事件處理函數(shù)了。其中,真正的核心代碼是app.stack = [];
stack屬性是這個(gè)服務(wù)器內(nèi)部維護(hù)的中間件隊(duì)列,通過調(diào)用use(),我們可以將中間件放入隊(duì)列
app.use = function (route, fn) {
// some code
this.stack.push({ route: route, handle: fn });
return this;
};
此時(shí),就建好出了模型了,接下來結(jié)合node原生http模塊,實(shí)現(xiàn)監(jiān)聽即可。
app.listen = function(){
var server = http.createServer(this);
return server.listen.apply(server, arguments);
};
最終,app.handle()將會(huì)把監(jiān)聽到的網(wǎng)絡(luò)請求,都從這里進(jìn)程處理。
app.handle = function(req, res, out) {
// some code
next();
};
每一個(gè)next()將取出隊(duì)列中的中間件,并執(zhí)行,同時(shí)將傳入當(dāng)前方法以實(shí)現(xiàn)遞歸調(diào)用,最終達(dá)到持續(xù)觸發(fā)的目的:
function next(err) {
// some code
// next callback
layer = stack[index++];
layer.handle(req, res, next);
}
async庫
第一次我使用這個(gè)庫是因?yàn)閘isk的區(qū)塊鏈程序,這個(gè)應(yīng)該是當(dāng)時(shí)最為知名的流程控制模塊了。這個(gè)庫的基本使用可以參考我寫的一篇文章:ebookcoin中出現(xiàn)的異步編程淺析
樸靈在書中描述了使用async庫,如何實(shí)現(xiàn)異步的串行執(zhí)行、并行執(zhí)行、異步調(diào)用的依賴處理、自動(dòng)依賴處理等功能,官方細(xì)節(jié)可以參考:https://github.com/caolan/async。在此為了加快筆記的書寫速度,我將不再寫樸靈書上關(guān)于async庫的介紹了
setp庫
setp比async更加輕量,通過npm install step安裝即可使用。step只有一個(gè)接口:Step(task1, task2, task3);
它可以接受任意數(shù)量的任務(wù),所以任務(wù)都會(huì)串行依次執(zhí)行:
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this);
},
function readFile2(err, content) {
fs.readFile('file2.txt', 'utf-8', this);
},
function done(err, content) {
console.log(content);
}
)
step的this關(guān)鍵字,是他內(nèi)部的next()方法,將異步調(diào)用的結(jié)果傳遞給下一個(gè)任務(wù)做為參數(shù)。
因?yàn)?,這個(gè)庫也不是本次學(xué)習(xí)的重點(diǎn),因此step的并行任務(wù)執(zhí)行、結(jié)果分組等都不再寫筆記,大家可以自己上網(wǎng)查看。
wind
這個(gè)庫,大家可以自行查看https://github.com/JeffreyZhao/wind,在此也不做過多的介紹。
流程控制小結(jié)
流程控制是為了解決callback hell的,這個(gè)也是異步編程的重點(diǎn)。我們比較一下幾種方案:
| 流程控制方法 | 說明 |
|---|---|
| 事件發(fā)布/訂閱 | node的事件底層實(shí)現(xiàn),是其他庫的實(shí)現(xiàn)基礎(chǔ),比較原始和底層,理解后對于其他庫的原理可以有更深刻的理解 |
| promise/deferred | 它是一種解決異步編程的規(guī)范,并做了代碼抽象和封裝,現(xiàn)在已經(jīng)廣泛應(yīng)用于各種異步庫中。 |
| eventproxy | 樸靈自己寫的一個(gè)對于events模塊的擴(kuò)展,可以理解其原理,深刻體會(huì)流程控制的精妙之處 |
| async | 一個(gè)流程控制庫,可以解決異步串行、并行、自動(dòng)執(zhí)行等多種任務(wù) |
| step | 跟async差不多 |
| wind | 也是一個(gè)庫,沒怎么看,以后補(bǔ)充 |
| streamline | 書中一筆帶過,以后補(bǔ)充原理 |
我們現(xiàn)在處于node7.6以后的時(shí)代,更多的流程控制可以通過async/await來自行設(shè)計(jì)和解決,這些內(nèi)容都做完程序底層原理學(xué)習(xí)即可。
異步并發(fā)控制
因?yàn)椋琻ode可以隨便的調(diào)用異步發(fā)起并行調(diào)用程序,因此,如果使用的太過于隨意很可能出現(xiàn)資源被吃光的情況,并報(bào)類似于打開文件過多的錯(cuò):Error: EMFILE, too many open files
雖然,這樣確實(shí)榨干了服務(wù)器資源,但是也應(yīng)該做一下過載保護(hù),防止問題出現(xiàn)。
bagpipe的解決方案
bagpipe可以實(shí)現(xiàn)通過隊(duì)列控制并發(fā)量的功能,同時(shí)還啟用了拒絕模式,防止大量的異步調(diào)用。另外,對于過長時(shí)間的異步調(diào)用,也提供了超時(shí)控制。因此,這個(gè)不是本次筆記的學(xué)習(xí)重點(diǎn),因此,請大家先自行查詢網(wǎng)絡(luò),自己學(xué)習(xí),我會(huì)后續(xù)補(bǔ)全筆記。
async的解決方案
async提供了 parallelLimit()來做異步并發(fā)控制,還是一樣,請大家看我的另外一篇筆記。ebookcoin中出現(xiàn)的異步編程淺析
總結(jié)
異步編程我們可以通過事件注冊和監(jiān)聽、Promise/Deferred模式、流程控制庫、Generator/yeild、async/await等方式進(jìn)行解決。
其中事件發(fā)布/訂閱模式相對算是一種較為原始的方式,Promise/Deferred模式貢獻(xiàn)了一個(gè)非常不錯(cuò)的異步任務(wù)模型的抽象。流程控制庫則對異步進(jìn)行了封裝,讓用戶只關(guān)注回調(diào)函數(shù),而不是異步程序。
另外,書中還介紹了一個(gè)streamline,如果有興趣,可以去看看。
當(dāng)然,我們知道,在現(xiàn)在這種情況下,async/await則是對于異步編程,非常完美的解決方案了。
async/await搭配Promise/Deferred模式,從原生核心node代碼解決方案出發(fā),幾乎是完美的解決了異步編程的問題。
注意:這本書主要還是基于node6和es5語法的,現(xiàn)在在node7.6和es2015等規(guī)范下,已經(jīng)可以通過async/await這種形式,完美的解決異步編程callback hell的問題了。
