nodejs 的 Event Loop
nodejs 執(zhí)行環(huán)境的 Event Loop 與瀏覽器上的不同,nodejs 使用 V8 作為 JS 的解釋器,在 I/O 處理方面使用自己設(shè)計的 libuv,libuv 封裝了不同 OS 平臺的 I/O 操作,提供一致的異步(asynchronous) 、非阻塞(non-blocking) API、事件循環(huán)方式。
nodejs 的單線程
nodejs 的單線程不是絕對的,在用戶界面視圖上的 js 是單線程的,但是使用 nodejs 創(chuàng)建應(yīng)用程序是多線程的。
nodejs 需要維持一個線程池用來委托同步任務(wù),同時 V8 會為垃圾回收創(chuàng)建自己的線程。
The famous statement ‘Node.js runs in a single thread’ is only partly true. Actually only your ‘userland’ code runs in one thread. Starting a simple node application and looking at the processes reveals that Node.js in fact spins up a number of threads. This is because Node.js maintains a thread pool to delegate synchronous tasks to, while Google V8 creates its own threads for tasks like garbage collection.
Event Loop 模型

Event Loop 的特點
- 每個 phase 階段都有存放與自己相關(guān)回調(diào)的 queue
- 進入一個 phase 后,都會執(zhí)行完自己 queue 的回調(diào)才會進入下一個 phase
- 在回調(diào)中執(zhí)行長時間任務(wù)會被阻塞
- 在每次運行的事件循環(huán)之間,Node.js 檢查它是否在等待任何異步 I/O 或計時器,如果沒有的話,則關(guān)閉干凈, 事件循環(huán)就結(jié)束了
比如 app.js 里只有簡單的運行代碼,執(zhí)行完后進事件循環(huán)就結(jié)束了。
// app.js
console.log('event loop start!')
console.log('event loop stop')

如果啟動了一個 http.createServer().listen 就會一直執(zhí)行,底層開啟了 socket 一直等待 I/O 事件, 直到進行 close
// app.js
const http = require('http')
const server = http.createServer()
console.log('event loop start!')
setTimeout(() => server.close(), 2000) // timers 階段
let t = null
// 啟動 I/O 事件
server.listen(3000, () => {
console.log('poll running')
t = setInterval(() => console.log('poll'), 500) // 進行輪詢
})
// close callbacks 階段
server.on('close', () => {
clearInterval(t)
console.log('event loop stop')
})

Event Loop 各階段說明
- timers 階段:執(zhí)行已經(jīng)準(zhǔn)備好的 setTimeout、setInterval 回調(diào)。
- pending callbacks 階段:執(zhí)行被延遲到下一個 event loop 的I/O回調(diào)。如網(wǎng)絡(luò)、stream、tcp錯誤回調(diào)
- idle, prepare 階段:內(nèi)部使用。
- poll 階段:取出新的 I/O 事件回調(diào)執(zhí)行,(除: close 事件、setImmediate、timers 回調(diào)) node 程序?qū)⒃谶@個階段阻塞。
- check 階段:setImmediate() 將在這個階段調(diào)用。
- close callbacks 階段:close 事件的回調(diào)將在這執(zhí)行,如 socket.on('close', ...)
事件輪詢機制

nodejs 事件循環(huán)的輪詢階段跟瀏覽器上的 event loop 相似,區(qū)別在于置入回調(diào)隊列的任務(wù)是 連接、數(shù)據(jù)、輸入等。有關(guān)輪詢中有關(guān) promise 等 MicroTask MacroTask 執(zhí)行順序可以查看下面兩篇
事件循環(huán)中的 MacroTask與 MicroTask
瀏覽器的事件循環(huán)
setTimeout() 與 setImmediate() 對比
- setTimeout() 屬于 timers phase,設(shè)計在定時完成后執(zhí)行。
- setImmediate() 屬于 check phase。每次 poll phase 后執(zhí)行。
- 如果在 I/O 循環(huán)中調(diào)用,setImmediate 一定先執(zhí)行 (因為下一個階段就是 check 階段)。否則 setImmediate() 與 setTimeout(cb, 0) 的執(zhí)行順序不可預(yù)測
兩者在執(zhí)行順序上不能確定
setImmediate(() => {
console.log('immediate');
});
setTimeout(() => {
console.log('timeout');
}, 0);


如果處于 IO 循環(huán),setImmediate() 回調(diào)的執(zhí)行一定先于 setTimeout()
const fs = require('fs');
fs.readFile(__filename, () => {
setTimeout(() => {
console.log('timeout');
}, 0);
setImmediate(() => {
console.log('immediate');
});
});

理解 process.nextTick()
- process.nextTick() 不屬于 Event Loop 的各個階段
- process.nextTick() 的回調(diào)在每個階段結(jié)束后進入下個階段前同步執(zhí)行
- 絕不可在 process.nextTick 的 callback 中執(zhí)行 long-running task
- 不要執(zhí)行會返回process.nextTick 的函數(shù),不然這個階段會一直認(rèn)為還有回調(diào)需要執(zhí)行,事件循環(huán)會被阻塞在這個階段。
let bar;
function someAsyncApiCall(callback) { callback(); }
someAsyncApiCall(() => {
// 同步的執(zhí)行,但此時變量還沒賦值
console.log('bar', bar); // undefined
});
bar = 1;
let bar;
function someAsyncApiCall(callback) {
process.nextTick(callback);
}
someAsyncApiCall(() => {
// process.nextTick 使此回調(diào)在階段結(jié)束后才執(zhí)行
console.log('bar', bar); // 1
});
bar = 1;
const EventEmitter = require('events');
class MyEmitter extends EventEmitter {
constructor() {
super()
this.emit('event'); // 不會正常觸發(fā),事件還沒綁定
}
}
const myEmitter = new MyEmitter();
myEmitter.on('event', () => {
console.log('an event occurred!');
});
const EventEmitter = require('events');
class MyEmitter extends EventEmitter {
constructor() {
super()
process.nextTick(() => {
this.emit('event'); // 會正常觸發(fā),因為是在繼承階段結(jié)束后才執(zhí)行
})
}
}
const myEmitter = new MyEmitter();
myEmitter.on('event', () => {
console.log('an event occurred!');
});
process.nextTick() 與 setImmediate() 對比
- process.nextTick() 不屬于 Event Loop 的各個階段
- process.nextTick() 的回調(diào)在每個階段結(jié)束后進入下個階段前同步執(zhí)行
- process.nextTick() 在同一個階段立即執(zhí)行。
- setImmediate() 只每次 poll phase 后進入 check phase 才執(zhí)行。
- process.nextTick() 比 setImmediate() 觸發(fā)得更直接。
- setImmediate() 更容易理解,如果需要拆分 long-running task 請使用 setImmediate()
EventEmitter
nodejs 的大多模塊(如HTTP request、response 和 stream)都繼承了 EventEmitter 模塊,它們可以觸發(fā)和監(jiān)聽事件。
Events 模塊核心實現(xiàn)
Events 模塊的核心實現(xiàn)非常簡單,讓你可以創(chuàng)建一個 event pattern 的工具,是 nodejs 事件驅(qū)動的核心,但它本身跟 nodejs 的 Event Loop 沒有任何關(guān)系。
class MyEventEmitter {
constructor () {
this.events = {} // 事件對象
}
listeners (type) {
return this.events[type]
}
addListener (type, listener) {
if (this.events[type])
this.events[type] = [ ...this.events[type], listener ]
else
this.events[type] = [ listener ]
}
once (type, listener) {
this.addListener(type, _onceWrap(this, type, listener))
return this
}
removeListener (type, listener) {
if (this.events[type].length > 0)
this.events[type] = this.events[type].filter(item => item !== listener)
return this
}
removeAllListener (type) {
delete this.events[type]
}
emit (type, ...args) {
if (type === 'error' && !this.events[type].length) throw new Error('emit error event !~')
this.events[type] && this.events[type].forEach(listener => Reflect.apply(listener, this, args))
}
get on() {
return this.addListener
}
get off() {
return this.removeListener
}
}
function _onceWrap(target, type, listener) {
const wrapped = (...args)
=> target.removeListener(type, wrapped) && Reflect.apply(listener, target, args)
return wrapped
}
Events 是同步的
Events 的調(diào)用非常簡單
e.on('event')
e.emit('event', cb)
Events 僅僅只是簡單的執(zhí)行了事件的回調(diào)函數(shù),它是同步執(zhí)行的。
每一次的 emit,都是同步的執(zhí)行了所綁定事件 queue 里的回調(diào) 。而 EventEmitter 本身與 nodejs 的 Event Loop 沒有關(guān)系,也不存在異步執(zhí)行的代碼,是否異步只跟傳入的回調(diào)函數(shù)有關(guān)。
EE.on('data', function (data) {
console.log(data);
});
fs.readFile(__filename, (err, data) => {
if (!err) EE.emit('data', data);
});
EventEmitter 需要注意的地方
下面代碼會造成 Maximum call stack size exceeded 報錯, 因為所有的回調(diào)都是同步的,會在一個 poll phase 階段不停執(zhí)行下去,一直到系統(tǒng)崩潰.
const EventEmitter = require("events")
const EE = new EventEmitter()
EE.on('event1', function () {
console.log('event1 fired!');
EE.emit('event2');
})
EE.on('event2', function () {
console.log('event2 fired!');
EE.emit('event3');
})
EE.on('event3', function () {
console.log('event3 fired!');
EE.emit('event1');
})
EE.emit('event1');
換成 setImmediate() 來調(diào)用 emit ,會發(fā)現(xiàn)這段程序不會崩潰,setImmediate 把回調(diào)放入了每次輪詢的下個階段才進行,一個真正的通過 events 模塊創(chuàng)建的異步代碼.
const EventEmitter = require("events")
const EE = new EventEmitter()
EE.on('event1', function () {
console.log('event1 fired!');
setImmediate(() => {
EE.emit('event2');
})
})
EE.on('event2', function () {
console.log('event2 fired!');
setImmediate(() => {
EE.emit('event3');
})
})
EE.on('event3', function () {
console.log('event3 fired!');
setImmediate(() => {
EE.emit('event1');
})
})
EE.emit('event1');
EventEmitter 中使用 process.nextTick()
如果把上面的代碼 setImmediate() 換成 process.nextTick() 講會報錯,因為 process.nextTick() 是在當(dāng)前階段結(jié)束時且在下個階段前執(zhí)行,而在 process.nextTick() 里觸發(fā)回調(diào)會導(dǎo)致程序一直認(rèn)為當(dāng)前階段還有任務(wù)需要執(zhí)行而出錯的,這個階段將會有無法清除的 nextTick 需要執(zhí)行。