線程池

一、線程池功能組件

總共包含三個組件:線程池、線程執(zhí)行任務,任務詳情。

線程池包含條件等待,鎖,鏈中線程任務某一個,鏈中job隊列中某一個。

線程執(zhí)行任務包含線程池,當前線程,當前線程的的prev和next。

任務詳情包含回調(diào)函數(shù),回調(diào)函數(shù)的參數(shù),當前job的prev和next。

二、代碼編寫流程

  1. 初始化線程池:ThreadPool pool
  2. 為線程池添加線程:參數(shù)是線程池指針,線程數(shù)量
    2.1 如果線程數(shù)量小于1,賦值1
    2.2 為線程池pool分配內(nèi)存空間
    2.3 創(chuàng)建條件信號量,將條件信號量賦值給線程池的條件信號量
    2.4 創(chuàng)建鎖,將鎖賦值給線程池的鎖
    2.5 遍歷線程數(shù)量,生成執(zhí)行任務的結(jié)構(gòu)體,并分配內(nèi)存空間,同時初始化執(zhí)行任務結(jié)構(gòu)體
    2.6 創(chuàng)建線程pthread_create(&worker->thread,NULL,ntyWorkThread,(void*)worker),將其指針給執(zhí)行任務的線程,并且執(zhí)行的方法是線程執(zhí)行任務的方法,worker是線程方法的參數(shù)。通過worker的線程池的等待任務隊列中,獲取對應的job回調(diào)函數(shù)和回調(diào)函數(shù)的參數(shù),并執(zhí)行任務的回調(diào)函數(shù),釋放worker,線程退出
  3. 為線程池添加任務: 參數(shù)是線程池指針,和job結(jié)構(gòu)體
    3.1 加鎖
    3.2 將線程池添加job
  4. 關(guān)閉線程池
    4.1 遍歷線程池,將所有執(zhí)行線程設(shè)置終止條件設(shè)置為1
    4.2 加鎖
    4.3 將線程池的當前work設(shè)置為NULL
    4.4 線程池的等待隊列job設(shè)置為NULL
    4.5 廣播信號量
    4.6 解鎖

main的執(zhí)行順序

  1. 初始化線程池結(jié)構(gòu)體
  2. 初始化線程池
  3. 添加job
  4. 關(guān)閉線程池

三、具體代碼

//
// Created by rosy on 2022/7/24.
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define MAX_NUM_THREADS  100
#define NUM_JOBS  1000

#define LL_ADD(item,list) do{ \
    item->prev = NULL; \
    item->next = list; \
    list = item; \
} while(0)

#define LL_REMOVE(item,list) do { \
    if(item->prev != NULL) item->prev->next = item->next; \
    if(item->next != NULL) item->next->prev = item->prev; \
    if(list == item) list = item->next;                   \
    item->prev = item->next = NULL ;\
} while(0)


typedef struct NWORK{
    pthread_t thread;
    struct THREAD_POOL *thread_pool;
    struct NWORK *prev;
    struct NWORK *next;
    int terminate;
} nWork;

typedef struct THREAD_POOL{
    pthread_cond_t job_cond;
    pthread_mutex_t  job_mtx;
    struct NWORK *works;
    struct NJOB *jobs;

} thread_pool;

typedef struct NJOB{
    void (*job_function)(struct NJOB *job);
    void *user_data;
    struct NJOB *prev;
    struct NJOB *next;
} nJob;

static void *work_do_job(void *ptr){
    nWork  *work = (nWork*)ptr;

    while(1) {
        pthread_mutex_lock(&work->thread_pool->job_mtx);

        while(work->thread_pool->jobs == NULL){
            if(work->terminate){
                break;
            }

            pthread_cond_wait(&work->thread_pool->job_cond,&work->thread_pool->job_mtx);
        }

        if(work->terminate){
            pthread_mutex_unlock(&work->thread_pool->job_mtx);
            break;

        }

        nJob *job = work->thread_pool->jobs;
        if(job != NULL){ //移除當前job
            LL_REMOVE(job,work->thread_pool->jobs);
        }

        pthread_mutex_unlock(&work->thread_pool->job_mtx);

        if(job == NULL) continue;

        job->job_function(job);
    }

    free(work);
    pthread_exit(NULL);
}

int createThreadPool(thread_pool* pool,int num_works){

    if(num_works < 1) {
        num_works = 1;
    }
    memset(pool,0,sizeof(thread_pool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->job_cond,&blank_cond,sizeof(pool->job_cond));

    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->job_mtx,&blank_mutex,sizeof(pool->job_mtx));

    for(int i = 0; i < num_works;i++){
        nWork *work = (nWork*)malloc(sizeof(nWork));
        if(work == NULL){
            perror("malloc work");
            return 1;
        }
        memset(work,0,sizeof(nWork));

        work->thread_pool = pool;

        int ret = pthread_create(&work->thread,NULL,work_do_job,(void *)work);
        if(ret){
            perror("pthread_create");
            free(work);
            return 1;
        }
        LL_ADD(work,work->thread_pool->works);
    }
    return 0;
}

void add_job(thread_pool *pool,nJob* job){

    pthread_mutex_lock(&pool->job_mtx);

    LL_ADD(job,pool->jobs);

    pthread_cond_signal(&pool->job_cond);
    pthread_mutex_unlock(&pool->job_mtx);


}

void shut_down(thread_pool *pool){
    nWork *work = NULL;
    for(work = pool->works; work != NULL; work = work->next){
        work->terminate = 1;
    }

    pthread_mutex_lock(&pool->job_mtx);

    pool->works = NULL;
    pool->jobs = NULL;

    pthread_cond_broadcast(&pool->job_cond);
    pthread_mutex_unlock(&pool->job_mtx);
}

void counter(nJob *job){
    int index = *(int*)job->user_data;

    printf("index:%d,selfid:%lu\n",index,pthread_self());

    free(job->user_data);
    free(job);
}

int main(int argc,char *argv[]) {
    printf("hello yes\n");
    thread_pool pool;
    createThreadPool(&pool,MAX_NUM_THREADS);

    printf("create thread pool\n");
    for(int i = 0; i < NUM_JOBS; i++){
        nJob *job = (nJob*)malloc(sizeof(nJob));
        if(job == NULL){
            perror("malloc");
            exit(1);
        }

        job->job_function = counter;
        job->user_data = malloc(sizeof(int));
        *(int*)job->user_data = i;

        add_job(&pool,job);
    }
    printf("關(guān)閉...\n");
    shut_down(&pool);

    getchar();
    printf("finish\n");

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容