Thread pool
背景
在我所做過的一個基于libevent項目中, 我所使用的線程模型是 one event_base per thread + thread pool 模型。每個線程最多有一個event_base, 每一個 TCP 連接必須由某一個event_base 管理,所有這個線程的 IO 都會轉(zhuǎn)移到這個 event_base 上面來處理。換句話來說,一個 file descriptor 只能由一個線程來讀寫。這樣,我們就很方便地把不同的 TCP 連接放到不同的線程里面去。
一個節(jié)點支持多線程,它有兩種模式
- 單線程, accept 與 TCP 的消息處理在同一個線程做 IO
- 多線程, accept 有一個專門的線程接受連接,然后創(chuàng)建一個新的 Thread-Pool, 新的連接會按round-robin的方式分配。
具體實現(xiàn)
在其他的線程中,只要拿到一個 libevent_thread_t 的對象,往里面的 base 添加事件就好了。
typedef struct libevent_thread_s
{
pthread_t thread_id; // pid of this thread
struct event_base * base; // libevent handle this thread uses
// this pipe is used to stop the event_base from its own thread
struct event * notify_event;
int notify_receive_fd;
int notify_send_fd;
int no; // the thread number, which is needed for test
} libevent_thread_t;
創(chuàng)建新線程調(diào)用的函數(shù)(一直循環(huán)):
static void* worker_libevent(void *arg)
{
libevent_thread_t* me = (libevent_thread_t *)arg;
event_base_dispatch(me->base);
return NULL;
}
雖然其他的線程可以往這個線程添加事件,但是當一個 libevent_thread_t 對象釋放的時候, 其他的線程不能 break 這個線程的 event_base,只能是這個線程自己 break 自己的 event_base,所以在實現(xiàn)上我們可以首先要打開一個管道
libevent_thread_t* libevent_thread_new(){
int fds[2];
if(pipe(fds)){
printf("Can't create notify pipe");
return NULL;
}
pr_libevent_thread_t* t = g_new0(pr_libevent_thread_t, 1);
t->notify_receive_fd = fds[0];
t->notify_send_fd = fds[1];
setup_thread(t);
return t;
}
為這個管道一端的 fd 在event_base上面添加監(jiān)聽事件,有數(shù)據(jù)可讀的時候就 break event_base 就可以退出了
static void thread_libevent_process(int fd, short which, void * arg)
{
libevent_thread_t* me = (libevent_thread_t *)arg;
char buf[1];
read(fd, buf, 1);
event_base_loopbreak(me->base);
}
static int setup_thread(libevent_thread_t* me)
{
me->base = event_base_new();
if(!me->base)
{
printf( "Can't allocate event base\n");
return -1;
}
me->notify_event = event_new(me->base, me->notify_receive_fd, EV_READ | EV_PERSIST,
thread_libevent_process, me);
if(event_add(me->notify_event, NULL) == -1)
{
printf("Can't monitor libevent notify pipe\n");
return -1;
}
return 0;
}
所以當我們想要退出時,向 fd 發(fā)送數(shù)據(jù)
void libevent_thread_wakeup(libevent_thread_t* me)
{
char one[1];
one[0] = 1;
write(me->notify_send_fd, one, sizeof(one));
}
至于線程池,就是一個 libevent_thread_t 的數(shù)組,通過next來指定函數(shù)返回的線程,其中 n 為線程池容量大小, next 自增到 n 時變成 0
libevent_thread_t* event_pool_get_next_thread(libevent_thread_pool_t*);
typedef struct libevent_thread_pool_s
{
int next;
struct spinlock lock;
int n;
pr_libevent_thread_t** slot;
} libevent_thread_pool_t;
注意事項
在多線程環(huán)境中,libevent 的 event_base 的 loopbreak必須由他自己的線程來實現(xiàn),所以其他線程只能是通過管道來通知。
在一個線程往另一個線程的 event_base 添加事件的時候,也就是在多個線程對于一個 event_base 操作的時候,event_base 需要對它自己的數(shù)據(jù)結(jié)構(gòu)加鎖,所以在使用線程池或者寫多線程的程序的時候,開始時需要調(diào)用, 文檔
evthread_use_pthreads()