LibEvent快速入門(mén)
簡(jiǎn)介
基本的socket變成是阻塞/同步的,每個(gè)操作除非已經(jīng)完成,出錯(cuò),或者超時(shí)才會(huì)返回,這樣對(duì)于每一個(gè)請(qǐng)求,要使用一個(gè)線(xiàn)程或者單獨(dú)的進(jìn)程去處理,系統(tǒng)資源沒(méi)有辦法支撐大量的請(qǐng)求。posix定義了可以使用異步的select系統(tǒng)調(diào)用,但是因?yàn)樗捎昧溯喸?xún)的方式來(lái)判斷某個(gè)fd是否變成active,效率不高。于是各系統(tǒng)就分別提出了基于異步的系統(tǒng)調(diào)用,例如Linux的epoll,由于在內(nèi)核層面做了支持,所以可以用O(1)的效率查找到active的fd。基本上,libevent就是對(duì)這些高效IO的封裝,提供統(tǒng)一的API,簡(jiǎn)化開(kāi)發(fā)。
原理簡(jiǎn)介
libevent默認(rèn)情況下是單線(xiàn)程的,可以配置成多線(xiàn)程,每個(gè)線(xiàn)程有且只有一個(gè)event_base,對(duì)應(yīng)一個(gè)struct event_base結(jié)構(gòu)體以及附于其上的事件管理器,用來(lái)調(diào)度托管給它的一系列event,可以和操作系統(tǒng)的進(jìn)程管理類(lèi)比。當(dāng)一個(gè)事件發(fā)生后,event_base會(huì)在合適的時(shí)間,不一定是立即去調(diào)用綁定在這個(gè)事件上的函數(shù),直到這個(gè)函數(shù)執(zhí)行完,再去調(diào)度其他的事件。
//創(chuàng)建一個(gè)event_base
struct event_base *base = event_base_new();
assert(base != NULL);
event_base內(nèi)部有一個(gè)循環(huán),循環(huán)阻塞在epoll等系統(tǒng)調(diào)用上,直到有一個(gè)/一些時(shí)間發(fā)生,然后去處理這些事件。當(dāng)然,這些事件要被綁定在這個(gè)event_base上,每個(gè)事件對(duì)應(yīng)一個(gè)struct event,可以是監(jiān)聽(tīng)一個(gè)fd或者信號(hào)量之類(lèi)的,struct event使用event_new來(lái)創(chuàng)建和綁定,使用event_add來(lái)將event綁定到event_base中。
// 創(chuàng)建并綁定一個(gè)event
struct event* listen_event;
//參數(shù):event_base,監(jiān)聽(tīng)的對(duì)象,需要監(jiān)聽(tīng)的事件,事件發(fā)生后的回調(diào)函數(shù),傳給回調(diào)函數(shù)的參數(shù)
listen_event = event_new(base, listener, EV_READ | EV_PERSIST, callback_func, (void*)base);
//參數(shù):event,超時(shí)時(shí)間,NULL表示無(wú)超時(shí)設(shè)置
event_add(listen_event, NULL);
注:libevent支持的事件及屬性包括(使用bitfield實(shí)現(xiàn))
- EV_TIMEOUT:超時(shí);
- EV_READ:只要網(wǎng)絡(luò)緩沖中還有數(shù)據(jù),回調(diào)函數(shù)就會(huì)被觸發(fā);
- EV_WRITE:只要塞給網(wǎng)絡(luò)緩沖的數(shù)據(jù)被寫(xiě)完,回調(diào)函數(shù)就會(huì)被觸發(fā);
- EV_SIGNAL:POSIX信號(hào)量;
- EV_PERSIST:不指定這個(gè)屬性,回調(diào)函數(shù)被觸發(fā)后事件會(huì)被刪除;
- EV_ET:Edge-Trigger邊緣觸發(fā)(這個(gè)還不懂是什么意思)
然后啟動(dòng)event_base的循環(huán),開(kāi)始處理事件。循環(huán)地啟動(dòng)使用event_base_dispatch,循環(huán)將一直持續(xù),找到不再有需要關(guān)注的事件,或者是遇到event_loopbreak()/event_loopexit()函數(shù)。
//啟動(dòng)循環(huán),開(kāi)始處理事件
event_base_dispatch(base);
接下來(lái)再來(lái)關(guān)注事件發(fā)生時(shí)的回調(diào)函數(shù)callback_func,callback_func的原型如下所示
typedef void(* event_callback_fn)(evutil_socket_t sockfd, short event_type, void *arg)
傳給callback_func的是一個(gè)監(jiān)聽(tīng)的fd,監(jiān)聽(tīng)的事件類(lèi)型,以及event_new中最后一個(gè)參數(shù)。在上述程序中,是將event_base傳給了callback_func,實(shí)際中更常用的是構(gòu)造一個(gè)結(jié)構(gòu)體,把需要傳給回調(diào)函數(shù)的參數(shù)都放進(jìn)來(lái),然后傳給event_new,event_new再傳給回調(diào)函數(shù)。
所以總結(jié)一下,對(duì)于一個(gè)服務(wù)器而言,流程大致如下:
- 獲取待監(jiān)聽(tīng)的內(nèi)容的fd;
- 創(chuàng)建一個(gè)event_base;
- 創(chuàng)建一個(gè)event,指定待監(jiān)聽(tīng)的fd,待監(jiān)聽(tīng)事件的類(lèi)型,以及事件放生時(shí)的回調(diào)函數(shù)及傳給回調(diào)函數(shù)的參數(shù);
- 將event添加到event_base的事件管理器中;
- 開(kāi)啟event_base的事件處理循環(huán);
- (異步)當(dāng)事件發(fā)生的時(shí)候,調(diào)用前面設(shè)置的回調(diào)函數(shù)。
簡(jiǎn)易版QuickStart
下面的代碼實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的echo server,server啟動(dòng)后,client端啟動(dòng)并連接,在cmd中輸入文字,server端收到后,將文字再返回給client。
server端代碼:
/**
You need libevent2 to compile this piece of code
Please see: http://libevent.org/
Or you can simply run this command to install on Mac: brew install libevent
Cmd to compile this piece of code: g++ LibeventQuickStartServer.c -o LibeventQuickStartServer /usr/local/lib/libevent.a
**/
#include<stdio.h>
#include<string.h>
#include<errno.h>
#include<unistd.h>
#include<event.h>
void accept_cb(int fd, short events, void* arg);
void socket_read_cb(int fd, short events, void* arg);
int tcp_server_init(int port, int listen_num);
int main(int argc, char const *argv[])
{
/* code */
int listener = tcp_server_init(9999, 10);
if (listener == -1)
{
perror("tcp_server_init error");
return -1;
}
struct event_base* base = event_base_new();
// 監(jiān)聽(tīng)客戶(hù)端請(qǐng)求鏈接事件
struct event* ev_listen = event_new(base, listener, EV_READ | EV_PERSIST, accept_cb, base);
event_add(ev_listen, NULL);
event_base_dispatch(base);
return 0;
}
void accept_cb(int fd, short events, void* arg)
{
evutil_socket_t sockfd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
sockfd = ::accept(fd, (struct sockaddr*)&client, &len);
evutil_make_socket_nonblocking(sockfd);
printf("accept a client %d\n", sockfd);
struct event_base* base = (event_base*)arg;
//動(dòng)態(tài)創(chuàng)建一個(gè)event結(jié)構(gòu)體,并將其作為回調(diào)參數(shù)傳遞給
struct event* ev = event_new(NULL, -1, 0, NULL, NULL);
event_assign(ev, base, sockfd, EV_READ | EV_PERSIST, socket_read_cb, (void*)ev);
event_add(ev, NULL);
}
void socket_read_cb(int fd, short events, void* arg)
{
char msg[4096];
struct event* ev = (struct event*)arg;
int len = read(fd, msg, sizeof(msg) - 1);
if(len <= 0)
{
printf("some error happen when read\n");
event_free(ev);
close(fd);
return;
}
msg[len] = '\0';
printf("recv the client msg : %s\n", msg);
char reply_msg[4096] = "I have received the msg: ";
strcat(reply_msg + strlen(reply_msg), msg);
write(fd, reply_msg, strlen(reply_msg));
}
typedef struct sockaddr SA;
int tcp_server_init(int port, int listen_num)
{
int errno_save;
evutil_socket_t listener;
listener = ::socket(AF_INET, SOCK_STREAM, 0);
if( listener == -1 )
return -1;
//允許多次綁定同一個(gè)地址。要用在socket和bind之間
evutil_make_listen_socket_reuseable(listener);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(port);
if( ::bind(listener, (SA*)&sin, sizeof(sin)) < 0 )
goto error;
if( ::listen(listener, listen_num) < 0)
goto error;
//跨平臺(tái)統(tǒng)一接口,將套接字設(shè)置為非阻塞狀態(tài)
evutil_make_socket_nonblocking(listener);
return listener;
error:
errno_save = errno;
evutil_closesocket(listener);
errno = errno_save;
return -1;
}
client端代碼:
/**
You need libevent2 to compile this piece of code
Please see: http://libevent.org/
Or you can simply run this command to install on Mac: brew install libevent
Cmd to compile this piece of code: g++ LibeventQuickStartClient.c -o LibeventQuickStartClient /usr/local/lib/libevent.a
**/
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<errno.h>
#include<unistd.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<event.h>
#include<event2/util.h>
int tcp_connect_server(const char* server_ip, int port);
void cmd_msg_cb(int fd, short events, void* arg);
void socket_read_cb(int fd, short events, void *arg);
int main(int argc, char** argv)
{
if( argc < 3 )
{
printf("please input 2 parameter\n");
return -1;
}
//兩個(gè)參數(shù)依次是服務(wù)器端的IP地址、端口號(hào)
int sockfd = tcp_connect_server(argv[1], atoi(argv[2]));
if( sockfd == -1)
{
perror("tcp_connect error ");
return -1;
}
printf("connect to server successful\n");
struct event_base* base = event_base_new();
struct event *ev_sockfd = event_new(base, sockfd,
EV_READ | EV_PERSIST,
socket_read_cb, NULL);
event_add(ev_sockfd, NULL);
//監(jiān)聽(tīng)終端輸入事件
struct event* ev_cmd = event_new(base, STDIN_FILENO,
EV_READ | EV_PERSIST, cmd_msg_cb,
(void*)&sockfd);
event_add(ev_cmd, NULL);
event_base_dispatch(base);
printf("finished \n");
return 0;
}
void cmd_msg_cb(int fd, short events, void* arg)
{
char msg[1024];
int ret = read(fd, msg, sizeof(msg));
if( ret <= 0 )
{
perror("read fail ");
exit(1);
}
int sockfd = *((int*)arg);
//把終端的消息發(fā)送給服務(wù)器端
//為了簡(jiǎn)單起見(jiàn),不考慮寫(xiě)一半數(shù)據(jù)的情況
write(sockfd, msg, ret);
}
void socket_read_cb(int fd, short events, void *arg)
{
char msg[1024];
//為了簡(jiǎn)單起見(jiàn),不考慮讀一半數(shù)據(jù)的情況
int len = read(fd, msg, sizeof(msg)-1);
if( len <= 0 )
{
perror("read fail ");
exit(1);
}
msg[len] = '\0';
printf("recv %s from server\n", msg);
}
typedef struct sockaddr SA;
int tcp_connect_server(const char* server_ip, int port)
{
int sockfd, status, save_errno;
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr) );
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
status = inet_aton(server_ip, &server_addr.sin_addr);
if( status == 0 ) //the server_ip is not valid value
{
errno = EINVAL;
return -1;
}
sockfd = ::socket(PF_INET, SOCK_STREAM, 0);
if( sockfd == -1 )
return sockfd;
status = ::connect(sockfd, (SA*)&server_addr, sizeof(server_addr) );
if( status == -1 )
{
save_errno = errno;
::close(sockfd);
errno = save_errno; //the close may be error
return -1;
}
evutil_make_socket_nonblocking(sockfd);
return sockfd;
}
程序運(yùn)行截圖
客戶(hù)端

服務(wù)端

使用BufferEvent
在上面的代碼中,client的cmd中有信息輸入時(shí),client直接將數(shù)據(jù)寫(xiě)入到fd中,server中收到信息后,也是直接將信息寫(xiě)入到fd中,因?yàn)閒d是非阻塞的,所以不能保證正確。那么需要一個(gè)自己管理的緩存來(lái)管理自己的數(shù)據(jù)。那么步驟將稍微有些變化,如下所示:
- 設(shè)置scokfd為nonblocking;
- 使用bufferevent_socket_new創(chuàng)建一個(gè)struct bufferevent* bev,關(guān)聯(lián)上面的sockfd,并托管給event_base;
- 使用bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void*)arg);
- 使用buffevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)來(lái)啟動(dòng)read/write事件
代碼如下所示:
使用bufferevent的server端代碼
#include<stdio.h>
#include<string.h>
#include<errno.h>
#include<event.h>
#include<event2/bufferevent.h>
void accept_cb(int fd, short events, void* arg);
void socket_read_cb(bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
int tcp_server_init(int port, int listen_num);
int main(int argc, char** argv)
{
int listener = tcp_server_init(9999, 10);
if( listener == -1 )
{
perror(" tcp_server_init error ");
return -1;
}
struct event_base* base = event_base_new();
//添加監(jiān)聽(tīng)客戶(hù)端請(qǐng)求連接事件
struct event* ev_listen = event_new(base, listener, EV_READ | EV_PERSIST,
accept_cb, base);
event_add(ev_listen, NULL);
event_base_dispatch(base);
event_base_free(base);
return 0;
}
void accept_cb(int fd, short events, void* arg)
{
evutil_socket_t sockfd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
sockfd = ::accept(fd, (struct sockaddr*)&client, &len );
evutil_make_socket_nonblocking(sockfd);
printf("accept a client %d\n", sockfd);
struct event_base* base = (event_base*)arg;
bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, socket_read_cb, NULL, event_cb, arg);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
}
void socket_read_cb(bufferevent* bev, void* arg)
{
char msg[4096];
size_t len = bufferevent_read(bev, msg, sizeof(msg));
msg[len] = '\0';
printf("recv the client msg: %s", msg);
char reply_msg[4096] = "I have recvieced the msg: ";
strcat(reply_msg + strlen(reply_msg), msg);
bufferevent_write(bev, reply_msg, strlen(reply_msg));
}
void event_cb(struct bufferevent *bev, short event, void *arg)
{
if (event & BEV_EVENT_EOF)
printf("connection closed\n");
else if (event & BEV_EVENT_ERROR)
printf("some other error\n");
//這將自動(dòng)close套接字和free讀寫(xiě)緩沖區(qū)
bufferevent_free(bev);
}
typedef struct sockaddr SA;
int tcp_server_init(int port, int listen_num)
{
int errno_save;
evutil_socket_t listener;
listener = ::socket(AF_INET, SOCK_STREAM, 0);
if( listener == -1 )
return -1;
//允許多次綁定同一個(gè)地址。要用在socket和bind之間
evutil_make_listen_socket_reuseable(listener);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(port);
if( ::bind(listener, (SA*)&sin, sizeof(sin)) < 0 )
goto error;
if( ::listen(listener, listen_num) < 0)
goto error;
//跨平臺(tái)統(tǒng)一接口,將套接字設(shè)置為非阻塞狀態(tài)
evutil_make_socket_nonblocking(listener);
return listener;
error:
errno_save = errno;
evutil_closesocket(listener);
errno = errno_save;
return -1;
}
使用bufferevent的client端代碼
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<errno.h>
#include<unistd.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<event.h>
#include<event2/bufferevent.h>
#include<event2/buffer.h>
#include<event2/util.h>
int tcp_connect_server(const char* server_ip, int port);
void cmd_msg_cb(int fd, short events, void* arg);
void server_msg_cb(struct bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
int main(int argc, char** argv)
{
if( argc < 3 )
{
printf("please input 2 parameter\n");
return -1;
}
//兩個(gè)參數(shù)依次是服務(wù)器端的IP地址、端口號(hào)
int sockfd = tcp_connect_server(argv[1], atoi(argv[2]));
if( sockfd == -1)
{
perror("tcp_connect error ");
return -1;
}
printf("connect to server successful\n");
struct event_base* base = event_base_new();
struct bufferevent* bev = bufferevent_socket_new(base, sockfd,
BEV_OPT_CLOSE_ON_FREE);
//監(jiān)聽(tīng)終端輸入事件
struct event* ev_cmd = event_new(base, STDIN_FILENO,
EV_READ | EV_PERSIST, cmd_msg_cb,
(void*)bev);
event_add(ev_cmd, NULL);
//當(dāng)socket關(guān)閉時(shí)會(huì)用到回調(diào)參數(shù)
bufferevent_setcb(bev, server_msg_cb, NULL, event_cb, (void*)ev_cmd);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
event_base_dispatch(base);
printf("finished \n");
return 0;
}
void cmd_msg_cb(int fd, short events, void* arg)
{
char msg[1024];
int ret = read(fd, msg, sizeof(msg));
if( ret < 0 )
{
perror("read fail ");
exit(1);
}
struct bufferevent* bev = (struct bufferevent*)arg;
//把終端的消息發(fā)送給服務(wù)器端
bufferevent_write(bev, msg, ret);
}
void server_msg_cb(struct bufferevent* bev, void* arg)
{
char msg[1024];
size_t len = bufferevent_read(bev, msg, sizeof(msg));
msg[len] = '\0';
printf("recv %s from server\n", msg);
}
void event_cb(struct bufferevent *bev, short event, void *arg)
{
if (event & BEV_EVENT_EOF)
printf("connection closed\n");
else if (event & BEV_EVENT_ERROR)
printf("some other error\n");
//這將自動(dòng)close套接字和free讀寫(xiě)緩沖區(qū)
bufferevent_free(bev);
struct event *ev = (struct event*)arg;
//因?yàn)閟ocket已經(jīng)沒(méi)有,所以這個(gè)event也沒(méi)有存在的必要了
event_free(ev);
}
typedef struct sockaddr SA;
int tcp_connect_server(const char* server_ip, int port)
{
int sockfd, status, save_errno;
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr) );
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
status = inet_aton(server_ip, &server_addr.sin_addr);
if( status == 0 ) //the server_ip is not valid value
{
errno = EINVAL;
return -1;
}
sockfd = ::socket(PF_INET, SOCK_STREAM, 0);
if( sockfd == -1 )
return sockfd;
status = ::connect(sockfd, (SA*)&server_addr, sizeof(server_addr) );
if( status == -1 )
{
save_errno = errno;
::close(sockfd);
errno = save_errno; //the close may be error
return -1;
}
evutil_make_socket_nonblocking(sockfd);
return sockfd;
}
歡迎留言交流學(xué)習(xí)
參考鏈接:
https://www.felix021.com/blog/read.php?2068
http://blog.csdn.net/luotuo44/article/details/39670221