引言
對于序列化概念,如果是學習過Java的人,相信一定不會陌生,序列化就是將對象的數(shù)據(jù)、狀態(tài)轉(zhuǎn)換成能夠存儲或者傳輸?shù)倪^程。目前常用的有Json、Protobuf、Thrift等。然而,skynet對于服務(wù)之間的通訊,數(shù)據(jù)序列化采用的是 skynet.pack,反序列化 skynet.unpack。
skynet.pack和skynet.unpack
那么skynet.pack 是以什么方式來序列化的呢?我們可以通過 skynet.lua 這個文件里面看到,skynet.pack 其實是指向 c.pack,其中的 c 就是 c 語言層的調(diào)用。 那么這個 c 又是由哪個文件提供的接口呢, 通過 lualib-src/lua-skynet.c 文件,我們看到了 pack 接口對應(yīng)于 lualib-src/lua-seri.c 的 luaseri_pack 函數(shù)。好了,現(xiàn)在我們終于知道了 skynet.pack 是由 luaseri_pack 實現(xiàn)的。
對于 luaseri_pack 實現(xiàn)序列化的思路也比較簡單。就是 對要進行序列化的數(shù)據(jù)先一個一個取出來,根據(jù)每個數(shù)據(jù)的類型type,將其寫到一個連續(xù)的內(nèi)存塊中。而 luaseri_unpack 函數(shù)就是對其反序列化,將內(nèi)存塊中的數(shù)據(jù)按照類型type依次壓人lua 棧中,最后將數(shù)據(jù)返回給 lua 層,這樣就實現(xiàn)了一次序列化和反序列化操作。skynet 服務(wù)與服務(wù)之間的消息傳遞,也是要經(jīng)過 skynet.pack 序列化和 skynet.unpack 反序列化。這個序列化過程與 protobuf 類似,每個 lua 類型存儲格式如下:
- nil 類型(TYPE_NIL:0):

- boolean 類型(TYPE_BOOLEAN:1):

- string類型:
- 短string類型(TYPE_SHORT_STRING:4):

2. 長string類型(TYPE_LONG_STRING:4):

-
number類型(TYPE_NUMBER:2):
- 值:0

- 值為8個字節(jié)

3. 值為負數(shù)

4. 值小于2個字節(jié)

5. 值小于3個字節(jié)

6. 值為其他情況

- 浮點數(shù)類型(TYPE_NUMBER_REAL: 8):

- 用戶自定義類型(TYPE_USERDATA: 3):

-
table類型(TYPE_TABLE:6):
-
數(shù)組類型
圖13 array_size 會采用之前講到的 number 類型來存儲,所以存幾個字節(jié)要按array_size大小決定 -
key-value類型
圖14 加上 1個字節(jié)的 nil 類型標識結(jié)束
-
通過以上的分析,大概知道了緩存區(qū)域是怎么存儲 lua 的各種數(shù)據(jù)類型。但緩存區(qū)在初始化時應(yīng)該分配多大好呢,我們可以從源代碼中看到,緩存區(qū)結(jié)構(gòu)體 buffer 域只有 128 byte大小,那么在數(shù)據(jù)過大時,buffer 勢必會不夠存儲,它又應(yīng)該如何處理呢。我們接著看下一個結(jié)構(gòu)體 struct write_block ,它的head 域和 current 域都指向了 struct block,可以猜測出,head 應(yīng)該是一個鏈表的頭節(jié)點,current 應(yīng)該是指向當前要寫入鏈表的哪一塊 block。它會先通過 malloc 申請一塊內(nèi)存出來。如果超過了 128,那么就會再申請一塊內(nèi)存,current就指向新的內(nèi)存塊,然后繼續(xù)往里面寫數(shù)據(jù)。再寫完了之后,它不是返回這個 head 指針給 lua 調(diào)用者, 而是再進行一次復(fù)制操作。將鏈表里的所有數(shù)據(jù)寫到一塊新的緩沖區(qū) newbuffer 中。申請緩沖區(qū)的大小 sz 可以根據(jù) struct write_block 結(jié)構(gòu)體中的 len 域獲?。ㄔ诿看螌憯?shù)據(jù)時,這個 len 就記錄了數(shù)據(jù)的總長度)。最終返回的是這個新的內(nèi)存塊 newbuffer 及大小 sz。
至于為什么還要重新復(fù)制一次,沒有直接返回 head 指針給 lua 層調(diào)用者,然后根據(jù) head 指向的鏈表來反序列化呢。我想主要是為了集群等其他模塊的需要。比如說集群,你不可能通過 socket 發(fā)送一個鏈表給對方吧,所以只返回一個內(nèi)存塊地址和大小,可以為其他模塊減少不必要的麻煩。
對于 number 和 string 的序列化也做到了盡可能的節(jié)省內(nèi)存,如果你在 lua 層對一個number變量賦值0,那么它在序列化時,只用了一個字節(jié)的 type 來標識。沒有造成內(nèi)存塊的過多浪費??梢哉f,這個思想,值得我們學習。

#define BLOCK_SIZE 128
//對應(yīng)于圖15 的一塊內(nèi)存
struct block {
struct block * next; //指向下一個內(nèi)存塊
char buffer[BLOCK_SIZE];
};
struct write_block {
struct block * head;
struct block * current;
int len;
int ptr;
};
struct read_block {
char * buffer;
int len;
int ptr;
};
至于反序列化部分,就簡單了,用 struct read_block 中的 buffer 域指向 newbuffer,len 指向 sz,然后先從buffer指向的內(nèi)存中取出一個字節(jié),這個字節(jié)就是type, 根據(jù) type 類型讀取值(有值的情況下),將其壓人lua 棧中,如此反復(fù),直到讀取完,最后返回給 lua 層,這樣就完成了一次反序列化操作了。
如果還是不怎么懂,那接下來再看看代碼是如何實現(xiàn)的吧,為了簡單點,就以序列化 一個 字符串為例吧。
local msg, sz = skynet.pack("hello")
skynet.unpack(msg, sz)
根據(jù)圖3,我們可以畫出hello在內(nèi)存塊中的簡單存儲結(jié)構(gòu)。

接下來再看看代碼的實現(xiàn),skynet.pack 的調(diào)用最終會進入 c 層:
下面引用到的代碼都在 lualib-src/lua-seri.c 文件中。
LUAMOD_API int
luaseri_pack(lua_State *L) {
struct block temp;
temp.next = NULL;
struct write_block wb;
wb_init(&wb, &temp); //初始化結(jié)構(gòu)體 wb
pack_from(L,&wb,0); //開始序列化
assert(wb.head == &temp);
seri(L, &temp, wb.len); //將head 指向的鏈表重新放到一個緩沖區(qū)中,并返回,加上大小sz
wb_free(&wb);
return 2;
}
再看看 pack_from 的實現(xiàn):
static void
pack_from(lua_State *L, struct write_block *b, int from) {
int n = lua_gettop(L) - from; //獲取要序列化的個數(shù),目前只有 hello 一個數(shù)據(jù),所以 n 為 1
int i;
for (i=1;i<=n;i++) {
pack_one(L, b , from + i, 0); // 對 "hello" 數(shù)據(jù)進行序列化
}
}
那么 pack_one 又做了哪些事呢
static void
pack_one(lua_State *L, struct write_block *b, int index, int depth) {
...
int type = lua_type(L,index); // 根據(jù) index 獲取數(shù)據(jù)類型,按照我之前的設(shè)定,只有一個數(shù)據(jù),index 為 0
switch(type) {
case LUA_TNIL:
...
case LUA_TNUMBER: {
...
}
case LUA_TBOOLEAN:
...
case LUA_TSTRING: { // 由于 "hello" 是字符串類型,所以來到這里,如果是 lua 層判斷數(shù)據(jù)類型,應(yīng)該是用 type(data) == "string"
size_t sz = 0;
const char *str = lua_tolstring(L,index,&sz);
wb_string(b, str, (int)sz);
break;
}
case LUA_TLIGHTUSERDATA:
...
case LUA_TTABLE: {
...
}
default:
...
}
}
到了這里,大家應(yīng)該可以看出,序列化 lua 數(shù)據(jù)都是根據(jù)其數(shù)據(jù)類型,依依寫入到buffer緩沖區(qū)當中。再接著看看 wb_string 的實現(xiàn)吧。
static inline void
wb_string(struct write_block *wb, const char *str, int len) {
if (len < MAX_COOKIE) { //這里由于 "hello" 字符串長度不會超過 MAX_COOKIE(32),所以代碼會執(zhí)行到 if 里面
// TYPE_SHORT_STRING | len << 3,一個字節(jié) 8 bit,由于 len 小于 MAX_COOKIE,左移 3 位不會有數(shù)據(jù)溢出情況,TYPE_SHORT_STRING 就保留在低 3 位中
uint8_t n = COMBINE_TYPE(TYPE_SHORT_STRING, len);
//這里就是通過 wb_push 這個函數(shù), 將 type 和 len 一起寫入到緩沖區(qū)鏈表中,只寫入 1 個字節(jié)
//由于 len 小于 MAX_COOKIE,所以一個字節(jié)足夠存儲 type 和 len 內(nèi)容
wb_push(wb, &n, 1);
if (len > 0) {
//*** 這里就是 將 "hello" 寫入到緩沖區(qū)鏈表中,只寫入 len 個字節(jié),跟我們之前畫的數(shù)據(jù)存入buffer 圖相對應(yīng)
wb_push(wb, str, len);
}
} else {
uint8_t n;
if (len < 0x10000) {
n = COMBINE_TYPE(TYPE_LONG_STRING, 2);
wb_push(wb, &n, 1);
uint16_t x = (uint16_t) len;
wb_push(wb, &x, 2);
} else {
n = COMBINE_TYPE(TYPE_LONG_STRING, 4);
wb_push(wb, &n, 1);
uint32_t x = (uint32_t) len;
wb_push(wb, &x, 4);
}
wb_push(wb, str, len);
}
}
接著就是 wb_push 的實現(xiàn)了
inline static void
wb_push(struct write_block *b, const void *buf, int sz) {
const char * buffer = buf; // buf 是 void* 類型,因為buf 可能指向的是int 類型地址、char類型地址等,所以只用這個任意類型指針了
if (b->ptr == BLOCK_SIZE) {
_again:
b->current = b->current->next = blk_alloc(); //重新申請一塊內(nèi)存
b->ptr = 0; //指針偏移的地方,每次要寫入數(shù)據(jù)時,就是根據(jù)它來確定寫入的起始地址
}
if (b->ptr <= BLOCK_SIZE - sz) { // 當要寫入內(nèi)容的長度還足夠時,不超過這個內(nèi)存塊大小時,直接復(fù)制數(shù)據(jù),保存到 b->current 指向的內(nèi)存塊中
memcpy(b->current->buffer + b->ptr, buffer, sz);
b->ptr+=sz; //指向的位置偏移
b->len+=sz; // len 總大小要加上 sz
} else {
// 來到這里,表明一塊內(nèi)存 128 k不夠用了。但我們可以先把 buf 部分內(nèi)容寫入到這個內(nèi)存塊中,不夠存的那部分就留到下一個新的內(nèi)存塊中。
//也就是說,這次要寫入的數(shù)據(jù)分兩次或多次寫,先把一部分寫到當前的內(nèi)存中,剩下的部分寫到下一塊內(nèi)存中。
int copy = BLOCK_SIZE - b->ptr;
memcpy(b->current->buffer + b->ptr, buffer, copy);
buffer += copy;
b->len += copy;
sz -= copy;
goto _again; // 這里就是跳到前面的 _again: 中,重新申請內(nèi)存塊,繼續(xù)寫入剩余的數(shù)據(jù)。
}
}
好了,到這里,"hello",這個字符串也就寫完了,再看看它是如何返回給 lua 層的吧。
static void
seri(lua_State *L, struct block *b, int len) {
uint8_t * buffer = skynet_malloc(len);
uint8_t * ptr = buffer;
int sz = len;
while(len>0) {
if (len >= BLOCK_SIZE) {
memcpy(ptr, b->buffer, BLOCK_SIZE);
ptr += BLOCK_SIZE;
len -= BLOCK_SIZE;
b = b->next;
} else {
memcpy(ptr, b->buffer, len);
break;
}
}
lua_pushlightuserdata(L, buffer); // 為了好區(qū)分buffer是指哪個,我暫時將這個叫 new_buffer
lua_pushinteger(L, sz); //返回所有數(shù)據(jù)的總大小
}
還記得 luaseri_pack(lua_State *L) 函數(shù)里面有個 seri(L, &temp, wb.len); 調(diào)用嗎,這里就是將整個鏈表重新復(fù)制一份,放到 new_buffer 中,最后和 sz 一起返回給 lua 層。
我們再看看反序列化 skynet.unpack 的接口調(diào)用:
local msg, sz = skynet.pack("hello")
skynet.unpack(msg, sz)
到了這里,反序列化需要調(diào)用的 c 層接口 luaseri_unpack 。
int
luaseri_unpack(lua_State *L) {
...
void * buffer;
int len;
...
buffer = lua_touserdata(L,1);
len = luaL_checkinteger(L,2);
...
lua_settop(L,1);
struct read_block rb;
rball_init(&rb, buffer, len); //初始化 rb,讓 rb 的 buffer 指向這個 new_buffer,rb 的 len 指向這個 sz
int i;
for (i=0;;i++) {
if (i%8==7) {
luaL_checkstack(L,LUA_MINSTACK,NULL);
}
uint8_t type = 0;
uint8_t *t = rb_read(&rb, sizeof(type)); //這個就是先讀取數(shù)據(jù)類型type,1個字節(jié)(uint8_t大小占一個字節(jié))
if (t==NULL) //如果讀取不到,證明已經(jīng)讀取完所有的數(shù)據(jù)了,可以跳出循環(huán),返回了
break;
type = *t;
push_value(L, &rb, type & 0x7, type>>3); //這里就是讀取數(shù)據(jù)的總?cè)肟诤瘮?shù),讀完數(shù)據(jù),就將其壓人 lua 棧中
}
// Need not free buffer 這個意思是 unpack 的調(diào)用不用釋放內(nèi)存,至于內(nèi)存的釋放,主要放到
// skynet_server.c 的 dispatch_message 函數(shù)中釋放
/*
if (!reserve_msg) {
skynet_free(msg->data);
}
*/
return lua_gettop(L) - 1;
}
接著,再看看 push_value 的實現(xiàn)
static void
push_value(lua_State *L, struct read_block *rb, int type, int cookie) {
switch(type) {
case TYPE_NIL:
...
case TYPE_BOOLEAN:
...
case TYPE_NUMBER:
...
case TYPE_USERDATA:
...
case TYPE_SHORT_STRING: //到這里面取出 buffer 的數(shù)據(jù)
get_buffer(L,rb,cookie);
break;
case TYPE_LONG_STRING: {
...
}
case TYPE_TABLE: {
...
}
default: {
...
}
}
}
其中,get_buffer 的實現(xiàn)也比較簡單,就是根據(jù) len 長度,從 buffer 中讀取數(shù)據(jù)。并將其壓入 lua 棧中,返回給 lua 層的調(diào)用者。
static void *
rb_read(struct read_block *rb, int sz) {
if (rb->len < sz) {
return NULL;
}
int ptr = rb->ptr;
rb->ptr += sz;
rb->len -= sz;
return rb->buffer + ptr;
}
static void
get_buffer(lua_State *L, struct read_block *rb, int len) {
char * p = rb_read(rb,len); // 根據(jù) len 長度,從 rb 的 buffer 域中讀取數(shù)據(jù)
if (p == NULL) {
invalid_stream(L,rb); // 讀數(shù)據(jù)出現(xiàn)異常,這是異常錯誤處理函數(shù),可以先不理會
}
lua_pushlstring(L,p,len); //返回數(shù)據(jù)給 lua 層,到了這里,就可以在 lua 層獲取到 "hello" 字符串了,反序列化結(jié)束了
}
通過一個簡單的例子,我們可以看出,skynet.pack 和 skynet.unpack 的實現(xiàn)過程。

