Stream在node中是一個(gè)無(wú)處不在的概念,但凡和IO沾邊的程序都離不開(kāi)stream,所以不弄懂stream是無(wú)法真正使用node的。在這份簡(jiǎn)短的筆記中,讓我們來(lái)一起看看這個(gè)融入node血肉中的stream到底是什么,以及該如何正確使用stream吧。
Stream這個(gè)概念一點(diǎn)都不新鮮,早在unix早期就被用于命令行中了。在當(dāng)時(shí),計(jì)算機(jī)內(nèi)部帶寬非常有限,輸入輸出都只能按字節(jié)碼順序進(jìn)行傳輸,這也就是最早的stream的概念。后來(lái)stream這個(gè)概念被發(fā)揚(yáng)廣大,幾乎所有IO數(shù)據(jù)都按stream的形式,通過(guò)pipe(管道)進(jìn)行傳輸,甚至連進(jìn)程間通信都用到了stream和pipe的概念。
在node中,stream作為一個(gè)各種可讀寫的對(duì)象的抽象層,方便我們用統(tǒng)一的方法對(duì)各種對(duì)象進(jìn)行操作。常見(jiàn)的http request response,標(biāo)準(zhǔn)輸出流stdout和輸入流stdin都可以用stream來(lái)進(jìn)行處理。
node中的stream分為Readable(可讀的)、Writable(可寫)或是Duplex兩者兼有的三大類,讓我們依次了解吧。
stream.Readable
Readable stream是對(duì)所有可讀數(shù)據(jù)源的抽象。node中的Readable stream有兩個(gè)模式:flowing和paused。flowing指的是數(shù)據(jù)流不停歇地流入,一有數(shù)據(jù)就立刻傳輸,以便盡快將數(shù)據(jù)傳輸給出去;而paused模式則每次只傳輸一部分?jǐn)?shù)據(jù),讀取了一部分?jǐn)?shù)據(jù)后再傳輸這部分?jǐn)?shù)據(jù),按塊傳輸數(shù)據(jù)直到傳輸結(jié)束。
前面說(shuō)過(guò),stream是通過(guò)pipe(管道)傳輸?shù)?。readable的pipe方法非常直觀:
readable.pipe(destination[, options])
destination就是數(shù)據(jù)傳輸?shù)哪繕?biāo)。options接收一個(gè)end參數(shù),該參數(shù)為true時(shí),只要接收方明確停止數(shù)據(jù)傳輸,數(shù)據(jù)流就立即停止。
在node學(xué)習(xí)筆記中我們見(jiàn)過(guò)的Readable stream包括:
- 客戶端的http response
- 服務(wù)器端的http request
- fs模塊中的read streams
- tcp中的socket
- process的stdin和stdout方法
下面讓我們來(lái)看一個(gè)fs的例子。fs有一個(gè)createReadStream方法,可以按stream的方式讀取文件。我們就用這個(gè)方法來(lái)讀取命令行傳入的指定路徑的文件,然后用pipe(管道)傳輸給stdout:
var fs = require('fs');
fs.createReadStream(process.argv[2]).pipe(process.stdout);
另外一個(gè)簡(jiǎn)單的例子,stdin產(chǎn)生的流傳輸給stdout:
process.stdin.pipe(process.stdout);
如果你用node執(zhí)行這個(gè)程序,就會(huì)看到每輸入一行回車后,屏幕(stdout)就會(huì)重復(fù)這一行的內(nèi)容。
Transform Stream
如果只是簡(jiǎn)單的將數(shù)據(jù)從一端傳到另一端,那么流的也未免太簡(jiǎn)單了。事實(shí)上,stream是非常靈活的一種數(shù)據(jù)傳輸手段。現(xiàn)在我們就來(lái)看看transform stream是如何實(shí)現(xiàn)這種靈活性的。
許多時(shí)候,我們希望對(duì)數(shù)據(jù)先做一些處理,再將處理后的結(jié)果傳輸給接收方。這就是的transform stream的作用了,顧名思義,它的輸出流是基于輸入流變動(dòng)得來(lái)的。它本身屬于Deplux類型,可讀可寫。
我們來(lái)看一個(gè)簡(jiǎn)單例子,將前面例子的stdin輸入流轉(zhuǎn)換為大寫后再傳輸給輸出流stdout。
也許你會(huì)想到process.stdin.toUpperCase().pipe.(process.stdout),但這樣行不通的。前面講過(guò),process.stdin的流是一個(gè)Readable stream,是不能被改變的。所以我們需要將readable stream轉(zhuǎn)換為Deplux stream,再將內(nèi)容改成大寫,最后輸出給stdout。
這個(gè)轉(zhuǎn)換流的功能有些復(fù)雜,我們這里借助through2模塊來(lái)做演示。
var through = require('through2');
使用through我們就可以創(chuàng)建一個(gè)Deplux流了:
var stream = through(transformFunction, flushFunction)
這里的transformFunction和flushFunction都是callback函數(shù)。transformFunction用來(lái)對(duì)出入流做處理,該函數(shù)的簽名為:
transform._transform(chunk, encoding, callback)
chunk就是輸入流的數(shù)據(jù)塊,encoding是輸入流的編碼,callback在數(shù)據(jù)處理完成后調(diào)用。每個(gè)chunk處理完成后用push方法來(lái)寫入,并轉(zhuǎn)換為deplux stream進(jìn)行輸出。
flushFunction將在所有輸入流傳輸結(jié)束后調(diào)用,用來(lái)結(jié)束transform stream。
有了這個(gè)模塊的幫助,程序就可以寫成:
var through = require('through2');
var stream = through(write, end);
function write(buffer, encoding, next) {
this.push(buffer.toString().toUpperCase());
next();
}
function end(done) {
done();
}
process.stdin.pipe(stream).pipe(process.stdout);
用write函數(shù)將輸入流逐塊轉(zhuǎn)換為大寫,并用push轉(zhuǎn)換為deplux stream。end則在輸入流結(jié)束后發(fā)出done來(lái)明確結(jié)束transform stream。
下面再介紹一個(gè)很有用的模塊split,可以將輸入流按指定的分隔符分開(kāi)。比如前面的輸入的例子,我們可以按換行符來(lái)分割輸入流,然后再按行輸出:
process.stdin.pipe(split()).pipe(process.stdot);
另外如果我們不想按塊或者按行處理,而是想在一起處理所有的輸入流怎么辦呢?我們需要一個(gè)方法將輸入的數(shù)據(jù)塊塊連接起來(lái),有個(gè)很方便的模塊concat-stream可以完成這個(gè)任務(wù):
var concat = require('concat-stream');
process.stdin.pipe(concat(function (buf) {
console.log(buf);
}));
注意這個(gè)concat的callback的buf參數(shù)不再是一個(gè)stream了,不能再使用pipe傳輸了,所以這里我們console就可以看到完整的輸入了。
Stream與http服務(wù)
在node學(xué)習(xí)筆記里,我們介紹過(guò)http服務(wù)。我們說(shuō)過(guò),利用node核心的http模塊,可以很方便的創(chuàng)建http服務(wù)。http.createServer()的callback帶有兩個(gè)參數(shù),一個(gè)代表用戶請(qǐng)求的request,還有一個(gè)代表響應(yīng)的response。其實(shí)request和response都可以處理stream。比如可以將某個(gè)流輸出到response中:pipe(response)。也可以將用戶請(qǐng)求中附帶的數(shù)據(jù)按流輸入,比如request.pipe()。
這里我們來(lái)看一個(gè)例子,將request數(shù)據(jù)按stream輸入,轉(zhuǎn)換為大寫后再通過(guò)response返回給客戶:
var http = require('http');
var through = require('through2');
var server = http.createServer(function (req, res) {
req.pipe(tr).pipe(res);
});
server.listen(process.argv[2]);
var tr = through(function (buf, _, next) {
var buffer = buf.toString().toUpperCase();
this.push(buffer);
next();
});
內(nèi)容前面都講過(guò),應(yīng)該不陌生了。處理stream也很直觀,只要按req.pipe(tr).pipe(res)就可以按前面提過(guò)的方法處理stream了。這么做的好處是無(wú)需等待所有數(shù)據(jù)傳輸完畢就可以開(kāi)始處理和傳輸,能夠盡快給客戶響應(yīng)。