線程池的實現(xiàn)原理基本上就是生產(chǎn)者與消費者模型的擴(kuò)展,即剛開始開出一定數(shù)量的線程,以基本生產(chǎn)者消費者模型為基本原理,不斷使用這些線程從"任務(wù)隊列"中取出任務(wù)進(jìn)行處理,本線程池中的線程通過競爭關(guān)系取任務(wù)進(jìn)行處理.
實現(xiàn)過程
- 定義封裝了Task類,具體的任務(wù)實現(xiàn)可以通過繼承該Task類進(jìn)行實現(xiàn).
- 定義Threadpool線程池類,類內(nèi)定義了AddTask(添加任務(wù)),stop(停止銷毀線程池),Task* take(取任務(wù)),createthread(創(chuàng)造線程)和線程回調(diào)函數(shù)等成員,其中在運用pthread_create創(chuàng)建線程時需要第三個參數(shù)必須指向一個靜態(tài)函數(shù).
- 對線程的同步機(jī)制進(jìn)行了封裝,包括信號量sem,互斥鎖Locker以及條件變量cond類,方便以后的調(diào)用.
#ifndef __LOCKER__
#define __LOCKER__
#include <pthread.h>
#include <exception>
//互斥鎖類
class Locker
{
public:
Locker()
{
if(pthread_mutex_init(&mutex, NULL)!=0)
{
throw std::exception();
}
}
~Locker()
{
pthread_mutex_destroy(&mutex);
}
bool lock()
{
return pthread_mutex_lock(&mutex)==0;
}
bool unlock()
{
return pthread_mutex_unlock(&mutex)==0;
}
pthread_mutex_t* Get()
{
return &mutex;
}
private:
pthread_mutex_t mutex;
};
//封裝條件變量類
class cond
{
public:
cond()
{
if(pthread_cond_init(&mcond,NULL)!=0)
{
throw std::exception();
}
}
~cond()
{
pthread_cond_destroy(&mcond);
}
bool wait(pthread_mutex_t* mutex)
{
return pthread_cond_wait( &mcond, mutex)==0;
}
bool signal()
{
return pthread_cond_signal(&mcond)==0;
}
bool signalAll()
{
return pthread_cond_broadcast(&mcond)==0;
}
private:
pthread_cond_t mcond;
};
//通過使用ScopeLock類在作用域內(nèi)會自動調(diào)用構(gòu)造函數(shù)和析構(gòu)函數(shù)的優(yōu)點,方便了互斥鎖的加鎖和解鎖操作
class ScopeLock{
public:
ScopeLock( Locker& lock ):mlock(lock){mlock.lock();};
~ScopeLock( ){mlock.unlock();}
private:
Locker& mlock;
};
#endif
#ifndef __THREAD_OR__
#define __THREAD_OR__
#include <deque>
#include <pthread.h>
#include <string>
#include <stdio.h>
#include "Locker.h"
//任務(wù)類
class Task
{
public:
virtual void execute()=0; //純虛函數(shù),在子類中實現(xiàn)
void run()
{
execute();
delete this;
}
};
//線程池類
class Threadpool
{
public:
Threadpool(int threadnum);
~Threadpool();
void AddTask(Task* task);
void stop();
Task* take(); //取任務(wù)
int size(); //任務(wù)隊列中任務(wù)個數(shù)
int createthread(); //創(chuàng)建線程
static void* createthreadcb(void* arg); //線程回調(diào)
private:
volatile bool isRunning; //告訴編譯器不要進(jìn)行任何優(yōu)化,運行狀態(tài)
int threadNums; //線程池內(nèi)線程的數(shù)量
std::deque<Task*> taskQueue; //任務(wù)隊列
pthread_t* tid; //線程標(biāo)識符
Locker mlocker; //互斥鎖
cond mcond; //條件變量
};
#endif
#include <iostream>
#include <string>
#include "thread.h"
#include <assert.h>
#include <stdlib.h>
#include "Locker.h"
Threadpool::Threadpool(int threadnum):
threadNums(threadnum){
isRunning = true;
createthread();
};
Threadpool::~Threadpool()
{
stop();
for(std::deque<Task*>::iterator it = taskQueue.begin();
it!=taskQueue.end();++it)
{
delete *it;
}
taskQueue.clear();
}
int Threadpool::createthread()
{
ScopeLock guard(mlocker);
tid = (pthread_t*)malloc(sizeof(pthread_t)*threadNums);
for(int i=0; i<threadNums; ++i)
{
pthread_create(&tid[i],NULL,createthreadcb,(void*)this);
//把本身為void*類型的this指針重新強(qiáng)制類型轉(zhuǎn)換為Threadpool* 類型
}
return 0;
}
void Threadpool::AddTask(Task* task)
{
ScopeLock guard(mlocker);
taskQueue.push_back(task);
int size = taskQueue.size();
mcond.signal();
return;
}
int Threadpool::size()
{
ScopeLock guard(mlocker);
int size = taskQueue.size();
return size;
}
void Threadpool::stop()
{
if(!isRunning)
{
return;
}
isRunning = false;
mcond.signalAll(); //喚醒線程池內(nèi)所有阻塞的線程,stop
for(int i=0; i<threadNums; ++i)
{
pthread_join(tid[i],NULL);
}
free(tid);
tid = NULL;
}
//取任務(wù)
Task* Threadpool::take()
{
Task* task=NULL;
while(!task)
{
{
ScopeLock guard(mlocker); //鎖
while(taskQueue.empty() && isRunning)
{
mcond.wait(mlocker.Get()); //阻塞
};
if(!isRunning)
{
break;
};
}
ScopeLock guard(mlocker);
std::cout << "take a task" << std::endl;
task = taskQueue.front();
taskQueue.pop_front();
}
return task;
}
void* Threadpool::createthreadcb(void* arg)
{
pthread_t mtid = pthread_self();
Threadpool* pool = (Threadpool*) arg;
while(pool->isRunning)
{
Task* task = pool->take();
if(!task)
{
printf("thread %lu will exit\n",mtid);
break;
}
assert(task);
task->run(); //任務(wù)執(zhí)行,調(diào)用子任務(wù)類內(nèi)的execute函數(shù)內(nèi)容
}
return 0;
}
測試main函數(shù):
#include <iostream>
#include <stdio.h>
#include "Locker.h"
#include "thread.h"
#include <unistd.h>
#include <stdlib.h>
class mytask : public Task
{
public:
virtual void execute()
{
std::cout << "Task A" << std::endl;
std::cout << "thread:" << pthread_self() << std::endl;
sleep(1);
}
};
int main()
{
Threadpool threadpool(10); //創(chuàng)建十個線程
for(int i=0; i<20; i++)
{
threadpool.AddTask(new mytask);
sleep(1);
}
while(1)
{
if(threadpool.size()==0) //當(dāng)任務(wù)隊列中的任務(wù)為0之后,stop
{
threadpool.stop();
std::cout << "exit now all" << std::endl;
exit(0);
};
sleep(2);
};
return 0;
}
g++ main.cpp thread.cpp -lpthread
take a task
Task A
thread:139751073740544
take a task
Task A
thread:139751140882176
take a task
Task A
thread:139751132489472
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751107311360
take a task
Task A
thread:139751098918656
take a task
Task A
thread:139751124096768
take a task
Task A
thread:139751090525952
take a task
Task A
thread:139751090525952
take a task
Task A
thread:139751073740544
take a task
Task A
thread:139751140882176
take a task
Task A
thread:139751132489472
take a task
Task A
thread:139751115704064
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751107311360
take a task
Task A
thread:139751098918656
take a task
Task A
thread:139751124096768
take a task
Task A
thread:139751082133248
take a task
Task A
thread:139751090525952
thread 139751073740544 will exit
thread 139751115704064 will exit
thread 139751132489472 will exit
thread 139751140882176 will exit
thread 139751107311360 will exit
thread 139751098918656 will exit
thread 139751149274880 will exit
thread 139751124096768 will exit
thread 139751082133248 will exit
exit now all
因為固定了線程的數(shù)量,本線程池并不會自動擴(kuò)展,下一篇的線程池可以實現(xiàn)自動擴(kuò)展和減少以維持線程池的穩(wěn)定.