messagequeue
webrtc/base/messagequeue.h /messagequeue.cc 文件是多路信號(hào)分離器的重要組成部分。它實(shí)現(xiàn)了消息一個(gè)完整地消息隊(duì)列,該隊(duì)列包括立即執(zhí)行消息隊(duì)列、延遲執(zhí)行消息隊(duì)列和具有優(yōu)先級(jí)的消息隊(duì)列。其中,MessageQueue 類也是 Thread 類的基類。所以,所有的 WebRTC 的線程都是支持消息隊(duì)列的。
MessageQueueManager
MessageQueueManager 類是一個(gè)全局單例類。這個(gè)類看似比較復(fù)雜,但是功能其實(shí)非常簡(jiǎn)單——僅僅為了在所有的 MessagerQueue 中刪除與指定的 MessageHandler 相關(guān)的消息。WebRTC 的消息隊(duì)列在發(fā)送消息的時(shí)候要指定消息處理器( MessageHandler )。如果某個(gè)消息處理器被析構(gòu),那么與之相關(guān)的所有消息都將無(wú)法處理。所以,創(chuàng)建了這個(gè)全局單例類來(lái)解決這個(gè)問(wèn)題(見(jiàn) MessageHandler 析構(gòu)函數(shù))。
MessageQueueManager 的代碼沒(méi)有涉及任何跨平臺(tái)的 API 調(diào)用,而且本身功能也非常簡(jiǎn)單。所以我就不討論它如何使用 vector 管理 MessageQueue。唯一需要注意的就是 MessageQueueManager 如何保證自己在第一個(gè) Thread 類實(shí)例化之前完成 MessageQueueManager 全局單例的實(shí)例化。這當(dāng)中有個(gè)有趣的狀況, MessageQueueManager 在保證了自己必然在第一條子線程被創(chuàng)建之前自己被實(shí)例化, MessageQueueManager::Instance 函數(shù)內(nèi)部沒(méi)有使用任何鎖來(lái)保護(hù) MessageQueueManager::instance_ 實(shí)例的創(chuàng)建。
如前面所說(shuō), MessagerQueue 是 Thread 的基類。在創(chuàng)建 Thread 時(shí)必然會(huì)調(diào)用 MessagerQueue 的構(gòu)造函數(shù)。在 MessagerQueue 的構(gòu)造函數(shù)中調(diào)用了 MessageQueueManager::Add 函數(shù),而該函數(shù)會(huì)使用 MessageQueueManager::Instance 函數(shù)創(chuàng)建 MessagerQueueManager 的實(shí)例。由于 ThreadManager 保證了在創(chuàng)建第一個(gè)子線程之前,主線程會(huì)被包裝成 Thread 對(duì)象,所以 MessageQueueManager 必然可以將主線程作為第一個(gè) MessageQueue 對(duì)象納入管理。
以上的描述可能比較晦澀難懂,這是因?yàn)檎麄€(gè)流程涉及到了 Thread 和 ThreadManager 等類。而這些都是尚未講解過(guò)他們的代碼。不過(guò)即使看不明白也沒(méi)關(guān)系,我會(huì)在講解完所有相關(guān)類之后演示2段范例代碼,并將范例代碼的調(diào)用棧完全展開??催^(guò)范例代碼后絕大多數(shù)讀者都應(yīng)該能夠明白 MessageQueueManager 的原理。
MessageQueueManager 還有最后一個(gè)問(wèn)題,那就是它什么時(shí)候被析構(gòu)。 MessageQueue 的析構(gòu)函數(shù)會(huì)調(diào)用 MessageQueueManager::Remove 函數(shù),并且“理論上來(lái)說(shuō)”在最后一個(gè) MessageQueue 從隊(duì)列中移除之后會(huì)析構(gòu) MessageQueueManager。既然,所有的線程都被移除,那就意味著 MessageQueueManager 實(shí)例在被 delete 時(shí)重新回到了單線程的環(huán)境,所以也沒(méi)有任何鎖的保護(hù)。
MessageData
這一節(jié)的內(nèi)容將包括 MessageData 類以及多個(gè)它的子類和幾個(gè)工具函數(shù)。這些類和函數(shù)都很簡(jiǎn)單,所以就不介紹代碼和原理,僅僅羅列一下它們的功能。
MessageData
定義了基類,并將析構(gòu)函數(shù)定義為虛函數(shù)。
TypedMessageData
使用模板定義的 MessageData 的一個(gè)子類,便于擴(kuò)展。
ScopedMessageData
類似于 TypedMessageData,用于指針類型。在析構(gòu)函數(shù)中,自動(dòng)對(duì)該指針調(diào)用 delete。
ScopedRefMessageData
類似于 TypedMessageData,用于引用計(jì)數(shù)的指針類型。
WrapMessageData函數(shù)
模板函數(shù),便于創(chuàng)建 TypedMessageData
UseMessageData函數(shù)
模板函數(shù),用于將 TypedMessageData 中的 Data 取出
DisposeData
這是一個(gè)很特殊的消息,用以將某個(gè)對(duì)象交給消息引擎銷毀。可能的用途有 2 個(gè):
- 有些函數(shù)不便在當(dāng)前函數(shù)范圍內(nèi)銷毀對(duì)象,見(jiàn)范例 HttpServer::Connection::~Connection;
- 2.某對(duì)象屬于某一線程,因此銷毀操作應(yīng)該交給所有者線程(未見(jiàn)范例)。WebRTC 用戶不需要自行使用該類,調(diào)用 MessageQueue::Dispose 函數(shù)即可使用它的功能。
以上7個(gè)類或函數(shù)的實(shí)現(xiàn)非常簡(jiǎn)單,有C++使用經(jīng)驗(yàn)的讀者非常容易就能理解(標(biāo)準(zhǔn)庫(kù)中就有相似的類)。
Message
這一節(jié)將簡(jiǎn)單介紹一下3個(gè)類: Message、 DelayedMessage 和 MessageList。
Message
定義了消息的基本數(shù)據(jù)結(jié)構(gòu)。
DelayedMessage
定義了延遲觸發(fā)消息的數(shù)據(jù)結(jié)構(gòu)。在 MessageQueue 中,延遲消息被存放在以 DelayedMessage::msTrigger_ 排序( DelayedMessage 類定義了 operator<)的隊(duì)列中。如果 2 個(gè)延遲消息的觸發(fā)時(shí)間相同,響應(yīng)順序按先進(jìn)先出原則。
這里我將簡(jiǎn)單介紹一下各個(gè)成員變量的用途:
- cmsDelay_:延遲多久觸發(fā)消息,僅作調(diào)試使用
- msTrigger_:觸發(fā)消息的時(shí)間
- num_:添加消息的時(shí)間
- msg_:消息本身
在使用延遲消息時(shí),不需要自行構(gòu)建 DelayedMessage 實(shí)例。直接調(diào)用 MessageQueue::PostDelayed 或者 MessageQueue::PostAt 函數(shù)即可。
MessageList
消息列表,定義為 std::list< Message>
MessageQueue
現(xiàn)在我們正式進(jìn)入多線程篇最為激動(dòng)人心的部分——多路信號(hào)分離器的消息隊(duì)列組件。WebRTC 的多路型號(hào)分離器由 2 部分組成:消息隊(duì)列和 SocketServer(主要實(shí)現(xiàn)就是 PhysicalSocketServer )。消息隊(duì)列負(fù)責(zé)接受消息,并使用消息處理器( MessageHandler 的子類)處理消息。在處理完所有消息后,消息隊(duì)列調(diào)用 SocketServer::Wait 函數(shù)阻塞等待新的IO信號(hào)。如果有新的消息進(jìn)入,消息隊(duì)列會(huì)調(diào)用 SocketServer::WakeUp 喚醒 SocketServer 阻塞等待。這就是消息隊(duì)列和 SocketServer 協(xié)同工作的基本流程。
讓我們先來(lái)看一下,消息隊(duì)列的實(shí)現(xiàn)。消息隊(duì)列的主要功能是接收和處理消息,并作為 Thread 的基類出現(xiàn)。
它的主要部件(成員變量)包括:
ss_:協(xié)同工作的 SocketServer
default_ss_:默認(rèn)的 SocketServer。如果在構(gòu)造的時(shí)候提供 SocketServer,那么消息隊(duì)列就使用用戶提供的;如果沒(méi)有提供,消息隊(duì)列就初始化一個(gè) PhysicalSocketServer 并保存在 default_ss_ 上,然后賦值給 ss_。在消息隊(duì)列析構(gòu)的時(shí)候,如果 default_ss_ 保存有默認(rèn)構(gòu)造 PhysicalSocketServer,那就銷毀它
msgq_:消息隊(duì)列,隊(duì)列內(nèi)的消息按照先進(jìn)先出的原則立即執(zhí)行
dmsgq_:延遲消息隊(duì)列,隊(duì)列內(nèi)的消息按照指定的時(shí)間延遲執(zhí)行
接著,我們來(lái)看一下 MessageQueue 的主要成員函數(shù):
MessageQueue::Get:等待接收消息。
參數(shù)說(shuō)明:
pmsg:存放消息的指針,用于返回消息
cmsWait:等待的時(shí)間,kForever表示無(wú)限等待
process_io:是否要求 SocketServer處理IO信號(hào)
MessageQueue::PostDelayed:發(fā)送一個(gè)延遲消息(從當(dāng)前時(shí)間計(jì)算延遲處理時(shí)間)
參數(shù)說(shuō)明:
cmsDelay:延遲毫秒數(shù)
phandler:消息處理器( MessageHandler 的子類)
id:消息ID
pdata:MessageData指針
MessageQueue::PostAt:發(fā)送一個(gè)延遲消息(直接指定延遲處理時(shí)間)
參數(shù)說(shuō)明:
tstamp:消息觸發(fā)的時(shí)間
phandler:消息處理器( MessageHandler的子類)
id:消息ID
pdata:MessageData指針
MessageQueue::Clear:通過(guò)指定 MessageHandler和消息ID刪除消息
參數(shù)說(shuō)明:
phandler:指定被刪除消息的 MessageHandler
id:指定被刪除消息的ID;如果該id為MQID_ANY所有與phandler相關(guān)的消息都將被刪除
removed:返回所有被刪除消息的列表
MessageQueue::GetDelay:從現(xiàn)在到下一條將要觸發(fā)的延遲消息的觸發(fā)時(shí)間的毫秒數(shù)(無(wú)參數(shù))
MessageQueue::Dispose:請(qǐng)求消息引擎刪除一個(gè)對(duì)象(delete對(duì)象指針)
參數(shù)說(shuō)明:
doomed:將要被刪除的對(duì)象的指針
MessageQueue::SignalQueueDestroyed(signal slot):通知接收者(observer)消息隊(duì)列將要?jiǎng)h除(參數(shù)無(wú))
MessageQueue 類的核心是 MessageQueue::Get 函數(shù)。Get 函數(shù)的主循環(huán)首先檢查 dmsgq_ 和 msgq_ 是否有立即需要處理的消息。如果檢查到需要立即處理的消息,就馬上將該消息返回。如果沒(méi)有檢查到需要立即處理的消息,那么線程就阻塞等待在 SocketServer::Wait 函數(shù)上。如果阻塞等待期間有新的消息進(jìn)入隊(duì)列或者線程需要停止退出,通過(guò) SocketServer::WakeUp 函數(shù)喚醒被阻塞的 SocketServer::Wait 函數(shù)。從 SocketServer::Wait函數(shù)返回后, MessageQueue::Get 函數(shù)會(huì)重新檢查是否有可以返回的消息,如果沒(méi)有則再次阻塞等待在 SocketServer::Wait 函數(shù)上。以上流程如下圖所示:
MessageQueue 的原理大致如此。