Redis AE事件庫研究

什么是AE?

首先介紹這里說的的AE指的是redis的網(wǎng)絡(luò)事件庫,標(biāo)準(zhǔn)說明應(yīng)該是:一個(gè)簡單的事件驅(qū)動(dòng)程序庫。
源碼ae.h頭文件說明如下:

/* A simple event-driven programming library. Originally I wrote this code
 * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
 * it in form of a library for easy reuse.

我們知道這個(gè)程序庫封裝了網(wǎng)絡(luò)模型:evport、epoll、kqueue、select。
那么第一個(gè)問題:redis是如何實(shí)現(xiàn)不同操作系統(tǒng)使用對(duì)應(yīng)的模型呢?

redis是如何實(shí)現(xiàn)不同操作系統(tǒng)使用對(duì)應(yīng)的模型呢?

上代碼:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

上面這部分代碼是在ae.c前面的一段代碼,顯然是通過宏來控制引入不同模型的.c文件實(shí)現(xiàn)的。也就是不同操作系統(tǒng)定義不同的宏,編譯的時(shí)候就編譯對(duì)應(yīng)的.c文件。我們可以進(jìn)一步來看下宏定義,位于config.h:

#ifdef __linux__
#define HAVE_EPOLL 1
#endif

#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif

#ifdef __sun
#include <sys/feature_tests.h>
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif

可知:
linux系統(tǒng)使用epoll,apple/Mac/bsd系統(tǒng)使用kqueue,__sun使用evport(宏__sun實(shí)際對(duì)應(yīng)Solaris系統(tǒng)),其他操作系統(tǒng)如windows則使用select。

epoll和kqueue都有聽說過,第二個(gè)問題來了:evport是什么模型?

evport是什么模型?

evport主要API:

  1. port_create()
    原型:int port_create(void);

    port_create() 創(chuàng)建一個(gè) Event ports 隊(duì)列,返回一個(gè)文件描述符作為該
    Event port 的代表。

    相似:kqueue(),epoll_create()

  2. port_associate()
    原型:int port_associate(int port, int source, uintptr_t object,
    int events, void *user);

    port_associate() 將某一個(gè)對(duì)象的特定 event 與 Event port 相關(guān)聯(lián)。當(dāng)
    source 為 PORT_SOURCE_FD 時(shí),object 就是文件描述符。events 可以參考
    poll 的。user 是一個(gè)用戶自定義的指針,與該 object 相關(guān)的。在
    kqueue 和 epoll 也提供了類似的用戶自定義指針。

    需要注意的是,當(dāng)用 port_get() 取得某個(gè) object 的 event 之后,這個(gè)
    object 與 port 也就不再相關(guān)聯(lián)了。如果想繼續(xù)取得這個(gè) object 的 event,
    必須再次調(diào)用 port_associate() 將 object 與 port 關(guān)聯(lián)。這種設(shè)計(jì)顯然
    是為多線程程序而做的,當(dāng)某一個(gè)線程取得一個(gè) event 之后,object 就從
    port 的隊(duì)列中刪掉了,這樣可以保證這個(gè)線程完完整整地處理完這個(gè) event,
    不用擔(dān)心別的線程也會(huì)取得這個(gè) event。

    相似:kevent(),epoll_ctl()

  3. port_get()
    原型:int port_get(int port, port_event_t *pe, const timespec_t
    *timeout);

    port_get() 每次從 port 中取回一個(gè) event。如果 timeout 參數(shù)為 NULL,
    port_get() 會(huì)一直等待,直到某一個(gè) event 到來。pe 用于存儲(chǔ)返回的
    event。

    相似:kevent(),epoll_wait()

  4. port_getn()
    原型:int port_getn(int port, port_event_t list[], uint_t max,
    uint_t *nget, const timespec_t *timeout);

    port_getn() 與 port_get() 都是用于從 port 中取回 event,不同的是
    port_getn() 可以一次取回多個(gè)。list 數(shù)組用于存儲(chǔ)返回的多個(gè) events,
    max 為 list 數(shù)組的元素個(gè)數(shù)。timeout 與 port_get() 的一致。

    需要特別注意的是 nget 參數(shù),這是一個(gè) value-result 參數(shù),也就是傳入
    n,告訴內(nèi)核要取得 n 個(gè) event,當(dāng) port_getn() 返回時(shí),getn 的值表示
    內(nèi)核實(shí)際取得了多少個(gè)。當(dāng) timeout 為 NULL 時(shí),port_getn() 會(huì)一直等待,
    直到確實(shí)取得了 n 個(gè) event 之后才會(huì)返回,這一點(diǎn)是與 kevent() 和
    epoll_wait() 很不相同的地方。如果 timeout 不為 NULL,則只要超時(shí)就返
    回,不管是不是已經(jīng)取到了 n 個(gè) event。(注:這里忽略了其他可能引起
    port_getn() 返回的因素)

    相似:kevent(),epoll_wait()

總結(jié)evport:實(shí)際也是類似epoll、kqueue的一種事件模型。

再回過頭來,我們看看redis 定義的事件模型是怎么被調(diào)用和工作的?

redis 定義的事件模型是怎么被調(diào)用和工作的?

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

上面定義了事件循環(huán)結(jié)構(gòu)體。它在server中使用如下:

struct redisServer {
    /* General */
    pid_t pid;                  /* Main process pid. */
    char *configfile;           /* Absolute config file path, or NULL */
    char *executable;           /* Absolute executable file path. */
    char **exec_argv;           /* Executable argv vector (copy). */
    int dynamic_hz;             /* Change hz value depending on # of clients. */
    int config_hz;              /* Configured HZ value. May be different than
                                   the actual 'hz' field value if dynamic-hz
                                   is enabled. */
    int hz;                     /* serverCron() calls frequency in hertz */
    redisDb *db;
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    aeEventLoop *el;//注意這個(gè)就是aeEventLoop在sever定義的變量

看看它使用的地方:
1)初始化

void initServer(void) {
    ...省略其它代碼
   //初始化
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
   ...
   //創(chuàng)建一個(gè)定時(shí)事件 每隔1ms進(jìn)行回調(diào)
  /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
 //針對(duì)每個(gè)新連接創(chuàng)建文件事件
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
   //針對(duì)unix socket創(chuàng)建文件事件
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");


    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
  //創(chuàng)建一個(gè)針對(duì)pipe的事件
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }
}

上面的初始化看到創(chuàng)建了多個(gè)文件事件,這個(gè)文件事件實(shí)際就是用來監(jiān)聽fd的讀寫。

2)循環(huán)處理

int main(int argc, char **argv) {
...
aeSetBeforeSleepProc(server.el,beforeSleep);//設(shè)置事件循環(huán)處理前的回調(diào)
aeSetAfterSleepProc(server.el,afterSleep);//設(shè)置事件循環(huán)處理后的回調(diào)
aeMain(server.el);//設(shè)置事件循環(huán)主要邏輯(包含事件處理邏輯)
aeDeleteEventLoop(server.el);//清除事件循環(huán)對(duì)象
...
}

總結(jié):事件loop對(duì)象在initServer函數(shù)中初始化,在main函數(shù)調(diào)用。通過aeMain中的while函數(shù)不斷處理事件。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 名稱 libev - 一個(gè) C 編寫的功能全面的高性能事件循環(huán)。 概要 示例程序 關(guān)于 libev Libev 是...
    hanpfei閱讀 15,551評(píng)論 0 5
  • 大綱 一.Socket簡介 二.BSD Socket編程準(zhǔn)備 1.地址 2.端口 3.網(wǎng)絡(luò)字節(jié)序 4.半相關(guān)與全相...
    VD2012閱讀 2,705評(píng)論 0 5
  • 常用操作以及概念 求助 –help 指令的基本用法與選項(xiàng)介紹。 man man 是 manual 的縮寫,將指令的...
    Fellers閱讀 508評(píng)論 0 3
  • 綜述 最近筆者閱讀并研究redis源碼,在redis客戶端與服務(wù)器端交互這個(gè)內(nèi)容點(diǎn)上,需要參考網(wǎng)上一些文章,但是遺...
    zbdba閱讀 1,371評(píng)論 0 1
  • 原生API select intselect(int numfds, fd_set *readfds, fd_se...
    VD2012閱讀 1,553評(píng)論 0 1

友情鏈接更多精彩內(nèi)容