node中的stream(前篇)

Stream在node中是一個(gè)無(wú)處不在的概念,但凡和IO沾邊的程序都離不開(kāi)stream,所以不弄懂stream是無(wú)法真正使用node的。在這份簡(jiǎn)短的筆記中,讓我們來(lái)一起看看這個(gè)融入node血肉中的stream到底是什么,以及該如何正確使用stream吧。

Stream這個(gè)概念一點(diǎn)都不新鮮,早在unix早期就被用于命令行中了。在當(dāng)時(shí),計(jì)算機(jī)內(nèi)部帶寬非常有限,輸入輸出都只能按字節(jié)碼順序進(jìn)行傳輸,這也就是最早的stream的概念。后來(lái)stream這個(gè)概念被發(fā)揚(yáng)廣大,幾乎所有IO數(shù)據(jù)都按stream的形式,通過(guò)pipe(管道)進(jìn)行傳輸,甚至連進(jìn)程間通信都用到了stream和pipe的概念。

在node中,stream作為一個(gè)各種可讀寫的對(duì)象的抽象層,方便我們用統(tǒng)一的方法對(duì)各種對(duì)象進(jìn)行操作。常見(jiàn)的http request response,標(biāo)準(zhǔn)輸出流stdout和輸入流stdin都可以用stream來(lái)進(jìn)行處理。

node中的stream分為Readable(可讀的)、Writable(可寫)或是Duplex兩者兼有的三大類,讓我們依次了解吧。

stream.Readable

Readable stream是對(duì)所有可讀數(shù)據(jù)源的抽象。node中的Readable stream有兩個(gè)模式:flowing和paused。flowing指的是數(shù)據(jù)流不停歇地流入,一有數(shù)據(jù)就立刻傳輸,以便盡快將數(shù)據(jù)傳輸給出去;而paused模式則每次只傳輸一部分?jǐn)?shù)據(jù),讀取了一部分?jǐn)?shù)據(jù)后再傳輸這部分?jǐn)?shù)據(jù),按塊傳輸數(shù)據(jù)直到傳輸結(jié)束。

前面說(shuō)過(guò),stream是通過(guò)pipe(管道)傳輸?shù)?。readable的pipe方法非常直觀:

readable.pipe(destination[, options])

destination就是數(shù)據(jù)傳輸?shù)哪繕?biāo)。options接收一個(gè)end參數(shù),該參數(shù)為true時(shí),只要接收方明確停止數(shù)據(jù)傳輸,數(shù)據(jù)流就立即停止。

在node學(xué)習(xí)筆記中我們見(jiàn)過(guò)的Readable stream包括:

  • 客戶端的http response
  • 服務(wù)器端的http request
  • fs模塊中的read streams
  • tcp中的socket
  • process的stdin和stdout方法

下面讓我們來(lái)看一個(gè)fs的例子。fs有一個(gè)createReadStream方法,可以按stream的方式讀取文件。我們就用這個(gè)方法來(lái)讀取命令行傳入的指定路徑的文件,然后用pipe(管道)傳輸給stdout:

var fs = require('fs');
fs.createReadStream(process.argv[2]).pipe(process.stdout);

另外一個(gè)簡(jiǎn)單的例子,stdin產(chǎn)生的流傳輸給stdout:

process.stdin.pipe(process.stdout);

如果你用node執(zhí)行這個(gè)程序,就會(huì)看到每輸入一行回車后,屏幕(stdout)就會(huì)重復(fù)這一行的內(nèi)容。

Transform Stream

如果只是簡(jiǎn)單的將數(shù)據(jù)從一端傳到另一端,那么流的也未免太簡(jiǎn)單了。事實(shí)上,stream是非常靈活的一種數(shù)據(jù)傳輸手段。現(xiàn)在我們就來(lái)看看transform stream是如何實(shí)現(xiàn)這種靈活性的。

許多時(shí)候,我們希望對(duì)數(shù)據(jù)先做一些處理,再將處理后的結(jié)果傳輸給接收方。這就是的transform stream的作用了,顧名思義,它的輸出流是基于輸入流變動(dòng)得來(lái)的。它本身屬于Deplux類型,可讀可寫。

我們來(lái)看一個(gè)簡(jiǎn)單例子,將前面例子的stdin輸入流轉(zhuǎn)換為大寫后再傳輸給輸出流stdout。

也許你會(huì)想到process.stdin.toUpperCase().pipe.(process.stdout),但這樣行不通的。前面講過(guò),process.stdin的流是一個(gè)Readable stream,是不能被改變的。所以我們需要將readable stream轉(zhuǎn)換為Deplux stream,再將內(nèi)容改成大寫,最后輸出給stdout。

這個(gè)轉(zhuǎn)換流的功能有些復(fù)雜,我們這里借助through2模塊來(lái)做演示。

var through = require('through2');

使用through我們就可以創(chuàng)建一個(gè)Deplux流了:

var stream = through(transformFunction, flushFunction)

這里的transformFunction和flushFunction都是callback函數(shù)。transformFunction用來(lái)對(duì)出入流做處理,該函數(shù)的簽名為:

transform._transform(chunk, encoding, callback)

chunk就是輸入流的數(shù)據(jù)塊,encoding是輸入流的編碼,callback在數(shù)據(jù)處理完成后調(diào)用。每個(gè)chunk處理完成后用push方法來(lái)寫入,并轉(zhuǎn)換為deplux stream進(jìn)行輸出。

flushFunction將在所有輸入流傳輸結(jié)束后調(diào)用,用來(lái)結(jié)束transform stream。

有了這個(gè)模塊的幫助,程序就可以寫成:

var through = require('through2');
var stream = through(write, end);

function write(buffer, encoding, next) {
    this.push(buffer.toString().toUpperCase());
    next();
}

function end(done) {
    done();
}
process.stdin.pipe(stream).pipe(process.stdout);

用write函數(shù)將輸入流逐塊轉(zhuǎn)換為大寫,并用push轉(zhuǎn)換為deplux stream。end則在輸入流結(jié)束后發(fā)出done來(lái)明確結(jié)束transform stream。

下面再介紹一個(gè)很有用的模塊split,可以將輸入流按指定的分隔符分開(kāi)。比如前面的輸入的例子,我們可以按換行符來(lái)分割輸入流,然后再按行輸出:

process.stdin.pipe(split()).pipe(process.stdot);

另外如果我們不想按塊或者按行處理,而是想在一起處理所有的輸入流怎么辦呢?我們需要一個(gè)方法將輸入的數(shù)據(jù)塊塊連接起來(lái),有個(gè)很方便的模塊concat-stream可以完成這個(gè)任務(wù):

var concat = require('concat-stream');
process.stdin.pipe(concat(function (buf) {
    console.log(buf);
}));

注意這個(gè)concat的callback的buf參數(shù)不再是一個(gè)stream了,不能再使用pipe傳輸了,所以這里我們console就可以看到完整的輸入了。

Stream與http服務(wù)

在node學(xué)習(xí)筆記里,我們介紹過(guò)http服務(wù)。我們說(shuō)過(guò),利用node核心的http模塊,可以很方便的創(chuàng)建http服務(wù)。http.createServer()的callback帶有兩個(gè)參數(shù),一個(gè)代表用戶請(qǐng)求的request,還有一個(gè)代表響應(yīng)的response。其實(shí)request和response都可以處理stream。比如可以將某個(gè)流輸出到response中:pipe(response)。也可以將用戶請(qǐng)求中附帶的數(shù)據(jù)按流輸入,比如request.pipe()。

這里我們來(lái)看一個(gè)例子,將request數(shù)據(jù)按stream輸入,轉(zhuǎn)換為大寫后再通過(guò)response返回給客戶:

var http = require('http');
var through = require('through2');

var server = http.createServer(function (req, res) {
  req.pipe(tr).pipe(res);
});

server.listen(process.argv[2]);

var tr = through(function (buf, _, next) {
  var buffer = buf.toString().toUpperCase();
  this.push(buffer);
  next();
});

內(nèi)容前面都講過(guò),應(yīng)該不陌生了。處理stream也很直觀,只要按req.pipe(tr).pipe(res)就可以按前面提過(guò)的方法處理stream了。這么做的好處是無(wú)需等待所有數(shù)據(jù)傳輸完畢就可以開(kāi)始處理和傳輸,能夠盡快給客戶響應(yīng)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • stream 流是一個(gè)抽象接口,在 Node 里被不同的對(duì)象實(shí)現(xiàn)。例如 request to an HTTP se...
    明明三省閱讀 3,521評(píng)論 1 10
  • 流是Node中最重要的組件和模式之一。在社區(qū)里有一句格言說(shuō):讓一切事務(wù)流動(dòng)起來(lái)。這已經(jīng)足夠來(lái)描述在Node中流...
    宮若石閱讀 673評(píng)論 0 0
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,538評(píng)論 19 139
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測(cè)試 ...
    KeKeMars閱讀 6,603評(píng)論 0 6
  • Node.js是目前非常火熱的技術(shù),但是它的誕生經(jīng)歷卻很奇特。 眾所周知,在Netscape設(shè)計(jì)出JavaScri...
    w_zhuan閱讀 3,731評(píng)論 2 41

友情鏈接更多精彩內(nèi)容