2018-04-03

Stream初探

一:stream.Readable & stream.Writable

1:模擬實(shí)現(xiàn) stream.Readable

1) flowing模式的實(shí)現(xiàn)
let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        
        // 初始化參數(shù)
        this.path = path;
        this.flags = options.flags || 'r';
        this.mode = options.mode || 0o666;
        this.pos = this.start = options.start || 0;
        this.end = options.end;
        this.encoding = options.encoding;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        
        
        this.flowing = null;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open()
        this.on('newListener', (type, listener) => {
            if (type == 'data') {//on('data')觸發(fā)read操作
                this.flowing = true;
                this.read();
            }
        });
    }
    
    read() {
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read());
        }
        let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytes) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            if (bytes) {
                let data = this.buffer.slice(0, bytes);
                data = this.encoding ? data.toString(this.encoding) : data;
                this.emit('data', data);
                
                this.pos += bytes;
                
                if (this.end && this.pos > this.end) {
                    return this.endFn();
                } else {
                    if (this.flowing) {
                        this.read();
                    }
                }
            } else {
                return this.endFn();
            }
            
        })
    }
    
    endFn() {
        this.emit('end');
        this.destroy();
    }
    
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');// 容錯異步操作
        })
    }
    
    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }
    
    pipe(dest) {
        this.on('data', data => {
            let flag = dest.write(data);
            if (!flag) {
                this.pause();
            }
        });
        dest.on('drain', () => {
            this.resume();
        });
    }
    
    pause() {
        this.flowing = false;
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
}

module.exports = ReadStream;

flowing模式的實(shí)現(xiàn)邏輯比價(jià)簡單,flowing模式的測試代碼如下:

let fs = require('fs');
fs.createReadStream();
require('stream');
let ReadStream = require('./ReadStream');
let rs = new ReadStream('1.txt',{
   highWaterMark:3,
    encoding:'utf8'
});

rs.on('readable',function () {
    console.log(rs.length);//3
    console.log(rs.read(1));//讀了1個字節(jié)
    console.log(rs.length);//2
    setTimeout(()=>{
        console.log(rs.length);//又向緩存區(qū)里加入了highWaterMark個字節(jié)
    },500)
});
2) paused模式的實(shí)現(xiàn)
let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        // 初始化參數(shù)
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }
    
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');// 容錯異步
        });
    }
    
    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {// 從緩存區(qū)中讀取(shift)數(shù)據(jù)
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);//把沒有取完的Buffer再放回緩存區(qū)
                        break;
                    }
                }
            }
            ret = ret.toString(this.encoding);
        }
        
        let _read = () => {// 把讀取到的數(shù)據(jù)push到緩存區(qū)中
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }
                        
                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }
                        
                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read();
        }
        return ret;
    }
    
    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }
    
    pause() {
        this.flowing = false;
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}

module.exports = ReadStream;

read方法;

  • 在調(diào)用完_read()后,read(n)會試著從緩存中取數(shù)據(jù)。

  • 如果_read()是異步調(diào)用push方法的,則此時緩存中的數(shù)據(jù)量不會增多,容易出現(xiàn)數(shù)據(jù)量不夠的現(xiàn)象。

  • 如果read(n)的返回值為null,說明這次未能從緩存中取出所需量的數(shù)據(jù)。此時,消耗方需要等待新的數(shù)據(jù)到達(dá)后再次嘗試調(diào)用read方法。

  • 在此種情況下,push方法如果立即輸出數(shù)據(jù),接收方直接監(jiān)聽data事件即可,否則數(shù)據(jù)被添加到緩存中,需要觸發(fā)readable事件,消耗方必須監(jiān)聽這個readable事件,再調(diào)用read方法取得數(shù)據(jù)。

  • 在數(shù)據(jù)到達(dá)后,流是通過readable事件來通知消耗方的。

  • 另外,流中維護(hù)了一個緩存,當(dāng)緩存中的數(shù)據(jù)足夠多時,調(diào)用read()不會引起_read()的調(diào)用,即不需要向底層請求數(shù)據(jù)。state.highWaterMark是給緩存大小設(shè)置的一個上限閾值。如果取走n個數(shù)據(jù)后,緩存中保有的數(shù)據(jù)不足這個量,便會從底層取一次數(shù)據(jù)。:

paused模式實(shí)現(xiàn)的邏輯相對比較復(fù)雜,下圖為read方法的邏輯圖,可以參考一下:

[圖片上傳失敗...(image-5f7160-1522734407319)]

paused模式的測試代碼如下:

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = new ReadStream('1.txt', {
    highWaterMark: 3,
    encoding: 'utf8'
});

rs.on('readable', function () {
    console.log(rs.length);// 3 當(dāng)前緩存區(qū)的長度
    console.log('char', rs.read(1));
    console.log(rs.length);// 2 當(dāng)你消耗掉一個字節(jié)之后,緩存區(qū)變成2個字節(jié)了
    
    //一旦發(fā)現(xiàn)緩沖區(qū)的字節(jié)數(shù)小于最高水位線了,則會現(xiàn)再讀到最高水位線個字節(jié)填充到緩存區(qū)里
    setTimeout(() => {
        console.log(rs.length);//5
    }, 500)
});

2 模擬實(shí)現(xiàn) stream.Writable
let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.pos = this.start;
        this.encoding = options.encoding || 'utf8';
        this.autoClose = options.autoClose;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        
        this.buffers = [];//緩存區(qū)
        this.writing = false;//表示內(nèi)部正在寫入數(shù)據(jù)
        this.length = 0;//表示緩存區(qū)字節(jié)的長度
        this.open();
    }
    
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open');
        });
    }
    
    
    write(chunk, encoding, cb) {
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding);
        let len = chunk.length;
        
        this.length += len;//緩存區(qū)的長度加上當(dāng)前寫入的長度
        
        let ret = this.length < this.highWaterMark; //判斷當(dāng)前最新的緩存區(qū)是否小于最高水位線
        if (this.writing) {//表示正在向底層寫數(shù)據(jù),當(dāng)前數(shù)據(jù)必須放在緩存區(qū)里
            this.buffers.push({
                chunk,
                encoding,
                cb
            });
        } else {
            this.writing = true;
            this._write(chunk, encoding, () => this.clearBuffer()); //在底層寫完當(dāng)前數(shù)據(jù)后要清空緩存區(qū)
        }
        return ret;
    }
    
    clearBuffer() {
        let data = this.buffers.shift();
        if (data) {
            this._write(data.chunk, data.encoding, () => this.clearBuffer())
        } else {
            //緩存區(qū)清空的時候,發(fā)射'drain'事件
            this.writing = false;
            this.emit('drain');
        }
    }
    
    _write(chunk, encoding, cb) {
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._write(chunk, encoding, cb));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    this.emit('error', err);
                }
            }
            this.pos += bytesWritten;
            
            this.length -= bytesWritten;
            
            cb && cb();
        })
    }
    
    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }
}

module.exports = WriteStream;

Writable實(shí)現(xiàn)的邏輯圖如下:

[圖片上傳失敗...(image-33156a-1522734407319)]

第一次請求源的數(shù)據(jù)時首先會先調(diào)用底層的寫入方法,再次請求源數(shù)據(jù)時如果此時底層正在寫數(shù)據(jù)的時候會把請求到的數(shù)據(jù)放到緩存區(qū)里面,底層的寫入方法寫完之后會從緩存區(qū)里拉取數(shù)據(jù)寫入。另外數(shù)據(jù)放到緩存區(qū)時,如果緩存區(qū)里的大小大于或等于highWaterMark時,會觸發(fā)'drain'事件停止繼續(xù)寫入。

Writable的測試代碼,如下:


let fs = require('fs');
let WriteStream = require('./WriteStream');
let ws = new WriteStream('./1.txt', {
    flags: 'w',
    mode: 0o666,
    start: 0,
    encoding: 'utf8',
    autoClose: true,
    highWaterMark: 3
});
let n = 9;
ws.on('error', (err) => {
    console.log(err);
});

function write() {
    let flag = true;
    while (flag && n > 0) {
        flag = ws.write(n + "", 'utf8', () => {
            console.log('ok')
        });
        n--;
        console.log('flag=', flag);
    }
}

ws.on('drain', () => {
    console.log('drain');
    write();
});
write();

3)stream.Readable機(jī)制

我們先來理清一下通過Readable讀取數(shù)據(jù)的機(jī)制,如下圖中:

[圖片上傳失敗...(image-99faa5-1522734407319)]

先來分析一下:

用Readable創(chuàng)建對象readable后,便得到了一個可讀流。
如果實(shí)現(xiàn)_read方法,就將流連接到一個底層數(shù)據(jù)源。
流通過調(diào)用_read向底層請求數(shù)據(jù),底層再調(diào)用流的push方法將需要的數(shù)據(jù)傳遞過來。
當(dāng)readable連接了數(shù)據(jù)源后,下游便可以調(diào)用readable.read(n)向流請求數(shù)據(jù),同時監(jiān)聽readable的data事件來接收取到的數(shù)據(jù)。

let {Readable} = require('stream');
let index = 3;
let rs = new Readable({
    read() {//實(shí)現(xiàn)_read方法(源碼里會將read處理為_read)
        if (index > 0) {
            this.push(index-- + '');
        } else {
            this.push(null);
        }
    }
});

rs.on('data', (data) => {
    console.log(data.toString());//3 2 1
});

push()的作用:
消耗方調(diào)用read(n)促使流輸出數(shù)據(jù),而流通過_read()使底層調(diào)用push方法將數(shù)據(jù)傳給流。
如果流在流動模式下(state.flowing為true)輸出數(shù)據(jù),數(shù)據(jù)會自發(fā)地通過data事件輸出,不需要消耗方反復(fù)調(diào)用read(n)。
如果調(diào)用push方法時緩存為空,則當(dāng)前數(shù)據(jù)即為下一個需要的數(shù)據(jù)。這個數(shù)據(jù)可能先添加到緩存中,也可能直接輸出。
執(zhí)行read方法時,在調(diào)用_read后,如果從緩存中取到了數(shù)據(jù),就以data事件輸出。
所以,如果_read異步調(diào)用push時發(fā)現(xiàn)緩存為空,則意味著當(dāng)前數(shù)據(jù)是下一個需要的數(shù)據(jù),且不會被read方法輸出,應(yīng)當(dāng)在push方法中立即以data事件輸出。

來段'readable'的列子:

let {Readable} = require('stream');
let index = 9;
let rs = new Readable({
    highWaterMark: 3,
    read() {
        if (index > 0) {
            this.push(index-- + '');
        } else {
            this.push(null);
        }
    }
});

let once = false;
rs.setEncoding('utf8');
rs.on('readable', (chunk) => {
    console.log(rs.read(1));
});

觸發(fā)'readable'事件的幾種情況:
在流中有數(shù)據(jù)可讀取時觸發(fā)
達(dá)到流數(shù)據(jù)尾部時觸發(fā)
當(dāng)有新數(shù)據(jù)流到緩存區(qū)時觸發(fā)

End.總結(jié)

以上是我對于Stream 運(yùn)行機(jī)制的一些了解知道這些知識更多技術(shù)資料敬請關(guān)注后續(xù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • stream 流是一個抽象接口,在 Node 里被不同的對象實(shí)現(xiàn)。例如 request to an HTTP se...
    明明三省閱讀 3,541評論 1 10
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,688評論 19 139
  • 流是Node中最重要的組件和模式之一。在社區(qū)里有一句格言說:讓一切事務(wù)流動起來。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 678評論 0 0
  • 流的基本概念及理解 流是一種數(shù)據(jù)傳輸手段,是有順序的,有起點(diǎn)和終點(diǎn),比如你要把數(shù)據(jù)從一個地方傳到另外一個地方流非常...
    October_yang閱讀 7,856評論 3 9
  • 這是關(guān)于我太爺爺和太奶奶的故事,當(dāng)我聽說的時候,只覺得無限傷感。 太爺爺和太奶奶出生大戶人家,門當(dāng)戶對。十六七歲的...
    哆寶萌噠噠閱讀 624評論 0 0

友情鏈接更多精彩內(nèi)容