Nuttx相關(guān)的歷史文章:
介紹
Nuttx提供工作隊(duì)列機(jī)制。工作隊(duì)列是一個(gè)存放線程的隊(duì)列,它對(duì)于將任務(wù)負(fù)載減荷到不同的線程上下文中,以便于延遲執(zhí)行,或者串行執(zhí)行很有幫助。
工作隊(duì)列分類
有三種不同類型的工作隊(duì)列,每一類都有不同的屬性和用途。
- 高優(yōu)先級(jí)內(nèi)核工作隊(duì)列
高優(yōu)先級(jí)內(nèi)核工作隊(duì)列
專用的高優(yōu)先級(jí)工作隊(duì)列用于中斷處理函數(shù)中的延遲處理,在有些驅(qū)動(dòng)中可能需要這樣一個(gè)工作隊(duì)列,如果沒有必要的話,也可以安全的禁掉。高優(yōu)先級(jí)的線程也可以充當(dāng)資源回收器--從中斷處理函數(shù)中完成內(nèi)存的延遲釋放。如果高優(yōu)先級(jí)工作線程被disable了的話,清理工作有兩種方式來完成:1)如果使能了低優(yōu)先級(jí)的工作線程,在該線程中完成;2)如果低優(yōu)先級(jí)線程沒有使能,則IDLE線程來完成(如果內(nèi)存回收優(yōu)先級(jí)比較高,可能不太合適)。設(shè)備驅(qū)動(dòng)底半部
高優(yōu)先級(jí)工作線程可以用于設(shè)備驅(qū)動(dòng)程序的底半部,因此它必須運(yùn)行在一個(gè)非常高,并且固定的優(yōu)先級(jí),與中斷處理程序本身的優(yōu)先級(jí)競(jìng)爭(zhēng)。通常,高優(yōu)先級(jí)工作隊(duì)列應(yīng)該是系統(tǒng)中最高優(yōu)先級(jí)的線程。默認(rèn)的優(yōu)先級(jí)為224。線程池
工作隊(duì)列可以被配置成支持多個(gè)低優(yōu)先級(jí)線程,這本質(zhì)上是一個(gè)線程池,為隊(duì)列工作提供多線程服務(wù),這打破了“隊(duì)列”的嚴(yán)格序列化(因此,工作隊(duì)列也不再是一種隊(duì)列)。
當(dāng)在I/O操作,暫停等待輸入時(shí),多個(gè)工作線程是需要的,如果只有一個(gè)工作線程的話,那么整個(gè)工作隊(duì)列處理就會(huì)停止。這對(duì)于異步I/O、AIO是必要的。與低優(yōu)先級(jí)內(nèi)核工作隊(duì)列比較
對(duì)于不太關(guān)鍵、較低優(yōu)先級(jí)、面向應(yīng)用程序的工作線程支持,考慮使用較低優(yōu)先級(jí)的工作隊(duì)列。較低優(yōu)先級(jí)的工作隊(duì)列以較低的優(yōu)先級(jí)運(yùn)行,但是它有一個(gè)額外的優(yōu)點(diǎn),那就是支持優(yōu)先級(jí)繼承(如果CONFIG_PRIORITY_INHERITANCE=y選中的話):低優(yōu)先級(jí)的工作線程可以被調(diào)整優(yōu)先級(jí)。配置選項(xiàng)
CONFIG_SCHED_HPWORK:使能高優(yōu)先級(jí)工作隊(duì)列
CONFIG_SCHED_HPNTHREADS:高優(yōu)先級(jí)工作隊(duì)列線程池中的線程數(shù)量,默認(rèn)是1.
CONFIG_SCHED_HPWORKPRIORITY:高優(yōu)先級(jí)工作線程的執(zhí)行優(yōu)先級(jí),默認(rèn)是224.
CONFIG_SCHED_HPWORKSTACKSIZE:工作線程的棧空間大小,默認(rèn)是2048字節(jié)通用配置選項(xiàng)
這個(gè)選項(xiàng)通用于所有的工作隊(duì)列:
CONFIG_SIG_SIGWORK:用于喚醒工作線程的信號(hào)值,默認(rèn)使用17.
- 低優(yōu)先級(jí)內(nèi)核工作隊(duì)列
低優(yōu)先級(jí)內(nèi)核工作隊(duì)列
低優(yōu)先級(jí)工作隊(duì)列更適合于具備擴(kuò)展性的,面向應(yīng)用程序處理的場(chǎng)景,比如文件系統(tǒng)清理、內(nèi)存垃圾回收、異步I/O操作等。與高優(yōu)先內(nèi)核工作隊(duì)列比較
低優(yōu)先級(jí)內(nèi)核工作隊(duì)列,由于優(yōu)先級(jí)會(huì)低一些,因此不適合用作驅(qū)動(dòng)程序的底半部。除此之外,它與高優(yōu)先級(jí)內(nèi)核工作隊(duì)列非常相似,上文中關(guān)于高優(yōu)先級(jí)工作隊(duì)列的大部分討論同樣適用。但是低優(yōu)先級(jí)內(nèi)核工作隊(duì)列,有一個(gè)重要的特點(diǎn)就是優(yōu)先級(jí)繼承,這個(gè)讓它更適合于某些任務(wù)。優(yōu)先級(jí)繼承
低優(yōu)先級(jí)內(nèi)核工作線程支持優(yōu)先級(jí)繼承(需要選擇CONFIG_PRIORITY_INHERITANCE=y),可以根據(jù)實(shí)際情況調(diào)整優(yōu)先級(jí)。優(yōu)先級(jí)繼承不是自動(dòng)完成的,低優(yōu)先級(jí)工作線程總是運(yùn)行在一個(gè)固定的優(yōu)先級(jí)上??梢酝ㄟ^調(diào)用lpwork_bootstpriority()接口來提升優(yōu)先級(jí)(通常在調(diào)度這個(gè)任務(wù)之前調(diào)用),在任務(wù)完成之后可以通過lpwork_restorepriority()接口來恢復(fù)優(yōu)先級(jí)(一般在任務(wù)完成時(shí)的work handler中調(diào)用)。目前,只有Nuttx異步I/O邏輯使用了這個(gè)動(dòng)態(tài)優(yōu)先級(jí)特性。配置選項(xiàng)
CONFIG_SCHED_LPWORK:使能低優(yōu)先級(jí)工作隊(duì)列
CONFIG_SCHED_LPNTHREADS:低優(yōu)先級(jí)工作隊(duì)列中線程數(shù)量,默認(rèn)值為1
CONFIG_SCHED_LPWORKPRIORITY:低優(yōu)先級(jí)工作線程中最小的執(zhí)行優(yōu)先級(jí),隊(duì)列中每個(gè)線程都以這個(gè)優(yōu)先級(jí)的值開始運(yùn)行。如果優(yōu)先級(jí)繼承使能了的話,優(yōu)先級(jí)會(huì)在這個(gè)基礎(chǔ)上往上提升,默認(rèn)50.
CONFIG_SCHED_LPWORKPRIOMAX:低優(yōu)先級(jí)線程中最大的執(zhí)行優(yōu)先級(jí)。運(yùn)行的優(yōu)先級(jí)不能超過這個(gè)值,默認(rèn)176.
CONFIG_SCHED_LPWORKSTACKSIZE:低優(yōu)先級(jí)工作線程的棧大小,默認(rèn)2048Byte。
- 用戶模式工作隊(duì)列
工作隊(duì)列訪問權(quán)限
低優(yōu)先級(jí)和高優(yōu)先級(jí)工作線程,都是內(nèi)核線程。在Nuttxflat build模式下編譯時(shí),應(yīng)用程序是可以訪問和使用的。但是,在Nuttxprotected/kernel build模式下編譯時(shí),內(nèi)核模式下的代碼是獨(dú)立的,用戶模式是沒法訪問的。工作模式工作隊(duì)列
用戶模式工作隊(duì)列接口與內(nèi)核模式工作隊(duì)列接口相同,用戶模式工作隊(duì)列的功能等效于高優(yōu)先級(jí)工作隊(duì)列,不同之處在于,它的實(shí)現(xiàn)不依賴于內(nèi)核內(nèi)部提供的資源。配置選項(xiàng)
CONFIG_LIB_USRWORK:使能用戶模式工作隊(duì)列
CONFIG_LIB_USRWORKPRIORITY:用戶模式下工作線程的執(zhí)行優(yōu)先級(jí),默認(rèn)為100.
CONFIG_LIB_USRWORKSTACKSIZE:用戶模式下工作線程的棧大小,默認(rèn)2048.
數(shù)據(jù)結(jié)構(gòu)及接口
數(shù)據(jù)結(jié)構(gòu)
數(shù)據(jù)結(jié)構(gòu)分為兩部分,一部分是用戶使用的結(jié)構(gòu),另一部分是內(nèi)核實(shí)現(xiàn)用到的結(jié)構(gòu):
- 用戶數(shù)據(jù)結(jié)構(gòu)
/* Defines the work callback */
typedef void (*worker_t)(FAR void *arg);
/* Defines one entry in the work queue. The user only needs this structure
* in order to declare instances of the work structure. Handling of all
* fields is performed by the work APIs
*/
struct work_s
{
struct dq_entry_s dq; /* Implements a doubly linked list */
worker_t worker; /* Work callback */
FAR void *arg; /* Callback argument */
systime_t qtime; /* Time work queued */
systime_t delay; /* Delay until work performed */
};
struct work_s結(jié)構(gòu)只需要用來聲明實(shí)例即可,該數(shù)據(jù)結(jié)構(gòu)中的內(nèi)部成員,全部由相應(yīng)的API接口來操作,其中qtime表示的是該任務(wù)入隊(duì)的時(shí)間,而delay表示的是需要延遲多長(zhǎng)時(shí)間去執(zhí)行,如果delay值為0,表明立刻執(zhí)行。
- 內(nèi)核實(shí)現(xiàn)數(shù)據(jù)結(jié)構(gòu)
/* This represents one worker */
struct kworker_s
{
pid_t pid; /* The task ID of the worker thread */
volatile bool busy; /* True: Worker is not available */
};
/* This structure defines the state of one kernel-mode work queue */
struct kwork_wqueue_s
{
systime_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */
struct kworker_s worker[1]; /* Describes a worker thread */
};
/* This structure defines the state of one high-priority work queue. This
* structure must be cast-compatible with kwork_wqueue_s.
*/
#ifdef CONFIG_SCHED_HPWORK
struct hp_wqueue_s
{
systime_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */
struct kworker_s worker[1]; /* Describes the single high priority worker */
};
#endif
/* This structure defines the state of one high-priority work queue. This
* structure must be cast compatible with kwork_wqueue_s
*/
#ifdef CONFIG_SCHED_LPWORK
struct lp_wqueue_s
{
systime_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */
/* Describes each thread in the low priority queue's thread pool */
struct kworker_s worker[CONFIG_SCHED_LPNTHREADS];
};
#endif
/****************************************************************************
* Public Data
****************************************************************************/
#ifdef CONFIG_SCHED_HPWORK
/* The state of the kernel mode, high priority work queue. */
extern struct hp_wqueue_s g_hpwork;
#endif
#ifdef CONFIG_SCHED_LPWORK
/* The state of the kernel mode, low priority work queue(s). */
extern struct lp_wqueue_s g_lpwork;
#endif
上述結(jié)構(gòu)體中:
struct kworker_s:對(duì)應(yīng)一個(gè)工作線程,其中包含了線程ID號(hào)及運(yùn)行狀態(tài)。
struct kwork_wqueue_s:描述內(nèi)核模式下的工作隊(duì)列,在接口中都使用這個(gè)數(shù)據(jù)結(jié)構(gòu),實(shí)際上是將struct hp_wqueue_s/struct lp_wqueue_s數(shù)據(jù)結(jié)構(gòu)進(jìn)行強(qiáng)制類型轉(zhuǎn)換。
struct hp_wqueue_s:描述高優(yōu)先級(jí)內(nèi)核工作隊(duì)列,從數(shù)據(jù)結(jié)構(gòu)中可以看出,該隊(duì)列中默認(rèn)只支持1個(gè)工作線程。
struct lp_wqueue_s:描述低優(yōu)先級(jí)內(nèi)核工作隊(duì)列,從數(shù)據(jù)結(jié)構(gòu)中可以看出,該隊(duì)列中的工作線程是可以配置的,CONFIG_SCHED_LPNTHREADS的值就代表線程數(shù)量。
g_hpwork/g_lpwork:分別為兩個(gè)全局描述符,對(duì)應(yīng)到兩種類型的內(nèi)核工作隊(duì)列。
接口定義
-
int work_usrstart(void):?jiǎn)?dòng)用戶模式下的工作隊(duì)列。 -
int work_queue(int qid, FAR struct work_s *work, worker_t worker, FAR void *arg, systime_t delay):將任務(wù)添加到工作隊(duì)列中,任務(wù)將會(huì)在工作隊(duì)列中的線程上延遲運(yùn)行。 -
int work_cancel(int qid, FAR struct work_s *work):將之前入列的任務(wù)刪除掉。 -
int work_signal(int qid):通過工作隊(duì)列中的線程去執(zhí)行任務(wù)處理。 -
work_available(work):檢查任務(wù)的結(jié)構(gòu)體是否可用。 -
void lpwork_boostpriority(uint8_t reqprio):提升線程執(zhí)行的優(yōu)先級(jí)。 -
void lpwork_restorepriority(uint8_t reqprio):恢復(fù)線程執(zhí)行的優(yōu)先級(jí)。
代碼說明一切:
/****************************************************************************
* Name: work_usrstart
*
* Description:
* Start the user mode work queue.
*
* Input parameters:
* None
*
* Returned Value:
* The task ID of the worker thread is returned on success. A negated
* errno value is returned on failure.
*
****************************************************************************/
#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
int work_usrstart(void);
#endif
/****************************************************************************
* Name: work_queue
*
* Description:
* Queue work to be performed at a later time. All queued work will be
* performed on the worker thread of execution (not the caller's).
*
* The work structure is allocated by caller, but completely managed by
* the work queue logic. The caller should never modify the contents of
* the work queue structure; the caller should not call work_queue()
* again until either (1) the previous work has been performed and removed
* from the queue, or (2) work_cancel() has been called to cancel the work
* and remove it from the work queue.
*
* Input parameters:
* qid - The work queue ID
* work - The work structure to queue
* worker - The worker callback to be invoked. The callback will invoked
* on the worker thread of execution.
* arg - The argument that will be passed to the worker callback when
* it is invoked.
* delay - Delay (in clock ticks) from the time queue until the worker
* is invoked. Zero means to perform the work immediately.
*
* Returned Value:
* Zero on success, a negated errno on failure
*
****************************************************************************/
int work_queue(int qid, FAR struct work_s *work, worker_t worker,
FAR void *arg, systime_t delay);
/****************************************************************************
* Name: work_cancel
*
* Description:
* Cancel previously queued work. This removes work from the work queue.
* After work has been cancelled, it may be re-queue by calling work_queue()
* again.
*
* Input parameters:
* qid - The work queue ID
* work - The previously queue work structure to cancel
*
* Returned Value:
* Zero on success, a negated errno on failure
*
* -ENOENT - There is no such work queued.
* -EINVAL - An invalid work queue was specified
*
****************************************************************************/
int work_cancel(int qid, FAR struct work_s *work);
/****************************************************************************
* Name: work_signal
*
* Description:
* Signal the worker thread to process the work queue now. This function
* is used internally by the work logic but could also be used by the
* user to force an immediate re-assessment of pending work.
*
* Input parameters:
* qid - The work queue ID
*
* Returned Value:
* Zero on success, a negated errno on failure
*
****************************************************************************/
int work_signal(int qid);
/****************************************************************************
* Name: work_available
*
* Description:
* Check if the work structure is available.
*
* Input parameters:
* work - The work queue structure to check.
* None
*
* Returned Value:
* true if available; false if busy (i.e., there is still pending work).
*
****************************************************************************/
#define work_available(work) ((work)->worker == NULL)
/****************************************************************************
* Name: lpwork_boostpriority
*
* Description:
* Called by the work queue client to assure that the priority of the low-
* priority worker thread is at least at the requested level, reqprio. This
* function would normally be called just before calling work_queue().
*
* Parameters:
* reqprio - Requested minimum worker thread priority
*
* Return Value:
* None
*
****************************************************************************/
#if defined(CONFIG_SCHED_LPWORK) && defined(CONFIG_PRIORITY_INHERITANCE)
void lpwork_boostpriority(uint8_t reqprio);
#endif
/****************************************************************************
* Name: lpwork_restorepriority
*
* Description:
* This function is called to restore the priority after it was previously
* boosted. This is often done by client logic on the worker thread when
* the scheduled work completes. It will check if we need to drop the
* priority of the worker thread.
*
* Parameters:
* reqprio - Previously requested minimum worker thread priority to be
* "unboosted"
*
* Return Value:
* None
*
****************************************************************************/
#if defined(CONFIG_SCHED_LPWORK) && defined(CONFIG_PRIORITY_INHERITANCE)
void lpwork_restorepriority(uint8_t reqprio);
#endif
原理
按慣例,先來一張圖吧:

簡(jiǎn)單來說,工作隊(duì)列就如上圖所示,由三個(gè)部分組成:
- 任務(wù)隊(duì)列:用于存放需要延遲執(zhí)行的任務(wù),這個(gè)也就是通過
work_queue()接口添加任務(wù)的任務(wù)隊(duì)列。 - 工作線程:在高優(yōu)先級(jí)內(nèi)核工作隊(duì)列中,默認(rèn)只有一個(gè)線程;在低優(yōu)先級(jí)內(nèi)核工作隊(duì)列中支持多個(gè)工作線程。任務(wù)隊(duì)列中的任務(wù)就分發(fā)到這些線程上來執(zhí)行。
- 延時(shí)參數(shù)
delay:這個(gè)參數(shù)定義了輪詢時(shí)的間隔時(shí)間,進(jìn)而判斷任務(wù)隊(duì)列中的任務(wù)是否已經(jīng)到需要執(zhí)行的時(shí)間點(diǎn)了。
Nuttx操作系統(tǒng)執(zhí)行的入口在os_start(),從這開始,最終會(huì)調(diào)用到工作隊(duì)列線程的創(chuàng)建,調(diào)用關(guān)系如下:
os_start() ---> os_bringup() ---> os_workqueue() ---> work_hpstart()/work_lpstart()/USERSPACE->work_usrstart()
其中work_hpstart()/work_lpstart()/USERSPACE->work_usrstart()分別對(duì)應(yīng)內(nèi)核高優(yōu)先級(jí)工作隊(duì)列、內(nèi)核低優(yōu)先級(jí)工作隊(duì)列、用戶模式工作隊(duì)列三種情況,由于原理類似,我將選擇內(nèi)核高優(yōu)先級(jí)工作隊(duì)列來進(jìn)行分析。入口為:work_hpstart()。
work_hpstart()主要完成以下幾點(diǎn):
- 初始化高優(yōu)先級(jí)工作隊(duì)列數(shù)據(jù)結(jié)構(gòu);
- 在該工作隊(duì)列中,創(chuàng)建一個(gè)高優(yōu)先級(jí)的工作線程
work_hpthread,默認(rèn)只支持一個(gè);
int work_hpstart(void)
{
pid_t pid;
/* Initialize work queue data structures */
g_hpwork.delay = CONFIG_SCHED_HPWORKPERIOD / USEC_PER_TICK;
dq_init(&g_hpwork.q);
/* Start the high-priority, kernel mode worker thread */
sinfo("Starting high-priority kernel worker thread\n");
pid = kernel_thread(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY,
CONFIG_SCHED_HPWORKSTACKSIZE,
(main_t)work_hpthread,
(FAR char * const *)NULL);
DEBUGASSERT(pid > 0);
if (pid < 0)
{
int errcode = errno;
DEBUGASSERT(errcode > 0);
serr("ERROR: kernel_thread failed: %d\n", errcode);
return -errcode;
}
g_hpwork.worker[0].pid = pid;
g_hpwork.worker[0].busy = true;
return pid;
實(shí)際的工作由work_hpthread線程來處理,在該函數(shù)中運(yùn)行一個(gè)死循環(huán),在循環(huán)中調(diào)用work_process()來處理實(shí)際的任務(wù)。
/****************************************************************************
* Name: work_hpthread
*
* Description:
* This is the worker thread that performs the actions placed on the high
* priority work queue.
*
* This, along with the lower priority worker thread(s) are the kernel
* mode work queues (also build in the flat build). One of these threads
* also performs periodic garbage collection (that would otherwise be
* performed by the idle thread if CONFIG_SCHED_WORKQUEUE is not defined).
* That will be the higher priority worker thread only if a lower priority
* worker thread is available.
*
* All kernel mode worker threads are started by the OS during normal
* bring up. This entry point is referenced by OS internally and should
* not be accessed by application logic.
*
* Input parameters:
* argc, argv (not used)
*
* Returned Value:
* Does not return
*
****************************************************************************/
static int work_hpthread(int argc, char *argv[])
{
/* Loop forever */
for (; ; )
{
#ifndef CONFIG_SCHED_LPWORK
/* First, perform garbage collection. This cleans-up memory
* de-allocations that were queued because they could not be freed in
* that execution context (for example, if the memory was freed from
* an interrupt handler).
*
* NOTE: If the work thread is disabled, this clean-up is performed by
* the IDLE thread (at a very, very low priority). If the low-priority
* work thread is enabled, then the garbage collection is done on that
* thread instead.
*/
sched_garbage_collection();
#endif
/* Then process queued work. work_process will not return until: (1)
* there is no further work in the work queue, and (2) the polling
* period provided by g_hpwork.delay expires.
*/
work_process((FAR struct kwork_wqueue_s *)&g_hpwork, g_hpwork.delay, 0);
}
return OK; /* To keep some compilers happy */
}
所以工作隊(duì)列的任務(wù)處理核心是work_process()接口,該接口對(duì)于內(nèi)核的高優(yōu)先級(jí)工作隊(duì)列和內(nèi)核低優(yōu)先級(jí)工作隊(duì)列是一致的。
work_process()完成的主要任務(wù)有:
- 獲取執(zhí)行時(shí)候的系統(tǒng)時(shí)間,這個(gè)時(shí)間主要用于統(tǒng)計(jì)任務(wù)進(jìn)入工作隊(duì)列后,消耗了多久,是否到了需要去執(zhí)行的時(shí)間點(diǎn)。
- 從工作隊(duì)列的頭部獲取一個(gè)任務(wù),通過比較兩個(gè)時(shí)間值:1)消耗的時(shí)間,也就是當(dāng)前的系統(tǒng)時(shí)間減去任務(wù)入列的時(shí)間;2)任務(wù)延遲執(zhí)行的時(shí)間,也就是數(shù)據(jù)結(jié)構(gòu)中描述的
delay時(shí)間。 - 如果消耗的時(shí)間大于延遲執(zhí)行的時(shí)間,那就立刻執(zhí)行任務(wù)的回調(diào)函數(shù)。
- 如果消耗的時(shí)間小于延遲執(zhí)行的時(shí)間,計(jì)算剩余時(shí)間,并最終讓任務(wù)睡眠等待一下。
5.高優(yōu)先級(jí)內(nèi)核工作隊(duì)列和低優(yōu)先內(nèi)核工作隊(duì)列的實(shí)現(xiàn)方式有一些細(xì)微的差異,主要體現(xiàn)在,高優(yōu)先級(jí)的情況下,如果還不到執(zhí)行時(shí)間,工作線程選擇睡眠讓出CPU;低優(yōu)先級(jí)的情況下,會(huì)選擇讓第一個(gè)線程輪詢(與高優(yōu)先級(jí)工作線程行為一致),而讓其他的工作線程調(diào)用sigwaitinfo()接口等待信號(hào)。
代碼如下:
void work_process(FAR struct kwork_wqueue_s *wqueue, systime_t period, int wndx)
{
volatile FAR struct work_s *work;
worker_t worker;
irqstate_t flags;
FAR void *arg;
systime_t elapsed;
systime_t remaining;
systime_t stick;
systime_t ctick;
systime_t next;
/* Then process queued work. We need to keep interrupts disabled while
* we process items in the work list.
*/
next = period;
flags = enter_critical_section();
/* Get the time that we started this polling cycle in clock ticks. */
stick = clock_systimer();
/* And check each entry in the work queue. Since we have disabled
* interrupts we know: (1) we will not be suspended unless we do
* so ourselves, and (2) there will be no changes to the work queue
*/
work = (FAR struct work_s *)wqueue->q.head;
while (work)
{
/* Is this work ready? It is ready if there is no delay or if
* the delay has elapsed. qtime is the time that the work was added
* to the work queue. It will always be greater than or equal to
* zero. Therefore a delay of zero will always execute immediately.
*/
ctick = clock_systimer();
elapsed = ctick - work->qtime;
if (elapsed >= work->delay)
{
/* Remove the ready-to-execute work from the list */
(void)dq_rem((struct dq_entry_s *)work, &wqueue->q);
/* Extract the work description from the entry (in case the work
* instance by the re-used after it has been de-queued).
*/
worker = work->worker;
/* Check for a race condition where the work may be nullified
* before it is removed from the queue.
*/
if (worker != NULL)
{
/* Extract the work argument (before re-enabling interrupts) */
arg = work->arg;
/* Mark the work as no longer being queued */
work->worker = NULL;
/* Do the work. Re-enable interrupts while the work is being
* performed... we don't have any idea how long this will take!
*/
leave_critical_section(flags);
worker(arg);
/* Now, unfortunately, since we re-enabled interrupts we don't
* know the state of the work list and we will have to start
* back at the head of the list.
*/
flags = enter_critical_section();
work = (FAR struct work_s *)wqueue->q.head;
}
else
{
/* Cancelled.. Just move to the next work in the list with
* interrupts still disabled.
*/
work = (FAR struct work_s *)work->dq.flink;
}
}
else /* elapsed < work->delay */
{
/* This one is not ready.
*
* NOTE that elapsed is relative to the the current time,
* not the time of beginning of this queue processing pass.
* So it may need an adjustment.
*/
elapsed += (ctick - stick);
if (elapsed > work->delay)
{
/* The delay has expired while we are processing */
elapsed = work->delay;
}
/* Will it be ready before the next scheduled wakeup interval? */
remaining = work->delay - elapsed;
if (remaining < next)
{
/* Yes.. Then schedule to wake up when the work is ready */
next = remaining;
}
/* Then try the next in the list. */
work = (FAR struct work_s *)work->dq.flink;
}
}
#if defined(CONFIG_SCHED_LPWORK) && CONFIG_SCHED_LPNTHREADS > 0
/* Value of zero for period means that we should wait indefinitely until
* signalled. This option is used only for the case where there are
* multiple, low-priority worker threads. In that case, only one of
* the threads does the poll... the others simple. In all other cases
* period will be non-zero and equal to wqueue->delay.
*/
if (period == 0)
{
sigset_t set;
/* Wait indefinitely until signalled with SIGWORK */
sigemptyset(&set);
sigaddset(&set, SIGWORK);
wqueue->worker[wndx].busy = false;
DEBUGVERIFY(sigwaitinfo(&set, NULL));
wqueue->worker[wndx].busy = true;
}
else
#endif
{
/* Get the delay (in clock ticks) since we started the sampling */
elapsed = clock_systimer() - stick;
if (elapsed < period && next > 0)
{
/* How much time would we need to delay to get to the end of the
* sampling period? The amount of time we delay should be the smaller
* of the time to the end of the sampling period and the time to the
* next work expiry.
*/
remaining = period - elapsed;
next = MIN(next, remaining);
/* Wait awhile to check the work list. We will wait here until
* either the time elapses or until we are awakened by a signal.
* Interrupts will be re-enabled while we wait.
*/
wqueue->worker[wndx].busy = false;
usleep(next * USEC_PER_TICK);
wqueue->worker[wndx].busy = true;
}
}
leave_critical_section(flags);
}
總結(jié)
Nuttx中的工作隊(duì)列機(jī)制還是比較簡(jiǎn)單的:一個(gè)工作隊(duì)列,對(duì)應(yīng)到一個(gè)任務(wù)的隊(duì)列,以及一個(gè)工作線程的數(shù)組。內(nèi)核負(fù)責(zé)來調(diào)度這些工作線程,而任務(wù)隊(duì)列中的任務(wù)會(huì)分發(fā)到各個(gè)線程上執(zhí)行。三種類型的工作隊(duì)列,實(shí)現(xiàn)都是大同小異。