pomelo-rpc原理解析之server

原文pomelo-rpc原理解析之server

pomelo-rpc是pomelo項(xiàng)目底層的rpc框架,提供了一個(gè)多服務(wù)器進(jìn)程間進(jìn)行rpc調(diào)用的基礎(chǔ)設(shè)施。 pomelo-rpc分為客戶端和服務(wù)器端兩個(gè)部分。 客戶端部分提供了rpc代理生成,消息路由和網(wǎng)絡(luò)通訊等功能,并支持動(dòng)態(tài)添加代理和遠(yuǎn)程服務(wù)器配置。 服務(wù)器端提供了遠(yuǎn)程服務(wù)暴露,請(qǐng)求派發(fā),網(wǎng)絡(luò)通訊等功能。

本文主要分析pomelo-rpc中server部分的實(shí)現(xiàn)原理以及運(yùn)行邏輯。

server初始化

Server.create(opts)

創(chuàng)建一個(gè)rpc server實(shí)例。根據(jù)配置信息加載遠(yuǎn)程服務(wù)代碼,并生成底層acceptor。
首先看create部分源碼:

module.exports.create = function(opts) {
  if(!opts || !opts.port || opts.port < 0 || !opts.paths) {
    throw new Error('opts.port or opts.paths invalid.');
  }
  // 根據(jù)paths加載遠(yuǎn)程服務(wù)
  var services = loadRemoteServices(opts.paths, opts.context);
  opts.services = services;
  var gateway = Gateway.create(opts);
  return gateway;
};

首先loadRemoteServices()方法根據(jù)opts參數(shù)中的paths加載遠(yuǎn)程服務(wù)。
其中pomelo paths的格式類似,pomelo根據(jù)約定封裝,詳見
https://github.com/NetEase/pomelo/blob/master/lib/components/remote.js

[
  {
    "namespace": "user",
    "serverType": "test",
    "path": "/data/pomelo/app/servers/test/remote/"
  },
  {
    "namespace": "sys",
    "serverType": "test",
    "path": "/data/pomelo/pomelo/lib/common/remote/backend/"
  }
]
var loadRemoteServices = function(paths, context) {
  var res = {}, item, m;
  for(var i=0, l=paths.length; i<l; i++) {
    item = paths[i];
    // Loader是pomelo-loader,用來加載pomelo handler和remote服務(wù)
    // 關(guān)于Loader的細(xì)節(jié)可以參考https://github.com/NetEase/pomelo-loader/
    m = Loader.load(item.path, context);

    if(m) {
      createNamespace(item.namespace, res);
      for(var s in m) {
        res[item.namespace][s] = m[s];
      }
    }
  }

  return res;
};

var createNamespace = function(namespace, proxies) {
  proxies[namespace] = proxies[namespace] || {};
};

loadRemoteServices()最終得到的services對(duì)象類似如下結(jié)構(gòu)的數(shù)據(jù):

{
  "user": {
    "testRemote": require("/path/to/testRemote.js"),
    "test2Remote": require("/path/to/test2Remote.js"),
    ...
  },
  "sys": {
    "msgRemote": require("/data/pomelo/pomelo/lib/common/remote/backend/msgRemote.js")
  }
}

Gateway.create(opts)

創(chuàng)建gateway對(duì)象,并初始化dispatcher和acceptor。
gateway的構(gòu)造方法:

var Gateway = function(opts) {
  EventEmitter.call(this);
  this.opts = opts || {};
  this.port = opts.port || 3050;
  this.started = false;
  this.stoped = false;
  this.services = opts.services;
  if(!!this.opts.reloadRemotes) {
    // 如果remote配置reloadRemotes=true的話,gateway會(huì)通過`fs.watch()`來檢測(cè)remote文件的變化,
    // 然后通過pomelo-loader重新require對(duì)應(yīng)的remote文件來實(shí)現(xiàn)remote的熱更新。
    watchServices(this, dispatcher);
  }
  var self = this;

  this.acceptors = {};
  // __defineGetter__的用法可以參考
  // https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Object/__defineGetter__
  this.acceptors.__defineGetter__('tcp', utils.load.bind(null, '../rpc-server/acceptors/tcp-acceptor'));
  this.acceptors.__defineGetter__('ws', utils.load.bind(null,'../rpc-server/acceptors/ws-acceptor'));

  if(!!opts.acceptorName && opts.acceptorName === 'ws') {
    this.acceptorFactory = this.acceptors.ws;
  } else {
    // 默認(rèn)是使用tcp協(xié)議的acceptors
    this.acceptorFactory = this.acceptors.tcp;
  }

  if(!!opts.acceptorFactory) {
    this.acceptorFactory = opts.acceptorFactory;
  }
  // Dispatcher初始化,沒什么好看的
  var dispatcher = new Dispatcher(this.services);
  // acceptorFactory.create初始化acceptor,也沒什么好看的
  this.acceptor = this.acceptorFactory.create(opts, function(tracer, msg, cb) {
    dispatcher.route(tracer, msg, cb);
  });
};

Dispatcher.route(tracer, msg, cb)

提供路由服務(wù),路由消息到對(duì)應(yīng)的service。

/**
 * route the msg to appropriate service object
 *
 * @param msg msg package {service:serviceString, method:methodString, args:[]}
 * @param services services object collection, such as {service1: serviceObj1, service2: serviceObj2}
 * @param cb(...) callback function that should be invoked as soon as the rpc finished
 */
pro.route = function(tracer, msg, cb) {
  tracer.info('server', __filename, 'route', 'route messsage to appropriate service object');
  // this.services 就是loadRemoteServices()加載的本地remote service
  var namespace = this.services[msg.namespace];
  if(!namespace) {
    tracer.error('server', __filename, 'route', 'no such namespace:' + msg.namespace);
    utils.invokeCallback(cb, new Error('no such namespace:' + msg.namespace));
    return;
  }

  var service = namespace[msg.service];
  if(!service) {
    tracer.error('server', __filename, 'route', 'no such service:' + msg.service);
    utils.invokeCallback(cb, new Error('no such service:' + msg.service));
    return;
  }

  var method = service[msg.method];
  if(!method) {
    tracer.error('server', __filename, 'route', 'no such method:' + msg.method);
    utils.invokeCallback(cb, new Error('no such method:' + msg.method));
    return;
  }

  var args = msg.args.slice(0);
  args.push(cb);
  // 調(diào)用remote service方法
  method.apply(service, args);
};

至此,pomelo-rpc的server初始化工作就完成了,接下來就是啟動(dòng)server。

server啟動(dòng)

Gateway.start()

pro.start = function() {
  if(this.started) {
    throw new Error('gateway already start.');
  }
  this.started = true;

  var self = this;
  this.acceptor.on('error', self.emit.bind(self, 'error'));
  this.acceptor.on('closed', self.emit.bind(self, 'closed'));
  // 啟動(dòng)acceptor并監(jiān)聽port端口
  this.acceptor.listen(this.port);
};

Acceptor.listen(port)

啟動(dòng)acceptor并監(jiān)聽port端口,這里以tcp acceptor為例。

pro.listen = function(port) {
  //check status
  if(!!this.inited) {
    utils.invokeCallback(this.cb, new Error('already inited.'));
    return;
  }
  this.inited = true;

  var self = this;

  this.server = net.createServer();
  this.server.listen(port);

  this.server.on('error', function(err) {
    logger.error('rpc server is error: %j', err.stack);
    self.emit('error', err, this);
  });
  // 處理鏈接請(qǐng)求
  this.server.on('connection', function(socket) {
    // 設(shè)置socket自增id
    socket.id = self.socketId++;
    // 保存socket句柄到本地sockets
    self.sockets[socket.id] = socket;
    // 設(shè)置socket的消息編解碼處理器,處理各種類型的消息(ping,pong,msg等)
    // pkgSize默認(rèn)-1,不限制消息長(zhǎng)度
    socket.composer = new Composer({maxLength: self.pkgSize});
    // 心跳超時(shí)檢測(cè)timer
    self.timer[socket.id] = null;
    // 啟動(dòng)心跳檢測(cè),如果heartbeat timeout間隔內(nèi)沒有心跳包過來,就斷開連接。
    // 下面socket.composer.on('data' ...中的self.heartbeat才是真正連接后的心跳
    // 相當(dāng)于這里只是處理初始連接時(shí)的心跳
    self.heartbeat(socket.id);

    socket.on('data', function(data) {
      // 調(diào)用composer解析數(shù)據(jù)流
      // feed讀完數(shù)據(jù)后會(huì)emit data事件
      socket.composer.feed(data);
    });
    // 接收feed emit的data事件
    socket.composer.on('data', function(data) {
      self.heartbeat(socket.id);
      if(data[0] === PING) {
        //incoming::ping,response with PONG
        socket.write(socket.composer.compose(PONG));
      } else {
        try {
          var pkg = JSON.parse(data.toString('utf-8', 1));
          var id  = null;
          // 處理消息
          if(pkg instanceof Array) {
            processMsgs(socket, self, pkg, id);
          } else {
            processMsg(socket, self, pkg, id);
          }
        } catch(err) { //json parse exception
          if(err) {
            // 重置編解器狀態(tài)
            socket.composer.reset();
            logger.error(err);
          }
        }
      }
    });

    socket.on('error', function(err) {
      logger.error('[pomelo-rpc] tcp socket error: %j', err);
    });

    socket.on('close', function() {
      logger.error('[pomelo-rpc] tcp socket close: %s', socket.id);
      delete self.sockets[socket.id];
      delete self.msgQueues[socket.id];
      if(self.timer[socket.id]){
        clearTimeout(self.timer[socket.id]);
      }
      delete self.timer[socket.id];
    });
  });
  // 定時(shí)flush緩存的消息數(shù)據(jù)
  if(this.bufferMsg) {
    this._interval = setInterval(function() {
      flush(self);
    }, this.interval);
  }
};

processMsg()

處理rpc消息。實(shí)際就是調(diào)用dispatcher.route()來根據(jù)namespace和service來調(diào)用對(duì)應(yīng)的remote method。

var processMsg = function(socket, acceptor, pkg, id) {
  var tracer = new Tracer(acceptor.rpcLogger, acceptor.rpcDebugLog, pkg.remote, pkg.source, pkg.msg, pkg.traceId, pkg.seqId);
  tracer.info('server', __filename, 'processMsg', 'tcp-acceptor receive message and try to process message');
  // 實(shí)際就是調(diào)用dispatcher.route()
  acceptor.cb.call(null, tracer, pkg.msg, function() {
    var args = Array.prototype.slice.call(arguments, 0);
    for(var i=0, l=args.length; i<l; i++) {
      if(args[i] instanceof Error) {
        args[i] = cloneError(args[i]);
      }
    }
    var resp;
    if(tracer.isEnabled) {
      resp = {traceId: tracer.id, seqId: tracer.seq, source: tracer.source, id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
    }
    else {
      resp = {id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
    }
    if(acceptor.bufferMsg) {
      // 如果開啟緩沖消息則將結(jié)果緩存到隊(duì)列,由_interval定時(shí)flush,用以減少網(wǎng)絡(luò)io次數(shù)
      enqueue(socket, acceptor, resp);
    } else {
      // 發(fā)送封裝remote方法的返回結(jié)果
      socket.write(socket.composer.compose(RES_TYPE, JSON.stringify(resp), id));
    }
  });
};

總結(jié)

通過對(duì)pomelo-rpc server部分代碼的分析,可以很清晰了解到server端主要作用就是暴露遠(yuǎn)程服務(wù)(remote目錄下的.js文件)、根據(jù)消息的namespace和service信息派發(fā)到對(duì)應(yīng)的remote服務(wù)處理、基于tcp/ws來提供底層的網(wǎng)絡(luò)通訊。

在日常開發(fā)中,新手很容易遇到rpc調(diào)用超時(shí)的情況,一般來看都是因?yàn)槟承﹔emote方法沒有正確回調(diào)或者根本漏寫了回調(diào)。

相關(guān)文章:pomelo-rpc原理解析之client

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,545評(píng)論 19 139
  • 原文pomelo-rpc原理解析之client pomelo-rpc是pomelo項(xiàng)目底層的rpc框架,提供了一個(gè)...
    kikoroc閱讀 4,461評(píng)論 0 2
  • 今天分布式應(yīng)用、云計(jì)算、微服務(wù)大行其道,作為其技術(shù)基石之一的 RPC 你了解多少?一篇 RPC 的技術(shù)總結(jié)文章,數(shù)...
    零一間閱讀 1,959評(píng)論 1 46
  • 有些說不出來的委屈才是真的委屈 不知道前方的路有多長(zhǎng)、有多遠(yuǎn),不知道這樣的日子能堅(jiān)持多久
    砸扁回憶閱讀 202評(píng)論 0 0
  • 已經(jīng)沒上班倆個(gè)月了,爛在家裡倆個(gè)月了,今天靜極思動(dòng),覺得七夕雖我是單身狗,但是不出去逛逛不合適,就跑去塞納河邊逛逛...
    長(zhǎng)安丶閱讀 1,244評(píng)論 0 1

友情鏈接更多精彩內(nèi)容