轉(zhuǎn)載自:一個基于信號量的簡易線程池
信號無需由同一個線程來獲取和釋放,因此信號可用于異步事件通知,如用于信號處理程序中。同時,由于信號包含狀態(tài),因此可以異步方式使用,而不用象條件變量那樣要求獲取互斥鎖。但是,信號的效率不如互斥鎖高。缺省情況下,如果有多個線程正在等待信號,則解除阻塞的順序是不確定的。信號在使用前必須先初始化,但是信號沒有屬性。計數(shù)信號量與互斥鎖一起使用時的功能幾乎與條件變量一樣強大。在許多情況下,使用計數(shù)信號量實現(xiàn)的代碼比使用條件變量實現(xiàn)的代碼更為簡單。本文參考:1、Linux多線程編程-信號量的使用2、Linux C++ 一個線程池的簡單實現(xiàn)
CThread.h
#ifndef CTHREAD_H_
#define CTHREAD_H_
#include <pthread.h>
class CThread {
private:
pthread_t m_thread; //保持線程句柄
public:
CThread(void* (*threadFuction)(void*),void* threadArgv);
virtual ~CThread();
void JoinThread();
};
#endif /* CTHREAD_H_ */
CThread.cpp
#include "CThread.h"
CThread::CThread(void* (*threadFuction)(void*),void* threadArgv) {
// 初始化線程屬性
pthread_attr_t threadAttr;
pthread_attr_init(&threadAttr);
pthread_create(&m_thread, &threadAttr, threadFuction, threadArgv);
}
CThread::~CThread() {
// TODO Auto-generated destructor stub
}
void CThread::JoinThread()
{
// join
pthread_join(m_thread, NULL);
}
CThreadManager.h
#ifndef CTHREADMANAGER_H_
#define CTHREADMANAGER_H_
#include <stdio.h>
#include <list>
#include <queue>
#include <semaphore.h>
#include "CThread.h"
using namespace std;
class CThreadManager {
friend void* ManageFuction(void*);
private:
sem_t m_sem; // 信號量
pthread_mutex_t m_mutex; // 互斥鎖
queue<int> m_queWork; // 工作隊列
list<CThread*> m_lstThread; // 線程list
int (*m_threadFuction)(int); //函數(shù)指針,指向main函數(shù)傳過來的線程執(zhí)行函數(shù)
public:
CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt);
virtual ~CThreadManager();
int WaitSem();
int PostSem();
int LockMutex();
int UnlockMutex();
void PushWorkQue(int nWork);
int PopWorkQue();
int RunThreadFunction(int nWork);
};
#endif /* CTHREADMANAGER_H_ */
CThreadManager.cpp
#include "CThreadManager.h"
// 線程執(zhí)行函數(shù),它只是個殼子,處理信號量和互斥鎖等,
// 最后調(diào)用main函數(shù)傳過來的線程執(zhí)行函數(shù)來實現(xiàn)業(yè)務(wù)處理
void* ManageFuction(void* argv)
{
CThreadManager* pManager = (CThreadManager*)argv;
// 進行無限循環(huán)(意味著線程是不銷毀的,重復(fù)利用)
while(true)
{
// 線程開啟后,就在這里阻塞著,直到main函數(shù)設(shè)置了信號量
pManager->WaitSem();
printf("thread wakeup.\n");
// 從工作隊列中取出要處理的數(shù)
pManager->LockMutex();
int nWork = pManager->PopWorkQue();
pManager->UnlockMutex();
printf("call Count function.\n");
pManager->RunThreadFunction(nWork);
}
return 0;
}
CThreadManager::CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt) {
sem_init(&m_sem, 0, 0);
pthread_mutex_init(&m_mutex, NULL);
m_threadFuction = threadFuction;
for(int i=0; i<nMaxThreadCnt; i++)
{
CThread* pThread = new CThread(ManageFuction, this);
printf("thread started.\n");
m_lstThread.push_back(pThread);
}
}
CThreadManager::~CThreadManager()
{
sem_destroy(&m_sem);
pthread_mutex_destroy(&m_mutex);
list<CThread*>::iterator it;
for(it=m_lstThread.begin(); it!=m_lstThread.end();it++)
{
(*it)->JoinThread();
}
}
// 等待信號量
int CThreadManager::WaitSem()
{
return sem_wait(&m_sem);
}
// 設(shè)置信號量
int CThreadManager::PostSem()
{
return sem_post(&m_sem);
}
// 取得鎖
int CThreadManager::LockMutex()
{
int n= pthread_mutex_lock(&m_mutex);
return n;
}
// 釋放鎖
int CThreadManager::UnlockMutex()
{
return pthread_mutex_unlock(&m_mutex);
}
// 往工作隊列里放要處理的數(shù)
void CThreadManager::PushWorkQue(int nWork)
{
m_queWork.push(nWork);
}
// 從工作隊列中取出要處理的數(shù)
int CThreadManager::PopWorkQue()
{
int nWork = m_queWork.front();
m_queWork.pop();
return nWork;
}
// 執(zhí)行main函數(shù)傳過來的線程執(zhí)行函數(shù)
int CThreadManager::RunThreadFunction(int nWork)
{
return (*m_threadFuction)(nWork);
}
ThreadTest.cpp
#include <stdio.h>
#include <unistd.h>
#include "CThreadManager.h"
using namespace std;
// 線程要執(zhí)行的函數(shù)
int Count(int nWork)
{
int nResult = nWork * nWork;
printf("count result is %d\n",nResult);
return 0;
}
int main() {
// 創(chuàng)建線程管理類的實例,把要執(zhí)行的線程函數(shù)和最大線程數(shù)傳進去
CThreadManager* pManager = new CThreadManager(Count, 3);
// 把要進行計算的數(shù)放到工作隊列中
pManager->PushWorkQue(5);
pManager->PushWorkQue(20);
// 設(shè)置信號量,喚醒線程
pManager->PostSem();
pManager->PostSem();
// 等待子線程執(zhí)行
sleep(1);
return 0;
}