知識(shí)點(diǎn)
- 時(shí)間事件分為定時(shí)事件與周期事件;
- 時(shí)間事件與文件事件是合作關(guān)系,不會(huì)出現(xiàn)搶占的情況;
- 時(shí)間事件要比預(yù)設(shè)的更晚一些。
- Redis的時(shí)間事件分為以下兩類
1.定時(shí)事件:讓一段程序在指定的時(shí)間之后執(zhí)行一次
2.周期性事件:讓一段程序每隔指定時(shí)間就執(zhí)行一次
一個(gè)時(shí)間事件主要由以下三個(gè)屬性組成:
- id(全局唯一id)
- when (毫秒時(shí)間戳,記錄了時(shí)間事件的到達(dá)時(shí)間)
- timeProc(時(shí)間事件處理器,當(dāng)時(shí)間到達(dá)時(shí),Redis就會(huì)調(diào)用相應(yīng)的處理器來處理事件)
/* Time event structure
 *
 * A single registered time event. Time events live in an unsorted
 * singly linked list hanging off the event loop (see aeCreateTimeEvent
 * and aeSearchNearestTimer below).
 */
typedef struct aeTimeEvent {
// Globally unique identifier of this time event
long long id; /* time event identifier. */
// Absolute fire time, stored as a UNIX timestamp split into a
// seconds part plus a milliseconds remainder
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// Handler invoked once the fire time has been reached
aeTimeProc *timeProc;
// Finalizer called when the event is deleted (may be NULL; see
// aeDeleteTimeEvent which checks it before calling)
aeEventFinalizerProc *finalizerProc;
// Opaque private data passed through to timeProc/finalizerProc
void *clientData;
// Next event in the (unsorted) linked list
struct aeTimeEvent *next;
} aeTimeEvent;
在結(jié)構(gòu)里面很容易地得出時(shí)間事件的結(jié)構(gòu)是一個(gè)無序鏈表。
Redis將時(shí)間事件都放在一個(gè)無序鏈表中,每當(dāng)時(shí)間事件執(zhí)行器運(yùn)行時(shí),它就遍歷整個(gè)鏈表,查找所有已到達(dá)的時(shí)間事件,并調(diào)用相應(yīng)的事件處理器。
時(shí)間事件的API
創(chuàng)建新的時(shí)間事件,新的時(shí)間事件總是處在時(shí)間事件鏈表的表頭
/* Register a new time event that fires `milliseconds` ms from now.
 *
 * The new node is pushed onto the head of the event loop's unsorted
 * time-event list. Returns the event's unique id, or AE_ERR when the
 * allocation fails. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    /* Claim the next unique event id. */
    long long id = eventLoop->timeEventNextId++;

    /* Allocate and populate the event node. */
    aeTimeEvent *te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;

    te->id = id;
    /* Convert the relative delay into an absolute fire time. */
    aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;

    /* Link the node in at the list head. */
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;

    return id;
}
刪除給定ID的時(shí)間事件:
/* Remove the time event with the given id from the event loop.
 *
 * Runs the event's finalizer (if any) before freeing the node.
 * Returns AE_OK on success, AE_ERR when no such event exists. */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *prev = NULL;
    aeTimeEvent *cur;

    /* Scan the singly linked list for a node with a matching id. */
    for (cur = eventLoop->timeEventHead; cur != NULL;
         prev = cur, cur = cur->next)
    {
        if (cur->id != id) continue;

        /* Unlink: either replace the list head or bypass via prev. */
        if (prev == NULL)
            eventLoop->timeEventHead = cur->next;
        else
            prev->next = cur->next;

        /* Let the event release its private data, if it asked to. */
        if (cur->finalizerProc)
            cur->finalizerProc(eventLoop, cur->clientData);

        /* Free the node itself. */
        zfree(cur);
        return AE_OK;
    }
    return AE_ERR; /* NO event with the specified ID found */
}
返回距離當(dāng)前時(shí)間最近的時(shí)間事件
// 尋找離目前時(shí)間最近的時(shí)間事件
/* Return the pending time event closest to firing, or NULL when the
 * list is empty.
 *
 * The list is unsorted, so the whole list is scanned and the node with
 * the smallest (when_sec, when_ms) pair wins. */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *nearest = NULL;
    aeTimeEvent *te;

    for (te = eventLoop->timeEventHead; te != NULL; te = te->next) {
        /* Lexicographic compare on (seconds, milliseconds). */
        int earlier = nearest == NULL ||
                      te->when_sec < nearest->when_sec ||
                      (te->when_sec == nearest->when_sec &&
                       te->when_ms < nearest->when_ms);
        if (earlier) nearest = te;
    }
    return nearest;
}
時(shí)間事件處理器
與文件事件不同,時(shí)間事件處理器只有一個(gè),用來處理到達(dá)其指定時(shí)間的時(shí)間事件,并判斷其是否需要繼續(xù)循環(huán),即周期事件:
/* Fire every time event whose scheduled time has been reached.
 *
 * A handler returning AE_NOMORE is removed (one-shot event); any other
 * return value is taken as the delay in milliseconds until the event
 * fires again (periodic event). Returns the number of events fired. */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
// Current time at seconds resolution, for clock-skew detection
time_t now = time(NULL);
// If the system clock moved backwards since the last call, reset every
// event's fire time to 0 so it runs as soon as possible. Firing events
// early is treated as safer than leaving them delayed indefinitely.
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// Remember when time events were last processed
eventLoop->lastTime = now;
// Walk the unsorted list and fire every event that is already due
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
// Skip events created by handlers during this very scan; they are
// considered on the next call
if (te->id > maxId) {
te = te->next;
continue;
}
// Fetch the current time at millisecond resolution
aeGetTime(&now_sec, &now_ms);
// The event is due when now >= (when_sec, when_ms)
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// Run the handler; its return value schedules the next run
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
// Does this event need to fire again?
if (retval != AE_NOMORE) {
// Yes: fire again retval milliseconds from now
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// No: remove the one-shot event from the list
aeDeleteTimeEvent(eventLoop, id);
}
// The handler may have mutated the event list (created or
// deleted events), so restart the scan from the head
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
時(shí)間事件的調(diào)度
時(shí)間事件是與文件事件由同一個(gè)事件分派器來調(diào)度的,如果有時(shí)間事件存在的話,文件事件的阻塞時(shí)間將由最近的時(shí)間事件與當(dāng)前時(shí)間的差來決定
/* Dispatch pending file and time events (file-event handling is elided
 * in this excerpt).
 *
 * The poll timeout for file events is derived from the nearest pending
 * time event, so the loop never sleeps past a due timer. Returns the
 * number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Neither time nor file events requested: nothing to do. */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
 * file events to process as long as we want to process time
 * events, in order to sleep until the next time event is ready
 * to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// Find the soonest time event, but only when we are allowed to wait
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// A time event is pending: block at most until it is due
long now_sec, now_ms;
/* Calculate the time missing for the nearest
 * timer to fire. */
// The remaining interval is stored in tv
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
// Borrow one second when the millisecond part would go negative
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// A negative interval means the event is already due:
// clamp to zero so the poll does not block
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// No time event is pending; AE_DONT_WAIT decides whether and
// how long the file-event poll may block
/* If we have to check for events but need to return
 * ASAP because of AE_DONT_WAIT we need to set the timeout
 * to zero */
if (flags & AE_DONT_WAIT) {
// Poll file events without blocking
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// Block until some file event arrives
tvp = NULL; /* wait forever */
}
}
.......
// (file-event processing elided; it blocks for at most tvp)
......
/* Check time events */
// Finally, run the time events that are due
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
這樣調(diào)度的好處是讓時(shí)間事件不會(huì)頻繁的查詢,也不會(huì)讓文件事件阻塞很長的時(shí)間。從代碼中可以看出,時(shí)間事件是在其指定時(shí)間減去當(dāng)前時(shí)間小于等于0時(shí)才執(zhí)行的,這樣一來時(shí)間事件的執(zhí)行會(huì)比指定的要晚一些。
時(shí)間事件的應(yīng)用
時(shí)間事件的最主要的應(yīng)用是在redis服務(wù)器需要對(duì)自身的資源與配置進(jìn)行定期的調(diào)整,從而確保服務(wù)器的長久運(yùn)行,這些操作由redis.c中的serverCron函數(shù)實(shí)現(xiàn),主要的工作有清除過期鍵值對(duì)、清理關(guān)閉連接失效的客戶端、更新統(tǒng)計(jì)信息等等:
/* The server's main periodic job, registered as a time event.
 *
 * Performs background maintenance: statistics sampling, shutdown
 * handling, BGSAVE/BGREWRITEAOF scheduling and reaping, AOF flushing,
 * client/database cron work, replication and cluster/sentinel timers.
 * Returns the delay in milliseconds until the next run (1000/hz). */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
/* Software watchdog: deliver the SIGALRM that will reach the signal
 * handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* Update the time cache. */
updateCachedTime();
// Sample the number of commands executed (ops/sec tracking)
run_with_period(100) trackOperationsPerSecond();
/* We have just REDIS_LRU_BITS bits per object for LRU information.
 * So we use an (eventually wrapping) LRU clock.
 *
 * Note that even if the counter wraps it's not a big problem,
 * everything will still work but some object will appear younger
 * to Redis. However for this to happen a given object should never be
 * touched for all the time needed to the counter to wrap, which is
 * not likely.
 *
 * Note that you can change the resolution altering the
 * REDIS_LRU_CLOCK_RESOLUTION define.
 */
server.lruclock = getLRUClock();
/* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
/* Sample the RSS here since this is a relatively slow call. */
server.resident_set_size = zmalloc_get_rss();
/* We received a SIGTERM, shutting down here in a safe way, as it is
 * not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
// Try a clean shutdown; exit the process on success
if (prepareForShutdown(0) == REDIS_OK) exit(0);
// Shutdown failed: log it and clear the flag so we keep serving
redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
/* Show some info about non-empty databases */
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
// Total hash-table slots available in this DB
size = dictSlots(server.db[j].dict);
// Number of keys actually stored
used = dictSize(server.db[j].dict);
// Number of keys that carry an expire time
vkeys = dictSize(server.db[j].expires);
// Log the counts for non-empty databases only
if (used || vkeys) {
redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}
/* Show information about connected clients */
// Skipped in Sentinel mode
if (!server.sentinel_mode) {
run_with_period(5000) {
redisLog(REDIS_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
/* We need to do a few operations on clients asynchronously. */
// Close timed-out clients and shrink oversized client buffers
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron();
/* Start a scheduled AOF rewrite if this was requested by the user while
 * a BGSAVE was in progress. */
// Run the postponed BGREWRITEAOF once neither an RDB save nor an
// AOF rewrite child is active
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;
// Reap the child without blocking (WNOHANG)
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
// The BGSAVE child finished
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
// The BGREWRITEAOF child finished
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {
/* If there is not a background saving/rewrite in progress check if
 * we have to save/rewrite now */
// No BGSAVE/BGREWRITEAOF running: walk the configured save points
// to decide whether a BGSAVE must start
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
 * the given amount of seconds, and if the latest bgsave was
 * successful or if, in case of an error, at least
 * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
REDIS_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == REDIS_OK))
{
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
// Kick off the background RDB save
rdbSaveBackground(server.rdb_filename);
break;
}
}
/* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
// Only when the AOF grew past the configured minimum size
server.aof_current_size > server.aof_rewrite_min_size)
{
// AOF size right after the last rewrite (1 avoids div-by-zero)
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// Percentage growth of the current AOF relative to base
long long growth = (server.aof_current_size*100/base) - 100;
// Rewrite once growth exceeds the configured threshold
if (growth >= server.aof_rewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
// Start the background AOF rewrite
rewriteAppendOnlyFileBackground();
}
}
}
/* AOF postponed flush: Try at every cron cycle if the slow fsync
 * completed. */
// Flush the AOF buffer if an earlier flush had to be postponed
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
 * clear the AOF error in case of success to make the DB writable again,
 * however to try every second is enough in case of 'hz' is set to
 * an higher frequency. */
run_with_period(1000) {
if (server.aof_last_write_status == REDIS_ERR)
flushAppendOnlyFile(0);
}
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect. */
/* Replication cron function -- used to reconnect to master and
 * to detect transfer failures. */
// Reconnects to the master, sends ACKs, detects failed transfers,
// and drops timed-out slaves
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
// Count one more completed cron iteration
server.cronloops++;
// Schedule the next run 1000/hz milliseconds from now
return 1000/server.hz;
}