背景:使用EventStream讀取大文件,獲取文件總行數
// Using event-stream
// NOTE: this is the broken version investigated in this article: the es.map
// mapper below never calls `cb`, so the map stream never emits 'end' and the
// es.wait callback (which prints the total) never runs.
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
es.map(function (line, cb) {
//do something with the line
console.log(line)
total++
// BUG (reproduced on purpose): `cb` is never invoked, so map-stream's
// `outputs` counter never catches up with `inputs`.
})
)
.pipe(
es.wait(function(){// roughly equivalent to listening for the previous stream's 'end' event
console.log('total',total)
})
)
問題:es.wait並沒有監聽到end事件,始終無法輸出total。
嘗試排查
思路一:是否和es.wait函數有關?
//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//
// Returns a through stream that buffers every chunk it receives. When its
// end() is called it emits the combined body as a single 'data' event, then
// 'end', then calls `callback(null, body)`.
// Note: the entire input is held in memory in `arr` until the end.
es.wait = function (callback) {
var arr = []
return es.through(function (data) { arr.push(data) },  // write handler: just buffer the chunk
function () {  // end handler: only runs when this through stream's end() is called
// Buffer chunks are concatenated as Buffers; anything else is joined as a string.
var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
: arr.join('')
this.emit('data', body)
this.emit('end')
if(callback) callback(null, body)
})
}
es.wait這個函數返回的是through對象(也是一個stream),傳入了2個function參數,其中第二個function內部會觸發this.emit('end'),所以繼續排查這個function是否被調用。
function through (write, end, opts) {
write = write || function (data) { this.queue(data) }
end = end || function () { this.queue(null) }
var ended = false, destroyed = false, buffer = [], _ended = false
var stream = new Stream()
stream.readable = stream.writable = true
stream.paused = false
// stream.autoPause = !(opts && opts.autoPause === false)
stream.autoDestroy = !(opts && opts.autoDestroy === false)
stream.write = function (data) {
write.call(this, data)
return !stream.paused
}
through構造函數中的上面這個end參數,就是之前es.wait的第二個參數。看看它什麼時候被觸發調用,找到相關代碼:
// After this through stream emits 'end' it is no longer readable; if it is
// also no longer writable and autoDestroy is enabled, destroy it on the next tick.
stream.on('end', function () {
//end()
stream.readable = false
if(!stream.writable && stream.autoDestroy)
process.nextTick(function () {
stream.destroy()
})
})
// Marks the stream as no longer writable and runs the user-supplied `end`
// handler (for es.wait, that is its second function argument), then destroys
// the stream if it is also no longer readable and autoDestroy is enabled.
function _end () {
stream.writable = false
end.call(stream)// the only place in this excerpt that invokes the user-supplied `end` handler
if(!stream.readable && stream.autoDestroy)
stream.destroy()
}
// Public end(): only the first call has an effect (guarded by `ended`).
// Writes an optional final chunk, then delegates to _end().
// NOTE(review): in a pipe chain this is expected to be called by the upstream
// stream when it ends — confirm against Stream#pipe.
stream.end = function (data) {
if(ended) return
ended = true
if(arguments.length) stream.write(data)
_end() // will emit or queue
return stream
}
在_end函數打斷點debug,發現es.wait的第二個回調函數一直沒有被調用(也就是end.call(stream)根本沒有被執行到),說明問題不在es.wait本身。而_end只會經由stream.end()觸發,聯想到這次是用pipe串聯多個stream,那很可能是上游的stream沒有正常結束(沒有發出end事件)。
思路二:是否和上一個pipe的es.map有關?
es.map實際對應的是map-stream模塊(它導出一個創建stream的函數),找到這個流中和end相關的代碼:
// map-stream's internal end handler. Marks the stream as ended and not writable.
// If a final chunk is given it is handed to queueData; otherwise 'end' is
// emitted here only when every input has produced an output (inputs == outputs).
// `outputs` is advanced elsewhere (queueData); if a mapper never calls its
// callback, the counts never match and this branch never emits 'end'.
function end (data) {
//if end was called with args, write it,
ended = true //write will emit 'end' if ended is true
stream.writable = false
if(data !== undefined) {
return queueData(data, inputs)
} else if (inputs == outputs) { //wait for processing; 'end' can only be emitted when this condition holds
stream.readable = false, stream.emit('end'), stream.destroy()
}
}
// Public end(): ignored once the stream has ended; otherwise delegates to the
// internal end() handler.
stream.end = function (data) {
if(ended) return
end(data)
}
只有滿足inputs == outputs才會觸發end事件。仔細跟蹤代碼,outputs在queueData函數中被遞增;反向追蹤調用鏈 queueData <- next <- stream.write,跟蹤到:
// Wrap the mapper function by calling its callback with the order number of
// the item in the stream.
// The user mapper is called as mapper(input, cb); `callback` (map-stream's
// `next` when called from stream.write) only runs if the user mapper calls cb.
// The mapper's return value is returned unchanged.
function wrappedMapper (input, number, callback) {
return mapper.call(null, input, function(err, data){
callback(err, data, number)
})
}
// Writes one item: counts it as an input and hands it to the user mapper.
// Throws if called after the stream has ended. Returns false (signalling
// backpressure) when the mapper returned exactly false.
stream.write = function (data) {
if(ended) throw new Error('map stream is not writable')
inNext = false
inputs ++
try {
//catch sync errors and handle them like async errors
var written = wrappedMapper(data, inputs, next)// `next` is passed as the callback; it only runs if the mapper calls its cb
paused = (written === false)
return !paused
} catch (err) {
//if the callback has been called syncronously, and the error
//has occured in an listener, throw it again.
if(inNext)
throw err
next(err)
return !paused
}
}
仔細看wrappedMapper這個函數,next實際就是它的callback參數;而這個callback只有在mapper調用它收到的回調函數(即mapper.call的第三個參數)時才會被調用。那mapper又是什麼呢?
//map stream 導出代碼
module.exports = function (mapper, opts) {
var stream = new Stream()
, inputs = 0
, outputs = 0
, ended = false
, paused = false
, destroyed = false
, lastWritten = 0
, inNext = false
opts = opts || {};
var errorEventName = opts.failures ? 'failure' : 'error';
// Code that uses map stream (note: the mapper below never calls `cb`)
es.map(function (line, cb) {
//do something with the line
console.log(line)
total++
})
顯然mapper就是這個函數:
function (line, cb) {
//do something with the line
console.log(line)
total++
})
mapper的第二個參數就是cb(即wrappedMapper中傳給mapper.call的那個回調函數)。cb沒有被調用,next就不會執行,outputs也就不會遞增,inputs == outputs永遠不成立,最終map stream不會觸發end事件,下游的es.wait自然也收不到end。那我們加上cb調用看看:
// Using event-stream (fixed version): count the lines of a file.
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
  .pipe(es.split()) //defaults to lines.
  .pipe(
    es.map(function (line, cb) {
      // NOTE(review): if the file ends with a newline, split may emit a
      // trailing empty string, counting one extra line — verify if exactness matters.
      total++
      // Calling cb for every line is essential: map-stream only emits 'end'
      // once every input has been answered through its callback.
      // Calling it with no data marks the line as processed WITHOUT forwarding
      // it downstream. Forwarding it (cb(null, line)) would make es.wait buffer
      // and join the entire, potentially large, file in memory for no reason.
      cb()
    })
  )
  .pipe(
    es.wait(function () { // runs once the previous stream has ended
      console.log('total', total)
    })
  )
結(jié)果能輸出total了:

總結(jié)
使用stream和pipe搭配的時候,需要注意前後stream之間的關聯影響:上游的stream不結束,下游就永遠收不到end事件。具體到es.map,mapper必須為每一條數據調用cb(可以用不帶數據的cb()丟棄該條數據),否則整條管道會卡住。