nodejs學(xué)習(xí)筆記Stream(流)

什么流

通俗的說就是一種,有起點和終點的字節(jié)數(shù)據(jù)傳輸手段,把數(shù)據(jù)從一個地方傳到另一個地方。
流(Stream)是一個抽象接口,可讀、可寫或兼具兩者的。并且所有流都是 EventEmitter 的實例。
基于流實現(xiàn)的工具 webpack glup,比如HTTP 服務(wù)器request和response ,(TCP sockets),標(biāo)準(zhǔn)輸出(process.stdout)等等對象都是流。

可讀流 (Readable Stream)

可讀流存在兩種工作模式 流動模式(flowing)暫停模式 (paused)
下面介紹一下兩個模式的特點 :

暫停模式

可讀流在默認(rèn)狀態(tài)就是暫停模式 ,不監(jiān)聽readable也會默認(rèn)先打開文件,打開文件后會調(diào)用一次read(0)方法
我們讀取數(shù)據(jù)的時候如果文件過大,我們選擇一點一點讀取,讀取字節(jié)為highWaterMark的值,放到緩存區(qū)中。
在暫停模式中,我們監(jiān)聽readable事件,可讀流會馬上去向底層讀取文件,然后把讀到文件的文件放在緩存區(qū)里const state = this._readableState;,同過read()方法來消費數(shù)據(jù)。
看下面一個暫停模式的例子:

let fs = require('fs');
let rs = fs.createReadStream('1.txt',{
    highWaterMark:3,
    encoding:'utf8'
});
rs.on('readable',function () {
    let char = rs.read(1);
    console.log(char);
});
read() 方法

readable事件中,read回向可讀流請求讀取n個字節(jié)的數(shù)據(jù),根據(jù)n值的不同會有下面幾種情況:

  • n = undefined ; 即不傳參數(shù),此時文件會不斷讀取hwm(hignWaterMark)字節(jié),并且不斷觸發(fā)readable事件,讀取緩存區(qū)(hwm的值),如果沒設(shè)置hwm大小,讀取默認(rèn)大小64k。
  • n = 0 ; 可讀流返回一個null 并且不會消費任何數(shù)據(jù)
  • 0 < n < hwm ; 此時n小于最高水位線執(zhí)行底層的 _read 方法,從數(shù)據(jù)源中讀取hwm大小的數(shù)據(jù)填充到緩存區(qū)內(nèi)。并且下次讀取字節(jié)為( hwm-n) + hwm 個字節(jié)
  • n > hwm ; read 方法會先返回null,然后從數(shù)據(jù)源處讀取hwm大小的數(shù)據(jù)加入緩沖區(qū),并判斷緩沖區(qū)內(nèi)數(shù)據(jù)大小是否大于或等于n,如果是則返回數(shù)據(jù),否則會再次返回null并讀取n大小的數(shù)據(jù)。
readable 事件

在 readable 事件表示流中有數(shù)據(jù)可以被讀取 有兩種情況會被觸發(fā):

  • 緩存區(qū)為空,或者或 緩存區(qū)大小 - 可讀大小 < hwm 時,第一次緩存區(qū)大小為hwm第二次為緩存去大小為 剩余緩存大小 + hwm
  • 當(dāng)文件讀完時,會自動觸發(fā) readable 事件

流動模式

開啟流動模式的常用方法為兩種:監(jiān)聽data 或者使用pipe(管道)方法,下面兩個例子減少一下這兩中方法。

let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.setEncoding('utf8');
rs.on('data',function (data) {
    console.log(data);
    rs.pause();//暫停讀取和發(fā)射data事件
    setTimeout(function(){
        rs.resume();//恢復(fù)讀取并觸發(fā)data事件
    },2000);
});

當(dāng)監(jiān)聽data 事件后 ,可讀流會不斷從數(shù)據(jù)源去除hwm大小的數(shù)據(jù),并向data事件發(fā)送這些數(shù)據(jù), 我們當(dāng)然和以使用stream.pause()手動將流切換到暫停模式,否則該過程將持續(xù)下去,直到讀到數(shù)據(jù)源的結(jié)束位置。

如果數(shù)據(jù)消費的速度小于數(shù)據(jù)生產(chǎn)的速度的話該怎么辦呢,引出了跟高級的方法pipe它就像一個管道,比如我們一邊讀取文件,一邊把讀取的數(shù)據(jù)寫入另一個文件,這樣寫的速度會跟不上讀取的速度,就相當(dāng)于,消費小于生產(chǎn)的情況,如果寫的慢我們可讀流就會停下來,始終保持讀寫在一個頻率上。

let fs = require('fs');
let rs = fs.createReadStream('1.txt',{
    encoding:'utf8'
});
let ws = fs.createWriteStream('2.txt',{
    encoding:'utf8'
});
rs.pipe(ws);

pipe 方法返回一個 readable 對象,這意味著我們可以使用鏈?zhǔn)讲僮鲗?shù)個流連接在一起,管道一旦被接上,數(shù)據(jù)將持續(xù)不斷的從可讀流寫入可寫流。想要終止這個過程,只能使用 stream.unpipe() 來取消管道連接。

上面?zhèn)€兩個例子,我們不僅理解了流動模式,還可以發(fā)現(xiàn)其實流動模式 和 暫停模式是可以切換的,下面總結(jié)一下模式切換的方法:

暫停模式切換到流動模式:
  • 監(jiān)聽 data 事件
  • 調(diào)用 stream.resume() 方法
  • 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到可寫流中
流動模式切換到暫停模式:
  • 如果不存在管道目標(biāo)(pipe destination),可以通過調(diào)用 stream.pause() 方法實現(xiàn)。
  • 如果存在管道目標(biāo),可以通過取消 data 事件監(jiān)聽,并調(diào)用 stream.unpipe() 方法移除所有管道目標(biāo)來實現(xiàn)。

可寫流 (Writable stream)

可讀流的默認(rèn)緩存空間是64k,可寫流的默認(rèn)緩存空間為16k,可寫流當(dāng)時是向目標(biāo)文件些數(shù)據(jù)的,下面同樣通過代碼了解:

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flag:'w',
    mode:0o666,
    highWaterMark:3,
    encoding:'utf8'
});
let count = 9;
function write(){
    let flag = true;
    while(flag && count>0){
        flag = ws.write(count-- +'');
    }
}
write();
ws.on('drain',function () {
    console.log('drain');
    write();
});

面這段代碼展示了一個可寫流實例的幾個基本的事件和方法,下面我們來逐一介紹:

write()

這個方法的作用是向可寫流寫入數(shù)據(jù),它的類型必須是字符串或者Buffer,同時方法返回值為布爾值,當(dāng)前我們的緩存區(qū)大小為hwm3個字符,我們一個一個字符的寫,當(dāng)寫到第三個時,緩存區(qū)滿了,此時返回false。
一旦我們確認(rèn)方法返回了false后,應(yīng)該立刻停止調(diào)用 write 方法,直到緩沖器中的數(shù)據(jù)被清空為止。當(dāng)然,即使方法返回了false,你實際上也可以繼續(xù)使用 write 方法寫入數(shù)據(jù)。node會將你寫入的數(shù)據(jù)全部緩存起來,直到超過了能使用的最大內(nèi)存。

end()

end 方法的作用是關(guān)閉流,它可以傳入三個可選的參數(shù)。chunk 和 encoding 是在關(guān)閉可寫流前希望最后寫入的數(shù)據(jù)及其對應(yīng)的編碼。如果傳入 callback,這個 callback 會作為 finish 事件的回調(diào)函數(shù)觸發(fā)

drain事件

在可寫流的緩沖區(qū)超過hwm的條件下,會觸發(fā)drain事件,提示使用者,先不要在寫了,內(nèi)存滿了,注意這個事件觸發(fā)的前提,即write 方法返回了false后清空緩沖區(qū)才會觸發(fā) drain 事件

注意:建議在 write 方法返回false時停止寫入數(shù)據(jù),在 drain 事件的回調(diào)中再次開始寫入,這樣可以更好的控制緩沖區(qū)的大小,避免發(fā)生內(nèi)存泄漏問題。

close事件

close 如文件系統(tǒng)被關(guān)閉時。當(dāng) close 事件被觸發(fā)后,可寫流將不會再觸發(fā)其他事件。值得注意的是,不是所有可寫流都會觸發(fā) close 事件。

可寫流可讀流大概內(nèi)容已經(jīng)說完了,下面配上一張圖再總結(jié)一下流的原理:
image.png
  • 當(dāng)建立一個可讀流的時候,可讀流默認(rèn)會監(jiān)聽readable 事件,此時的read(n) n=0;不會消費任何數(shù)據(jù)
  • 當(dāng)我們主動監(jiān)聽readable事件時,調(diào)用read方法消費數(shù)據(jù),當(dāng)緩存區(qū)為空時,會再次觸發(fā)readable事件,直到你讀完了源文件的所有數(shù)據(jù)。
  • 當(dāng)我們監(jiān)聽 data的時候,通過data 事件回調(diào)進行消費,這個過程不會停止,直到全部讀取完成。當(dāng)然,在這個過程中你隨時可以通過stream.pause()方法暫停它。
  • 接下來創(chuàng)建一個可寫流,并調(diào)用write方法來消費我們的數(shù)據(jù),但是因為寫入的速度較慢,如果當(dāng)前寫入還在進行,而你又調(diào)用了write方法,node會將你要寫入的數(shù)據(jù)緩存在一個緩存區(qū)中,等到文件寫入完畢會從緩存區(qū)中取出數(shù)據(jù),繼續(xù)寫入。
  • write 有一個布爾類型的返回值,如果寫入過快,緩存區(qū)滿了之后就會返回false。
  • 當(dāng)緩存區(qū)內(nèi)容完全寫入清空口,這是會調(diào)用drain事件,我們可以在他的回調(diào)中,繼續(xù)寫入文件,執(zhí)行write()

如有對不的地方,不吝賜教

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容