前言
本文將介紹消息循環(huán)中的消息(Message),消息中持有的數(shù)據(jù)(MessageData),處理消息的Handler(MessageHandler)的基本內(nèi)容。
其中Message與MessageData相關(guān)的結(jié)構(gòu)體位于rtc_base/message_queue.h中,MessageHandler相關(guān)的類位于rtc_base/message_handler.h中
消息Message
WebRTC中消息相關(guān)的類分為兩種,一種是Message,表征的是即時消息,投放到消息循環(huán)中期待能被立馬消費;另外一種是DelayedMessage,表征的是延遲消息,投放到消息循環(huán)中不會立馬被消費,而是延遲一段時間才會被消費。
Message 源碼如下所示
struct Message {
Message()
: phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
inline bool Match(MessageHandler* handler, uint32_t id) const {
return (handler == nullptr || handler == phandler) &&
(id == MQID_ANY || id == message_id);
}
Location posted_from; // 消息自哪兒產(chǎn)生
MessageHandler* phandler; // 消息如何處理
uint32_t message_id; // 消息id
MessageData* pdata; // 消息中攜帶的數(shù)據(jù)
int64_t ts_sensitive; // 消息產(chǎn)生的時間
};
Message結(jié)構(gòu)體成員有如下幾種:
- Location posted_from: 該成員描述了此消息結(jié)構(gòu)是在哪個函數(shù),哪個文件的哪一行產(chǎn)生的,即消息產(chǎn)生的源頭。通常實際使用中使用宏RTC_FROM_HERE來產(chǎn)生Location對象,Location類的簡單分析見WebRTC源碼分析-定位之Location。
- MessageHandler* phandler:該成員持有消息如何被消費的方法,當(dāng)消息從消息循環(huán)中被取出后,將使用 MessageHandler的OnMessage(Message* msg)來對消息進行處理。特別的PeerConnection對象也是MessageHandler,實現(xiàn)了OnMessage(Message* msg)方法。
- uint32_t message_id:32位無符號整型的消息id。通常有兩類特別的消息,使用特殊的消息id來識別,分別是MQID_ANY (-1)表示所有的消息;MQID_DISPOSE(-2)表示需要丟棄的消息,該消息已經(jīng)在MQM的分析文章中出現(xiàn),詳見WebRTC源碼分析-線程基礎(chǔ)之MessageQueueManager。當(dāng)然,還有使用其他id值用以做特殊處理的,比如某個實現(xiàn)了MessageHandler.OnMessage方法的類,可能需要處理好幾種不同的消息,那么就可以將不同消息id值當(dāng)作區(qū)分消息類別的標(biāo)志,從而在OnMessage方法中分門別類處理好幾種消息了。
- int64_t ts_sensitive:64位時間戳,單位ms。當(dāng)不關(guān)心消息是否處理過慢時,也即消息時間不敏感時,該值為0;若關(guān)心消息是否得到即時處理,一般會設(shè)置ts_sensitive為消息創(chuàng)建時的時間戳 + kMaxMsgLatency常量(150ms),當(dāng)該消息從消息循環(huán)中取出被處理時,將會檢測當(dāng)前時間msCurrent與ts_sensitive的大小,若msCurrent>ts_sensitive,則表示該消息并沒有得到即時的處理,會打印警告日志。超時時間計算為msCurrent-ts_sensitive+kMaxMsgLatency。
- MessageData* pdata:消息數(shù)據(jù)。有多個變種,后面詳述。
Message結(jié)構(gòu)體還提供了兩個方法。
- 構(gòu)造函數(shù):沒啥好說,Message沒有提供析構(gòu)函數(shù),那么Message持有的消息處理器phandler以及消息數(shù)據(jù)pdata何時被銷毀就非常值得注意了。
- 匹配函數(shù):該函數(shù)在MQ清理消息時,用于評判哪些消息是滿足條件的,即匹配上了。看源碼可知,只要handler為空或者相等 并且 消息id為MQID_ANY或者相等,就被認(rèn)為是找到了滿足條件的消息。
DelayedMessage源碼如下
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
public:
DelayedMessage(int64_t delay,
int64_t trigger,
uint32_t num,
const Message& msg)
: cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
bool operator<(const DelayedMessage& dmsg) const {
return (dmsg.msTrigger_ < msTrigger_) ||
((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
}
int64_t cmsDelay_; // for debugging
int64_t msTrigger_;
uint32_t num_;
Message msg_;
};
DelayedMessage與Message并非是繼承is-a關(guān)系,而是has-a的關(guān)系。提供了除Message msg_成員之外的另幾個成員:
- int64_t cmsDelay_:延遲時間,消息延遲多長時間后需要被處理,單位ms
- int64_t msTrigger_:觸發(fā)時間,消息需要被處理的時間戳,為消息創(chuàng)建時的時間戳+cmsDelay_,單位ms
- uint32_t num_:消息的序號,在某個MQ中延遲消息的num_單調(diào)遞增。
由于DelayedMessage在MQ中會放至于專門的優(yōu)先級隊列中,優(yōu)先級如何確定?DelayedMessage重載了小于"<"運算符以確定排序,先按消息需要被處理的觸發(fā)時間戳msTrigger_排序;若觸發(fā)時間相同,則按消息序號排序。當(dāng)某個延遲消息要投放到延遲隊列中時,該優(yōu)先級隊列會根據(jù)“<”運算符去比較這個延遲消息與隊列中的消息以確定應(yīng)在隊列的哪個位置插入該消息。
MessageList被定義為std::list<Message>, MessageQueue中的即時消息就存儲于MessageList類別的列表中,按照先入先出的方式挨個進行處理。
typedef std::list<Message> MessageList;
消息MessgeData
MessgeData 消息數(shù)據(jù)基類,其析構(gòu)函數(shù)被定義為virtual類型,以防內(nèi)存泄漏
class MessageData {
public:
MessageData() {}
virtual ~MessageData() {}
};
TypedMessageData 使用模板定義的MessageData 的一個子類,便于擴展。
template <class T>
class TypedMessageData : public MessageData {
public:
explicit TypedMessageData(const T& data) : data_(data) {}
const T& data() const { return data_; }
T& data() { return data_; }
private:
T data_;
};
ScopedMessageData 類似于 TypedMessageData,用于指針類型。在析構(gòu)函數(shù)中,自動對該指針調(diào)用 delete。
// Like TypedMessageData, but for pointers that require a delete.
template <class T>
class ScopedMessageData : public MessageData {
public:
explicit ScopedMessageData(std::unique_ptr<T> data)
: data_(std::move(data)) {}
// Deprecated.
// TODO(deadbeef): Remove this once downstream applications stop using it.
explicit ScopedMessageData(T* data) : data_(data) {}
// Deprecated.
// TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
// this once downstream applications stop using it, then rename inner_data to
// just data.
const std::unique_ptr<T>& data() const { return data_; }
std::unique_ptr<T>& data() { return data_; }
const T& inner_data() const { return *data_; }
T& inner_data() { return *data_; }
private:
std::unique_ptr<T> data_;
};
ScopedRefMessageData 類似于ScopedMessageData,用于引用計數(shù)的指針類型。
// Like ScopedMessageData, but for reference counted pointers.
template <class T>
class ScopedRefMessageData : public MessageData {
public:
explicit ScopedRefMessageData(T* data) : data_(data) {}
const scoped_refptr<T>& data() const { return data_; }
scoped_refptr<T>& data() { return data_; }
private:
scoped_refptr<T> data_;
};
DisposeData 這個值得注意下,和前面幾個MessageData子類不一樣,該對象并沒有提供其內(nèi)部數(shù)據(jù)data_的方法,那么該對象正如其名,持有需要進行銷毀的數(shù)據(jù)。在哪些場景下使用?
- 有些函數(shù)不便在當(dāng)前函數(shù)范圍內(nèi)銷毀對象,那么就可以將需要銷毀的對象封裝到DisposeData中,并進一步封裝成消息并投遞到消息隊列中,等待線程的消息循環(huán)取出消息并進行銷毀時將該對象銷毀。見范例 HttpServer::Connection::~Connection;這里類似于QT中QObject的deletelater()方法的作用,起到延遲銷毀的作用。
- 某對象屬于某一線程,因此銷毀操作應(yīng)該交給所有者線程。這個可以通過調(diào)用對應(yīng)線程對象Thread的MessageQueue的Dispose(T* doomed)來實現(xiàn)。在介紹MessageQueue的文章中會詳述。
template <class T>
class DisposeData : public MessageData {
public:
explicit DisposeData(T* data) : data_(data) {}
virtual ~DisposeData() { delete data_; }
private:
T* data_;
};
WrapMessageData && UseMessageData 兩個模板方法,分別用來將數(shù)據(jù)封裝成MessageData以及取出對應(yīng)的數(shù)據(jù),封裝拆箱操作。
template <class T>
inline MessageData* WrapMessageData(const T& data) {
return new TypedMessageData<T>(data);
}
template <class T>
inline const T& UseMessageData(MessageData* data) {
return static_cast<TypedMessageData<T>*>(data)->data();
}
消息處理器MessageHandler
MessageHandler 消息處理器的基類,子類在繼承了該類之后要重載 OnMessage 函數(shù),在其中實現(xiàn)消息響應(yīng)的邏輯。
class MessageHandler {
public:
virtual ~MessageHandler();
virtual void OnMessage(Message* msg) = 0;
protected:
MessageHandler() {}
private:
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};
FunctorMessageHandler MessageHandler的模板子類,用于幫助實現(xiàn)阻塞地跨線程在指定線程上執(zhí)行某個方法,并可獲取執(zhí)行結(jié)果。Thread的Invoke()方法使用該類。該類的構(gòu)造函數(shù)中使用了C++11的右值引用,轉(zhuǎn)發(fā)語義std::forward,以及轉(zhuǎn)移語義std::move。詳見博客C++11 std::move和std::forward
// Helper class to facilitate executing a functor on a thread.
template <class ReturnT, class FunctorT>
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
virtual void OnMessage(Message* msg) { result_ = functor_(); }
const ReturnT& result() const { return result_; }
// Returns moved result. Should not call result() or MoveResult() again
// after this.
ReturnT MoveResult() { return std::move(result_); }
private:
FunctorT functor_;
ReturnT result_;
};
無返回值特例FunctorMessageHandler 返回值類型為 void 的函數(shù)的FunctorMessageHandler特化版本
// Specialization for ReturnT of void.
template <class FunctorT>
class FunctorMessageHandler<void, FunctorT> : public MessageHandler {
public:
explicit FunctorMessageHandler(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
virtual void OnMessage(Message* msg) { functor_(); }
void result() const {}
void MoveResult() {}
private:
FunctorT functor_;
};
總結(jié)
至此,基本上將MQ相關(guān)的“邊角料”介紹完畢了,重點的知識再回顧下:
- WebRTC中有兩類消息需要在消息循環(huán)中得以處理,即時消息Message以及延遲消息DelayedMessage。他們被投遞進入消息循環(huán)時,分別進入不同的隊列,即時消息Message進入MessageLits類別的即時消息隊列,該隊列是先入先出的對列,這類消息期待得到立即的處理;延遲消息DelayedMessage進入PriorityQueue類別的延遲消息隊列,該隊列是優(yōu)先級隊列,根據(jù)延遲消息本身的觸發(fā)時間以及消息序號進行排序,越早觸發(fā)的消息將越早得以處理。如果再算上線程上同步發(fā)送消息,同步阻塞執(zhí)行方法的話還有另外一個SendList,當(dāng)然,這不是本文需要說明的內(nèi)容了。
- 消息數(shù)據(jù)的類別有好多種,各自起到不同的作用,尤其要注意DisposeData用來利用消息循環(huán)處理消息的功能來自然而然地銷毀某個類別的數(shù)據(jù)。
- 消息處理器最重要的就是其OnMessage方法,該方法是消息最終得以處理的地方。WebRTC中的很多重要的類就是MessageHandler的子類,比如PeerConnection;
- 消息處理器的子類FunctorMessageHandler為跨線程執(zhí)行方法提供了便利,后續(xù)會在線程相關(guān)的文章中重點闡述。