Redis 源碼分析(3):網絡模型

Redis 的網絡模型整體採用經典的 Reactor 模型:單線程事件循環 + IO 多路復用。

實現代碼主要在 ae.h、ae.c 中,此外還包括 ae_epoll.c、ae_kqueue.c 等文件,用於在不同平臺上使用不同的 IO 復用接口。

1. 使用哪種 IO 復用

config.h 根據區分平臺的編譯宏定義 HAVE_EVPORT、HAVE_EPOLL、HAVE_KQUEUE 等宏,ae.c 再根據這些宏決定包含哪一套 IO 復用實現。對於平臺支持的 IO 復用接口,按照 evport -> epoll -> kqueue -> select 的優先順序進行選擇,select 作為所有平臺都可用的後備方案。

/* Include the best IO multiplexing backend this platform supports, in
 * order of preference: event ports, epoll, kqueue, and finally select()
 * as the portable fallback. The HAVE_* macros come from the platform
 * configuration. */
#if defined(HAVE_EVPORT)
#include "ae_evport.c"
#elif defined(HAVE_EPOLL)
#include "ae_epoll.c"
#elif defined(HAVE_KQUEUE)
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif

2. 相關結構體

事件循環結構體是 aeEventLoop,裡面封裝了當前實際監聽的最大 fd、定時器鏈表等字段。其中,apidata 指針指向的結構體保存的是 IO 復用接口的相關句柄等數據。

/* One event-loop instance: the registered file events, the timer list and
 * the state of the IO multiplexing backend. */
typedef struct aeEventLoop {
    /* Highest registered fd; aeProcessEvents treats -1 as "no file events
     * registered". */
    int maxfd;   /* highest file descriptor currently registered */
    /* Maximum number of fds this loop can track (see aeGetSetSize /
     * aeResizeSetSize). */
    int setsize; /* max number of file descriptors tracked */
    /* Id handed to the next timer created; per the original annotation it is
     * incremented each time a timer is added. */
    long long timeEventNextId;
    /* Time of the previous aeProcessEvents call (per the original
     * annotation). */
    time_t lastTime;     /* Used to detect system clock skew */
    /* File events registered on this loop, indexed by fd. */
    aeFileEvent *events; /* Registered events */
    /* Entries reported ready by the last aeApiPoll call; each carries the fd
     * and the mask of events that fired. */
    aeFiredEvent *fired; /* Fired events */
    /* Head of the timer (time event) list. */
    aeTimeEvent *timeEventHead;
    /* Stop flag; presumably set by aeStop() to end aeMain() (TODO confirm). */
    int stop;
    /* Backend-specific state (presumably the epoll/kqueue handle and its
     * buffers, depending on the backend compiled in). */
    void *apidata; /* This is used for polling API specific data */
    /* Hook the original annotation describes as run before each poll; its
     * call site is not inside aeProcessEvents. */
    aeBeforeSleepProc *beforesleep;
    /* Hook run right after aeApiPoll returns, when AE_CALL_AFTER_SLEEP is
     * passed to aeProcessEvents. Note that it reuses the aeBeforeSleepProc
     * type. */
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

文件事件結構體如下。這裡沒有保存實際的 fd,這是因為 aeFileEvent 是作為數組元素保存在 aeEventLoop 的 events 數組裡的,數組下標就是 fd。

/* One registered file event. The fd itself is not stored: instances live
 * in aeEventLoop.events, indexed by fd. */
typedef struct aeFileEvent {
    /* Bitwise OR of AE_READABLE / AE_WRITABLE / AE_BARRIER; AE_BARRIER asks
     * aeProcessEvents to run the write handler before the read handler. */
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;               /* handler called when fd is readable */
    aeFileProc *wfileProc;               /* handler called when fd is writable */
    /* Opaque data passed back to rfileProc/wfileProc; per the original
     * annotation, usually the client that owns this fd. */
    void *clientData;
} aeFileEvent;

定時器事件的結構體如下。該結構體封裝了定時器的到期時間、定時器鉤子函數等字段。Redis 的所有定時器事件組織成一個雙向鏈表;在事件循環中會遍歷整個定時器鏈表,檢查每個定時器是否到期,到期則執行該定時器的鉤子函數。

/* One timer. All timers form a doubly linked list (prev/next) whose head is
 * aeEventLoop.timeEventHead. */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */      /* expiry time, seconds part */
    long when_ms; /* milliseconds */  /* expiry time, milliseconds part */
    aeTimeProc *timeProc;             /* callback run when the timer expires */
    /* Per the original annotation, called when the timer is deleted. */
    aeEventFinalizerProc *finalizerProc;
    /* Opaque data for the callbacks (presumably passed back to timeProc and
     * finalizerProc; TODO confirm). */
    void *clientData;
    struct aeTimeEvent *prev;         /* previous timer in the list */
    struct aeTimeEvent *next;         /* next timer in the list */
} aeTimeEvent;

3. 相關接口匯總

/* Create an event loop able to track up to setsize file descriptors. */
aeEventLoop *aeCreateEventLoop(int setsize);
/* Destroy an event loop. */
void aeDeleteEventLoop(aeEventLoop *eventLoop);
/* Ask the event loop to stop (presumably by setting eventLoop->stop). */
void aeStop(aeEventLoop *eventLoop);
/* Register proc for the events in mask on fd; clientData is handed back to
 * proc when it is called. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
/* Remove the events in mask from fd's registration. */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
/* Returns an int, not the aeFileEvent struct: presumably the AE_* mask
 * currently registered for fd (TODO confirm). */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
/* Create a timer that fires after `milliseconds`; per the original
 * annotation, finalizerProc runs when the timer is deleted. Presumably
 * returns the new timer's id (the value aeDeleteTimeEvent takes). */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
/* Delete the timer with the given id. */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
/* Run one loop iteration. flags select the work (AE_FILE_EVENTS,
 * AE_TIME_EVENTS, AE_DONT_WAIT, AE_CALL_AFTER_SLEEP); returns the number of
 * events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
/* Wait up to `milliseconds` for the events in mask on a single fd; per the
 * original annotation it is implemented with poll(). */
int aeWait(int fd, int mask, long long milliseconds);
/* Entry point that runs the event loop. */
void aeMain(aeEventLoop *eventLoop);
/* Name of the multiplexing backend in use. */
char *aeGetApiName(void);
/* Install the hook run before each poll. */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
/* Install the hook run after each poll (when AE_CALL_AFTER_SLEEP is set). */
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
/* Current maximum number of trackable file descriptors. */
int aeGetSetSize(aeEventLoop *eventLoop);
/* Change the maximum number of trackable file descriptors. */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

4. 事件循環函數

aeProcessEvents 就是事件循環的主流程。這個接口主要做了以下 4 項工作:

  1. 遍歷定時器鏈表,找到最近要觸發的定時器,以其觸發時間作為本次事件輪詢的最長等待時間。
  2. 調用 epoll_wait(封裝在 aeApiPoll 接口內)獲取已經觸發的事件。
  3. 對每一個已經觸發的事件,執行對應的事件處理函數。
  4. 處理完所有文件事件後,遍歷定時器鏈表,處理到期的定時器事件。
/* Run one iteration of the event loop and return the number of events
 * processed (each fired fd counts once, plus the time events handled).
 *
 * flags selects the work to do:
 *   AE_FILE_EVENTS      - poll and dispatch file (socket) events;
 *   AE_TIME_EVENTS      - run expired timers after the file events;
 *   AE_DONT_WAIT        - never block in the poll (zero timeout);
 *   AE_CALL_AFTER_SLEEP - invoke eventLoop->aftersleep once the poll returns.
 *
 * Steps:
 *   1. Find the nearest timer and derive the poll timeout from it.
 *   2. Poll the multiplexing backend (aeApiPoll) for fired events.
 *   3. Dispatch each fired event to its read and/or write handler.
 *   4. Process the time events.
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call the poll API even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        /* The timer that fires soonest bounds how long the poll may block. */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                /* Timer still in the future: sleep until it is due. */
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                /* Timer already overdue: poll without blocking. */
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* No timer bounds the wait. If we have to check for events but
             * need to return ASAP because of AE_DONT_WAIT we need to set the
             * timeout to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't process, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                /* The handler may have resized the event set (e.g. through
                 * aeResizeSetSize), which can reallocate eventLoop->events
                 * and leave fe dangling: re-fetch it before touching it. */
                fe = &eventLoop->events[fd];
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc)) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
最後編輯於
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容