在了解cluster之前,先看看example下的cluster1.lua和cluster2.lua例子 ,為了方便理解,我對(duì)這兩個(gè)例子做了相應(yīng)的修改:
--cluster1.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"
local snax = require "skynet.snax"
require "skynet.manager"
skynet.start(function()
cluster.reload {
db = "127.0.0.1:2528",
db2 = "127.0.0.1:2529",
}
local sdb = skynet.newservice("simpledb")
skynet.name("sdb", sdb)
print(skynet.call(sdb, "lua", "SET", "a", "foobar"))
cluster.open "db"
cluster.open "db2"
end)
--cluster2.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"
skynet.start(function()
print(cluster.call("db", "sdb", "GET", "a"))
end)
現(xiàn)在就來(lái)具體分析了解一下
--cluster1.lua
cluster.reload {
db = "127.0.0.1:2528",
db2 = "127.0.0.1:2529",
}
--clusterd.lua
local function loadconfig(tmp)
...
for name,address in pairs(tmp) do
assert(address == false or type(address) == "string")
if node_address[name] ~= address then
-- address changed,用rawget是為了不觸對(duì)發(fā)元表的訪問(wèn)
if rawget(node_channel, name) then
node_channel[name] = nil -- reset connection
end
node_address[name] = address
end
...
end
end
function command.reload(source, config)
loadconfig(config)
skynet.ret(skynet.pack(nil))
end
cluster.reload 的作用主要是先將節(jié)點(diǎn)名和與其相當(dāng)于的地址保存到表node_address中,目的是為了后續(xù)發(fā)起遠(yuǎn)程請(qǐng)求用到,如cluster.send或者cluster.call。
--cluster1.lua
local sdb = skynet.newservice("simpledb")
skynet.name("sdb", sdb)
--clusterd.lua
local register_name = {}
function command.register(source, name, addr)
assert(register_name[name] == nil)
addr = addr or source
local old_name = register_name[addr]
if old_name then
register_name[old_name] = nil
end
register_name[addr] = name
register_name[name] = addr
skynet.ret(nil)
skynet.error(string.format("Register [%s] :%08x", name, addr))
end
創(chuàng)建一個(gè)simpledb服務(wù),并為 sdb 服務(wù)的 addr 起一個(gè)別名"sdb",這里我做了稍微的修改,不使用原來(lái)例子的 cluster.register("sdb", sdb) 。主要方便 cluster2.lua 用節(jié)點(diǎn)名 + 服務(wù)名來(lái)做遠(yuǎn)程訪問(wèn)。
--cluster1.lua
cluster.open "db"
cluster.open "db2"
--cluster.lua
function cluster.open(port)
if type(port) == "string" then
skynet.call(clusterd, "lua", "listen", port)
else
skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
end
end
--clusterd.lua
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
if port == nil then
local address = assert(node_address[addr], addr .. " is down")
addr, port = string.match(address, "([^:]+):(.*)$")
end
skynet.call(gate, "lua", "open", { address = addr, port = port })
skynet.ret(skynet.pack(nil))
end
--gate.lua
...
gateserver.start(handler)
--gateserver.lua
function gateserver.start(handler)
assert(handler.message)
assert(handler.connect)
function CMD.open( source, conf )
...
skynet.error(string.format("Listen on %s:%d", address, port))
socket = socketdriver.listen(address, port) --監(jiān)聽(tīng)ip地址和port端口號(hào)
socketdriver.start(socket)
if handler.open then
return handler.open(source, conf)
end
end
...
接下來(lái)是 cluster.open "db" 和 "db2" ,通過(guò)node_address來(lái)獲取其之前保存 db 和 db2 的addr,然后創(chuàng)建gate網(wǎng)關(guān),調(diào)用gate 的open方法,因?yàn)樵趃ate的消息分發(fā)函數(shù)是寫在gateserver.lua文件里面的,所以 skynet.call(gate, "lua", "open", { address = addr, port = port }) 其實(shí)是跑到了 gateserver.lua 里面的open方法中,當(dāng)調(diào)用完成時(shí),就開(kāi)始監(jiān)聽(tīng)地址和端口號(hào)了。
接下來(lái)再看看 cluster2.lua 是如何遠(yuǎn)程調(diào)用 cluster1.luad 的:
--cluster2.lua
print(cluster.call("db", "sdb", "GET", "a"))
這里的遠(yuǎn)程調(diào)用也很簡(jiǎn)單,只需要知道cluster1.lua 節(jié)點(diǎn)的監(jiān)聽(tīng)地址和它提供了哪些服務(wù)(通過(guò) skynet.name 起的別名來(lái)查找)。
接著,再看看 cluster.call("db", "sdb", "GET", "a") 是如何發(fā)送數(shù)據(jù)給 cluster1節(jié)點(diǎn)的sdb服務(wù)的。
--cluster.lua
function cluster.call(node, address, ...)
return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
end
--clusterd.lua
local function send_request(source, node, addr, msg, sz)
local session = node_session[node] or 1
-- msg is a local pointer, cluster.packrequest will free it
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
node_session[node] = new_session
-- node_channel[node] may yield or throw error
local c = node_channel[node]
return c:request(request, session, padding)
end
function command.req(...)
local ok, msg, sz = pcall(send_request, ...)
if ok then
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
else
skynet.error(msg)
skynet.response()(false)
end
end
這里,先對(duì)用戶的數(shù)據(jù)進(jìn)行第一層打包skynet.pack(...),對(duì)于skynet.pack 是如何打包數(shù)據(jù)的,由于篇幅有限,將再以后的章節(jié)中具體再描述。大家只需要先知道它對(duì)數(shù)據(jù)打包后會(huì)返回 一個(gè)用戶自定義類型 msg 和長(zhǎng)度 sz,就好了。
cluster.packrequest(addr, session, msg, sz) 就是對(duì) skynet.pack 打包后得到的 msg 再一次打包,其實(shí)也就是加上頭部信息,并重新放到一塊新的內(nèi)存中。
--lua-cluster.c
//宏定義
#define TEMP_LENGTH 0x8200 //十進(jìn)制 33280
#define MULTI_PART 0x8000 //十進(jìn)制 32768
// 對(duì)session打包,占用buf 4個(gè)字節(jié)
static void
fill_uint32(uint8_t * buf, uint32_t n) {
buf[0] = n & 0xff;
buf[1] = (n >> 8) & 0xff;
buf[2] = (n >> 16) & 0xff;
buf[3] = (n >> 24) & 0xff;
}
//對(duì)消息長(zhǎng)度打包,占用buf 2個(gè)字節(jié)
static void
fill_header(lua_State *L, uint8_t *buf, int sz) {
assert(sz < 0x10000);
buf[0] = (sz >> 8) & 0xff; //sz 左移8位,得到高8位數(shù)據(jù)
buf[1] = sz & 0xff; //sz & 0xff,屏蔽高位數(shù)據(jù),得到sz低8位數(shù)據(jù)
}
static int
packreq_string(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
size_t namelen = 0;
const char *name = lua_tolstring(L, 1, &namelen);
if (name == NULL || namelen < 1 || namelen > 255) {
skynet_free(msg);
luaL_error(L, "name is too long %s", name);
}
uint8_t buf[TEMP_LENGTH];
if (sz < MULTI_PART) {
fill_header(L, buf, sz+6+namelen);
buf[2] = 0x80;
buf[3] = (uint8_t)namelen;
memcpy(buf+4, name, namelen);
fill_uint32(buf+4+namelen, is_push ? 0 : (uint32_t)session);
memcpy(buf+8+namelen,msg,sz);
lua_pushlstring(L, (const char *)buf, sz+8+namelen);
return 0;
} else {
int part = (sz - 1) / MULTI_PART + 1;
fill_header(L, buf, 10+namelen);
buf[2] = is_push ? 0xc1 : 0x81; // multi push or request
buf[3] = (uint8_t)namelen;
memcpy(buf+4, name, namelen);
fill_uint32(buf+4+namelen, (uint32_t)session);
fill_uint32(buf+8+namelen, sz);
lua_pushlstring(L, (const char *)buf, 12+namelen);
return part;
}
}
static int
packrequest(lua_State *L, int is_push) {
void *msg = lua_touserdata(L,3);
if (msg == NULL) {
return luaL_error(L, "Invalid request message");
}
uint32_t sz = (uint32_t)luaL_checkinteger(L,4);
int session = luaL_checkinteger(L,2);
...
int addr_type = lua_type(L,1);
int multipak;
...
multipak = packreq_string(L, session, msg, sz, is_push);
...
uint32_t new_session = (uint32_t)session + 1;
...
lua_pushinteger(L, new_session);
...
skynet_free(msg);
return 2;
...
}
static int
lpackrequest(lua_State *L) {
return packrequest(L, 0);
}
static int
lpackpush(lua_State *L) {
return packrequest(L, 1);
}
由于 packrequest 函數(shù)會(huì)對(duì) addr 地址進(jìn)行判斷,是否是數(shù)字或者是字符串,然后再按其類型打包。這里就以 addr 是字符串為例,先從 cluster.packrequest(addr, session, msg, sz) 中獲取第3個(gè)參數(shù) msg,判斷如果 msg 為空的活,就沒(méi)有必要再進(jìn)行打包了,接著再獲取第4個(gè)參數(shù) sz,第2個(gè)參數(shù) session,這里會(huì)對(duì)session進(jìn)行加一操作,得到一個(gè)新的new_session,目的就是 new_session 用來(lái)標(biāo)識(shí)遠(yuǎn)程會(huì)話記錄,在上上面的代碼中有所體現(xiàn)
node_session[node] = new_session
packreq_string(L, session, msg, sz, is_push); 會(huì)對(duì) sz 長(zhǎng)度進(jìn)行判斷,如果大于 MULTI_PART (32k字節(jié))的話,并且 packrequest的第二個(gè)參數(shù)是 0 的話就是,表明 rpc 是一次請(qǐng)求 + 響應(yīng)過(guò)程,那么 buf[2] = 0x81。如果是1,表示這一次請(qǐng)求是推送的,不需要有回復(fù),那么 buf[2] = 0xc1,并且會(huì)根據(jù) part 進(jìn)行多次發(fā)送。如果 sz 小于32 k字節(jié),那么就好辦了, buf 存儲(chǔ)的內(nèi)容如下:
- 第0~1個(gè)字節(jié) : 度信息(msg消息長(zhǎng)度+5+namelen服務(wù)名長(zhǎng)度)
- 第2個(gè)字節(jié) : type類型
- 第3個(gè)字節(jié):服務(wù)名長(zhǎng)度
- 第4~namelen個(gè)字節(jié)(namelen個(gè)字節(jié)):服務(wù)名
- 第4+namelen~4+namelen+4個(gè)字節(jié)(4個(gè)字節(jié)):session
- 之后就是存儲(chǔ) msg 消息的內(nèi)容了
此時(shí),整個(gè) c 層調(diào)用完之后,將會(huì)得到 buf 和 new_session(padding只有在sz大于32k時(shí)才存在)。
--clusterd.lua
local function open_channel(t, key)
...
local address = node_address[key] --在
...
if address then
local host, port = string.match(address, "([^:]+):(.*)$")
c = sc.channel {
host = host,
port = tonumber(port),
response = read_response,
nodelay = true,
}
succ, err = pcall(c.connect, c, true) -- 發(fā)起遠(yuǎn)程連接
if succ then
t[key] = c
ct.channel = c
end
else
err = "cluster node [" .. key .. "] is down."
end
...
return c
end
--設(shè)置 node_channel 元表為 open_channel
local node_channel = setmetatable({}, { __index = open_channel })
local function send_request(source, node, addr, msg, sz)
local session = node_session[node] or 1
-- msg is a local pointer, cluster.packrequest will free it
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
node_session[node] = new_session
--再對(duì)下面兩行代碼進(jìn)行分析
-- node_channel[node] may yield or throw error
local c = node_channel[node]
return c:request(request, session, padding)
end
此時(shí),在執(zhí)行 local c = node_channel[node] 的時(shí)候,就已經(jīng)發(fā)起了遠(yuǎn)程連接的請(qǐng)求了。為什么呢,在學(xué)習(xí)lua語(yǔ)法時(shí),有個(gè)元表的概念,如果索引 key 在本 table 中找不到,并且存在元表的情況下,那么它會(huì)去元表再找一次。此時(shí),就會(huì)調(diào)用到 open_channel 方法,發(fā)起遠(yuǎn)程連接。
連接請(qǐng)求的發(fā)送函數(shù)在socketchannel.lua文件中:
--socketchannel.lua
local function connect_once(self)
...
local fd,err = socket.open(self.__host, self.__port) --調(diào)用 c底層的網(wǎng)絡(luò)API
...
end
local function try_connect(self , once)
local t = 0
while not self.__closed do
local ok, err = connect_once(self)
...
end
end
local function block_connect(self, once)
...
if #self.__connecting > 0 then
-- connecting in other coroutine
local co = coroutine.running()
table.insert(self.__connecting, co)
skynet.wait(co)
else
self.__connecting[1] = true
err = try_connect(self, once)
self.__connecting[1] = nil
for i=2, #self.__connecting do
local co = self.__connecting[i]
self.__connecting[i] = nil
skynet.wakeup(co)
end
end
...
end
function channel:connect(once)
...
return block_connect(self, once)
end
緊接著再看看 c:request(request, session, padding) 又調(diào)用了哪些函數(shù):
--socketchannel.lua
function channel:request(request, response, padding)
assert(block_connect(self, true)) -- connect once 由于之前已經(jīng)發(fā)起連接過(guò)了,這里不會(huì)再去連接,大家可以放心。
local fd = self.__sock[1]
if padding then
-- padding may be a table, to support multi part request
-- multi part request use low priority socket write
-- now socket_lwrite returns as socket_write
if not socket_lwrite(fd , request) then
sock_err(self)
end
--這里就將之前大于32k的數(shù)據(jù)包分多次發(fā)送
for _,v in ipairs(padding) do
if not socket_lwrite(fd, v) then
sock_err(self)
end
end
else
--小于32k 的數(shù)據(jù)包一次發(fā)送完
if not socket_write(fd , request) then
sock_err(self)
end
end
if response == nil then
-- no response
return
end
--發(fā)送完數(shù)據(jù),那么就要掛起當(dāng)前協(xié)程,等待對(duì)方響應(yīng)消息了。
return wait_for_response(self, response)
end
在等待函數(shù) wait_for_response() 中,又做了哪些事呢。
--socketchannel.lua
local function wait_for_response(self, response)
local co = coroutine.running()
push_response(self, response, co)
skynet.wait(co) --掛起當(dāng)前協(xié)程
local result = self.__result[co] -- 存放 本次 co 的錯(cuò)誤碼
self.__result[co] = nil
local result_data = self.__result_data[co] --存放遠(yuǎn)程服務(wù)返回的數(shù)據(jù),這里就是你最想要的結(jié)果數(shù)據(jù)了
self.__result_data[co] = nil
if result == socket_error then
if result_data then
error(result_data)
else
error(socket_error)
end
else
assert(result, result_data)
return result_data --如果遠(yuǎn)程調(diào)用沒(méi)有錯(cuò)誤,就返回?cái)?shù)據(jù),這個(gè)數(shù)據(jù)還是經(jīng)過(guò)打包的。
end
end
那么大家可能會(huì)問(wèn),既然掛起了,在什么時(shí)候會(huì)被喚醒呢,還記得之前講過(guò)的
local function open_channel(t, key) 函數(shù)嗎, 這個(gè)函數(shù)里面有這么一段代碼:
--clusterd.lua
c = sc.channel {
host = host,
port = tonumber(port),
response = read_response, --設(shè)置讀取響應(yīng)結(jié)果的回調(diào)函數(shù)
nodelay = true,
}
就是這個(gè)讀取響應(yīng)函數(shù)起的作用。接著再來(lái)仔細(xì)看看:
--clusterd.lua
local function read_response(sock)
local sz = socket.header(sock:read(2)) --阻塞的讀取socket數(shù)據(jù)
local msg = sock:read(sz) --讀取內(nèi)容
return cluster.unpackresponse(msg) -- session, ok, data, padding 稍后介紹到
end
--socketchannel.lua
local function dispatch_by_session(self)
local response = self.__response
-- response() return session
while self.__sock do
--這里的 response 函數(shù),就是之前設(shè)置的 read_response 函數(shù)了。
--這里會(huì)一直阻塞,直到回調(diào)函數(shù)返回,等待結(jié)果。
local ok , session, result_ok, result_data, padding = pcall(response, self.__sock) --這里的result_data就是對(duì)方響應(yīng)的內(nèi)容了,經(jīng)skynet.pack打包。
if ok and session then
local co = self.__thread[session]
if co then
if padding and result_ok then
-- If padding is true, append result_data to a table (self.__result_data[co])
local result = self.__result_data[co] or {}
self.__result_data[co] = result
table.insert(result, result_data)
else
self.__thread[session] = nil
self.__result[co] = result_ok
if result_ok and self.__result_data[co] then
table.insert(self.__result_data[co], result_data)
else
self.__result_data[co] = result_data
end
skynet.wakeup(co) --在這里被換醒了,wait_for_response 函數(shù)就可以往下走了
end
...
end
end
exit_thread(self)
end
local function dispatch_function(self)
if self.__response then
return dispatch_by_session --假設(shè)需要有響應(yīng)結(jié)果,那么就會(huì)返回這個(gè)函數(shù)(根據(jù)cluster.call決定)
else
return dispatch_by_order --假設(shè)不需要有響應(yīng)結(jié)果,那么就會(huì)返回這個(gè)函數(shù)(根據(jù)cluster.send決定)
end
end
local function connect_once(self)
...
--fork一個(gè)協(xié)程出來(lái),在下一幀執(zhí)行
--這里就是要等待響應(yīng)結(jié)果的關(guān)鍵入口
self.__dispatch_thread = skynet.fork(dispatch_function(self), self)
...
end
這里,又涉及到一個(gè) c層的關(guān)鍵調(diào)用,read_response 函數(shù)中的 cluster.unpackresponse(msg),看看它做了些什么:
//lua-cluster.c
static int
lunpackresponse(lua_State *L) {
size_t sz;
const char * buf = luaL_checklstring(L, 1, &sz);
if (sz < 5) {
return 0;
}
uint32_t session = unpack_uint32((const uint8_t *)buf); //session占4個(gè)字節(jié),跟打包一一對(duì)應(yīng)
lua_pushinteger(L, (lua_Integer)session); //將session壓入棧中,作為函數(shù)的第一個(gè)返回?cái)?shù)據(jù)
switch(buf[4]) {
case 0: // error
lua_pushboolean(L, 0);
lua_pushlstring(L, buf+5, sz-5);
return 3;
case 1: // ok
case 4: // multi end
lua_pushboolean(L, 1);
lua_pushlstring(L, buf+5, sz-5);
return 3;
case 2: // multi begin
if (sz != 9) {
return 0;
}
sz = unpack_uint32((const uint8_t *)buf+5);
lua_pushboolean(L, 1);
lua_pushinteger(L, sz);
lua_pushboolean(L, 1);
return 4;
case 3: // multi part
lua_pushboolean(L, 1);
lua_pushlstring(L, buf+5, sz-5);
lua_pushboolean(L, 1);
return 4;
default:
return 0;
}
}
lunpackresponse 函數(shù)主要對(duì) msg 內(nèi)容進(jìn)行第一層解包,主要是根據(jù)頭部消息來(lái)解。包頭有:
- 0~3個(gè)字節(jié):session
- 第4個(gè)字節(jié):type
- 第5個(gè)字節(jié)開(kāi)始:skynet.pack打包的內(nèi)容了
- 最后一個(gè)字節(jié)(可能有,也可能沒(méi)有,主要數(shù)據(jù)包不超過(guò)32k,就不會(huì)有):padding
到了這里,再一層一層的返回,我們就可以看到返回的結(jié)果了:
skynet.lua
function skynet.call(addr, typename, ...)
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session)) --這里再進(jìn)行第二層解包,最后就是用戶想要的遠(yuǎn)程響應(yīng)結(jié)果了
end
--clusterd.lua
function command.req(...)
local ok, msg, sz = pcall(send_request, ...)
if ok then
--數(shù)據(jù)原路返回
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
...
end
--cluster.lua
function cluster.call(node, address, ...)
-- skynet.pack(...) will free by cluster.core.packrequest
return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
end
好了,到此,我們可以了解到 cluster2.lua 是如何發(fā)起請(qǐng)求數(shù)據(jù),以及如何獲取響應(yīng)結(jié)果了,也完成了遠(yuǎn)程調(diào)用的一半內(nèi)容了。
接下來(lái),再看看cluster1.lua 在接收到數(shù)據(jù)后是如何轉(zhuǎn)發(fā)到相應(yīng)的服務(wù),以及服務(wù)是如何回消息的。
之前也有提到過(guò),cluster.open "db" 最終會(huì)創(chuàng)建 gate 網(wǎng)關(guān)來(lái)監(jiān)聽(tīng)。
--gate.lua
function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent --由于之前clusterd.lua在創(chuàng)建 gate 服務(wù)時(shí),并沒(méi)有指定 agent,所以這里的 agent 是 nil
if agent then
skynet.redirect(agent, c.client, "client", 1, msg, sz)
else
skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz)) --轉(zhuǎn)發(fā)到 clusterd.lua 的socket方法
end
end
--gateserver.lua
local function dispatch_msg(fd, msg, sz)
if connection[fd] then
handler.message(fd, msg, sz) --回調(diào) gate.lua 的 message 方法
else
skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
end
end
MSG.data = dispatch_msg
--注冊(cè) socket消息
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...) --設(shè)置回調(diào)函數(shù)
end
end
}
--clusterd.lua
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
...
skynet.call(gate, "lua", "open", { address = addr, port = port })
end
這里再次回顧一下 clusterd.lua 是如何創(chuàng)建 gate 服務(wù)的。以及如何接收遠(yuǎn)程發(fā)送過(guò)來(lái)的消息。接下來(lái),就看看gate 再接收消息后,clusterd.lua又是如何來(lái)處理的。
--clusterd.lua
function command.socket(source, subcmd, fd, msg)
if subcmd == "data" then
local sz
local addr, session, msg, padding, is_push = cluster.unpackrequest(msg)
if padding then --(1)
local requests = large_request[fd]
if requests == nil then
requests = {}
large_request[fd] = requests
end
local req = requests[session] or { addr = addr , is_push = is_push }
requests[session] = req
table.insert(req, msg)
return
else
local requests = large_request[fd]
if requests then
local req = requests[session]
if req then
requests[session] = nil
table.insert(req, msg)
msg,sz = cluster.concat(req)
addr = req.addr
is_push = req.is_push
end
end
if not msg then
local response = cluster.packresponse(session, false, "Invalid large req")
socket.write(fd, response)
return
end
end
local ok, response
if addr == 0 then
local name = skynet.unpack(msg, sz)
local addr = register_name[name]
if addr then
ok = true
msg, sz = skynet.pack(addr)
else
ok = false
msg = "name not found"
end
elseif is_push then --(2)
skynet.rawsend(addr, "lua", msg, sz)
return -- no response
else --(3)
ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
end
if ok then
response = cluster.packresponse(session, true, msg, sz)
if type(response) == "table" then
for _, v in ipairs(response) do
socket.lwrite(fd, v)
end
else
socket.write(fd, response)
end
else
response = cluster.packresponse(session, false, msg) --根據(jù) session 返回給對(duì)應(yīng)的請(qǐng)求方
socket.write(fd, response)
end
elseif subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
skynet.call(source, "lua", "accept", fd)
else
large_request[fd] = nil
skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
end
end
為了方便起見(jiàn),這里假設(shè)padding 為 nil,數(shù)據(jù)包不超過(guò)32k,那么就不會(huì)走流程(1)處代碼。如果是對(duì)方節(jié)點(diǎn)發(fā)起的請(qǐng)求是 cluster.send 方式(推送方式),則走流程(2)。如果是 cluster.call 方式(請(qǐng)求響應(yīng)),則走流程(3)。
對(duì)于流程(2),調(diào)用 skynet.rawsend(addr, "lua", msg, sz), 就是對(duì)消息進(jìn)行派發(fā),發(fā)送給指定的 addr 服務(wù)。addr 可以是字符串也可以是數(shù)字,但對(duì)于我們之前說(shuō)的,addr 就是 "sdb" 字符串。它不需要響應(yīng),所以這里直接返回,就是 (3)上一行代碼 return -- no response。
對(duì)于流程(3),在調(diào)用 ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 完后,會(huì)得到響應(yīng)消息。如果調(diào)用成功后,那么就會(huì)對(duì) msg 進(jìn)行打包,加上頭部消息,從而通過(guò) socket.write(fd, response) 發(fā)送回去,這樣就完成了一次遠(yuǎn)程過(guò)程調(diào)用。
對(duì)于skynet.rawsend 和 skynet.rawcall 不是很了解的,可以先看看 skynet源碼賞析。
現(xiàn)在對(duì) cluster.unpackrequest(msg) 進(jìn)行分析,看看是如何解包的。
//lua-cluster.c
static int
unpackreq_string(lua_State *L, const uint8_t * buf, int sz) {
if (sz < 2) {
return luaL_error(L, "Invalid cluster message (size=%d)", sz);
}
size_t namesz = buf[1]; //獲取服務(wù)名長(zhǎng)度
if (sz < namesz + 6) {
return luaL_error(L, "Invalid cluster message (size=%d)", sz);
}
lua_pushlstring(L, (const char *)buf+2, namesz); //返回服務(wù)名
uint32_t session = unpack_uint32(buf + namesz + 2);
lua_pushinteger(L, (uint32_t)session); //返回session
lua_pushlstring(L, (const char *)buf+2+namesz+4, sz - namesz - 6); //返回消息內(nèi)容 msg
if (session == 0) {
lua_pushnil(L);
lua_pushboolean(L,1); // is_push, no reponse
return 5;
}
return 3;
}
static int
lunpackrequest(lua_State *L) {
size_t ssz;
const char *msg = luaL_checklstring(L,1,&ssz);
int sz = (int)ssz;
switch (msg[0]) {
...
case '\x80': //地址是一個(gè)字符串,且內(nèi)容不超過(guò) 32k
return unpackreq_string(L, (const uint8_t *)msg, sz);
...
}
}
與之前講到的 packreq_string(L, session, msg, sz, is_push); 相對(duì)應(yīng)。先獲取服務(wù)名長(zhǎng)度namesz。再通過(guò)namesz獲取服務(wù)名,之后就是 session,最后就是消息體了。
再看看 cluster.packresponse(session, false, msg),是如何對(duì) msg 打包加上頭部的吧。
//lua-cluster.c
static int
lpackresponse(lua_State *L) {
uint32_t session = (uint32_t)luaL_checkinteger(L,1);
// clusterd.lua:command.socket call lpackresponse,
// and the msg/sz is return by skynet.rawcall , so don't free(msg)
int ok = lua_toboolean(L,2);
void * msg;
size_t sz;
if (lua_type(L,3) == LUA_TSTRING) { //
msg = (void *)lua_tolstring(L, 3, &sz); //msg指向消息體
}
...
//接下來(lái)就是打包頭部信息了
uint8_t buf[TEMP_LENGTH];
fill_header(L, buf, sz+5);
fill_uint32(buf+2, session);
buf[6] = ok;
memcpy(buf+7,msg,sz);
lua_pushlstring(L, (const char *)buf, sz+7);
return 1;
}
頭部信息有:
- 0~1個(gè)字節(jié):消息長(zhǎng)度
- 2~5個(gè)字節(jié):session
- 第6個(gè)字節(jié):狀態(tài)碼
- 第7個(gè)字節(jié)起:msg消息
到此,就將完了skynet集群部分大概是如何建立、以及如何相互通信的了。當(dāng)然,還有一些細(xì)節(jié)部分沒(méi)仔細(xì)分析,不過(guò)對(duì)于大家來(lái)說(shuō),應(yīng)該不是什么難事了(o′ω`o)。