什么時(shí)候需要?jiǎng)?chuàng)建線程池呢?簡單的說,如果一個(gè)應(yīng)用需要頻繁的創(chuàng)建和銷毀線程,而任務(wù)執(zhí)行的時(shí)間又非常短,這樣線程創(chuàng)建和銷毀的帶來的開銷就不容忽視,這時(shí)也是線程池該出場的機(jī)會了。如果線程創(chuàng)建T1和銷毀時(shí)間T3相比任務(wù)執(zhí)行時(shí)間T2可以忽略不計(jì),則沒有必要使用線程池了。反之如果T1+T3>T2,那就很有必要使用線程池。
- 下面是Linux系統(tǒng)下用C語言創(chuàng)建的一個(gè)線程池。線程池會維護(hù)一個(gè)任務(wù)鏈表(每個(gè)CThread_worker結(jié)構(gòu)就是一個(gè)任務(wù))。
pool_init()函數(shù)預(yù)先創(chuàng)建好max_thread_num個(gè)線程,每個(gè)線程執(zhí)thread_routine ()函數(shù)。該函數(shù)中
while (pool->cur_queue_size == 0)
{
pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
}
表示如果任務(wù)鏈表中沒有任務(wù),則該線程處于阻塞等待狀態(tài)。否則從隊(duì)列中取出任務(wù)并執(zhí)行。
pool_add_worker()函數(shù)向線程池的任務(wù)鏈表中加入一個(gè)任務(wù),加入后通過調(diào)用pthread_cond_signal (&(pool->queue_ready))喚醒一個(gè)出于阻塞狀態(tài)的線程(如果有的話)。
pool_destroy ()函數(shù)用于銷毀線程池,線程池任務(wù)鏈表中的任務(wù)不會再被執(zhí)行,但是正在運(yùn)行的線程會一直把任務(wù)運(yùn)行完后再退出。
下面貼出完整代碼
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
/*
*線程池里所有運(yùn)行和等待的任務(wù)都是一個(gè)CThread_worker
*由于所有任務(wù)都在鏈表里,所以是一個(gè)鏈表結(jié)構(gòu)
*/
typedef struct worker
{
/*回調(diào)函數(shù),任務(wù)運(yùn)行時(shí)會調(diào)用此函數(shù),注意也可聲明成其它形式*/
void *(*process) (void *arg);
void *arg;/*回調(diào)函數(shù)的參數(shù)*/
struct worker *next;
} CThread_worker;
/*線程池結(jié)構(gòu)*/
typedef struct
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
/*鏈表結(jié)構(gòu),線程池中所有等待任務(wù)*/
CThread_worker *queue_head;
/*是否銷毀線程池*/
int shutdown;
pthread_t *threadid;
/*線程池中允許的活動線程數(shù)目*/
int max_thread_num;
/*當(dāng)前等待隊(duì)列的任務(wù)數(shù)目*/
int cur_queue_size;
} CThread_pool;
int pool_add_worker (void *(*process) (void *arg), void *arg);
void *thread_routine (void *arg);
static CThread_pool *pool = NULL;
void pool_init (int max_thread_num)
{
pool = (CThread_pool *) malloc (sizeof (CThread_pool));
pthread_mutex_init (&(pool->queue_lock), NULL);
pthread_cond_init (&(pool->queue_ready), NULL);
pool->queue_head = NULL;
pool->max_thread_num = max_thread_num;
pool->cur_queue_size = 0;
pool->shutdown = 0;
pool->threadid =
(pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
int i = 0;
for (i = 0; i < max_thread_num; i++)
{
pthread_create (&(pool->threadid[i]), NULL, thread_routine,
NULL);
}
}
/*向線程池中加入任務(wù)*/
int pool_add_worker (void *(*process) (void *arg), void *arg)
{
/*構(gòu)造一個(gè)新任務(wù)*/
CThread_worker *newworker =
(CThread_worker *) malloc (sizeof (CThread_worker));
newworker->process = process;
newworker->arg = arg;
newworker->next = NULL;/*別忘置空*/
pthread_mutex_lock (&(pool->queue_lock));
/*將任務(wù)加入到等待隊(duì)列中*/
CThread_worker *member = pool->queue_head;
if (member != NULL)
{
while (member->next != NULL)
member = member->next;
member->next = newworker;
}
else
{
pool->queue_head = newworker;
}
assert (pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock (&(pool->queue_lock));
/*好了,等待隊(duì)列中有任務(wù)了,喚醒一個(gè)等待線程;
注意如果所有線程都在忙碌,這句沒有任何作用*/
pthread_cond_signal (&(pool->queue_ready));
return 0;
}
/*銷毀線程池,等待隊(duì)列中的任務(wù)不會再被執(zhí)行,但是正在運(yùn)行的線程會一直
把任務(wù)運(yùn)行完后再退出*/
int pool_destroy ()
{
if (pool->shutdown)
return -1;/*防止兩次調(diào)用*/
pool->shutdown = 1;
/*喚醒所有等待線程,線程池要銷毀了*/
pthread_cond_broadcast (&(pool->queue_ready));
/*阻塞等待線程退出,否則就成僵尸了*/
int i;
for (i = 0; i < pool->max_thread_num; i++)
pthread_join (pool->threadid[i], NULL);
free (pool->threadid);
/*銷毀等待隊(duì)列*/
CThread_worker *head = NULL;
while (pool->queue_head != NULL)
{
head = pool->queue_head;
pool->queue_head = pool->queue_head->next;
free (head);
}
/*條件變量和互斥量也別忘了銷毀*/
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));
free (pool);
/*銷毀后指針置空是個(gè)好習(xí)慣*/
pool=NULL;
return 0;
}
//非常重要的任務(wù)接口函數(shù),各子線程統(tǒng)一調(diào)用這個(gè)函數(shù),而這個(gè)函數(shù)內(nèi)部檢查調(diào)用任務(wù)隊(duì)列中的實(shí)際任務(wù)函數(shù)指針。
void * thread_routine (void *arg)
{
printf ("starting thread 0x%x/n", pthread_self ());
while (1)
{
pthread_mutex_lock (&(pool->queue_lock));
/*如果等待隊(duì)列為0并且不銷毀線程池,則處于阻塞狀態(tài); 注意
pthread_cond_wait是一個(gè)原子操作,等待前會解鎖,喚醒后會加鎖*/
while (pool->cur_queue_size == 0 && !pool->shutdown)
{
printf ("thread 0x%x is waiting/n", pthread_self ());
pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
}
/*線程池要銷毀了*/
if (pool->shutdown)
{
/*遇到break,continue,return等跳轉(zhuǎn)語句,千萬不要忘記先解鎖*/
pthread_mutex_unlock (&(pool->queue_lock));
printf ("thread 0x%x will exit/n", pthread_self ());
pthread_exit (NULL);
}
printf ("thread 0x%x is starting to work/n", pthread_self ());
/*assert是調(diào)試的好幫手*/
assert (pool->cur_queue_size != 0);
assert (pool->queue_head != NULL);
/*等待隊(duì)列長度減去1,并取出鏈表中的頭元素*/
pool->cur_queue_size--;
CThread_worker *worker = pool->queue_head;
pool->queue_head = worker->next;
pthread_mutex_unlock (&(pool->queue_lock));
/*調(diào)用回調(diào)函數(shù),執(zhí)行任務(wù)*/
(*(worker->process)) (worker->arg);
free (worker);
worker = NULL;
}
/*這一句應(yīng)該是不可達(dá)的*/
pthread_exit (NULL);
}
下面是測試代碼
void * myprocess (void *arg)
{
printf ("threadid is 0x%x, working on task %d/n", pthread_self (),*(int *) arg);
sleep (1);/*休息一秒,延長任務(wù)的執(zhí)行時(shí)間*/
return NULL;
}
int main (int argc, char **argv)
{
pool_init (3);/*線程池中最多三個(gè)活動線程*/
/*連續(xù)向池中投入10個(gè)任務(wù)*/
int *workingnum = (int *) malloc (sizeof (int) * 10);
int i;
for (i = 0; i < 10; i++)
{
workingnum[i] = i;
pool_add_worker (myprocess, &workingnum[i]);
}
/*等待所有任務(wù)完成*/
sleep (5); //這句可能出問題,偷懶寫法。
/*銷毀線程池*/
pool_destroy ();
free (workingnum);
return 0;
}
將上述所有代碼放入threadpool.c文件中,
在Linux輸入編譯命令
$ gcc -o threadpool threadpool.c -lpthread
以下是運(yùn)行結(jié)果
starting thread 0xb7df6b90
thread 0xb7df6b90 is waiting
starting thread 0xb75f5b90
thread 0xb75f5b90 is waiting
starting thread 0xb6df4b90
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 0
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 1
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 2
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 3
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 4
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 5
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 6
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 7
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 8
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 9
thread 0xb75f5b90 is waiting
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is waiting
thread 0xb75f5b90 will exit
thread 0xb6df4b90 will exit
thread 0xb7df6b90 will exit