Linux系統編程12:線程池編程

1. 概念

安檢
銀行柜臺
  • 為什么使用線程池?
    頻繁創建和銷毀線程浪費CPU資源
  • 線程池是什么?
    一堆線程放在一個池子里統一管理

2. 構成

線程池

2.1 任務隊列job_queue

  • 作用
    存放待處理的任務
  • 成員
No. 構成 接口
1 處理函數 void *(*)(void*)
2 參數 void *arg
3 隊列指針 struct job_queue* pnext

2.2 工作線程worker

  • 作用
    處理任務

2.3 線程池thread_pool

  • 作用
    管理多個線程并提供任務隊列的接口

  • 成員

No. 構成 接口
1 初始化 threadpool_init()
2 銷毀 threadpool_destroy()
3 添加任務 threadpool_add_job()

3. 流程

使用流程

  1. 初始化線程池
  2. 向線程池添加任務
  3. 銷毀線程池

線程池初始化

  1. 初始化任務隊列和工作線程組
  2. 將等候在條件變量(任務隊列上有任務)上的一個線程喚醒并從該任務隊列中取出第一個任務給該線程執行
  3. 等待任務隊列中所有任務執行完畢

4. 實例

  • Linux C語言實現
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
 
// One node of the singly linked list of pending jobs.
struct job_queue{
    void* (*func)(void* arg);   // job handler; a worker calls it as func(arg)
    void* arg;                  // opaque argument handed to func
    struct job_queue* pnext;    // next pending job, NULL for the last node
};
 
// Shared state of a thread pool: the pending-job list, the worker thread
// handles and the synchronization objects the workers wait on.
struct threadpool{
    struct job_queue* phead;    // first pending job, NULL when the list is empty
    struct job_queue* ptail;    // last pending job; only meaningful while phead is non-NULL
    pthread_t* pworks;          // array of worker thread handles
    size_t nthread;             // number of entries in pworks
    pthread_mutex_t mutex;      // locked by workers and by add_job around job-list access
    pthread_cond_t cond;        // signalled when a job is added, broadcast on destroy
    int destroy;                // 0 while running, 1 once workers are asked to exit
};
 
// Worker thread entry point.
//
// arg is the owning struct threadpool. The worker repeatedly takes the job at
// the head of the list and runs it outside the lock; it terminates through
// pthread_exit() once the pool's destroy flag is 1.
void* threadpool_routine(void* arg){
    assert(NULL != arg);
    struct threadpool* pool = (struct threadpool*)arg;

    while(1){
        pthread_mutex_lock(&pool->mutex);

        // Sleep while there is nothing to do and no shutdown request; the
        // loop re-checks the condition after every wakeup.
        while(NULL == pool->phead && 0 == pool->destroy){
            pthread_cond_wait(&pool->cond,&pool->mutex);
        }

        // Shutdown has priority over any job that might still be queued.
        if(1 == pool->destroy){
            printf("%lu exit\n",pthread_self());
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }

        // Detach the head job while still holding the lock.
        struct job_queue* job = pool->phead;
        pool->phead = job->pnext;
        pthread_mutex_unlock(&pool->mutex);

        // Run the job without the lock so other workers can proceed, then
        // release the node that add_job allocated.
        job->func(job->arg);
        free(job);
    }
}
 
// Create a thread pool with up to nthread worker threads.
//
// The job list and synchronization objects are initialized before any worker
// is started, because workers touch them immediately.
//
// Returns the new pool, or NULL if memory could not be allocated. If a worker
// cannot be created, creation stops and pool->nthread records how many
// workers actually exist, so threadpool_destroy() joins only valid threads.
struct threadpool* threadpool_init(size_t nthread){
    struct threadpool* pthpool = (struct threadpool*)malloc(sizeof(struct threadpool));
    if(NULL == pthpool){
        return NULL;
    }

    // calloc checks nthread * sizeof(pthread_t) for overflow.
    pthpool->pworks = (pthread_t*)calloc(nthread,sizeof(pthread_t));
    if(NULL == pthpool->pworks && 0 != nthread){
        free(pthpool);
        return NULL;
    }

    pthread_mutex_init(&(pthpool->mutex),NULL);
    pthread_cond_init(&(pthpool->cond),NULL);

    pthpool->phead = NULL;
    pthpool->ptail = NULL;
    pthpool->nthread = 0;
    pthpool->destroy = 0;

    // Start the workers; stop at the first failure.
    size_t i;
    for(i=0;i<nthread;i++){
        if(0 != pthread_create(&(pthpool->pworks[i]),NULL,threadpool_routine,pthpool)){
            break;
        }
    }
    pthpool->nthread = i;

    return pthpool;
}
// Shut the pool down and release everything it owns.
//
// Jobs that are still queued are discarded without being run. All workers are
// asked to exit and are joined before the pool's memory is released, so the
// caller must not use pthpool after this function returns.
void threadpool_destroy(struct threadpool* pthpool){
    assert(NULL != pthpool);

    // Workers read the job list and the destroy flag under the mutex, so both
    // must be changed under it too. Setting the flag while holding the lock
    // also guarantees that no worker can miss the broadcast below.
    pthread_mutex_lock(&(pthpool->mutex));
    while(NULL != pthpool->phead){
        struct job_queue* pnext = pthpool->phead->pnext;
        free(pthpool->phead);
        pthpool->phead = pnext;
    }
    pthpool->ptail = NULL;
    pthpool->destroy = 1;
    pthread_mutex_unlock(&(pthpool->mutex));

    pthread_cond_broadcast(&(pthpool->cond));
    size_t i;
    for(i=0;i<pthpool->nthread;i++){
        pthread_join(pthpool->pworks[i],NULL);
    }

    // Destroy the synchronization objects while the pool memory is still
    // valid, and only then free the memory.
    pthread_mutex_destroy(&(pthpool->mutex));
    pthread_cond_destroy(&(pthpool->cond));

    free(pthpool->pworks);
    free(pthpool);
}
// Append a job to the tail of the pool's job list and wake one worker.
//
// pthpool: pool to submit to (must not be NULL).
// func:    handler a worker will call as func(arg) (must not be NULL).
// arg:     opaque argument for func; this demo stores a small integer in it.
//
// If the job node cannot be allocated the job is dropped and an error is
// written to stderr.
void threadpool_add_job(struct threadpool* pthpool,void*(*func)(void*),void* arg){
    assert(NULL != pthpool);
    assert(NULL != func);

    struct job_queue* jq = (struct job_queue*)malloc(sizeof(struct job_queue));
    if(NULL == jq){
        fprintf(stderr,"add job failed: out of memory\n");
        return;
    }
    jq->func = func;
    jq->arg = arg;
    jq->pnext = NULL;

    // Go through intptr_t: converting a pointer straight to int truncates on
    // LP64 targets and is rejected by C++ compilers.
    printf("add job %d\n",(int)(intptr_t)arg);

    pthread_mutex_lock(&(pthpool->mutex));
    if(pthpool->phead == NULL){
        pthpool->phead = jq;
        pthpool->ptail = jq;
    }else{
        pthpool->ptail->pnext = jq;
        pthpool->ptail = jq;
    }
    pthread_mutex_unlock(&(pthpool->mutex));
    pthread_cond_signal(&(pthpool->cond));
}
     
// Demo job: prints the executing thread and the job number carried in arg,
// then sleeps for one second to simulate work. Always returns NULL.
void* test(void* arg){
    // The job number was packed into the pointer value by main().
    printf("%lu do job %d\n",pthread_self(),(int)(intptr_t)arg);
    sleep(1);
    return NULL;
}
// Demo driver: <#nthread> workers execute <#njob> jobs.
// Returns 1 on bad arguments or if the pool cannot be created.
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s <#nthread> <#njob>\n", argv[0]);
        return 1;
    }

    int nthread = atoi(argv[1]);
    int njob = atoi(argv[2]);
    // A negative count would wrap to a huge size_t in threadpool_init().
    if(nthread <= 0 || njob < 0){
        printf("usage:%s <#nthread> <#njob>\n", argv[0]);
        return 1;
    }

    // 1. Create the thread pool.
    struct threadpool* pool = threadpool_init((size_t)nthread);
    if(NULL == pool){
        fprintf(stderr,"failed to create thread pool\n");
        return 1;
    }

    // 2. Submit the jobs; the job number travels inside the pointer value.
    int i;
    for(i=0;i<njob;i++){
        threadpool_add_job(pool,test,(void*)(intptr_t)i);
    }

    // Keep the process alive while the workers run. pause() returns only
    // after a signal handler has run; this program installs none, so the
    // cleanup below is reached only if a handler is added.
    pause();

    // 3. Destroy the thread pool.
    threadpool_destroy(pool);
    return 0;
}

注意:任務隊列初始化在工作線程組前。

  • Linux C++98實現
#include <pthread.h>
#include <vector>
#include <queue>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>

using namespace std;

/// Fixed-size thread pool built on pthreads (C++98).
///
/// Jobs are plain functions taking an int. They are queued by AddJob() and
/// executed by the worker threads in FIFO order. Destroying the pool asks the
/// workers to exit; jobs that are still queued at that point are not run.
/// The pool is not copyable.
class ThreadPool{
    typedef void (*func_t)(int);
public:
    /// Starts up to @p count worker threads. Threads that fail to start are
    /// skipped, so the pool may end up with fewer workers than requested.
    ThreadPool(size_t count):destroy(false){

        // The mutex and condition variable must exist before any worker runs.
        pthread_mutex_init(&mutex,NULL);
        pthread_cond_init(&cond,NULL);

        // Start the workers; only successfully created threads are recorded
        // so the destructor never joins an invalid handle.
        for(size_t i=0;i<count;i++){
            pthread_t tid;
            if(0 == pthread_create(&tid,NULL,&ThreadPool::Route,this)){
                threads.push_back(tid);
            }
        }
    }
    /// Asks every worker to exit, joins them and releases the pthread objects.
    ~ThreadPool(){
        // Set the flag under the mutex: workers test it under the same mutex,
        // so none of them can miss the broadcast and sleep forever.
        pthread_mutex_lock(&mutex);
        destroy = true;
        pthread_mutex_unlock(&mutex);
        pthread_cond_broadcast(&cond);
        for(std::vector<pthread_t>::iterator it = threads.begin();it != threads.end();++it){
            pthread_join(*it,NULL);
        }

        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }
    /// Queues @p func to be called as func(arg) by one of the workers.
    void AddJob(func_t func,int arg){
        pthread_mutex_lock(&mutex);
        tasks.push(func);
        args.push(arg);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
private:
    // Copying would duplicate the mutex, condition variable and thread
    // handles; declared but intentionally not defined (C++98 idiom).
    ThreadPool(const ThreadPool&);
    ThreadPool& operator=(const ThreadPool&);

    /// Worker entry point. It has exactly the signature pthread_create()
    /// expects, so no function-pointer cast is needed; @p arg is the pool.
    static void* Route(void* arg){
        ThreadPool* pPool = static_cast<ThreadPool*>(arg);
        for(;;){
            pthread_mutex_lock(&(pPool->mutex));
            // Wait while there is no job and no exit request.
            while(pPool->tasks.empty() && !pPool->destroy){
                pthread_cond_wait(&(pPool->cond),&(pPool->mutex));
            }

            // Exit request: leave even if jobs are still queued.
            if(pPool->destroy){
                pthread_mutex_unlock(&(pPool->mutex));
                break;
            }

            // Take the oldest job and its argument.
            func_t task = pPool->tasks.front();
            pPool->tasks.pop();
            int jobArg = pPool->args.front();
            pPool->args.pop();
            pthread_mutex_unlock(&(pPool->mutex));

            // Run the job without holding the lock.
            task(jobArg);
        }
        return NULL;
    }
private:
    std::vector<pthread_t> threads;  ///< worker thread handles
    std::queue<func_t> tasks;        ///< pending job functions
    std::queue<int> args;            ///< argument of each pending job, same order as tasks
    pthread_mutex_t mutex;           ///< guards tasks, args and destroy
    pthread_cond_t cond;             ///< signalled on new job, broadcast on shutdown
    bool destroy;                    ///< true once the pool is shutting down
};

// Demo job: reports which thread handles job number `arg`, then sleeps for
// one second to simulate work.
void test(int arg){
    const int jobId = arg;
    std::printf("%lu do job %d\n",pthread_self(),jobId);
    ::sleep(1);
}

// Demo driver: <#nthread> workers execute <#njob> jobs.
// Returns 1 on bad arguments.
int main(int argc,char** argv){
    if(3 != argc){
        printf("usage:%s <#nthread> <#njob>\n", argv[0]);
        return 1;
    }

    // 1. Create the thread pool.
    int nthread = atoi(argv[1]);
    int njob = atoi(argv[2]);
    // A negative count would wrap to a huge size_t in the ThreadPool constructor.
    if(nthread <= 0 || njob < 0){
        printf("usage:%s <#nthread> <#njob>\n", argv[0]);
        return 1;
    }

    ThreadPool pool(nthread);

    // 2. Submit the jobs.
    for(int i=0;i<njob;i++){
        pool.AddJob(test,i);
    }

    // Keep the process alive while the workers run. pause() returns only
    // after a signal handler has run; this program installs none.
    pause();
    return 0;
}
  • C++11實現
#include <thread>
#include <vector>
#include <queue>
#include <cstdio>
#include <cstdlib>
#include <condition_variable>
#include <mutex>
#include <functional>
#include <chrono>
#include <iostream>

using namespace std;

/// Fixed-size thread pool built on std::thread (C++11).
///
/// Jobs are callables taking an int. They are queued by AddJob() and executed
/// by the worker threads in FIFO order. Destroying the pool asks the workers
/// to exit; jobs that are still queued at that point are not run.
class ThreadPool{
    typedef std::function<void(int)> func_t;
public:
    /// Starts @p count worker threads.
    ThreadPool(size_t count):destroy(false){
        for(size_t i=0;i<count;i++){
            threads.emplace_back(&ThreadPool::Route,this);
        }
    }
    /// Asks every worker to exit and joins them.
    ~ThreadPool(){
        {
            // Set the flag under the mutex: workers test it under the same
            // mutex, so the write is race-free and no worker can miss the
            // notification and sleep forever.
            std::lock_guard<std::mutex> guard(mtx);
            destroy = true;
        }
        cond.notify_all();
        for(auto& th:threads){
            th.join();
        }
    }
    // The pool owns a mutex, a condition variable and threads; it cannot be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queues @p func to be called as func(arg) by one of the workers.
    void AddJob(func_t func,int arg){
        {
            // lock_guard releases the mutex even if a push throws.
            std::lock_guard<std::mutex> guard(mtx);
            tasks.push(std::move(func));
            args.push(arg);
        }
        cond.notify_one();
    }
private:
    /// Worker loop; @p pPool is the pool that started the thread.
    static void Route(ThreadPool* pPool){
        for(;;){
            std::unique_lock<std::mutex> lck(pPool->mtx);
            // Wait until there is a job or an exit request.
            pPool->cond.wait(lck,[pPool]{
                return !pPool->tasks.empty() || pPool->destroy;
            });

            // Exit request: leave even if jobs are still queued.
            if(pPool->destroy){
                break;
            }

            // Take the oldest job and its argument.
            func_t task = std::move(pPool->tasks.front());
            pPool->tasks.pop();
            const int jobArg = pPool->args.front();
            pPool->args.pop();
            lck.unlock();

            // Run the job without holding the lock.
            task(jobArg);
        }
    }
private:
    std::vector<std::thread> threads;  ///< worker threads
    std::queue<func_t> tasks;          ///< pending job callables
    std::queue<int> args;              ///< argument of each pending job, same order as tasks
    std::mutex mtx;                    ///< guards tasks, args and destroy
    std::condition_variable cond;      ///< notified on new job and on shutdown
    bool destroy;                      ///< true once the pool is shutting down
};

// Demo job: reports which thread handles job number `arg`, then sleeps for
// one second to simulate work.
void test(int arg){
    const std::chrono::seconds workTime(1);
    std::cout << std::this_thread::get_id() << " do job " << arg << std::endl;
    std::this_thread::sleep_for(workTime);
}

// Demo driver: <#nthread> workers execute <#njob> jobs.
// Returns 1 on bad arguments.
int main(int argc,char** argv){
    if(3 != argc){
        printf("usage:%s <#nthread> <#njob>\n", argv[0]);
        return 1;
    }

    // 1. Create the thread pool.
    int nthread = atoi(argv[1]);
    int njob = atoi(argv[2]);
    // A negative count would wrap to a huge size_t in the ThreadPool constructor.
    if(nthread <= 0 || njob < 0){
        printf("usage:%s <#nthread> <#njob>\n", argv[0]);
        return 1;
    }

    ThreadPool pool(nthread);

    // 2. Submit the jobs.
    for(int i=0;i<njob;i++){
        pool.AddJob(test,i);
    }

    // Give the workers a fixed ten seconds before the pool is destroyed at
    // the end of main; jobs still queued at that point are not run.
    this_thread::sleep_for(chrono::seconds(10));

    return 0;
}

C++11 lambda表達式實現

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <functional>
using namespace std;
 
/// Fixed-size thread pool whose worker loop is a lambda.
///
/// Tasks are std::function<void()> objects queued by AddTask() and executed
/// in FIFO order. Destroying the pool asks the workers to exit; tasks that are
/// still queued at that point are not run.
class ThreadPool{
    std::vector<std::thread> threads;         // worker threads
    std::queue<std::function<void()>> tasks;  // pending tasks
    std::condition_variable cond;             // notified on new task and on shutdown
    std::mutex m;                             // guards tasks and exited
    bool exited = false;                      // true once the pool is shutting down
public:
    /// Starts @p num worker threads (none if num <= 0).
    ThreadPool(int num){
        auto routine = [this](){
            for(;;){
                std::function<void()> func;
                {
                    std::unique_lock<std::mutex> lock(m);
                    // Wait until there is a task or an exit request.
                    cond.wait(lock,[this]{ return !tasks.empty() || exited; });
                    // Exit request: leave even if tasks are still queued.
                    if(exited) return;
                    func = std::move(tasks.front());
                    tasks.pop();
                }
                // Run the task without holding the lock.
                func();
            }
        };
        for(int i=0;i<num;++i){
            // Construct each thread in place from a copy of the routine.
            threads.emplace_back(routine);
        }
    }
    /// Asks every worker to exit and joins them.
    ~ThreadPool(){
        {
            // Set the flag under the mutex: workers test it under the same
            // mutex, so the write is race-free and no worker can miss the
            // notification and sleep forever.
            std::lock_guard<std::mutex> guard(m);
            exited = true;
        }
        cond.notify_all();
        for(auto& worker:threads){
            worker.join();
        }
    }
    // The members are not copyable, so defaulted copy operations would be
    // implicitly deleted anyway; say so explicitly.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queues @p func for execution by one of the workers.
    void AddTask(std::function<void()> func){
        {
            std::lock_guard<std::mutex> guard(m);
            tasks.push(std::move(func));
        }
        cond.notify_one();
    }

};
 
int main(){
    mutex m;
    auto test = [&m](){
    this_thread::sleep_for(2s);
    lock_guard<mutex> guard(m);
        cout << this_thread::get_id() << " do something..." << endl;
    };
 
    ThreadPool pool(5);
 
    for(int i=0;i<15;++i){
        pool.AddTask(test);
    }
    this_thread::sleep_for(10s);
}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容