pomelo源碼分析(6)--connector協(xié)議處理message

作者:shihuaping0918@163.com,轉(zhuǎn)載請注明作者

pomelo框架核心提供了sioconnector,udpconnector,hybirdconnector,mqttconnector。sioconnector基于socket.io,使用json通信,pc端通信。hybirdconnector基于tcp和websocket,使用二進制通信,主要用于手機端通信。mqttconnector使用mqtt協(xié)議通信,mqtt是二進制協(xié)議,是物聯(lián)網(wǎng)協(xié)議,這個就是用于嵌入式設(shè)備通信。而udpconnector,這個看名字也知道是基于udp的,它也是使用二進制協(xié)議進行通信。這個主要用于網(wǎng)絡(luò)環(huán)境不好,數(shù)據(jù)包小的場景。

connector按照約定是要提供encode/decode的。sioconnector的encode/decode最簡單。因為它是處理json的。在connector提供encode/decode之外,還可以單獨設(shè)自定義的encode/decode。先看sioconnector,因為它比較簡單。

從decode看起,decode就是json解析。

/**
 * Decode client message package.
 *
 * Package format:
 *   message id: 4bytes big-endian integer
 *   route length: 1byte
 *   route: route length bytes
 *   body: the rest bytes
 *
 * @param  {String} data socket.io package from client
 * @return {Object}      message object
 */
Connector.decode = Connector.prototype.decode = function(msg) {
  var index = 0;

//package ID
  var id = parseIntField(msg, index, PKG_ID_BYTES);
  index += PKG_ID_BYTES;
//route體長
  var routeLen = parseIntField(msg, index, PKG_ROUTE_LENGTH_BYTES);
//route字符串
  var route = msg.substr(PKG_HEAD_BYTES, routeLen);
  var body = msg.substr(PKG_HEAD_BYTES + routeLen);

  return {
    id: id,  
    route: route,
    body: JSON.parse(body) //json包體
  };
};
//取長度
var parseIntField = function(str, offset, len) {
  var res = 0;
  for(var i=0; i<len; i++) {  //big-endian,網(wǎng)絡(luò)字節(jié)序,高位在前
    if(i > 0) {
      res <<= 8;
    }
    res |= str.charCodeAt(offset + i) & 0xff;
  }

  return res;
};

從decode可以看出來,消息格式是有一個package id,一個route,然后就是消息體。消息體是json。而encode稍微復(fù)雜一點。

Connector.encode = Connector.prototype.encode = function(reqId, route, msg) {
  if(reqId) { //有reqId,這個序號是客戶端編的
    return composeResponse(reqId, route, msg);
  } else { //沒有就是廣播
    return composePush(route, msg);
  }
};

//注意這個地方,route被忽略了
var composeResponse = function(msgId, route, msgBody) {
  return {
    id: msgId, //reqId,請求包序號
    body: msgBody // 回復(fù)消息體
  };
};

var composePush = function(route, msgBody) {
  return JSON.stringify({route: route, body: msgBody});
};

sioconnector.js的協(xié)議處理是非常簡單的。字段也很少,但是body里面可能就千變?nèi)f化了,這個是業(yè)務(wù)相關(guān)的。相信寫過稍大一點項目的都很清楚,有的模塊甚至有幾百個命令,幾百個命令就會產(chǎn)生幾百種body。

下面再分析一下hybirdconnector.js。到了這里就要正式講一下pomelo的消息格式了,pomelo的消息分為兩層,package和message。 以下引用原文:“pomelo的二進制協(xié)議包含兩層編碼:package和message。message層主要實現(xiàn)route壓縮和protobuf壓縮,message層的編碼結(jié)果將傳遞給package層。package層主要實現(xiàn)pomelo應(yīng)用基于二進制協(xié)議的握手過程,心跳和數(shù)據(jù)傳輸編碼,package層的編碼結(jié)果可以通過tcp,websocket等協(xié)議以二進制數(shù)據(jù)的形式進行傳輸。message層編碼可選,也可替換成其他二進制編碼格式,都不影響package層編碼和發(fā)送?!?/p>

package格式

package分為header和body兩部分。header描述package包的類型和包的長度,body則是需要傳輸?shù)臄?shù)據(jù)內(nèi)容。具體格式如下:

type - package類型,1個byte,取值如下。
0x01: 客戶端到服務(wù)器的握手請求以及服務(wù)器到客戶端的握手響應(yīng)
0x02: 客戶端到服務(wù)器的握手ack
0x03: 心跳包
0x04: 數(shù)據(jù)包
0x05: 服務(wù)器主動斷開連接通知
length - body內(nèi)容長度,3個byte的大端整數(shù),因此最大的包長度為2^24個byte。
body - 二進制的傳輸內(nèi)容。

message協(xié)議的主要作用是封裝消息頭,包括route和消息類型兩部分,不同的消息類型有著不同的消息頭,在消息頭里面可能要打入message id(即requestId)和route信息。由于可能會有route壓縮,而且對于服務(wù)端push的消息,message id為空,對于客戶端請求的響應(yīng),route為空,因此message的頭格式比較復(fù)雜。
消息頭分為三部分,flag,message id,route。
pomelo消息頭是可變的,會根據(jù)具體的消息類型和內(nèi)容而改變。其中:
flag位是必須的,占用一個byte,它決定了后面的消息類型和內(nèi)容的格式;
message id和route則是可選的。其中message id采用[varints 128變長編碼](https://developers.google.com/protocol-buffers/docs/encoding#varints)方式,根據(jù)值的大小,長度在0~5byte之間。route則根據(jù)消息類型以及內(nèi)容的大小,長度在0~255byte之間。

從這段文字的描述可以看出來,我們剛才對sioconnector.js中encode和decode的分析都是基于message的,package部分的沒有涉及到。

本篇暫時不講package部分,聚集點在于message部分。因為一發(fā)散的話,就沒有重點了。

hybirdconnector.js對于encode和decode的處理是,寫了一個coder.js作為抽象。

var coder = require('./common/coder');

Connector.decode = Connector.prototype.decode = coder.decode;

Connector.encode = Connector.prototype.encode = coder.encode;

可以看到encode和decode獨立出去了,做了一個單獨的抽象,這樣提高了復(fù)用性和擴展性。

coder.js

//這是pomelo的另一個開源組件
var Message = require('pomelo-protocol').Message;
var Constants = require('../../util/constants');
//pomelo-logger也是另一個組件,不在核心模塊里
var logger = require('pomelo-logger').getLogger('pomelo', __filename);

//encode函數(shù)
var encode = function(reqId, route, msg) {
  if(!!reqId) {
    return composeResponse(this, reqId, route, msg);
  } else {
    return composePush(this, route, msg);
  }
};

//decode函數(shù)
var decode = function(msg) {
  msg = Message.decode(msg.body);
  var route = msg.route;

  // decode use dictionary
  if(!!msg.compressRoute) {
    if(!!this.connector.useDict) {
      var abbrs = this.dictionary.getAbbrs();
      if(!abbrs[route]) {
        logger.error('dictionary error! no abbrs for route : %s', route);
        return null;
      }
      route = msg.route = abbrs[route];
    } else {
      logger.error('fail to uncompress route code for msg: %j, server not enable dictionary.', msg);
      return null;
    }
  }

  // decode use protobuf,protobuf協(xié)議解碼
  if(!!this.protobuf && !!this.protobuf.getProtos().client[route]) {
    msg.body = this.protobuf.decode(route, msg.body);
  } else if(!!this.decodeIO_protobuf && !!this.decodeIO_protobuf.check(Constants.RESERVED.CLIENT, route)) {
    msg.body = this.decodeIO_protobuf.decode(route, msg.body);
  } else {
    try {
      msg.body = JSON.parse(msg.body.toString('utf8'));
    } catch (ex) {
      msg.body = {};
    }
  }

  return msg;
};

var composeResponse = function(server, msgId, route, msgBody) {
  if(!msgId || !route || !msgBody) {
    return null;
  }
  msgBody = encodeBody(server, route, msgBody);
  return Message.encode(msgId, Message.TYPE_RESPONSE, 0, null, msgBody);
};

var composePush = function(server, route, msgBody) {
  if(!route || !msgBody){
    return null;
  }
  msgBody = encodeBody(server, route, msgBody);
  // encode use dictionary
  var compressRoute = 0;
  if(!!server.dictionary) {
    var dict = server.dictionary.getDict();
    if(!!server.connector.useDict && !!dict[route]) {
      route = dict[route];
      compressRoute = 1;
    }
  }
  return Message.encode(0, Message.TYPE_PUSH, compressRoute, route, msgBody);
};

var encodeBody = function(server, route, msgBody) {
    // encode use protobuf
  if(!!server.protobuf && !!server.protobuf.getProtos().server[route]) {
    msgBody = server.protobuf.encode(route, msgBody);
  } else if(!!server.decodeIO_protobuf && !!server.decodeIO_protobuf.check(Constants.RESERVED.SERVER, route)) {
     msgBody = server.decodeIO_protobuf.encode(route, msgBody);
  } else { //兼容json
    msgBody = new Buffer(JSON.stringify(msgBody), 'utf8');
  }
  return msgBody;
};

module.exports = {
  encode: encode,
  decode: decode
};

對于coder.js中的encode和decode,里面調(diào)用的函數(shù)名和sioconnector.js中都是一致的。所不同的是對于body的處理,json的話直接用JSON相關(guān)的函數(shù)就可以了。從coder.js文件來看,所謂的二進制實際上是用的protobuf,不支持其它的二進制協(xié)議。代碼是比較清晰的,就不再對代碼做太多解釋了。

最后補充說明,協(xié)議這種東西,最好不要自定義二進制協(xié)議,更不要自定義類似query string那種文本協(xié)議。自定義二進制協(xié)議一是調(diào)試非常的麻煩,二是要做協(xié)議轉(zhuǎn)換的時候,開發(fā)速度慢,出錯率高,工作量大,自定義二進制協(xié)議少有能直接DSL生成轉(zhuǎn)換代碼的。最好的方案目前看到的也是用lua去映射,然后寫一段通用代碼去轉(zhuǎn)換。而類query string的文本協(xié)議就更痛苦了,長的就是像這樣子a=b&c=d。這種協(xié)議第一,要做編碼轉(zhuǎn)換,特殊字符轉(zhuǎn)換。二,這種表示是一維的,key-value形式,也就是一個map轉(zhuǎn)成了數(shù)組。它的擴展性非常地差,嵌套表達能力基本為0,因為嵌套表達就需要新增分隔符,多層嵌套以后,調(diào)協(xié)議會成為開發(fā)之間的導(dǎo)火索。同時作為文本協(xié)議,它的體積很大,無法壓縮,也不能直觀地格式化。非要用文本協(xié)議,直接用json就好了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容