淺析skynet底層框架下篇

這是最后一篇了,其實還有很多重要的模塊要分析的,但留給以后有多余時間再去研究吧,有興趣的可以自行下載源碼分析。這部分主要是圍繞第三小問題展開,并附加些其他skynet中與此有關(guān)的設(shè)計,即:當(dāng)并發(fā)時,如何保證消息的正確時序,以及如何使用協(xié)程處理消息(同步/異步/超時);包括創(chuàng)建協(xié)程處理消息,掛起協(xié)程,切換。這塊其實是針對lua上層來說的,底層框架的消息隊列只是保證消息順序入隊列且出隊列,如果交叉執(zhí)行比如lua層的協(xié)程掛起,那么就會出現(xiàn)時序問題。

先簡單回顧下前幾篇博客的分析,包括skynet本身的設(shè)計,及C++協(xié)程。對于C++協(xié)程,比如一個請求a過來后,從協(xié)程池中pop一個協(xié)程并處理該請求a,如果需要等待,則讓出協(xié)程并掛上定時器,然后再處理下一個請求b,如果此時a和b是相關(guān)聯(lián)的,且b有可能依賴于a的執(zhí)行結(jié)果,那么就會出現(xiàn)問題。這對于游戲中的業(yè)務(wù)來說,尤其涉及到金錢相關(guān)的邏輯,那是大問題。而那種獨立的請求間,只是讀之類的操作,那是沒問題的。如果需要結(jié)合業(yè)務(wù),那么就需要改造。

而對于skynet來說,當(dāng)并發(fā)上來時,考慮到這個時序問題,底層實現(xiàn)相關(guān)的順序隊列,大概思路就是lua協(xié)程執(zhí)行a到一半后,哪怕有b的消息被協(xié)程調(diào)度處理,此時會把這個b協(xié)程壓入隊列(lua中的table也可以,使用數(shù)組部分),必須等a執(zhí)行完畢或超時后,再處理b的,也就是在業(yè)務(wù)上層串行化了服務(wù)的消息處理。這樣保證了時序。

但這又引起了另一個問題,即可能存在后面的消息都超時了,然而上層如果無法識別繼續(xù)處理,那么就白白浪費了資源,處理了無用的消息。這類的相關(guān)介紹在另一篇“談?wù)劸彺娲┩秆┍篮瓦^載保護以及一致性等問題”中有相關(guān)的介紹及應(yīng)對方案。

本節(jié)分為兩個小點討論,即:
1)如何保證消息的正確時序,以及如何使用協(xié)程處理消息(同步/異步/超時);
2)創(chuàng)建協(xié)程處理消息,掛起協(xié)程,切換;

第一小點,撇開語言方面的限制,考慮skynet本身的框架設(shè)計,而不摻雜業(yè)務(wù)框架的設(shè)計。對于單進程多線程,要想并發(fā)的處理同一個客戶端的請求,不管是讀還是寫,都必須路由到同一個線程處理,這樣就保證了不會導(dǎo)致同一個client的請求分發(fā)到不同的線程,在skynet底層抽象client為一個agent service,有自己的消息隊列,并且當(dāng)工作線程處理這個agent消息時,先把這個消息隊列從全局隊列出摘出來,從這個隊列pop一條消息,處理完畢后,再把這個消息隊列掛到全局隊列中;而對于push消息到agent隊列則沒有這種過程,只要獲得自旋鎖即可,相關(guān)源碼可以見前面的分析。

這一層就保證了消息不會亂序,但是對于業(yè)務(wù)層,使用lua協(xié)程來提高并發(fā),那么就要好好設(shè)計。

這里舉例比如在主場景中,這樣可以考慮到client的所有消息都路由到場景后需要考慮到的時序問題。
當(dāng)與client有關(guān)的兩條有依賴關(guān)系的消息a和b被場景服務(wù)dispatch分發(fā)處理時,不考慮讀還是寫,都會創(chuàng)建一個協(xié)程,并執(zhí)行相關(guān)的處理函數(shù)。比如數(shù)據(jù)安全性不是特別嚴重的例子,玩家在幫派中,然后點領(lǐng)取今日獎勵b消息,此時幫主把玩家踢出幫派a消息,本來是a先執(zhí)行完畢后再b執(zhí)行的順序,這時可能出現(xiàn)a先執(zhí)行導(dǎo)致掛起,而b執(zhí)行完畢后,接著執(zhí)行a的情況,多領(lǐng)了一份獎勵。當(dāng)然這里只是為了舉例,通過檢查可以避免這種問題。

簡單分析下,在skynet的做法中,為每個服務(wù)加個lua層的消息隊列,進入該隊列的消息會被依次處理完畢,不管中間是否掛起,這樣帶來的問題是,并發(fā)度降底了且引入了一定的復(fù)雜度。

 17     dispatch = function(session, from, ...)
 18         table.insert(message_queue, {session = session, addr = from, ... })
 19         if thread_id then //有消息,如果有等待則wakeup
 20             skynet.wakeup(thread_id)
 21             thread_id = nil
 22         end
 23     end

 26 local function do_func(f, msg)
 27     return pcall(f, table.unpack(msg))
 28 end
 29 
 30 local function message_dispatch(f)
 31     while true do
 32         if #message_queue==0 then  //沒消息則掛起
 33             thread_id = coroutine.running()
 34             skynet.wait()
 35         else
 36             local msg = table.remove(message_queue,1)  //依次處理消息
 37             local session = msg.session
 38             if session == 0 then  //不需要響應(yīng)
 39                 local ok, msg = do_func(f, msg)
 40                 if ok then
 41                     if msg then
 42                         skynet.fork(message_dispatch,f)
 44                     end
 45                 else
 46                     skynet.fork(message_dispatch,f)
 48                 end
 49             else
 50                 local data, size = skynet.pack(do_func(f,msg))
 51                 -- 1 means response
 52                 c.send(msg.addr, 1, session, data, size) //需要響應(yīng)
 53             end
 54         end
 55     end
 56 end

上面代碼實現(xiàn)細節(jié)不作過多分析,簡單注釋了下,大致就是從table數(shù)組中remove前面的消息并處理之,如果會掛起則等響應(yīng)結(jié)果或超時,再處理下一條。

如上面的實現(xiàn),新消息來了fork一個協(xié)程處理:

533 function skynet.fork(func,...)
534     local args = table.pack(...)  //打包參數(shù)
535     local co = co_create(function()
536         func(table.unpack(args,1,args.n)) //設(shè)置協(xié)程執(zhí)行函數(shù)和參數(shù)
537     end)
538     table.insert(fork_queue, co) //回收協(xié)程資源
539     return co
540 end

104 local function co_create(f)
105     local co = table.remove(coroutine_pool)
106     if co == nil then
107         co = coroutine.create(function(...)
108             f(...)
109             while true do
110                 local session = session_coroutine_id[co]
111                 if session and session ~= 0 then
112                     local source = debug.getinfo(f,"S")
                        //log error
117                 end
118                 f = nil
119                 coroutine_pool[#coroutine_pool+1] = co
120                 f = coroutine_yield "EXIT"
121                 f(coroutine_yield())
122             end
123         end)
124     else
125         coroutine_resume(co, f)
126     end
127     return co
128 end

上面co_create就從協(xié)程池中取一個協(xié)程對象處理消息,如果沒有協(xié)程對象則創(chuàng)建。你一定會好奇執(zhí)行完后,返回結(jié)果在哪?

對于lua的協(xié)程api,當(dāng)create協(xié)程時它的狀態(tài)還沒開始,處于掛起suspended狀態(tài),然后resume后會處理running狀態(tài),執(zhí)行完后為dead狀態(tài),引用下面的:
a)coroutine.create(arg):根據(jù)一個函數(shù)創(chuàng)建一個協(xié)同程序,參數(shù)為一個函數(shù);
b)coroutine.resume(co):使協(xié)同從掛起變?yōu)檫\行(1)激活coroutine,也就是讓協(xié)程函數(shù)開始運行;(2)喚醒yield,使掛起的協(xié)同接著上次的地方繼續(xù)運行。該函數(shù)可以傳入?yún)?shù);
c)coroutine.yield():使正在運行的協(xié)同掛起,可以傳入?yún)?shù);

而真正強大之處在于當(dāng)?shù)诙蝦esume時,resume和yield相關(guān)交換數(shù)據(jù),具體怎么交互的建議看下lua協(xié)程基礎(chǔ)。

在skynet中進行了對lua原始協(xié)程api進行封裝并管理,下面說明第二個小點,當(dāng)然會把第一小點也部分說明下,畢竟是個整體,從創(chuàng)建到處理到回收,以及中間的注意點。通過幾個常用的接口來說明這套工作流程。

以下實現(xiàn)是wakeup相關(guān):

493 function skynet.wakeup(token)
494     if sleep_session[token] then
495         table.insert(wakeup_queue, token) //在下一次suspend時被處理
496         return true
497     end
498 end

339 function skynet.wait(token)
340     local session = c.genid()
341     token = token or coroutine.running()
342     local ret, msg = coroutine_yield("SLEEP", session, token)//切出協(xié)程(A)
343     sleep_session[token] = nil  //協(xié)程切回來重置相關(guān)數(shù)據(jù)
344     session_id_coroutine[session] = nil
345 end

130 local function dispatch_wakeup()
131     local token = table.remove(wakeup_queue,1)
132     if token then
133         local session = sleep_session[token]
134         if session then
135             local co = session_id_coroutine[session]
136             local tag = session_coroutine_tracetag[co]
137             if tag then c.trace(tag, "resume") end
138             session_id_coroutine[session] = "BREAK"
139             return suspend(co, coroutine_resume(co, false, "BREAK"))(B)  調(diào)度被掛起的協(xié)程
140         end
141     end
142 end

157 function suspend(co, result, command, param, param2)
        //more code
183     elseif command == "SLEEP" then
184         local tag = session_coroutine_tracetag[co]
185         if tag then c.trace(tag, "sleep", co, 2) end
186         session_id_coroutine[param] = co
187         if sleep_session[param2] then
188             error(debug.traceback(co, "token duplicative"))
189         end
190         sleep_session[param2] = param
307     dispatch_wakeup()
308     dispatch_error_queue()
309 end

把要喚醒的協(xié)程通過token插入到wakeup_queue數(shù)組中(注意下,很多實現(xiàn)邏輯是使用table的數(shù)組部分,因為有序但帶來的問題是從索引x處刪除元素后,涉及到移動)

然后dispatch_wakeup會處理wakeup_queue,重點是這一句return suspend(co, coroutine_resume(co, false, "BREAK")),這部分在后面分析。
(A)處把當(dāng)前協(xié)程切出去后,那三個參數(shù)作為主協(xié)程的返回值,即coroutine_resume的返回值,再加一個本身返回的true or false,然后調(diào)用suspend,同理coroutine_resume的后兩個參數(shù)作為coroutine_yield的返回值。

以上部分還是比較容易理解,這里可以結(jié)合c++協(xié)程中的實現(xiàn),有專門的協(xié)程調(diào)度器,要么超時要么有數(shù)據(jù)過來(響應(yīng))進而切回相應(yīng)的協(xié)程處理。

不過經(jīng)歷過的項目貌似沒有那種加限時的請求,如果call長時間收不到響應(yīng),可能會出問題,這個需要多研究下。不過,結(jié)合skynet基礎(chǔ)實現(xiàn)也好辦;另外底層框架也是skynet,lua層的源碼部分都有返回,不管正確還是失敗都會返回,除非這條call請求消息根本沒有被目標(biāo)服務(wù)的消息隊列收到(可能出錯),或者沒有被工作線程調(diào)度,再或者沒有被上層服務(wù)處理;前者可能基本為零,第一種可能性不大,因為框架已經(jīng)保證消息一定會被發(fā)送到消息隊列中(消息隊列目前是無界的),而后面兩種可能確實存在,比如一個死循環(huán)或者處理耗時的功能等,這些只能靠開發(fā)人員注意及必要code review了。

311 function skynet.timeout(ti, func)
312     local session = c.intcommand("TIMEOUT",ti)
313     assert(session)
314     local co = co_create(func)
315     assert(session_id_coroutine[session] == nil)
316     session_id_coroutine[session] = co
317 end
318 
319 function skynet.sleep(ti, token)
320     local session = c.intcommand("TIMEOUT",ti)
321     assert(session)
322     token = token or coroutine.running()
323     local succ, ret = coroutine_yield("SLEEP", session, token)
324     sleep_session[token] = nil
325     if succ then
326         return
327     end
328     if ret == "BREAK" then
329         return "BREAK"
330     else
331         error(ret)
332     end
333 end

上面就是超時的實現(xiàn),也即弄個協(xié)程,向skynet框架注冊個定時器,當(dāng)超時時,發(fā)條消息到上層,上層創(chuàng)建協(xié)程處理。這個跟c++協(xié)程一樣,實現(xiàn)中不能有sleep這種調(diào)用,只能用超時,然后掛到事件列表中,超時resume協(xié)程回調(diào),不然阻塞其他。

剩下的不過多分析,這三篇只是簡單分析了個大概,還有蠻多值得學(xué)習(xí),關(guān)鍵在于思考為什么要這么做,可以根據(jù)自己的經(jīng)驗,去嘗試改進或在github上提pr,分析別人的設(shè)計,可能并不像作者一路踩坑過來,并持續(xù)重構(gòu)那樣,恰到好處的設(shè)計。

接下來的一篇準備研究下鎖的性能,主要是對前幾天學(xué)習(xí)的一個總結(jié)。

skynet 中 Lua 服務(wù)的消息處理
Lua中的協(xié)同程序 coroutine
Lua Coroutine詳解
skynet 里的 coroutine
skynet coroutine 運行筆記

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容