skynet源碼分析(6)--消息機制之消息分發(fā)

作者:shihuaping0918@163.com,轉(zhuǎn)載請注明作者

第5篇講到了消息的處理,消息的處理實際上就是對工作隊列里的消息不停地調(diào)回調(diào)函數(shù)。那么消息是怎么放進消息隊列的呢。帶著這個疑問,讓我們從lua層開始追根溯源。

在lua層有兩個api,一個是skynet.send,這個是非阻塞發(fā)消息。另一個是skynet.call,這個是阻塞式發(fā)完消息等回應。skynet.call使用一個session來實現(xiàn)等待,這個session實際就是一個自增的數(shù)字,溢出了以后又從1開始。

以skynet.send為例進行分析。skynet.send位于skynet/lualib/skynet.lua文件中。

function skynet.send(addr, typename, ...)
    local p = proto[typename]
    return c.send(addr, p.id, 0 , p.pack(...))  -- c就是skynet.core
end

可以看出skynet.send實際上是調(diào)了skynet.core里的send函數(shù)。而skynet.core的定義在skynet/lualib-src/lua-skynet.c這個文件中。因為下面有一部分代碼涉及到lua c api,這部分我不打算說明了。

LUAMOD_API int
luaopen_skynet_core(lua_State *L) { //這個就是skynet.core
    luaL_checkversion(L);

    luaL_Reg l[] = {
        { "send" , lsend },  //這個就是send對應的函數(shù)lsend
        { "genid", lgenid },
    省略.....
        { NULL, NULL },
    };
}

從上面的代碼可以知道,skynet.core.send實際對應的是lsend函數(shù)。

static int
lsend(lua_State *L) {
    return send_message(L, 0, 2);
}

而lsend又調(diào)的是send_message函數(shù)。


static int
send_message(lua_State *L, int source, int idx_type) {
略...
    switch (mtype) {
    case LUA_TSTRING: {
        
        if (dest_string) {
            session = skynet_sendname(context, source, dest_string, type, session , msg, len);
        } else {
            session = skynet_send(context, source, dest, type, session , msg, len);
        }
        break;
    }
    case LUA_TLIGHTUSERDATA: {
        void * msg = lua_touserdata(L,idx_type+2);
        int size = luaL_checkinteger(L,idx_type+3);
        if (dest_string) {
            session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
        } else {
            session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
        }
        break;
    }
    default:
        luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
    }
     略...
}

從上面代碼看出,最終調(diào)的是skynet_sendname和skynet_send,而skynet_sendname實際上是調(diào)用的skynet_send。

int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
    if ((sz & MESSAGE_TYPE_MASK) != sz) {
        skynet_error(context, "The message to %x is too large", destination);
        if (type & PTYPE_TAG_DONTCOPY) {
            skynet_free(data);
        }
        return -1;
    }
    _filter_args(context, type, &session, (void **)&data, &sz);

    if (source == 0) {
        source = context->handle;
    }

    if (destination == 0) {
        return session;
    }
    if (skynet_harbor_message_isremote(destination)) { //遠程消息
        struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
        rmsg->destination.handle = destination;
        rmsg->message = data;
        rmsg->sz = sz;
        skynet_harbor_send(rmsg, source, session); //分布式消息發(fā)送
    } else {  //本地消息發(fā)送
        struct skynet_message smsg;
        smsg.source = source;
        smsg.session = session;
        smsg.data = data;
        smsg.sz = sz;
        //消息隊列加一條消息
        if (skynet_context_push(destination, &smsg)) {
            skynet_free(data);
            return -1;
        }
    }
    return session;
}

看到這里終于看到消息結構體了。

int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
    struct skynet_context * ctx = skynet_handle_grab(handle); //增加ctx引用計數(shù)
    if (ctx == NULL) {
        return -1;
    }
    skynet_mq_push(ctx->queue, message);  //消息入隊完成
    skynet_context_release(ctx); //減少ctx引用計數(shù)

    return 0;
}

從上面的代碼分析可以很清楚地看到,skynet.send實際上就是往目標服務的消息隊列里增加一條消息。

在這篇文章里,消息處理涉及到了lua c api,這個東西不是一兩篇文章能說得清楚的。所以直接略過了。

從代碼跟蹤的過程來看,發(fā)送一個消息實際上要進行6層函數(shù)調(diào)用。消息最終才能投到目標服務的消息隊列中。而消息隊列是怎么被處理的,在第5篇中已經(jīng)講過了。剩下的就是,消息的回調(diào)到底是怎么被注冊的。這個將在下一篇中講到。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容