"Redis Source Code Walkthrough": Events (Part 3), Time Events

Key points

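  1. Time events are divided into timed (one-shot) events and periodic events;
  2. Time events and file events cooperate: both are handled by the same event loop, and neither preempts the other;
  3. A time event actually runs somewhat later than its scheduled time.
  • Redis time events fall into two categories:
    1. Timed events: run a piece of code once, after a specified delay
    2. Periodic events: run a piece of code repeatedly, once every specified interval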

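A time event consists mainly of the following three attributes:

  • id (a globally unique id)
  • when (the time at which the event is due, a millisecond-precision UNIX timestamp; the structure below stores it as when_sec plus when_ms)
  • timeProc (the time event handler; when the scheduled time arrives, Redis calls this handler to process the event)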
/* Time event structure */
typedef struct aeTimeEvent {
    // Unique identifier of the time event
    long long id; /* time event identifier. */
    // Time at which the event is due: a UNIX timestamp split into seconds and milliseconds
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    // Handler called once the scheduled time has been reached
    aeTimeProc *timeProc;
    // Finalizer called when the event is deleted
    aeEventFinalizerProc *finalizerProc;
    // Private data passed to the handler and the finalizer
    void *clientData;
    // Pointer to the next time event, forming a linked list
    struct aeTimeEvent *next;
} aeTimeEvent;

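From the structure it is easy to see that time events are kept in an unordered linked list: the next pointer chains the events together, and nothing keeps them sorted by their due time.
Whenever the time event executor runs, it traverses the entire list, finds every event whose time has arrived, and calls the corresponding handler.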

Time event API

Create a new time event. A new time event is always placed at the head of the time event list:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // Take the next event id and advance the id counter
    long long id = eventLoop->timeEventNextId++;
    // Allocate the time event structure
    aeTimeEvent *te;
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    // Set the ID
    te->id = id;
    // Compute the time at which the event should fire
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    // Set the event handler and the finalizer
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    // Set the private data
    te->clientData = clientData;
    // Insert the new event at the head of the list
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

Delete the time event with the given ID:

int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *te, *prev = NULL;
    // Walk the list
    te = eventLoop->timeEventHead;
    while(te) {
        // Found the target event: unlink it
        if (te->id == id) {
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            // Run the finalizer, if any
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            // Free the time event
            zfree(te);
            return AE_OK;
        }
        prev = te;
        te = te->next;
    }
    return AE_ERR; /* NO event with the specified ID found */
}
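The two functions above combine O(1) insertion at the head with an O(N) scan for deletion. The sketch below reproduces only that list handling on a stripped-down, hypothetical TimeEvent/Loop pair (no deadlines, handlers, or finalizers), so the pointer updates can be followed in isolation.

```c
#include <stdlib.h>

/* Hypothetical, simplified stand-ins for aeTimeEvent / aeEventLoop:
 * only the fields needed to show head insertion and deletion by id. */
typedef struct TimeEvent {
    long long id;
    struct TimeEvent *next;
} TimeEvent;

typedef struct {
    TimeEvent *head;
    long long next_id;
} Loop;

/* Insert a new event at the head of the unordered list: O(1).
 * Returns the new id, or -1 if allocation fails. */
static long long create_event(Loop *loop)
{
    TimeEvent *te = malloc(sizeof(*te));
    if (te == NULL) return -1;
    te->id = loop->next_id++;
    te->next = loop->head;
    loop->head = te;
    return te->id;
}

/* Unlink and free the event with the given id: O(N) scan.
 * Returns 0 on success, -1 if no event has that id. */
static int delete_event(Loop *loop, long long id)
{
    TimeEvent *te = loop->head, *prev = NULL;
    while (te) {
        if (te->id == id) {
            if (prev == NULL) loop->head = te->next;  /* removing the head */
            else prev->next = te->next;               /* bypass the node */
            free(te);
            return 0;
        }
        prev = te;
        te = te->next;
    }
    return -1;
}
```

Because new events go to the head, three events created with ids 0, 1 and 2 are stored in the order 2, 1, 0. Deleting id 1 relinks 2 directly to 0, and deleting the head only moves the head pointer.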

Return the time event whose deadline is nearest to the current time (the earliest one):

// Find the time event nearest to the current time
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;
    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
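aeSearchNearestTimer orders deadlines by seconds first and milliseconds second. The self-contained sketch below isolates that comparison on a hypothetical Timer node. Since the list is unordered, every lookup costs O(N). That stays cheap as long as the list is short, and in a normally running server the list usually holds little more than the serverCron event described later.

```c
#include <stddef.h>

/* Hypothetical minimal node: only the deadline fields matter here. */
typedef struct Timer {
    long when_sec;
    long when_ms;
    struct Timer *next;
} Timer;

/* Nonzero if deadline a is strictly earlier than deadline b:
 * compare seconds first, then milliseconds. */
static int earlier(const Timer *a, const Timer *b)
{
    return a->when_sec < b->when_sec ||
           (a->when_sec == b->when_sec && a->when_ms < b->when_ms);
}

/* Linear scan of the unordered list. Returns NULL for an empty list;
 * on ties the first node found is kept. */
static Timer *search_nearest(Timer *head)
{
    Timer *nearest = NULL;
    for (Timer *t = head; t != NULL; t = t->next) {
        if (nearest == NULL || earlier(t, nearest))
            nearest = t;
    }
    return nearest;
}
```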

Time event processor

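Unlike file events, there is only one time event processor, processTimeEvents. It handles every time event whose scheduled time has arrived and decides whether the event should keep recurring, that is, whether it is a periodic event: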

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
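    // Get the current time
    time_t now = time(NULL);
    // If the system clock has moved backwards (clock skew), reset every
    // event's deadline so that all events fire as soon as possible
    // instead of being delayed for an unpredictable time
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // Remember when time events were last processed
    eventLoop->lastTime = now;
    // Walk the list
    // and run the events that are due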
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;
        // Skip events created during this pass (their id is greater than maxId)
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // Get the current time
        aeGetTime(&now_sec, &now_ms);
        // If the current time is at or past the event's deadline, the event is due: run it
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // Call the event handler and keep its return value
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            // Decide whether the event should run again
            if (retval != AE_NOMORE) {
                // Yes: run this time event again retval milliseconds from now
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // No: delete the event
                aeDeleteTimeEvent(eventLoop, id);
            }
            // Running the handler may have changed the event list,
            // so restart the scan from the head of the list
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}
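To see the one-shot versus periodic behavior without a real clock, the sketch below simulates processTimeEvents on a hypothetical Sim loop whose clock (now, in milliseconds) is advanced by hand. It keeps the same rules as the code above: skip events created during the pass (id > max_id), reschedule when the handler returns a delay, delete when it returns NOMORE (standing in for AE_NOMORE), and restart the scan from the head after every fired event. For brevity, deadlines are single millisecond counts instead of when_sec/when_ms pairs.

```c
#include <stdlib.h>

#define NOMORE (-1)  /* plays the role of AE_NOMORE: do not reschedule */

typedef long long (*EvProc)(long long id, void *data);

typedef struct Ev {
    long long id;
    long long when;      /* deadline in ms on the simulated clock */
    EvProc proc;
    void *data;
    struct Ev *next;
} Ev;

typedef struct {
    Ev *head;
    long long next_id;
    long long now;       /* simulated current time in ms */
} Sim;

static long long sim_create(Sim *s, long long delay, EvProc proc, void *data)
{
    Ev *te = malloc(sizeof(*te));
    if (te == NULL) return -1;
    te->id = s->next_id++;
    te->when = s->now + delay;
    te->proc = proc;
    te->data = data;
    te->next = s->head;   /* new events go to the head */
    s->head = te;
    return te->id;
}

static void sim_delete(Sim *s, long long id)
{
    Ev *te = s->head, *prev = NULL;
    while (te) {
        if (te->id == id) {
            if (prev) prev->next = te->next; else s->head = te->next;
            free(te);
            return;
        }
        prev = te;
        te = te->next;
    }
}

/* One pass over the list; returns the number of events fired. */
static int sim_process(Sim *s)
{
    int processed = 0;
    long long max_id = s->next_id - 1;  /* ignore events created during this pass */
    Ev *te = s->head;
    while (te) {
        if (te->id > max_id) { te = te->next; continue; }
        if (s->now >= te->when) {
            long long id = te->id;
            long long ret = te->proc(id, te->data);
            processed++;
            if (ret != NOMORE) te->when = s->now + ret;  /* periodic: reschedule */
            else sim_delete(s, id);                     /* one-shot: remove */
            te = s->head;  /* the list may have changed: restart from the head */
        } else {
            te = te->next;
        }
    }
    return processed;
}

/* Periodic handler: counts calls and asks to run again in 100 ms. */
static long long tick_every_100(long long id, void *data)
{
    (void)id;
    (*(int *)data)++;
    return 100;
}

/* One-shot handler: counts calls and asks to be deleted. */
static long long fire_once(long long id, void *data)
{
    (void)id;
    (*(int *)data)++;
    return NOMORE;
}
```

Stepping the simulated clock in 50 ms increments from 0 to 1000, the 100 ms periodic event fires ten times while the one-shot event fires once and then disappears from the list. A handler in this sketch should return a positive delay: with 0, the rescheduled deadline would equal now, and because the simulated clock never advances inside one pass, the restart-from-head loop would fire it again forever.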

Scheduling time events

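Time events and file events are scheduled by the same event dispatcher. If a time event exists, the longest the dispatcher may block waiting for file events is the difference between the nearest time event's deadline and the current time: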

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to process */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // Find the nearest time event
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // A time event exists, so the wait for file events is bounded
            // by the gap between the nearest time event and the current time
            long now_sec, now_ms;
            /* Calculate the time missing for the nearest
             * timer to fire. */
            // Compute how long until the nearest time event is due
            // and store that interval in tv
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            // A negative difference means the event is already due: clamp seconds and microseconds to 0 (do not block)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            // Reaching here means there is no time event to wait for,
            // so AE_DONT_WAIT decides whether to block and for how long
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // Poll for file events without blocking
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // Block until a file event arrives
                tvp = NULL; /* wait forever */
            }
        }
        .......
        // Process file events; the maximum wait is given by tvp
       ......
    /* Check time events */
    // Run the time events that are due
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}

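The benefit of scheduling this way is that the loop neither polls for time events too frequently (busy waiting) nor blocks on file events for too long. The code also shows that a time event runs only once the current time has reached or passed its scheduled time, and only after the wait for file events has returned and those file events have been handled, so time events run somewhat later than scheduled.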
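The timeout calculation in aeProcessEvents can be isolated into a small function. The sketch below (poll_timeout is a hypothetical name) differs from the quoted code in one deliberate way: it first computes a single signed millisecond difference. In the quoted code, a deadline of (3 s, 500 ms) checked at (4 s, 100 ms) gives tv_sec = -1, which is clamped to 0, while tv_usec stays at 400000, so the loop would still wait 0.4 s for a timer that is already overdue. The single-difference form returns a zero timeout for any past deadline.

```c
#include <sys/time.h>

/* Hypothetical helper: how long the poll may block before the nearest
 * timer (when_sec, when_ms) is due, given the current time (now_sec, now_ms). */
static struct timeval poll_timeout(long when_sec, long when_ms,
                                   long now_sec, long now_ms)
{
    struct timeval tv;
    /* One signed difference in ms, so any past deadline is <= 0. */
    long long ms = (long long)(when_sec - now_sec) * 1000 + (when_ms - now_ms);
    if (ms > 0) {
        tv.tv_sec = ms / 1000;
        tv.tv_usec = (ms % 1000) * 1000;
    } else {
        tv.tv_sec = 0;   /* already due: do not block */
        tv.tv_usec = 0;
    }
    return tv;
}
```

For a timer due at (5 s, 100 ms) checked at (4 s, 900 ms), it returns 0 s and 200000 µs, the same result the quoted code produces.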

Applications of time events

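The main application of time events is the periodic adjustment of the Redis server's own resources and state, which keeps the server running reliably over long periods. This work is done by the serverCron function in redis.c; its main jobs include removing expired key-value pairs, closing timed-out and invalid client connections, and updating statistics: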

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);
    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    /* Update the time cache. */
    updateCachedTime();
    // Sample the number of commands executed (ops/sec statistics)
    run_with_period(100) trackOperationsPerSecond();
    /* We have just REDIS_LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define.
     */
    server.lruclock = getLRUClock();
    /* Record the max memory used since the server was started. */
    // Record the server's peak memory usage
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();
    /* Sample the RSS here since this is a relatively slow call. */
    server.resident_set_size = zmalloc_get_rss();
    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    // The server process received SIGTERM: shut the server down
    if (server.shutdown_asap) {
        // Try to shut down the server
        if (prepareForShutdown(0) == REDIS_OK) exit(0);
        // If shutdown failed, log a warning and clear the shutdown flag
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }
    /* Show some info about non-empty databases */
    // Log key-value pair information for each database
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;
            // Number of slots in the hash table
            size = dictSlots(server.db[j].dict);
            // Number of keys stored
            used = dictSize(server.db[j].dict);
            // Number of keys with an expire set
            vkeys = dictSize(server.db[j].expires);

            // Log the counts
            if (used || vkeys) {
                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }
    /* Show information about connected clients */
    // If the server is not running in Sentinel mode, log client connection info
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            redisLog(REDIS_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }
    /* We need to do a few operations on clients asynchronously. */
    // Check clients: close timed-out clients and free excess client buffers
    clientsCron();
    /* Handle background operations on Redis databases. */
    // Perform background operations on the databases
    databasesCron();
    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    // If neither BGSAVE nor BGREWRITEAOF is running
    // and a BGREWRITEAOF has been scheduled, start it
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    /* Check if a background saving or AOF rewrite in progress terminated. */
    // Check whether a BGSAVE or BGREWRITEAOF has finished
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;
        // Reap a finished child process without blocking (WNOHANG)
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
            // BGSAVE finished
            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);

            // BGREWRITEAOF finished
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);

            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
        // No BGSAVE or BGREWRITEAOF is running, so check whether one should start:
        // walk all save conditions to see whether BGSAVE should run
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
            // Check whether this save condition has been met
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                // Start BGSAVE
                rdbSaveBackground(server.rdb_filename);
                break;
            }
        }
        /* Trigger an AOF rewrite if needed */
        // Trigger BGREWRITEAOF
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            // The current AOF size exceeds the minimum size required for a rewrite
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            // AOF size right after the last rewrite (the base size)
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            // Growth of the current AOF size over the base size, in percent
            long long growth = (server.aof_current_size*100/base) - 100;
            // If the growth reaches aof_rewrite_perc, start BGREWRITEAOF
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                // Start BGREWRITEAOF
                rewriteAppendOnlyFileBackground();
            }
        }
    }
    // Depending on the AOF fsync policy,
    // decide whether to write the AOF buffer to the AOF file
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }
    /* Close clients that need to be closed asynchronous */
    // Close clients scheduled for asynchronous closing
    freeClientsInAsyncFreeQueue();
    /* Clear the paused clients flag if needed. */
    clientsArePaused(); /* Don't check return value, just use the side effect. */
    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    // Replication cron
    // Reconnect to the master, send ACKs to the master, detect failed transfers, disconnect timed-out slaves, and so on
    run_with_period(1000) replicationCron();
    /* Run the Redis Cluster cron. */
    // If the server runs in cluster mode, run the cluster cron
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }
    /* Run the Sentinel timer if we are in sentinel mode. */
    // If the server runs in Sentinel mode, run Sentinel's main timer function
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }
    /* Cleanup expired MIGRATE cached sockets. */
    // Cluster related: TODO
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }
    // Increment the cron loop counter
    server.cronloops++;
    return 1000/server.hz;
}
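serverCron returns 1000/server.hz, so processTimeEvents reschedules it that many milliseconds later, which makes it a periodic time event (with Redis's default hz of 10, every 100 ms). Inside it, run_with_period(ms) lets slower tasks run only on some ticks. The definition of run_with_period is not shown in this article; the sketch below assumes it tests cronloops against the ratio of the task period to the cron interval, and uses a hypothetical cron_state struct in place of the server global.

```c
/* Hypothetical stand-in for the relevant server fields. */
struct cron_state {
    int hz;                 /* serverCron calls per second; assumed 1..1000 */
    long long cronloops;    /* how many times serverCron has run so far */
};

/* Interval between serverCron calls in ms: also the value serverCron
 * returns so that it is rescheduled as a periodic time event. */
static int cron_interval_ms(const struct cron_state *st)
{
    return 1000 / st->hz;
}

/* Nonzero if a task with the given period should run on this tick.
 * Assumed behavior: tasks at or below the cron interval run every tick;
 * slower tasks run every (period / interval) ticks. */
static int should_run(const struct cron_state *st, int period_ms)
{
    int interval = cron_interval_ms(st);
    return period_ms <= interval ||
           st->cronloops % (period_ms / interval) == 0;
}
```

With hz = 10, over 100 ticks (10 seconds) a 100 ms task runs every time, a 1000 ms task runs 10 times and a 5000 ms task runs twice, which matches their nominal periods.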
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
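Platform statement: the article content (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.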
