Node.js介紹2-流

曾經(jīng)學(xué)C++的STL中的IOStream,輸入輸出流,看個(gè)代碼

using namespace std;
cout<<x;

眼角濕潤(rùn)了,這是大學(xué)的記憶啊,大學(xué)時(shí)我們幸苦的學(xué)習(xí)C++,為了指針的釋放和指針的指針搞的焦頭爛額,更何況記憶中不僅有代碼,還有我的青春和她。算了,搬磚的腰又酸了,還是回到現(xiàn)實(shí),看看node.js中的流吧。

啥是流啊。。。

流顧名思義就是流水的意思,stream英文也是溪流。如果把二進(jìn)制數(shù)據(jù)從一個(gè)地方很源源不斷的送到另一個(gè)地方,像水流一樣的功能,就叫流。

A stream is an abstract interface implemented by various objects in Node.js. For example a request to an HTTP server is a stream, as is process.stdout
.Streams are readable, writable, or both. All streams are instances of EventEmitter.

愿意啃文檔的兄弟可以看stream

stream的例子

因?yàn)閚ode.js非常善于處理數(shù)據(jù)(這里數(shù)據(jù)的可能是服務(wù)器的網(wǎng)頁(yè),或者返回的json數(shù)據(jù),或者任何東西),所以我們來看看一些例子,說明stream對(duì)服務(wù)器的重要作用。node.js里面很多類都是繼承了流的接口。

創(chuàng)建一個(gè)echo服務(wù)

echo是回聲的意思,我們對(duì)著大山喊話,回聽到自己喊的聲音,這邊我們做個(gè)服務(wù)器干這個(gè)無(wú)聊的事情吧。

var http = require('http');
http.createServer(function(request, response) {
    response.writeHead(200);
    request.pipe(response);
}).listen(8080);

運(yùn)行以后調(diào)用curl -d 'hello' http://localhost:8080。簡(jiǎn)直不敢相信服務(wù)器這么簡(jiǎn)單就寫好了,這就是node.js的魅力吧。
上面的pipe就是管道的意思,和linux的命令行|一個(gè)意思,大家應(yīng)該熟悉命令行的管道吧,概念都是相通的。大家應(yīng)該知道這個(gè)

gulp

就是基于stream來做的。

上傳文件

我們?cè)诳匆粋€(gè)上傳文件的例子。

var http = require('http');
var fs = require('fs');
http.createServer(function(request, response) {
    var newFile = fs.createWriteStream("copy" + new Date() + ".md");
    var fileBytes = request.headers['content-length'];
    var uploadedBytes = 0;

    response.write("server receive request\n");
    request.pipe(newFile);

    request.on('readable', function() {
        var chunk = null;
        response.write("progress: start\n");
        while (null !== (chunk = request.read())) {
            uploadedBytes += chunk.length;
            var progress = (uploadedBytes / fileBytes) * 100;
            response.write("progress: " + parseInt(progress, 10) + "%\n");
        }
    });


    request.on('end', function() {
        response.end('uploaded!\n');
    });

}).listen(8080);
//curl --upload-file uploadFiles.js http://localhost:8080
上傳文件例子

這里的看點(diǎn)是

  1. 如何返回進(jìn)度的:request.on('readable', function() {,有沒有覺得這種異步I/O方式的優(yōu)點(diǎn)。
  2. 如何保存文件request.pipe(newFile);,是不是很方便。

流的實(shí)現(xiàn)

上面我們看到流的結(jié)構(gòu)的簡(jiǎn)單易用,現(xiàn)在我們看看node.js的流是怎么設(shè)計(jì)的。

To implement any sort of stream, the pattern is the same:

  1. Extend the appropriate parent class in your own subclass. (Theutil.inherits() method is particularly helpful for this.)
  2. Call the appropriate parent class constructor in your constructor, to be sure that the internal mechanisms are set up properly.
  3. Implement one or more specific methods, as detailed below.

The class to extend and the method(s) to implement depend on the sort of stream class you are writing:


流的基礎(chǔ)

翻譯一下流實(shí)現(xiàn)的過程:

  1. 繼承合適的class
  2. 不要忘記調(diào)用基類構(gòu)造函數(shù)
  3. 重寫基類方法

數(shù)數(shù)的可讀流

看一個(gè)例子就清楚了,下面這段程序就是數(shù)數(shù),1數(shù)到1000000。

const Readable = require('stream').Readable;
const util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
    Readable.call(this, opt);
    this._max = 1000000;
    this._index = 1;
}

Counter.prototype._read = function() {
    var i = this._index++;
    if (i > this._max)
        this.push(null);
    else {
        var str = '' + i;
        var buf = new Buffer(str, 'ascii');
        this.push(buf);
    }
};

///////////////////////////////////////////////////////////
//test 
var fs = require('fs');
var newFile = fs.createWriteStream("test_counter.txt");
var myCounter = new Counter();
myCounter.pipe(newFile);

上面的Counter完成了三部曲,測(cè)試程序把這個(gè)conter輸出到文件。如果我們想自己實(shí)現(xiàn)一個(gè)流,這樣就可以了。如果上面例子太簡(jiǎn)單了,我們看一下復(fù)雜點(diǎn)的例子,比如transform

啥是transform流

Transform streams are Duplex streams where the output is in some way computed from the input. They implement both the Readable and Writable interfaces.
Examples of Transform streams include:
zlib streams
crypto streams
翻譯一下就是用來把輸入流變化一下,再輸出。比如壓縮,加密等。

const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');

inp.pipe(gzip).pipe(out);

實(shí)現(xiàn)transform流

這個(gè)例子解析一個(gè)數(shù)據(jù),產(chǎn)生一個(gè)readable stream,這個(gè)stream是經(jīng)過變換的哦。

  1. 解析的格式:有兩個(gè)換行符的數(shù)據(jù)流,換行符前面是頭,后面是內(nèi)容


    格式
  2. 解析的過程中發(fā)出一個(gè)事件header,用來顯示頭部信息
  3. 最后去掉頭部,保留內(nèi)容信息
    現(xiàn)在來看一下代碼吧。
const util = require('util');
const Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

// Usage:
var fs = require('fs');
const source = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');

var parser = new SimpleProtocol();

// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
source.pipe(parser).pipe(out);
parser.on('header',function(header){
  console.log(header);
});

雖然代碼長(zhǎng)了點(diǎn),但是有注釋,我就不解釋了,注意最后如何使用的哦??纯催\(yùn)行的結(jié)果吧。


運(yùn)行結(jié)果

流就介紹到這里了,如果還意猶未盡,可以看看node的源碼node in github或者文檔stream

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • stream 流是一個(gè)抽象接口,在 Node 里被不同的對(duì)象實(shí)現(xiàn)。例如 request to an HTTP se...
    明明三省閱讀 3,521評(píng)論 1 10
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,544評(píng)論 19 139
  • 流是Node中最重要的組件和模式之一。在社區(qū)里有一句格言說:讓一切事務(wù)流動(dòng)起來。這已經(jīng)足夠來描述在Node中流...
    宮若石閱讀 673評(píng)論 0 0
  • 小人物 從一生下就沒有光環(huán), 生活到現(xiàn)在還是平凡。 羨慕別人的光彩, 一味模仿多無(wú)奈。 簡(jiǎn)單...
    生來彷徨ii閱讀 286評(píng)論 0 4
  • 轉(zhuǎn)身回眸,兩兩相望,只是一瞬,我看到他的眼睛,深邃而溫柔,仿如許久未見的故人,沒有早一步,也沒有晚一步,剛巧,這就...
    臆想的行路夢(mèng)閱讀 361評(píng)論 2 2

友情鏈接更多精彩內(nèi)容