曾經(jīng)學(xué)C++的STL中的IOStream,輸入輸出流,看個(gè)代碼
using namespace std;
cout<<x;
眼角濕潤(rùn)了,這是大學(xué)的記憶啊,大學(xué)時(shí)我們幸苦的學(xué)習(xí)C++,為了指針的釋放和指針的指針搞的焦頭爛額,更何況記憶中不僅有代碼,還有我的青春和她。算了,搬磚的腰又酸了,還是回到現(xiàn)實(shí),看看node.js中的流吧。
啥是流啊。。。
流顧名思義就是流水的意思,stream英文也是溪流。如果把二進(jìn)制數(shù)據(jù)從一個(gè)地方很源源不斷的送到另一個(gè)地方,像水流一樣的功能,就叫流。
A stream is an abstract interface implemented by various objects in Node.js. For example a request to an HTTP server is a stream, as is process.stdout
.Streams are readable, writable, or both. All streams are instances of EventEmitter.
愿意啃文檔的兄弟可以看stream
stream的例子
因?yàn)閚ode.js非常善于處理數(shù)據(jù)(這里數(shù)據(jù)的可能是服務(wù)器的網(wǎng)頁(yè),或者返回的json數(shù)據(jù),或者任何東西),所以我們來看看一些例子,說明stream對(duì)服務(wù)器的重要作用。node.js里面很多類都是繼承了流的接口。
創(chuàng)建一個(gè)echo服務(wù)
echo是回聲的意思,我們對(duì)著大山喊話,回聽到自己喊的聲音,這邊我們做個(gè)服務(wù)器干這個(gè)無(wú)聊的事情吧。
var http = require('http');
http.createServer(function(request, response) {
response.writeHead(200);
request.pipe(response);
}).listen(8080);
運(yùn)行以后調(diào)用curl -d 'hello' http://localhost:8080。簡(jiǎn)直不敢相信服務(wù)器這么簡(jiǎn)單就寫好了,這就是node.js的魅力吧。
上面的pipe就是管道的意思,和linux的命令行|一個(gè)意思,大家應(yīng)該熟悉命令行的管道吧,概念都是相通的。大家應(yīng)該知道這個(gè)

就是基于stream來做的。
上傳文件
我們?cè)诳匆粋€(gè)上傳文件的例子。
var http = require('http');
var fs = require('fs');
http.createServer(function(request, response) {
var newFile = fs.createWriteStream("copy" + new Date() + ".md");
var fileBytes = request.headers['content-length'];
var uploadedBytes = 0;
response.write("server receive request\n");
request.pipe(newFile);
request.on('readable', function() {
var chunk = null;
response.write("progress: start\n");
while (null !== (chunk = request.read())) {
uploadedBytes += chunk.length;
var progress = (uploadedBytes / fileBytes) * 100;
response.write("progress: " + parseInt(progress, 10) + "%\n");
}
});
request.on('end', function() {
response.end('uploaded!\n');
});
}).listen(8080);
//curl --upload-file uploadFiles.js http://localhost:8080

這里的看點(diǎn)是
- 如何返回進(jìn)度的:
request.on('readable', function() {,有沒有覺得這種異步I/O方式的優(yōu)點(diǎn)。 - 如何保存文件
request.pipe(newFile);,是不是很方便。
流的實(shí)現(xiàn)
上面我們看到流的結(jié)構(gòu)的簡(jiǎn)單易用,現(xiàn)在我們看看node.js的流是怎么設(shè)計(jì)的。
To implement any sort of stream, the pattern is the same:
- Extend the appropriate parent class in your own subclass. (Theutil.inherits() method is particularly helpful for this.)
- Call the appropriate parent class constructor in your constructor, to be sure that the internal mechanisms are set up properly.
- Implement one or more specific methods, as detailed below.
The class to extend and the method(s) to implement depend on the sort of stream class you are writing:

翻譯一下流實(shí)現(xiàn)的過程:
- 繼承合適的class
- 不要忘記調(diào)用基類構(gòu)造函數(shù)
- 重寫基類方法
數(shù)數(shù)的可讀流
看一個(gè)例子就清楚了,下面這段程序就是數(shù)數(shù),1數(shù)到1000000。
const Readable = require('stream').Readable;
const util = require('util');
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 1000000;
this._index = 1;
}
Counter.prototype._read = function() {
var i = this._index++;
if (i > this._max)
this.push(null);
else {
var str = '' + i;
var buf = new Buffer(str, 'ascii');
this.push(buf);
}
};
///////////////////////////////////////////////////////////
//test
var fs = require('fs');
var newFile = fs.createWriteStream("test_counter.txt");
var myCounter = new Counter();
myCounter.pipe(newFile);
上面的Counter完成了三部曲,測(cè)試程序把這個(gè)conter輸出到文件。如果我們想自己實(shí)現(xiàn)一個(gè)流,這樣就可以了。如果上面例子太簡(jiǎn)單了,我們看一下復(fù)雜點(diǎn)的例子,比如transform
啥是transform流
Transform streams are Duplex streams where the output is in some way computed from the input. They implement both the Readable and Writable interfaces.
Examples of Transform streams include:
zlib streams
crypto streams
翻譯一下就是用來把輸入流變化一下,再輸出。比如壓縮,加密等。
const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');
inp.pipe(gzip).pipe(out);
實(shí)現(xiàn)transform流
這個(gè)例子解析一個(gè)數(shù)據(jù),產(chǎn)生一個(gè)readable stream,這個(gè)stream是經(jīng)過變換的哦。
-
解析的格式:有兩個(gè)換行符的數(shù)據(jù)流,換行符前面是頭,后面是內(nèi)容
格式 - 解析的過程中發(fā)出一個(gè)事件header,用來顯示頭部信息
- 最后去掉頭部,保留內(nèi)容信息
現(xiàn)在來看一下代碼吧。
const util = require('util');
const Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);
function SimpleProtocol(options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(options);
Transform.call(this, options);
this._inBody = false;
this._sawFirstCr = false;
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
if (!this._inBody) {
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
} else {
this._inBody = true;
var h = chunk.slice(0, split);
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString();
try {
this.header = JSON.parse(header);
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// and let them know that we are done parsing the header.
this.emit('header', this.header);
// now, because we got some extra data, emit this first.
this.push(chunk.slice(split));
}
} else {
// from there on, just provide the data to our consumer as-is.
this.push(chunk);
}
done();
};
// Usage:
var fs = require('fs');
const source = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');
var parser = new SimpleProtocol();
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
source.pipe(parser).pipe(out);
parser.on('header',function(header){
console.log(header);
});
雖然代碼長(zhǎng)了點(diǎn),但是有注釋,我就不解釋了,注意最后如何使用的哦??纯催\(yùn)行的結(jié)果吧。

流就介紹到這里了,如果還意猶未盡,可以看看node的源碼node in github或者文檔stream
