skynet.pack序列化學習

引言

對于序列化概念,如果是學習過Java的人,相信一定不會陌生,序列化就是將對象的數(shù)據(jù)、狀態(tài)轉(zhuǎn)換成能夠存儲或者傳輸?shù)倪^程。目前常用的有Json、Protobuf、Thrift等。然而,skynet對于服務(wù)之間的通訊,數(shù)據(jù)序列化采用的是 skynet.pack,反序列化 skynet.unpack。

skynet.pack和skynet.unpack

那么skynet.pack 是以什么方式來序列化的呢?我們可以通過 skynet.lua 這個文件里面看到,skynet.pack 其實是指向 c.pack,其中的 c 就是 c 語言層的調(diào)用。 那么這個 c 又是由哪個文件提供的接口呢, 通過 lualib-src/lua-skynet.c 文件,我們看到了 pack 接口對應(yīng)于 lualib-src/lua-seri.c 的 luaseri_pack 函數(shù)。好了,現(xiàn)在我們終于知道了 skynet.pack 是由 luaseri_pack 實現(xiàn)的。

對于 luaseri_pack 實現(xiàn)序列化的思路也比較簡單。就是 對要進行序列化的數(shù)據(jù)先一個一個取出來,根據(jù)每個數(shù)據(jù)的類型type,將其寫到一個連續(xù)的內(nèi)存塊中。而 luaseri_unpack 函數(shù)就是對其反序列化,將內(nèi)存塊中的數(shù)據(jù)按照類型type依次壓人lua 棧中,最后將數(shù)據(jù)返回給 lua 層,這樣就實現(xiàn)了一次序列化和反序列化操作。skynet 服務(wù)與服務(wù)之間的消息傳遞,也是要經(jīng)過 skynet.pack 序列化和 skynet.unpack 反序列化。這個序列化過程與 protobuf 類似,每個 lua 類型存儲格式如下:

  • nil 類型(TYPE_NIL:0):
圖1 寫入buffer緩存區(qū)一個字節(jié),不需要值
  • boolean 類型(TYPE_BOOLEAN:1):
圖2
  • string類型:
    1. 短string類型(TYPE_SHORT_STRING:4):
圖3

2. 長string類型(TYPE_LONG_STRING:4):

圖4 如果 len < 0x10000 左移2位,并用2個字節(jié)存長度,否則左移4位,并用4個字節(jié)存長度
  • number類型(TYPE_NUMBER:2):

    1. 值:0
圖5 buffer緩存區(qū)同樣只存type,不需要存值
  1. 值為8個字節(jié)
圖6

3. 值為負數(shù)

圖7

4. 值小于2個字節(jié)

圖8

5. 值小于3個字節(jié)

圖9

6. 值為其他情況

圖10
  • 浮點數(shù)類型(TYPE_NUMBER_REAL: 8):
圖11
  • 用戶自定義類型(TYPE_USERDATA: 3):
圖12
  • table類型(TYPE_TABLE:6):

    1. 數(shù)組類型


      圖13 array_size 會采用之前講到的 number 類型來存儲,所以存幾個字節(jié)要按array_size大小決定
    2. key-value類型


      圖14 加上 1個字節(jié)的 nil 類型標識結(jié)束

通過以上的分析,大概知道了緩存區(qū)域是怎么存儲 lua 的各種數(shù)據(jù)類型。但緩存區(qū)在初始化時應(yīng)該分配多大好呢,我們可以從源代碼中看到,緩存區(qū)結(jié)構(gòu)體 buffer 域只有 128 byte大小,那么在數(shù)據(jù)過大時,buffer 勢必會不夠存儲,它又應(yīng)該如何處理呢。我們接著看下一個結(jié)構(gòu)體 struct write_block ,它的head 域和 current 域都指向了 struct block,可以猜測出,head 應(yīng)該是一個鏈表的頭節(jié)點,current 應(yīng)該是指向當前要寫入鏈表的哪一塊 block。它會先通過 malloc 申請一塊內(nèi)存出來。如果超過了 128,那么就會再申請一塊內(nèi)存,current就指向新的內(nèi)存塊,然后繼續(xù)往里面寫數(shù)據(jù)。再寫完了之后,它不是返回這個 head 指針給 lua 調(diào)用者, 而是再進行一次復(fù)制操作。將鏈表里的所有數(shù)據(jù)寫到一塊新的緩沖區(qū) newbuffer 中。申請緩沖區(qū)的大小 sz 可以根據(jù) struct write_block 結(jié)構(gòu)體中的 len 域獲?。ㄔ诿看螌憯?shù)據(jù)時,這個 len 就記錄了數(shù)據(jù)的總長度)。最終返回的是這個新的內(nèi)存塊 newbuffer 及大小 sz。

至于為什么還要重新復(fù)制一次,沒有直接返回 head 指針給 lua 層調(diào)用者,然后根據(jù) head 指向的鏈表來反序列化呢。我想主要是為了集群等其他模塊的需要。比如說集群,你不可能通過 socket 發(fā)送一個鏈表給對方吧,所以只返回一個內(nèi)存塊地址和大小,可以為其他模塊減少不必要的麻煩。

對于 number 和 string 的序列化也做到了盡可能的節(jié)省內(nèi)存,如果你在 lua 層對一個number變量賦值0,那么它在序列化時,只用了一個字節(jié)的 type 來標識。沒有造成內(nèi)存塊的過多浪費??梢哉f,這個思想,值得我們學習。


圖15 緩沖區(qū)鏈表結(jié)構(gòu)
#define BLOCK_SIZE 128

//對應(yīng)于圖15 的一塊內(nèi)存
struct block {
    struct block * next; //指向下一個內(nèi)存塊
    char buffer[BLOCK_SIZE];
};

struct write_block {
    struct block * head;
    struct block * current;
    int len;
    int ptr;
};

struct read_block {
    char * buffer;
    int len;
    int ptr;
};

至于反序列化部分,就簡單了,用 struct read_block 中的 buffer 域指向 newbuffer,len 指向 sz,然后先從buffer指向的內(nèi)存中取出一個字節(jié),這個字節(jié)就是type, 根據(jù) type 類型讀取值(有值的情況下),將其壓人lua 棧中,如此反復(fù),直到讀取完,最后返回給 lua 層,這樣就完成了一次反序列化操作了。

如果還是不怎么懂,那接下來再看看代碼是如何實現(xiàn)的吧,為了簡單點,就以序列化 一個 字符串為例吧。

local msg, sz = skynet.pack("hello")
skynet.unpack(msg, sz)

根據(jù)圖3,我們可以畫出hello在內(nèi)存塊中的簡單存儲結(jié)構(gòu)。


圖16

接下來再看看代碼的實現(xiàn),skynet.pack 的調(diào)用最終會進入 c 層:
下面引用到的代碼都在 lualib-src/lua-seri.c 文件中。

LUAMOD_API int
luaseri_pack(lua_State *L) {
    struct block temp;
    temp.next = NULL;
    struct write_block wb;
    wb_init(&wb, &temp);    //初始化結(jié)構(gòu)體 wb
    pack_from(L,&wb,0);      //開始序列化
    assert(wb.head == &temp);
    seri(L, &temp, wb.len);  //將head 指向的鏈表重新放到一個緩沖區(qū)中,并返回,加上大小sz

    wb_free(&wb);

    return 2;
}

再看看 pack_from 的實現(xiàn):

static void
pack_from(lua_State *L, struct write_block *b, int from) {
    int n = lua_gettop(L) - from;  //獲取要序列化的個數(shù),目前只有 hello 一個數(shù)據(jù),所以 n 為 1
    int i;
    for (i=1;i<=n;i++) {
        pack_one(L, b , from + i, 0);  // 對 "hello" 數(shù)據(jù)進行序列化
    }
}

那么 pack_one 又做了哪些事呢

static void
pack_one(lua_State *L, struct write_block *b, int index, int depth) {
    ...
    int type = lua_type(L,index);  // 根據(jù) index 獲取數(shù)據(jù)類型,按照我之前的設(shè)定,只有一個數(shù)據(jù),index 為 0
    switch(type) {
    case LUA_TNIL:
        ...
    case LUA_TNUMBER: {
        ...
    }
    case LUA_TBOOLEAN: 
        ...
    case LUA_TSTRING: {  // 由于 "hello" 是字符串類型,所以來到這里,如果是 lua 層判斷數(shù)據(jù)類型,應(yīng)該是用 type(data) == "string"
        size_t sz = 0;
        const char *str = lua_tolstring(L,index,&sz);
        wb_string(b, str, (int)sz);
        break;
    }
    case LUA_TLIGHTUSERDATA:
        ...
    case LUA_TTABLE: {
        ...
    }
    default:
        ...
    }
}

到了這里,大家應(yīng)該可以看出,序列化 lua 數(shù)據(jù)都是根據(jù)其數(shù)據(jù)類型,依依寫入到buffer緩沖區(qū)當中。再接著看看 wb_string 的實現(xiàn)吧。

static inline void
wb_string(struct write_block *wb, const char *str, int len) {
    if (len < MAX_COOKIE) {  //這里由于 "hello" 字符串長度不會超過 MAX_COOKIE(32),所以代碼會執(zhí)行到 if 里面
        
        // TYPE_SHORT_STRING | len << 3,一個字節(jié) 8 bit,由于 len 小于 MAX_COOKIE,左移 3 位不會有數(shù)據(jù)溢出情況,TYPE_SHORT_STRING 就保留在低 3 位中
        uint8_t n = COMBINE_TYPE(TYPE_SHORT_STRING, len);
        
        //這里就是通過 wb_push 這個函數(shù), 將 type 和 len 一起寫入到緩沖區(qū)鏈表中,只寫入 1 個字節(jié)
        //由于 len 小于 MAX_COOKIE,所以一個字節(jié)足夠存儲 type 和 len 內(nèi)容
        wb_push(wb, &n, 1);
        if (len > 0) {
            //*** 這里就是 將 "hello" 寫入到緩沖區(qū)鏈表中,只寫入 len 個字節(jié),跟我們之前畫的數(shù)據(jù)存入buffer 圖相對應(yīng)
            wb_push(wb, str, len);
        }
    } else {
        uint8_t n;
        if (len < 0x10000) {
            n = COMBINE_TYPE(TYPE_LONG_STRING, 2);
            wb_push(wb, &n, 1);
            uint16_t x = (uint16_t) len;
            wb_push(wb, &x, 2);
        } else {
            n = COMBINE_TYPE(TYPE_LONG_STRING, 4);
            wb_push(wb, &n, 1);
            uint32_t x = (uint32_t) len;
            wb_push(wb, &x, 4);
        }
        wb_push(wb, str, len);
    }
}

接著就是 wb_push 的實現(xiàn)了

inline static void
wb_push(struct write_block *b, const void *buf, int sz) {
    const char * buffer = buf;  // buf 是 void* 類型,因為buf 可能指向的是int 類型地址、char類型地址等,所以只用這個任意類型指針了
    if (b->ptr == BLOCK_SIZE) {
_again:
        b->current = b->current->next = blk_alloc();  //重新申請一塊內(nèi)存
        b->ptr = 0;      //指針偏移的地方,每次要寫入數(shù)據(jù)時,就是根據(jù)它來確定寫入的起始地址
    }
    if (b->ptr <= BLOCK_SIZE - sz) {  // 當要寫入內(nèi)容的長度還足夠時,不超過這個內(nèi)存塊大小時,直接復(fù)制數(shù)據(jù),保存到 b->current 指向的內(nèi)存塊中
        memcpy(b->current->buffer + b->ptr, buffer, sz);
        b->ptr+=sz;  //指向的位置偏移
        b->len+=sz; // len 總大小要加上 sz
    } else {
        // 來到這里,表明一塊內(nèi)存 128 k不夠用了。但我們可以先把 buf 部分內(nèi)容寫入到這個內(nèi)存塊中,不夠存的那部分就留到下一個新的內(nèi)存塊中。
        //也就是說,這次要寫入的數(shù)據(jù)分兩次或多次寫,先把一部分寫到當前的內(nèi)存中,剩下的部分寫到下一塊內(nèi)存中。
        int copy = BLOCK_SIZE - b->ptr;
        memcpy(b->current->buffer + b->ptr, buffer, copy);
        buffer += copy;
        b->len += copy;
        sz -= copy;
        goto _again;  // 這里就是跳到前面的 _again: 中,重新申請內(nèi)存塊,繼續(xù)寫入剩余的數(shù)據(jù)。
    }
}

好了,到這里,"hello",這個字符串也就寫完了,再看看它是如何返回給 lua 層的吧。

static void
seri(lua_State *L, struct block *b, int len) {
    uint8_t * buffer = skynet_malloc(len);
    uint8_t * ptr = buffer;
    int sz = len;
    while(len>0) {
        if (len >= BLOCK_SIZE) {
            memcpy(ptr, b->buffer, BLOCK_SIZE);
            ptr += BLOCK_SIZE;
            len -= BLOCK_SIZE;
            b = b->next;
        } else {
            memcpy(ptr, b->buffer, len); 
            break;
        }
    }
    
    lua_pushlightuserdata(L, buffer);  // 為了好區(qū)分buffer是指哪個,我暫時將這個叫 new_buffer
    lua_pushinteger(L, sz);  //返回所有數(shù)據(jù)的總大小
}

還記得 luaseri_pack(lua_State *L) 函數(shù)里面有個 seri(L, &temp, wb.len); 調(diào)用嗎,這里就是將整個鏈表重新復(fù)制一份,放到 new_buffer 中,最后和 sz 一起返回給 lua 層。

我們再看看反序列化 skynet.unpack 的接口調(diào)用:

local msg, sz = skynet.pack("hello")
skynet.unpack(msg, sz)

到了這里,反序列化需要調(diào)用的 c 層接口 luaseri_unpack 。

int
luaseri_unpack(lua_State *L) {
    ...
    void * buffer;
    int len;
        ...
        buffer = lua_touserdata(L,1);
        len = luaL_checkinteger(L,2);
    ...
    lua_settop(L,1);
    struct read_block rb;
    rball_init(&rb, buffer, len);  //初始化 rb,讓 rb 的 buffer 指向這個 new_buffer,rb 的 len 指向這個 sz

    int i;
    for (i=0;;i++) {
        if (i%8==7) {
            luaL_checkstack(L,LUA_MINSTACK,NULL);
        }
        uint8_t type = 0;
        uint8_t *t = rb_read(&rb, sizeof(type));  //這個就是先讀取數(shù)據(jù)類型type,1個字節(jié)(uint8_t大小占一個字節(jié))
        if (t==NULL)  //如果讀取不到,證明已經(jīng)讀取完所有的數(shù)據(jù)了,可以跳出循環(huán),返回了
            break;
        type = *t;
        push_value(L, &rb, type & 0x7, type>>3);  //這里就是讀取數(shù)據(jù)的總?cè)肟诤瘮?shù),讀完數(shù)據(jù),就將其壓人 lua 棧中
    }
    // Need not free buffer  這個意思是 unpack 的調(diào)用不用釋放內(nèi)存,至于內(nèi)存的釋放,主要放到
    // skynet_server.c 的 dispatch_message 函數(shù)中釋放
    /*
    if (!reserve_msg) {
        skynet_free(msg->data);
    }
    */
    return lua_gettop(L) - 1;
}

接著,再看看 push_value 的實現(xiàn)

static void
push_value(lua_State *L, struct read_block *rb, int type, int cookie) {
    switch(type) {
    case TYPE_NIL:
        ...
    case TYPE_BOOLEAN:
        ...
    case TYPE_NUMBER:
        ...
    case TYPE_USERDATA:
        ...
    case TYPE_SHORT_STRING:  //到這里面取出 buffer 的數(shù)據(jù)
        get_buffer(L,rb,cookie);
        break;
    case TYPE_LONG_STRING: {
        ...
    }
    case TYPE_TABLE: {
        ...
    }
    default: {
        ...
    }
    }
}

其中,get_buffer 的實現(xiàn)也比較簡單,就是根據(jù) len 長度,從 buffer 中讀取數(shù)據(jù)。并將其壓入 lua 棧中,返回給 lua 層的調(diào)用者。

static void *
rb_read(struct read_block *rb, int sz) {
    if (rb->len < sz) {
        return NULL;
    }

    int ptr = rb->ptr;
    rb->ptr += sz;
    rb->len -= sz;
    return rb->buffer + ptr;
}

static void
get_buffer(lua_State *L, struct read_block *rb, int len) {
    char * p = rb_read(rb,len);  // 根據(jù) len 長度,從 rb 的 buffer 域中讀取數(shù)據(jù)
    if (p == NULL) {
        invalid_stream(L,rb); // 讀數(shù)據(jù)出現(xiàn)異常,這是異常錯誤處理函數(shù),可以先不理會
    }
    lua_pushlstring(L,p,len);  //返回數(shù)據(jù)給 lua 層,到了這里,就可以在 lua 層獲取到 "hello" 字符串了,反序列化結(jié)束了
}

通過一個簡單的例子,我們可以看出,skynet.pack 和 skynet.unpack 的實現(xiàn)過程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容