WebRTC源碼分析-線程基礎(chǔ)之Message && MessageData && MessageHandler

前言

本文將介紹消息循環(huán)中的消息(Message),消息中持有的數(shù)據(jù)(MessageData),處理消息的Handler(MessageHandler)的基本內(nèi)容。

其中Message與MessageData相關(guān)的結(jié)構(gòu)體位于rtc_base/message_queue.h中,MessageHandler相關(guān)的類位于rtc_base/message_handler.h中

消息Message

WebRTC中消息相關(guān)的類分為兩種,一種是Message,表征的是即時消息,投放到消息循環(huán)中期待能被立馬消費;另外一種是DelayedMessage,表征的是延遲消息,投放到消息循環(huán)中不會立馬被消費,而是延遲一段時間才會被消費。

Message 源碼如下所示

struct Message {
  Message()
      : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
  inline bool Match(MessageHandler* handler, uint32_t id) const {
    return (handler == nullptr || handler == phandler) &&
           (id == MQID_ANY || id == message_id);
  }
  Location posted_from;             // 消息自哪兒產(chǎn)生
  MessageHandler* phandler;    // 消息如何處理
  uint32_t message_id;              // 消息id
  MessageData* pdata;              // 消息中攜帶的數(shù)據(jù)
  int64_t ts_sensitive;                 // 消息產(chǎn)生的時間
};

Message結(jié)構(gòu)體成員有如下幾種:

  • Location posted_from: 該成員描述了此消息結(jié)構(gòu)是在哪個函數(shù),哪個文件的哪一行產(chǎn)生的,即消息產(chǎn)生的源頭。通常實際使用中使用宏RTC_FROM_HERE來產(chǎn)生Location對象,Location類的簡單分析見WebRTC源碼分析-定位之Location。
  • MessageHandler* phandler:該成員持有消息如何被消費的方法,當(dāng)消息從消息循環(huán)中被取出后,將使用 MessageHandler的OnMessage(Message* msg)來對消息進行處理。特別的PeerConnection對象也是MessageHandler,實現(xiàn)了OnMessage(Message* msg)方法。
  • uint32_t message_id:32位無符號整型的消息id。通常有兩類特別的消息,使用特殊的消息id來識別,分別是MQID_ANY (-1)表示所有的消息;MQID_DISPOSE(-2)表示需要丟棄的消息,該消息已經(jīng)在MQM的分析文章中出現(xiàn),詳見WebRTC源碼分析-線程基礎(chǔ)之MessageQueueManager。當(dāng)然,還有使用其他id值用以做特殊處理的,比如某個實現(xiàn)了MessageHandler.OnMessage方法的類,可能需要處理好幾種不同的消息,那么就可以將不同消息id值當(dāng)作區(qū)分消息類別的標(biāo)志,從而在OnMessage方法中分門別類處理好幾種消息了。
  • int64_t ts_sensitive:64位時間戳,單位ms。當(dāng)不關(guān)心消息是否處理過慢時,也即消息時間不敏感時,該值為0;若關(guān)心消息是否得到即時處理,一般會設(shè)置ts_sensitive為消息創(chuàng)建時的時間戳 + kMaxMsgLatency常量(150ms),當(dāng)該消息從消息循環(huán)中取出被處理時,將會檢測當(dāng)前時間msCurrent與ts_sensitive的大小,若msCurrent>ts_sensitive,則表示該消息并沒有得到即時的處理,會打印警告日志。超時時間計算為msCurrent-ts_sensitive+kMaxMsgLatency。
  • MessageData* pdata:消息數(shù)據(jù)。有多個變種,后面詳述。

Message結(jié)構(gòu)體還提供了兩個方法。

  • 構(gòu)造函數(shù):沒啥好說,Message沒有提供析構(gòu)函數(shù),那么Message持有的消息處理器phandler以及消息數(shù)據(jù)pdata何時被銷毀就非常值得注意了。
  • 匹配函數(shù):該函數(shù)在MQ清理消息時,用于評判哪些消息是滿足條件的,即匹配上了。看源碼可知,只要handler為空或者相等 并且 消息id為MQID_ANY或者相等,就被認(rèn)為是找到了滿足條件的消息。

DelayedMessage源碼如下

// DelayedMessage goes into a priority queue, sorted by trigger time.  Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
 public:
  DelayedMessage(int64_t delay,
                 int64_t trigger,
                 uint32_t num,
                 const Message& msg)
      : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}

  bool operator<(const DelayedMessage& dmsg) const {
    return (dmsg.msTrigger_ < msTrigger_) ||
           ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
  }

  int64_t cmsDelay_;  // for debugging
  int64_t msTrigger_;
  uint32_t num_;
  Message msg_;
};

DelayedMessage與Message并非是繼承is-a關(guān)系,而是has-a的關(guān)系。提供了除Message msg_成員之外的另幾個成員:

  • int64_t cmsDelay_:延遲時間,消息延遲多長時間后需要被處理,單位ms
  • int64_t msTrigger_:觸發(fā)時間,消息需要被處理的時間戳,為消息創(chuàng)建時的時間戳+cmsDelay_,單位ms
  • uint32_t num_:消息的序號,在某個MQ中延遲消息的num_單調(diào)遞增。

由于DelayedMessage在MQ中會放至于專門的優(yōu)先級隊列中,優(yōu)先級如何確定?DelayedMessage重載了小于"<"運算符以確定排序,先按消息需要被處理的觸發(fā)時間戳msTrigger_排序;若觸發(fā)時間相同,則按消息序號排序。當(dāng)某個延遲消息要投放到延遲隊列中時,該優(yōu)先級隊列會根據(jù)“<”運算符去比較這個延遲消息與隊列中的消息以確定應(yīng)在隊列的哪個位置插入該消息。

MessageList被定義為std::list<Message>, MessageQueue中的即時消息就存儲于MessageList類別的列表中,按照先入先出的方式挨個進行處理。

typedef std::list<Message> MessageList;

消息MessgeData

MessgeData 消息數(shù)據(jù)基類,其析構(gòu)函數(shù)被定義為virtual類型,以防內(nèi)存泄漏

class MessageData {
 public:
  MessageData() {}
  virtual ~MessageData() {}
};

TypedMessageData 使用模板定義的MessageData 的一個子類,便于擴展。

template <class T>
class TypedMessageData : public MessageData {
 public:
  explicit TypedMessageData(const T& data) : data_(data) {}
  const T& data() const { return data_; }
  T& data() { return data_; }
 private:
  T data_;
};

ScopedMessageData 類似于 TypedMessageData,用于指針類型。在析構(gòu)函數(shù)中,自動對該指針調(diào)用 delete。

// Like TypedMessageData, but for pointers that require a delete.
template <class T>
class ScopedMessageData : public MessageData {
 public:
  explicit ScopedMessageData(std::unique_ptr<T> data)
      : data_(std::move(data)) {}
  // Deprecated.
  // TODO(deadbeef): Remove this once downstream applications stop using it.
  explicit ScopedMessageData(T* data) : data_(data) {}
  // Deprecated.
  // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
  // this once downstream applications stop using it, then rename inner_data to
  // just data.
  const std::unique_ptr<T>& data() const { return data_; }
  std::unique_ptr<T>& data() { return data_; }

  const T& inner_data() const { return *data_; }
  T& inner_data() { return *data_; }

 private:
  std::unique_ptr<T> data_;
};

ScopedRefMessageData 類似于ScopedMessageData,用于引用計數(shù)的指針類型。

// Like ScopedMessageData, but for reference counted pointers.
template <class T>
class ScopedRefMessageData : public MessageData {
 public:
  explicit ScopedRefMessageData(T* data) : data_(data) {}
  const scoped_refptr<T>& data() const { return data_; }
  scoped_refptr<T>& data() { return data_; }

 private:
  scoped_refptr<T> data_;
};

DisposeData 這個值得注意下,和前面幾個MessageData子類不一樣,該對象并沒有提供其內(nèi)部數(shù)據(jù)data_的方法,那么該對象正如其名,持有需要進行銷毀的數(shù)據(jù)。在哪些場景下使用?

  • 有些函數(shù)不便在當(dāng)前函數(shù)范圍內(nèi)銷毀對象,那么就可以將需要銷毀的對象封裝到DisposeData中,并進一步封裝成消息并投遞到消息隊列中,等待線程的消息循環(huán)取出消息并進行銷毀時將該對象銷毀。見范例 HttpServer::Connection::~Connection;這里類似于QT中QObject的deletelater()方法的作用,起到延遲銷毀的作用。
  • 某對象屬于某一線程,因此銷毀操作應(yīng)該交給所有者線程。這個可以通過調(diào)用對應(yīng)線程對象Thread的MessageQueue的Dispose(T* doomed)來實現(xiàn)。在介紹MessageQueue的文章中會詳述。
template <class T>
class DisposeData : public MessageData {
 public:
  explicit DisposeData(T* data) : data_(data) {}
  virtual ~DisposeData() { delete data_; }

 private:
  T* data_;
};

WrapMessageData && UseMessageData 兩個模板方法,分別用來將數(shù)據(jù)封裝成MessageData以及取出對應(yīng)的數(shù)據(jù),封裝拆箱操作。

template <class T>
inline MessageData* WrapMessageData(const T& data) {
  return new TypedMessageData<T>(data);
}

template <class T>
inline const T& UseMessageData(MessageData* data) {
  return static_cast<TypedMessageData<T>*>(data)->data();
}

消息處理器MessageHandler

MessageHandler 消息處理器的基類,子類在繼承了該類之后要重載 OnMessage 函數(shù),在其中實現(xiàn)消息響應(yīng)的邏輯。

class MessageHandler {
 public:
  virtual ~MessageHandler();
  virtual void OnMessage(Message* msg) = 0;
 protected:
  MessageHandler() {}
 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};

FunctorMessageHandler MessageHandler的模板子類,用于幫助實現(xiàn)阻塞地跨線程在指定線程上執(zhí)行某個方法,并可獲取執(zhí)行結(jié)果。Thread的Invoke()方法使用該類。該類的構(gòu)造函數(shù)中使用了C++11的右值引用,轉(zhuǎn)發(fā)語義std::forward,以及轉(zhuǎn)移語義std::move。詳見博客C++11 std::move和std::forward

// Helper class to facilitate executing a functor on a thread.
template <class ReturnT, class FunctorT>
class FunctorMessageHandler : public MessageHandler {
 public:
  explicit FunctorMessageHandler(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  virtual void OnMessage(Message* msg) { result_ = functor_(); }
  const ReturnT& result() const { return result_; }
  // Returns moved result. Should not call result() or MoveResult() again
  // after this.
  ReturnT MoveResult() { return std::move(result_); }
 private:
  FunctorT functor_;
  ReturnT result_;
};

無返回值特例FunctorMessageHandler 返回值類型為 void 的函數(shù)的FunctorMessageHandler特化版本

// Specialization for ReturnT of void.
template <class FunctorT>
class FunctorMessageHandler<void, FunctorT> : public MessageHandler {
 public:
  explicit FunctorMessageHandler(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  virtual void OnMessage(Message* msg) { functor_(); }
  void result() const {}
  void MoveResult() {}
 private:
  FunctorT functor_;
};

總結(jié)

至此,基本上將MQ相關(guān)的“邊角料”介紹完畢了,重點的知識再回顧下:

  • WebRTC中有兩類消息需要在消息循環(huán)中得以處理,即時消息Message以及延遲消息DelayedMessage。他們被投遞進入消息循環(huán)時,分別進入不同的隊列,即時消息Message進入MessageLits類別的即時消息隊列,該隊列是先入先出的對列,這類消息期待得到立即的處理;延遲消息DelayedMessage進入PriorityQueue類別的延遲消息隊列,該隊列是優(yōu)先級隊列,根據(jù)延遲消息本身的觸發(fā)時間以及消息序號進行排序,越早觸發(fā)的消息將越早得以處理。如果再算上線程上同步發(fā)送消息,同步阻塞執(zhí)行方法的話還有另外一個SendList,當(dāng)然,這不是本文需要說明的內(nèi)容了。
  • 消息數(shù)據(jù)的類別有好多種,各自起到不同的作用,尤其要注意DisposeData用來利用消息循環(huán)處理消息的功能來自然而然地銷毀某個類別的數(shù)據(jù)。
  • 消息處理器最重要的就是其OnMessage方法,該方法是消息最終得以處理的地方。WebRTC中的很多重要的類就是MessageHandler的子類,比如PeerConnection;
  • 消息處理器的子類FunctorMessageHandler為跨線程執(zhí)行方法提供了便利,后續(xù)會在線程相關(guān)的文章中重點闡述。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容