webrtc的線程模塊設(shè)計(jì)的源碼分析

對(duì)于一個(gè)大的項(xiàng)目,比如webrtc或者其它公司內(nèi)項(xiàng)目,如果采用了并發(fā)的設(shè)計(jì),那線程的模型就非常非常重要了,可以這么說一定程度上決定了項(xiàng)目的成敗,而webrtc的線程模型值得深入學(xué)習(xí),網(wǎng)上有很多都是基于m85分支甚至更老的分支總結(jié),現(xiàn)依據(jù)最近的m115分支總結(jié)下,設(shè)計(jì)思想大體沒有太多變化,但是代碼實(shí)現(xiàn)還是有很多差別。
webrtc中一個(gè)TaskQueue就是一個(gè)線程,在webrtc中有兩種線程封裝,一種是rtc:thread(也是基于TaskQueueBase的實(shí)現(xiàn)),他包含了某個(gè)SockerServer,用戶處理網(wǎng)絡(luò)相關(guān)的請(qǐng)求,比如PeerConnection中就有network_thread、worker_thread、signaling_thread;另一種就是TaskQueue,也封裝了線程的實(shí)現(xiàn),應(yīng)用范圍也很廣,比如音視頻的編解碼和渲染都有對(duì)應(yīng)的TaskQueue線程,那么這些線程是如何實(shí)現(xiàn)的呢?這兩種又有什么區(qū)別呢?從網(wǎng)上盜了一張圖,該圖總結(jié)了webrtc的線程模型,以供參考,后續(xù)會(huì)逐步展開對(duì)webrtc源碼的研究。


image.png

包括三個(gè)部分,api層面的TaskQueue接口、具體實(shí)現(xiàn)層面的TaskQueue和rtc::Thread

api層面的TaskQueue接口
定義了TaskQueueBase和TaskQueueFactory這個(gè)兩個(gè)最重要虛基類

Image.png

  • TaskQueueBase
    實(shí)現(xiàn)了PostTask、PostDelayedTask、PostDelayedHighPrecisionTask、PostDelayedTaskWithPrecision,具體的PostxxxxxImpl虛函數(shù)由其子類實(shí)現(xiàn),比如rtc_base下的TaskQueueWin
namespace webrtc {

class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
public:
  enum class DelayPrecision {
    // This may include up to a 17 ms leeway in addition to OS timer precision.
    // See PostDelayedTask() for more information.
    kLow,

    // This does not have the additional delay that kLow has, but it is still
    // limited by OS timer precision. See PostDelayedHighPrecisionTask() for
    // more information.
    kHigh,
  };

  virtual void Delete() = 0;
  void PostTask(absl::AnyInvocable<void() &&> task,
         const Location& location = Location::Current()) {
    PostTaskImpl(std::move(task), PostTaskTraits{}, location);
  }

  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
                       TimeDelta delay,
                       const Location& location = Location::Current()) {
    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = false},
                        location);
  }

  void PostDelayedHighPrecisionTask(
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {

    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = true},
                        location);
  }

  // As specified by `precision`, calls either PostDelayedTask() or
  // PostDelayedHighPrecisionTask().
  void PostDelayedTaskWithPrecision(
      DelayPrecision precision,
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {

    switch (precision) {
      case DelayPrecision::kLow:
        PostDelayedTask(std::move(task), delay, location);
        break;

      case DelayPrecision::kHigh:
        PostDelayedHighPrecisionTask(std::move(task), delay, location);
        break;
    }
  }

  // Returns the task queue that is running the current thread.
  // Returns nullptr if this thread is not associated with any task queue.
  // May be called on any thread or task queue, including this task queue.

  static TaskQueueBase* Current();
  bool IsCurrent() const { return Current() == this; }

protected:
  // This is currently only present here to simplify introduction of future
  // planned task queue changes.
  struct PostTaskTraits {};

  struct PostDelayedTaskTraits {
    // If `high_precision` is false, tasks may execute within up to a 17 ms
    // leeway in addition to OS timer precision. Otherwise the task should be
    // limited to OS timer precision. See PostDelayedTask() and
    // PostDelayedHighPrecisionTask() for more information.

    bool high_precision = false;
  };

  class RTC_EXPORT CurrentTaskQueueSetter {
   public:
    explicit CurrentTaskQueueSetter(TaskQueueBase* task_queue);
    CurrentTaskQueueSetter(const CurrentTaskQueueSetter&) = delete;
    CurrentTaskQueueSetter& operator=(const CurrentTaskQueueSetter&) = delete;
    ~CurrentTaskQueueSetter();

   private:
    TaskQueueBase* const previous_;
  };

  // Subclasses should implement this method to support the behavior defined in
  // the PostTask and PostTaskTraits docs above.
  virtual void PostTaskImpl(absl::AnyInvocable<void() &&> task,
                            const PostTaskTraits& traits,
                            const Location& location) = 0;

  // Subclasses should implement this method to support the behavior defined in
  // the PostDelayedTask/PostHighPrecisionDelayedTask and PostDelayedTaskTraits
  // docs above.

  virtual void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                                   TimeDelta delay,
                                   const PostDelayedTaskTraits& traits,
                                   const Location& location) = 0;

  // Users of the TaskQueue should call Delete instead of directly deleting
  // this object.

  virtual ~TaskQueueBase() = default;
};

struct TaskQueueDeleter {
  void operator()(TaskQueueBase* task_queue) const { task_queue->Delete(); }
};

}  // namespace webrtc

  • TaskQueueFactory
    創(chuàng)建TaskQueue的工廠類基類,
namespace webrtc {

// The implementation of this interface must be thread-safe.
class TaskQueueFactory {

public:
  // TaskQueue priority levels. On some platforms these will map to thread
  // priorities, on others such as Mac and iOS, GCD queue priorities.
  enum class Priority { NORMAL = 0, HIGH, LOW };

  virtual ~TaskQueueFactory() = default;
  virtual std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const = 0;
};

}  // namespace webrtc

CreateDefaultTaskQueueFactory接口
在default_task_queue_factory.h文件中,申明了CreateDefaultTaskQueueFactory接口

namespace webrtc {

std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials = nullptr);
}  // namespace webrtc

在default_task_queue_factory_win.cc、default_task_queue_factory_stdlib.cc、default_task_queue_factory_stdlib_or_libevent_experiment.cc、default_task_queue_factory_libevent.cc、default_task_queue_factory_gcd.cc都實(shí)現(xiàn)了CreateDefaultTaskQueueFactory,比如default_task_queue_factory_win.cc中的實(shí)現(xiàn)如下,即CreateTaskQueueWinFactory的實(shí)現(xiàn)最終在rtc_base\task_queue_win.cc中。

\\ api\default_task_queue_factory_win.cc

namespace webrtc {

std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials) {
  return CreateTaskQueueWinFactory();
}

}  // namespace webrtc

\\ rtc_base\task_queue_win.cc

std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}

CreateDefaultTaskQueueFactory接口有多種實(shí)現(xiàn),那到底使用那一個(gè)呢?這是在編譯期間決定的,在api/task_queue的BUILD.gn中指定了按照編譯開關(guān)決定使用那個(gè)實(shí)現(xiàn),比如Android使用default_task_queue_factory_stdlib_or_libevent_experiment.cc的實(shí)現(xiàn),windows使用default_task_queue_factory_win.cc的實(shí)現(xiàn)。

rtc_library("default_task_queue_factory") {
  visibility = [ "*" ]
  if (!is_ios && !is_android) {
    poisonous = [ "default_task_queue" ]
  }

  sources = [ "default_task_queue_factory.h" ]
  deps = [
    ":task_queue",
    "../../api:field_trials_view",
    "../../rtc_base/memory:always_valid_pointer",
  ]

  if (rtc_enable_libevent) {
    if (is_android) {
      sources +=
          [ "default_task_queue_factory_stdlib_or_libevent_experiment.cc" ]
      deps += [
        "../../api/transport:field_trial_based_config",
        "../../rtc_base:logging",
        "../../rtc_base:rtc_task_queue_libevent",
        "../../rtc_base:rtc_task_queue_stdlib",
      ]
    } else {
      sources += [ "default_task_queue_factory_libevent.cc" ]
      deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
    }
  } else if (is_mac || is_ios) {
    sources += [ "default_task_queue_factory_gcd.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_gcd" ]
  } else if (is_win && current_os != "winuwp") {
    sources += [ "default_task_queue_factory_win.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_win" ]
  } else {
    sources += [ "default_task_queue_factory_stdlib.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_stdlib" ]
  }
}

至此在api層面交代了TaskQueue的架構(gòu)設(shè)計(jì),即編譯開關(guān)加上不同的工廠創(chuàng)建方法,對(duì)外提供了統(tǒng)一的創(chuàng)建TaskQueue的接口,具體接口的實(shí)現(xiàn)在rtc_base目錄下。

  • rtc_base對(duì)TaskQueueBase和TaskQueueFactory的實(shí)現(xiàn)
  • rtc_base/platform_thread.cc
    跨平臺(tái)封裝了線程的實(shí)現(xiàn),對(duì)于windows調(diào)用CreateThread創(chuàng)建線程,對(duì)于Linux調(diào)用pthread_create創(chuàng)建線程,創(chuàng)建線程的時(shí)候可以設(shè)置線程的優(yōu)先級(jí),優(yōu)先級(jí)定義如下:
enum class ThreadPriority {
  kLow = 1,
  kNormal,
  kHigh,
  kRealtime,
};
  • TaskQueueBase、TaskQueueFactory的子類實(shí)現(xiàn)和對(duì)應(yīng)創(chuàng)建TaskQueueFactory實(shí)例的接口
    不同的實(shí)現(xiàn)細(xì)節(jié)都不同,主要體現(xiàn)在事件循環(huán)的機(jī)制上。
  • task_queue_win.h \ task_queue_win.cc
namespace webrtc {
class TaskQueueWin : public TaskQueueBase {
  ..
};

class TaskQueueWinFactory : public TaskQueueFactory {
public:

  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};

std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}

}  // namespace webrtc
  • task_queue_stdlib.h \ task_queue_stdlib.cc
namespace webrtc {
class TaskQueueStdlib final : public TaskQueueBase {
  ..
};

class TaskQueueStdlibFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};

std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() {
  return std::make_unique<TaskQueueStdlibFactory>();
}

} // namespace webrtc
  • task_queue_libevent.h \ task_queue_libevent.cc
namespace webrtc {
namespace {
class TaskQueueLibevent final : public TaskQueueBase {
  ..

};

class TaskQueueLibeventFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueLibevent(name,
                              TaskQueuePriorityToThreadPriority(priority)));
  }
};
}  // namespace

std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
  return std::make_unique<TaskQueueLibeventFactory>();
}
  • task_queue_gcd.h \ task_queue_gcd.cc
namespace webrtc {
namespace {
class TaskQueueGcd final : public TaskQueueBase {
  ..
};

class TaskQueueGcdFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority)));
  }
};
}  // namespace

std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory() {
  return std::make_unique<TaskQueueGcdFactory>();
}

}  // namespace webrtc

rtc::Thread

實(shí)現(xiàn)見rtc_base\thread.h, rtc_base\thread.cc兩個(gè)文件。

  • rtc::Thread和ThreadManager

除了擁有TaskQueue的PostTask、PostDelayTask方法外還有以下功能特性,這些功能特性方便我們對(duì)所有的線程的健康狀態(tài)進(jìn)行監(jiān)控:

  • 阻塞調(diào)用的接口BlockingCall,當(dāng)然也可以設(shè)置線程禁止阻塞調(diào)用;
  void BlockingCall(
      FunctionView<void()> functor,
      const webrtc::Location& location = webrtc::Location::Current()) {
    BlockingCallImpl(std::move(functor), location);
  }

  template <typename Functor,
            typename ReturnT = std::invoke_result_t<Functor>,
            typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>

  ReturnT BlockingCall(
      Functor&& functor,
      const webrtc::Location& location = webrtc::Location::Current()) {
    ReturnT result;
    BlockingCall([&] { result = std::forward<Functor>(functor)(); }, location);
    return result;
  }
  • 統(tǒng)計(jì)一段時(shí)間內(nèi)當(dāng)前線程有多少個(gè)阻塞調(diào)用,并將結(jié)果輸出到日志中,具體查看RTC_LOG_THREAD_BLOCK_COUNT宏的實(shí)現(xiàn);
  • 防止阻塞調(diào)用過載,具體查看RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN宏的實(shí)現(xiàn);
  • 調(diào)試阻塞調(diào)用的耗時(shí),具體查看RegisterSendAndCheckForCycles函數(shù)的實(shí)現(xiàn),這樣可以檢測(cè)目標(biāo)線程是否死鎖;
  • 巡檢所有線程是否健康,具體看ProcessAllMessageQueuesInternal
  • 統(tǒng)計(jì)任務(wù)執(zhí)行的耗時(shí),大于某個(gè)閾值則會(huì)告警,具體看Dispatch的dispatch_warning_ms_定義
void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
  TRACE_EVENT0("webrtc", "Thread::Dispatch");
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  std::move(task)();
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch.";

    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}
  • 事件循環(huán)單次循環(huán)的耗時(shí)限制,超過限制后進(jìn)入wait狀態(tài),具體看Get函數(shù)的實(shí)現(xiàn)。

參考文檔:

https://zhuanlan.zhihu.com/p/136070941

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容