作者:shihuaping0918@163.com,轉(zhuǎn)載請注明作者
在第5篇和第6篇已經(jīng)分析過消息的發(fā)送和消息的處理,但是沒有談到消息回調(diào)函數(shù)的注冊,還有消息回調(diào)的詳細過程。第9篇已經(jīng)講了一部分消息的回調(diào)處理。
skynet中的回調(diào)對C服務(wù)和對LUA服務(wù)的注冊機制是不同的,C服務(wù)的回調(diào)可以直接掛載。但是lua服務(wù)不行,它必須經(jīng)過一次中轉(zhuǎn)。這個在第9篇中談到過,但是第9篇主要是介紹lua c api的協(xié)議的。
本篇分為兩部分,第一部分介紹C服務(wù)的回調(diào)。第二部分介紹lua服務(wù)的注冊與回調(diào)。第一部分比較簡短,第二部分會比較長。
第一部分:C服務(wù)的回調(diào)
C服務(wù)的回調(diào)非常簡單,直接把函數(shù)掛上去就可以。我們以skynet中的日志服務(wù)為例說明一下這個過程。
skynet中的日志服務(wù)在skynet/service-src/service-logger.c中。在第一篇介紹模塊(服務(wù))的時候就提到過,每個服務(wù)有create/init/release/signal四類函數(shù)。而logger服務(wù)也不例外,回調(diào)的掛載就是在init函數(shù)中進行的。
int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
if (parm) {
inst->handle = fopen(parm,"w");
if (inst->handle == NULL) {
return 1;
}
inst->filename = skynet_malloc(strlen(parm)+1);
strcpy(inst->filename, parm);
inst->close = 1;
} else {
inst->handle = stdout;
}
if (inst->handle) {
skynet_callback(ctx, inst, logger_cb); //注冊回調(diào)
skynet_command(ctx, "REG", ".logger"); //注冊服務(wù)
return 0;
}
return 1;
}
C服務(wù)注冊回調(diào)用的是skynet_callback函數(shù),這個函數(shù)只有2行。
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb; //回調(diào)函數(shù)掛載
context->cb_ud = ud; //這個是個輔助指針
}
C服務(wù)的回調(diào)掛載/注冊就是這么簡單。最后再來看一下這個回調(diào)是在哪里被調(diào)用的,是被怎么調(diào)用的。以便形成一個比較系統(tǒng)的概念,而不是盲人摸象。
回調(diào)的調(diào)用是在dispatch_message函數(shù)中進行的,這個函數(shù)前面已經(jīng)分析過了,只是沒有講回調(diào)的注冊過程。
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
if (ctx->logfile) {
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;
int reserve_msg;
if (ctx->profile) {
ctx->cpu_start = skynet_thread_time();
//這里回調(diào)了,看到cb_ud了沒有,它會在回調(diào)時傳進去
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
//這里回調(diào)了,看到cb_ud了沒有,它會在回調(diào)時傳進去
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
第一部分C服務(wù)回調(diào)的注冊和調(diào)用到此就完全清楚了。
第二部分:LUA服務(wù)的回調(diào)
LUA服務(wù)的回調(diào)注冊和回調(diào)的調(diào)用層次比較多,也不直觀,所以理解起來難度會比較大,我盡可能地把這個過程簡化描述。
在寫服務(wù)的時候,消息注冊的慣用法為:
local CMD = {}
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = assert(CMD[cmd])
f(...)
end)
這個skynet.dispatch會把消息的回調(diào)函數(shù)注冊到服務(wù)。當(dāng)然這不是唯一的途徑,但是這篇文章只講這個途徑。
依然從skynet/lualib/skynet.lua中找dispatch,找到了的話會是這樣的:
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end
這個proto是什么呢?好像到了這就分析不下去了,這個函數(shù)和c底層的ctx->cb有什么關(guān)系?還是看個全一點的東西比較好,找個能正經(jīng)干活的例子來看選一個skynet/example/simpledb.lua試試。
skynet.start(function()
skynet.dispatch("lua", function(session, address, cmd, ...)
cmd = cmd:upper()
if cmd == "PING" then
assert(session == 0)
local str = (...)
if #str > 20 then
str = str:sub(1,20) .. "...(" .. #str .. ")"
end
skynet.error(string.format("%s ping %s", skynet.address(address), str))
return
end
local f = command[cmd]
if f then
skynet.ret(skynet.pack(f(...)))
else
error(string.format("Unknown command %s", tostring(cmd)))
end
end)
skynet.register "SIMPLEDB"
end)
好,出現(xiàn)新花樣了,skynet.start這個函數(shù)
function skynet.start(start_func)
c.callback(skynet.dispatch_message) -- skynet.core.callback
skynet.timeout(0, function()
skynet.init_service(start_func) --服務(wù)初始化
end)
end
這里出現(xiàn)了前面說的,分析a的時候涉及到b.c.d的問題。一個一個來吧,先看c.callback這個東西是干什么用的。
1.c.callback
c.callback最終定位到是在skynet/lualib-src/lua-skynet.c這個文件中,具體跟蹤過程和第6篇中一樣。
static int
lcallback(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int forward = lua_toboolean(L, 2);
luaL_checktype(L,1,LUA_TFUNCTION); //檢查是不是數(shù)據(jù)類型是不是函數(shù)
lua_settop(L,1);
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); //把_cb保存到用戶表里,詳見lua參考手冊
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
lua_State *gL = lua_tothread(L,-1);
if (forward) {
skynet_callback(context, gL, forward_cb);
} else {
skynet_callback(context, gL, _cb); //這個地方調(diào)用了C函數(shù)
}
return 0;
}
//設(shè)置回調(diào)
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb; //看這里
context->cb_ud = ud;
}
很明顯,c.callback就是設(shè)置回調(diào)函數(shù)到服務(wù)(模塊)的上下文中,而且設(shè)置的是skynet.dispatch_message這個lua方法為回調(diào)函數(shù)。
2.從代碼中看到,最終調(diào)用了skynet_callback這個C函數(shù),這個C函數(shù)的第三個參數(shù),是一個中轉(zhuǎn)函數(shù)。所以lua服務(wù)的回調(diào)它不是被直接調(diào)的,首先要在_cb這個函數(shù)處理一下數(shù)據(jù),在_cb里面去調(diào)lua的回調(diào)函數(shù)。_cb這個函數(shù)主要就是按照Lua api的協(xié)議,將參數(shù)準(zhǔn)備好,然后調(diào)lua的函數(shù)。在第9篇分析過,這里簡短地介紹一下:
static int
_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
lua_State *L = ud;
int trace = 1;
int r;
int top = lua_gettop(L);
if (top == 0) {
lua_pushcfunction(L, traceback); //錯誤處理的函數(shù)
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); //把表里的回調(diào)函數(shù)取出來
} else {
assert(top == 2);
}
lua_pushvalue(L,2); //回調(diào)函數(shù)入棧
lua_pushinteger(L, type); //參數(shù)type入棧
lua_pushlightuserdata(L, (void *)msg); //參數(shù)msg入棧
lua_pushinteger(L,sz); //參數(shù)sz,消息長度,入棧
lua_pushinteger(L, session); //參數(shù)session入棧
lua_pushinteger(L, source); //參數(shù)session入棧
r = lua_pcall(L, 5, 0 , trace); //調(diào)用lua的回調(diào)函數(shù),也就是skynet.dispatch_message
if (r == LUA_OK) {
return 0;
}
const char * self = skynet_command(context, "REG", NULL);
switch (r) {
case LUA_ERRRUN:
skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
break;
case LUA_ERRMEM:
skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
break;
case LUA_ERRERR:
skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
break;
case LUA_ERRGCMM:
skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
break;
};
lua_pop(L,1);
return 0;
}
3.skynet.dispatch_message,這個函數(shù)呢又涉及到lua的協(xié)程,這個協(xié)程暫時不講,我把函數(shù)精簡一下。dispatch_message實際調(diào)的是raw_dispatch_message,這是個lua函數(shù)。
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz))
end
else
local p = proto[prototype] --skynet.dispatch對應(yīng)的proto
if p == nil then
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
local f = p.dispatch --取真正的回調(diào)函數(shù),也就是skynet.dispath設(shè)的那個函數(shù)
if f then
local ref = watching_service[source]
if ref then
watching_service[source] = ref + 1
else
watching_service[source] = 1
end
local co = co_create(f) --這里創(chuàng)建協(xié)程
session_coroutine_id[co] = session
session_coroutine_address[co] = source
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) //這里喚醒協(xié)程
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
第二部分到了這里,基本上流程就清楚了。第一步,skynet.dispatch把回調(diào)注冊到proto表中,并在表中設(shè)置服務(wù)的回調(diào)函數(shù)。第二步,skynet.start會調(diào)用C層的lcallback函數(shù),把lua函數(shù)skynet.dispatch_message設(shè)為lua層偽回調(diào),這個偽回調(diào)被存在用戶表里,這個lua層的偽回調(diào)會被C層的偽回調(diào)所調(diào)用。這個lua層的偽回調(diào)從proto表中取到dispatch注冊的真正的服務(wù)的回調(diào)函數(shù),然后調(diào)用它。第三步,lcallback函數(shù)會設(shè)置一個C層的偽回調(diào),這個偽回調(diào)的作用是做c到lua層的協(xié)議轉(zhuǎn)換。
語言描述可能還是不能為所有人理解,畫個簡單的關(guān)系圖吧
skynet.dispatch(callback) ---------------------------> proto[typename].dispach = callback
|
skynet.core.callback(skynet.dispatch_message) -----------tbl[k] = skynet.dispatch_message
|
|
C dispatch_message->_cb ------------------------------------------------|
也就是說C層弄了一個函數(shù)叫_cb,它在lua注冊服務(wù)時被注冊?;卣{(diào)時被調(diào)用,回調(diào)時做lua api協(xié)議適配,然后取用戶表里的一個lua回調(diào)函數(shù),這個回調(diào)函數(shù)叫做skynet.dispatch_message。這個dispatch_message又會去一個叫proto的表里找到服務(wù)真正的回調(diào)函數(shù),而這個真正的回調(diào)函數(shù)是通過skynet.dispatch注冊到proto[typename].dispatch上的。
如果還是有人不理解我就無能為力了。