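For a large project, whether WebRTC or an in-house project at some company, the threading model becomes extremely important once the design is concurrent; to a certain extent it decides whether the project succeeds or fails. WebRTC's threading model is worth studying in depth. Many write-ups online are based on the m85 branch or even older ones, so this post summarizes the model as of the recent m115 branch. The overall design has not changed much, but the code differs in many places.
In WebRTC, a TaskQueue is essentially one thread. There are two kinds of thread wrappers. One is rtc::Thread (itself an implementation of TaskQueueBase), which contains a SocketServer for handling network-related requests; PeerConnection, for example, has network_thread, worker_thread, and signaling_thread. The other is TaskQueue, which also wraps a thread and is widely used; audio and video encoding, decoding, and rendering, for example, each have their own TaskQueue threads. So how are these threads implemented, and how do the two kinds differ? A diagram borrowed from the web summarizes WebRTC's threading model for reference; later posts will gradually dig further into the WebRTC source.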

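This post covers three parts: the TaskQueue interface at the api layer, the concrete TaskQueue implementations, and rtc::Thread.
The TaskQueue interface at the api layer
This layer defines the two most important abstract base classes, TaskQueueBase and TaskQueueFactory.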

- TaskQueueBase
Implements PostTask, PostDelayedTask, PostDelayedHighPrecisionTask, and PostDelayedTaskWithPrecision. The underlying PostxxxxxImpl virtual functions (PostTaskImpl and PostDelayedTaskImpl) are implemented by its subclasses, such as TaskQueueWin under rtc_base.
namespace webrtc {
class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
 public:
  enum class DelayPrecision {
    // This may include up to a 17 ms leeway in addition to OS timer precision.
    // See PostDelayedTask() for more information.
    kLow,
    // This does not have the additional delay that kLow has, but it is still
    // limited by OS timer precision. See PostDelayedHighPrecisionTask() for
    // more information.
    kHigh,
  };
  virtual void Delete() = 0;
  void PostTask(absl::AnyInvocable<void() &&> task,
                const Location& location = Location::Current()) {
    PostTaskImpl(std::move(task), PostTaskTraits{}, location);
  }
  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
                       TimeDelta delay,
                       const Location& location = Location::Current()) {
    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = false},
                        location);
  }
  void PostDelayedHighPrecisionTask(
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {
    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = true},
                        location);
  }
  // As specified by `precision`, calls either PostDelayedTask() or
  // PostDelayedHighPrecisionTask().
  void PostDelayedTaskWithPrecision(
      DelayPrecision precision,
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {
    switch (precision) {
      case DelayPrecision::kLow:
        PostDelayedTask(std::move(task), delay, location);
        break;
      case DelayPrecision::kHigh:
        PostDelayedHighPrecisionTask(std::move(task), delay, location);
        break;
    }
  }
  // Returns the task queue that is running the current thread.
  // Returns nullptr if this thread is not associated with any task queue.
  // May be called on any thread or task queue, including this task queue.
  static TaskQueueBase* Current();
  bool IsCurrent() const { return Current() == this; }

 protected:
  // This is currently only present here to simplify introduction of future
  // planned task queue changes.
  struct PostTaskTraits {};
  struct PostDelayedTaskTraits {
    // If `high_precision` is false, tasks may execute within up to a 17 ms
    // leeway in addition to OS timer precision. Otherwise the task should be
    // limited to OS timer precision. See PostDelayedTask() and
    // PostDelayedHighPrecisionTask() for more information.
    bool high_precision = false;
  };
  class RTC_EXPORT CurrentTaskQueueSetter {
   public:
    explicit CurrentTaskQueueSetter(TaskQueueBase* task_queue);
    CurrentTaskQueueSetter(const CurrentTaskQueueSetter&) = delete;
    CurrentTaskQueueSetter& operator=(const CurrentTaskQueueSetter&) = delete;
    ~CurrentTaskQueueSetter();

   private:
    TaskQueueBase* const previous_;
  };
  // Subclasses should implement this method to support the behavior defined in
  // the PostTask and PostTaskTraits docs above.
  virtual void PostTaskImpl(absl::AnyInvocable<void() &&> task,
                            const PostTaskTraits& traits,
                            const Location& location) = 0;
  // Subclasses should implement this method to support the behavior defined in
  // the PostDelayedTask/PostHighPrecisionDelayedTask and PostDelayedTaskTraits
  // docs above.
  virtual void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                                   TimeDelta delay,
                                   const PostDelayedTaskTraits& traits,
                                   const Location& location) = 0;
  // Users of the TaskQueue should call Delete instead of directly deleting
  // this object.
  virtual ~TaskQueueBase() = default;
};
struct TaskQueueDeleter {
  void operator()(TaskQueueBase* task_queue) const { task_queue->Delete(); }
};
}  // namespace webrtc
- TaskQueueFactory
The base class of the factories that create TaskQueues:
namespace webrtc {
// The implementation of this interface must be thread-safe.
class TaskQueueFactory {
 public:
  // TaskQueue priority levels. On some platforms these will map to thread
  // priorities, on others such as Mac and iOS, GCD queue priorities.
  enum class Priority { NORMAL = 0, HIGH, LOW };
  virtual ~TaskQueueFactory() = default;
  virtual std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const = 0;
};
}  // namespace webrtc
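To make the shape of this API layer easier to see, here is a minimal, self-contained sketch of the same ideas: a non-virtual PostTask that forwards to a virtual Impl hook, a thread-local Current() maintained by an RAII setter, and a factory that hands out queues through a unique_ptr whose deleter calls Delete(). Every name in it (MiniTaskQueueBase, InlineQueue, MiniFactory, and so on) is hypothetical and exists only for illustration. The toy queue runs tasks inline instead of on its own thread, and std::function stands in for absl::AnyInvocable, so treat it as a sketch of the pattern rather than WebRTC's code.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical miniature of the api layer; none of these names are WebRTC's.
class MiniTaskQueueBase {
 public:
  // The public entry point is non-virtual and forwards to the Impl hook.
  void PostTask(std::function<void()> task) { PostTaskImpl(std::move(task)); }
  // Callers release the queue through Delete(), never through `delete`.
  virtual void Delete() = 0;
  // Queue whose task is running on the calling thread, or nullptr.
  static MiniTaskQueueBase* Current() { return current_; }
  bool IsCurrent() const { return Current() == this; }

 protected:
  // RAII helper: marks `queue` as current for the lifetime of the setter.
  class CurrentSetter {
   public:
    explicit CurrentSetter(MiniTaskQueueBase* queue) : previous_(current_) {
      current_ = queue;
    }
    ~CurrentSetter() { current_ = previous_; }

   private:
    MiniTaskQueueBase* const previous_;
  };
  virtual void PostTaskImpl(std::function<void()> task) = 0;
  virtual ~MiniTaskQueueBase() = default;

 private:
  static thread_local MiniTaskQueueBase* current_;
};
thread_local MiniTaskQueueBase* MiniTaskQueueBase::current_ = nullptr;

// Custom deleter so that unique_ptr calls Delete() instead of `delete`.
struct MiniDeleter {
  void operator()(MiniTaskQueueBase* queue) const { queue->Delete(); }
};
using MiniQueuePtr = std::unique_ptr<MiniTaskQueueBase, MiniDeleter>;

// Toy implementation: runs each task inline on the caller's thread.
class InlineQueue final : public MiniTaskQueueBase {
 public:
  explicit InlineQueue(std::string name) : name_(std::move(name)) {}
  void Delete() override { delete this; }
  const std::string& name() const { return name_; }

 private:
  void PostTaskImpl(std::function<void()> task) override {
    CurrentSetter setter(this);  // Current() == this while the task runs
    task();
  }
  std::string name_;
};

// Abstract factory plus one concrete factory, mirroring the two-level design.
class MiniFactory {
 public:
  virtual ~MiniFactory() = default;
  virtual MiniQueuePtr CreateTaskQueue(const std::string& name) const = 0;
};
class InlineFactory final : public MiniFactory {
 public:
  MiniQueuePtr CreateTaskQueue(const std::string& name) const override {
    return MiniQueuePtr(new InlineQueue(name));
  }
};

// Demo: returns true when Current() is set only while a task is running.
bool DemoCurrentTracking() {
  std::unique_ptr<MiniFactory> factory = std::make_unique<InlineFactory>();
  MiniQueuePtr queue = factory->CreateTaskQueue("demo");
  MiniTaskQueueBase* raw = queue.get();
  bool inside = false;
  queue->PostTask([&inside, raw] { inside = raw->IsCurrent(); });
  return inside && MiniTaskQueueBase::Current() == nullptr;
}
```

Keeping the destructor of the base class protected and routing destruction through Delete() is what lets each implementation decide how to shut itself down before its memory is released.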
The CreateDefaultTaskQueueFactory interface
The file default_task_queue_factory.h declares the CreateDefaultTaskQueueFactory interface:
namespace webrtc {
std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials = nullptr);
}  // namespace webrtc
CreateDefaultTaskQueueFactory is implemented in each of default_task_queue_factory_win.cc, default_task_queue_factory_stdlib.cc, default_task_queue_factory_stdlib_or_libevent_experiment.cc, default_task_queue_factory_libevent.cc, and default_task_queue_factory_gcd.cc. The implementation in default_task_queue_factory_win.cc, for example, is shown below; it forwards to CreateTaskQueueWinFactory, whose implementation ultimately lives in rtc_base\task_queue_win.cc.
// api\task_queue\default_task_queue_factory_win.cc
namespace webrtc {
std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials) {
  return CreateTaskQueueWinFactory();
}
}  // namespace webrtc
// rtc_base\task_queue_win.cc
std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}
CreateDefaultTaskQueueFactory has several implementations, so which one is actually used? That is decided at build time: the BUILD.gn in api/task_queue selects the implementation according to build flags. For example, Android (with rtc_enable_libevent set) uses default_task_queue_factory_stdlib_or_libevent_experiment.cc, and Windows uses default_task_queue_factory_win.cc.
rtc_library("default_task_queue_factory") {
  visibility = [ "*" ]
  if (!is_ios && !is_android) {
    poisonous = [ "default_task_queue" ]
  }
  sources = [ "default_task_queue_factory.h" ]
  deps = [
    ":task_queue",
    "../../api:field_trials_view",
    "../../rtc_base/memory:always_valid_pointer",
  ]
  if (rtc_enable_libevent) {
    if (is_android) {
      sources +=
          [ "default_task_queue_factory_stdlib_or_libevent_experiment.cc" ]
      deps += [
        "../../api/transport:field_trial_based_config",
        "../../rtc_base:logging",
        "../../rtc_base:rtc_task_queue_libevent",
        "../../rtc_base:rtc_task_queue_stdlib",
      ]
    } else {
      sources += [ "default_task_queue_factory_libevent.cc" ]
      deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
    }
  } else if (is_mac || is_ios) {
    sources += [ "default_task_queue_factory_gcd.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_gcd" ]
  } else if (is_win && current_os != "winuwp") {
    sources += [ "default_task_queue_factory_win.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_win" ]
  } else {
    sources += [ "default_task_queue_factory_stdlib.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_stdlib" ]
  }
}
That covers the architecture of TaskQueue at the api layer: build flags combined with per-platform factory functions expose one uniform interface for creating TaskQueues, while the concrete implementations live in the rtc_base directory.
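The "one declaration, one definition chosen at build time" idea can be illustrated with a small C++ analogy. This is only a sketch under loud assumptions: WebRTC makes the choice in GN by compiling exactly one .cc file, whereas the code below uses the preprocessor, which is not how WebRTC does it. DefaultBackendName and the MINI_ENABLE_LIBEVENT macro are hypothetical names invented for this example, and the Android sub-case of the libevent branch is left out for brevity.

```cpp
#include <cassert>
#include <string>

// One declaration, visible to every caller. In this sketch it plays the role
// of default_task_queue_factory.h.
std::string DefaultBackendName();

// Exactly one definition is compiled in. The branch order follows the
// BUILD.gn quoted in this post: libevent first, then mac/ios, then win,
// and stdlib as the fallback.
#if defined(MINI_ENABLE_LIBEVENT)  // hypothetical macro for this sketch
std::string DefaultBackendName() { return "libevent"; }
#elif defined(__APPLE__)
std::string DefaultBackendName() { return "gcd"; }
#elif defined(_WIN32)
std::string DefaultBackendName() { return "win"; }
#else
std::string DefaultBackendName() { return "stdlib"; }
#endif
```

Callers only ever see the declaration, so switching backends never touches calling code; that is the property the GN-based selection gives WebRTC as well.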
- Implementations of TaskQueueBase and TaskQueueFactory in rtc_base
- rtc_base/platform_thread.cc
A cross-platform wrapper around thread creation: on Windows it calls CreateThread, and on Linux it calls pthread_create. A priority can be set when the thread is created; the priorities are defined as follows:
enum class ThreadPriority {
  kLow = 1,
  kNormal,
  kHigh,
  kRealtime,
};
- Subclasses of TaskQueueBase and TaskQueueFactory, and the functions that create the corresponding TaskQueueFactory instances
The implementations differ in their details, mainly in the event-loop mechanism.
- task_queue_win.h \ task_queue_win.cc
namespace webrtc {
class TaskQueueWin : public TaskQueueBase {
  ..
};
class TaskQueueWinFactory : public TaskQueueFactory {
 public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};
std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}
}  // namespace webrtc
- task_queue_stdlib.h \ task_queue_stdlib.cc
namespace webrtc {
class TaskQueueStdlib final : public TaskQueueBase {
  ..
};
class TaskQueueStdlibFactory final : public TaskQueueFactory {
 public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};
std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() {
  return std::make_unique<TaskQueueStdlibFactory>();
}
}  // namespace webrtc
- task_queue_libevent.h \ task_queue_libevent.cc
namespace webrtc {
namespace {
class TaskQueueLibevent final : public TaskQueueBase {
  ..
};
class TaskQueueLibeventFactory final : public TaskQueueFactory {
 public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueLibevent(name,
                              TaskQueuePriorityToThreadPriority(priority)));
  }
};
}  // namespace
std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
  return std::make_unique<TaskQueueLibeventFactory>();
}
}  // namespace webrtc
- task_queue_gcd.h \ task_queue_gcd.cc
namespace webrtc {
namespace {
class TaskQueueGcd final : public TaskQueueBase {
  ..
};
class TaskQueueGcdFactory final : public TaskQueueFactory {
 public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority)));
  }
};
}  // namespace
std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory() {
  return std::make_unique<TaskQueueGcdFactory>();
}
}  // namespace webrtc
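Since the class bodies above are elided, it may help to see what a standard-library flavored event loop can look like. The following is a minimal sketch, assuming one worker thread that drains a FIFO guarded by a mutex and a condition variable. It is not the actual TaskQueueStdlib code: MiniStdQueue and DemoOrdering are hypothetical names, delayed tasks are left out, and std::function stands in for absl::AnyInvocable.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical mini queue: one worker thread drains a FIFO of tasks.
class MiniStdQueue {
 public:
  MiniStdQueue() : worker_([this] { Run(); }) {}

  // Lets the worker finish every task already posted, then joins it.
  ~MiniStdQueue() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;
    }
    wakeup_.notify_one();
    worker_.join();
  }

  // May be called from any thread; tasks run in posting order.
  void PostTask(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(task));
    }
    wakeup_.notify_one();
  }

 private:
  // The event loop: sleep until there is work or a stop request.
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        wakeup_.wait(lock, [this] { return stopping_ || !pending_.empty(); });
        if (pending_.empty()) return;  // stopping and nothing left to run
        task = std::move(pending_.front());
        pending_.pop_front();
      }
      task();  // run outside the lock so a task may post further tasks
    }
  }

  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> pending_;
  bool stopping_ = false;
  std::thread worker_;  // declared last: starts after the state above exists
};

// Demo: posts five tasks and returns the order in which they ran.
std::vector<int> DemoOrdering() {
  std::vector<int> seen;
  {
    MiniStdQueue queue;
    for (int i = 0; i < 5; ++i) {
      queue.PostTask([&seen, i] { seen.push_back(i); });
    }
  }  // the destructor joins the worker, so `seen` is safe to read afterwards
  return seen;
}
```

The other backends keep the same PostTask contract and swap out only the waiting mechanism inside the loop, which is where the per-platform differences mentioned above come from.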
rtc::Thread
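The implementation is in two files, rtc_base\thread.h and rtc_base\thread.cc.
- rtc::Thread and ThreadManager
In addition to TaskQueue's PostTask and PostDelayedTask methods, rtc::Thread offers the following features, which make it easier to monitor the health of all threads:
- A blocking-call interface, BlockingCall; a thread can also be configured to disallow blocking calls: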
void BlockingCall(
    FunctionView<void()> functor,
    const webrtc::Location& location = webrtc::Location::Current()) {
  BlockingCallImpl(std::move(functor), location);
}
template <typename Functor,
          typename ReturnT = std::invoke_result_t<Functor>,
          typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>
ReturnT BlockingCall(
    Functor&& functor,
    const webrtc::Location& location = webrtc::Location::Current()) {
  ReturnT result;
  BlockingCall([&] { result = std::forward<Functor>(functor)(); }, location);
  return result;
}
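- Counting how many blocking calls the current thread makes over a period of time and writing the result to the log; see the implementation of the RTC_LOG_THREAD_BLOCK_COUNT macro.
- Guarding against too many blocking calls; see the implementation of the RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN macro.
- Debugging blocking calls; see the implementation of RegisterSendAndCheckForCycles, which makes it possible to detect whether a blocking call to the target thread would deadlock.
- Checking whether all threads are healthy; see ProcessAllMessageQueuesInternal.
- Measuring how long each task takes to run and logging a warning when it exceeds a threshold; see dispatch_warning_ms_ in Dispatch: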
void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
  TRACE_EVENT0("webrtc", "Thread::Dispatch");
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  std::move(task)();
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch.";
    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}
- A time limit on a single iteration of the event loop; once the limit is exceeded, the loop enters a wait state. See the implementation of Get.
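Two of the features listed above, the blocking call and the dispatch-time warning, can be tried out in isolation. The sketch below is a simplified stand-in rather than rtc::Thread itself: MiniThread, its warnings() counter, and DemoBlockingCall are hypothetical, the blocking call is built on std::packaged_task and std::future instead of WebRTC's own mechanism, and the warning is counted instead of being logged. The idea of raising the limit after each warning follows the Dispatch code quoted above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical mini thread; not the rtc::Thread implementation.
class MiniThread {
 public:
  explicit MiniThread(int warning_ms)
      : warning_ms_(warning_ms), worker_([this] { Run(); }) {}
  ~MiniThread() {
    Post(nullptr);  // an empty task tells the loop to exit
    worker_.join();
  }
  void Post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(task));
    }
    wakeup_.notify_one();
  }
  // Runs `functor` on the worker and blocks the caller until it is done.
  // Calling this from the worker itself would deadlock in this sketch.
  template <typename Functor>
  auto BlockingCall(Functor functor) -> decltype(functor()) {
    using Result = decltype(functor());
    auto job =
        std::make_shared<std::packaged_task<Result()>>(std::move(functor));
    std::future<Result> done = job->get_future();
    Post([job] { (*job)(); });
    return done.get();
  }
  // Reads the counter on the worker thread, after all earlier tasks.
  int warnings() { return BlockingCall([this] { return warnings_; }); }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        wakeup_.wait(lock, [this] { return !pending_.empty(); });
        task = std::move(pending_.front());
        pending_.pop_front();
      }
      if (!task) return;
      Dispatch(std::move(task));
    }
  }
  // Times one task and raises the limit after a warning to avoid log spew.
  void Dispatch(std::function<void()> task) {
    const auto start = std::chrono::steady_clock::now();
    task();
    const long long diff =
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start).count();
    if (diff >= warning_ms_) {
      ++warnings_;
      warning_ms_ = diff + 1;
    }
  }
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> pending_;
  long long warning_ms_;
  int warnings_ = 0;    // only touched on the worker thread
  std::thread worker_;  // declared last so the state above exists first
};

// Demo: true when the call ran on another thread and a slow task was flagged.
bool DemoBlockingCall() {
  MiniThread thread(/*warning_ms=*/10);
  const std::thread::id caller = std::this_thread::get_id();
  const bool ran_elsewhere = thread.BlockingCall(
      [caller] { return std::this_thread::get_id() != caller; });
  // Sleeping past the 10 ms limit should produce at least one warning.
  thread.BlockingCall(
      [] { std::this_thread::sleep_for(std::chrono::milliseconds(30)); });
  return ran_elsewhere && thread.warnings() >= 1;
}
```

Because a blocking call parks the calling thread until the target thread gets around to the task, it is easy to see why the real code counts such calls and checks them for cycles.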
References: