pthread線程池(一)2019-08-15

\color{blue}{線程池的實現(xiàn)原理}
線程池的實現(xiàn)原理基本上就是生產(chǎn)者與消費者模型的擴(kuò)展,即剛開始開出一定數(shù)量的線程,以基本生產(chǎn)者消費者模型為基本原理,不斷使用這些線程從"任務(wù)隊列"中取出任務(wù)進(jìn)行處理,本線程池中的線程通過競爭關(guān)系取任務(wù)進(jìn)行處理.

實現(xiàn)過程

  • 定義封裝了Task類,具體的任務(wù)實現(xiàn)可以通過繼承該Task類進(jìn)行實現(xiàn).
  • 定義Threadpool線程池類,類內(nèi)定義了AddTask(添加任務(wù)),stop(停止銷毀線程池),Task* take(取任務(wù)),createthread(創(chuàng)造線程)和線程回調(diào)函數(shù)等成員,其中在運用pthread_create創(chuàng)建線程時需要第三個參數(shù)必須指向一個靜態(tài)函數(shù).
  • 對線程的同步機(jī)制進(jìn)行了封裝,包括信號量sem,互斥鎖Locker以及條件變量cond類,方便以后的調(diào)用.
    \color{blue}{Locker.h}
#ifndef __LOCKER__
#define __LOCKER__
#include <pthread.h>
#include <exception>
//互斥鎖類
class Locker
{
    public:
        Locker()
        {
            if(pthread_mutex_init(&mutex, NULL)!=0)
            {
                throw std::exception();
            }
        }
        ~Locker()
        {
            pthread_mutex_destroy(&mutex);
        }
        bool lock()
        {
            return pthread_mutex_lock(&mutex)==0;
        }
        bool unlock()
        {
            return pthread_mutex_unlock(&mutex)==0;
        }
        pthread_mutex_t* Get()
        {
            return &mutex;
        }
    private:
        pthread_mutex_t mutex;

};
//封裝條件變量類
class cond
{
    public:
        cond()
        {
            if(pthread_cond_init(&mcond,NULL)!=0)
            {
                throw std::exception();
            }
        }
        ~cond()
        {
            pthread_cond_destroy(&mcond);
        }
        bool wait(pthread_mutex_t* mutex)
        {
            return pthread_cond_wait( &mcond, mutex)==0;
        }
        bool signal()
        {
            return pthread_cond_signal(&mcond)==0;
        }
        bool signalAll()
        {
            return pthread_cond_broadcast(&mcond)==0;
        }
    private:
        pthread_cond_t mcond;
};
//通過使用ScopeLock類在作用域內(nèi)會自動調(diào)用構(gòu)造函數(shù)和析構(gòu)函數(shù)的優(yōu)點,方便了互斥鎖的加鎖和解鎖操作
class ScopeLock{
public:
    ScopeLock( Locker& lock ):mlock(lock){mlock.lock();};
    ~ScopeLock( ){mlock.unlock();}
private:
    Locker& mlock;
};
#endif

\color{blue}{Thread.h}

#ifndef __THREAD_OR__
#define __THREAD_OR__
#include <deque>
#include <pthread.h>
#include <string>
#include <stdio.h>
#include "Locker.h"
//任務(wù)類
class Task
{
    public:
        virtual void execute()=0;     //純虛函數(shù),在子類中實現(xiàn)
        void run()
        {
            execute();
            delete this;
        }
};
//線程池類
class Threadpool
{
    public:
        Threadpool(int threadnum);   
        ~Threadpool();
        void AddTask(Task* task);     
        void stop();
        Task* take();      //取任務(wù)
        int size();          //任務(wù)隊列中任務(wù)個數(shù)   
        int createthread();     //創(chuàng)建線程
        static void* createthreadcb(void* arg);    //線程回調(diào)
     private:
        volatile bool             isRunning; //告訴編譯器不要進(jìn)行任何優(yōu)化,運行狀態(tài)
        int                              threadNums;   //線程池內(nèi)線程的數(shù)量 
        std::deque<Task*>    taskQueue;   //任務(wù)隊列
        pthread_t*                 tid;               //線程標(biāo)識符
        Locker                    mlocker;       //互斥鎖
        cond                       mcond;     //條件變量
};
#endif

\color{blue}{Thread.cpp}

#include <iostream>
#include <string>
#include "thread.h"
#include <assert.h>
#include <stdlib.h>
#include "Locker.h"

Threadpool::Threadpool(int threadnum):
    threadNums(threadnum){
        isRunning = true;
        createthread();
};

Threadpool::~Threadpool()
{
    stop();
    for(std::deque<Task*>::iterator it = taskQueue.begin();
            it!=taskQueue.end();++it)
    {
        delete *it;
    }
    taskQueue.clear();
}

int Threadpool::createthread()
{
    ScopeLock guard(mlocker);
    tid = (pthread_t*)malloc(sizeof(pthread_t)*threadNums);
    for(int i=0; i<threadNums; ++i)
    {
        pthread_create(&tid[i],NULL,createthreadcb,(void*)this);
       //把本身為void*類型的this指針重新強(qiáng)制類型轉(zhuǎn)換為Threadpool* 類型
   }
    return 0;
}

void Threadpool::AddTask(Task* task)
{
    ScopeLock guard(mlocker);
    taskQueue.push_back(task);
    int size = taskQueue.size();
    mcond.signal();
    return;
}

int Threadpool::size()
{
    ScopeLock guard(mlocker);
    int size = taskQueue.size();
    return size;
}

void Threadpool::stop()
{
    if(!isRunning)
    {
        return;
    }
    isRunning = false;
    mcond.signalAll();     //喚醒線程池內(nèi)所有阻塞的線程,stop
    for(int i=0; i<threadNums; ++i)
    {
        pthread_join(tid[i],NULL);
    }
    free(tid);
    tid = NULL;
}
//取任務(wù)
Task* Threadpool::take()
{
    Task* task=NULL;
    while(!task)
    {
        {
            ScopeLock guard(mlocker);    //鎖
            while(taskQueue.empty() && isRunning)
            {
                 mcond.wait(mlocker.Get());    //阻塞
             };
             if(!isRunning)
             {
                 break;
             };
        }
        ScopeLock guard(mlocker);
    std::cout << "take a task" << std::endl;
        task = taskQueue.front();
        taskQueue.pop_front();
    }
    return task;
}

void* Threadpool::createthreadcb(void* arg)
{
    pthread_t mtid = pthread_self();
    Threadpool* pool = (Threadpool*) arg;
    while(pool->isRunning)
    {
        Task* task = pool->take();
        if(!task)
        {
            printf("thread %lu will exit\n",mtid);
            break;
        }
        assert(task);
        task->run();  //任務(wù)執(zhí)行,調(diào)用子任務(wù)類內(nèi)的execute函數(shù)內(nèi)容
}
    return 0;
}

\color{blue}{main.cpp}
測試main函數(shù):

#include <iostream>
#include <stdio.h>
#include "Locker.h"
#include "thread.h"
#include <unistd.h>
#include <stdlib.h>

class mytask : public Task
{
    public:
        virtual void execute()
        {
            std::cout << "Task A" << std::endl;
            std::cout << "thread:" << pthread_self() << std::endl;
            sleep(1);
        }
};
int main()
{
    Threadpool threadpool(10);   //創(chuàng)建十個線程
    for(int i=0; i<20; i++)
    {
        threadpool.AddTask(new mytask);
    sleep(1);
    }
    while(1)
    {
         if(threadpool.size()==0)     //當(dāng)任務(wù)隊列中的任務(wù)為0之后,stop
         {
             threadpool.stop();
             std::cout << "exit now all" << std::endl;
             exit(0);
         };
    sleep(2);
    };
    return 0;
}

\color{blue}{編譯}
g++ main.cpp thread.cpp -lpthread
\color{blue}{運行結(jié)果}

take a task
Task A
thread:139751073740544
take a task
Task A
thread:139751140882176
take a task
Task A
thread:139751132489472
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751107311360
take a task
Task A
thread:139751098918656
take a task
Task A
thread:139751124096768
take a task
Task A
thread:139751090525952
take a task
Task A
thread:139751090525952
take a task
Task A
thread:139751073740544
take a task
Task A
thread:139751140882176
take a task
Task A
thread:139751132489472
take a task
Task A
thread:139751115704064
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751107311360
take a task
Task A
thread:139751098918656
take a task
Task A
thread:139751124096768
take a task
Task A
thread:139751082133248
take a task
Task A
thread:139751090525952
thread 139751073740544 will exit
thread 139751115704064 will exit
thread 139751132489472 will exit
thread 139751140882176 will exit
thread 139751107311360 will exit
thread 139751098918656 will exit
thread 139751149274880 will exit
thread 139751124096768 will exit
thread 139751082133248 will exit
exit now all

因為固定了線程的數(shù)量,本線程池并不會自動擴(kuò)展,下一篇的線程池可以實現(xiàn)自動擴(kuò)展和減少以維持線程池的穩(wěn)定.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,663評論 1 32
  • 1.設(shè)計模式是什么? 你知道哪些設(shè)計模式,并簡要敘述?設(shè)計模式是一種編碼經(jīng)驗,就是用比較成熟的邏輯去處理某一種類型...
    龍飝閱讀 2,302評論 0 12
  • 【JAVA 線程】 線程 進(jìn)程:是一個正在執(zhí)行中的程序。每一個進(jìn)程執(zhí)行都有一個執(zhí)行順序。該順序是一個執(zhí)行路徑,或者...
    Rtia閱讀 2,893評論 2 20
  • 序言 近日后臺需要一些數(shù)據(jù),需要從網(wǎng)上爬取,但是爬取的過程中,由于訪問速度太頻繁,造成IP被封,最終通過線程池解決...
    非專業(yè)程序員閱讀 962評論 0 3
  • 今天下班回來后,開始找鑰匙開門,雖然在包里翻了好久但是發(fā)現(xiàn)還是找到了,開始嘲笑自己,以前的時候總是丟鑰匙,現(xiàn)在竟然...
    不知道已經(jīng)被占用閱讀 1,217評論 0 0

友情鏈接更多精彩內(nèi)容