How to use poll and epoll needs no further introduction. When there are many fds, epoll is more efficient than poll. Let's read the kernel source to see exactly why.
Dissecting poll
The poll system call:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
The corresponding implementation is:
[fs/select.c --> sys_poll]
asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds, long timeout)
{
    struct poll_wqueues table;
    int fdcount, err;
    unsigned int i;
    struct poll_list *head;
    struct poll_list *walk;

    /* Do a sanity check on nfds ... */
    /* The user-supplied nfds is rejected only if it exceeds both the size of
     * the process's fd set (current->files->max_fdset) and OPEN_MAX (256). */
    if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
        return -EINVAL;

    if (timeout) {
        /* Careful about overflow in the intermediate values */
        if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
            timeout = (unsigned long)(timeout*HZ+999)/1000+1;
        else /* Negative or overflow */
            timeout = MAX_SCHEDULE_TIMEOUT;
    }

    poll_initwait(&table);
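poll_initwait is the key call here. Judging by its name, it initializes the variable table; note that table is a crucial variable throughout the execution of poll. The struct poll_table embedded in it actually contains nothing but a function pointer: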
[include/linux/poll.h]
/*
 * structures and helpers for f_op->poll implementations
 */
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *,
                                struct poll_table_struct *);

typedef struct poll_table_struct {
    poll_queue_proc qproc;
} poll_table;
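A table that holds a single function pointer is useful mainly because it is embedded in a larger structure: code that is handed only the table can still reach the surrounding state. The following is a minimal user-space sketch of that idea, not kernel code. Every `demo_` name is hypothetical, and `demo_container_of` only imitates the kernel's `container_of` macro.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical user-space stand-ins for the kernel types. */
struct demo_poll_table;
typedef void (*demo_queue_proc)(int fd, struct demo_poll_table *pt);

struct demo_poll_table {
    demo_queue_proc qproc;      /* the only member, as in poll_table */
};

struct demo_wqueues {
    struct demo_poll_table pt;  /* embedded table handed to "drivers" */
    int registered;             /* private state the callback wants to reach */
    int error;
};

/* Same idea as the kernel's container_of: member pointer -> enclosing struct. */
#define demo_container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Callback: recovers the enclosing demo_wqueues from the bare table pointer. */
static void demo_pollwait(int fd, struct demo_poll_table *pt)
{
    struct demo_wqueues *pwq = demo_container_of(pt, struct demo_wqueues, pt);

    (void)fd;
    pwq->registered++;
}

/* Installs the callback and resets the private state. */
static void demo_initwait(struct demo_wqueues *pwq)
{
    pwq->pt.qproc = demo_pollwait;
    pwq->registered = 0;
    pwq->error = 0;
}

/* A "driver" sees only the table and calls through the function pointer. */
static int demo_register_fds(int nfds)
{
    struct demo_wqueues table;
    int fd;

    demo_initwait(&table);
    for (fd = 0; fd < nfds; fd++)
        table.pt.qproc(fd, &table.pt);
    assert(table.error == 0);
    return table.registered;
}
```

Calling `demo_register_fds(3)` runs the callback three times through the table and returns the counter kept in the enclosing structure.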
Now let's see what poll_initwait actually does:
[fs/select.c]
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p);

void poll_initwait(struct poll_wqueues *pwq)
{
    pwq->pt.qproc = __pollwait; /* this line has been expanded by hand for readability */
    pwq->error = 0;
    pwq->table = NULL;
}
Clearly, the main job of poll_initwait is to set the callback of table's poll_table member to __pollwait. __pollwait is needed not only by the poll system call; select uses the very same function. In other words, it is the kernel's stock callback for this kind of readiness waiting. epoll does not use it: it adds a callback of its own to achieve its efficiency, which we will come to later. We skip the implementation of __pollwait for now and continue with sys_poll:
[fs/select.c --> sys_poll]
    head = NULL;
    walk = NULL;
    i = nfds;
    err = -ENOMEM;
    while (i != 0) {
        struct poll_list *pp;
        pp = kmalloc(sizeof(struct poll_list) +
                     sizeof(struct pollfd) *
                     (i > POLLFD_PER_PAGE ? POLLFD_PER_PAGE : i),
                     GFP_KERNEL);
        if (pp == NULL)
            goto out_fds;
        pp->next = NULL;
        pp->len = (i > POLLFD_PER_PAGE ? POLLFD_PER_PAGE : i);
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;
        walk = pp;
        if (copy_from_user(pp->entries, ufds + nfds-i,
                           sizeof(struct pollfd)*pp->len)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }
    fdcount = do_poll(nfds, head, &table, timeout);
All this code builds a linked list. Each node is about one page in size (usually 4 KB) and is managed through a pointer to struct poll_list; the struct pollfd entries themselves are reached through the entries member of struct poll_list. The loop above copies the user-space struct pollfd array into these entries. A typical program polls only a handful of fds, so the list usually needs just one node, that is, one page. When the user passes many fds, however, every poll call has to copy all of the struct pollfd entries into the kernel again, so argument copying and page allocation become a performance bottleneck of poll. The last statement calls do_poll; let's follow it:
[fs/select.c --> sys_poll() --> do_poll()]
static void do_pollfd(unsigned int num, struct pollfd *fdpage,
                      poll_table **pwait, int *count)
{
    int i;

    for (i = 0; i < num; i++) {
        int fd;
        unsigned int mask;
        struct pollfd *fdp;

        mask = 0;
        fdp = fdpage+i;
        fd = fdp->fd;
        if (fd >= 0) {
            struct file *file = fget(fd);
            mask = POLLNVAL;
            if (file != NULL) {
                mask = DEFAULT_POLLMASK;
                if (file->f_op && file->f_op->poll)
                    mask = file->f_op->poll(file, *pwait);
                mask &= fdp->events | POLLERR | POLLHUP;
                fput(file);
            }
            if (mask) {
                *pwait = NULL;
                (*count)++;
            }
        }
        fdp->revents = mask;
    }
}

static int do_poll(unsigned int nfds, struct poll_list *list,
                   struct poll_wqueues *wait, long timeout)
{
    int count = 0;
    poll_table *pt = &wait->pt;

    if (!timeout)
        pt = NULL;

    for (;;) {
        struct poll_list *walk;
        set_current_state(TASK_INTERRUPTIBLE);
        walk = list;
        while (walk != NULL) {
            do_pollfd(walk->len, walk->entries, &pt, &count);
            walk = walk->next;
        }
        pt = NULL;
        if (count || !timeout || signal_pending(current))
            break;
        count = wait->error;
        if (count)
            break;
        /* Suspend current and let other processes run; current resumes
         * when the timeout expires (or when it is woken earlier). */
        timeout = schedule_timeout(timeout);
    }
    __set_current_state(TASK_RUNNING);
    return count;
}
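Note set_current_state and signal_pending: together they ensure that when the program is sleeping inside poll, a signal makes it leave the poll call promptly, which a call sleeping uninterruptibly would not do.
Taken as a whole, do_poll mostly waits in a loop and leaves it once count becomes greater than 0 (or on timeout, signal, or error); count is maintained mainly by do_pollfd. Note this fragment: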
while (walk != NULL) {
    do_pollfd(walk->len, walk->entries, &pt, &count);
    walk = walk->next;
}
When the user passes many fds (say 1000), this walk has to visit every one of them on each pass, and this is the second reason poll becomes a bottleneck. For each fd passed in, do_pollfd calls the poll function that belongs to it. Simplified, the call sequence is:
struct file *file = fget(fd);
file->f_op->poll(file, &(table->pt));
If the fd refers to a socket, do_pollfd calls the poll implemented by the network code; if the fd refers to an open file on an ext3 file system, it calls whatever poll the ext3 driver provides (when f_op->poll is absent, the code above falls back to DEFAULT_POLLMASK). In short, file->f_op->poll is implemented by the driver behind the file. What does a driver's poll usually look like? The standard implementation calls poll_wait, that is, it invokes the callback stored in struct poll_table with the device's own wait queue as argument (devices normally have their own wait queues; a device that cannot report readiness asynchronously would be painful to use). As a representative driver, here is the socket code for TCP:
[net/ipv4/tcp.c --> tcp_poll]
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    unsigned int mask;
    struct sock *sk = sock->sk;
    struct tcp_opt *tp = tcp_sk(sk);

    poll_wait(file, sk->sk_sleep, wait);
That is all the code we need; the rest just checks state and returns the status bits. The core of tcp_poll is poll_wait, and poll_wait calls the callback stored in struct poll_table. For the poll system call that callback is __pollwait, so tcp_poll can almost be read as the single statement:
__pollwait(file, sk->sk_sleep, wait);
This also shows that every socket carries its own wait queue, sk_sleep, so the "device wait queue" mentioned above is really more than one queue. Now let's look at the implementation of __pollwait:
[fs/select.c --> __pollwait()]
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
    struct poll_table_page *table = p->table;

    if (!table || POLL_TABLE_FULL(table)) {
        struct poll_table_page *new_table;

        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
        if (!new_table) {
            p->error = -ENOMEM;
            __set_current_state(TASK_RUNNING);
            return;
        }
        new_table->entry = new_table->entries;
        new_table->next = table;
        p->table = new_table;
        table = new_table;
    }

    /* Add a new entry */
    {
        struct poll_table_entry *entry = table->entry;
        table->entry = entry+1;
        get_file(filp);
        entry->filp = filp;
        entry->wait_address = wait_address;
        init_waitqueue_entry(&entry->wait, current);
        add_wait_queue(wait_address, &entry->wait);
    }
}
__pollwait builds the chain of poll_table_page pages that hold the poll_table_entry slots (one call to __pollwait, that is, one call to a device's poll, creates exactly one poll_table_entry) and, through the wait member of struct poll_table_entry, hangs current on the device's wait queue. The wait queue here is wait_address, which corresponds to sk->sk_sleep in tcp_poll.
We can now recap how the poll system call works. It registers the callback __pollwait and initializes table (of type struct poll_wqueues), copies in the struct pollfd array passed by the user (mainly the fds), and then calls the poll of every fd in turn, which hangs current on the wait queue of the device behind each fd. When a device receives a message (network device) or finishes filling in file data (disk device), it wakes the processes on its wait queue, and current wakes up. What current does on its way out of sys_poll is comparatively simple and is not analyzed line by line here.
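Seen from user space, the recap above corresponds to a call like the one below. This is a minimal sketch, assuming a Linux or other POSIX system; `demo_poll_pipe` is a hypothetical helper that polls the read end of a fresh pipe once. The point to notice is that the whole `struct pollfd` array is passed to the kernel on every call.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/*
 * Returns the revents reported for the read end of a fresh pipe after
 * nbytes bytes (0 or 1) have been written to it, or -1 on error.
 * Illustrative helper, not part of any library.
 */
static int demo_poll_pipe(int nbytes)
{
    int pfd[2];
    struct pollfd fds[1];
    int n, revents;

    assert(nbytes == 0 || nbytes == 1);
    if (pipe(pfd) < 0)
        return -1;
    if (nbytes == 1 && write(pfd[1], "x", 1) != 1) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }

    fds[0].fd = pfd[0];
    fds[0].events = POLLIN;
    fds[0].revents = 0;

    /* The complete array travels into the kernel on every call. */
    n = poll(fds, 1, 0);            /* timeout 0: check once, never sleep */
    revents = (n < 0) ? -1 : fds[0].revents;

    close(pfd[0]);
    close(pfd[1]);
    return revents;
}
```

With an empty pipe the helper reports no events; after one byte has been written, the returned mask contains POLLIN.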
epoll
The analysis above found two bottlenecks in poll; the question now is how to remove them. First, copying 1000 fds into the kernel on every poll call is wasteful: why doesn't the kernel keep the fds it has already copied? That is exactly what epoll does, and its API makes this plain: fds are not passed at epoll_wait time but handed to the kernel beforehand through epoll_ctl and then waited on together, which eliminates the repeated copying. Second, epoll_wait does not add current to the wait queue of each fd's device in turn. Instead, a callback runs when a device's wait queue is woken (this requires a "wakeup callback" mechanism); it puts the fd that produced the event on a list, and epoll_wait returns the fds on that list.
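The API split described above can be seen in a small user-space program. This is a sketch under stated assumptions: it needs Linux, and `demo_epoll_ready_index` and `DEMO_NPIPES` are hypothetical names. Three pipes are registered once with epoll_ctl, one of them is made readable, and epoll_wait reports only that one.

```c
#include <assert.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#define DEMO_NPIPES 3

/*
 * Registers DEMO_NPIPES pipe read ends once, makes pipe `which` readable,
 * and returns the index that epoll_wait reports (-1 on error or if the
 * number of reported events is not exactly one).
 */
static int demo_epoll_ready_index(int which)
{
    int pfd[DEMO_NPIPES][2];
    struct epoll_event ev, out[DEMO_NPIPES];
    int epfd, i, n, created = 0, result = -1;

    assert(which >= 0 && which < DEMO_NPIPES);
    epfd = epoll_create(DEMO_NPIPES);   /* size is a hint but must be > 0 */
    if (epfd < 0)
        return -1;

    for (i = 0; i < DEMO_NPIPES; i++) {
        if (pipe(pfd[i]) < 0)
            goto out;
        created++;
        memset(&ev, 0, sizeof ev);
        ev.events = EPOLLIN;
        ev.data.u32 = (unsigned)i;      /* user data handed back with the event */
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, pfd[i][0], &ev) < 0)
            goto out;
    }
    if (write(pfd[which][1], "x", 1) != 1)
        goto out;

    /* No fd list is passed here; the kernel already holds the interest set. */
    n = epoll_wait(epfd, out, DEMO_NPIPES, 0);
    if (n == 1)
        result = (int)out[0].data.u32;
out:
    for (i = 0; i < created; i++) {
        close(pfd[i][0]);
        close(pfd[i][1]);
    }
    close(epfd);
    return result;
}
```

The registration cost is paid once per fd, while each epoll_wait call only receives the events that are ready.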
Dissecting epoll
epoll is built as a module, so we start with the module entry point, eventpoll_init:
[fs/eventpoll.c --> eventpoll_init()]
static int __init eventpoll_init(void)
{
    int error;

    init_MUTEX(&epsem);

    /* Initialize the structure used to perform safe poll wait head wake ups */
    ep_poll_safewake_init(&psw);

    /* Allocates slab cache used to allocate "struct epitem" items */
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
            0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,
            NULL, NULL);

    /* Allocates slab cache used to allocate "struct eppoll_entry" */
    pwq_cache = kmem_cache_create("eventpoll_pwq",
            sizeof(struct eppoll_entry), 0,
            EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

    /*
     * Register the virtual file system that will be the source of inodes
     * for the eventpoll files
     */
    error = register_filesystem(&eventpoll_fs_type);
    if (error)
        goto epanic;

    /* Mount the above commented virtual file system */
    eventpoll_mnt = kern_mount(&eventpoll_fs_type);
    error = PTR_ERR(eventpoll_mnt);
    if (IS_ERR(eventpoll_mnt))
        goto epanic;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: successfully initialized.\n",
            current));
    return 0;

epanic:
    panic("eventpoll_init() failed\n");
}
Interestingly, at initialization the module registers a new file system called "eventpollfs" (in the eventpoll_fs_type structure) and then mounts it. It also creates two kernel caches (in kernel code, if you need to allocate small blocks frequently you should create a kmem_cache to serve as a memory pool), one for struct epitem and one for struct eppoll_entry. If you ever write a new file system, this code is a useful reference. Now consider why epoll_create returns a new fd: because it creates a new file in this "eventpollfs" file system. Here it is:
[fs/eventpoll.c --> sys_epoll_create()]
asmlinkage long sys_epoll_create(int size)
{
    int error, fd;
    struct inode *inode;
    struct file *file;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
            current, size));

    /* Sanity check on the size parameter */
    error = -EINVAL;
    if (size <= 0)
        goto eexit_1;

    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure, and inode and a free file descriptor.
     */
    error = ep_getfd(&fd, &inode, &file);
    if (error)
        goto eexit_1;

    /* Setup the file internal data structure ( "struct eventpoll" ) */
    error = ep_file_init(file);
    if (error)
        goto eexit_2;
The function is simple. ep_getfd sounds like a "get", but when epoll_create is called it actually has to create a new inode, a new file, and a new fd. ep_file_init creates a struct eventpoll and stores it in file->private_data; keep this private_data in mind, because it is used again later. At this point one might ask why the epoll developers did not keep the user's epoll handles in one big in-kernel map and have epoll_create return a pointer. That might seem intuitive. But look closely: how many Linux system calls return a pointer? Almost none. (To be explicit, malloc is not a system call; the brk it calls is.) As the most successful heir of Unix, Linux follows one of Unix's great strengths: everything is a file. Input and output are files, sockets are files, and this means programs on such a system can be very simple, because everything comes down to file operations (Unix itself does not go all the way; Plan 9 does). Using a file system has another advantage: epoll_create returns an fd rather than a pointer. If a pointer is wrong there is simply no way to tell, whereas an fd can be validated by looking it up in current->files->fd_array[]. With epoll_create done, epoll_ctl is next; we omit the checking code:
[fs/eventpoll.c --> sys_epoll_ctl()]
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    ....
    epi = ep_find(ep, tfile, fd);
    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
So the function first does an ep_find in some big structure (never mind for now which one). If a struct epitem is found and the operation is ADD, it returns -EEXIST; if the operation is DEL, it calls ep_remove. If no struct epitem is found and the operation is ADD, ep_insert creates and inserts one. Straightforward. So what is this "big structure"? From the way ep_find is called, ep must point to it, and from ep = file->private_data we see that it is the struct eventpoll created in epoll_create. Looking at the implementation of ep_find, the lookup goes through the rbr member of struct eventpoll (a struct rb_root): it is the root of a red-black tree, and the nodes on the tree are struct epitem. The picture is now clear: a newly created epoll file carries a struct eventpoll, that structure holds a red-black tree, and the tree is where the fds from each epoll_ctl call are stored. With the data structures understood, we turn to the core:
[fs/eventpoll.c --> sys_epoll_wait()]
asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
                               int maxevents, int timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d)\n",
            current, epfd, events, maxevents, timeout));

    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0)
        return -EINVAL;

    /* Verify that the area passed by the user is writeable */
    if ((error = verify_area(VERIFY_WRITE, events,
                             maxevents * sizeof(struct epoll_event))))
        goto eexit_1;

    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto eexit_1;

    /*
     * We have to check that the file structure underneath the fd
     * the user passed to us _is_ an eventpoll file.
     */
    error = -EINVAL;
    if (!IS_FILE_EPOLL(file))
        goto eexit_2;

    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    ep = file->private_data;

    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout);

eexit_2:
    fput(file);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) = %d\n",
            current, epfd, events, maxevents, timeout, error));
    return error;
}
The same trick again: fetch the struct eventpoll from file->private_data, then call ep_poll:
[fs/eventpoll.c --> sys_epoll_wait() --> ep_poll()]
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    /*
     * Calculate the timeout by checking for the "infinite" value ( -1 )
     * and the overflow condition. The passed timeout is in milliseconds,
     * that why (t * HZ) / 1000.
     */
    jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
    write_lock_irqsave(&ep->lock, flags);

    res = 0;
    if (list_empty(&ep->rdllist)) {
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&wait, current);
        add_wait_queue(&ep->wq, &wait);

        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            write_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }
Another big loop, but a better one than poll's: on close inspection it does nothing except sleep and check whether ep->rdllist is empty. Doing nothing is of course efficient, but who makes ep->rdllist non-empty? The answer is the callback installed by ep_insert:
[fs/eventpoll.c --> sys_epoll_ctl() --> ep_insert()]
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                     struct file *tfile, int fd)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    struct epitem *epi;
    struct ep_pqueue epq;

    error = -ENOMEM;
    if (!(epi = EPI_MEM_ALLOC()))
        goto eexit_1;

    /* Item initialization follow here ... */
    EP_RB_INITNODE(&epi->rbn);
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->txlink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    EP_SET_FFD(&epi->ffd, tfile, fd);
    epi->event = *event;
    atomic_set(&epi->usecnt, 1);
    epi->nwait = 0;

    /* Initialize the poll table using the queue callback */
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    /*
     * Attach the item to the poll hooks and get current event bits.
     * We can safely use the file* here because its usage count has
     * been increased by the caller of this function.
     */
    revents = tfile->f_op->poll(tfile, &epq.pt);
Note the line init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);, which amounts to epq.pt.qproc = ep_ptable_queue_proc;. The following tfile->f_op->poll(tfile, &epq.pt) calls the poll method of the monitored file (epoll calls it the "target file"); that poll calls poll_wait (remember poll_wait? every driver that supports poll must call it), and poll_wait finally calls ep_ptable_queue_proc. This call chain is hard to follow because it is not a direct, language-level call. ep_insert also puts the struct epitem on the f_ep_links list in struct file so that it can be found easily; the fllink member of struct epitem serves this purpose.
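Because the chain runs through two function pointers, it can help to see it in miniature. The sketch below is a user-space model under loud assumptions: none of the `demo_` names exist in the kernel, the "wait queue" is just a counter, and `demo_poll_wait` only copies the shape of poll_wait (forward to the installed callback unless the table is NULL).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; none of these names exist in the kernel. */
struct demo_table;
typedef void (*demo_qproc)(int *wait_queue, struct demo_table *pt);
struct demo_table { demo_qproc qproc; };

static int demo_calls;          /* how many times the installed callback ran */

/* Plays the role of the queue callback installed by the caller. */
static void demo_queue_proc(int *wait_queue, struct demo_table *pt)
{
    (void)pt;
    (*wait_queue)++;            /* "add an entry" to the device's queue */
    demo_calls++;
}

/* Same shape as poll_wait: forward to the callback unless the table is NULL. */
static void demo_poll_wait(int *wait_queue, struct demo_table *pt)
{
    if (pt && wait_queue)
        pt->qproc(wait_queue, pt);
}

/* A "driver" poll method: register on its queue, then report readiness. */
static unsigned demo_driver_poll(int *wait_queue, int ready, struct demo_table *pt)
{
    demo_poll_wait(wait_queue, pt);
    return ready ? 1u : 0u;
}

/* Caller side: installs the callback, then calls the driver's poll. */
static int demo_chain(int use_table)
{
    struct demo_table pt = { demo_queue_proc };
    int queue_len = 0;

    demo_calls = 0;
    (void)demo_driver_poll(&queue_len, 1, use_table ? &pt : NULL);
    assert(queue_len == demo_calls);
    return demo_calls;
}
```

`demo_chain(1)` reaches the callback exactly once through driver poll and poll_wait, while `demo_chain(0)` passes a NULL table and registers nothing.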
[fs/eventpoll.c --> ep_ptable_queue_proc()]
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                                 poll_table *pt)
{
    struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}
The code above is the most important thing ep_insert does: it creates a struct eppoll_entry, sets its wakeup callback to ep_poll_callback, and adds it to the device's wait queue (the whead here is the per-device wait queue discussed in the poll section). Only then will ep_poll_callback be called when the device becomes ready and wakes the waiters on its queue. On every call to the poll system call, the kernel has to hang current (the current process) on the wait queues of all devices behind the fds; with thousands of fds this is expensive. epoll_wait has no such overhead: epoll hooks each fd's wait queue only once, at epoll_ctl time (that first pass is unavoidable), and in effect tells each fd "call the callback when you are ready". When a device has an event, the callback puts the fd on rdllist, and each call to epoll_wait only has to collect the fds on rdllist. By using callbacks this way, epoll implements a more efficient event-driven model. We can now guess what ep_poll_callback does: it must take the epitem (representing one fd) that received an event and insert it into ep->rdllist, so that when epoll_wait returns, rdllist holds exactly the ready fds.
[fs/eventpoll.c --> ep_poll_callback()]
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
    struct eventpoll *ep = epi->ep;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
            current, epi->file, epi, ep));

    write_lock_irqsave(&ep->lock, flags);

    /*
     * If the event mask does not contain any poll(2) event, we consider the
     * descriptor to be disabled. This condition is likely the effect of the
     * EPOLLONESHOT bit that disables the descriptor when an event is received,
     * until the next EPOLL_CTL_MOD will be issued.
     */
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto is_disabled;

    /* If this file is already in the ready list we exit soon */
    if (EP_IS_LINKED(&epi->rdllink))
        goto is_linked;

    list_add_tail(&epi->rdllink, &ep->rdllist);

is_linked:
    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
    if (waitqueue_active(&ep->wq))
        wake_up(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

is_disabled:
    write_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);

    return 1;
}
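The only line that really matters is list_add_tail(&epi->rdllink, &ep->rdllist);, which puts the struct epitem on the rdllist of struct eventpoll. That completes the picture of epoll's core data structures: a struct eventpoll holds the red-black tree rbr of struct epitem nodes and the ready list rdllist, and each epitem is hooked to a device wait queue through its struct eppoll_entry.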
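The relationship between the interest set, the wakeup callback, and the ready list can be modeled in a few lines of user-space C. This is only a sketch under stated assumptions: all `demo_` names are hypothetical, a fixed array stands in for the red-black tree, a hand-rolled singly linked list stands in for rdllist, and locking and wait queues are left out entirely.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_ITEMS 8

/* One watched fd; plays the role of struct epitem. */
struct demo_item {
    int fd;
    int on_ready_list;              /* like EP_IS_LINKED(&epi->rdllink) */
    struct demo_item *next_ready;   /* like rdllink */
};

/* Plays the role of struct eventpoll (an array stands in for the rb-tree). */
struct demo_eventpoll {
    struct demo_item items[DEMO_MAX_ITEMS];
    int nitems;
    struct demo_item *ready_head, *ready_tail;   /* like rdllist */
};

static void demo_ep_init(struct demo_eventpoll *ep)
{
    ep->nitems = 0;
    ep->ready_head = ep->ready_tail = NULL;
}

/* Like EPOLL_CTL_ADD: remember the fd once. */
static struct demo_item *demo_add(struct demo_eventpoll *ep, int fd)
{
    struct demo_item *it;

    assert(ep->nitems < DEMO_MAX_ITEMS);
    it = &ep->items[ep->nitems++];
    it->fd = fd;
    it->on_ready_list = 0;
    it->next_ready = NULL;
    return it;
}

/* Like ep_poll_callback: runs at wakeup time, appends the item at most once. */
static void demo_callback(struct demo_eventpoll *ep, struct demo_item *it)
{
    if (it->on_ready_list)
        return;
    it->on_ready_list = 1;
    it->next_ready = NULL;
    if (ep->ready_tail)
        ep->ready_tail->next_ready = it;
    else
        ep->ready_head = it;
    ep->ready_tail = it;
}

/* Like the collect step of epoll_wait: drains the ready list only. */
static int demo_wait(struct demo_eventpoll *ep, int *fds, int max)
{
    int n = 0;

    while (ep->ready_head && n < max) {
        struct demo_item *it = ep->ready_head;

        ep->ready_head = it->next_ready;
        if (!ep->ready_head)
            ep->ready_tail = NULL;
        it->on_ready_list = 0;
        fds[n++] = it->fd;
    }
    return n;
}

/* Scenario: fds 5, 7 and 9 are watched; fd 7 fires twice, then fd 5 once. */
static int demo_scenario(int *fds, int max)
{
    struct demo_eventpoll ep;
    struct demo_item *a, *b;

    demo_ep_init(&ep);
    a = demo_add(&ep, 5);
    b = demo_add(&ep, 7);
    (void)demo_add(&ep, 9);
    demo_callback(&ep, b);
    demo_callback(&ep, b);      /* already linked: ignored */
    demo_callback(&ep, a);
    return demo_wait(&ep, fds, max);
}
```

In the scenario the wait step touches only the two items that fired, in arrival order, and never looks at fd 9; its cost follows the number of ready fds rather than the number of watched fds.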
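EPOLLET, unique to epoll
EPOLLET is a flag unique to the epoll system calls; ET stands for Edge Triggered. Its exact semantics and typical uses are easy to look up. With EPOLLET, repeated events no longer keep interrupting the program's logic, which is why it is used so often. So how does EPOLLET work? epoll hooks a callback onto every fd; when the device behind an fd has news, the fd is put on the rdllist list, so epoll_wait only needs to check rdllist to know which fds have events. Let's look at the last few lines of ep_poll: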
[fs/eventpoll.c --> ep_poll()]
    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res && eavail &&
        !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
        goto retry;

    return res;
}
Copying the fds on rdllist to user space is the job of ep_events_transfer:
[fs/eventpoll.c --> ep_events_transfer()]
static int ep_events_transfer(struct eventpoll *ep,
                              struct epoll_event __user *events, int maxevents)
{
    int eventcnt = 0;
    struct list_head txlist;

    INIT_LIST_HEAD(&txlist);

    /*
     * We need to lock this because we could be hit by
     * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
     */
    down_read(&ep->sem);

    /* Collect/extract ready items */
    if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
        /* Build result set in userspace */
        eventcnt = ep_send_events(ep, &txlist, events);

        /* Reinject ready items into the ready list */
        ep_reinject_items(ep, &txlist);
    }

    up_read(&ep->sem);

    return eventcnt;
}
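There is little code here. ep_collect_ready_items moves the fds from rdllist to txlist (leaving rdllist empty), ep_send_events copies the fds on txlist to user space, and ep_reinject_items then "returns" some of the fds from txlist to rdllist so that they are found there again next time. Here is the implementation of ep_send_events: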
[fs/eventpoll.c --> ep_send_events()]
static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
                          struct epoll_event __user *events)
{
    int eventcnt = 0;
    unsigned int revents;
    struct list_head *lnk;
    struct epitem *epi;

    /*
     * We can loop without lock because this is a task private list.
     * The test done during the collection loop will guarantee us that
     * another task will not try to collect this file. Also, items
     * cannot vanish during the loop because we are holding "sem".
     */
    list_for_each(lnk, txlist) {
        epi = list_entry(lnk, struct epitem, txlink);

        /*
         * Get the ready file event set. We can safely use the file
         * because we are holding the "sem" in read and this will
         * guarantee that both the file and the item will not vanish.
         */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);

        /*
         * Set the return event set for the current file descriptor.
         * Note that only the task that was successfully able to link
         * the item to its "txlist" will write this field.
         */
        epi->revents = revents & epi->event.events;

        if (epi->revents) {
            if (__put_user(epi->revents,
                           &events[eventcnt].events) ||
                __put_user(epi->event.data,
                           &events[eventcnt].data))
                return -EFAULT;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            eventcnt++;
        }
    }
    return eventcnt;
}
There is not much to see in this copy routine, but note the line revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);. This poll call is cunning: it passes NULL as the second argument. First, let's look at how a device driver usually implements poll:
static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
    struct scull_pipe *dev = filp->private_data;
    unsigned int mask = 0;

    /*
     * The buffer is circular; it is considered full
     * if "wp" is right behind "rp" and empty if the
     * two are equal.
     */
    down(&dev->sem);
    poll_wait(filp, &dev->inq, wait);
    poll_wait(filp, &dev->outq, wait);
    if (dev->rp != dev->wp)
        mask |= POLLIN | POLLRDNORM;    /* readable */
    if (spacefree(dev))
        mask |= POLLOUT | POLLWRNORM;   /* writable */
    up(&dev->sem);
    return mask;
}
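The code above is taken from Linux Device Drivers, 3rd Edition, and is a classic. The device first hangs current (the current process) on the two queues inq and outq (the hanging itself is done by the callback reached through wait), then waits for the device to wake it; after the wakeup, the event mask is available through mask (note the returned mask value: it is what carries the event mask). So what does poll_wait do if wait is NULL?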
[include/linux/poll.h --> poll_wait]
static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address,
                             poll_table *p)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}
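If the poll_table is NULL, it does nothing. Back in ep_send_events, the poll call with a NULL table therefore means "I don't want to sleep, I just want the event mask". The mask obtained is then copied to user space. Once ep_send_events is done, it is the turn of ep_reinject_items: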
[fs/eventpoll.c --> ep_reinject_items]
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
    int ricnt = 0, pwake = 0;
    unsigned long flags;
    struct epitem *epi;

    write_lock_irqsave(&ep->lock, flags);

    while (!list_empty(txlist)) {
        epi = list_entry(txlist->next, struct epitem, txlink);

        /* Unlink the current item from the transfer list */
        EP_LIST_DEL(&epi->txlink);

        /*
         * If the item is no more linked to the interest set, we don't
         * have to push it inside the ready list because the following
         * ep_release_epitem() is going to drop it. Also, if the current
         * item is set to have an Edge Triggered behaviour, we don't have
         * to push it back either.
         */
        if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
            (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ricnt++;
        }
    }

    if (ricnt) {
        /*
         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
         * wait list.
         */
        if (waitqueue_active(&ep->wq))
            wake_up(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    write_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);
}
ep_reinject_items puts some of the fds on txlist back on rdllist. Which ones? Look at the test if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && ... above: the fds put back on rdllist are those that are not marked EPOLLET (the !(epi->event.events & EPOLLET) term) and whose events are still of interest (the epi->revents & epi->event.events term). The next epoll_wait will then naturally pick up the fds on rdllist and copy them to the user again.
An example: take a socket that has only connected and has not yet sent or received any data. Its poll event mask always contains POLLOUT (see the driver example above), so every epoll_wait returns a POLLOUT event (rather annoying), because its fd is always put back on rdllist. If someone now writes a large amount of data into the socket so that it fills up (no longer writable), the test (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink) fails (no more POLLOUT), the fd is not put back on rdllist, and epoll_wait stops returning POLLOUT to the user.
Now add EPOLLET to the socket, connect, and send or receive nothing. This time the !(epi->event.events & EPOLLET) term fails, so epoll_wait returns the POLLOUT notification only once (because the fd does not go back on rdllist), and subsequent epoll_wait calls report nothing for it until the device wakes its wait queue again.
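The difference between the two modes can be observed from user space with a pipe instead of a socket. This is a minimal sketch, assuming Linux; `demo_count_reports` is a hypothetical helper. It leaves one unread byte in the pipe and counts how many of two consecutive epoll_wait calls report the fd.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/*
 * Writes one byte into a pipe, registers the read end with EPOLLIN plus
 * extra_flags (0 or EPOLLET), and calls epoll_wait twice without reading
 * the byte. Returns how many of the two calls reported the fd, or -1 on error.
 */
static int demo_count_reports(uint32_t extra_flags)
{
    int pfd[2], epfd, i, reports = 0;
    struct epoll_event ev, out;

    if (pipe(pfd) < 0)
        return -1;
    epfd = epoll_create(1);
    if (epfd < 0) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }

    memset(&ev, 0, sizeof ev);
    ev.events = EPOLLIN | extra_flags;
    ev.data.fd = pfd[0];
    if (write(pfd[1], "x", 1) != 1 ||
        epoll_ctl(epfd, EPOLL_CTL_ADD, pfd[0], &ev) < 0) {
        reports = -1;
        goto out;
    }

    for (i = 0; i < 2; i++) {
        int n = epoll_wait(epfd, &out, 1, 0);   /* timeout 0: never sleep */

        if (n < 0) {
            reports = -1;
            break;
        }
        reports += n;
    }
out:
    close(epfd);
    close(pfd[0]);
    close(pfd[1]);
    return reports;
}
```

In level-triggered mode (`extra_flags` of 0) the still-readable fd is reported by both calls, because it is put back on the ready list each time; with EPOLLET it is reported only by the first call.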