libevent 線程池的設(shè)計

Thread pool

背景

在我所做過的一個基于libevent項目中, 我所使用的線程模型是 one event_base per thread + thread pool 模型。每個線程最多有一個event_base, 每一個 TCP 連接必須由某一個event_base 管理,所有這個線程的 IO 都會轉(zhuǎn)移到這個 event_base 上面來處理。換句話來說,一個 file descriptor 只能由一個線程來讀寫。這樣,我們就很方便地把不同的 TCP 連接放到不同的線程里面去。

一個節(jié)點支持多線程,它有兩種模式

  • 單線程, accept 與 TCP 的消息處理在同一個線程做 IO
  • 多線程, accept 有一個專門的線程接受連接,然后創(chuàng)建一個新的 Thread-Pool, 新的連接會按round-robin的方式分配。

具體實現(xiàn)

在其他的線程中,只要拿到一個 libevent_thread_t 的對象,往里面的 base 添加事件就好了。

typedef struct libevent_thread_s
{
    pthread_t thread_id;    //  pid of this thread
    struct event_base * base;   // libevent handle this thread uses
    
    // this pipe is used to stop the event_base from its own thread
    struct event * notify_event;
    int notify_receive_fd;
    int notify_send_fd;
    int no;                 // the thread number, which is needed for test
} libevent_thread_t;

創(chuàng)建新線程調(diào)用的函數(shù)(一直循環(huán)):

static void* worker_libevent(void *arg)
{
    libevent_thread_t* me = (libevent_thread_t *)arg;
    event_base_dispatch(me->base);
    return NULL;
}

雖然其他的線程可以往這個線程添加事件,但是當一個 libevent_thread_t 對象釋放的時候, 其他的線程不能 break 這個線程的 event_base,只能是這個線程自己 break 自己的 event_base,所以在實現(xiàn)上我們可以首先要打開一個管道

libevent_thread_t* libevent_thread_new(){
    int fds[2];
    if(pipe(fds)){
        printf("Can't create notify pipe");
        return NULL;
    }
    pr_libevent_thread_t* t = g_new0(pr_libevent_thread_t, 1);
    t->notify_receive_fd = fds[0];
    t->notify_send_fd = fds[1];
    setup_thread(t);
    return t;
}

為這個管道一端的 fd 在event_base上面添加監(jiān)聽事件,有數(shù)據(jù)可讀的時候就 break event_base 就可以退出了

static void thread_libevent_process(int fd, short which, void * arg)
{
    libevent_thread_t* me = (libevent_thread_t *)arg;
    char buf[1];
    read(fd, buf, 1);
    event_base_loopbreak(me->base);
}

static int setup_thread(libevent_thread_t* me)
{
    me->base = event_base_new();
    if(!me->base)
    {
        printf( "Can't allocate event base\n");
        return -1;
    }
    me->notify_event = event_new(me->base, me->notify_receive_fd, EV_READ | EV_PERSIST,
                                 thread_libevent_process, me);
    if(event_add(me->notify_event, NULL) == -1)
    {
        printf("Can't monitor libevent notify pipe\n");
        return -1;
    }
    return 0;
}

所以當我們想要退出時,向 fd 發(fā)送數(shù)據(jù)

void libevent_thread_wakeup(libevent_thread_t* me)
{
    char one[1];
    one[0] = 1;
    write(me->notify_send_fd, one, sizeof(one));
}

至于線程池,就是一個 libevent_thread_t 的數(shù)組,通過next來指定函數(shù)返回的線程,其中 n 為線程池容量大小, next 自增到 n 時變成 0

libevent_thread_t* event_pool_get_next_thread(libevent_thread_pool_t*);

typedef struct libevent_thread_pool_s
{
    int next;
    struct spinlock lock;
    int n;
    pr_libevent_thread_t** slot;
} libevent_thread_pool_t;

注意事項

在多線程環(huán)境中,libevent 的 event_base 的 loopbreak必須由他自己的線程來實現(xiàn),所以其他線程只能是通過管道來通知。
在一個線程往另一個線程的 event_base 添加事件的時候,也就是在多個線程對于一個 event_base 操作的時候,event_base 需要對它自己的數(shù)據(jù)結(jié)構(gòu)加鎖,所以在使用線程池或者寫多線程的程序的時候,開始時需要調(diào)用, 文檔

evthread_use_pthreads()
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容