本文是Node.js設(shè)計模式的筆記, 代碼都是來自 <Node.js Design Patterns> by Mario Casciaro.
流的重要性
一般我們處理數(shù)據(jù)有兩種模式, 一種是buffer模式, 一種是stream模式, buffer模式就是取完數(shù)據(jù)一次性操作, stream模式就是邊取數(shù)據(jù)邊操作.
舉個例子, 如果打開一個2G的文件, 用buffer模式就是先分配2G的內(nèi)存, 把文件全部讀出來, 然后開始操作內(nèi)存, 而用流模式的方法就是邊讀數(shù)據(jù), 邊開始處理.
從這里看出stream模式無論是在空間和時間上都優(yōu)于buffer模式:
在空間上, 內(nèi)存只會占用當前需要處理的一塊數(shù)據(jù)區(qū)域的大小, 而不是整個文件.
在時間上, 因為不需要全部的數(shù)據(jù)就可以開始處理, 時間就相當于節(jié)約了, 從串行變成了并行操作(這里的并行不是多線程的并行, 而是生產(chǎn)者和消費者并行).
還有一個好處就是鏈式調(diào)用, 也就是可組合操作, 大大增加了代碼的可重用性.
比如下面這個代碼(中間的pipe可以很方便的增刪):
fs.createReadStream(file)
.pipe(zlib.createGzip())
//.pipe(crypto.createCipher('aes192', 'secret'))
.pipe(req)
.on('finish', function() {
console.log('File succesfully sent');
});
開始編碼
nodejs里面的stream一般分四種, 其中轉(zhuǎn)換流是一種特殊的讀寫流.
- 輸入流(stream.Readable)
- 輸出流(stream.Writable)
- 讀寫流(stream.Duplex)
- 轉(zhuǎn)換流(stream.Transform)
另外, nodejs里面的流有兩種模式, 二進制模式和對象模式.
- 二進制模式, 每個分塊都是buffer或者string對象.
- 對象模式, 流內(nèi)部處理的是一系列普通對象.
輸入流(stream.Readable)
先看一下怎么使用輸入流, 這里一般有兩種方法, 一個是非流動模式, 一個是流動模式.
非流動模式就是直接調(diào)用read()方法, 被動模式就是監(jiān)聽data事件.
下面直接看代碼:
// 非流動模式
process.stdin
.on('readable', function() {
// 有數(shù)據(jù)到了, 拼命讀, 直到讀完.
var chunk;
console.log('New data available');
while((chunk = process.stdin.read()) !== null) {
console.log(
'Chunk read: (' + chunk.length + ') "' + chunk.toString() + '"'
);
}})
.on('end', function() {
process.stdout.write('End of stream');
});
接下來看流動模式怎么玩
// 流動模式
process.stdin
.on('data', function(chunk) {
console.log('New data available');
console.log(
'Chunk read: (' + chunk.length + ')" ' +
chunk.toString() + '"'
);
})
.on('end', function() {
process.stdout.write('End of stream');
});
同樣實現(xiàn)一個輸入流也很簡單, 主要是
- 繼承Readable類
- 實現(xiàn)_read(size)接口(一般帶下劃線的表示內(nèi)部函數(shù), 調(diào)用者不要直接調(diào)用, 相當于C++里面的protect方法, 只是javascript里面沒有對方法做區(qū)分, 只能是命名上面區(qū)分一下了).
下面看示例代碼:
// randomStream.js
var stream = require('stream');
var util = require('util');
var chance = require('chance').Chance();
function RandomStream(options) {
// option支持3個參數(shù)
// encoding String 用于轉(zhuǎn)換Buffer到String的編碼類型(默認null)
// objMode Boolean 用戶指定是否是對象模式(默認false)
// highWaterMark Number 最高水位(可讀的最大數(shù)據(jù)量), 默認是16K
stream.Readable.call(this, options);
}
util.inherits(RandomStream, stream.Readable);
RandomStream.prototype._read = function(size) {
// 這是一個隨機產(chǎn)生數(shù)據(jù)的流, 5%的概率輸出null, 也就是流停止.
var chunk = chance.string();
console.log('Pushing chunk of size:' + chunk.length);
this.push(chunk, 'utf8');
if(chance.bool({likelihood: 5})) {
this.push(null);
}
}
module.exports = RandomStream;
好, 接下來是如何使用:
// generateRandom.js
var RandomStream = require('./randomStream');
var randomStream = new RandomStream();
randomStream.on('readable', function() {
var chunk;
while((chunk = randomStream.read()) !== null) {
console.log("Chunk received: " + chunk.toString());
}
});
輸出流(stream.Writable)
先是怎么用:
// 寫數(shù)據(jù)
writable.write(chunk, [encoding], [callback]);
// 結(jié)束流
writable.end([chunk], [encoding], [callback])
回壓(back-pressure)
這里涉及一個概念, 回壓(back-pressure), 意思就是當生產(chǎn)者速度大于消費者的時候, 輸出流的水位會不斷上升, 當?shù)竭_設(shè)定的最高水位時候, 就會寫入失敗, 這時候也就是產(chǎn)生了back-pressure, 那如何處理呢, 此時輸入流在水位降低到零點的時候會有一個drain事件發(fā)送, 只要監(jiān)聽這個事件, 在事件發(fā)生的時候就可以繼續(xù)向流寫入數(shù)據(jù)了.
直接看代碼:
var chance = require('chance').Chance();
require('http').createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
function generateMore() { //[1]
while(chance.bool({likelihood: 95})) {
var shouldContinue = res.write(
chance.string({length: (16 * 1024) – 1})
);
if(!shouldContinue) { //[3]
console.log('Backpressure');
return res.once('drain', generateMore);
}
}
res.end('\nThe end...\n', function() {
console.log('All data was sent');
});
}
generateMore();
}).listen(8080, function () {
console.log('Listening');
});
同樣, 實現(xiàn)一個輸出流也很簡單, 只要繼承Writable類, 實現(xiàn)_write()接口即可.
示例代碼:
// toFileStream.js
var stream = require('stream');
var fs = require('fs');
var util = require('util');
var path = require('path');
var mkdirp = require('mkdirp');
function ToFileStream() {
// 這次我們用對象模式
stream.Writable.call(this, {objectMode: true});
};
util.inherits(ToFileStream, stream.Writable);
ToFileStream.prototype._write = function(chunk, encoding, callback) {
var self = this;
mkdirp(path.dirname(chunk.path), function(err) {
if(err) {
return callback(err);
}
fs.writeFile(chunk.path, chunk.content, callback);
});
}
module.exports = ToFileStream;
下面是調(diào)用的代碼
var ToFileStream = require('./toFileStream');
var tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(function() {
console.log("All files created");
});
讀寫流(stream.Duplex)
就是把輸入流和輸出流的接口都實現(xiàn)了.
注意:
此時option參數(shù)是同時傳給了內(nèi)部的Readable和Writeable, 如果要使用不同的選項, 就要分開配置,
像這樣:
this._writableState.objectMode
this._readableState.objectMode
同時, Duplex又多了一個選項allowHalfOpen, 這個選項的意思是, 當其中一個流關(guān)閉的時候, 另外一條流是否也同時關(guān)閉, 默認是true, 也就是不同時關(guān)閉.
轉(zhuǎn)換流(stream.Transform)
對于讀寫流來說, 要實現(xiàn)的是 _read() 和 _write() 接口, 而轉(zhuǎn)換流要實現(xiàn)的是 _transform() 和 _flush()接口.
區(qū)別是什么, 轉(zhuǎn)換流一般在transform的過程中把讀寫都做了, 也就是在處理輸入的時候, 直接就輸出了. 最后在輸入結(jié)束的時候_flush() 會被調(diào)用, 就可以把剩余的內(nèi)部數(shù)據(jù)一并輸出了.
示例代碼:
// 這代碼寫的很漂亮, 解決的問題是在流中操作替換操作
// 其中替換的部分可以仔細看一下, stream和buffer一個很大的區(qū)別就是stream會被切割
// 導致要替換的數(shù)據(jù)也有可能被切割, 這個例子就提供了一種解決方法,
// 這個在后續(xù)實踐中肯定也會遇到的.
var stream = require('stream');
var util = require('util');
function ReplaceStream(searchString, replaceString) {
stream.Transform.call(this, {decodeStrings: false});
this.searchString = searchString;
this.replaceString = replaceString;
this.tailPiece = '';
}
util.inherits(ReplaceStream, stream.Transform);
ReplaceStream.prototype._transform = function(chunk, encoding, callback) {
var pieces = (this.tailPiece + chunk).split(this.searchString);
var lastPiece = pieces[pieces.length - 1];
var tailPieceLen = this.searchString.length - 1;
this.tailPiece = lastPiece.slice(-tailPieceLen);
pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
this.push(pieces.join(this.replaceString)); //[3]
callback();
}
ReplaceStream.prototype._flush = function(callback) {
this.push(this.tailPiece);
callback();
}
module.exports = ReplaceStream;
幾個流操作相關(guān)的有用包
程序員就是懶, 有這幾個包就可以少寫一些代碼了.
- readable-stream, 統(tǒng)一了nodejs 實現(xiàn)的不同版本stream接口
- [through2](https://npmjs.org/ package/through2), 用于快速創(chuàng)建轉(zhuǎn)化流
- from2, 用于快速創(chuàng)建輸入流
- writable2, 用于快速創(chuàng)建輸出流