作者:shihuaping0918@163.com,轉(zhuǎn)載請注明作者
第5篇講到了消息的處理,消息的處理實際上就是對工作隊列里的消息不停地調(diào)回調(diào)函數(shù)。那么消息是怎么放進消息隊列的呢。帶著這個疑問,讓我們從lua層開始追根溯源。
在lua層有兩個api,一個是skynet.send,這個是非阻塞發(fā)消息。另一個是skynet.call,這個是阻塞式發(fā)完消息等回應。skynet.call使用一個session來實現(xiàn)等待,這個session實際就是一個自增的數(shù)字,溢出了以后又從1開始。
以skynet.send為例進行分析。skynet.send位于skynet/lualib/skynet.lua文件中。
function skynet.send(addr, typename, ...)
local p = proto[typename]
return c.send(addr, p.id, 0 , p.pack(...)) -- c就是skynet.core
end
可以看出skynet.send實際上是調(diào)了skynet.core里的send函數(shù)。而skynet.core的定義在skynet/lualib-src/lua-skynet.c這個文件中。因為下面有一部分代碼涉及到lua c api,這部分我不打算說明了。
LUAMOD_API int
luaopen_skynet_core(lua_State *L) { //這個就是skynet.core
luaL_checkversion(L);
luaL_Reg l[] = {
{ "send" , lsend }, //這個就是send對應的函數(shù)lsend
{ "genid", lgenid },
省略.....
{ NULL, NULL },
};
}
從上面的代碼可以知道,skynet.core.send實際對應的是lsend函數(shù)。
static int
lsend(lua_State *L) {
return send_message(L, 0, 2);
}
而lsend又調(diào)的是send_message函數(shù)。
static int
send_message(lua_State *L, int source, int idx_type) {
略...
switch (mtype) {
case LUA_TSTRING: {
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
略...
}
從上面代碼看出,最終調(diào)的是skynet_sendname和skynet_send,而skynet_sendname實際上是調(diào)用的skynet_send。
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -1;
}
_filter_args(context, type, &session, (void **)&data, &sz);
if (source == 0) {
source = context->handle;
}
if (destination == 0) {
return session;
}
if (skynet_harbor_message_isremote(destination)) { //遠程消息
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz;
skynet_harbor_send(rmsg, source, session); //分布式消息發(fā)送
} else { //本地消息發(fā)送
struct skynet_message smsg;
smsg.source = source;
smsg.session = session;
smsg.data = data;
smsg.sz = sz;
//消息隊列加一條消息
if (skynet_context_push(destination, &smsg)) {
skynet_free(data);
return -1;
}
}
return session;
}
看到這里終于看到消息結構體了。
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle); //增加ctx引用計數(shù)
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message); //消息入隊完成
skynet_context_release(ctx); //減少ctx引用計數(shù)
return 0;
}
從上面的代碼分析可以很清楚地看到,skynet.send實際上就是往目標服務的消息隊列里增加一條消息。
在這篇文章里,消息處理涉及到了lua c api,這個東西不是一兩篇文章能說得清楚的。所以直接略過了。
從代碼跟蹤的過程來看,發(fā)送一個消息實際上要進行6層函數(shù)調(diào)用。消息最終才能投到目標服務的消息隊列中。而消息隊列是怎么被處理的,在第5篇中已經(jīng)講過了。剩下的就是,消息的回調(diào)到底是怎么被注冊的。這個將在下一篇中講到。