skynet源碼分析(10)--消息機制之消息注冊和回調(diào)

作者:shihuaping0918@163.com,轉(zhuǎn)載請注明作者

在第5篇和第6篇已經(jīng)分析過消息的發(fā)送和消息的處理,但是沒有談到消息回調(diào)函數(shù)的注冊,還有消息回調(diào)的詳細過程。第9篇已經(jīng)講了一部分消息的回調(diào)處理。

skynet中的回調(diào)對C服務(wù)和對LUA服務(wù)的注冊機制是不同的,C服務(wù)的回調(diào)可以直接掛載。但是lua服務(wù)不行,它必須經(jīng)過一次中轉(zhuǎn)。這個在第9篇中談到過,但是第9篇主要是介紹lua c api的協(xié)議的。

本篇分為兩部分,第一部分介紹C服務(wù)的回調(diào)。第二部分介紹lua服務(wù)的注冊與回調(diào)。第一部分比較簡短,第二部分會比較長。

第一部分:C服務(wù)的回調(diào)
C服務(wù)的回調(diào)非常簡單,直接把函數(shù)掛上去就可以。我們以skynet中的日志服務(wù)為例說明一下這個過程。

skynet中的日志服務(wù)在skynet/service-src/service-logger.c中。在第一篇介紹模塊(服務(wù))的時候就提到過,每個服務(wù)有create/init/release/signal四類函數(shù)。而logger服務(wù)也不例外,回調(diào)的掛載就是在init函數(shù)中進行的。

int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
    if (parm) {
        inst->handle = fopen(parm,"w");
        if (inst->handle == NULL) {
            return 1;
        }
        inst->filename = skynet_malloc(strlen(parm)+1);
        strcpy(inst->filename, parm);
        inst->close = 1;
    } else {
        inst->handle = stdout;
    }
    if (inst->handle) {
        skynet_callback(ctx, inst, logger_cb);  //注冊回調(diào)
        skynet_command(ctx, "REG", ".logger"); //注冊服務(wù)
        return 0;
    }
    return 1;
}

C服務(wù)注冊回調(diào)用的是skynet_callback函數(shù),這個函數(shù)只有2行。

void 
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
    context->cb = cb;  //回調(diào)函數(shù)掛載
    context->cb_ud = ud; //這個是個輔助指針
}

C服務(wù)的回調(diào)掛載/注冊就是這么簡單。最后再來看一下這個回調(diào)是在哪里被調(diào)用的,是被怎么調(diào)用的。以便形成一個比較系統(tǒng)的概念,而不是盲人摸象。

回調(diào)的調(diào)用是在dispatch_message函數(shù)中進行的,這個函數(shù)前面已經(jīng)分析過了,只是沒有講回調(diào)的注冊過程。

static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
    assert(ctx->init);
    CHECKCALLING_BEGIN(ctx)
    pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
    int type = msg->sz >> MESSAGE_TYPE_SHIFT;
    size_t sz = msg->sz & MESSAGE_TYPE_MASK;
    if (ctx->logfile) {
        skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
    }
    ++ctx->message_count;
    int reserve_msg;
    if (ctx->profile) {
        ctx->cpu_start = skynet_thread_time();
//這里回調(diào)了,看到cb_ud了沒有,它會在回調(diào)時傳進去
        reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
        uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
        ctx->cpu_cost += cost_time;
    } else {
//這里回調(diào)了,看到cb_ud了沒有,它會在回調(diào)時傳進去
        reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); 
    }
    if (!reserve_msg) {
        skynet_free(msg->data);
    }
    CHECKCALLING_END(ctx)
}

第一部分C服務(wù)回調(diào)的注冊和調(diào)用到此就完全清楚了。

第二部分:LUA服務(wù)的回調(diào)

LUA服務(wù)的回調(diào)注冊和回調(diào)的調(diào)用層次比較多,也不直觀,所以理解起來難度會比較大,我盡可能地把這個過程簡化描述。

在寫服務(wù)的時候,消息注冊的慣用法為:

local CMD = {}

skynet.dispatch("lua", function(session, source, cmd, ...)
  local f = assert(CMD[cmd])
  f(...)
end)

這個skynet.dispatch會把消息的回調(diào)函數(shù)注冊到服務(wù)。當(dāng)然這不是唯一的途徑,但是這篇文章只講這個途徑。

依然從skynet/lualib/skynet.lua中找dispatch,找到了的話會是這樣的:

function skynet.dispatch(typename, func)
    local p = proto[typename]
    if func then
        local ret = p.dispatch
        p.dispatch = func
        return ret
    else
        return p and p.dispatch
    end
end

這個proto是什么呢?好像到了這就分析不下去了,這個函數(shù)和c底層的ctx->cb有什么關(guān)系?還是看個全一點的東西比較好,找個能正經(jīng)干活的例子來看選一個skynet/example/simpledb.lua試試。

skynet.start(function()
        skynet.dispatch("lua", function(session, address, cmd, ...)
                cmd = cmd:upper()
                if cmd == "PING" then
                        assert(session == 0)
                        local str = (...)
                        if #str > 20 then
                                str = str:sub(1,20) .. "...(" .. #str .. ")"
                        end
                        skynet.error(string.format("%s ping %s", skynet.address(address), str))
                        return
                end
                local f = command[cmd]
                if f then
                        skynet.ret(skynet.pack(f(...)))
                else
                        error(string.format("Unknown command %s", tostring(cmd)))
                end
        end)
        skynet.register "SIMPLEDB"
end)

好,出現(xiàn)新花樣了,skynet.start這個函數(shù)

function skynet.start(start_func)
    c.callback(skynet.dispatch_message) -- skynet.core.callback
    skynet.timeout(0, function()
        skynet.init_service(start_func) --服務(wù)初始化
    end)
end

這里出現(xiàn)了前面說的,分析a的時候涉及到b.c.d的問題。一個一個來吧,先看c.callback這個東西是干什么用的。
1.c.callback
c.callback最終定位到是在skynet/lualib-src/lua-skynet.c這個文件中,具體跟蹤過程和第6篇中一樣。

static int
lcallback(lua_State *L) {
    struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
    int forward = lua_toboolean(L, 2);
    luaL_checktype(L,1,LUA_TFUNCTION); //檢查是不是數(shù)據(jù)類型是不是函數(shù)
    lua_settop(L,1);
    lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); //把_cb保存到用戶表里,詳見lua參考手冊

    lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
    lua_State *gL = lua_tothread(L,-1);

    if (forward) {
        skynet_callback(context, gL, forward_cb);
    } else {
        skynet_callback(context, gL, _cb); //這個地方調(diào)用了C函數(shù)
    }

    return 0;
}
//設(shè)置回調(diào)
void 
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
    context->cb = cb; //看這里
    context->cb_ud = ud;
}

很明顯,c.callback就是設(shè)置回調(diào)函數(shù)到服務(wù)(模塊)的上下文中,而且設(shè)置的是skynet.dispatch_message這個lua方法為回調(diào)函數(shù)。

2.從代碼中看到,最終調(diào)用了skynet_callback這個C函數(shù),這個C函數(shù)的第三個參數(shù),是一個中轉(zhuǎn)函數(shù)。所以lua服務(wù)的回調(diào)它不是被直接調(diào)的,首先要在_cb這個函數(shù)處理一下數(shù)據(jù),在_cb里面去調(diào)lua的回調(diào)函數(shù)。_cb這個函數(shù)主要就是按照Lua api的協(xié)議,將參數(shù)準(zhǔn)備好,然后調(diào)lua的函數(shù)。在第9篇分析過,這里簡短地介紹一下:

static int
_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
    lua_State *L = ud;
    int trace = 1;
    int r;
    int top = lua_gettop(L);
    if (top == 0) {
        lua_pushcfunction(L, traceback); //錯誤處理的函數(shù)
        lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); //把表里的回調(diào)函數(shù)取出來
    } else {
        assert(top == 2);
    }
    lua_pushvalue(L,2); //回調(diào)函數(shù)入棧

    lua_pushinteger(L, type);  //參數(shù)type入棧
    lua_pushlightuserdata(L, (void *)msg); //參數(shù)msg入棧
    lua_pushinteger(L,sz); //參數(shù)sz,消息長度,入棧
    lua_pushinteger(L, session); //參數(shù)session入棧
    lua_pushinteger(L, source); //參數(shù)session入棧

    r = lua_pcall(L, 5, 0 , trace); //調(diào)用lua的回調(diào)函數(shù),也就是skynet.dispatch_message

    if (r == LUA_OK) {
        return 0;
    }
    const char * self = skynet_command(context, "REG", NULL);
    switch (r) {
    case LUA_ERRRUN:
        skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
        break;
    case LUA_ERRMEM:
        skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
        break;
    case LUA_ERRERR:
        skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
        break;
    case LUA_ERRGCMM:
        skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
        break;
    };

    lua_pop(L,1);

    return 0;
}

3.skynet.dispatch_message,這個函數(shù)呢又涉及到lua的協(xié)程,這個協(xié)程暫時不講,我把函數(shù)精簡一下。dispatch_message實際調(diào)的是raw_dispatch_message,這是個lua函數(shù)。


local function raw_dispatch_message(prototype, msg, sz, session, source)
    -- skynet.PTYPE_RESPONSE = 1, read skynet.h
    if prototype == 1 then
        local co = session_id_coroutine[session]
        if co == "BREAK" then
            session_id_coroutine[session] = nil
        elseif co == nil then
            unknown_response(session, source, msg, sz)
        else
            session_id_coroutine[session] = nil
            suspend(co, coroutine_resume(co, true, msg, sz))
        end
    else
        local p = proto[prototype] --skynet.dispatch對應(yīng)的proto
        if p == nil then
            if session ~= 0 then
                c.send(source, skynet.PTYPE_ERROR, session, "")
            else
                unknown_request(session, source, msg, sz, prototype)
            end
            return
        end
        local f = p.dispatch  --取真正的回調(diào)函數(shù),也就是skynet.dispath設(shè)的那個函數(shù)
        if f then
            local ref = watching_service[source]
            if ref then
                watching_service[source] = ref + 1
            else
                watching_service[source] = 1
            end
            local co = co_create(f)  --這里創(chuàng)建協(xié)程
            session_coroutine_id[co] = session
            session_coroutine_address[co] = source
            suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) //這里喚醒協(xié)程
        elseif session ~= 0 then
            c.send(source, skynet.PTYPE_ERROR, session, "")
        else
            unknown_request(session, source, msg, sz, proto[prototype].name)
        end
    end
end

第二部分到了這里,基本上流程就清楚了。第一步,skynet.dispatch把回調(diào)注冊到proto表中,并在表中設(shè)置服務(wù)的回調(diào)函數(shù)。第二步,skynet.start會調(diào)用C層的lcallback函數(shù),把lua函數(shù)skynet.dispatch_message設(shè)為lua層偽回調(diào),這個偽回調(diào)被存在用戶表里,這個lua層的偽回調(diào)會被C層的偽回調(diào)所調(diào)用。這個lua層的偽回調(diào)從proto表中取到dispatch注冊的真正的服務(wù)的回調(diào)函數(shù),然后調(diào)用它。第三步,lcallback函數(shù)會設(shè)置一個C層的偽回調(diào),這個偽回調(diào)的作用是做c到lua層的協(xié)議轉(zhuǎn)換。

語言描述可能還是不能為所有人理解,畫個簡單的關(guān)系圖吧

skynet.dispatch(callback) ---------------------------> proto[typename].dispach = callback
                                                                        |
skynet.core.callback(skynet.dispatch_message) -----------tbl[k] = skynet.dispatch_message
                                                                        |          
                                                                        |
C dispatch_message->_cb ------------------------------------------------|

也就是說C層弄了一個函數(shù)叫_cb,它在lua注冊服務(wù)時被注冊?;卣{(diào)時被調(diào)用,回調(diào)時做lua api協(xié)議適配,然后取用戶表里的一個lua回調(diào)函數(shù),這個回調(diào)函數(shù)叫做skynet.dispatch_message。這個dispatch_message又會去一個叫proto的表里找到服務(wù)真正的回調(diào)函數(shù),而這個真正的回調(diào)函數(shù)是通過skynet.dispatch注冊到proto[typename].dispatch上的。

如果還是有人不理解我就無能為力了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容