1.流是什么?
在node.js官網(wǎng)文檔中是這么說的
流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)。 stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來構(gòu)建實(shí)現(xiàn)流接口的對象。
Node.js 提供了多種流對象。 例如, HTTP 請求 和 process.stdout 就都是流的實(shí)例。
流可以是可讀的、可寫的,或是可讀寫的。所有的流都是 EventEmitter 的實(shí)例。
本人最近研究node,特意記下,分享一下。
言歸正傳
1. Node.js 中有四種基本的流類型:
· Readable - 可讀的流 (例如 fs.createReadStream()).
· Writable - 可寫的流 (例如 fs.createWriteStream()).
· Duplex - 可讀寫的流 (例如 net.Socket).
· Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).
那么今天的主角就是可讀流(createReadStream),后續(xù)會陸續(xù)介紹其他3種流
2.可讀流(createReadStream)
實(shí)現(xiàn)了stream.Readable接口的對象,將對象數(shù)據(jù)讀取為流數(shù)據(jù),當(dāng)監(jiān)聽data事件后,開始發(fā)射數(shù)據(jù)
2.1 創(chuàng)建可讀流
var rs = fs.createReadStream(path,[options]);
ps:如果指定utf8編碼highWaterMark要大于3個(gè)字節(jié)
2.2 監(jiān)聽data事件
流切換到流動模式,數(shù)據(jù)會被盡可能快的讀出
rs.on('data', function (data) {
console.log(data);
});
2.3 監(jiān)聽end事件
該事件會在讀完數(shù)據(jù)后被觸發(fā)
rs.on('end', function () {
console.log('讀取完成');
})
2.4 監(jiān)聽error事件
rs.on('error', function (err) {
console.log(err);
});
2.5 監(jiān)聽open事件
rs.on('open', function () {
console.log(err);
});
2.6 監(jiān)聽close事件
rs.on('close', function () {
console.log(err);
});
2.7 設(shè)置編碼
與指定{encoding:'utf8'}效果相同,設(shè)置編碼
rs.setEncoding('utf8');
2.8 暫停和恢復(fù)觸發(fā)data
通過pause()方法和resume()方法
rs.on('data', function (data) {
rs.pause();
console.log(data);
});
setTimeout(function () {
rs.resume();
},2000);
3. 流中的數(shù)據(jù)有兩種模式,二進(jìn)制模式和對象模式
· 二進(jìn)制模式, 每個(gè)分塊都是buffer或者string對象
· 對象模式, 流內(nèi)部處理的是一系列普通對象.
4.可讀流的兩種模式
可讀流事實(shí)上工作在下面兩種模式之一:flowing 和 paused
在 flowing 模式下, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù),并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。
在 paused 模式下,必須顯式調(diào)用 stream.read() 方法來從流中讀取數(shù)據(jù)片段。
所有初始工作模式為 paused 的 Readable 流
如果 Readable 切換到 flowing 模式,且沒有消費(fèi)者處理流中的數(shù)據(jù),這些數(shù)據(jù)將會丟失。 比如, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 'data' 事件,或是取消了 'data' 事件監(jiān)聽,就有可能出現(xiàn)這種情況
下面通過代碼簡單模擬下實(shí)現(xiàn)的邏輯:
如圖:

let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {//這里用到的是es6的原型繼承
constructor(path, options) {
super(path, options);
this.path = path;//讀取文件的路徑
this.highWaterMark = options.highWaterMark || 64 * 1024;//讀取的最高水位線
this.buffer = Buffer.alloc(this.highWaterMark);//臨時(shí)存放數(shù)據(jù)的容器
this.flags = options.flags || 'r';//打開文件要做的操作,默認(rèn)為'r'
this.encoding = options.encoding;//編碼
this.mode = options.mode || 0o666;//讀取文件的權(quán)限
this.start = options.start || 0;//開始讀取的索引位置
this.end = options.end;//結(jié)束讀取的索引位置(包括結(jié)束位置)
this.pos = this.start;
this.autoClose = options.autoClose || true;////當(dāng)流讀完之后自動關(guān)閉文件
this.bytesRead = 0;//實(shí)際讀到的字節(jié)數(shù)
this.closed = false;
this.flowing;////初始狀態(tài)為暫停模式 為true時(shí)為流動模式, 為false時(shí)為暫停模式
this.needReadable = false;//是否需要發(fā)送readable事件
this.length = 0;//緩存區(qū)的總長度
this.buffers = [];//這才是真正的緩存區(qū)
this.on('end', function () {
if (this.autoClose) {
this.destroy();
}
});
//當(dāng)給這個(gè)實(shí)例添加了任意的監(jiān)聽函數(shù)時(shí)會觸發(fā)newListener
this.on('newListener', (type) => {
//如果監(jiān)聽了data事件,流會自動切換的流動模式
//流動模式不緩存,直接發(fā)射,然后讀取下次的數(shù)據(jù)。如果你用流動模式而且沒有消費(fèi),數(shù)據(jù)就白白丟失了
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
this.open();
}
//打開文件
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n, 10);
if (n != n) {
n = this.length;
}
if (this.length == 0)//重新填充
this.needReadable = true;
let ret;
if (0 < n < this.length) {//緩存區(qū)的數(shù)據(jù)足夠用,并且要讀取的字節(jié)大于0
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {//填充完畢
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
//底層讀數(shù)據(jù)的方法
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {//實(shí)際讀到的字節(jié)數(shù)
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
//讓緩存區(qū)的數(shù)量加實(shí)際讀到的字節(jié)數(shù)
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read(0);
}
return ret;
}
//銷毀流
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
//暫停
pause() {
this.flowing = false;
}
//恢復(fù)
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream;
readable時(shí)其實(shí)會立刻填充緩存區(qū),當(dāng)你消費(fèi)掉一個(gè)字節(jié)后,一旦發(fā)現(xiàn)緩沖區(qū)的字節(jié)數(shù)小于最高水位線了,則會立馬讀到最高水位線highWaterMark(n)個(gè)字節(jié)填充到緩存區(qū)
read(n)是可讀流的核心方法,那么方法中傳參數(shù)跟不傳,傳幾個(gè)是有講究的:
n = undefined :即n不傳參數(shù),如果處于流動模式,并且緩存區(qū)大小不為空,則返回緩存區(qū)第一個(gè)buffer的長度,否則讀取整個(gè)緩存 如果讀到了數(shù)據(jù)沒有返回值,但是會發(fā)射data事件,數(shù)據(jù)也能取到,也就是用來清空緩存區(qū)。也不需要向底層_read()請求數(shù)據(jù)
n=0:只是填充緩存區(qū),并不真正讀取
0 < n < highWaterMark(n):可讀流會從緩存區(qū)取出對應(yīng)大小的數(shù)據(jù),并且作為 read 方法返回值返回。同時(shí)判斷此時(shí)緩存區(qū)大小是否小于highWaterMark(n),如果小于,那么將執(zhí)行底層的 _read 方法,從數(shù)據(jù)源中讀取highWaterMark(n)大小的數(shù)據(jù)填充到緩存區(qū)內(nèi)。
n>highWaterMark(n):如果緩存中數(shù)據(jù)不夠,便會調(diào)用_read方法去底層取數(shù)據(jù)。
尾聲:
礙于篇幅和時(shí)間限制,就不再繼續(xù)介紹下去。本文僅介紹了可讀流的基本使用方法,如果覺得有一些地方覺得不太深入,可以去看一下官方文檔,官方文檔上面有更加全面和詳細(xì)的介紹。
參考資料:
stream | Node.js API 文檔nodejs.cn