webrtc 多線程四 messagequeue

messagequeue

webrtc/base/messagequeue.h /messagequeue.cc 文件是多路信號(hào)分離器的重要組成部分。它實(shí)現(xiàn)了消息一個(gè)完整地消息隊(duì)列,該隊(duì)列包括立即執(zhí)行消息隊(duì)列、延遲執(zhí)行消息隊(duì)列和具有優(yōu)先級(jí)的消息隊(duì)列。其中,MessageQueue 類也是 Thread 類的基類。所以,所有的 WebRTC 的線程都是支持消息隊(duì)列的。

MessageQueueManager

MessageQueueManager 類是一個(gè)全局單例類。這個(gè)類看似比較復(fù)雜,但是功能其實(shí)非常簡(jiǎn)單——僅僅為了在所有的 MessagerQueue 中刪除與指定的 MessageHandler 相關(guān)的消息。WebRTC 的消息隊(duì)列在發(fā)送消息的時(shí)候要指定消息處理器( MessageHandler )。如果某個(gè)消息處理器被析構(gòu),那么與之相關(guān)的所有消息都將無(wú)法處理。所以,創(chuàng)建了這個(gè)全局單例類來(lái)解決這個(gè)問(wèn)題(見(jiàn) MessageHandler 析構(gòu)函數(shù))。

MessageQueueManager 的代碼沒(méi)有涉及任何跨平臺(tái)的 API 調(diào)用,而且本身功能也非常簡(jiǎn)單。所以我就不討論它如何使用 vector 管理 MessageQueue。唯一需要注意的就是 MessageQueueManager 如何保證自己在第一個(gè) Thread 類實(shí)例化之前完成 MessageQueueManager 全局單例的實(shí)例化。這當(dāng)中有個(gè)有趣的狀況, MessageQueueManager 在保證了自己必然在第一條子線程被創(chuàng)建之前自己被實(shí)例化, MessageQueueManager::Instance 函數(shù)內(nèi)部沒(méi)有使用任何鎖來(lái)保護(hù) MessageQueueManager::instance_ 實(shí)例的創(chuàng)建。

如前面所說(shuō), MessagerQueue 是 Thread 的基類。在創(chuàng)建 Thread 時(shí)必然會(huì)調(diào)用 MessagerQueue 的構(gòu)造函數(shù)。在 MessagerQueue 的構(gòu)造函數(shù)中調(diào)用了 MessageQueueManager::Add 函數(shù),而該函數(shù)會(huì)使用 MessageQueueManager::Instance 函數(shù)創(chuàng)建 MessagerQueueManager 的實(shí)例。由于 ThreadManager 保證了在創(chuàng)建第一個(gè)子線程之前,主線程會(huì)被包裝成 Thread 對(duì)象,所以 MessageQueueManager 必然可以將主線程作為第一個(gè) MessageQueue 對(duì)象納入管理。

以上的描述可能比較晦澀難懂,這是因?yàn)檎麄€(gè)流程涉及到了 Thread 和 ThreadManager 等類。而這些都是尚未講解過(guò)他們的代碼。不過(guò)即使看不明白也沒(méi)關(guān)系,我會(huì)在講解完所有相關(guān)類之后演示2段范例代碼,并將范例代碼的調(diào)用棧完全展開??催^(guò)范例代碼后絕大多數(shù)讀者都應(yīng)該能夠明白 MessageQueueManager 的原理。

MessageQueueManager 還有最后一個(gè)問(wèn)題,那就是它什么時(shí)候被析構(gòu)。 MessageQueue 的析構(gòu)函數(shù)會(huì)調(diào)用 MessageQueueManager::Remove 函數(shù),并且“理論上來(lái)說(shuō)”在最后一個(gè) MessageQueue 從隊(duì)列中移除之后會(huì)析構(gòu) MessageQueueManager。既然,所有的線程都被移除,那就意味著 MessageQueueManager 實(shí)例在被 delete 時(shí)重新回到了單線程的環(huán)境,所以也沒(méi)有任何鎖的保護(hù)。

MessageData

這一節(jié)的內(nèi)容將包括 MessageData 類以及多個(gè)它的子類和幾個(gè)工具函數(shù)。這些類和函數(shù)都很簡(jiǎn)單,所以就不介紹代碼和原理,僅僅羅列一下它們的功能。

MessageData

定義了基類,并將析構(gòu)函數(shù)定義為虛函數(shù)。

TypedMessageData

使用模板定義的 MessageData 的一個(gè)子類,便于擴(kuò)展。

ScopedMessageData

類似于 TypedMessageData,用于指針類型。在析構(gòu)函數(shù)中,自動(dòng)對(duì)該指針調(diào)用 delete。

ScopedRefMessageData

類似于 TypedMessageData,用于引用計(jì)數(shù)的指針類型。

WrapMessageData函數(shù)

模板函數(shù),便于創(chuàng)建 TypedMessageData

UseMessageData函數(shù)

模板函數(shù),用于將 TypedMessageData 中的 Data 取出

DisposeData

這是一個(gè)很特殊的消息,用以將某個(gè)對(duì)象交給消息引擎銷毀。可能的用途有 2 個(gè):

    1. 有些函數(shù)不便在當(dāng)前函數(shù)范圍內(nèi)銷毀對(duì)象,見(jiàn)范例 HttpServer::Connection::~Connection;
  • 2.某對(duì)象屬于某一線程,因此銷毀操作應(yīng)該交給所有者線程(未見(jiàn)范例)。WebRTC 用戶不需要自行使用該類,調(diào)用 MessageQueue::Dispose 函數(shù)即可使用它的功能。

以上7個(gè)類或函數(shù)的實(shí)現(xiàn)非常簡(jiǎn)單,有C++使用經(jīng)驗(yàn)的讀者非常容易就能理解(標(biāo)準(zhǔn)庫(kù)中就有相似的類)。

Message

這一節(jié)將簡(jiǎn)單介紹一下3個(gè)類: Message、 DelayedMessage 和 MessageList。

Message

定義了消息的基本數(shù)據(jù)結(jié)構(gòu)

DelayedMessage

定義了延遲觸發(fā)消息的數(shù)據(jù)結(jié)構(gòu)。在 MessageQueue 中,延遲消息被存放在以 DelayedMessage::msTrigger_ 排序( DelayedMessage 類定義了 operator<)的隊(duì)列中。如果 2 個(gè)延遲消息的觸發(fā)時(shí)間相同,響應(yīng)順序按先進(jìn)先出原則。

這里我將簡(jiǎn)單介紹一下各個(gè)成員變量的用途:

  • cmsDelay_:延遲多久觸發(fā)消息,僅作調(diào)試使用
  • msTrigger_:觸發(fā)消息的時(shí)間
  • num_:添加消息的時(shí)間
  • msg_:消息本身

在使用延遲消息時(shí),不需要自行構(gòu)建 DelayedMessage 實(shí)例。直接調(diào)用 MessageQueue::PostDelayed 或者 MessageQueue::PostAt 函數(shù)即可。

MessageList

消息列表,定義為 std::list< Message>

MessageQueue

現(xiàn)在我們正式進(jìn)入多線程篇最為激動(dòng)人心的部分——多路信號(hào)分離器的消息隊(duì)列組件。WebRTC 的多路型號(hào)分離器由 2 部分組成:消息隊(duì)列和 SocketServer(主要實(shí)現(xiàn)就是 PhysicalSocketServer )。消息隊(duì)列負(fù)責(zé)接受消息,并使用消息處理器( MessageHandler 的子類)處理消息。在處理完所有消息后,消息隊(duì)列調(diào)用 SocketServer::Wait 函數(shù)阻塞等待新的IO信號(hào)。如果有新的消息進(jìn)入,消息隊(duì)列會(huì)調(diào)用 SocketServer::WakeUp 喚醒 SocketServer 阻塞等待。這就是消息隊(duì)列和 SocketServer 協(xié)同工作的基本流程。

讓我們先來(lái)看一下,消息隊(duì)列的實(shí)現(xiàn)。消息隊(duì)列的主要功能是接收和處理消息,并作為 Thread 的基類出現(xiàn)。

它的主要部件(成員變量)包括:

  • ss_:協(xié)同工作的 SocketServer

  • default_ss_:默認(rèn)的 SocketServer。如果在構(gòu)造的時(shí)候提供 SocketServer,那么消息隊(duì)列就使用用戶提供的;如果沒(méi)有提供,消息隊(duì)列就初始化一個(gè) PhysicalSocketServer 并保存在 default_ss_ 上,然后賦值給 ss_。在消息隊(duì)列析構(gòu)的時(shí)候,如果 default_ss_ 保存有默認(rèn)構(gòu)造 PhysicalSocketServer,那就銷毀它

  • msgq_:消息隊(duì)列,隊(duì)列內(nèi)的消息按照先進(jìn)先出的原則立即執(zhí)行

  • dmsgq_:延遲消息隊(duì)列,隊(duì)列內(nèi)的消息按照指定的時(shí)間延遲執(zhí)行

接著,我們來(lái)看一下 MessageQueue 的主要成員函數(shù):


MessageQueue::Get:等待接收消息。

參數(shù)說(shuō)明:

pmsg:存放消息的指針,用于返回消息
cmsWait:等待的時(shí)間,kForever表示無(wú)限等待
process_io:是否要求 SocketServer處理IO信號(hào)

 
MessageQueue::PostDelayed:發(fā)送一個(gè)延遲消息(從當(dāng)前時(shí)間計(jì)算延遲處理時(shí)間)

參數(shù)說(shuō)明:

cmsDelay:延遲毫秒數(shù)
phandler:消息處理器( MessageHandler 的子類)
id:消息ID
pdata:MessageData指針

MessageQueue::PostAt:發(fā)送一個(gè)延遲消息(直接指定延遲處理時(shí)間)

參數(shù)說(shuō)明:
tstamp:消息觸發(fā)的時(shí)間
phandler:消息處理器( MessageHandler的子類)
id:消息ID
pdata:MessageData指針

MessageQueue::Clear:通過(guò)指定 MessageHandler和消息ID刪除消息

參數(shù)說(shuō)明:

phandler:指定被刪除消息的 MessageHandler
id:指定被刪除消息的ID;如果該id為MQID_ANY所有與phandler相關(guān)的消息都將被刪除
removed:返回所有被刪除消息的列表

MessageQueue::GetDelay:從現(xiàn)在到下一條將要觸發(fā)的延遲消息的觸發(fā)時(shí)間的毫秒數(shù)(無(wú)參數(shù))

MessageQueue::Dispose:請(qǐng)求消息引擎刪除一個(gè)對(duì)象(delete對(duì)象指針)

參數(shù)說(shuō)明:
doomed:將要被刪除的對(duì)象的指針

MessageQueue::SignalQueueDestroyed(signal slot):通知接收者(observer)消息隊(duì)列將要?jiǎng)h除(參數(shù)無(wú))


MessageQueue 類的核心是 MessageQueue::Get 函數(shù)。Get 函數(shù)的主循環(huán)首先檢查 dmsgq_ 和 msgq_ 是否有立即需要處理的消息。如果檢查到需要立即處理的消息,就馬上將該消息返回。如果沒(méi)有檢查到需要立即處理的消息,那么線程就阻塞等待在 SocketServer::Wait 函數(shù)上。如果阻塞等待期間有新的消息進(jìn)入隊(duì)列或者線程需要停止退出,通過(guò) SocketServer::WakeUp 函數(shù)喚醒被阻塞的 SocketServer::Wait 函數(shù)。從 SocketServer::Wait函數(shù)返回后, MessageQueue::Get 函數(shù)會(huì)重新檢查是否有可以返回的消息,如果沒(méi)有則再次阻塞等待在 SocketServer::Wait 函數(shù)上。以上流程如下圖所示:

webrtc 消息隊(duì)列流程

MessageQueue 的原理大致如此。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容