什么是AE?
首先介紹這里說的的AE指的是redis的網(wǎng)絡(luò)事件庫,標(biāo)準(zhǔn)說明應(yīng)該是:一個(gè)簡單的事件驅(qū)動(dòng)程序庫。
源碼ae.h頭文件說明如下:
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
我們知道這個(gè)程序庫封裝了網(wǎng)絡(luò)模型:evport、epoll、kqueue、select。
那么第一個(gè)問題:redis是如何實(shí)現(xiàn)不同操作系統(tǒng)使用對(duì)應(yīng)的模型呢?
redis是如何實(shí)現(xiàn)不同操作系統(tǒng)使用對(duì)應(yīng)的模型呢?
上代碼:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
上面這部分代碼是在ae.c前面的一段代碼,顯然是通過宏來控制引入不同模型的.c文件實(shí)現(xiàn)的。也就是不同操作系統(tǒng)定義不同的宏,編譯的時(shí)候就編譯對(duì)應(yīng)的.c文件。我們可以進(jìn)一步來看下宏定義,位于config.h:
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
#ifdef __sun
#include <sys/feature_tests.h>
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif
可知:
linux系統(tǒng)使用epoll,apple/Mac/bsd系統(tǒng)使用kqueue,__sun使用evport(宏__sun實(shí)際對(duì)應(yīng)Solaris系統(tǒng)),其他操作系統(tǒng)如windows則使用select。
epoll和kqueue都有聽說過,第二個(gè)問題來了:evport是什么模型?
evport是什么模型?
evport主要API:
-
port_create()
原型:int port_create(void);port_create() 創(chuàng)建一個(gè) Event ports 隊(duì)列,返回一個(gè)文件描述符作為該
Event port 的代表。相似:kqueue(),epoll_create()
-
port_associate()
原型:int port_associate(int port, int source, uintptr_t object,
int events, void *user);port_associate() 將某一個(gè)對(duì)象的特定 event 與 Event port 相關(guān)聯(lián)。當(dāng)
source 為 PORT_SOURCE_FD 時(shí),object 就是文件描述符。events 可以參考
poll 的。user 是一個(gè)用戶自定義的指針,與該 object 相關(guān)的。在
kqueue 和 epoll 也提供了類似的用戶自定義指針。需要注意的是,當(dāng)用 port_get() 取得某個(gè) object 的 event 之后,這個(gè)
object 與 port 也就不再相關(guān)聯(lián)了。如果想繼續(xù)取得這個(gè) object 的 event,
必須再次調(diào)用 port_associate() 將 object 與 port 關(guān)聯(lián)。這種設(shè)計(jì)顯然
是為多線程程序而做的,當(dāng)某一個(gè)線程取得一個(gè) event 之后,object 就從
port 的隊(duì)列中刪掉了,這樣可以保證這個(gè)線程完完整整地處理完這個(gè) event,
不用擔(dān)心別的線程也會(huì)取得這個(gè) event。相似:kevent(),epoll_ctl()
-
port_get()
原型:int port_get(int port, port_event_t *pe, const timespec_t
*timeout);port_get() 每次從 port 中取回一個(gè) event。如果 timeout 參數(shù)為 NULL,
port_get() 會(huì)一直等待,直到某一個(gè) event 到來。pe 用于存儲(chǔ)返回的
event。相似:kevent(),epoll_wait()
-
port_getn()
原型:int port_getn(int port, port_event_t list[], uint_t max,
uint_t *nget, const timespec_t *timeout);port_getn() 與 port_get() 都是用于從 port 中取回 event,不同的是
port_getn() 可以一次取回多個(gè)。list 數(shù)組用于存儲(chǔ)返回的多個(gè) events,
max 為 list 數(shù)組的元素個(gè)數(shù)。timeout 與 port_get() 的一致。需要特別注意的是 nget 參數(shù),這是一個(gè) value-result 參數(shù),也就是傳入
n,告訴內(nèi)核要取得 n 個(gè) event,當(dāng) port_getn() 返回時(shí),getn 的值表示
內(nèi)核實(shí)際取得了多少個(gè)。當(dāng) timeout 為 NULL 時(shí),port_getn() 會(huì)一直等待,
直到確實(shí)取得了 n 個(gè) event 之后才會(huì)返回,這一點(diǎn)是與 kevent() 和
epoll_wait() 很不相同的地方。如果 timeout 不為 NULL,則只要超時(shí)就返
回,不管是不是已經(jīng)取到了 n 個(gè) event。(注:這里忽略了其他可能引起
port_getn() 返回的因素)相似:kevent(),epoll_wait()
總結(jié)evport:實(shí)際也是類似epoll、kqueue的一種事件模型。
再回過頭來,我們看看redis 定義的事件模型是怎么被調(diào)用和工作的?
redis 定義的事件模型是怎么被調(diào)用和工作的?
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
上面定義了事件循環(huán)結(jié)構(gòu)體。它在server中使用如下:
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */
int dynamic_hz; /* Change hz value depending on # of clients. */
int config_hz; /* Configured HZ value. May be different than
the actual 'hz' field value if dynamic-hz
is enabled. */
int hz; /* serverCron() calls frequency in hertz */
redisDb *db;
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;//注意這個(gè)就是aeEventLoop在sever定義的變量
看看它使用的地方:
1)初始化
void initServer(void) {
...省略其它代碼
//初始化
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
//創(chuàng)建一個(gè)定時(shí)事件 每隔1ms進(jìn)行回調(diào)
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
//針對(duì)每個(gè)新連接創(chuàng)建文件事件
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
//針對(duì)unix socket創(chuàng)建文件事件
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
//創(chuàng)建一個(gè)針對(duì)pipe的事件
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
}
上面的初始化看到創(chuàng)建了多個(gè)文件事件,這個(gè)文件事件實(shí)際就是用來監(jiān)聽fd的讀寫。
2)循環(huán)處理
int main(int argc, char **argv) {
...
aeSetBeforeSleepProc(server.el,beforeSleep);//設(shè)置事件循環(huán)處理前的回調(diào)
aeSetAfterSleepProc(server.el,afterSleep);//設(shè)置事件循環(huán)處理后的回調(diào)
aeMain(server.el);//設(shè)置事件循環(huán)主要邏輯(包含事件處理邏輯)
aeDeleteEventLoop(server.el);//清除事件循環(huán)對(duì)象
...
}
總結(jié):事件loop對(duì)象在initServer函數(shù)中初始化,在main函數(shù)調(diào)用。通過aeMain中的while函數(shù)不斷處理事件。