skynet消息調(diào)度及處理

skynet內(nèi)部服務(wù)都是由一個一個的消息所驅(qū)動,每個服務(wù)的上下文結(jié)構(gòu)體struct skynet_context有個字段struct message_queue *queue描述其消息隊列,所有服務(wù)的消息隊列掛在全局消息對列的列表struct global_queue *Q

skynet在啟動時會啟動config->threadworker線程來處理所有服務(wù)的消息,worker線程的入口函數(shù)為static void *thread_worker(void *p),其處理邏輯如下:

  1. 如果當(dāng)前要處理的消息隊列為空,則從全局消息隊列的列表中取下一個消息隊列
  2. 對消息隊列中的每個消息,調(diào)用該消息隊列所屬服務(wù)的回調(diào)函數(shù),每次至少處理一個消息,之多處理消息隊列長度右移weight個消息,其中weight是事先配置好的
static int weight[] = { 
        -1, -1, -1, -1, 0, 0, 0, 0,
        1, 1, 1, 1, 1, 1, 1, 1, 
        2, 2, 2, 2, 2, 2, 2, 2, 
        3, 3, 3, 3, 3, 3, 3, 3, };

比如配置啟動nworker線程,第i個線程的weight為:當(dāng)i小于weight數(shù)組長度時,線程weightweight[i-1],否則為0

每個服務(wù)的消息隊列都會被worker進(jìn)程公平的進(jìn)行處理,但是每個線程一次處理的消息個數(shù)由工作線程配置的權(quán)重決定。

下面以snlua為例理解消息回調(diào)處理,在dispatch_message函數(shù)中,通過調(diào)用服務(wù)的回調(diào)函數(shù)來讓服務(wù)處理其收到的消息:
ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)

snlua是執(zhí)行lua服務(wù)的沙盒環(huán)境,啟動一個lua服務(wù)之后,在lua代碼中會設(shè)置回調(diào)函數(shù),通常在skynet.lua文件中的skynet.start中設(shè)置c.callback(skynet.dispatch_message),c.callback調(diào)用的是:

  83 static int
  84 _callback(lua_State *L) {
  85     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
  86     int forward = lua_toboolean(L, 2);
  87     luaL_checktype(L,1,LUA_TFUNCTION);
  88     lua_settop(L,1);
  89     lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
  90
  91     lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
  92     lua_State *gL = lua_tothread(L,-1);
  93
  94     if (forward) {
  95         skynet_callback(context, gL, forward_cb);
  96     } else {
  97         skynet_callback(context, gL, _cb);
  98     }
  99
 100     return 0;
 101 }
  • 85行獲取服務(wù)的上下文結(jié)構(gòu),此upvalue是在啟動次服務(wù)的時候設(shè)置的
  • 89行在注冊表中設(shè)置_cb=>skynet.dispatch_message
  • 91-92行獲取服務(wù)的LUA狀態(tài)機(jī)結(jié)構(gòu)
  • 95或者97行設(shè)置服務(wù)上下結(jié)構(gòu)體中的回調(diào)函數(shù)為_cb,回調(diào)函數(shù)私有數(shù)據(jù)為LUA狀態(tài)機(jī)gL

下面來分析回調(diào)函數(shù)_cb,任何LUA沙盒服務(wù)收到的消息的回調(diào)函數(shù)入口都是_cb

  30 static int
  31 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
  32     lua_State *L = ud;
  33     int trace = 1;
  34     int r;
  35     int top = lua_gettop(L);
  36     if (top == 0) {
  37         lua_pushcfunction(L, traceback);
  38         lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
  39     } else {
  40         assert(top == 2);
  41     }
  42     lua_pushvalue(L,2);
  43
  44     lua_pushinteger(L, type);
  45     lua_pushlightuserdata(L, (void *)msg);
  46     lua_pushinteger(L,sz);
  47     lua_pushinteger(L, session);
  48     lua_pushinteger(L, source);
  49
  50     r = lua_pcall(L, 5, 0 , trace);
  51
  52     if (r == LUA_OK) {
  53         return 0;
  54     }
  55     const char * self = skynet_command(context, "REG", NULL);
  56     switch (r) {
  57     case LUA_ERRRUN:
  58         skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
  59         break;
  60     case LUA_ERRMEM:
  61         skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
  62         break;
  63     case LUA_ERRERR:
  64         skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
  65         break;
  66     case LUA_ERRGCMM:
  67         skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
  68         break;
  69     };
  70
  71     lua_pop(L,1);
  72
  73     return 0;
  74 }
  • 32-38行在LUA狀態(tài)機(jī)的棧中設(shè)置即將執(zhí)行的LUA函數(shù)及參數(shù),依次是traceback,skynet.dispatch_messagetype,msg,szsession,source
  • 50行在保護(hù)模式執(zhí)行skynet.dispatch_message函數(shù),在此函數(shù)進(jìn)行真正消息處理

以上粗略的分析了skynet框架是如何調(diào)度每個服務(wù)的消息隊列,以及如何通過回調(diào)函數(shù)來對服務(wù)的消息進(jìn)行處理

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容