這是最后一篇了,其實還有很多重要的模塊要分析的,但留給以后有多余時間再去研究吧,有興趣的可以自行下載源碼分析。這部分主要是圍繞第三小問題展開,并附加些其他skynet中與此有關(guān)的設(shè)計,即:當(dāng)并發(fā)時,如何保證消息的正確時序,以及如何使用協(xié)程處理消息(同步/異步/超時);包括創(chuàng)建協(xié)程處理消息,掛起協(xié)程,切換。這塊其實是針對lua上層來說的,底層框架的消息隊列只是保證消息順序入隊列且出隊列,如果交叉執(zhí)行比如lua層的協(xié)程掛起,那么就會出現(xiàn)時序問題。
先簡單回顧下前幾篇博客的分析,包括skynet本身的設(shè)計,及C++協(xié)程。對于C++協(xié)程,比如一個請求a過來后,從協(xié)程池中pop一個協(xié)程并處理該請求a,如果需要等待,則讓出協(xié)程并掛上定時器,然后再處理下一個請求b,如果此時a和b是相關(guān)聯(lián)的,且b有可能依賴于a的執(zhí)行結(jié)果,那么就會出現(xiàn)問題。這對于游戲中的業(yè)務(wù)來說,尤其涉及到金錢相關(guān)的邏輯,那是大問題。而那種獨立的請求間,只是讀之類的操作,那是沒問題的。如果需要結(jié)合業(yè)務(wù),那么就需要改造。
而對于skynet來說,當(dāng)并發(fā)上來時,考慮到這個時序問題,底層實現(xiàn)相關(guān)的順序隊列,大概思路就是lua協(xié)程執(zhí)行a到一半后,哪怕有b的消息被協(xié)程調(diào)度處理,此時會把這個b協(xié)程壓入隊列(lua中的table也可以,使用數(shù)組部分),必須等a執(zhí)行完畢或超時后,再處理b的,也就是在業(yè)務(wù)上層串行化了服務(wù)的消息處理。這樣保證了時序。
但這又引起了另一個問題,即可能存在后面的消息都超時了,然而上層如果無法識別繼續(xù)處理,那么就白白浪費了資源,處理了無用的消息。這類的相關(guān)介紹在另一篇“談?wù)劸彺娲┩秆┍篮瓦^載保護以及一致性等問題”中有相關(guān)的介紹及應(yīng)對方案。
本節(jié)分為兩個小點討論,即:
1)如何保證消息的正確時序,以及如何使用協(xié)程處理消息(同步/異步/超時);
2)創(chuàng)建協(xié)程處理消息,掛起協(xié)程,切換;
第一小點,撇開語言方面的限制,考慮skynet本身的框架設(shè)計,而不摻雜業(yè)務(wù)框架的設(shè)計。對于單進程多線程,要想并發(fā)的處理同一個客戶端的請求,不管是讀還是寫,都必須路由到同一個線程處理,這樣就保證了不會導(dǎo)致同一個client的請求分發(fā)到不同的線程,在skynet底層抽象client為一個agent service,有自己的消息隊列,并且當(dāng)工作線程處理這個agent消息時,先把這個消息隊列從全局隊列出摘出來,從這個隊列pop一條消息,處理完畢后,再把這個消息隊列掛到全局隊列中;而對于push消息到agent隊列則沒有這種過程,只要獲得自旋鎖即可,相關(guān)源碼可以見前面的分析。
這一層就保證了消息不會亂序,但是對于業(yè)務(wù)層,使用lua協(xié)程來提高并發(fā),那么就要好好設(shè)計。
這里舉例比如在主場景中,這樣可以考慮到client的所有消息都路由到場景后需要考慮到的時序問題。
當(dāng)與client有關(guān)的兩條有依賴關(guān)系的消息a和b被場景服務(wù)dispatch分發(fā)處理時,不考慮讀還是寫,都會創(chuàng)建一個協(xié)程,并執(zhí)行相關(guān)的處理函數(shù)。比如數(shù)據(jù)安全性不是特別嚴重的例子,玩家在幫派中,然后點領(lǐng)取今日獎勵b消息,此時幫主把玩家踢出幫派a消息,本來是a先執(zhí)行完畢后再b執(zhí)行的順序,這時可能出現(xiàn)a先執(zhí)行導(dǎo)致掛起,而b執(zhí)行完畢后,接著執(zhí)行a的情況,多領(lǐng)了一份獎勵。當(dāng)然這里只是為了舉例,通過檢查可以避免這種問題。
簡單分析下,在skynet的做法中,為每個服務(wù)加個lua層的消息隊列,進入該隊列的消息會被依次處理完畢,不管中間是否掛起,這樣帶來的問題是,并發(fā)度降底了且引入了一定的復(fù)雜度。
17 dispatch = function(session, from, ...)
18 table.insert(message_queue, {session = session, addr = from, ... })
19 if thread_id then //有消息,如果有等待則wakeup
20 skynet.wakeup(thread_id)
21 thread_id = nil
22 end
23 end
26 local function do_func(f, msg)
27 return pcall(f, table.unpack(msg))
28 end
29
30 local function message_dispatch(f)
31 while true do
32 if #message_queue==0 then //沒消息則掛起
33 thread_id = coroutine.running()
34 skynet.wait()
35 else
36 local msg = table.remove(message_queue,1) //依次處理消息
37 local session = msg.session
38 if session == 0 then //不需要響應(yīng)
39 local ok, msg = do_func(f, msg)
40 if ok then
41 if msg then
42 skynet.fork(message_dispatch,f)
44 end
45 else
46 skynet.fork(message_dispatch,f)
48 end
49 else
50 local data, size = skynet.pack(do_func(f,msg))
51 -- 1 means response
52 c.send(msg.addr, 1, session, data, size) //需要響應(yīng)
53 end
54 end
55 end
56 end
上面代碼實現(xiàn)細節(jié)不作過多分析,簡單注釋了下,大致就是從table數(shù)組中remove前面的消息并處理之,如果會掛起則等響應(yīng)結(jié)果或超時,再處理下一條。
如上面的實現(xiàn),新消息來了fork一個協(xié)程處理:
533 function skynet.fork(func,...)
534 local args = table.pack(...) //打包參數(shù)
535 local co = co_create(function()
536 func(table.unpack(args,1,args.n)) //設(shè)置協(xié)程執(zhí)行函數(shù)和參數(shù)
537 end)
538 table.insert(fork_queue, co) //回收協(xié)程資源
539 return co
540 end
104 local function co_create(f)
105 local co = table.remove(coroutine_pool)
106 if co == nil then
107 co = coroutine.create(function(...)
108 f(...)
109 while true do
110 local session = session_coroutine_id[co]
111 if session and session ~= 0 then
112 local source = debug.getinfo(f,"S")
//log error
117 end
118 f = nil
119 coroutine_pool[#coroutine_pool+1] = co
120 f = coroutine_yield "EXIT"
121 f(coroutine_yield())
122 end
123 end)
124 else
125 coroutine_resume(co, f)
126 end
127 return co
128 end
上面co_create就從協(xié)程池中取一個協(xié)程對象處理消息,如果沒有協(xié)程對象則創(chuàng)建。你一定會好奇執(zhí)行完后,返回結(jié)果在哪?
對于lua的協(xié)程api,當(dāng)create協(xié)程時它的狀態(tài)還沒開始,處于掛起suspended狀態(tài),然后resume后會處理running狀態(tài),執(zhí)行完后為dead狀態(tài),引用下面的:
a)coroutine.create(arg):根據(jù)一個函數(shù)創(chuàng)建一個協(xié)同程序,參數(shù)為一個函數(shù);
b)coroutine.resume(co):使協(xié)同從掛起變?yōu)檫\行(1)激活coroutine,也就是讓協(xié)程函數(shù)開始運行;(2)喚醒yield,使掛起的協(xié)同接著上次的地方繼續(xù)運行。該函數(shù)可以傳入?yún)?shù);
c)coroutine.yield():使正在運行的協(xié)同掛起,可以傳入?yún)?shù);
而真正強大之處在于當(dāng)?shù)诙蝦esume時,resume和yield相關(guān)交換數(shù)據(jù),具體怎么交互的建議看下lua協(xié)程基礎(chǔ)。
在skynet中進行了對lua原始協(xié)程api進行封裝并管理,下面說明第二個小點,當(dāng)然會把第一小點也部分說明下,畢竟是個整體,從創(chuàng)建到處理到回收,以及中間的注意點。通過幾個常用的接口來說明這套工作流程。
以下實現(xiàn)是wakeup相關(guān):
493 function skynet.wakeup(token)
494 if sleep_session[token] then
495 table.insert(wakeup_queue, token) //在下一次suspend時被處理
496 return true
497 end
498 end
339 function skynet.wait(token)
340 local session = c.genid()
341 token = token or coroutine.running()
342 local ret, msg = coroutine_yield("SLEEP", session, token)//切出協(xié)程(A)
343 sleep_session[token] = nil //協(xié)程切回來重置相關(guān)數(shù)據(jù)
344 session_id_coroutine[session] = nil
345 end
130 local function dispatch_wakeup()
131 local token = table.remove(wakeup_queue,1)
132 if token then
133 local session = sleep_session[token]
134 if session then
135 local co = session_id_coroutine[session]
136 local tag = session_coroutine_tracetag[co]
137 if tag then c.trace(tag, "resume") end
138 session_id_coroutine[session] = "BREAK"
139 return suspend(co, coroutine_resume(co, false, "BREAK"))(B) 調(diào)度被掛起的協(xié)程
140 end
141 end
142 end
157 function suspend(co, result, command, param, param2)
//more code
183 elseif command == "SLEEP" then
184 local tag = session_coroutine_tracetag[co]
185 if tag then c.trace(tag, "sleep", co, 2) end
186 session_id_coroutine[param] = co
187 if sleep_session[param2] then
188 error(debug.traceback(co, "token duplicative"))
189 end
190 sleep_session[param2] = param
307 dispatch_wakeup()
308 dispatch_error_queue()
309 end
把要喚醒的協(xié)程通過token插入到wakeup_queue數(shù)組中(注意下,很多實現(xiàn)邏輯是使用table的數(shù)組部分,因為有序但帶來的問題是從索引x處刪除元素后,涉及到移動)
然后dispatch_wakeup會處理wakeup_queue,重點是這一句return suspend(co, coroutine_resume(co, false, "BREAK")),這部分在后面分析。
(A)處把當(dāng)前協(xié)程切出去后,那三個參數(shù)作為主協(xié)程的返回值,即coroutine_resume的返回值,再加一個本身返回的true or false,然后調(diào)用suspend,同理coroutine_resume的后兩個參數(shù)作為coroutine_yield的返回值。
以上部分還是比較容易理解,這里可以結(jié)合c++協(xié)程中的實現(xiàn),有專門的協(xié)程調(diào)度器,要么超時要么有數(shù)據(jù)過來(響應(yīng))進而切回相應(yīng)的協(xié)程處理。
不過經(jīng)歷過的項目貌似沒有那種加限時的請求,如果call長時間收不到響應(yīng),可能會出問題,這個需要多研究下。不過,結(jié)合skynet基礎(chǔ)實現(xiàn)也好辦;另外底層框架也是skynet,lua層的源碼部分都有返回,不管正確還是失敗都會返回,除非這條call請求消息根本沒有被目標(biāo)服務(wù)的消息隊列收到(可能出錯),或者沒有被工作線程調(diào)度,再或者沒有被上層服務(wù)處理;前者可能基本為零,第一種可能性不大,因為框架已經(jīng)保證消息一定會被發(fā)送到消息隊列中(消息隊列目前是無界的),而后面兩種可能確實存在,比如一個死循環(huán)或者處理耗時的功能等,這些只能靠開發(fā)人員注意及必要code review了。
311 function skynet.timeout(ti, func)
312 local session = c.intcommand("TIMEOUT",ti)
313 assert(session)
314 local co = co_create(func)
315 assert(session_id_coroutine[session] == nil)
316 session_id_coroutine[session] = co
317 end
318
319 function skynet.sleep(ti, token)
320 local session = c.intcommand("TIMEOUT",ti)
321 assert(session)
322 token = token or coroutine.running()
323 local succ, ret = coroutine_yield("SLEEP", session, token)
324 sleep_session[token] = nil
325 if succ then
326 return
327 end
328 if ret == "BREAK" then
329 return "BREAK"
330 else
331 error(ret)
332 end
333 end
上面就是超時的實現(xiàn),也即弄個協(xié)程,向skynet框架注冊個定時器,當(dāng)超時時,發(fā)條消息到上層,上層創(chuàng)建協(xié)程處理。這個跟c++協(xié)程一樣,實現(xiàn)中不能有sleep這種調(diào)用,只能用超時,然后掛到事件列表中,超時resume協(xié)程回調(diào),不然阻塞其他。
剩下的不過多分析,這三篇只是簡單分析了個大概,還有蠻多值得學(xué)習(xí),關(guān)鍵在于思考為什么要這么做,可以根據(jù)自己的經(jīng)驗,去嘗試改進或在github上提pr,分析別人的設(shè)計,可能并不像作者一路踩坑過來,并持續(xù)重構(gòu)那樣,恰到好處的設(shè)計。
接下來的一篇準備研究下鎖的性能,主要是對前幾天學(xué)習(xí)的一個總結(jié)。
skynet 中 Lua 服務(wù)的消息處理
Lua中的協(xié)同程序 coroutine
Lua Coroutine詳解
skynet 里的 coroutine
skynet coroutine 運行筆記