nodejs 的 Event Loop 和 EventEmitter

nodejs 的 Event Loop

nodejs 執(zhí)行環(huán)境的 Event Loop 與瀏覽器上的不同,nodejs 使用 V8 作為 JS 的解釋器,在 I/O 處理方面使用自己設(shè)計的 libuv,libuv 封裝了不同 OS 平臺的 I/O 操作,提供一致的異步(asynchronous) 、非阻塞(non-blocking) API、事件循環(huán)方式。

nodejs 的單線程

nodejs 的單線程不是絕對的,在用戶界面視圖上的 js 是單線程的,但是使用 nodejs 創(chuàng)建應(yīng)用程序是多線程的。
nodejs 需要維持一個線程池用來委托同步任務(wù),同時 V8 會為垃圾回收創(chuàng)建自己的線程。

The famous statement ‘Node.js runs in a single thread’ is only partly true. Actually only your ‘userland’ code runs in one thread. Starting a simple node application and looking at the processes reveals that Node.js in fact spins up a number of threads. This is because Node.js maintains a thread pool to delegate synchronous tasks to, while Google V8 creates its own threads for tasks like garbage collection.

Event Loop 模型

Event Loop 的特點

  • 每個 phase 階段都有存放與自己相關(guān)回調(diào)的 queue
  • 進入一個 phase 后,都會執(zhí)行完自己 queue 的回調(diào)才會進入下一個 phase
  • 在回調(diào)中執(zhí)行長時間任務(wù)會被阻塞
  • 在每次運行的事件循環(huán)之間,Node.js 檢查它是否在等待任何異步 I/O 或計時器,如果沒有的話,則關(guān)閉干凈, 事件循環(huán)就結(jié)束了

比如 app.js 里只有簡單的運行代碼,執(zhí)行完后進事件循環(huán)就結(jié)束了。

// app.js
console.log('event loop start!')

console.log('event loop stop')

如果啟動了一個 http.createServer().listen 就會一直執(zhí)行,底層開啟了 socket 一直等待 I/O 事件, 直到進行 close

// app.js
const http = require('http')

const server = http.createServer()

console.log('event loop start!')

setTimeout(() => server.close(), 2000) // timers 階段

let t = null

// 啟動 I/O 事件
server.listen(3000, () => {
  console.log('poll running')
  t = setInterval(() => console.log('poll'), 500) // 進行輪詢
})

// close callbacks 階段
server.on('close', () => {
  clearInterval(t)
  console.log('event loop stop')
})

Event Loop 各階段說明

  • timers 階段:執(zhí)行已經(jīng)準(zhǔn)備好的 setTimeout、setInterval 回調(diào)。
  • pending callbacks 階段:執(zhí)行被延遲到下一個 event loop 的I/O回調(diào)。如網(wǎng)絡(luò)、stream、tcp錯誤回調(diào)
  • idle, prepare 階段:內(nèi)部使用。
  • poll 階段:取出新的 I/O 事件回調(diào)執(zhí)行,(除: close 事件、setImmediate、timers 回調(diào)) node 程序?qū)⒃谶@個階段阻塞。
  • check 階段:setImmediate() 將在這個階段調(diào)用。
  • close callbacks 階段:close 事件的回調(diào)將在這執(zhí)行,如 socket.on('close', ...)

事件輪詢機制

輪詢

nodejs 事件循環(huán)的輪詢階段跟瀏覽器上的 event loop 相似,區(qū)別在于置入回調(diào)隊列的任務(wù)是 連接、數(shù)據(jù)、輸入等。有關(guān)輪詢中有關(guān) promise 等 MicroTask MacroTask 執(zhí)行順序可以查看下面兩篇
事件循環(huán)中的 MacroTask與 MicroTask
瀏覽器的事件循環(huán)

setTimeout() 與 setImmediate() 對比

  • setTimeout() 屬于 timers phase,設(shè)計在定時完成后執(zhí)行。
  • setImmediate() 屬于 check phase。每次 poll phase 后執(zhí)行。
  • 如果在 I/O 循環(huán)中調(diào)用,setImmediate 一定先執(zhí)行 (因為下一個階段就是 check 階段)。否則 setImmediate() 與 setTimeout(cb, 0) 的執(zhí)行順序不可預(yù)測

兩者在執(zhí)行順序上不能確定

setImmediate(() => {
  console.log('immediate');
});

setTimeout(() => {
  console.log('timeout');
}, 0);

如果處于 IO 循環(huán),setImmediate() 回調(diào)的執(zhí)行一定先于 setTimeout()

const fs = require('fs');

fs.readFile(__filename, () => {
  setTimeout(() => {
    console.log('timeout');
  }, 0);
  setImmediate(() => {
    console.log('immediate');
  });
});

理解 process.nextTick()

  • process.nextTick() 不屬于 Event Loop 的各個階段
  • process.nextTick() 的回調(diào)在每個階段結(jié)束后進入下個階段前同步執(zhí)行
  • 絕不可在 process.nextTick 的 callback 中執(zhí)行 long-running task
  • 不要執(zhí)行會返回process.nextTick 的函數(shù),不然這個階段會一直認(rèn)為還有回調(diào)需要執(zhí)行,事件循環(huán)會被阻塞在這個階段。
let bar;

function someAsyncApiCall(callback) { callback(); }

someAsyncApiCall(() => {
  // 同步的執(zhí)行,但此時變量還沒賦值
  console.log('bar', bar); // undefined
});

bar = 1;
let bar;

function someAsyncApiCall(callback) {
  process.nextTick(callback);
}

someAsyncApiCall(() => {
  // process.nextTick 使此回調(diào)在階段結(jié)束后才執(zhí)行
  console.log('bar', bar); // 1
});

bar = 1;
const EventEmitter = require('events');

class MyEmitter extends EventEmitter {
  constructor() {
    super()
    this.emit('event'); // 不會正常觸發(fā),事件還沒綁定
  }
}

const myEmitter = new MyEmitter();

myEmitter.on('event', () => {
  console.log('an event occurred!');
});
const EventEmitter = require('events');

class MyEmitter extends EventEmitter {
  constructor() {
    super()
    process.nextTick(() => {
      this.emit('event'); // 會正常觸發(fā),因為是在繼承階段結(jié)束后才執(zhí)行
    })
  }
}

const myEmitter = new MyEmitter();

myEmitter.on('event', () => {
  console.log('an event occurred!');
});

process.nextTick() 與 setImmediate() 對比

  • process.nextTick() 不屬于 Event Loop 的各個階段
  • process.nextTick() 的回調(diào)在每個階段結(jié)束后進入下個階段前同步執(zhí)行
  • process.nextTick() 在同一個階段立即執(zhí)行。
  • setImmediate() 只每次 poll phase 后進入 check phase 才執(zhí)行。
  • process.nextTick() 比 setImmediate() 觸發(fā)得更直接。
  • setImmediate() 更容易理解,如果需要拆分 long-running task 請使用 setImmediate()

EventEmitter

nodejs 的大多模塊(如HTTP request、response 和 stream)都繼承了 EventEmitter 模塊,它們可以觸發(fā)和監(jiān)聽事件。

Events 模塊核心實現(xiàn)

Events 模塊的核心實現(xiàn)非常簡單,讓你可以創(chuàng)建一個 event pattern 的工具,是 nodejs 事件驅(qū)動的核心,但它本身跟 nodejs 的 Event Loop 沒有任何關(guān)系。

class MyEventEmitter {
  constructor () {
    this.events = {} // 事件對象
  }

  listeners (type) {
    return this.events[type]
  }

  addListener (type, listener) {
    if (this.events[type])
      this.events[type] = [ ...this.events[type], listener ]
    else
      this.events[type] = [ listener ]
  }

  once (type, listener) {
    this.addListener(type, _onceWrap(this, type, listener))
    return this
  }

  removeListener (type, listener) {
    if (this.events[type].length > 0)
      this.events[type] = this.events[type].filter(item => item !== listener)
    return this
  }

  removeAllListener (type) {
    delete this.events[type]
  }

  emit (type, ...args) {
    if (type === 'error' && !this.events[type].length) throw new Error('emit error event !~')
    this.events[type] && this.events[type].forEach(listener => Reflect.apply(listener, this, args))
  }

  get on() {
    return this.addListener
  }

  get off() {
    return this.removeListener
  }

}

function _onceWrap(target, type, listener) {
  const wrapped = (...args)
    => target.removeListener(type, wrapped) && Reflect.apply(listener, target, args)
  return wrapped
}

Events 是同步的

Events 的調(diào)用非常簡單

e.on('event')
e.emit('event', cb)

Events 僅僅只是簡單的執(zhí)行了事件的回調(diào)函數(shù),它是同步執(zhí)行的。
每一次的 emit,都是同步的執(zhí)行了所綁定事件 queue 里的回調(diào) 。而 EventEmitter 本身與 nodejs 的 Event Loop 沒有關(guān)系,也不存在異步執(zhí)行的代碼,是否異步只跟傳入的回調(diào)函數(shù)有關(guān)。

EE.on('data', function (data) {
  console.log(data);
});
fs.readFile(__filename, (err, data) => {
  if (!err) EE.emit('data', data);
});

EventEmitter 需要注意的地方

下面代碼會造成 Maximum call stack size exceeded 報錯, 因為所有的回調(diào)都是同步的,會在一個 poll phase 階段不停執(zhí)行下去,一直到系統(tǒng)崩潰.

const EventEmitter = require("events")
const EE = new EventEmitter()
EE.on('event1', function () {
  console.log('event1 fired!');
  EE.emit('event2');
})
EE.on('event2', function () {
  console.log('event2 fired!');
  EE.emit('event3');
})
EE.on('event3', function () {
  console.log('event3 fired!');
  EE.emit('event1');
})
EE.emit('event1');

換成 setImmediate() 來調(diào)用 emit ,會發(fā)現(xiàn)這段程序不會崩潰,setImmediate 把回調(diào)放入了每次輪詢的下個階段才進行,一個真正的通過 events 模塊創(chuàng)建的異步代碼.

const EventEmitter = require("events")
const EE = new EventEmitter()
EE.on('event1', function () {
  console.log('event1 fired!');
  setImmediate(() => {
    EE.emit('event2');
  })
})
EE.on('event2', function () {
  console.log('event2 fired!');
  setImmediate(() => {
    EE.emit('event3');
  })
})
EE.on('event3', function () {
  console.log('event3 fired!');
  setImmediate(() => {
    EE.emit('event1');
  })
})
EE.emit('event1');

EventEmitter 中使用 process.nextTick()

如果把上面的代碼 setImmediate() 換成 process.nextTick() 講會報錯,因為 process.nextTick() 是在當(dāng)前階段結(jié)束時且在下個階段前執(zhí)行,而在 process.nextTick() 里觸發(fā)回調(diào)會導(dǎo)致程序一直認(rèn)為當(dāng)前階段還有任務(wù)需要執(zhí)行而出錯的,這個階段將會有無法清除的 nextTick 需要執(zhí)行。

參考1
參考2
參考3

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容