EventStream

背景:使用EventStream讀取大文件,獲取文件總行數(shù)

//使用event stream
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
  es.map(function (line, cb) {
    //do something with the line
   console.log(line)
   total++
  })
  )
  .pipe(
    es.wait(function(){//相當于監(jiān)聽前一個 stream 的 end 事件
      console.log('total',total)
    })
  )

問題:es.wait并沒有監(jiān)聽到end事件,始終無法輸出total。

嘗試排查

思路一:是否和es.wait函數(shù)有關(guān)?

//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//

es.wait = function (callback) {
  var arr = []
  return es.through(function (data) { arr.push(data) },
    function () {
      var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
        : arr.join('')
      this.emit('data', body)
      this.emit('end')
      if(callback) callback(null, body)
    })
}

es.wait這個函數(shù)返回的是through對象(也是一個stream),傳了2個function參數(shù)進去,第二個function實際會回調(diào)觸發(fā) this.emit('end'),所以繼續(xù)排查這個function是否被調(diào)用

function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  var ended = false, destroyed = false, buffer = [], _ended = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

//  stream.autoPause   = !(opts && opts.autoPause   === false)
  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

through構(gòu)造函數(shù)的上面這個end參數(shù)就是之前es.wait的第二個參數(shù),看看它什么時候觸發(fā)調(diào)用,找到相關(guān)代碼:

stream.on('end', function () {
    //end()
    stream.readable = false
    if(!stream.writable && stream.autoDestroy)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  function _end () {
    stream.writable = false
    end.call(stream)//只有這個地方涉及end的觸發(fā)
    if(!stream.readable && stream.autoDestroy)
      stream.destroy()
  }

  stream.end = function (data) {
    if(ended) return
    ended = true
    if(arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

在_end函數(shù)斷點debug發(fā)現(xiàn)執(zhí)行end.call(stream)并沒有跳回es.wait的第二個回調(diào)函數(shù),顯然這個地方出現(xiàn)異常,聯(lián)想我們這次用的pipe和stream,那可能是上層的stream調(diào)用異常了。

思路二:是否和上一個pipe的 es.map有關(guān)?
es.map對應(yīng)的實際是mapStream類,找到這個流中和end相關(guān)代碼:

function end (data) {
    //if end was called with args, write it, 
    ended = true //write will emit 'end' if ended is true
    stream.writable = false
    if(data !== undefined) {
      return queueData(data, inputs)
    } else if (inputs == outputs) { //wait for processing //滿足這個條件才可以
      stream.readable = false, stream.emit('end'), stream.destroy() 
    }
  }

  stream.end = function (data) {
    if(ended) return
    end(data)
  }

只有滿足inputs === ouputs才可以觸發(fā)end事件,仔細跟蹤代碼,outputs在queueData函數(shù)被遞增修改,queueData -> next -> stream.write,跟蹤到:

// Wrap the mapper function by calling its callback with the order number of
  // the item in the stream.
  function wrappedMapper (input, number, callback) {
    return mapper.call(null, input, function(err, data){
      callback(err, data, number)
    })
  }

  stream.write = function (data) {
    if(ended) throw new Error('map stream is not writable')
    inNext = false
    inputs ++

    try {
      //catch sync errors and handle them like async errors
      var written = wrappedMapper(data, inputs, next)//next在這里被調(diào)用
      paused = (written === false)
      return !paused
    } catch (err) {
      //if the callback has been called syncronously, and the error
      //has occured in an listener, throw it again.
      if(inNext)
        throw err
      next(err)
      return !paused
    }
  }

仔細看wrappedMapper這個函數(shù),next實際就是它的callback參數(shù),這個callback的調(diào)用依賴于mapper.call的第三個參數(shù),mapper又是什么呢?

//map stream 導出代碼
module.exports = function (mapper, opts) {

  var stream = new Stream()
    , inputs = 0
    , outputs = 0
    , ended = false
    , paused = false
    , destroyed = false
    , lastWritten = 0
    , inNext = false

  opts = opts || {};
  var errorEventName = opts.failures ? 'failure' : 'error';
//使用map stream的代碼
  es.map(function (line, cb) {
    //do something with the line
   console.log(line)
   total++
  })

顯然mapper就是這個函數(shù):

function (line, cb) {
    //do something with the line
   console.log(line)
   total++
  })

mapper的第三個參數(shù)就是cb了,cb沒有調(diào)用,那當然導致后面一連串函數(shù)沒有調(diào)用,最終影響end事件的觸發(fā),那我們加上cb調(diào)用看看:

//使用event stream
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
  es.map(function (line, cb) {
    //do something with the line
    //console.log(line)
   total++
   cb(null,line)//這個回調(diào)很重要
  })

  )
  .pipe(
    es.wait(function(){//相當于監(jiān)聽前一個 stream 的 end 事件
      console.log('total',total)
    })
  )

結(jié)果能輸出total了:


image.png

總結(jié)

使用stream和pipe搭配使用的時候,需要注意前后stream的關(guān)聯(lián)影響。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容