Nodejs cluster 模塊

cluster 和 child_process 模塊子進(jìn)程的區(qū)別

child_process 執(zhí)行 shell 命令、利用多進(jìn)程執(zhí)行代碼
cluster 通過多進(jìn)程 master、worker 實現(xiàn)多個 HTTP 應(yīng)用服務(wù)器架構(gòu)

總結(jié)寫前面

cluster 模塊是 node 利用多進(jìn)程處理網(wǎng)絡(luò)連接的應(yīng)用架構(gòu)
cluster 通過進(jìn)程 IPC 通道共享主進(jìn)程的 server handle 句柄創(chuàng)建 socket 文件描述符 實現(xiàn)子進(jìn)程共同監(jiān)聽同一端口
cluster 在 http 網(wǎng)絡(luò)請求中采用 RoundRobin 輪詢的負(fù)載均衡方式對 woker 進(jìn)行調(diào)度

框架圖

http createServer 時 child 通過 IPC 通道獲取 master 的 server.handle 流程

多進(jìn)程通信,子進(jìn)程監(jiān)聽同一端口為什么不沖突

不同進(jìn)程之間的 server 通過 IPC 通道共享監(jiān)聽某個端口的 socket 連接句柄來解決沖突。

// master.js
const { createServer } = require('net')
const { fork } = require('child_process')
const cpus = require('os').cpus()

const netServer = createServer().listen(3000) // create TCP server
for (let i = 0; i < cpus.length; i++) {
  const worker = fork('worker.js')
  worker.send('server', netServer)
  console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}

// worker.js
const http = require('http')

const server = http.createServer((req, res) => { //   this.on('connection', connectionListener);
  res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})

let _handle
process.on('message', (msg, handle) => {
  if (msg !== 'server') return
  _handle = handle
  _handle.on('connection', (socket) => { // _http_server.js 中實現(xiàn), this.on('connection', connectionListener)
    server.emit('connection', socket); // 與子進(jìn)程 server 共享 socket 處理連接后執(zhí)行子進(jìn)程 http.createServer 的 callback
  })
})

server 共享 socket 過程

看下 createServer 的處理過程就可以知道 server.emit('connection', socket); 是如何共享 socket 并何時觸發(fā) createServer 回調(diào)對用戶進(jìn)行響應(yīng)的

// Server 構(gòu)造函數(shù)
function Server {
 ... 
  if (requestListener) {
    this.on('request', requestListener);
  }

  this.on('connection', connectionListener);
}
function connectionListener(socket) {
  defaultTriggerAsyncIdScope(
    getOrSetAsyncId(socket), connectionListenerInternal, this, socket
  );
}
function connectionListenerInternal(server, socket) {
// ...
  parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state)
}
function resOnFinish(req, res, socket, state, server) {
  // ...
  res.detachSocket(socket); // 關(guān)閉 socket
  // ...
}

function parserOnIncoming(server, socket, state, req, keepAlive) {
  // ...
  res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));

  if (req.headers.expect !== undefined &&
      (req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
  // ...
  } else {
    server.emit('request', req, res);
  }
}

cluster 源碼

// cluster.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master 進(jìn)程 ${process.pid} 正在運行`);

  for (let i = 0; i < 1; i++) { // 衍生工作進(jìn)程。
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
  http.createServer((req, res) => res.end(`你好世界 ${process.pid}`)).listen(8000);
  console.log(`Worker 進(jìn)程 ${process.pid} 已啟動`);
}

主進(jìn)程創(chuàng)建子進(jìn)程 cluster fork

createWorkerProcess 通過 child_process fork 創(chuàng)建子進(jìn)程源碼
在主進(jìn)程中 cluster.fork 通過 child_process fork 創(chuàng)建子進(jìn)程

function createWorkerProcess(id, env) {
// ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

在子進(jìn)程中的 http.createServer

當(dāng)在子進(jìn)程通過 createServer 并 listen 端口時,net 模塊會根據(jù) isMaster 來當(dāng)前進(jìn)程是主進(jìn)程直接監(jiān)聽端口,當(dāng)前進(jìn)程是子進(jìn)程則通過 IPC 通道獲取 master 進(jìn)程的服務(wù)器(server)句柄(handle),并監(jiān)聽(listen)它。

http createServer 時 child 通過 IPC 通道獲取 master 的 server.handle 流程

listenInCluster

http.createServer().listen(port) 會執(zhí)行 net 模塊的 Server.prototype.listen 方法 調(diào)用 listenInCluster,在此方法中根據(jù) isMaster 判斷,子進(jìn)程時通過 cluster 模塊 cluster._getServer 方法與 master 建立 IPC 通道獲取 master 中 創(chuàng)建 server 的 handle 并在子進(jìn)程代碼中進(jìn)行 listen。
listenInCluster 源碼

function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // 通過 IPC 通道獲取 master server 的 handle 進(jìn)行監(jiān)聽
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    server._handle = handle; // 重用 master handle

    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

子進(jìn)程 cluster._getServer

子進(jìn)程中 _getServer 向 master 通過 IPC 發(fā)送 node 內(nèi)部消息為 act: 'queryServer ' 的通信獲取 master handle
子進(jìn)程 cluster._getServer

// lib/internal/cluster/master.js
// obj 在 http 請求 TCP 連接中是 net 的 Server 實例,UDP 連接是 dgram 的 Socket 實例
cluster._getServer = function(obj, options, cb) {
  ...
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  if (obj._getServerData)
    message.data = obj._getServerData();
  // 向 master 發(fā)送 querServer 消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket. UDP 連接處理方式
    else
      rr(reply, indexesKey, cb);              // Round-robin. TCP 連接 rr 模式
  });

  obj.once('listening', () => {
    ...
  }
}

主進(jìn)程 master 中 queryServer

// master 監(jiān)聽內(nèi)部消息
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message); // queryServer
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

queryServer 當(dāng)不存在 RoundRobinHandle 實例時會創(chuàng)建一個, 通過 RoundRobinHandle 原型 add 方法添加 woker 到 實例 this.all 屬性中,用來進(jìn)行 master 對 worker 的負(fù)載均衡策略。

function queryServer(worker, message) {
  ...
  let handle = handles.get(key);
  // 創(chuàng)建 TCP RoundRobinHandle rr 實例, master 邏輯
  if (handle === undefined) {
    ...
    let constructor = RoundRobinHandle;
    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }
  ...
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.
     // handle.add 后去執(zhí)行子進(jìn)程 queryServe 的 cb,告知采用 UDP 或 TCP
    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

RoundRobinHandle 實例創(chuàng)建

創(chuàng)建 server,重寫 server._handle 句柄的 onconnection 改用輪詢的方式分發(fā)句柄給子進(jìn)程處理

在 master 進(jìn)程中會接收、傳遞請求給 worker 處理,RoundRobinHandle 的作用就是用來對 woker 進(jìn)行分發(fā)、任務(wù)交接、調(diào)度的負(fù)載均衡策略,同時進(jìn)程間共享的 TCP server handle 是在 RoundRobinHandle 實例創(chuàng)建時生成的。

  • RoundRobinHandle 實例創(chuàng)建,重寫 server._handle.onconnection 處理請求
    通過傳入無操作的回調(diào)用 net.createServer 創(chuàng)建 server 并監(jiān)聽端口,通過對 server.once('listening') 的監(jiān)聽重寫 this.server._handle 的 onconnection,當(dāng) server 的 handle 遇到 connection 事件時將會使用 RoundRobinHandle 實例的 distribute 進(jìn)行 handle 的分發(fā)
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new Map(); // 所有的 woker 
  this.free = new Map(); // 空閑可用的 woker
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail); // assert.fail typeof Function, 這里給了個沒用的 onconnection 回調(diào)用來生成 server

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) { // fd: undefined, port: 9000
    this.server.listen({ // 監(jiān)聽 address port, 觸發(fā) listening 事件
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.
  // 在調(diào)用 server.listen() 后綁定服務(wù)器時觸發(fā)。
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle); // 改寫 net.Server onconnection
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle 輪詢分配策略

RoundRobinHandle 通過輪詢分配 handle 給 woker 的負(fù)載策略共享 handle 的 socket 解決子進(jìn)程共同監(jiān)聽一個端口處理請求。



最后就回到文章中最開始 server 共享 socket 過程 中觸發(fā) createServer((req, res) => {}) 回調(diào)的內(nèi)容。

參考

源碼分析
cluster-base
cluster 模塊的主要功能實現(xiàn)

egg-cluster 模塊的實現(xiàn)

cluster 模塊是用來處理網(wǎng)絡(luò)連接的多進(jìn)程模塊,egg-cluster 通過 cluster 模塊對 egg 進(jìn)行多進(jìn)程管理的基礎(chǔ)模塊
在 egg-cluster 中:
master 主進(jìn)程類似守護進(jìn)程在后臺執(zhí)行
agent 是由 child_process 模塊 fork 創(chuàng)建,當(dāng) master 退出時會優(yōu)雅的退出 agent 進(jìn)程(防止變?yōu)楣聝哼M(jìn)程被系統(tǒng) init 收養(yǎng) parentId: 0)
woker 是由 cluster 模塊 fork 創(chuàng)建,用來處理 http 請求
可以參考文章 egg-cluster

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容