Redis網絡模型整體采用經典的reactor模型,單線程事件循環+IO多路復用。
實現代碼主要在ae.h和ae.c內,此外還包括ae_epoll.c、ae_kqueue.c等文件,用于跨平臺使用不同的IO復用接口。
1.使用哪種IO復用
config.h文件內,根據區分平臺的編譯宏,來決定使用哪一套IO復用接口。對于平臺支持的IO復用接口,按照evport->epoll->kqueue->select的順序進行選擇。
/* Pull in exactly one I/O multiplexing backend. The first macro that is
 * defined wins, in the order evport, epoll, kqueue; select is the
 * fallback when none of them is defined. */
#if defined(HAVE_EVPORT)
#include "ae_evport.c"
#elif defined(HAVE_EPOLL)
#include "ae_epoll.c"
#elif defined(HAVE_KQUEUE)
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
2.相關結構體
事件循環結構體是aeEventLoop,里面封裝了當前實際監聽的最大fd、定時器鏈表等字段。其中,apidata指針指向的結構體里保存的是IO復用接口的相關句柄結構等。
/* State of one event loop: registered file events, fired events,
 * the timer list and the backend-specific polling data. */
typedef struct aeEventLoop {
// Largest fd currently registered (-1 is treated as "no fd registered"
// by aeProcessEvents).
int maxfd; /* highest file descriptor currently registered */
// Maximum number of file descriptors that can be tracked.
int setsize; /* max number of file descriptors tracked */
// ID to hand out to the next timer event; incremented by one each time
// a timer event is added.
long long timeEventNextId;
// Time of the previous event-processing pass, used to correct for
// possible clock errors.
time_t lastTime; /* Used to detect system clock skew */
// Array of all currently registered file events (mainly network events);
// indexed by file descriptor.
aeFileEvent *events; /* Registered events */
// Events reported as ready by the most recent polling call.
aeFiredEvent *fired; /* Fired events */
// Head of the timer event linked list.
aeTimeEvent *timeEventHead;
// Event loop stop flag.
int stop;
// Holds the handle/state of the selected I/O multiplexing backend.
void *apidata; /* This is used for polling API specific data */
// Callback invoked before each I/O multiplexing poll.
aeBeforeSleepProc *beforesleep;
// Callback invoked after each I/O multiplexing poll. Note that it
// reuses the aeBeforeSleepProc callback type.
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
文件事件結構體如下,這里沒有保存實際fd,這是因為aeFileEvent是作為數組元素保存在aeEventLoop里,數組下標就是fd。
/* Registration record for one file descriptor. The fd itself is not
 * stored here: records live in the aeEventLoop events array and are
 * looked up by using the fd as the array index. */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ // bit mask of the events being monitored
aeFileProc *rfileProc; // handler called for readable events
aeFileProc *wfileProc; // handler called for writable events
void *clientData; // private data passed back to the handlers; per the original note, usually a pointer to the client owning this fd
} aeFileEvent;
定時器事件的結構體如下。該結構體封裝了定時器執行時間、定時器鉤子函數等字段。Redis所有定時器事件組織成一個雙向鏈表,在事件循環的時候,遍歷整個定時器鏈表,檢查定時器是否到期,到期則執行該定時器內的定時器事件鉤子函數。
/* One timer event. Timer events are chained into a doubly linked list
 * through the prev/next pointers. */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */ // timer ID
long when_sec; /* seconds */ // seconds part of the expiry time
long when_ms; /* milliseconds */ // milliseconds part of the expiry time
aeTimeProc *timeProc; // timer event handler
aeEventFinalizerProc *finalizerProc; // hook called when the timer is deleted
void *clientData; // caller-supplied private data (original note: pointer to the client - TODO confirm)
struct aeTimeEvent *prev; // previous node in the timer list
struct aeTimeEvent *next; // next node in the timer list
} aeTimeEvent;
3.相關接口匯總
/* Public interface of the ae event library. */
aeEventLoop *aeCreateEventLoop(int setsize); // create an event loop
void aeDeleteEventLoop(aeEventLoop *eventLoop); // destroy an event loop
void aeStop(aeEventLoop *eventLoop); // stop the event loop
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, // register a file event on this event loop
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); // remove a file event from this event loop
int aeGetFileEvents(aeEventLoop *eventLoop, int fd); // look up the file events for fd; returns an int, presumably the registered event mask - TODO confirm
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, // create a timer event
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); // delete the timer event with the given id from this event loop
int aeProcessEvents(aeEventLoop *eventLoop, int flags); // run one pass of event processing
int aeWait(int fd, int mask, long long milliseconds); // wait on a single fd (per the original note, implemented with poll)
void aeMain(aeEventLoop *eventLoop); // entry point that starts the event loop
char *aeGetApiName(void); // name of the I/O multiplexing API in use
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); // set the callback run before each poll
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); // set the callback run after each poll
int aeGetSetSize(aeEventLoop *eventLoop); // get the maximum number of tracked file descriptors
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); // change the maximum number of tracked file descriptors
4.事件循環函數
aeProcessEvents就是事件循環的主流程了。這個接口里主要做了這4項工作:
- 遍歷定時器鏈表,計算出下一個定時器執行的時間,作為事件輪詢最長的等待時間。
- 調用epoll_wait(封裝在aeApiPoll接口內)接口獲取已經觸發的事件。
- 對每一項已經觸發的事件,執行事件的處理函數。
- 處理完所有文件事件后,遍歷定時器鏈表處理定時器事件。
/* Run one pass of the event loop: poll for ready file descriptors (sleeping
 * at most until the nearest timer is due), dispatch their read/write
 * handlers, then process time events.
 *
 * flags is a bit mask of:
 *   AE_FILE_EVENTS / AE_TIME_EVENTS - if neither is set the function
 *                                     returns 0 immediately;
 *   AE_DONT_WAIT                    - poll with a zero timeout;
 *   AE_CALL_AFTER_SLEEP             - call the aftersleep callback after
 *                                     polling, if one is set.
 *
 * Returns the number of fired file descriptors handled plus the value
 * returned by processTimeEvents(). */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
// Neither time events nor file events were requested.
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
 * file events to process as long as we want to process time
 * events, in order to sleep until the next time event is ready
 * to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// Find the timer that fires soonest; it bounds how long the poll may block.
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
// A pending timer event was found.
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
 * time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
// The timer is not due yet: wait for the remaining time.
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
// The timer is already due: use a zero timeout so the poll returns at once.
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
// No timer event to wait for.
} else {
/* If we have to check for events but need to return
 * ASAP because of AE_DONT_WAIT we need to set the timeout
 * to zero */
// With AE_DONT_WAIT the timeout is zero, so the poll returns immediately.
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
// Otherwise pass NULL, meaning the poll may block indefinitely.
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
/* Call the multiplexing API, will return only on timeout or when
 * some event fires. */
// Backend poll (epoll_wait when the epoll backend is in use).
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
// Run the after-poll hook if one is set and the caller asked for it.
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
// NOTE(review): fe is computed once per fired fd and reused after the
// handlers run; if a handler can reallocate eventLoop->events (e.g. by
// resizing the set size) this pointer would be stale - confirm.
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
 * event later. This is useful as sometimes we may be able
 * to serve the reply of a query immediately after processing the
 * query.
 *
 * However if AE_BARRIER is set in the mask, our application is
 * asking us to do the reverse: never fire the writable event
 * after the readable. In such a case, we invert the calls.
 * This is useful when, for instance, we want to do things
 * in the beforeSleep() hook, like fsyncing a file to disk,
 * before replying to a client. */
// Non-zero when the read/write handler order must be swapped for this fd.
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
 * processed event removed an element that fired and we still
 * didn't processed, so we check if the event is still valid.
 *
 * Fire the readable event if the call sequence is not
 * inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
// Handle the readable event.
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
// Skip the call when the same handler already ran for the read event.
if (!fired || fe->wfileProc != fe->rfileProc) {
// Handle the writable event.
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
 * after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
// Skip the call when the same handler already ran for the write event.
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// Counted once per fired fd, regardless of how many handlers ran.
processed++;
}
}
/* Check time events */
// Process timer events.
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}