QNX相關(guān)歷史文章:
Multithreaded Resource Managers
這篇文章主要描述多線程來(lái)實(shí)現(xiàn)資源管理器。
1. Multithreaded resource manager example
先來(lái)看個(gè)例子:
#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
/*
* define THREAD_POOL_PARAM_T such that we can avoid a compiler
* warning when we use the dispatch_*() functions below
*/
#define THREAD_POOL_PARAM_T dispatch_context_t
#include <sys/iofunc.h>
#include <sys/dispatch.h>
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
static iofunc_attr_t attr;
main(int argc, char **argv)
{
/* declare variables we'll be using */
thread_pool_attr_t pool_attr;
resmgr_attr_t resmgr_attr;
dispatch_t *dpp;
thread_pool_t *tpp;
dispatch_context_t *ctp;
int id;
/* initialize dispatch interface */
if((dpp = dispatch_create()) == NULL) {
fprintf(stderr,
"%s: Unable to allocate dispatch handle.\n",
argv[0]);
return EXIT_FAILURE;
}
/* initialize resource manager attributes */
memset(&resmgr_attr, 0, sizeof resmgr_attr);
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* initialize functions for handling messages */
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
/* initialize attribute structure used by the device */
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
/* attach our device name */
id = resmgr_attach(
dpp, /* dispatch handle */
&resmgr_attr, /* resource manager attrs */
"/dev/sample", /* device name */
_FTYPE_ANY, /* open type */
0, /* flags */
&connect_funcs, /* connect routines */
&io_funcs, /* I/O routines */
&attr); /* handle */
if(id == -1) {
fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
return EXIT_FAILURE;
}
/* initialize thread pool attributes */
memset(&pool_attr, 0, sizeof pool_attr);
pool_attr.handle = dpp;
pool_attr.context_alloc = dispatch_context_alloc;
pool_attr.block_func = dispatch_block;
pool_attr.unblock_func = dispatch_unblock;
pool_attr.handler_func = dispatch_handler;
pool_attr.context_free = dispatch_context_free;
pool_attr.lo_water = 2;
pool_attr.hi_water = 4;
pool_attr.increment = 1;
pool_attr.maximum = 50;
/* allocate a thread pool handle */
if((tpp = thread_pool_create(&pool_attr,
POOL_FLAG_EXIT_SELF)) == NULL) {
fprintf(stderr, "%s: Unable to initialize thread pool.\n",
argv[0]);
return EXIT_FAILURE;
}
/* start the threads, will not return */
thread_pool_start(tpp);
}
線程池屬性pool_attr控制線程池的各個(gè)方面,比如新線程啟動(dòng)或終止時(shí)調(diào)用哪些函數(shù)、工作線程的總數(shù)、最小數(shù)量等等。
2. Thread pool attributes
_thread_pool_attr結(jié)構(gòu)如下:
typedef struct _thread_pool_attr {
THREAD_POOL_HANDLE_T *handle;
THREAD_POOL_PARAM_T *(*block_func)(THREAD_POOL_PARAM_T *ctp);
void (*unblock_func)(THREAD_POOL_PARAM_T *ctp);
int (*handler_func)(THREAD_POOL_PARAM_T *ctp);
THREAD_POOL_PARAM_T *(*context_alloc)(
THREAD_POOL_HANDLE_T *handle);
void (*context_free)(THREAD_POOL_PARAM_T *ctp);
pthread_attr_t *attr;
unsigned short lo_water;
unsigned short increment;
unsigned short hi_water;
unsigned short maximum;
unsigned reserved[8];
} thread_pool_attr_t;
填充這個(gè)數(shù)據(jù)結(jié)構(gòu)中的函數(shù),可以是dispatch layer的函數(shù)(比如 dispatch_block()...),也可以是resmgr layer的函數(shù)(比如 resmgr_block()...),也可以是自己實(shí)現(xiàn)的函數(shù)。
如果不使用resmgr layer函數(shù),則必須將THREAD_POOL_PARAM_T定義為某種上下文結(jié)構(gòu),以便庫(kù)在各種函數(shù)間傳遞。默認(rèn)情況下,它被定義為resmgr_context_t,但由于這個(gè)示例使用的dispatch layer,因此需要定義成dispatch_context_t。需要在include之前定義,因?yàn)轭^文件引用了THREAD_POOL_PARAM_T。
上邊結(jié)構(gòu)告訴資源管理器如何處理多線程。在開發(fā)過程中,在設(shè)計(jì)資源管理器時(shí)應(yīng)該考慮到多個(gè)線程,在測(cè)試期間,為了方便調(diào)試,可能只有一個(gè)線程在運(yùn)行,在確保資源管理器基本功能穩(wěn)定后,則需要嘗試使用多個(gè)線程來(lái)運(yùn)行調(diào)試。
-
lo_water,阻塞線程的最小數(shù)量 -
increment,每次要?jiǎng)?chuàng)建的數(shù)量,以達(dá)到lo_water -
hi_water,阻塞線程的最大數(shù)量 -
maximum,任何時(shí)候創(chuàng)建線程的最大數(shù)量
maximum值應(yīng)該確保始終有一個(gè)處于接收阻塞狀態(tài)的線程,如果處于最大線程數(shù),那么客戶端將阻塞,直到空閑線程準(zhǔn)備好接收數(shù)據(jù)為止。為increment指定的值將減少驅(qū)動(dòng)程序需要?jiǎng)?chuàng)建線程的次數(shù)。明智的做法可能是錯(cuò)誤的創(chuàng)建更多的線程,而不是一直創(chuàng)建/銷毀它們。通過填充lo_water參數(shù),可以隨時(shí)在MsgReceive()上確定希望接收阻塞的線程數(shù)。如果接收阻塞的線程少于lo_water線程,那么increment參數(shù)指定一次應(yīng)該創(chuàng)建多少個(gè)線程,這樣至少lo_water線程的數(shù)量會(huì)再次被接收阻塞。一旦線程完成了處理,將返回到block函數(shù)。hi_water變量指定接收阻塞線程數(shù)量的上限,一旦達(dá)到這個(gè)限制,線程將自我銷毀,以確保接收阻塞的線程數(shù)量不會(huì)超過hi_water。為了防止線程數(shù)量無(wú)限制的增加,maximum參數(shù)限制了同時(shí)運(yùn)行線程的最大值。
當(dāng)資源管理器創(chuàng)建線程時(shí),可以通過thread_stack_size來(lái)指定堆棧的大小,如果想要指定堆棧的大小,優(yōu)先級(jí)等,可以填充由pthread_attr_t類型指針指向的pool_attr.attr結(jié)構(gòu)。
thread_pool_attr_t結(jié)構(gòu)中,還包含了幾個(gè)函數(shù)指針:
-
block_func(),當(dāng)需要阻塞等待某些消息時(shí)調(diào)用; -
handler_func(),當(dāng)接收到消息解除阻塞后調(diào)用,在這個(gè)函數(shù)中對(duì)消息進(jìn)行處理; -
context_alloc(),新線程創(chuàng)建時(shí)調(diào)用,新線程使用這個(gè)上下文來(lái)工作; -
context_free(),當(dāng)線程退出時(shí),釋放這個(gè)上下文; -
unblock_func(),當(dāng)關(guān)閉線程池或改變運(yùn)行線程的數(shù)量時(shí),調(diào)用這個(gè)函數(shù);
3. Thread pool functions
資源管理器庫(kù)提供了幾個(gè)線程池的函數(shù):
-
thread_pool_create(),初始化線程池上下文,返回一個(gè)線程池的handle,用于啟動(dòng)線程池; -
thread_pool_start(),啟動(dòng)線程池,這個(gè)函數(shù)可能返回,也可能不返回,取決于thread_pool_create()的傳入flags; -
thread_pool_destroy(),銷毀線程池; -
thread_pool_control(),控制線程的數(shù)量;