Apple平臺(tái)內(nèi)核級(jí)workqueue機(jī)制:完整技術(shù)解析
1. 架構(gòu)概覽:內(nèi)核級(jí)線程池的設(shè)計(jì)理念
?? 核心澄清:Apple的workqueue本質(zhì)上就是一個(gè)高度優(yōu)化的內(nèi)核級(jí)線程池系統(tǒng)
Apple的workqueue機(jī)制將傳統(tǒng)的用戶空間線程池管理完全下沉到內(nèi)核層,形成了一個(gè)高度集成的線程管理子系統(tǒng)。關(guān)鍵在于:這是一個(gè)線程池,不是每次都創(chuàng)建新線程。設(shè)計(jì)的核心思想是讓內(nèi)核調(diào)度器直接參與線程池的創(chuàng)建、調(diào)度和生命周期管理。
workqueue核心架構(gòu)
flowchart TD
A[App發(fā)起任務(wù)] --> B[libdispatch]
B --> C[_dispatch_root_queue_poke]
C --> D[_pthread_workqueue_addthreads]
D --> E[系統(tǒng)調(diào)用: __workq_kernreturn]
E --> F[內(nèi)核: workq_reqthreads]
F --> G{檢查線程池狀態(tài)}
G -->|有空閑線程| H[workq_pop_idle_thread<br/>從wq_thidlelist獲取]
G -->|無空閑線程| I[workq_add_new_idle_thread<br/>創(chuàng)建新線程加入池]
H --> J[workq_thread_wakeup<br/>喚醒池中線程]
I --> K[thread_create_workq_waiting<br/>設(shè)置continuation]
J --> L[workq_unpark_continue<br/>線程開始工作]
K --> M[加入wq_thnewlist<br/>等待分配]
M --> L
L --> N[workq_setup_and_run<br/>跳轉(zhuǎn)用戶空間]
N --> O[_dispatch_worker_thread2<br/>執(zhí)行用戶任務(wù)]
O --> P[任務(wù)完成]
P --> Q[返回內(nèi)核空間]
Q --> R[workq_park_and_unlock<br/>推回線程池]
R --> S[workq_push_idle_thread<br/>進(jìn)入wq_thidlelist]
S --> T[thread_block休眠<br/>等待下次復(fù)用]
T -.->|新任務(wù)到達(dá)| G
style H fill:#e1f5fe
style I fill:#fff3e0
style S fill:#e8f5e8
style T fill:#f3e5f5
workqueue的核心結(jié)構(gòu)
// 內(nèi)核工作隊(duì)列核心結(jié)構(gòu) (bsd/pthread/pthread_workqueue.c)
struct workqueue {
os_refcnt_t wq_refcnt; // 引用計(jì)數(shù)
lck_spin_t wq_lock; // 自旋鎖保護(hù)
uint32_t wq_constrained_threads_scheduled; // 受限線程數(shù)
uint32_t wq_nthreads; // 總線程數(shù)
uint32_t wq_thidlecount; // 空閑線程數(shù) ← 關(guān)鍵:線程池計(jì)數(shù)
uint32_t wq_timer_interval; // 定時(shí)器間隔
// 不同優(yōu)先級(jí)的線程計(jì)數(shù)
uint16_t wq_thscheduled_count[WORKQ_NUM_QOS_BUCKETS];
// 線程池的三層隊(duì)列管理 ← 這就是線程池!
TAILQ_HEAD(, uthread) wq_thrunlist; // 運(yùn)行中線程
TAILQ_HEAD(, uthread) wq_thnewlist; // 新創(chuàng)建線程
TAILQ_HEAD(, uthread) wq_thidlelist; // 空閑線程池 ← 核心池
// 請(qǐng)求隊(duì)列
struct priority_queue wq_overcommit_queue; // 過度提交隊(duì)列
struct priority_queue wq_constrained_queue; // 受限隊(duì)列
};
2. 系統(tǒng)調(diào)用路徑與線程池復(fù)用機(jī)制
2.1 完整調(diào)用鏈路
sequenceDiagram
participant App as 應(yīng)用程序
participant GCD as libdispatch
participant Kern as XNU內(nèi)核
participant Pool as 線程池
participant Thread as 工作線程
App->>GCD: dispatch_async(queue, block)
GCD->>GCD: _dispatch_root_queue_push()
Note over GCD: 檢查是否需要線程
GCD->>Kern: _pthread_workqueue_addthreads(remaining, priority)
Kern->>Kern: __workq_kernreturn(WQOPS_QUEUE_REQTHREADS)
Kern->>Pool: workq_reqthreads(p, numthreads, priority)
alt 優(yōu)先路徑:池中有空閑線程
Pool->>Pool: workq_pop_idle_thread()
Note right of Pool: 零創(chuàng)建開銷,直接復(fù)用
Pool->>Thread: workq_thread_wakeup()
Thread->>Thread: workq_unpark_continue()
else 池中無空閑線程
Pool->>Pool: workq_add_new_idle_thread()
Note right of Pool: 創(chuàng)建時(shí)直接設(shè)置continuation
Pool->>Thread: thread_create_workq_waiting(workq_unpark_continue)
end
Thread->>GCD: workq_setup_and_run()
GCD->>Thread: _dispatch_worker_thread2(priority)
Thread->>App: 執(zhí)行用戶block
Note over Thread: 工作完成,回到池中
Thread->>Pool: workq_park_and_unlock()
Pool->>Pool: workq_push_idle_thread()
Thread->>Thread: thread_block(workq_unpark_continue)
Note right of Thread: 線程在池中休眠等待復(fù)用
2.2 libdispatch → 內(nèi)核的調(diào)用路徑
// libdispatch發(fā)起線程請(qǐng)求的完整路徑
_dispatch_root_queue_poke_slow()
↓
_pthread_workqueue_addthreads(remaining, priority) // libpthread
↓
__workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, numthreads, priority) // 系統(tǒng)調(diào)用
↓
workq_kernreturn() // XNU內(nèi)核入口 (bsd/pthread/pthread_workqueue.c)
↓
workq_reqthreads(p, numthreads, priority) // 內(nèi)核workqueue子系統(tǒng)
2.3 關(guān)鍵系統(tǒng)調(diào)用操作碼
// bsd/pthread/workqueue_syscalls.h - 內(nèi)核workqueue操作命令
#define WQOPS_THREAD_RETURN 0x004 /* 線程回到內(nèi)核池 */
#define WQOPS_QUEUE_REQTHREADS 0x020 /* 請(qǐng)求指定數(shù)量線程 */
#define WQOPS_THREAD_KEVENT_RETURN 0x040 /* kevent線程回池 */
#define WQOPS_THREAD_WORKLOOP_RETURN 0x100 /* workloop線程回池 */
#define WQOPS_SETUP_DISPATCH 0x400 /* 初始化workqueue */
2.4 線程池復(fù)用核心:優(yōu)先復(fù)用,按需創(chuàng)建
workq_reqthreads的實(shí)際處理邏輯(簡(jiǎn)化版):
// 真實(shí)的線程請(qǐng)求處理 (bsd/pthread/pthread_workqueue.c)
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
struct workqueue *wq = proc_get_wqptr(p);
thread_qos_t qos = _pthread_priority_thread_qos(pp);
uint32_t unpaced = reqcount - 1;
workq_lock_spin(wq);
// ?? 關(guān)鍵:優(yōu)先從現(xiàn)有空閑線程池分配
while (unpaced > 0 && wq->wq_thidlecount) {
struct uthread *uth;
bool needs_wakeup;
// 從空閑線程池獲取線程
uth = workq_pop_idle_thread(wq, flags, &needs_wakeup);
// 更新活躍線程計(jì)數(shù)和優(yōu)先級(jí)
_wq_thactive_inc(wq, qos);
wq->wq_thscheduled_count[_wq_bucket(qos)]++;
workq_thread_reset_pri(wq, uth, req, true);
// 設(shè)置線程的upcall參數(shù)
uth->uu_save.uus_workq_park_data.thread_request = req;
if (needs_wakeup) {
workq_thread_wakeup(uth); // 喚醒池中線程
}
unpaced--;
reqcount--;
}
// 只有在池中無足夠空閑線程時(shí)才創(chuàng)建新線程
while (unpaced && wq->wq_nthreads < wq_max_threads) {
if (workq_add_new_idle_thread(p, wq, workq_unpark_continue,
false, NULL) != KERN_SUCCESS) {
break;
}
unpaced--;
}
// 剩余未滿足的請(qǐng)求入隊(duì)等待
if (reqcount > 0) {
req->tr_count = (uint16_t)reqcount;
workq_threadreq_enqueue(wq, req);
workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
}
workq_unlock(wq);
return 0;
}
2.5 線程池管理:從池中獲取線程
// 從線程池獲取空閑線程 (bsd/pthread/pthread_workqueue.c)
static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags, bool *needs_wakeup)
{
struct uthread *uth;
// 優(yōu)先從已有的空閑線程池獲取
if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
} else {
// 次選:從新創(chuàng)建但未使用的線程獲取
uth = TAILQ_FIRST(&wq->wq_thnewlist);
TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
}
// 移入運(yùn)行隊(duì)列
TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
wq->wq_threads_scheduled++;
wq->wq_thidlecount--; // 減少空閑計(jì)數(shù)
return uth; // 返回復(fù)用的線程
}
2.6 新線程創(chuàng)建:僅在必要時(shí)
// 創(chuàng)建新的工作線程加入池中 (bsd/pthread/pthread_workqueue.c)
static kern_return_t
workq_add_new_idle_thread(proc_t p, struct workqueue *wq,
thread_continue_t continuation, bool bound, thread_t *new_thread)
{
mach_vm_offset_t th_stackaddr;
kern_return_t kret;
thread_t th;
wq->wq_nthreads++;
workq_unlock(wq);
// 1. 創(chuàng)建線程棧
kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
if (kret != KERN_SUCCESS) goto out;
// 2. 創(chuàng)建內(nèi)核線程,直接設(shè)置continuation
kret = thread_create_workq_waiting(proc_task(p),
continuation, // 關(guān)鍵:直接設(shè)置workq_unpark_continue
&th, bound);
if (kret != KERN_SUCCESS) {
pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
goto out;
}
// 3. 初始化uthread結(jié)構(gòu)并加入線程池
struct uthread *uth = get_bsdthread_info(th);
uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
uth->uu_workq_flags = UT_WORKQ_NEW;
// 4. 重要:新線程直接加入線程池,等待復(fù)用
wq->wq_thidlecount++;
TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
return KERN_SUCCESS;
}
2.7 線程完成工作后回到線程池
// libpthread用戶空間線程完成工作后 (src/pthread.c)
void _pthread_wqthread(pthread_t self, mach_port_t kport, void *stackaddr,
void *keventlist, int flags, int nkevents) {
// 調(diào)用libdispatch工作函數(shù)
if (flags & WQ_FLAG_THREAD_WORKLOOP) {
(*__libdispatch_workloopfunction)(kqidptr, &self->arg, &self->wq_nevents);
__workq_kernreturn(WQOPS_THREAD_WORKLOOP_RETURN, self->arg, self->wq_nevents, 0);
} else if (flags & WQ_FLAG_THREAD_KEVENT) {
(*__libdispatch_keventfunction)(&self->arg, &self->wq_nevents);
__workq_kernreturn(WQOPS_THREAD_KEVENT_RETURN, self->arg, self->wq_nevents, 0);
} else {
// 普通工作線程路徑
(*__libdispatch_workerfunction)(workq_function2_arg);
// 關(guān)鍵:工作完成后直接系統(tǒng)調(diào)用回到內(nèi)核池
__workq_kernreturn(WQOPS_THREAD_RETURN, NULL, 0, 0);
}
}
3. 核心機(jī)制:park/unpark線程池休眠喚醒
3.1 線程park:回到線程池休眠
// 線程完成工作后回到池中休眠 (bsd/pthread/pthread_workqueue.c)
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
uint32_t setup_flags)
{
// 1. 關(guān)鍵:推入空閑線程池而非銷毀
workq_push_idle_thread(p, wq, uth, setup_flags);
// 2. 重置CPU占用統(tǒng)計(jì)
workq_thread_reset_cpupercent(NULL, uth);
// 3. 清理線程狀態(tài)但保持內(nèi)核結(jié)構(gòu)
if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
workq_unlock(wq);
// 清理?xiàng)?nèi)存(如果需要)、voucher等
workq_lock_spin(wq);
}
// 4. 檢查是否被重新調(diào)度(在清理過程中被喚醒)
if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
__builtin_unreachable();
}
// 5. 設(shè)置等待事件并休眠 - 線程保持在內(nèi)核中等待復(fù)用
assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
workq_unlock(wq);
// 6. 進(jìn)入休眠,設(shè)置喚醒continuation
thread_block(workq_unpark_continue);
__builtin_unreachable();
}
3.2 線程unpark:從線程池智能喚醒
// 線程池中線程被喚醒處理 (bsd/pthread/pthread_workqueue.c)
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
thread_t th = current_thread();
struct uthread *uth = get_bsdthread_info(th);
proc_t p = current_proc();
struct workqueue *wq = proc_get_wqptr_fast(p);
workq_lock_spin(wq);
// 1. 創(chuàng)建者線程的負(fù)載控制
if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
// 如果當(dāng)前線程數(shù)已足夠處理負(fù)載,讓創(chuàng)建者線程讓步
workq_unlock(wq);
thread_yield_with_continuation(workq_unpark_continue, NULL);
__builtin_unreachable();
}
// 2. 檢查線程運(yùn)行狀態(tài)
if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
__builtin_unreachable();
}
// 3. 處理線程終止
if (uth->uu_workq_flags & UT_WORKQ_DYING) {
workq_unpark_for_death_and_unlock(p, wq, uth,
WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
__builtin_unreachable();
}
// 4. 重新進(jìn)入休眠等待
assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
workq_unlock(wq);
thread_block(workq_unpark_continue);
__builtin_unreachable();
}
4. libdispatch集成與智能調(diào)度
4.1 workqueue初始化
// libdispatch初始化workqueue (src/queue.c)
static void _dispatch_root_queues_init_once(void *context)
{
// 獲取內(nèi)核支持的workqueue特性
int r = _pthread_workqueue_supported();
if (r < 0) {
DISPATCH_INTERNAL_CRASH(-r, "Could not initialize workqueue");
}
int wq_supported = r;
// 注冊(cè)libdispatch的工作函數(shù)到內(nèi)核
if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
// 完整模式:支持workloop
r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
_dispatch_kevent_worker_thread,
_dispatch_workloop_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
// 支持kevent
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
} else {
// 基礎(chǔ)模式:僅普通工作線程
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
}
}
4.2 智能線程池大小控制
// 線程池的智能清理策略 (bsd/pthread/pthread_workqueue.c)
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
struct uthread *uth;
if (wq->wq_thidlecount <= 1) {
return; // 保持最少一個(gè)空閑線程
}
if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
return;
}
uint64_t now = mach_absolute_time();
uint64_t delay = workq_kill_delay_for_idle_thread(wq);
if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
// 空閑時(shí)間過長(zhǎng),回收線程
wq->wq_thdying_count++;
uth->uu_workq_flags |= UT_WORKQ_DYING;
if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
workq_thread_wakeup(uth);
}
return;
}
// 設(shè)置定時(shí)器,稍后再檢查
workq_death_call_schedule(wq,
uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}
4.3 Apple內(nèi)核workqueue vs pthread pool對(duì)比
Apple內(nèi)核workqueue路徑:
// 內(nèi)核線程池線程直接運(yùn)行,無復(fù)雜初始化 (src/queue.c)
static void _dispatch_worker_thread2(pthread_priority_t pp) {
// 內(nèi)核已設(shè)置優(yōu)先級(jí),直接獲取對(duì)應(yīng)隊(duì)列
dispatch_queue_global_t dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
// 簡(jiǎn)單遞減pending計(jì)數(shù)
int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
// 直接開始工作,無需休眠/喚醒循環(huán)
_dispatch_root_queue_drain(dq, dq->dq_priority,
DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
// 工作完成后直接退出,由內(nèi)核回收到線程池
// 函數(shù)結(jié)束,線程返回內(nèi)核休眠狀態(tài)等待下次復(fù)用
}
傳統(tǒng)pthread pool路徑:
// 復(fù)雜的pthread生命周期管理 (src/queue.c)
static void *_dispatch_worker_thread(void *context) {
dispatch_queue_global_t dq = context;
// 關(guān)鍵:休眠/喚醒循環(huán),使用信號(hào)量同步
uint64_t timeout = 5 * NSEC_PER_SEC; // 5秒超時(shí)
do {
// 執(zhí)行工作
_dispatch_root_queue_drain(dq, pri, DISPATCH_INVOKE_REDIRECTING_DRAIN);
// 等待新工作或超時(shí)(線程休眠)
} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
dispatch_time(0, timeout)) == 0);
// 線程退出時(shí)的復(fù)雜清理
_dispatch_release(dq); // 釋放線程創(chuàng)建時(shí)的引用
return NULL; // pthread正常退出,線程被銷毀
}
核心區(qū)別:
- 內(nèi)核workqueue:線程復(fù)用,在內(nèi)核中休眠,無超時(shí)退出
- pthread pool:線程超時(shí)銷毀,用戶空間休眠/喚醒,需要信號(hào)量同步
5. 系統(tǒng)調(diào)用映射表
| 操作類型 | libdispatch調(diào)用 | libpthread系統(tǒng)調(diào)用 | 內(nèi)核處理函數(shù) |
|---|---|---|---|
| 請(qǐng)求線程 | _dispatch_root_queue_poke() |
_pthread_workqueue_addthreads() |
workq_reqthreads() |
| 線程回池 | 工作函數(shù)返回 | __workq_kernreturn(WQOPS_THREAD_RETURN) |
workq_park_and_unlock() |
| kevent回池 | kevent處理完成 | __workq_kernreturn(WQOPS_THREAD_KEVENT_RETURN) |
workq_handle_stack_events() |
| 初始化 | _dispatch_root_queues_init() |
pthread_workqueue_setup() |
workq_setup_dispatch() |
6. 任務(wù)提交、執(zhí)行與線程池管理的協(xié)同機(jī)制
6.1 任務(wù)生命周期與線程池狀態(tài)變化
Apple平臺(tái)的任務(wù)執(zhí)行本質(zhì)上是一個(gè)任務(wù)隊(duì)列驅(qū)動(dòng)的線程池調(diào)度系統(tǒng)。每個(gè)任務(wù)的提交和執(zhí)行都會(huì)引發(fā)線程池狀態(tài)的精確變化:
stateDiagram-v2
[*] --> TaskSubmitted: dispatch_async(queue, block)
state "隊(duì)列狀態(tài)檢查" as QueueCheck {
TaskSubmitted --> QueueEmpty: 隊(duì)列為空
TaskSubmitted --> QueueBusy: 隊(duì)列有任務(wù)
}
QueueEmpty --> RequestThread: _dispatch_root_queue_poke()
QueueBusy --> QueueTask: 直接入隊(duì)等待
state "線程池分配" as ThreadPool {
RequestThread --> IdleThread: wq_thidlecount > 0
RequestThread --> CreateThread: wq_thidlecount == 0
IdleThread --> WakeupThread: workq_pop_idle_thread()
CreateThread --> NewThread: workq_add_new_idle_thread()
}
WakeupThread --> ExecuteTask: workq_unpark_continue()
NewThread --> ExecuteTask: thread_create_workq_waiting()
QueueTask --> ExecuteTask: 線程可用時(shí)調(diào)度
state "任務(wù)執(zhí)行" as TaskExecution {
ExecuteTask --> UserSpace: workq_setup_and_run()
UserSpace --> RunBlock: _dispatch_worker_thread2()
RunBlock --> BlockComplete: 用戶代碼執(zhí)行
}
BlockComplete --> CheckMoreTasks: 檢查隊(duì)列是否還有任務(wù)
CheckMoreTasks --> RunBlock: 隊(duì)列非空,繼續(xù)執(zhí)行
CheckMoreTasks --> ReturnToPool: 隊(duì)列為空,線程歸還
ReturnToPool --> ThreadIdle: workq_park_and_unlock()
ThreadIdle --> [*]: thread_block(休眠等待)
6.2 GCD隊(duì)列層次與線程池映射關(guān)系
每種GCD隊(duì)列類型都對(duì)應(yīng)特定的線程池管理策略:
// GCD全局隊(duì)列到workqueue線程池的映射 (src/queue.c)
static const struct dispatch_queue_global_s _dispatch_root_queues[] = {
// 高優(yōu)先級(jí)隊(duì)列 -> 內(nèi)核QoS_CLASS_USER_INTERACTIVE線程池
[DISPATCH_ROOT_QUEUE_IDX_HIGH_QOS] = {
.dq_priority = DISPATCH_PRIORITY_HIGH | DISPATCH_PRIORITY_REQUESTED,
// 映射到內(nèi)核wq_thscheduled_count[QOS_CLASS_USER_INTERACTIVE]
},
// 默認(rèn)優(yōu)先級(jí)隊(duì)列 -> 內(nèi)核QoS_CLASS_DEFAULT線程池
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
.dq_priority = DISPATCH_PRIORITY_DEFAULT | DISPATCH_PRIORITY_REQUESTED,
// 映射到內(nèi)核wq_thscheduled_count[QOS_CLASS_DEFAULT]
},
// 后臺(tái)隊(duì)列 -> 內(nèi)核QoS_CLASS_BACKGROUND線程池
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
.dq_priority = DISPATCH_PRIORITY_BACKGROUND | DISPATCH_PRIORITY_REQUESTED,
// 映射到內(nèi)核wq_thscheduled_count[QOS_CLASS_BACKGROUND]
}
};
6.3 任務(wù)調(diào)度的三層決策機(jī)制
Apple的任務(wù)調(diào)度系統(tǒng)在三個(gè)層次做出調(diào)度決策,每層都與線程池狀態(tài)緊密耦合:
Layer 1: libdispatch隊(duì)列調(diào)度
// 隊(duì)列層面的調(diào)度決策 (src/queue.c)
static void _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
// 1. 檢查隊(duì)列pending任務(wù)數(shù)
int32_t remaining = n;
int32_t pending = os_atomic_load2o(dq, dgq_pending, relaxed);
// 2. 計(jì)算實(shí)際需要的線程數(shù)
if (pending < floor) {
remaining = floor - pending;
}
// 3. 關(guān)鍵決策點(diǎn):是否需要向內(nèi)核請(qǐng)求更多線程
if (remaining > 0) {
// 直接請(qǐng)求內(nèi)核分配線程池資源
int r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp(dq->dq_priority));
// 如果內(nèi)核無法提供足夠線程,libdispatch會(huì)調(diào)整策略
if (r == EAGAIN) {
// 線程池達(dá)到系統(tǒng)限制,任務(wù)繼續(xù)排隊(duì)等待
return;
}
}
}
Layer 2: 內(nèi)核workqueue資源分配
// 內(nèi)核層面的線程池資源決策 (bsd/pthread/pthread_workqueue.c)
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
struct uthread *uth, bool may_start_timer, bool record_failed_allowance)
{
// 1. 全局并發(fā)限制檢查
uint32_t max_count = wq->wq_constrained_threads_scheduled;
if (max_count >= wq_max_constrained_threads) {
return 0; // 拒絕分配,任務(wù)必須等待線程回池
}
// 2. QoS bucket級(jí)別的負(fù)載均衡
thread_qos_t highest_pending_qos = _workq_highest_pending_qos(wq);
if (at_qos < highest_pending_qos) {
// 優(yōu)先滿足更高優(yōu)先級(jí)的任務(wù)
return 0;
}
// 3. 計(jì)算該QoS可用的線程池容量
uint32_t active_count = 0;
for (thread_qos_t qos = at_qos; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
active_count += wq->wq_thscheduled_count[_wq_bucket(qos)];
}
return wq_max_parallelism[_wq_bucket(at_qos)] - active_count;
}
Layer 3: 線程調(diào)度器優(yōu)先級(jí)映射
// 線程調(diào)度器層面的優(yōu)先級(jí)決策 (bsd/pthread/pthread_workqueue.c)
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
workq_threadreq_t req, bool unpark)
{
thread_t th = get_machthread(uth);
thread_qos_t qos = req->tr_qos;
// 1. 設(shè)置線程的QoS優(yōu)先級(jí),直接影響內(nèi)核調(diào)度
thread_set_workq_pri(th, qos, 0);
// 2. 更新workqueue的QoS bucket計(jì)數(shù)
wq->wq_thscheduled_count[_wq_bucket(qos)]++;
// 3. 設(shè)置線程的CPU時(shí)間片和調(diào)度參數(shù)
if (qos >= THREAD_QOS_USER_INTERACTIVE) {
// 交互式任務(wù)獲得更高的調(diào)度優(yōu)先級(jí)和更大的時(shí)間片
thread_set_base_priority(th, BASEPRI_USER_INITIATED);
}
if (unpark) {
// 4. 立即將線程標(biāo)記為可調(diào)度
thread_unstop(th);
}
}
6.4 負(fù)載感知的動(dòng)態(tài)線程池調(diào)整
內(nèi)核workqueue子系統(tǒng)實(shí)時(shí)監(jiān)控任務(wù)負(fù)載,動(dòng)態(tài)調(diào)整線程池大?。?/p>
// 基于負(fù)載的線程池自適應(yīng)調(diào)整 (bsd/pthread/pthread_workqueue.c)
static void
workq_schedule_creator(proc_t p, struct workqueue *wq, uint32_t flags)
{
struct uthread *uth = wq->wq_creator;
// 1. 負(fù)載評(píng)估:檢查pending任務(wù)vs可用線程
uint32_t pending_requests = workq_pending_request_count(wq);
uint32_t available_threads = wq->wq_thidlecount;
// 2. 動(dòng)態(tài)決策:是否需要擴(kuò)展線程池
if (pending_requests > available_threads &&
wq->wq_nthreads < wq_max_threads) {
// 啟動(dòng)creator線程擴(kuò)展池大小
if (uth == NULL) {
// 創(chuàng)建專門的creator線程來管理線程池?cái)U(kuò)展
(void)workq_add_new_idle_thread(p, wq, workq_creator_continue,
/*bound*/ true, NULL);
} else if (uth->uu_workq_flags & UT_WORKQ_IDLE) {
// 喚醒已有的creator線程
workq_thread_wakeup(uth);
}
}
// 3. 收縮決策:檢查是否有過多空閑線程
if (wq->wq_thidlecount > WQ_IDLE_THREAD_LIMIT) {
workq_death_call_schedule(wq, mach_absolute_time() + WQ_DEATH_CALL_DELAY);
}
}
6.5 任務(wù)執(zhí)行完成后的線程池狀態(tài)更新
每個(gè)任務(wù)執(zhí)行完成都會(huì)觸發(fā)線程池狀態(tài)的精確更新:
// 任務(wù)完成后的線程池狀態(tài)同步 (bsd/pthread/pthread_workqueue.c)
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
uint32_t setup_flags)
{
thread_qos_t qos = workq_pri_for_thread(uth);
// 1. 更新QoS bucket的活躍線程計(jì)數(shù)
assert(wq->wq_thscheduled_count[_wq_bucket(qos)] > 0);
wq->wq_thscheduled_count[_wq_bucket(qos)]--;
// 2. 更新全局統(tǒng)計(jì)
_wq_thactive_dec(wq, qos);
wq->wq_fulfilled++; // 完成任務(wù)計(jì)數(shù)器
// 3. 線程狀態(tài)轉(zhuǎn)換:運(yùn)行中 -> 空閑池
TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
uth->uu_workq_flags &= ~UT_WORKQ_RUNNING;
uth->uu_workq_flags |= UT_WORKQ_IDLE;
// 4. 更新空閑線程計(jì)數(shù),為下次任務(wù)分配做準(zhǔn)備
wq->wq_thidlecount++;
// 5. 記錄線程進(jìn)入空閑狀態(tài)的時(shí)間戳(用于后續(xù)回收決策)
uth->uu_save.uus_workq_park_data.idle_stamp = mach_absolute_time();
// 6. 檢查是否有等待中的任務(wù)請(qǐng)求可以立即滿足
if (workq_has_pending_requests(wq)) {
workq_schedule_immediate_thread_creation(p, wq);
}
}
6.6 完整的任務(wù)-線程池協(xié)同時(shí)序
sequenceDiagram
participant App as 應(yīng)用程序
participant Queue as GCD隊(duì)列
participant Pool as 線程池管理器
participant Thread as 工作線程
participant Kernel as 內(nèi)核調(diào)度器
Note over App,Kernel: 任務(wù)提交階段
App->>Queue: dispatch_async(queue, block)
Queue->>Queue: 檢查dgq_pending計(jì)數(shù)
Queue->>Pool: _dispatch_root_queue_poke_slow(n=1)
Note over App,Kernel: 線程池資源分配
Pool->>Pool: 檢查wq_thidlecount
alt 有空閑線程
Pool->>Thread: workq_pop_idle_thread()
Note right of Thread: wq_thidlecount--
Pool->>Thread: workq_thread_reset_pri(qos)
Thread->>Kernel: thread_set_workq_pri()
Kernel->>Thread: 更新調(diào)度優(yōu)先級(jí)
else 無空閑線程
Pool->>Pool: workq_constrained_allowance()
Pool->>Thread: workq_add_new_idle_thread()
Note right of Thread: wq_nthreads++
Thread->>Kernel: thread_create_workq_waiting()
end
Note over App,Kernel: 任務(wù)執(zhí)行階段
Thread->>Thread: workq_unpark_continue()
Thread->>Queue: workq_setup_and_run()
Queue->>Thread: _dispatch_worker_thread2()
Thread->>App: 執(zhí)行用戶block
App->>Thread: block執(zhí)行完成
Note over App,Kernel: 線程池狀態(tài)更新
Thread->>Pool: workq_park_and_unlock()
Pool->>Pool: workq_push_idle_thread()
Note right of Pool: wq_thscheduled_count[qos]--, wq_thidlecount++
Pool->>Pool: 檢查pending requests
alt 有等待任務(wù)
Pool->>Thread: 立即重新分配
else 無等待任務(wù)
Thread->>Thread: thread_block(休眠)
Note right of Thread: 線程在池中等待復(fù)用
end
6.7 關(guān)鍵性能優(yōu)化點(diǎn)
Apple的任務(wù)-線程池協(xié)同機(jī)制包含多個(gè)性能優(yōu)化點(diǎn):
- 預(yù)測(cè)性線程創(chuàng)建:在任務(wù)提交高峰期,提前創(chuàng)建線程避免延遲
- QoS感知調(diào)度:高優(yōu)先級(jí)任務(wù)優(yōu)先獲得線程池資源
- 負(fù)載均衡:在不同QoS bucket間動(dòng)態(tài)平衡線程分配
- 延遲回收:空閑線程延遲銷毀,提高復(fù)用率
- 批量處理:?jiǎn)蝹€(gè)線程連續(xù)處理多個(gè)任務(wù),減少上下文切換
這種深度集成的設(shè)計(jì)使得Apple平臺(tái)能夠在任務(wù)密集型應(yīng)用中保持高效的資源利用和響應(yīng)性能。
總結(jié):內(nèi)核級(jí)線程池的技術(shù)優(yōu)勢(shì)
Apple的workqueue機(jī)制通過將線程池管理完全下沉到內(nèi)核層,實(shí)現(xiàn)了質(zhì)的飛躍:
核心創(chuàng)新點(diǎn)
- 內(nèi)核感知:調(diào)度器直接了解每個(gè)線程池中線程的真實(shí)狀態(tài)
- 零拷貝切換:線程在內(nèi)核中直接從工作狀態(tài)切換到池中休眠狀態(tài)
-
智能復(fù)用:優(yōu)先從池中分配(
workq_pop_idle_thread),最大化線程復(fù)用效率 - 動(dòng)態(tài)調(diào)節(jié):根據(jù)系統(tǒng)負(fù)載自動(dòng)調(diào)整線程池大小
關(guān)鍵源碼驗(yàn)證點(diǎn)
-
線程池復(fù)用:
workq_pop_idle_thread()優(yōu)先從wq_thidlelist獲取空閑線程 -
系統(tǒng)調(diào)用優(yōu)化:
__workq_kernreturn()直接與內(nèi)核交互,無中間層 -
continuation機(jī)制:
thread_create_workq_waiting()創(chuàng)建時(shí)就設(shè)置continuation -
智能回收:
workq_park_and_unlock()將線程推回池中而非銷毀
這種設(shè)計(jì)讓Apple平臺(tái)上的多線程應(yīng)用能夠以更低的開銷、更高的效率運(yùn)行,關(guān)鍵在于理解:這不是簡(jiǎn)單的線程創(chuàng)建機(jī)制,而是一個(gè)高度優(yōu)化的內(nèi)核級(jí)線程池系統(tǒng)。