nfs-ganesha - thread model - work pool

work pool

内部维护一个队列,生产者调用work_pool_submit将entry插入队列,有多个线程作为消费者,去处理队列中的entry。线程的数量会根据entry的个数自动调节。

pqh.qcount

pool->pqh.qcount小于0时,其绝对值代表有几个entry在队列中等待处理。大于0时,代表有几个没事干的线程在睡觉。

弹性地调节线程数量

当所有线程都在干活且队列里还有entry没有处理,或者只有很少的线程在睡觉时(留出一些裕量),创建更多的处理线程。当没事做、在睡觉的线程太多时,退出一些线程。

work_pool_submit的处理方法

当所有线程都在干活的时候,work_pool_submit将entry插入队列。当有一些没事干而睡觉的线程时,唤醒其中一个,让它去处理这个entry。

svc_work_pool全局变量

struct work_pool svc_work_pool

(gdb) p svc_work_pool
$23 = {
  pqh = {
    qh = { //work pool entry处理队列
      tqh_first = 0x7fffd80008c0, //队列的第一个元素(每个元素都有prev和next指针)
      tqh_last = 0x7fffe00008c0 //指向队列最后一个元素的next指针(即其tqe_next字段的地址)
    },
    qmutex = {... }, //mutex
    qsize = 0,
    qcount = 10  //此值大于0,说明有10个线程无事可做在睡觉。如果小于零,说明所有线程都在干活,其绝对值是队列中等待处理的entry个数
  },
  wptqh = { //working threads队列
    tqh_first = 0x7e00d0,
    tqh_last = 0x7fffe8000cb8
  },
  name = 0x7e00b0 "svc_",
  attr = { ...  },
  params = {
    thrd_max = 200,
    thrd_min = 7
  },
  timeout_ms = 31000,
  n_threads = 15,
  worker_index = 15
}

函数

  • work_pool_init: 初始化work pool
  • work_pool_thread: 线程处理函数,个数等于svc_work_pool.n_threads
  • work_pool_spawn: 为work pool创建新线程
  • work_pool_submit: 将work_pool_entry插入到pool中处理
  • work_pool_shutdown

work pool的应用场景

  1. 每种连接(TCP,UDP,RDMA)都对应一个channel,每个channel在创建时候,都会构造work pool entry,其处理函数是svc_rqst_run_task,并将此entry插入到work pool。
  2. channel内部对epoll的处理函数svc_rqst_epoll_events中,如只有一个event,直接调用svc_rqst_xprt_task。如果有多于1个event,将多余的event构造相应的entry,并扔到work pool里处理。
  3. svc_rqst_epoll_events的退出,也将导致svc_rqst_run_task的退出。所以在svc_rqst_epoll_events退出前,重新将svc_rqst_run_task对应的entry插入work pool中。
  4. 在一定时间内,epoll没有接到数据,将svc_rqst_expire_task对应的entry插入work pool中。

数据结构

/*
 * A pool of worker threads plus the single queue they share.
 * All fields touched by work_pool_thread()/work_pool_submit() are accessed
 * while holding pqh.qmutex.
 */
struct work_pool {
    struct poolq_head pqh; /* shared queue: holds pending work entries, or parked idle workers */
    TAILQ_HEAD(work_pool_s, work_pool_thread) wptqh; /* every worker registers here on start and unregisters on exit */
    char *name; /* NOTE(review): presumably the prefix of worker names (e.g. "svc_") - confirm */
    pthread_attr_t attr; /* NOTE(review): presumably the attributes used when spawning workers - confirm */
    struct work_pool_params params; /* thrd_min / thrd_max limits consulted by the workers */
    long timeout_ms; /* how long an idle worker waits for work before re-evaluating whether to exit */
    uint32_t n_threads; /* worker count: bumped before a spawn, dropped when a worker exits */
    uint32_t worker_index; /* atomically incremented to hand each new worker its index */
};

/*
 * Per-worker descriptor wrapping one worker thread.
 * pqe must remain the first member: work_pool_submit() casts the
 * struct poolq_entry pointer taken from the pool queue straight to
 * struct work_pool_thread.
 */
struct work_pool_thread {
    struct poolq_entry pqe;     /*** 1st ***/
    TAILQ_ENTRY(work_pool_thread) wptq; /* linkage in pool->wptqh */
    pthread_cond_t pqcond; /* private condition variable; signalled by work_pool_submit() */

    struct work_pool *pool; /* pool this worker belongs to */
    struct work_pool_entry *work; /* entry to run next; NULL while the worker has nothing assigned */
    char worker_name[16]; /* NOTE(review): not used in the visible code; presumably the thread/log name */
    pthread_t pt; /* NOTE(review): presumably the thread handle - not used in the visible code */
    uint32_t worker_index; /* index taken from pool->worker_index when the thread starts */
};

/*
 * One unit of work submitted to a pool.
 * pqe must remain the first member: work_pool_thread() casts the
 * struct poolq_entry pointer it dequeues straight to struct work_pool_entry.
 */
struct work_pool_entry {
    struct poolq_entry pqe;     /*** 1st ***/
    struct work_pool_thread *wpt; /* set by the worker to itself just before it runs fun */
    work_pool_fun_t fun; /* callback; invoked with this entry as its argument */
    void *arg; /* NOTE(review): presumably caller data for fun - not touched by the visible code */
};

/*
 * Queue linkage embedded as the first member of anything placed on a
 * struct poolq_head queue (work entries and worker descriptors alike).
 */
struct poolq_entry {
    TAILQ_ENTRY(poolq_entry) q; /*** 1st ***/
    u_int qsize;            /* allocated size of q entry,
                     * 0: default size */
    uint16_t qflags; /* not used by the visible code */
};

/*
 * Head of a mutex-protected queue of struct poolq_entry.
 */
struct poolq_head {
    TAILQ_HEAD(poolq_head_s, poolq_entry) qh; /* the queue itself */
    pthread_mutex_t qmutex; /* guards qh and qcount */

    u_int qsize;            /* default size of q entries,
                     * 0: static size */
    int qcount;         /* number of entries,
                     * < 0: has waiting workers.
                     * NOTE(review): work_pool_thread() and
                     * work_pool_submit() use the opposite sign:
                     * < 0 counts queued work entries,
                     * > 0 counts parked idle workers. */
};

代码注释

/*
 * Worker thread entry point.
 *
 * arg is the struct work_pool_thread describing this worker. The worker
 * registers itself in pool->wptqh and then loops:
 *   - if wpt->work is set, run it (possibly spawning one more worker first);
 *   - else if the pool queue holds pending entries (qcount < 0), take one;
 *   - else park on the pool queue and wait on its own condition variable
 *     until work_pool_submit() hands it work or pool->timeout_ms elapses.
 * The loop ends once the worker has no work and qcount is no longer below
 * params.thrd_min; the worker then unregisters and frees its descriptor.
 *
 * Always returns NULL.
 */
static void * work_pool_thread(void *arg)
{
    struct work_pool_thread *wpt = arg;
    struct work_pool *pool = wpt->pool;
    struct poolq_entry *have;
    struct timespec ts;
    int rc;
    bool spawn;

    pthread_cond_init(&wpt->pqcond, NULL);
    pthread_mutex_lock(&pool->pqh.qmutex); 
    /* Register this worker in the pool's thread list. */
    TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq);

    wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);

    do {
        /* This worker has an entry to run. */
        if (wpt->work) {
            wpt->work->wpt = wpt;
            /*
             * Decide, while still holding the mutex, whether another worker
             * is needed: fewer than thrd_min idle workers and the pool is
             * still below thrd_max. n_threads is bumped before unlocking so
             * concurrent workers see the thread that is about to be created.
             */
            spawn = pool->pqh.qcount < pool->params.thrd_min
                  && pool->n_threads < pool->params.thrd_max;
            if (spawn)
                pool->n_threads++;
            pthread_mutex_unlock(&pool->pqh.qmutex);

            if (spawn) {
                /* Not enough workers: create one (result is ignored). */
                (void)work_pool_spawn(pool);
            }

            /* Run the callback with the pool mutex released. */
            wpt->work->fun(wpt->work);
            wpt->work = NULL;
            pthread_mutex_lock(&pool->pqh.qmutex);
        }

        /*
         * Post-increment: the test sees the old value. A negative old value
         * means all workers were busy and entries are waiting in the queue;
         * the increment accounts for the one taken here.
         */
        if (0 > pool->pqh.qcount++) {
            /* Dequeue the oldest pending entry. */
            have = TAILQ_FIRST(&pool->pqh.qh);
            TAILQ_REMOVE(&pool->pqh.qh, have, q);
            /* Make it this worker's next job (pqe is the entry's 1st member). */
            wpt->work = (struct work_pool_entry *)have;
            /* continue jumps to the loop condition, which holds as work is set. */
            continue;
        }
        /*
         * No pending entries: the increment above counted this worker as
         * idle. Park the worker's own pqe at the tail of the same queue so
         * work_pool_submit() can find it.
         */
        TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);

        clock_gettime(CLOCK_REALTIME_FAST, &ts);
        timespec_addms(&ts, pool->timeout_ms);
        /*
         * Sleep until work_pool_submit() signals this worker or the deadline
         * (now + timeout_ms) passes.
         * NOTE(review): rc is stored but never examined in this function.
         */
        rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
                        &ts);
        if (!wpt->work) {
            /*
             * Nothing was submitted to this worker while it waited, so it is
             * still parked: undo the idle accounting and leave the queue.
             * (When work is set, work_pool_submit() has already done both.)
             */
            pool->pqh.qcount--;
            TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
        }
    } while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);
    /* Reached only with no work and at least thrd_min idle workers: exit. */

    /* Still holding the mutex: unregister this worker. */
    pool->n_threads--;
    TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
    pthread_mutex_unlock(&pool->pqh.qmutex);

    cond_destroy(&wpt->pqcond);
    mem_free(wpt, sizeof(*wpt));

    return (NULL);
}

/*
 * Hand a work entry to the pool.
 *
 * Under pool->pqh.qmutex the idle-worker counter is sampled and then
 * decremented. If at least one worker was parked, the first parked worker
 * is taken off the queue, given the entry and signalled. Otherwise the
 * entry itself is appended to the queue for the next free worker.
 *
 * Always returns 0.
 */
int work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
{
    struct work_pool_thread *idle;
    int parked;

    pthread_mutex_lock(&pool->pqh.qmutex);

    /* Snapshot the counter, then account for this submission. */
    parked = pool->pqh.qcount;
    pool->pqh.qcount = parked - 1;

    if (parked <= 0) {
        /* Every worker is busy: queue the entry at the tail. */
        TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
        pthread_mutex_unlock(&pool->pqh.qmutex);
        return 0;
    }

    /*
     * With parked workers present the queue holds worker descriptors, not
     * work entries; pqe is the descriptor's first member, hence the cast.
     */
    idle = (struct work_pool_thread *)TAILQ_FIRST(&pool->pqh.qh);
    TAILQ_REMOVE(&pool->pqh.qh, &idle->pqe, q);

    /* Assign the job and wake exactly that worker. */
    idle->work = work;
    pthread_cond_signal(&idle->pqcond);

    pthread_mutex_unlock(&pool->pqh.qmutex);
    return 0;
}
  1. 最多可以有几个线程同时处理epoll产生的数据: RPC_Ioq_ThrdMax

Log分析

//svc_51 working thread 在等待事件
TRACE 0213 11:23:58.916742 10680  : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_51 waiting

//svc_85 正在处理0x387a9990指向的work_pool_entry
TRACE 0213 11:23:58.916872  3267  : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_85 task 0x387a9990

//接收了5440字节,但还有84192个字节没有读出来
TRACE 0213 11:23:58.921465  8398  : xxxxxxx : <no-file>:0 :rpc :svc_vc_recv: 0x3d60cc00 fd 274 recv 5440, need 84192, flags 2
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容