skynet集群學(xué)習(xí)

在了解cluster之前,先看看example下的cluster1.luacluster2.lua例子 ,為了方便理解,我對(duì)這兩個(gè)例子做了相應(yīng)的修改:

--cluster1.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"
local snax = require "skynet.snax"
require "skynet.manager"


skynet.start(function()
    cluster.reload {
        db = "127.0.0.1:2528",
        db2 = "127.0.0.1:2529",
    }
    local sdb = skynet.newservice("simpledb")
    skynet.name("sdb", sdb)

    print(skynet.call(sdb, "lua", "SET", "a", "foobar"))

    cluster.open "db"
    cluster.open "db2"
end)
--cluster2.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    print(cluster.call("db", "sdb", "GET", "a"))
end)

現(xiàn)在就來(lái)具體分析了解一下

--cluster1.lua
    cluster.reload {
        db = "127.0.0.1:2528",
        db2 = "127.0.0.1:2529",
    }

--clusterd.lua
local function loadconfig(tmp)
    ...
    for name,address in pairs(tmp) do
        assert(address == false or type(address) == "string")
        if node_address[name] ~= address then
            -- address changed,用rawget是為了不觸對(duì)發(fā)元表的訪問(wèn)
            if rawget(node_channel, name) then
                node_channel[name] = nil    -- reset connection
            end
            node_address[name] = address
        end
        ...
    end
end

function command.reload(source, config)
    loadconfig(config)
    skynet.ret(skynet.pack(nil)) 
end

cluster.reload 的作用主要是先將節(jié)點(diǎn)名和與其相當(dāng)于的地址保存到表node_address中,目的是為了后續(xù)發(fā)起遠(yuǎn)程請(qǐng)求用到,如cluster.send或者cluster.call。

--cluster1.lua
local sdb = skynet.newservice("simpledb")
skynet.name("sdb", sdb)


--clusterd.lua
local register_name = {}
function command.register(source, name, addr)
    assert(register_name[name] == nil)
    addr = addr or source
    local old_name = register_name[addr]
    if old_name then
        register_name[old_name] = nil
    end
    register_name[addr] = name
    register_name[name] = addr
    skynet.ret(nil)
    skynet.error(string.format("Register [%s] :%08x", name, addr))
end


創(chuàng)建一個(gè)simpledb服務(wù),并為 sdb 服務(wù)的 addr 起一個(gè)別名"sdb",這里我做了稍微的修改,不使用原來(lái)例子的 cluster.register("sdb", sdb) 。主要方便 cluster2.lua 用節(jié)點(diǎn)名 + 服務(wù)名來(lái)做遠(yuǎn)程訪問(wèn)。

--cluster1.lua
    cluster.open "db"
    cluster.open "db2"

--cluster.lua
function cluster.open(port)
    if type(port) == "string" then
        skynet.call(clusterd, "lua", "listen", port)
    else
        skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
    end
end

--clusterd.lua
function command.listen(source, addr, port)
    local gate = skynet.newservice("gate")
    if port == nil then
        local address = assert(node_address[addr], addr .. " is down")
        addr, port = string.match(address, "([^:]+):(.*)$")
    end
    skynet.call(gate, "lua", "open", { address = addr, port = port })
    skynet.ret(skynet.pack(nil))
end

--gate.lua
...
gateserver.start(handler)

--gateserver.lua
function gateserver.start(handler)
    assert(handler.message)
    assert(handler.connect)

    function CMD.open( source, conf )
        ...
        skynet.error(string.format("Listen on %s:%d", address, port))
        socket = socketdriver.listen(address, port)  --監(jiān)聽(tīng)ip地址和port端口號(hào)
        socketdriver.start(socket)
        if handler.open then
            return handler.open(source, conf)
        end
    end
...


接下來(lái)是 cluster.open "db" 和 "db2" ,通過(guò)node_address來(lái)獲取其之前保存 db 和 db2 的addr,然后創(chuàng)建gate網(wǎng)關(guān),調(diào)用gate 的open方法,因?yàn)樵趃ate的消息分發(fā)函數(shù)是寫在gateserver.lua文件里面的,所以 skynet.call(gate, "lua", "open", { address = addr, port = port }) 其實(shí)是跑到了 gateserver.lua 里面的open方法中,當(dāng)調(diào)用完成時(shí),就開(kāi)始監(jiān)聽(tīng)地址和端口號(hào)了。

接下來(lái)再看看 cluster2.lua 是如何遠(yuǎn)程調(diào)用 cluster1.luad 的:

--cluster2.lua
print(cluster.call("db", "sdb", "GET", "a"))

這里的遠(yuǎn)程調(diào)用也很簡(jiǎn)單,只需要知道cluster1.lua 節(jié)點(diǎn)的監(jiān)聽(tīng)地址和它提供了哪些服務(wù)(通過(guò) skynet.name 起的別名來(lái)查找)。

接著,再看看 cluster.call("db", "sdb", "GET", "a") 是如何發(fā)送數(shù)據(jù)給 cluster1節(jié)點(diǎn)的sdb服務(wù)的。

--cluster.lua
function cluster.call(node, address, ...)
    return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
end

--clusterd.lua
local function send_request(source, node, addr, msg, sz)
    local session = node_session[node] or 1
    -- msg is a local pointer, cluster.packrequest will free it
    local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
    node_session[node] = new_session

    -- node_channel[node] may yield or throw error
    local c = node_channel[node]

    return c:request(request, session, padding)
end

function command.req(...)
    local ok, msg, sz = pcall(send_request, ...)
    if ok then
        if type(msg) == "table" then
            skynet.ret(cluster.concat(msg))
        else
            skynet.ret(msg)
        end
    else
        skynet.error(msg)
        skynet.response()(false)
    end
end

這里,先對(duì)用戶的數(shù)據(jù)進(jìn)行第一層打包skynet.pack(...),對(duì)于skynet.pack 是如何打包數(shù)據(jù)的,由于篇幅有限,將再以后的章節(jié)中具體再描述。大家只需要先知道它對(duì)數(shù)據(jù)打包后會(huì)返回 一個(gè)用戶自定義類型 msg 和長(zhǎng)度 sz,就好了。
cluster.packrequest(addr, session, msg, sz) 就是對(duì) skynet.pack 打包后得到的 msg 再一次打包,其實(shí)也就是加上頭部信息,并重新放到一塊新的內(nèi)存中。

--lua-cluster.c
//宏定義
#define TEMP_LENGTH 0x8200    //十進(jìn)制 33280
#define MULTI_PART 0x8000     //十進(jìn)制 32768

// 對(duì)session打包,占用buf 4個(gè)字節(jié)
static void
fill_uint32(uint8_t * buf, uint32_t n) {
    buf[0] = n & 0xff;
    buf[1] = (n >> 8) & 0xff;
    buf[2] = (n >> 16) & 0xff;
    buf[3] = (n >> 24) & 0xff;
}

//對(duì)消息長(zhǎng)度打包,占用buf 2個(gè)字節(jié)
static void
fill_header(lua_State *L, uint8_t *buf, int sz) {
    assert(sz < 0x10000);
    buf[0] = (sz >> 8) & 0xff;  //sz 左移8位,得到高8位數(shù)據(jù)
    buf[1] = sz & 0xff;         //sz & 0xff,屏蔽高位數(shù)據(jù),得到sz低8位數(shù)據(jù)
}

static int
packreq_string(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
    size_t namelen = 0;
    const char *name = lua_tolstring(L, 1, &namelen);
    if (name == NULL || namelen < 1 || namelen > 255) {
        skynet_free(msg);
        luaL_error(L, "name is too long %s", name);
    }

    uint8_t buf[TEMP_LENGTH];
    if (sz < MULTI_PART) {
        fill_header(L, buf, sz+6+namelen);
        buf[2] = 0x80;
        buf[3] = (uint8_t)namelen;
        memcpy(buf+4, name, namelen);
        fill_uint32(buf+4+namelen, is_push ? 0 : (uint32_t)session);
        memcpy(buf+8+namelen,msg,sz);

        lua_pushlstring(L, (const char *)buf, sz+8+namelen);
        return 0;
    } else {
        int part = (sz - 1) / MULTI_PART + 1;
        fill_header(L, buf, 10+namelen);
        buf[2] = is_push ? 0xc1 : 0x81; // multi push or request
        buf[3] = (uint8_t)namelen;
        memcpy(buf+4, name, namelen);
        fill_uint32(buf+4+namelen, (uint32_t)session);
        fill_uint32(buf+8+namelen, sz);

        lua_pushlstring(L, (const char *)buf, 12+namelen);
        return part;
    }
}

static int
packrequest(lua_State *L, int is_push) {
    void *msg = lua_touserdata(L,3);
    if (msg == NULL) {
        return luaL_error(L, "Invalid request message");
    }
    uint32_t sz = (uint32_t)luaL_checkinteger(L,4);
    int session = luaL_checkinteger(L,2);
    ...
    int addr_type = lua_type(L,1);
    int multipak;
    ...
    multipak = packreq_string(L, session, msg, sz, is_push);
    ...
    uint32_t new_session = (uint32_t)session + 1;
    ...
    lua_pushinteger(L, new_session);
    ...
    skynet_free(msg);
    return 2;
    ...
}

static int
lpackrequest(lua_State *L) {
    return packrequest(L, 0);
}

static int
lpackpush(lua_State *L) {
    return packrequest(L, 1);
}

由于 packrequest 函數(shù)會(huì)對(duì) addr 地址進(jìn)行判斷,是否是數(shù)字或者是字符串,然后再按其類型打包。這里就以 addr 是字符串為例,先從 cluster.packrequest(addr, session, msg, sz) 中獲取第3個(gè)參數(shù) msg,判斷如果 msg 為空的活,就沒(méi)有必要再進(jìn)行打包了,接著再獲取第4個(gè)參數(shù) sz,第2個(gè)參數(shù) session,這里會(huì)對(duì)session進(jìn)行加一操作,得到一個(gè)新的new_session,目的就是 new_session 用來(lái)標(biāo)識(shí)遠(yuǎn)程會(huì)話記錄,在上上面的代碼中有所體現(xiàn)

node_session[node] = new_session

packreq_string(L, session, msg, sz, is_push); 會(huì)對(duì) sz 長(zhǎng)度進(jìn)行判斷,如果大于 MULTI_PART (32k字節(jié))的話,并且 packrequest的第二個(gè)參數(shù)是 0 的話就是,表明 rpc 是一次請(qǐng)求 + 響應(yīng)過(guò)程,那么 buf[2] = 0x81。如果是1,表示這一次請(qǐng)求是推送的,不需要有回復(fù),那么 buf[2] = 0xc1,并且會(huì)根據(jù) part 進(jìn)行多次發(fā)送。如果 sz 小于32 k字節(jié),那么就好辦了, buf 存儲(chǔ)的內(nèi)容如下:

  • 第0~1個(gè)字節(jié) : 度信息(msg消息長(zhǎng)度+5+namelen服務(wù)名長(zhǎng)度)
  • 第2個(gè)字節(jié) : type類型
  • 第3個(gè)字節(jié):服務(wù)名長(zhǎng)度
  • 第4~namelen個(gè)字節(jié)(namelen個(gè)字節(jié)):服務(wù)名
  • 第4+namelen~4+namelen+4個(gè)字節(jié)(4個(gè)字節(jié)):session
  • 之后就是存儲(chǔ) msg 消息的內(nèi)容了

此時(shí),整個(gè) c 層調(diào)用完之后,將會(huì)得到 buf 和 new_session(padding只有在sz大于32k時(shí)才存在)。

--clusterd.lua
local function open_channel(t, key)
    ...
    local address = node_address[key]    --在
    ...
    if address then
        local host, port = string.match(address, "([^:]+):(.*)$")
        c = sc.channel {
            host = host,
            port = tonumber(port),
            response = read_response,
            nodelay = true,
        }
        succ, err = pcall(c.connect, c, true)    -- 發(fā)起遠(yuǎn)程連接
        if succ then
            t[key] = c
            ct.channel = c
        end
    else
        err = "cluster node [" .. key .. "] is down."
    end
    ...
    return c
end

--設(shè)置 node_channel 元表為 open_channel 
local node_channel = setmetatable({}, { __index = open_channel })

local function send_request(source, node, addr, msg, sz)
    local session = node_session[node] or 1
    -- msg is a local pointer, cluster.packrequest will free it
    local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
    node_session[node] = new_session

    --再對(duì)下面兩行代碼進(jìn)行分析
    -- node_channel[node] may yield or throw error
    local c = node_channel[node]

    return c:request(request, session, padding)
end

此時(shí),在執(zhí)行 local c = node_channel[node] 的時(shí)候,就已經(jīng)發(fā)起了遠(yuǎn)程連接的請(qǐng)求了。為什么呢,在學(xué)習(xí)lua語(yǔ)法時(shí),有個(gè)元表的概念,如果索引 key 在本 table 中找不到,并且存在元表的情況下,那么它會(huì)去元表再找一次。此時(shí),就會(huì)調(diào)用到 open_channel 方法,發(fā)起遠(yuǎn)程連接。
連接請(qǐng)求的發(fā)送函數(shù)在socketchannel.lua文件中:

--socketchannel.lua
local function connect_once(self)
    ...
    local fd,err = socket.open(self.__host, self.__port)  --調(diào)用 c底層的網(wǎng)絡(luò)API
    ...
end

local function try_connect(self , once)
    local t = 0
    while not self.__closed do
        local ok, err = connect_once(self)
        ...
    end
end

local function block_connect(self, once)
    ...
    if #self.__connecting > 0 then
        -- connecting in other coroutine
        local co = coroutine.running()
        table.insert(self.__connecting, co)
        skynet.wait(co)
    else
        self.__connecting[1] = true
        err = try_connect(self, once)
        self.__connecting[1] = nil
        for i=2, #self.__connecting do
            local co = self.__connecting[i]
            self.__connecting[i] = nil
            skynet.wakeup(co)
        end
    end
    ...
end

function channel:connect(once)
    ...
    return block_connect(self, once)
end

緊接著再看看 c:request(request, session, padding) 又調(diào)用了哪些函數(shù):

--socketchannel.lua
function channel:request(request, response, padding)
    assert(block_connect(self, true))   -- connect once 由于之前已經(jīng)發(fā)起連接過(guò)了,這里不會(huì)再去連接,大家可以放心。
    local fd = self.__sock[1]

    if padding then
        -- padding may be a table, to support multi part request
        -- multi part request use low priority socket write
        -- now socket_lwrite returns as socket_write
        if not socket_lwrite(fd , request) then
            sock_err(self)
        end
        --這里就將之前大于32k的數(shù)據(jù)包分多次發(fā)送
        for _,v in ipairs(padding) do
            if not socket_lwrite(fd, v) then
                sock_err(self)
            end
        end
    else
        --小于32k 的數(shù)據(jù)包一次發(fā)送完
        if not socket_write(fd , request) then
            sock_err(self)
        end
    end

    if response == nil then
        -- no response
        return
    end
    --發(fā)送完數(shù)據(jù),那么就要掛起當(dāng)前協(xié)程,等待對(duì)方響應(yīng)消息了。
    return wait_for_response(self, response)
end

在等待函數(shù) wait_for_response() 中,又做了哪些事呢。

--socketchannel.lua
local function wait_for_response(self, response)
    local co = coroutine.running()
    push_response(self, response, co)
    skynet.wait(co)  --掛起當(dāng)前協(xié)程

    local result = self.__result[co]  -- 存放 本次 co 的錯(cuò)誤碼
    self.__result[co] = nil
    local result_data = self.__result_data[co]  --存放遠(yuǎn)程服務(wù)返回的數(shù)據(jù),這里就是你最想要的結(jié)果數(shù)據(jù)了
    self.__result_data[co] = nil

    if result == socket_error then
        if result_data then
            error(result_data)
        else
            error(socket_error)
        end
    else
        assert(result, result_data)
        return result_data  --如果遠(yuǎn)程調(diào)用沒(méi)有錯(cuò)誤,就返回?cái)?shù)據(jù),這個(gè)數(shù)據(jù)還是經(jīng)過(guò)打包的。
    end
end

那么大家可能會(huì)問(wèn),既然掛起了,在什么時(shí)候會(huì)被喚醒呢,還記得之前講過(guò)的
local function open_channel(t, key) 函數(shù)嗎, 這個(gè)函數(shù)里面有這么一段代碼:

--clusterd.lua
c = sc.channel {
    host = host,
    port = tonumber(port),
    response = read_response,  --設(shè)置讀取響應(yīng)結(jié)果的回調(diào)函數(shù)
    nodelay = true,
}

就是這個(gè)讀取響應(yīng)函數(shù)起的作用。接著再來(lái)仔細(xì)看看:

--clusterd.lua
local function read_response(sock)
    local sz = socket.header(sock:read(2))     --阻塞的讀取socket數(shù)據(jù)
    local msg = sock:read(sz)                  --讀取內(nèi)容
    return cluster.unpackresponse(msg)  -- session, ok, data, padding 稍后介紹到
end

--socketchannel.lua
local function dispatch_by_session(self)
    local response = self.__response
    -- response() return session
    while self.__sock do
        --這里的 response 函數(shù),就是之前設(shè)置的 read_response 函數(shù)了。
        --這里會(huì)一直阻塞,直到回調(diào)函數(shù)返回,等待結(jié)果。
        local ok , session, result_ok, result_data, padding = pcall(response, self.__sock)  --這里的result_data就是對(duì)方響應(yīng)的內(nèi)容了,經(jīng)skynet.pack打包。
        if ok and session then
            local co = self.__thread[session]
            if co then
                if padding and result_ok then
                    -- If padding is true, append result_data to a table (self.__result_data[co])
                    local result = self.__result_data[co] or {}
                    self.__result_data[co] = result
                    table.insert(result, result_data)
                else
                    self.__thread[session] = nil
                    self.__result[co] = result_ok
                    if result_ok and self.__result_data[co] then
                        table.insert(self.__result_data[co], result_data)
                    else
                        self.__result_data[co] = result_data
                    end
                    skynet.wakeup(co)    --在這里被換醒了,wait_for_response 函數(shù)就可以往下走了
                end
            ...
        end
    end
    exit_thread(self)
end

local function dispatch_function(self)
    if self.__response then
        return dispatch_by_session  --假設(shè)需要有響應(yīng)結(jié)果,那么就會(huì)返回這個(gè)函數(shù)(根據(jù)cluster.call決定)
    else
        return dispatch_by_order  --假設(shè)不需要有響應(yīng)結(jié)果,那么就會(huì)返回這個(gè)函數(shù)(根據(jù)cluster.send決定)
    end
end

local function connect_once(self)
    ...
    --fork一個(gè)協(xié)程出來(lái),在下一幀執(zhí)行
    --這里就是要等待響應(yīng)結(jié)果的關(guān)鍵入口
    self.__dispatch_thread = skynet.fork(dispatch_function(self), self) 
    ...
end

這里,又涉及到一個(gè) c層的關(guān)鍵調(diào)用,read_response 函數(shù)中的 cluster.unpackresponse(msg),看看它做了些什么:

//lua-cluster.c
static int
lunpackresponse(lua_State *L) {
    size_t sz;
    const char * buf = luaL_checklstring(L, 1, &sz);
    if (sz < 5) {
        return 0;
    }
    uint32_t session = unpack_uint32((const uint8_t *)buf);  //session占4個(gè)字節(jié),跟打包一一對(duì)應(yīng)
    lua_pushinteger(L, (lua_Integer)session);            //將session壓入棧中,作為函數(shù)的第一個(gè)返回?cái)?shù)據(jù)
    switch(buf[4]) {
    case 0: // error
        lua_pushboolean(L, 0);
        lua_pushlstring(L, buf+5, sz-5);
        return 3;
    case 1: // ok
    case 4: // multi end
        lua_pushboolean(L, 1);
        lua_pushlstring(L, buf+5, sz-5);
        return 3;
    case 2: // multi begin
        if (sz != 9) {
            return 0;
        }
        sz = unpack_uint32((const uint8_t *)buf+5);
        lua_pushboolean(L, 1);
        lua_pushinteger(L, sz);
        lua_pushboolean(L, 1);
        return 4;
    case 3: // multi part
        lua_pushboolean(L, 1);
        lua_pushlstring(L, buf+5, sz-5);
        lua_pushboolean(L, 1);
        return 4;
    default:
        return 0;
    }
}

lunpackresponse 函數(shù)主要對(duì) msg 內(nèi)容進(jìn)行第一層解包,主要是根據(jù)頭部消息來(lái)解。包頭有:

  • 0~3個(gè)字節(jié):session
  • 第4個(gè)字節(jié):type
  • 第5個(gè)字節(jié)開(kāi)始:skynet.pack打包的內(nèi)容了
  • 最后一個(gè)字節(jié)(可能有,也可能沒(méi)有,主要數(shù)據(jù)包不超過(guò)32k,就不會(huì)有):padding

到了這里,再一層一層的返回,我們就可以看到返回的結(jié)果了:

skynet.lua
function skynet.call(addr, typename, ...)
    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))
    if session == nil then
        error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))    --這里再進(jìn)行第二層解包,最后就是用戶想要的遠(yuǎn)程響應(yīng)結(jié)果了
end

--clusterd.lua
function command.req(...)
    local ok, msg, sz = pcall(send_request, ...)
    if ok then
        --數(shù)據(jù)原路返回
        if type(msg) == "table" then
            skynet.ret(cluster.concat(msg))
        else
            skynet.ret(msg)
        end
    ...
end

--cluster.lua
function cluster.call(node, address, ...)
    -- skynet.pack(...) will free by cluster.core.packrequest
    return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
end

好了,到此,我們可以了解到 cluster2.lua 是如何發(fā)起請(qǐng)求數(shù)據(jù),以及如何獲取響應(yīng)結(jié)果了,也完成了遠(yuǎn)程調(diào)用的一半內(nèi)容了。

接下來(lái),再看看cluster1.lua 在接收到數(shù)據(jù)后是如何轉(zhuǎn)發(fā)到相應(yīng)的服務(wù),以及服務(wù)是如何回消息的。
之前也有提到過(guò),cluster.open "db" 最終會(huì)創(chuàng)建 gate 網(wǎng)關(guān)來(lái)監(jiān)聽(tīng)。

--gate.lua
function handler.message(fd, msg, sz)
    -- recv a package, forward it
    local c = connection[fd]
    local agent = c.agent        --由于之前clusterd.lua在創(chuàng)建 gate 服務(wù)時(shí),并沒(méi)有指定 agent,所以這里的 agent 是 nil
    if agent then
        skynet.redirect(agent, c.client, "client", 1, msg, sz)
    else
        skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))  --轉(zhuǎn)發(fā)到 clusterd.lua 的socket方法
    end
end


--gateserver.lua
    local function dispatch_msg(fd, msg, sz)
        if connection[fd] then
            handler.message(fd, msg, sz)    --回調(diào) gate.lua 的 message 方法
        else
            skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
        end
    end

    MSG.data = dispatch_msg

    --注冊(cè) socket消息
    skynet.register_protocol {
        name = "socket",
        id = skynet.PTYPE_SOCKET,   -- PTYPE_SOCKET = 6
        unpack = function ( msg, sz )
            return netpack.filter( queue, msg, sz)
        end,
        dispatch = function (_, _, q, type, ...)
            queue = q
            if type then
                MSG[type](...)    --設(shè)置回調(diào)函數(shù)
            end
        end
    }

--clusterd.lua
function command.listen(source, addr, port)
    local gate = skynet.newservice("gate")
    ...
    skynet.call(gate, "lua", "open", { address = addr, port = port })
end

這里再次回顧一下 clusterd.lua 是如何創(chuàng)建 gate 服務(wù)的。以及如何接收遠(yuǎn)程發(fā)送過(guò)來(lái)的消息。接下來(lái),就看看gate 再接收消息后,clusterd.lua又是如何來(lái)處理的。

--clusterd.lua
function command.socket(source, subcmd, fd, msg)
    if subcmd == "data" then
        local sz
        local addr, session, msg, padding, is_push = cluster.unpackrequest(msg)
        if padding then                    --(1)
            local requests = large_request[fd]
            if requests == nil then
                requests = {}
                large_request[fd] = requests
            end
            local req = requests[session] or { addr = addr , is_push = is_push }
            requests[session] = req
            table.insert(req, msg)
            return
        else    
            local requests = large_request[fd]
            if requests then
                local req = requests[session]
                if req then
                    requests[session] = nil
                    table.insert(req, msg)
                    msg,sz = cluster.concat(req)
                    addr = req.addr
                    is_push = req.is_push
                end
            end
            if not msg then
                local response = cluster.packresponse(session, false, "Invalid large req")
                socket.write(fd, response)
                return
            end
        end
        local ok, response
        if addr == 0 then
            local name = skynet.unpack(msg, sz)
            local addr = register_name[name]
            if addr then
                ok = true
                msg, sz = skynet.pack(addr)
            else
                ok = false
                msg = "name not found"
            end
        elseif is_push then        --(2)
            skynet.rawsend(addr, "lua", msg, sz)
            return  -- no response
        else    --(3)
            ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 
        end
        if ok then     
            response = cluster.packresponse(session, true, msg, sz)
            if type(response) == "table" then
                for _, v in ipairs(response) do
                    socket.lwrite(fd, v)
                end
            else
                socket.write(fd, response)
            end
        else
            response = cluster.packresponse(session, false, msg)  --根據(jù) session 返回給對(duì)應(yīng)的請(qǐng)求方
            socket.write(fd, response)
        end
    elseif subcmd == "open" then
        skynet.error(string.format("socket accept from %s", msg))
        skynet.call(source, "lua", "accept", fd)
    else
        large_request[fd] = nil
        skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
    end
end

為了方便起見(jiàn),這里假設(shè)padding 為 nil,數(shù)據(jù)包不超過(guò)32k,那么就不會(huì)走流程(1)處代碼。如果是對(duì)方節(jié)點(diǎn)發(fā)起的請(qǐng)求是 cluster.send 方式(推送方式),則走流程(2)。如果是 cluster.call 方式(請(qǐng)求響應(yīng)),則走流程(3)。
對(duì)于流程(2),調(diào)用 skynet.rawsend(addr, "lua", msg, sz), 就是對(duì)消息進(jìn)行派發(fā),發(fā)送給指定的 addr 服務(wù)。addr 可以是字符串也可以是數(shù)字,但對(duì)于我們之前說(shuō)的,addr 就是 "sdb" 字符串。它不需要響應(yīng),所以這里直接返回,就是 (3)上一行代碼 return -- no response
對(duì)于流程(3),在調(diào)用 ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 完后,會(huì)得到響應(yīng)消息。如果調(diào)用成功后,那么就會(huì)對(duì) msg 進(jìn)行打包,加上頭部消息,從而通過(guò) socket.write(fd, response) 發(fā)送回去,這樣就完成了一次遠(yuǎn)程過(guò)程調(diào)用。

對(duì)于skynet.rawsendskynet.rawcall 不是很了解的,可以先看看 skynet源碼賞析。

現(xiàn)在對(duì) cluster.unpackrequest(msg) 進(jìn)行分析,看看是如何解包的。

//lua-cluster.c
static int
unpackreq_string(lua_State *L, const uint8_t * buf, int sz) {
    if (sz < 2) {
        return luaL_error(L, "Invalid cluster message (size=%d)", sz);
    }
    size_t namesz = buf[1];  //獲取服務(wù)名長(zhǎng)度
    if (sz < namesz + 6) {
        return luaL_error(L, "Invalid cluster message (size=%d)", sz);
    }
    lua_pushlstring(L, (const char *)buf+2, namesz);   //返回服務(wù)名
    uint32_t session = unpack_uint32(buf + namesz + 2); 
    lua_pushinteger(L, (uint32_t)session);   //返回session
    lua_pushlstring(L, (const char *)buf+2+namesz+4, sz - namesz - 6);  //返回消息內(nèi)容 msg
    if (session == 0) {
        lua_pushnil(L);
        lua_pushboolean(L,1);   // is_push, no reponse
        return 5;
    }

    return 3;
}

static int
lunpackrequest(lua_State *L) {
    size_t ssz;
    const char *msg = luaL_checklstring(L,1,&ssz);
    int sz = (int)ssz;
    switch (msg[0]) {
    ...
    case '\x80':  //地址是一個(gè)字符串,且內(nèi)容不超過(guò) 32k
        return unpackreq_string(L, (const uint8_t *)msg, sz);
    ...
    }
}

與之前講到的 packreq_string(L, session, msg, sz, is_push); 相對(duì)應(yīng)。先獲取服務(wù)名長(zhǎng)度namesz。再通過(guò)namesz獲取服務(wù)名,之后就是 session,最后就是消息體了。
再看看 cluster.packresponse(session, false, msg),是如何對(duì) msg 打包加上頭部的吧。

//lua-cluster.c
static int
lpackresponse(lua_State *L) {
    uint32_t session = (uint32_t)luaL_checkinteger(L,1);
    // clusterd.lua:command.socket call lpackresponse,
    // and the msg/sz is return by skynet.rawcall , so don't free(msg)
    int ok = lua_toboolean(L,2);
    void * msg;
    size_t sz;
    
    if (lua_type(L,3) == LUA_TSTRING) {  //
        msg = (void *)lua_tolstring(L, 3, &sz);  //msg指向消息體
    } 
    ...
    //接下來(lái)就是打包頭部信息了 
    uint8_t buf[TEMP_LENGTH];
    fill_header(L, buf, sz+5);
    fill_uint32(buf+2, session);
    buf[6] = ok;
    memcpy(buf+7,msg,sz);
    lua_pushlstring(L, (const char *)buf, sz+7);

    return 1;
}

頭部信息有:

  • 0~1個(gè)字節(jié):消息長(zhǎng)度
  • 2~5個(gè)字節(jié):session
  • 第6個(gè)字節(jié):狀態(tài)碼
  • 第7個(gè)字節(jié)起:msg消息

到此,就將完了skynet集群部分大概是如何建立、以及如何相互通信的了。當(dāng)然,還有一些細(xì)節(jié)部分沒(méi)仔細(xì)分析,不過(guò)對(duì)于大家來(lái)說(shuō),應(yīng)該不是什么難事了(o′ω`o)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容