C++實(shí)現(xiàn)簡(jiǎn)單線程池

#include <atomic>
#include <iostream>
#include <unistd.h>
#include <queue>
using namespace std;
using callback = void(*)(void *);
class Task {
public:
    Task(callback func, void *arg): func(func), arg(arg){}
    void run() {func(arg);}
private:
    callback func;
    void *arg;
};

class ThreadPool {
public:
    ThreadPool(uint32_t min, uint32_t max);
    ~ThreadPool();
    void submitTask(Task *task);
    bool hasTask();
private:
    static void *manager(void *arg);
    static void *worker(void *arg);
    pthread_mutex_t queueMutex;
    pthread_cond_t notEmpty;
    queue<Task*> *taskQueue;
    pthread_t *threads;
    pthread_t manageThread;
    uint32_t minThreadNum;
    uint32_t maxThreadNum;
    bool isShutdown;
};

bool ThreadPool::hasTask()
{
    if (taskQueue->empty())
        return false;
    else
        return true;
}

void ThreadPool::submitTask(Task *task)
{
    pthread_mutex_lock(&queueMutex);
    taskQueue->push(task);
    pthread_mutex_unlock(&queueMutex);
    pthread_cond_broadcast(&notEmpty);
}

void *ThreadPool::worker(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;
    Task *task;
    while(!pool->isShutdown) {
        pthread_mutex_lock(&pool->queueMutex);
        while(pool->taskQueue->empty()) {
            pthread_cond_wait(&pool->notEmpty, &pool->queueMutex);
        }
        task = pool->taskQueue->front();
        pool->taskQueue->pop();
        pthread_mutex_unlock(&pool->queueMutex);
        task->run();
        delete task;
    }
    pthread_exit(0);
}

void *ThreadPool::manager(void *arg)
{
    pthread_exit(0);
}

ThreadPool::ThreadPool(uint32_t min, uint32_t max)
{
    isShutdown = false;
    minThreadNum = min;
    maxThreadNum = max;
    pthread_mutex_init(&queueMutex, NULL);
    pthread_cond_init(&notEmpty, NULL);
    taskQueue = new queue<Task*>();
    threads = new pthread_t[maxThreadNum];
    pthread_t pthid;
    for (int i = 0; i < minThreadNum; i++) {
        pthread_create(&pthid, NULL, worker, this);
        threads[i] = pthid;
    }
    pthread_create(&manageThread, NULL, manager, NULL);
}

ThreadPool::~ThreadPool()
{
    isShutdown = true;
    pthread_cond_broadcast(&notEmpty);
    delete [] threads;
    delete taskQueue;
}

void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n",
           pthread_self(), num);
    //sleep(1);
}

int main()
{
    // 創(chuàng)建線程池
    ThreadPool *pool = new ThreadPool(2, 4);
    Task *task;
    int *arg;
    for (int i = 0; i < 100; ++i)
    {
        arg = new int(i);
        task = new Task(taskFunc, arg);
        pool->submitTask(task);
    }
    while (pool->hasTask()) {
        sleep(1);
    }
    delete pool;
    return 0;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容