Node.js流模式編程詳解

本文是Node.js設(shè)計模式的筆記, 代碼都是來自 <Node.js Design Patterns> by Mario Casciaro.

流的重要性

一般我們處理數(shù)據(jù)有兩種模式, 一種是buffer模式, 一種是stream模式, buffer模式就是取完數(shù)據(jù)一次性操作, stream模式就是邊取數(shù)據(jù)邊操作.
舉個例子, 如果打開一個2G的文件, 用buffer模式就是先分配2G的內(nèi)存, 把文件全部讀出來, 然后開始操作內(nèi)存, 而用流模式的方法就是邊讀數(shù)據(jù), 邊開始處理.

從這里看出stream模式無論是在空間和時間上都優(yōu)于buffer模式:
在空間上, 內(nèi)存只會占用當前需要處理的一塊數(shù)據(jù)區(qū)域的大小, 而不是整個文件.
在時間上, 因為不需要全部的數(shù)據(jù)就可以開始處理, 時間就相當于節(jié)約了, 從串行變成了并行操作(這里的并行不是多線程的并行, 而是生產(chǎn)者和消費者并行).

還有一個好處就是鏈式調(diào)用, 也就是可組合操作, 大大增加了代碼的可重用性.
比如下面這個代碼(中間的pipe可以很方便的增刪):

fs.createReadStream(file)
     .pipe(zlib.createGzip())
     //.pipe(crypto.createCipher('aes192', 'secret'))
     .pipe(req)
     .on('finish', function() {
       console.log('File succesfully sent');
     });

開始編碼

nodejs里面的stream一般分四種, 其中轉(zhuǎn)換流是一種特殊的讀寫流.

  • 輸入流(stream.Readable)
  • 輸出流(stream.Writable)
  • 讀寫流(stream.Duplex)
  • 轉(zhuǎn)換流(stream.Transform)

另外, nodejs里面的流有兩種模式, 二進制模式和對象模式.

  • 二進制模式, 每個分塊都是buffer或者string對象.
  • 對象模式, 流內(nèi)部處理的是一系列普通對象.

輸入流(stream.Readable)

先看一下怎么使用輸入流, 這里一般有兩種方法, 一個是非流動模式, 一個是流動模式.
非流動模式就是直接調(diào)用read()方法, 被動模式就是監(jiān)聽data事件.
下面直接看代碼:

// 非流動模式
process.stdin
    .on('readable', function() {
        // 有數(shù)據(jù)到了, 拼命讀, 直到讀完.
        var chunk;
        console.log('New data available');
        while((chunk = process.stdin.read()) !== null) {
            console.log(
            'Chunk read: (' + chunk.length + ') "' + chunk.toString() + '"'
            );
        }})
    .on('end', function() {
       process.stdout.write('End of stream');
    });

接下來看流動模式怎么玩

// 流動模式
process.stdin
     .on('data', function(chunk) {
       console.log('New data available');
       console.log(
         'Chunk read: (' + chunk.length + ')" ' +
         chunk.toString() + '"'
       );
     })
     .on('end', function() {
       process.stdout.write('End of stream');
     });

同樣實現(xiàn)一個輸入流也很簡單, 主要是

  1. 繼承Readable類
  2. 實現(xiàn)_read(size)接口(一般帶下劃線的表示內(nèi)部函數(shù), 調(diào)用者不要直接調(diào)用, 相當于C++里面的protect方法, 只是javascript里面沒有對方法做區(qū)分, 只能是命名上面區(qū)分一下了).
    下面看示例代碼:
// randomStream.js
var stream = require('stream');
var util = require('util');
var chance = require('chance').Chance();

function RandomStream(options) {
    // option支持3個參數(shù)
    // encoding String 用于轉(zhuǎn)換Buffer到String的編碼類型(默認null)
    // objMode Boolean 用戶指定是否是對象模式(默認false)
    // highWaterMark Number 最高水位(可讀的最大數(shù)據(jù)量), 默認是16K
    stream.Readable.call(this, options);
}
util.inherits(RandomStream, stream.Readable);
RandomStream.prototype._read = function(size) {
    // 這是一個隨機產(chǎn)生數(shù)據(jù)的流, 5%的概率輸出null, 也就是流停止.
    var chunk = chance.string();
    console.log('Pushing chunk of size:' + chunk.length);
    this.push(chunk, 'utf8');
    if(chance.bool({likelihood: 5})) {
       this.push(null);
    }
}
module.exports = RandomStream;

好, 接下來是如何使用:

// generateRandom.js
var RandomStream = require('./randomStream');
var randomStream = new RandomStream();
randomStream.on('readable', function() {
    var chunk;
    while((chunk = randomStream.read()) !== null) {
        console.log("Chunk received: " + chunk.toString());
    }
});

輸出流(stream.Writable)

先是怎么用:

// 寫數(shù)據(jù)
writable.write(chunk, [encoding], [callback]);
// 結(jié)束流
writable.end([chunk], [encoding], [callback])

回壓(back-pressure)

這里涉及一個概念, 回壓(back-pressure), 意思就是當生產(chǎn)者速度大于消費者的時候, 輸出流的水位會不斷上升, 當?shù)竭_設(shè)定的最高水位時候, 就會寫入失敗, 這時候也就是產(chǎn)生了back-pressure, 那如何處理呢, 此時輸入流在水位降低到零點的時候會有一個drain事件發(fā)送, 只要監(jiān)聽這個事件, 在事件發(fā)生的時候就可以繼續(xù)向流寫入數(shù)據(jù)了.
直接看代碼:

var chance = require('chance').Chance();
require('http').createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/plain'});
    function generateMore() {             //[1]
        while(chance.bool({likelihood: 95})) {
            var shouldContinue = res.write(
                chance.string({length: (16 * 1024) – 1})
            );
            if(!shouldContinue) {             //[3]
                console.log('Backpressure');
                return res.once('drain', generateMore);
            }
        }
        res.end('\nThe end...\n', function() {
            console.log('All data was sent');
        });
    }
    generateMore();
}).listen(8080, function () {
  console.log('Listening');
});

同樣, 實現(xiàn)一個輸出流也很簡單, 只要繼承Writable類, 實現(xiàn)_write()接口即可.
示例代碼:

// toFileStream.js
var stream = require('stream');
var fs = require('fs');
var util = require('util');
var path = require('path');
var mkdirp = require('mkdirp');

function ToFileStream() {
    // 這次我們用對象模式
    stream.Writable.call(this, {objectMode: true});
};
util.inherits(ToFileStream, stream.Writable);
ToFileStream.prototype._write = function(chunk, encoding, callback) {
    var self = this;
    mkdirp(path.dirname(chunk.path), function(err) {
        if(err) {
            return callback(err);
        }
        fs.writeFile(chunk.path, chunk.content, callback);
    });
}
module.exports = ToFileStream;

下面是調(diào)用的代碼

var ToFileStream = require('./toFileStream');
var tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(function() {
    console.log("All files created");
});

讀寫流(stream.Duplex)

就是把輸入流和輸出流的接口都實現(xiàn)了.
注意:
此時option參數(shù)是同時傳給了內(nèi)部的Readable和Writeable, 如果要使用不同的選項, 就要分開配置,
像這樣:
this._writableState.objectMode
this._readableState.objectMode
同時, Duplex又多了一個選項allowHalfOpen, 這個選項的意思是, 當其中一個流關(guān)閉的時候, 另外一條流是否也同時關(guān)閉, 默認是true, 也就是不同時關(guān)閉.

轉(zhuǎn)換流(stream.Transform)

對于讀寫流來說, 要實現(xiàn)的是 _read() 和 _write() 接口, 而轉(zhuǎn)換流要實現(xiàn)的是 _transform() 和 _flush()接口.
區(qū)別是什么, 轉(zhuǎn)換流一般在transform的過程中把讀寫都做了, 也就是在處理輸入的時候, 直接就輸出了. 最后在輸入結(jié)束的時候_flush() 會被調(diào)用, 就可以把剩余的內(nèi)部數(shù)據(jù)一并輸出了.

示例代碼:

// 這代碼寫的很漂亮, 解決的問題是在流中操作替換操作
// 其中替換的部分可以仔細看一下, stream和buffer一個很大的區(qū)別就是stream會被切割
// 導致要替換的數(shù)據(jù)也有可能被切割, 這個例子就提供了一種解決方法, 
// 這個在后續(xù)實踐中肯定也會遇到的.
var stream = require('stream');
var util = require('util');
function ReplaceStream(searchString, replaceString) {
    stream.Transform.call(this, {decodeStrings: false});
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
}
util.inherits(ReplaceStream, stream.Transform);
ReplaceStream.prototype._transform = function(chunk, encoding, callback) {
    var pieces = (this.tailPiece + chunk).split(this.searchString);
    var lastPiece = pieces[pieces.length - 1];
    var tailPieceLen = this.searchString.length - 1;
    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    this.push(pieces.join(this.replaceString));       //[3]
    callback();
}
ReplaceStream.prototype._flush = function(callback) {
    this.push(this.tailPiece);
    callback();
}
module.exports = ReplaceStream;

幾個流操作相關(guān)的有用包

程序員就是懶, 有這幾個包就可以少寫一些代碼了.

  • readable-stream, 統(tǒng)一了nodejs 實現(xiàn)的不同版本stream接口
  • [through2](https://npmjs.org/ package/through2), 用于快速創(chuàng)建轉(zhuǎn)化流
  • from2, 用于快速創(chuàng)建輸入流
  • writable2, 用于快速創(chuàng)建輸出流
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • stream 流是一個抽象接口,在 Node 里被不同的對象實現(xiàn)。例如 request to an HTTP se...
    明明三省閱讀 3,521評論 1 10
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評論 19 139
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,603評論 0 6
  • 流是Node中最重要的組件和模式之一。在社區(qū)里有一句格言說:讓一切事務(wù)流動起來。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 673評論 0 0
  • 周一早晨似乎總是昏昏沉沉,做事無法集中注意力,喝兩杯咖啡都不一定能提神。大概是因為周末的生物鐘還沒有倒過來時差吧。...
    vivi要學習閱讀 244評論 0 0

友情鏈接更多精彩內(nèi)容