WebRTC源碼分析-線程基礎(chǔ)之消息循環(huán),消息投遞

前言

如之前的總述文章所述,rtc::Thread類(lèi)封裝了WebRTC中線程的一般功能,比如設(shè)置線程名稱(chēng),啟動(dòng)線程執(zhí)行用戶(hù)代碼,線程的join,sleep,run,stop等方法;同時(shí)也提供了線程內(nèi)部的消息循環(huán),以及線程之間以同步、異步方式投遞消息,同步方式在目標(biāo)線程執(zhí)行方法并返回結(jié)果等線程之間交互的方式;另外,每個(gè)線程均持有SocketServer類(lèi)成員對(duì)象,該類(lèi)實(shí)現(xiàn)了IO多路復(fù)用功能。

本文將針對(duì)rtc::Thread類(lèi)所提供消息循環(huán),消息投遞的功能進(jìn)行介紹。由于Thread類(lèi)是通過(guò)繼承MessageQueue才具有此類(lèi)功能,因此,在介紹Thread相關(guān)API實(shí)現(xiàn)之前應(yīng)先介紹MessageQueue相關(guān)的知識(shí):消息隊(duì)里管理(MessageQueueManager),消息隊(duì)列(MessageQueue),消息(Message,DelayedMessage),消息數(shù)據(jù)(MessageData,TypedMessageData,ScopedMessageData,DisposeData)的相關(guān)知識(shí)。

Thread類(lèi)在rtc_base/thread.h中聲明,定義在rtc_base/thread.c中(只保留了消息循環(huán)以及消息投遞相關(guān)的API):

class RTC_LOCKABLE Thread : public MessageQueue {
 public:
  virtual void Run();

  virtual void Send(const Location& posted_from,
                    MessageHandler* phandler,
                    uint32_t id = 0,
                    MessageData* pdata = nullptr);

  template <class ReturnT, class FunctorT>
  ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
    FunctorMessageHandler<ReturnT, FunctorT> handler(
        std::forward<FunctorT>(functor));
    InvokeInternal(posted_from, &handler);
    return handler.MoveResult();
  }

  bool IsProcessingMessagesForTesting() override;
  void Clear(MessageHandler* phandler,
             uint32_t id = MQID_ANY,
             MessageList* removed = nullptr) override;
  void ReceiveSends() override;

  bool ProcessMessages(int cms);

 protected:
  friend class ScopedDisallowBlockingCalls;

 private:
  void ReceiveSendsFromThread(const Thread* source);
  bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
  void InvokeInternal(const Location& posted_from, MessageHandler* handler);

  std::list<_SendMessage> sendlist_;
  bool blocking_calls_allowed_ = true;

  friend class ThreadManager;

  RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
};

消息循環(huán)的建立

由上一篇文章WebRTC源碼分析-線程基礎(chǔ)之線程基本功能的線程啟動(dòng)分析知道,用戶(hù)在沒(méi)有傳入自己的Runnable對(duì)象時(shí),新的線程上會(huì)執(zhí)行Thread.Run()方法,該方法源碼如下,內(nèi)部會(huì)調(diào)用ProcessMessages(kForever)去運(yùn)行消息循環(huán)。而用戶(hù)如果實(shí)現(xiàn)了自己的Runnable對(duì)象時(shí),也想要運(yùn)行消息循環(huán),那咋辦嘛? 下面的注釋就告知你了,在Runnable.Run()中適時(shí)的調(diào)用ProcessMessages()方法就行。

  // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
  // work, override Run().  To receive and dispatch messages, call
  // ProcessMessages occasionally.
  void Thread::Run() {
      ProcessMessages(kForever);
  }

下面看看這個(gè)ProcessMessages()怎么建立消息循環(huán)的。分兩種情形:
1)默認(rèn)地,ProcessMessages(kForever),告知無(wú)限期進(jìn)行處理。此時(shí)的情形就是函數(shù)內(nèi)部的while循環(huán)不停的調(diào)用Get()去獲取消息,然后處理消息Dispatch()。循環(huán)能夠退出的條件就是Get方法返回false。由
WebRTC源碼分析-線程基礎(chǔ)之MessageQueue 分析Get()方法分析可知,無(wú)限期處理的情況下,只有循環(huán)停止工作或者IO處理出錯(cuò)才會(huì)導(dǎo)致Get()返回false。
2)如果ProcessMessages(int cmsLoop),有限期進(jìn)行處理。那么退出循環(huán)的方式有兩個(gè),一個(gè)是使用時(shí)間已經(jīng)到了,返回true;另外一個(gè)是Get()方法返回false,有限期處理的情況下,Get()返回false的條件有三:循環(huán)停止工作;IO處理出錯(cuò);已經(jīng)耗完所有處理時(shí)間也還未找到一個(gè)MSG。

bool Thread::ProcessMessages(int cmsLoop) {
  // Using ProcessMessages with a custom clock for testing and a time greater
  // than 0 doesn't work, since it's not guaranteed to advance the custom
  // clock's time, and may get stuck in an infinite loop.
  RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
             cmsLoop == kForever);
  // 計(jì)算終止處理消息的時(shí)間
  int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
  // 下次可以進(jìn)行消息獲取的時(shí)間長(zhǎng)度
  int cmsNext = cmsLoop;

  while (true) {
#if defined(WEBRTC_MAC)
    ScopedAutoReleasePool pool;
#endif
    // 獲取消息
    Message msg;
    if (!Get(&msg, cmsNext))
      return !IsQuitting();
    // 處理消息
    Dispatch(&msg);
    // 若不是無(wú)限期,計(jì)算下次可以進(jìn)行消息獲取的時(shí)間。
    if (cmsLoop != kForever) {
      cmsNext = static_cast<int>(TimeUntil(msEnd));
      // 若使用時(shí)間已經(jīng)到了,那么退出循環(huán)
      if (cmsNext < 0)
        return true;
    }
  }
}

Post消息

向一個(gè)線程Post消息,只是簡(jiǎn)單地向消息循環(huán)的隊(duì)列中插入一條待處理的消息,然后Post方法就會(huì)返回,不會(huì)引發(fā)當(dāng)前線程的阻塞。Thread方法并沒(méi)有重寫(xiě)MQ的Post方法,因此,關(guān)于Post方法的細(xì)節(jié)分析見(jiàn) WebRTC源碼分析-線程基礎(chǔ)之MessageQueue

Send消息

向一個(gè)線程Send消息,會(huì)阻塞當(dāng)前線程的運(yùn)行,直到該消息被目標(biāo)線程消費(fèi)完后才會(huì)解除阻塞,從Send方法返回。算法流程如源碼及其注釋如下(分9個(gè)步驟):

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  // 目標(biāo)線程的消息循環(huán)是否還在處理消息?                                        // 步驟1
  if (IsQuitting())
    return;
  // 創(chuàng)建需要處理的消息                                                                       // 步驟2
  Message msg;
  msg.posted_from = posted_from;
  msg.phandler = phandler;
  msg.message_id = id;
  msg.pdata = pdata;
  // 若目標(biāo)線程就是自己,那么直接在此處處理完消息就ok                // 步驟3
  if (IsCurrent()) {
    phandler->OnMessage(&msg);
    return;
  }

  // 斷言當(dāng)前線程是否具有阻塞權(quán)限,無(wú)阻塞權(quán)限,                          // 步驟4
  // 那么向別的線程Send消息就是個(gè)非法操作
  AssertBlockingIsAllowedOnCurrentThread();

  // 確保當(dāng)前線程有一個(gè)Thread對(duì)象與之綁定                                     // 步驟5
  AutoThread thread;
  Thread* current_thread = Thread::Current();
  RTC_DCHECK(current_thread != nullptr);  // AutoThread ensures this

  // 創(chuàng)建一個(gè)SendMessage對(duì)象,放置到目標(biāo)線程對(duì)象的sendlist_              // 步驟6
  // ready表征該消息是否已經(jīng)處理完。
  bool ready = false;
  {
    CritScope cs(&crit_);
    _SendMessage smsg;
    smsg.thread = current_thread;
    smsg.msg = msg;
    smsg.ready = &ready;
    sendlist_.push_back(smsg);
  }

  // 將目標(biāo)線程從IO處理中喚醒,趕緊處理消息啦~                                      // 步驟7
  // 目標(biāo)線程將在其消息循環(huán)中,調(diào)用ReceiveSends()處理Send消息~~
  WakeUpSocketServer();

  // 同步等待消息被處理                                                                              // 步驟8
  bool waited = false;
  crit_.Enter();
  while (!ready) {
    crit_.Leave();
    // 對(duì)方也可能向我Send了消息,可不能都互相阻塞住了
    // 處理對(duì)方可能Send給我的消息。
    current_thread->ReceiveSendsFromThread(this);
    // 處理完對(duì)方的Send消息后,阻塞等待對(duì)方處理完我Send的消息,然后來(lái)喚醒我吧
    // 但這兒會(huì)有個(gè)意外,這就是waited存在的意義了
    current_thread->socketserver()->Wait(kForever, false);
    waited = true;
    crit_.Enter();
  }
  crit_.Leave();

  // 如果出現(xiàn)過(guò)waited,那么再喚醒一次當(dāng)前線程去處理Post消息。       // 步驟9
  if (waited) {
    current_thread->socketserver()->WakeUp();
  }
}

要理解上述算法,需要搞清楚Send方法的代碼是在當(dāng)前線程執(zhí)行的,而調(diào)用的是目標(biāo)線程對(duì)象Thread的Send方法,即Send方法中的this,是目標(biāo)線程線程對(duì)象Thread。捋清楚這點(diǎn)非常重要?。?!這兒我一步步分析上述算法過(guò)程:

  1. 判斷目標(biāo)線程的消息循環(huán)是否仍在工作:IsQuitting()是目標(biāo)線程對(duì)象Thread的方法,但是是在當(dāng)前線程中執(zhí)行的!若消息循環(huán)停止工作,那么會(huì)拒絕處理消息,Send會(huì)直接返回,但是調(diào)用方是無(wú)法獲知的。一般建議是在向線程發(fā)送消息之前調(diào)用IsProcessingMessagesForTesting()判斷下該消息循環(huán)是否還在正常運(yùn)行。
  2. 創(chuàng)建需要消費(fèi)的消息對(duì)象:此處沒(méi)有什么可以多說(shuō)的
  3. 判斷目標(biāo)線程是否就是當(dāng)前線程: 通過(guò)Thread.IsCurrent()可以判別這點(diǎn),如果目標(biāo)線程就是當(dāng)前線程,那就是自己給自己Send消息了,直接在此處消費(fèi)消息并返回。
  4. 斷言當(dāng)前線程是否允許阻塞: 注意,這兒不是斷言目標(biāo)線程。因?yàn)椋蛄硗庖粋€(gè)線程Send消息時(shí),當(dāng)前線程需要阻塞地等待目標(biāo)線程處理完消息后才返回。如果,當(dāng)前線程沒(méi)有阻塞權(quán)限的話(huà),那就是非法操作了。
  5. 確保當(dāng)前線程有一個(gè)關(guān)聯(lián)的Thread對(duì)象: 為什么?因?yàn)楹罄m(xù)的阻塞喚醒操作都要通過(guò)Thread對(duì)象的方法來(lái)實(shí)現(xiàn),如果當(dāng)前線程沒(méi)有關(guān)聯(lián)Thread對(duì)象,那么這些操作就無(wú)法完成。怎么做?通過(guò)創(chuàng)建一個(gè)局部對(duì)象AutoThread thread來(lái)實(shí)現(xiàn)。源碼如下,注意兩點(diǎn):
    1)只有當(dāng)當(dāng)前線程無(wú)Thread關(guān)聯(lián)時(shí),才會(huì)將AutoThread作為當(dāng)前線程的關(guān)聯(lián)Thread;
    2)由于AutoThread thread是局部對(duì)象,當(dāng)Send函數(shù)結(jié)束時(shí)該對(duì)象生命周期走到尾聲,可以利用其析構(gòu)函數(shù)中需要恢復(fù)當(dāng)前對(duì)象無(wú)Thread對(duì)象綁定的狀態(tài)(當(dāng)然,前提是之前就無(wú)Thread對(duì)象關(guān)聯(lián))。
AutoThread::AutoThread()
    : Thread(SocketServer::CreateDefault(), /*do_init=*/false) {
  DoInit();
  if (!ThreadManager::Instance()->CurrentThread()) {
    ThreadManager::Instance()->SetCurrentThread(this);
  }
}

AutoThread::~AutoThread() {
  Stop();
  DoDestroy();
  if (ThreadManager::Instance()->CurrentThread() == this) {
    ThreadManager::Instance()->SetCurrentThread(nullptr);
  }
}
  1. 創(chuàng)建_SendMessage實(shí)例,并入隊(duì): _ SendMessage結(jié)構(gòu)體的聲明如下,該對(duì)象被創(chuàng)建后,會(huì)進(jìn)入目標(biāo)線程的sendlist_。其中Thread* thread存儲(chǔ)的是主動(dòng)投放消息的當(dāng)前線程。當(dāng)目標(biāo)線程在消費(fèi)msg之后,會(huì)將ready標(biāo)志置為true,并且通過(guò)thread->WakeUp()解除當(dāng)前線程的阻塞,從而判斷ready后獲知Send消息已經(jīng)被消費(fèi)了。
struct _SendMessage {
  _SendMessage() {}
  Thread* thread;   // 當(dāng)前線程對(duì)象
  Message msg;     // 消息
  bool* ready;         // 消息是否處理完畢的標(biāo)志
};
  1. 喚醒目標(biāo)線程處理消息: 當(dāng)_ SendMessage消息已經(jīng)進(jìn)入目標(biāo)線程的sendlist_隊(duì)列了,當(dāng)然是要喚醒目標(biāo)線程去處理啦,MQ的WakeUpSocketServer()就干這個(gè)事。好像很簡(jiǎn)單?這里代碼沒(méi)有體現(xiàn)的一點(diǎn)是:目標(biāo)線程是如何處理這個(gè)_ SendMessage消息的,WebRTC源碼分析-線程基礎(chǔ)之MessageQueue 提到過(guò) MQ的Get()方法最開(kāi)始就是優(yōu)先地,阻塞地調(diào)用它的ReceiveSends()處理Send消息,可惜的是MQ的該方法是個(gè)vitural方法,并且啥也沒(méi)干,剛好Thread對(duì)象就重寫(xiě)了ReceiveSends()方法,這正是處理Send消息最佳之處。由于ReceiveSends()是在目標(biāo)線程干的事,為了不打亂節(jié)奏,此處不展開(kāi)描述ReceiveSends()。
  2. 同步等待Send消息被處理:雖然代碼沒(méi)幾行,但是整個(gè)過(guò)程中最難理解的地方,慢慢展開(kāi)來(lái)說(shuō),把這塊兒的代碼再次貼出來(lái)。
  bool waited = false;
  crit_.Enter();
  while (!ready) {
    crit_.Leave();
    current_thread->ReceiveSendsFromThread(this);
    current_thread->socketserver()->Wait(kForever, false);
    waited = true;
    crit_.Enter();
  }
  crit_.Leave();

1)ready這個(gè)參數(shù)會(huì)被當(dāng)前線程訪問(wèn),也會(huì)被目標(biāo)線程訪問(wèn),必然需要加鎖,并且要達(dá)到兩個(gè)線程的互斥,還必須要使用同一個(gè)鎖。而這個(gè)鎖就是目標(biāo)線程的線程對(duì)象Thread的CriticalSection crit_成員。因此,while循環(huán)開(kāi)頭讀取ready時(shí)進(jìn)行了加鎖解鎖操作。
2)按理說(shuō),接下來(lái)當(dāng)前線程阻塞等待目標(biāo)線程完成操作之后通知我解除阻塞就行了。但是,考慮到一點(diǎn),如果,兩個(gè)線程同時(shí)互相Send消息,那豈不是二者都卡在等待對(duì)方通知這個(gè)地方,死鎖了?為了避免這個(gè)情況,在阻塞等待前,先處理所有別人Send給我的消息吧。調(diào)用current_thread->ReceiveSendsFromThread(this)進(jìn)行處理,注意current_thread是當(dāng)前線程對(duì)象,因而是處理的當(dāng)前線程所接收到的Send消息,而this是目標(biāo)線程對(duì)象。 需要注意一點(diǎn)PopSendMessageFromThread()方法只會(huì)把source線程發(fā)送的消息從隊(duì)列中取出,此時(shí)source為this,即目標(biāo)線程對(duì)象。所以,此處ReceiveSendsFromThread()只會(huì)處理目標(biāo)線程Send給當(dāng)前線程的消息,此時(shí),ready置為true,并調(diào)用目標(biāo)線程的WakeUp()方法,喚醒了目標(biāo)線程,使得目標(biāo)線程不會(huì)阻塞在Send方法中,從而使得目標(biāo)線程有機(jī)會(huì)去運(yùn)行其消息循環(huán),從而消費(fèi)當(dāng)前線程Send給它的消息。很繞很繞,我已經(jīng)盡力了。

void Thread::ReceiveSendsFromThread(const Thread* source) {
  _SendMessage smsg;
  crit_.Enter();
  while (PopSendMessageFromThread(source, &smsg)) {
    crit_.Leave();
    Dispatch(&smsg.msg);
    crit_.Enter();
    *smsg.ready = true;
    smsg.thread->socketserver()->WakeUp();
  }
  crit_.Leave();
}

bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
  for (std::list<_SendMessage>::iterator it = sendlist_.begin();
       it != sendlist_.end(); ++it) {
    if (it->thread == source || source == nullptr) {
      *msg = *it;
      sendlist_.erase(it);
      return true;
    }
  }
  return false;
}

3)處理完目標(biāo)線程可能Send給我的消息之后,我終于可以安心地阻塞在IO上等待了,current_thread->socketserver()->Wait(kForever, false)。這時(shí),我們看看目標(biāo)線程如何操作。之前在步驟7中,已經(jīng)闡述過(guò),目標(biāo)線程被喚醒后在消息循環(huán)中優(yōu)先阻塞地調(diào)用ReceiveSends()方法來(lái)處理Send消息,而且ReceiveSends()是被Thread重寫(xiě)過(guò)的方法。代碼如下:咦,這不是上面調(diào)用的ReceiveSendsFromThread()方法嘛?不過(guò)傳入的指針為空,此時(shí)PopSendMessageFromThread()會(huì)將目標(biāo)線程上收到的所有Send消息都拿出來(lái)消費(fèi)完,設(shè)置標(biāo)志位,然后調(diào)用消息發(fā)送者線程的WakeUp()來(lái)喚醒消息發(fā)送者。到此處,終于把這個(gè)Send邏輯搞清楚了。完了?還沒(méi)完呢,喚醒該線程的一定就是目標(biāo)線程處理完Send消息之后嘛?還有這個(gè)waited干嘛用的???

void Thread::ReceiveSends() {
  ReceiveSendsFromThread(nullptr);
}

4)當(dāng)前線程沉睡在current_thread->socketserver()->Wait(kForever, false)上時(shí),喚醒該線程的一定就是目標(biāo)線程處理完Send消息之后嘛?那可不一定,可能是別的線程Wake了它,也可能是目標(biāo)線程Post了一條消息給當(dāng)前線程(此時(shí)會(huì)喚醒當(dāng)前線程,詳見(jiàn)Post消息)。那么意味著當(dāng)前線程Send消息可能還未處理完,自己就被醒了,那么又得進(jìn)行While中的ready變量的訪問(wèn)了,又得加鎖,解鎖,把這個(gè)過(guò)程再走一遍。
5)這個(gè)waited變量干嘛用?就代碼而言,我們發(fā)現(xiàn)waited變量進(jìn)入了一次while循環(huán)就會(huì)變成true,表示我等待過(guò)至少一次。沒(méi)有等待過(guò)的原因就是目標(biāo)線程干活很麻利,在第一次判斷ready值的時(shí)候就已經(jīng)把活干完了,Send消息已被消費(fèi)。具體記錄這個(gè)等待過(guò)一次是做什么用呢?看步驟9

  1. 再喚醒一次 如果進(jìn)入過(guò)一次循環(huán)等待,那么waited變量為true,需要再次喚醒當(dāng)前線程一次current_thread->socketserver()->WakeUp(),讓當(dāng)前線程能處理消息。其實(shí)原因在源碼的注釋上寫(xiě)得很明白了,我就不翻譯了。完畢。
  // Our Wait loop above may have consumed some WakeUp events for this
  // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
  // cause problems for some SocketServers.
  //
  // Concrete example:
  // Win32SocketServer on thread A calls Send on thread B.  While processing the
  // message, thread B Posts a message to A.  We consume the wakeup for that
  // Post while waiting for the Send to complete, which means that when we exit
  // this loop, we need to issue another WakeUp, or else the Posted message
  // won't be processed in a timely manner.
  if (waited) {
    current_thread->socketserver()->WakeUp();
  }

Invoke跨線程同步執(zhí)行方法

Invoke方法提供了一個(gè)方便的方式:阻塞當(dāng)前線程,在另外一個(gè)線程上同步執(zhí)行方法,并且返回執(zhí)行結(jié)果。
本質(zhì)上就是將需要執(zhí)行的方法封裝到消息處理器FunctorMessageHandler中,然后向目標(biāo)線程Send這個(gè)攜帶特殊消息處理器FunctorMessageHandler的消息,該消息被消費(fèi)后,阻塞結(jié)束,F(xiàn)unctorMessageHandler對(duì)象攜帶了方法執(zhí)行的結(jié)果,當(dāng)前線程可以從中獲取到執(zhí)行結(jié)果。其實(shí),這里的重點(diǎn)有二:

  template <class ReturnT, class FunctorT>
  ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
    FunctorMessageHandler<ReturnT, FunctorT> handler(
        std::forward<FunctorT>(functor));
    InvokeInternal(posted_from, &handler);
    return handler.MoveResult();
  }

void Thread::InvokeInternal(const Location& posted_from,
                            MessageHandler* handler) {
  TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line",
               posted_from.file_and_line(), "src_func",
               posted_from.function_name());
  Send(posted_from, handler);
}

總結(jié)

本文比較詳細(xì)的介紹了Thread的消息循環(huán),線程間Post消息,Send消息,跨線程執(zhí)行方法等功能。由于Thread是通過(guò)繼承MessageQueue才具有這些功能,因此,需要結(jié)合另外3篇文章來(lái)一起看。

本文對(duì)Post消息(非阻塞)只是一筆帶過(guò),因?yàn)樵赥hread并沒(méi)有改寫(xiě)Post方法,而直接是MessageQueue的Post。

本文對(duì)Send消息(阻塞)重點(diǎn)拆解了一番,自己倒是理解了,但不知道是否描述的夠清晰,是否有錯(cuò)誤。博客的作用嘛,就是為了能夠理清楚自己的思路,順便也為他人做些貢獻(xiàn)。如果有不對(duì)的地方,看到此處的同路人可以指出

本文還對(duì)跨線程執(zhí)行方法提供了一種便捷的方式,本質(zhì)就是封裝一個(gè)特殊的消息處理器FunctorMessageHandler到消息中,并使用Send方法使得目標(biāo)線程消費(fèi)消息時(shí)執(zhí)行該方法。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容