libevent學(xué)習(xí)篇之一:libevent快速入門(mén)

LibEvent快速入門(mén)

簡(jiǎn)介

基本的socket變成是阻塞/同步的,每個(gè)操作除非已經(jīng)完成,出錯(cuò),或者超時(shí)才會(huì)返回,這樣對(duì)于每一個(gè)請(qǐng)求,要使用一個(gè)線(xiàn)程或者單獨(dú)的進(jìn)程去處理,系統(tǒng)資源沒(méi)有辦法支撐大量的請(qǐng)求。posix定義了可以使用異步的select系統(tǒng)調(diào)用,但是因?yàn)樗捎昧溯喸?xún)的方式來(lái)判斷某個(gè)fd是否變成active,效率不高。于是各系統(tǒng)就分別提出了基于異步的系統(tǒng)調(diào)用,例如Linux的epoll,由于在內(nèi)核層面做了支持,所以可以用O(1)的效率查找到active的fd。基本上,libevent就是對(duì)這些高效IO的封裝,提供統(tǒng)一的API,簡(jiǎn)化開(kāi)發(fā)。

原理簡(jiǎn)介

libevent默認(rèn)情況下是單線(xiàn)程的,可以配置成多線(xiàn)程,每個(gè)線(xiàn)程有且只有一個(gè)event_base,對(duì)應(yīng)一個(gè)struct event_base結(jié)構(gòu)體以及附于其上的事件管理器,用來(lái)調(diào)度托管給它的一系列event,可以和操作系統(tǒng)的進(jìn)程管理類(lèi)比。當(dāng)一個(gè)事件發(fā)生后,event_base會(huì)在合適的時(shí)間,不一定是立即去調(diào)用綁定在這個(gè)事件上的函數(shù),直到這個(gè)函數(shù)執(zhí)行完,再去調(diào)度其他的事件。

//創(chuàng)建一個(gè)event_base
struct event_base *base = event_base_new();
assert(base != NULL);

event_base內(nèi)部有一個(gè)循環(huán),循環(huán)阻塞在epoll等系統(tǒng)調(diào)用上,直到有一個(gè)/一些時(shí)間發(fā)生,然后去處理這些事件。當(dāng)然,這些事件要被綁定在這個(gè)event_base上,每個(gè)事件對(duì)應(yīng)一個(gè)struct event,可以是監(jiān)聽(tīng)一個(gè)fd或者信號(hào)量之類(lèi)的,struct event使用event_new來(lái)創(chuàng)建和綁定,使用event_add來(lái)將event綁定到event_base中。

// 創(chuàng)建并綁定一個(gè)event
struct event* listen_event;

//參數(shù):event_base,監(jiān)聽(tīng)的對(duì)象,需要監(jiān)聽(tīng)的事件,事件發(fā)生后的回調(diào)函數(shù),傳給回調(diào)函數(shù)的參數(shù)
listen_event = event_new(base, listener, EV_READ | EV_PERSIST, callback_func, (void*)base);
//參數(shù):event,超時(shí)時(shí)間,NULL表示無(wú)超時(shí)設(shè)置
event_add(listen_event, NULL);

注:libevent支持的事件及屬性包括(使用bitfield實(shí)現(xiàn))

  1. EV_TIMEOUT:超時(shí);
  2. EV_READ:只要網(wǎng)絡(luò)緩沖中還有數(shù)據(jù),回調(diào)函數(shù)就會(huì)被觸發(fā);
  3. EV_WRITE:只要塞給網(wǎng)絡(luò)緩沖的數(shù)據(jù)被寫(xiě)完,回調(diào)函數(shù)就會(huì)被觸發(fā);
  4. EV_SIGNAL:POSIX信號(hào)量;
  5. EV_PERSIST:不指定這個(gè)屬性,回調(diào)函數(shù)被觸發(fā)后事件會(huì)被刪除;
  6. EV_ET:Edge-Trigger邊緣觸發(fā)(這個(gè)還不懂是什么意思)

然后啟動(dòng)event_base的循環(huán),開(kāi)始處理事件。循環(huán)地啟動(dòng)使用event_base_dispatch,循環(huán)將一直持續(xù),找到不再有需要關(guān)注的事件,或者是遇到event_loopbreak()/event_loopexit()函數(shù)。

//啟動(dòng)循環(huán),開(kāi)始處理事件
event_base_dispatch(base);

接下來(lái)再來(lái)關(guān)注事件發(fā)生時(shí)的回調(diào)函數(shù)callback_func,callback_func的原型如下所示

typedef void(* event_callback_fn)(evutil_socket_t sockfd, short event_type, void *arg)

傳給callback_func的是一個(gè)監(jiān)聽(tīng)的fd,監(jiān)聽(tīng)的事件類(lèi)型,以及event_new中最后一個(gè)參數(shù)。在上述程序中,是將event_base傳給了callback_func,實(shí)際中更常用的是構(gòu)造一個(gè)結(jié)構(gòu)體,把需要傳給回調(diào)函數(shù)的參數(shù)都放進(jìn)來(lái),然后傳給event_new,event_new再傳給回調(diào)函數(shù)。

所以總結(jié)一下,對(duì)于一個(gè)服務(wù)器而言,流程大致如下:

  1. 獲取待監(jiān)聽(tīng)的內(nèi)容的fd;
  2. 創(chuàng)建一個(gè)event_base;
  3. 創(chuàng)建一個(gè)event,指定待監(jiān)聽(tīng)的fd,待監(jiān)聽(tīng)事件的類(lèi)型,以及事件放生時(shí)的回調(diào)函數(shù)及傳給回調(diào)函數(shù)的參數(shù);
  4. 將event添加到event_base的事件管理器中;
  5. 開(kāi)啟event_base的事件處理循環(huán);
  6. (異步)當(dāng)事件發(fā)生的時(shí)候,調(diào)用前面設(shè)置的回調(diào)函數(shù)。

簡(jiǎn)易版QuickStart

下面的代碼實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的echo server,server啟動(dòng)后,client端啟動(dòng)并連接,在cmd中輸入文字,server端收到后,將文字再返回給client。
server端代碼:

/**
You need libevent2 to compile this piece of code
Please see: http://libevent.org/
Or you can simply run this command to install on Mac: brew install libevent
Cmd to compile this piece of code: g++ LibeventQuickStartServer.c  -o  LibeventQuickStartServer /usr/local/lib/libevent.a
**/
#include<stdio.h>  
#include<string.h>  
#include<errno.h>  
  
#include<unistd.h>  
#include<event.h>

void accept_cb(int fd, short events, void* arg);
void socket_read_cb(int fd, short events, void* arg);

int tcp_server_init(int port, int listen_num);

int main(int argc, char const *argv[])
{
    /* code */
    int listener = tcp_server_init(9999, 10);
    if (listener == -1)
    {
        perror("tcp_server_init error");
        return -1;
    }

    struct event_base* base = event_base_new();

    // 監(jiān)聽(tīng)客戶(hù)端請(qǐng)求鏈接事件
    struct event* ev_listen = event_new(base, listener, EV_READ | EV_PERSIST, accept_cb, base);

    event_add(ev_listen, NULL);

    event_base_dispatch(base);

    return 0;
}

void accept_cb(int fd, short events, void* arg)
{
    evutil_socket_t sockfd;

    struct sockaddr_in client;
    socklen_t len = sizeof(client);

    sockfd = ::accept(fd, (struct sockaddr*)&client, &len);
    evutil_make_socket_nonblocking(sockfd);

    printf("accept a client %d\n", sockfd);

    struct event_base* base = (event_base*)arg;

    //動(dòng)態(tài)創(chuàng)建一個(gè)event結(jié)構(gòu)體,并將其作為回調(diào)參數(shù)傳遞給
    struct event* ev = event_new(NULL, -1, 0, NULL, NULL);
    event_assign(ev, base, sockfd, EV_READ | EV_PERSIST, socket_read_cb, (void*)ev);

    event_add(ev, NULL);
}


void socket_read_cb(int fd, short events, void* arg)
{
    char msg[4096];
    struct event* ev = (struct event*)arg;
    int len = read(fd, msg, sizeof(msg) - 1);

    if(len <= 0)
    {
        printf("some error happen when read\n");
        event_free(ev);
        close(fd);
        return;
    }

    msg[len] = '\0';
    printf("recv the client msg : %s\n", msg);

    char reply_msg[4096] = "I have received the msg: ";
    strcat(reply_msg + strlen(reply_msg), msg);

    write(fd, reply_msg, strlen(reply_msg));
}

typedef struct sockaddr SA;  
int tcp_server_init(int port, int listen_num)  
{  
    int errno_save;  
    evutil_socket_t listener;  
  
    listener = ::socket(AF_INET, SOCK_STREAM, 0);  
    if( listener == -1 )  
        return -1;  
  
    //允許多次綁定同一個(gè)地址。要用在socket和bind之間  
    evutil_make_listen_socket_reuseable(listener);  
  
    struct sockaddr_in sin;  
    sin.sin_family = AF_INET;  
    sin.sin_addr.s_addr = 0;  
    sin.sin_port = htons(port);  
  
    if( ::bind(listener, (SA*)&sin, sizeof(sin)) < 0 )  
        goto error;  
  
    if( ::listen(listener, listen_num) < 0)  
        goto error;  
  
  
    //跨平臺(tái)統(tǒng)一接口,將套接字設(shè)置為非阻塞狀態(tài)  
    evutil_make_socket_nonblocking(listener);  
  
    return listener;  
  
    error:  
        errno_save = errno;  
        evutil_closesocket(listener);  
        errno = errno_save;  
  
        return -1;  
}  

client端代碼:


/**
You need libevent2 to compile this piece of code
Please see: http://libevent.org/
Or you can simply run this command to install on Mac: brew install libevent
Cmd to compile this piece of code: g++ LibeventQuickStartClient.c -o LibeventQuickStartClient /usr/local/lib/libevent.a
**/
#include<sys/types.h>  
#include<sys/socket.h>  
#include<netinet/in.h>  
#include<arpa/inet.h>  
#include<errno.h>  
#include<unistd.h>  
  
#include<stdio.h>  
#include<string.h>  
#include<stdlib.h>  
  
#include<event.h>  
#include<event2/util.h>  
  
  
  
  
int tcp_connect_server(const char* server_ip, int port);  
  
  
void cmd_msg_cb(int fd, short events, void* arg);  
void socket_read_cb(int fd, short events, void *arg);  
  
int main(int argc, char** argv)  
{  
    if( argc < 3 )  
    {  
        printf("please input 2 parameter\n");  
        return -1;  
    }  
  
  
    //兩個(gè)參數(shù)依次是服務(wù)器端的IP地址、端口號(hào)  
    int sockfd = tcp_connect_server(argv[1], atoi(argv[2]));  
    if( sockfd == -1)  
    {  
        perror("tcp_connect error ");  
        return -1;  
    }  
  
    printf("connect to server successful\n");  
  
    struct event_base* base = event_base_new();  
  
    struct event *ev_sockfd = event_new(base, sockfd,  
                                        EV_READ | EV_PERSIST,  
                                        socket_read_cb, NULL);  
    event_add(ev_sockfd, NULL);  
  
    //監(jiān)聽(tīng)終端輸入事件  
    struct event* ev_cmd = event_new(base, STDIN_FILENO,  
                                      EV_READ | EV_PERSIST, cmd_msg_cb,  
                                      (void*)&sockfd);  
  
  
    event_add(ev_cmd, NULL);  
  
    event_base_dispatch(base);  
  
    printf("finished \n");  
    return 0;  
}  
  
  
  
  
  
  
void cmd_msg_cb(int fd, short events, void* arg)  
{  
    char msg[1024];  
  
    int ret = read(fd, msg, sizeof(msg));  
    if( ret <= 0 )  
    {  
        perror("read fail ");  
        exit(1);  
    }  
  
    int sockfd = *((int*)arg);  
  
    //把終端的消息發(fā)送給服務(wù)器端  
    //為了簡(jiǎn)單起見(jiàn),不考慮寫(xiě)一半數(shù)據(jù)的情況  
    write(sockfd, msg, ret);  
}  
  
  
void socket_read_cb(int fd, short events, void *arg)  
{  
    char msg[1024];  
  
    //為了簡(jiǎn)單起見(jiàn),不考慮讀一半數(shù)據(jù)的情況  
    int len = read(fd, msg, sizeof(msg)-1);  
    if( len <= 0 )  
    {  
        perror("read fail ");  
        exit(1);  
    }  
  
    msg[len] = '\0';  
  
    printf("recv %s from server\n", msg);  
}  
  
  
  
typedef struct sockaddr SA;  
int tcp_connect_server(const char* server_ip, int port)  
{  
    int sockfd, status, save_errno;  
    struct sockaddr_in server_addr;  
  
    memset(&server_addr, 0, sizeof(server_addr) );  
  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_port = htons(port);  
    status = inet_aton(server_ip, &server_addr.sin_addr);  
  
    if( status == 0 ) //the server_ip is not valid value  
    {  
        errno = EINVAL;  
        return -1;  
    }  
  
    sockfd = ::socket(PF_INET, SOCK_STREAM, 0);  
    if( sockfd == -1 )  
        return sockfd;  
  
  
    status = ::connect(sockfd, (SA*)&server_addr, sizeof(server_addr) );  
  
    if( status == -1 )  
    {  
        save_errno = errno;  
        ::close(sockfd);  
        errno = save_errno; //the close may be error  
        return -1;  
    }  
  
    evutil_make_socket_nonblocking(sockfd);  
  
    return sockfd;  
}  

程序運(yùn)行截圖

客戶(hù)端


Paste_Image.png

服務(wù)端

Paste_Image.png

使用BufferEvent

在上面的代碼中,client的cmd中有信息輸入時(shí),client直接將數(shù)據(jù)寫(xiě)入到fd中,server中收到信息后,也是直接將信息寫(xiě)入到fd中,因?yàn)閒d是非阻塞的,所以不能保證正確。那么需要一個(gè)自己管理的緩存來(lái)管理自己的數(shù)據(jù)。那么步驟將稍微有些變化,如下所示:

  1. 設(shè)置scokfd為nonblocking;
  2. 使用bufferevent_socket_new創(chuàng)建一個(gè)struct bufferevent* bev,關(guān)聯(lián)上面的sockfd,并托管給event_base;
  3. 使用bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void*)arg);
  4. 使用buffevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)來(lái)啟動(dòng)read/write事件

代碼如下所示:

使用bufferevent的server端代碼

#include<stdio.h>
#include<string.h>
#include<errno.h>

#include<event.h>
#include<event2/bufferevent.h>



void accept_cb(int fd, short events, void* arg);
void socket_read_cb(bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
int tcp_server_init(int port, int listen_num);

int main(int argc, char** argv)
{

    int listener = tcp_server_init(9999, 10);
    if( listener == -1 )
    {
        perror(" tcp_server_init error ");
        return -1;
    }

    struct event_base* base = event_base_new();

    //添加監(jiān)聽(tīng)客戶(hù)端請(qǐng)求連接事件
    struct event* ev_listen = event_new(base, listener, EV_READ | EV_PERSIST,
                                        accept_cb, base);
    event_add(ev_listen, NULL);


    event_base_dispatch(base);
    event_base_free(base);


    return 0;
}



void accept_cb(int fd, short events, void* arg)
{
    evutil_socket_t sockfd;

    struct sockaddr_in client;
    socklen_t len = sizeof(client);

    sockfd = ::accept(fd, (struct sockaddr*)&client, &len );
    evutil_make_socket_nonblocking(sockfd);

    printf("accept a client %d\n", sockfd);

    struct event_base* base = (event_base*)arg;

    bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, socket_read_cb, NULL, event_cb, arg);

    bufferevent_enable(bev, EV_READ | EV_PERSIST);
}



void socket_read_cb(bufferevent* bev, void* arg)
{
    char msg[4096];

    size_t len = bufferevent_read(bev, msg, sizeof(msg));

    msg[len] = '\0';
    printf("recv the client msg: %s", msg);


    char reply_msg[4096] = "I have recvieced the msg: ";

    strcat(reply_msg + strlen(reply_msg), msg);
    bufferevent_write(bev, reply_msg, strlen(reply_msg));
}



void event_cb(struct bufferevent *bev, short event, void *arg)
{

    if (event & BEV_EVENT_EOF)
        printf("connection closed\n");
    else if (event & BEV_EVENT_ERROR)
        printf("some other error\n");

    //這將自動(dòng)close套接字和free讀寫(xiě)緩沖區(qū)
    bufferevent_free(bev);
}


typedef struct sockaddr SA;
int tcp_server_init(int port, int listen_num)
{
    int errno_save;
    evutil_socket_t listener;

    listener = ::socket(AF_INET, SOCK_STREAM, 0);
    if( listener == -1 )
        return -1;

    //允許多次綁定同一個(gè)地址。要用在socket和bind之間
    evutil_make_listen_socket_reuseable(listener);

    struct sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(port);

    if( ::bind(listener, (SA*)&sin, sizeof(sin)) < 0 )
        goto error;

    if( ::listen(listener, listen_num) < 0)
        goto error;


    //跨平臺(tái)統(tǒng)一接口,將套接字設(shè)置為非阻塞狀態(tài)
    evutil_make_socket_nonblocking(listener);

    return listener;

    error:
        errno_save = errno;
        evutil_closesocket(listener);
        errno = errno_save;

        return -1;
}

使用bufferevent的client端代碼

#include<sys/types.h>  
#include<sys/socket.h>  
#include<netinet/in.h>  
#include<arpa/inet.h>  
#include<errno.h>  
#include<unistd.h>  
  
#include<stdio.h>  
#include<string.h>  
#include<stdlib.h>  
  
#include<event.h>  
#include<event2/bufferevent.h>  
#include<event2/buffer.h>  
#include<event2/util.h>  
  
  
  
  
int tcp_connect_server(const char* server_ip, int port);  
  
  
void cmd_msg_cb(int fd, short events, void* arg);  
void server_msg_cb(struct bufferevent* bev, void* arg);  
void event_cb(struct bufferevent *bev, short event, void *arg);  
  
int main(int argc, char** argv)  
{  
    if( argc < 3 )  
    {  
        printf("please input 2 parameter\n");  
        return -1;  
    }  
  
  
    //兩個(gè)參數(shù)依次是服務(wù)器端的IP地址、端口號(hào)  
    int sockfd = tcp_connect_server(argv[1], atoi(argv[2]));  
    if( sockfd == -1)  
    {  
        perror("tcp_connect error ");  
        return -1;  
    }  
  
    printf("connect to server successful\n");  
  
    struct event_base* base = event_base_new();  
  
    struct bufferevent* bev = bufferevent_socket_new(base, sockfd,  
                                                     BEV_OPT_CLOSE_ON_FREE);  
  
    //監(jiān)聽(tīng)終端輸入事件  
    struct event* ev_cmd = event_new(base, STDIN_FILENO,  
                                      EV_READ | EV_PERSIST, cmd_msg_cb,  
                                      (void*)bev);  
    event_add(ev_cmd, NULL);  
  
    //當(dāng)socket關(guān)閉時(shí)會(huì)用到回調(diào)參數(shù)  
    bufferevent_setcb(bev, server_msg_cb, NULL, event_cb, (void*)ev_cmd);  
    bufferevent_enable(bev, EV_READ | EV_PERSIST);  
  
  
    event_base_dispatch(base);  
  
    printf("finished \n");  
    return 0;  
}  
  
  
  
  
  
  
void cmd_msg_cb(int fd, short events, void* arg)  
{  
    char msg[1024];  
  
    int ret = read(fd, msg, sizeof(msg));  
    if( ret < 0 )  
    {  
        perror("read fail ");  
        exit(1);  
    }  
  
    struct bufferevent* bev = (struct bufferevent*)arg;  
  
    //把終端的消息發(fā)送給服務(wù)器端  
    bufferevent_write(bev, msg, ret);  
}  
  
  
void server_msg_cb(struct bufferevent* bev, void* arg)  
{  
    char msg[1024];  
  
    size_t len = bufferevent_read(bev, msg, sizeof(msg));  
    msg[len] = '\0';  
  
    printf("recv %s from server\n", msg);  
}  
  
  
void event_cb(struct bufferevent *bev, short event, void *arg)  
{  
  
    if (event & BEV_EVENT_EOF)  
        printf("connection closed\n");  
    else if (event & BEV_EVENT_ERROR)  
        printf("some other error\n");  
  
    //這將自動(dòng)close套接字和free讀寫(xiě)緩沖區(qū)  
    bufferevent_free(bev);  
  
    struct event *ev = (struct event*)arg;  
    //因?yàn)閟ocket已經(jīng)沒(méi)有,所以這個(gè)event也沒(méi)有存在的必要了  
    event_free(ev);  
}  
  
  
typedef struct sockaddr SA;  
int tcp_connect_server(const char* server_ip, int port)  
{  
    int sockfd, status, save_errno;  
    struct sockaddr_in server_addr;  
  
    memset(&server_addr, 0, sizeof(server_addr) );  
  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_port = htons(port);  
    status = inet_aton(server_ip, &server_addr.sin_addr);  
  
    if( status == 0 ) //the server_ip is not valid value  
    {  
        errno = EINVAL;  
        return -1;  
    }  
  
    sockfd = ::socket(PF_INET, SOCK_STREAM, 0);  
    if( sockfd == -1 )  
        return sockfd;  
  
  
    status = ::connect(sockfd, (SA*)&server_addr, sizeof(server_addr) );  
  
    if( status == -1 )  
    {  
        save_errno = errno;  
        ::close(sockfd);  
        errno = save_errno; //the close may be error  
        return -1;  
    }  
  
    evutil_make_socket_nonblocking(sockfd);  
  
    return sockfd;  
}  

歡迎留言交流學(xué)習(xí)
參考鏈接:
https://www.felix021.com/blog/read.php?2068
http://blog.csdn.net/luotuo44/article/details/39670221

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ?追根究底?Libevent內(nèi)部實(shí)現(xiàn)原理初探 Libevent確實(shí)方便了開(kāi)發(fā)人員,對(duì)于定時(shí)器、信號(hào)處理、關(guān)心的文件...
    meng_philip123閱讀 4,842評(píng)論 0 4
  • 名稱(chēng) libev - 一個(gè) C 編寫(xiě)的功能全面的高性能事件循環(huán)。 概要 示例程序 關(guān)于 libev Libev 是...
    hanpfei閱讀 15,530評(píng)論 0 5
  • 原文 關(guān)鍵數(shù)據(jù)結(jié)構(gòu) CQ_ITEM 可以將這個(gè)結(jié)構(gòu)體看著是主線(xiàn)程accept觸發(fā)時(shí)即有客戶(hù)端連入時(shí),主線(xiàn)程寫(xiě)入工作...
    lcode閱讀 2,138評(píng)論 0 1
  • 大綱 一.Socket簡(jiǎn)介 二.BSD Socket編程準(zhǔn)備 1.地址 2.端口 3.網(wǎng)絡(luò)字節(jié)序 4.半相關(guān)與全相...
    y角閱讀 2,655評(píng)論 2 11
  • 你知道么,這一整天我都恍恍惚惚,我想做點(diǎn)什么,忙起來(lái),讓自己不要想,于是即使爸媽讓我不要跳舞,擔(dān)心我的身體,我還是...
    蝸牛其實(shí)不想有觸角閱讀 486評(píng)論 0 0

友情鏈接更多精彩內(nèi)容