前幾篇講解了服務(wù)之間互相發(fā)送消息,消息的掛起,恢復(fù),fork等等,感覺還是有點(diǎn)搞不清他們之間是怎么協(xié)作的,例如fork產(chǎn)生的協(xié)程什么時(shí)候被調(diào)用,如果有多個(gè)fork又怎么被調(diào)用.這篇試著講解一下.
首先要明白的是,一般所有的lua層函數(shù)都是以協(xié)程的方式被執(zhí)行的,包括fork產(chǎn)生的函數(shù).除非你在skynet.start()之外調(diào)用函數(shù).這點(diǎn)通過前面的分析可以知道. start()函數(shù)調(diào)用timeout產(chǎn)生協(xié)程. fork產(chǎn)生協(xié)程列表.
我們知道,lua層設(shè)置的回調(diào)函數(shù)為skynet.dispatch_message. 他主要調(diào)用raw_dispatch_message. 在那里才是驅(qū)動協(xié)程函數(shù)執(zhí)行的地方.一個(gè)協(xié)程結(jié)束或掛起之后將由suspend函數(shù)來接管,相信通過前面的講解,大家對這個(gè)函數(shù)不陌生了.
如果入口函數(shù)start沒有調(diào)用fork,sleep,wait之類的函數(shù),那么驅(qū)動start()執(zhí)行的消息將結(jié)束.看看我們的協(xié)程池函數(shù):
local function co_create(f)
local co = table.remove(coroutine_pool)
if co == nil then
co = coroutine.create(function(...)
f(...) --① 函數(shù)執(zhí)行完畢
while true do
f = nil
coroutine_pool[#coroutine_pool+1] = co
f = coroutine_yield "EXIT" --②掛起協(xié)程
f(coroutine_yield())
end
end)
else
coroutine_resume(co, f) --③
end
return co
end
在①處函數(shù)執(zhí)行完畢,然后進(jìn)入②,協(xié)程掛起. 將由suspend函數(shù)接管.執(zhí)行cmd == 'EXIT'分支.
當(dāng)再次收到消息時(shí)(注意,一般再次收到消息是其他服務(wù)的call調(diào)用,類型不為response),調(diào)用skynet.dispatch_message時(shí),參看前面raw_dispatch_message的代碼,如果消息類型不為response,那么將再次調(diào)用co_create().此時(shí)將執(zhí)行3??,回到前次消息掛起的地方,返回的f就是想要執(zhí)行的、在dispatch中設(shè)置的回調(diào)函數(shù)。之后協(xié)程再次掛起,然后又恢復(fù),直至suspend接管。
如果有skynet.sleep(),那么會產(chǎn)生一個(gè)sessionID并被掛起,suspend函數(shù)接管,用session_id_coroutine關(guān)聯(lián)session和當(dāng)前協(xié)程。等到時(shí)間到會產(chǎn)生resond類型的消息,根據(jù)sesssionID找到協(xié)程,恢復(fù)協(xié)程,此時(shí)start(func)中的func函數(shù)才算結(jié)束,即上面的①。
我們注意到suspend函數(shù)的'SLEEP'分支還有一個(gè)sleep_session表,他是用了干嘛的呢?
查找skynet.lua源碼,發(fā)現(xiàn)skynet.wakeup引用到他了。在介紹skynet.wakeup()之前,先說說skynet.fork()。
skynet.fork()也會調(diào)用協(xié)程池函數(shù)co_create()來產(chǎn)生協(xié)程。那么調(diào)用這個(gè)函數(shù)時(shí),會走③的流程,然后coroutine_resume(co)嗎,那樣豈不是亂套了。不會的!因?yàn)檫@個(gè)時(shí)候當(dāng)前函數(shù)還沒有結(jié)束呢,coroutine_pool仍然為空,走的還是if分支。所以他只是創(chuàng)建一個(gè)協(xié)程,插入到fork_queue表中,并沒有啟用協(xié)程。
我們追蹤fork_queue,發(fā)現(xiàn)是在raw_dispatch_message結(jié)束后調(diào)用的,而且是順序取出協(xié)程,并執(zhí)行。raw_dispatch_message調(diào)用結(jié)束,可能是入口函數(shù)執(zhí)行完畢,調(diào)用了yield('EXIT'),也有可能執(zhí)行了sleep,調(diào)用了yield('SLEEP'),這都會導(dǎo)致suspend返回。
這里我們可以回到開頭提出的問題了。skynet.fork()產(chǎn)生的的協(xié)程函數(shù),將在主協(xié)程(start()啟用的)結(jié)束或掛起時(shí)被調(diào)用。有多個(gè)fork產(chǎn)生的協(xié)程函數(shù)時(shí),將按順序挨個(gè)取出執(zhí)行,全部執(zhí)行完畢時(shí),skynet.dispatch_message才算結(jié)束,下個(gè)消息才得以到達(dá)。在c接口層,同一個(gè)服務(wù)的消息隊(duì)列是嚴(yán)格按照順序來執(zhí)行的,所以這樣沒有問題。
再次強(qiáng)調(diào),lua協(xié)程只是模擬了多線程的執(zhí)行,并不會真正有多個(gè)協(xié)程在執(zhí)行,所以確保skynet.fork()函數(shù)中不要出現(xiàn)死循環(huán)或者很費(fèi)時(shí)的操作,因?yàn)樗粫駆inux一樣真正多線程的執(zhí)行。
填一下上面挖的坑。skynet.wakeup是用來干嘛的呢?
我們想提前喚醒執(zhí)行sleep函數(shù)的協(xié)程可以嗎。當(dāng)然可以,skynet.wakeup就是來干這個(gè)的。sleep后,何時(shí)去執(zhí)行skynet.wakeup,畢竟sleep掛起協(xié)程之后要等到超時(shí)才會重新恢復(fù),既然恢復(fù)之后,再調(diào)用wakeup還有什么用呢。你可能想到了,在skynet.fork()產(chǎn)生的協(xié)程中調(diào)用。前面說到了,sleep之后就可以執(zhí)行fork里的協(xié)程了。skynet.wakeup是怎么做到的呢?
看看skynet.wakeup的實(shí)現(xiàn):
function skynet.wakeup(co)
if sleep_session[co] then
table.insert(wakeup_queue, co)
return true
end
end
他只是根據(jù)sleep_session表來產(chǎn)生一個(gè)新的表wakeup_queue。
如果要喚醒的協(xié)程沒有在sleep_session表中,則不會有任何結(jié)果。前面說到sleep()函數(shù)會讓suspend產(chǎn)生sleep_session表來關(guān)聯(lián)要掛起的協(xié)程。所以waitup一個(gè)sleep的協(xié)程會插入到wakeup_queue表中。而wakeup_queue是在disptch_wakeup中取出并恢復(fù)的。disptch_wakeup仍然是在suspend中最后調(diào)用的,也就是說一個(gè)協(xié)程被掛起或結(jié)束后會去檢查wakeup_queue列表。
disptch_wakeup的實(shí)現(xiàn)為:
local function dispatch_wakeup()
local co = table.remove(wakeup_queue,1)
if co then
local session = sleep_session[co]
if session then
session_id_coroutine[session] = "BREAK"
return suspend(co, coroutine_resume(co, false, "BREAK"))
end
end
end
注意,即使調(diào)用了wakeup,也有可能不會立馬喚醒那個(gè)協(xié)程,因?yàn)閣akeup只是插入了一個(gè)掛起的協(xié)程,真正要到disptch_wakeup中去恢復(fù),如果wakeup后面有個(gè)很費(fèi)時(shí)的操作,那么將等到他執(zhí)行完畢,才會在suspend最后去調(diào)用disptch_wakeup。
這下我們明白了sleep_session的作用。然而我們發(fā)現(xiàn)在skynet.wait()函數(shù)里面也有這個(gè)表,部分代碼也與skynet.sleep()類似。
不難猜到skynet.wait()的作用:掛起指定的或者正在運(yùn)行的協(xié)程。由于他不會超時(shí),所以必須由skynet.wakeup()來喚醒。我們可以用這組函數(shù)來實(shí)現(xiàn)資源的同步。
有個(gè)有趣的問題是,sleep之后,被wakeup,sleep超時(shí)之后仍然會收到一條response類型的消息,只不過sessionID對應(yīng)的協(xié)程在dispatch_wakeup中已經(jīng)置為'BREAK'了,他不會再做任何操作了。
最后附一張圖,來說明上面的過程:

至此,skynet lua層中基本所有的消息流程就分析到了。周日花了差不多一下午寫的,坐著很累了,如有遺漏的地方下次再補(bǔ)充。