Node.js 中使用 Redis 來實(shí)現(xiàn)定時(shí)任務(wù)

原文鏈接:http://xcoder.in/2015/06/05/scheduled-task-using-redis/

好久沒寫博文了,最近在跟隨著公司大牛們的腳步秘密研發(fā)新產(chǎn)品中。

不過前幾天有一個(gè)小需求的東西可以提出來寫一點(diǎn)點(diǎn)小干貨兒跟大家分享分享。米娜桑會(huì)的就可以忽略了,反正我也是隨便寫的;如果覺得本文對(duì)你有用的話還請(qǐng)多多支持喵。(●′ω`●)ゞ

本文所說的定時(shí)任務(wù)或者說計(jì)劃任務(wù)并不是很多人想象中的那樣,比如說每天凌晨三點(diǎn)自動(dòng)運(yùn)行起來跑一個(gè)腳本。這種都已經(jīng)爛大街了,隨便一個(gè) Crontab 就能搞定了。

這里所說的定時(shí)任務(wù)可以說是計(jì)時(shí)器任務(wù),比如說用戶觸發(fā)了某個(gè)動(dòng)作,那么從這個(gè)點(diǎn)開始過二十四小時(shí)我們要對(duì)這個(gè)動(dòng)作做點(diǎn)什么。那么如果有 1000 個(gè)用戶觸發(fā)了這個(gè)動(dòng)作,就會(huì)有 1000 個(gè)定時(shí)任務(wù)。于是這就不是 Cron 范疇里面的內(nèi)容了。

舉個(gè)最簡單的例子,一個(gè)用戶推薦了另一個(gè)用戶,我們定一個(gè)二十四小時(shí)之后的任務(wù),看看被推薦的用戶有沒有來注冊(cè),如果沒注冊(cè)就給他搞一條短信過去。Σ>―(〃°ω°〃)?→

最初的設(shè)想

一開始我是想把這個(gè)計(jì)時(shí)器做在內(nèi)存里面直接調(diào)用的。

考慮到 Node.js 的定時(shí)并不是那么準(zhǔn)確(無論是 setTimeout 還是 setInterval),所以本來打算自己維護(hù)這個(gè)定時(shí)器隊(duì)列。

又考慮到 Node.js 原生對(duì)象比較耗內(nèi)存。之前我用 JSON 對(duì)象存了一本字典,約十二萬多的詞條,原文件大概也就五六兆,用 Node.js 的原生對(duì)象一存居然有五六百兆的內(nèi)存占用——所以打算這個(gè)定時(shí)器隊(duì)列用 C++ 來寫 addon。

考慮到任何時(shí)候插入的任務(wù)都有可能在已有的任務(wù)之前或者之后,所以本來想用 C++ 來寫一個(gè)小根堆。每次用戶來一個(gè)任務(wù)的時(shí)候就將這個(gè)任務(wù)插入到堆中。

如果按照上述方法的話,再加上對(duì)時(shí)間要求掐得也不是那么緊,于是就是一個(gè)不斷的 process.nextTick() 的過程。

process.nextTick() 當(dāng)中執(zhí)行這么一個(gè)函數(shù):

  1. 從小根堆中不斷獲取堆頂?shù)娜蝿?wù)并處理,一直處理到堆頂任務(wù)的執(zhí)行時(shí)間大于當(dāng)前時(shí)間為止。
  2. 繼續(xù) process.nextTick() 來讓下一個(gè) tick 執(zhí)行步驟 1 中的流程。

所以最后就是一邊往小根堆插入任務(wù),另一邊通過不斷 process.nextTick() 消費(fèi)任務(wù)的這么一個(gè)過程。

最后,為了考慮到程序重啟的時(shí)候內(nèi)存數(shù)據(jù)會(huì)丟失,還應(yīng)該做一個(gè)持久化的事情——在每次插入任務(wù)的時(shí)候順便往持久化中間件中插一條副本,比如 MySQL、MongoDB、Redis、Riak 等等任何三方依賴。消費(fèi)任務(wù)的時(shí)候順便把中間件中的這條任務(wù)數(shù)據(jù)給刪除。

也就是說中間件中永遠(yuǎn)存的就是當(dāng)前尚未完成的任務(wù)。每當(dāng)程序重啟的時(shí)候都先從中間件中把所有任務(wù)讀取進(jìn)來重建一下堆,然后就能繼續(xù)工作了。

如果當(dāng)時(shí)我沒有發(fā)現(xiàn) Redis 的這個(gè)妙用的話,上述的流程將會(huì)是我實(shí)現(xiàn)我們定時(shí)任務(wù)的流程了。

Redis 妙用

在 Redis 的 2.8.0 版本之后,其推出了一個(gè)新的特性——鍵空間消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成這個(gè)定時(shí)任務(wù)的操作了,不過定時(shí)的單位是秒

Publish / Subscribe

Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是說一邊給 Redis 的特定頻道發(fā)送消息,另一邊從 Redis 的特定頻道取值——形成了一個(gè)簡易的消息隊(duì)列

比如我們可以往 foo 頻道推一個(gè)消息 bar,那么就可以直接:

PUBLISH foo bar

另一邊我們?cè)诳蛻舳擞嗛?foo 頻道就能接受到這個(gè)消息了。

舉個(gè)例子,如果在 Node.js 里面使用 ioredis 這個(gè)包那么看起來就會(huì)像這樣:

var Redis = require("ioredis");
var sub = new Redis(/** 連接信息 */);
sub.once("connect", function() {
    // 假設(shè)我們需要選擇 redis 的 db,因?yàn)閷?shí)際上我們不會(huì)去污染默認(rèn)的 db 0
    sub.select(DB_NUMBER, function(err) {
        if(err) process.exit(4);
        sub.subscribe("foo", function() {
            //... 訂閱頻道成功
        });
    });
});

// 監(jiān)聽從 `foo` 來的消息
sub.on("message", function(channel, msg) {
    console.log(channel, msg);
});

Redis Keyspace Notifications

在 Redis 里面有一些事件,比如鍵到期、鍵被刪除等。然后我們可以通過配置一些東西來讓 Redis 一旦觸發(fā)這些事件的時(shí)候就往特定的 Channel 推一條消息。

本文所涉及到的需求的話我們所需要關(guān)心的事件是 EXPIRE 即過期事件。

大致的流程就是我們給 Redis 的某一個(gè) db 設(shè)置過期事件,使其鍵一旦過期就會(huì)往特定頻道推消息,我在自己的客戶端這邊就一直消費(fèi)這個(gè)頻道就好了。

以后一來一條定時(shí)任務(wù),我們就把這個(gè)任務(wù)狀態(tài)壓縮成一個(gè)鍵,并且過期時(shí)間為距這個(gè)任務(wù)執(zhí)行的時(shí)間差。那么當(dāng)鍵一旦到期,就到了任務(wù)該執(zhí)行的時(shí)間,Redis 自然會(huì)把過期消息推去,我們的客戶端就能接收到了。這樣一來就起到了定時(shí)任務(wù)的作用。

消息類型

當(dāng)達(dá)到一定條件后,有兩種類型的這種消息會(huì)被觸發(fā),用哪個(gè)需要自己選了。舉個(gè)例子,我們刪除了在 db 0 中一個(gè)叫 foo 的鍵,那么系統(tǒng)會(huì)往兩個(gè)頻道推消息,一個(gè)是 del 事件頻道推 foo 消息,另一個(gè)是 foo 頻道推 del 消息,它們小倆口被系統(tǒng)推送的指令分別等價(jià)于:

PUBLISH __keyspace@0__:foo del
PUBLISH __keyevent@0__:del foo

其中往 foo 推送 del 的頻道名為 __keyspace@0__:foo,即是 "__keyspace@" + DB_NUMBER + "__:" + KEY_NAME;而 del 的頻道名為 "__keyevent@" + DB_NUMBER + "__:" + EVENT_NAME

配置

即使你的 Redis 版本達(dá)標(biāo)了,但是 Redis 默認(rèn)是關(guān)閉這個(gè)功能的,你需要修改配置文件來打開它,或者直接在 CLI 里面通過指令修改。這里就說說配置文件的修改吧。

如果不想看我在這里羅里吧嗦的,也可以直接去看 Redis 的相關(guān)文檔。

首先打開 Redis 的配置文件,在不同的系統(tǒng)和安裝方式下文件位置可能不同,比如通過 brew 安裝的 MacOS 下可能是在 /usr/local/etc/redis.conf 下面,通過 apt-get 安裝的 Ubuntu 下可能是在 /etc/redis/redis.conf 下,總之找到配置文件。或者自己寫一個(gè)配置文件,啟動(dòng)的時(shí)候指定配置文件地址就好。

然后找到一項(xiàng)叫 notify-keyspace-events 的地方,如果找不到則自行添加,其值可以是 ExKlg 等等。這些字母的具體含義如下所示:

  • K,表示 keyspace 事件,有這個(gè)字母表示會(huì)往 __keyspace@<db>__ 頻道推消息。
  • E,表示 keyevent 事件,有這個(gè)字母表示會(huì)往 __keyevent@<db>__ 頻道推消息。
  • g,表示一些通用指令事件支持,如 DEL、EXPIRE、RENAME 等等。
  • $,表示字符串(String)相關(guān)指令的事件支持。
  • l,表示列表(List)相關(guān)指令事件支持。
  • s,表示集合(Set)相關(guān)指令事件支持。
  • h,哈希(Hash)相關(guān)指令事件支持。
  • z,有序集(Sorted Set)相關(guān)指令事件支持。
  • x,過期事件,與 g 中的 EXPIRE 不同的是,gEXPIRE 是指執(zhí)行 EXPIRE key ttl 這條指令的時(shí)候順便觸發(fā)的事件,而這里是指那個(gè) key 剛好過期的這個(gè)時(shí)間點(diǎn)觸發(fā)的事件。
  • e,驅(qū)逐事件,一個(gè) key 由于內(nèi)存上限而被驅(qū)逐的時(shí)候會(huì)觸發(fā)的事件。
  • A,g$lshzxe 的別名。也就是說 AKE 的意思就代表了所有的事件。

結(jié)合上述列表我們就能拼湊出自己所需要的事件支持字符串了,在我的需求中我只需要 Ex 就可以滿足了,所以配置項(xiàng)就是這樣的:

notify-keyspace-events Ex

然后保存配置文件,啟動(dòng) Redis 就啟用了過期事件的支持了。

實(shí)踐

我們先說任務(wù)的創(chuàng)造者吧。由于這里 Redis 的事件只會(huì)傳鍵名,并不會(huì)傳鍵值,而過期事件觸發(fā)的時(shí)候那個(gè)鍵已經(jīng)沒了,你也無法獲取鍵值,加上我的主系統(tǒng)和任務(wù)系統(tǒng)是分布式的,所以就把所有需要的信息往鍵名塞。

一個(gè)最簡單的鍵名設(shè)計(jì)就是 任務(wù)類型 + ":" + JSON.stringify 化后的參數(shù)數(shù)組;更有甚者可以直接把任務(wù)類型替換成所需的函數(shù)路徑,比如需要執(zhí)行這個(gè)任務(wù)的函數(shù)在 task/foo/bar 文件下面的 baz 函數(shù),參數(shù) arguments 數(shù)組為 [ 1, 2 ],那么鍵名的設(shè)計(jì)可以是 task/foo/bar.baz:[1,2],反正我們只需要觸發(fā)這個(gè)鍵,用不著去查詢這個(gè)鍵。等到真正過期了任務(wù)系統(tǒng)接收到這個(gè)鍵名的時(shí)候再一一解析,得到需要執(zhí)行 task/foo/bar.baz 這個(gè)消息,并且網(wǎng)函數(shù)里面?zhèn)魅?[1,2] 這個(gè) arguments

所以當(dāng)接收到一個(gè)定時(shí)任務(wù)的時(shí)候,我們得到消息、函數(shù)名、過期時(shí)間參數(shù),這個(gè)函數(shù)可以如下設(shè)計(jì):

/** 我們假設(shè) redis 是一個(gè) ioredis 的對(duì)象 */

var sampleTaskMaker = function(message, func, timeout) {
    message = JSON.stringify(message);
    console.log("Received a new task:", func, message, "after " + timeout + ".");

    // 這里的 uuid 是 npm 一個(gè)包
    // 生成一個(gè)唯一 uuid 的目的是為了防止兩個(gè)任務(wù)用了相同的函數(shù)和參數(shù),那么
    // 鍵名可能會(huì)重復(fù)并覆蓋的情況
    // uuid 的文檔為 https://www.npmjs.com/package/node-uuid
    //
    // 這里的 ?? 是一個(gè)分隔符,冒號(hào)是分割 uuid 和后面內(nèi)容的,而 ?? 是分割函數(shù)名
    // 和消息的
    var key = uuid.v1().replace(/-/g, "") +
        ":??" + func + "??" + message;
    var content = "";

    redis.multi()
        .set(key, content)
        .expire(key, timeout)
        .exec(function(err) {
            if(err) {
                console.error("Failed to publish EXPIRE EVENT for " + content);
                console.error(err);
                return;
            }
        });
};

Ioredis 的穩(wěn)定可以點(diǎn)此查看。

然后在任務(wù)系統(tǒng)里面的一開始監(jiān)聽這個(gè)過期頻道:

// assign 是 sugarjs 里面的函數(shù)
// 把 db 塞到字符串里面的 {db} 里去
var subscribeKey = "__keyevent@{db}__:expired".assign({ db: 1 });

// 假設(shè) sub 是 ioredis 的對(duì)象
sub.once("connect", function() {
    // 假設(shè)我們需要選擇 redis 的 db,因?yàn)閷?shí)際上我們不會(huì)去污染默認(rèn)的 db 0
    sub.select(1, function(err) {
        if(err) process.exit(4);
        sub.subscribe("foo", function() {
            //... 訂閱頻道成功
        });
    });
});

// 監(jiān)聽從 `foo` 來的消息
sub.on("message", sampleOnExpired);

注意: 我們這里選擇 db 1 是因?yàn)橐坏╅_啟過期事件監(jiān)聽,那么這個(gè) db 的所有過期事件都會(huì)被發(fā)送。為了不跟正常使用的 redis 過期鍵混淆,我們?yōu)檫@個(gè)事情專門用一個(gè)新的 db。比如我們?cè)谧约赫J褂玫?db 0 里面監(jiān)聽了,那么不是我們?nèi)蝿?wù)觸發(fā)的過期事件也會(huì)傳過來,這個(gè)時(shí)候我們解析的鍵名就不對(duì)了。

最后就是我們的 sampleOnExpired 函數(shù)了。

var sampleOnExpired = function(channel, key) {
    // UUID:??func??params
    var body = key.split("??");
    if(body.length < 3) return;

    // 取出 body 第一位為 func
    var func = body[1];

    // 推出前兩位,后面剩下的有可能是參數(shù)里面自帶 ?? 而被分割,所以要拼回去
    body.shift(); body.shift();
    var params = body.join("??");

    // 然后把 params 傳入 func 去執(zhí)行
    // func:
    //   path1/path2.func
    func = func.split(".");
    if(func.length !== 2) {
        console.error("Bad params for task:", func.join("."), "-", params);
        return;
    }

    var path = func[0];
    func = func[1];

    var mod;
    try {
        mod = require("./tasks/" + path);
    } catch(e) {
        console.error("Failed to load module", path);
        console.error(e.stack);
        return;
    }

    process.nextTick(function() {
        try {
            mod[func].apply(null, JSON.parse(params));
        } catch(e) {
            console.error("Failed to call function", path, "-", func, "-", params);
            console.error(e.stack);
        }
    });
};

這個(gè)簡易的架子搭好后,你只需要去寫一堆任務(wù)執(zhí)行函數(shù),然后在生成任務(wù)的時(shí)候把相應(yīng)參數(shù)傳給 sampleTaskMaker 就好了。Redis 會(huì)自動(dòng)過期并且觸發(fā)事件給你的 sampleOnExpired 函數(shù),然后就會(huì)去執(zhí)行相應(yīng)的任務(wù)處理函數(shù)了。

小結(jié)

其實(shí)這個(gè)需求在我們項(xiàng)目目前就是給用戶定時(shí)發(fā)提醒短信用的。如果沒有發(fā)現(xiàn) Redis 的這個(gè)妙用,我還是會(huì)去用第二節(jié)里面的方法來寫的。其實(shí)這期間也有考慮過用 RabbitMQ,不過貌似它的定時(shí)消息需要做一些 Hack,比較麻煩,最后就放棄了。

Redis 的這個(gè)方法其實(shí)是我在谷歌搜出來的,別人在 StackOverflow 回答的答案。我參考了之后用我自己的方法實(shí)現(xiàn)了出來,并且把代碼的關(guān)鍵部分提取出來整理成這篇小文,還希望能給各位看官一些用吧,望打賞。

如果沒有什么用也憋噴我,畢竟我是個(gè)蒟蒻。有更好的方法希望留個(gè)言,望告知。謝謝。(′,,?ω?,,)?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容