1. 流的概念
- 流是一組有序的,有起點和終點的字節(jié)數(shù)據(jù)傳輸手段
- 它不關心文件的整體內(nèi)容,只關注是否從文件中讀到了數(shù)據(jù),以及讀到數(shù)據(jù)之后的處理
- 流是一個抽象接口,被 Node 中的很多對象所實現(xiàn)。比如HTTP 服務器request和response對象都是流。
2.可讀流createReadStream
實現(xiàn)了stream.Readable接口的對象,將對象數(shù)據(jù)讀取為流數(shù)據(jù),當監(jiān)聽data事件后,開始發(fā)射數(shù)據(jù)
fs.createReadStream = function(path, options) {
return new ReadStream(path, options);
};
util.inherits(ReadStream, Readable);
2.1 創(chuàng)建可讀流
var rs = fs.createReadStream(path,[options]);
- path讀取文件的路徑
- options
- flags打開文件要做的操作,默認為'r'
- encoding默認為null
- start開始讀取的索引位置
- end結(jié)束讀取的索引位置(包括結(jié)束位置)
- highWaterMark讀取緩存區(qū)默認的大小64kb
如果指定utf8編碼highWaterMark要大于3個字節(jié)
2.2 監(jiān)聽data事件
流切換到流動模式,數(shù)據(jù)會被盡可能快的讀出
rs.on('data', function (data) {
console.log(data);
});
2.3 監(jiān)聽end事件
該事件會在讀完數(shù)據(jù)后被觸發(fā)
rs.on('end', function () {
console.log('讀取完成');
});
2.4 監(jiān)聽error事件
rs.on('error', function (err) {
console.log(err);
});
2.5 監(jiān)聽open事件
rs.on('open', function () {
console.log(err);
});
2.6 監(jiān)聽close事件
rs.on('close', function () {
console.log(err);
});
2.7 設置編碼
與指定{encoding:'utf8'}效果相同,設置編碼
rs.setEncoding('utf8');
2.8 暫停和恢復觸發(fā)data
通過pause()方法和resume()方法
rs.on('data', function (data) {
rs.pause();
console.log(data);
});
setTimeout(function () {
rs.resume();
},2000);
3.可寫流createWriteStream
實現(xiàn)了stream.Writable接口的對象來將流數(shù)據(jù)寫入到對象中
fs.createWriteStream = function(path, options) {
return new WriteStream(path, options);
};
util.inherits(WriteStream, Writable);
3.1 創(chuàng)建可寫流
var ws = fs.createWriteStream(path,[options]);
- path寫入的文件路徑
- options
- flags打開文件要做的操作,默認為'w'
- encoding默認為utf8
- highWaterMark寫入緩存區(qū)的默認大小16kb
3.2 write方法
ws.write(chunk,[encoding],[callback]);
- chunk寫入的數(shù)據(jù)buffer/string
- encoding編碼格式chunk為字符串時有用,可選
- callback 寫入成功后的回調(diào)
返回值為布爾值,系統(tǒng)緩存區(qū)滿時為false,未滿時為true
3.3 end方法
ws.end(chunk,[encoding],[callback]);
表明接下來沒有數(shù)據(jù)要被寫入 Writable 通過傳入可選的 chunk 和 encoding 參數(shù),可以在關閉流之前再寫入一段數(shù)據(jù) 如果傳入了可選的 callback 函數(shù),它將作為 'finish' 事件的回調(diào)函數(shù)
3.4 drain方法
當一個流不處在 drain 的狀態(tài), 對 write() 的調(diào)用會緩存數(shù)據(jù)塊, 并且返回 false。 一旦所有當前所有緩存的數(shù)據(jù)塊都排空了(被操作系統(tǒng)接受來進行輸出), 那么 'drain' 事件就會被觸發(fā)
-
建議, 一旦 write() 返回 false, 在 'drain' 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊
let fs = require('fs'); let ws = fs.createWriteStream('./2.txt',{ flags:'w', encoding:'utf8', highWaterMark:3 }); let i = 10; function write(){ let flag = true; while(i&&flag){ flag = ws.write("1"); i--; console.log(flag); } } write(); ws.on('drain',()=>{ console.log("drain"); write(); });
3.5 finish方法
在調(diào)用了 stream.end() 方法,且緩沖區(qū)數(shù)據(jù)都已經(jīng)傳給底層系統(tǒng)之后, 'finish' 事件將被觸發(fā)。
var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
writer.write(`hello, ${i}!\n`);
}
writer.end('結(jié)束\n');
writer.on('finish', () => {
console.error('所有的寫入已經(jīng)完成!');
});
4.pipe方法
4.1 pipe方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
4.2 pipe用法
readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
將數(shù)據(jù)的滯留量限制到一個可接受的水平,以使得不同速度的來源和目標不會淹沒可用內(nèi)存。
4.3 unpipe用法
readable.unpipe()方法將之前通過stream.pipe()方法綁定的流分離
-
如果 destination 沒有傳入, 則所有綁定的流都會被分離.
let fs = require('fs'); var from = fs.createReadStream('./1.txt'); var to = fs.createWriteStream('./2.txt'); from.pipe(to); setTimeout(() => { console.log('關閉向2.txt的寫入'); from.unpipe(writable); console.log('手工關閉文件流'); to.end(); }, 1000);
4.4 cork
調(diào)用 writable.cork() 方法將強制所有寫入數(shù)據(jù)都存放到內(nèi)存中的緩沖區(qū)里。 直到調(diào)用 stream.uncork() 或 stream.end() 方法時,緩沖區(qū)里的數(shù)據(jù)才會被輸出。
4.5 uncork
writable.uncork()將輸出在stream.cork()方法被調(diào)用之后緩沖在內(nèi)存中的所有數(shù)據(jù)。
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
5. 簡單實現(xiàn)
5.1 可讀流的簡單實現(xiàn) [#](#t235.1 可讀流的簡單實現(xiàn))
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
start: 3,
end: 7,
highWaterMark: 3
});
rs.on('open', function () {
console.log("open");
});
rs.on('data', function (data) {
console.log(data);
});
rs.on('end', function () {
console.log("end");
});
rs.on('close', function () {
console.log("close");
});
/**
open
456
789
end
close
**/
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.fd = options.fd;
this.flags = options.flags || 'r';
this.encoding = options.encoding;
this.start = options.start || 0;
this.pos = this.start;
this.end = options.end;
this.flowing = false;
this.autoClose = true;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.buffer = Buffer.alloc(this.highWaterMark);
this.length = 0;
this.on('newListener', (type, listener) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
});
this.on('end', () => {
if (this.autoClose) {
this.destroy();
}
});
this.open();
}
read() {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
let n = this.end ? Math.min(this.end - this.pos, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd,this.buffer,0,n,this.pos,(err,bytesRead)=>{
if(err){
return;
}
if(bytesRead){
let data = this.buffer.slice(0,bytesRead);
data = this.encoding?data.toString(this.encoding):data;
this.emit('data',data);
this.pos += bytesRead;
if(this.end && this.pos > this.end){
return this.emit('end');
}
if(this.flowing)
this.read();
}else{
this.emit('end');
}
})
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit('error', err);
this.fd = fd;
this.emit('open', fd);
})
}
end() {
if (this.autoClose) {
this.destroy();
}
}
destroy() {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
module.exports = WriteStream;
?
5.2 可寫流的簡單實現(xiàn)
let fs = require('fs');
let FileWriteStream = require('./FileWriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
let i = 10;
function write(){
let flag = true;
while(i&&flag){
flag = ws.write("1",'utf8',(function(i){
return function(){
console.log(i);
}
})(i));
i--;
console.log(flag);
}
}
write();
ws.on('drain',()=>{
console.log("drain");
write();
});
/**
10
9
8
drain
7
6
5
drain
4
3
2
drain
1
**/
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
constructor(path, options) {
super(path, options);
this.path = path;
this.fd = options.fd;
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.encoding = options.encoding;
this.start = options.start || 0;
this.pos = this.start;
this.writing = false;
this.autoClose = true;
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.buffers = [];
this.length = 0;
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit('error', err);
this.fd = fd;
this.emit('open', fd);
})
}
write(chunk, encoding, cb) {
if (typeof encoding == 'function') {
cb = encoding;
encoding = null;
}
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding || 'utf8');
let len = chunk.length;
this.length += len;
let ret = this.length < this.highWaterMark;
if (this.writing) {
this.buffers.push({
chunk,
encoding,
cb,
});
} else {
this.writing = true;
this._write(chunk, encoding,this.clearBuffer.bind(this));
}
return ret;
}
_write(chunk, encoding, cb) {
if (typeof this.fd != 'number') {
return this.once('open', () => this._write(chunk, encoding, cb));
}
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
if (err) {
if (this.autoClose) {
this.destroy();
}
return this.emit('error', err);
}
this.length -= written;
this.pos += written;
cb && cb();
});
}
clearBuffer() {
let data = this.buffers.shift();
if (data) {
this._write(data.chunk, data.encoding, this.clearBuffer.bind(this))
} else {
this.writing = false;
this.emit('drain');
}
}
end() {
if (this.autoClose) {
this.emit('end');
this.destroy();
}
}
destroy() {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
module.exports = WriteStream;
5.3 pipe [#](#t255.3 pipe)
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
this.on('data', (data)=>{
let flag = dest.write(data);
if(!flag){
this.pause();
}
});
dest.on('drain', ()=>{
this.resume();
});
this.on('end', ()=>{
dest.end();
});
}
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
5.4 暫停模式
let fs =require('fs');
let ReadStream2 = require('./ReadStream2');
let rs = new ReadStream2('./1.txt',{
start:3,
end:8,
encoding:'utf8',
highWaterMark:3
});
rs.on('readable',function () {
console.log('readable');
console.log('rs.buffer.length',rs.length);
let d = rs.read(1);
console.log(d);
console.log('rs.buffer.length',rs.length);
setTimeout(()=>{
console.log('rs.buffer.length',rs.length);
},500)
});
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n,10);
if(n != n){
n = this.length;
}
if(this.length ==0)
this.needReadable = true;
let ret;
if (0<n < this.length) {
ret = Buffer.alloc(n);
let b ;
let index = 0;
while(null != (b = this.buffers.shift())){
for(let i=0;i<b.length;i++){
ret[index++] = b[i];
if(index == ret.length){
this.length -= n;
b = b.slice(i+1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if(this.needReadable){
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if(this.needReadable){
this.emit('readable');
this.needReadable = false;
}
}
} else {
if(this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read(0);
}
return ret;
}
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream; /**
-
if (n !== 0) state.emittedReadable = false; 只要要讀的字節(jié)數(shù)不是0就需要觸發(fā)readable事件 如果傳入的NaN,則將n賦為緩區(qū)的長度,第一次就是0
緩存區(qū)為0就開始讀吧 如果n等于0就返回null,state.needReadable = true; 如果緩存區(qū)為0,是 state.needReadable = true; 需要觸發(fā)readable