Beanstalkd 源碼初探

引言

Beanstalkd 是一個比較輕量級的消息隊(duì)列服務(wù),對于性能和穩(wěn)定性要求不是特別高(相對于 RabbitMQ, Redis, Kafka 等),并且需要延遲執(zhí)行任務(wù)的場景非常合適;此外,它也支持給任務(wù)設(shè)置不同的優(yōu)先級、執(zhí)行超時時間等。

在我們的業(yè)務(wù)中,經(jīng)常會借助 Beanstalkd 執(zhí)行隊(duì)列任務(wù),常見的用例如下:

  1. 用戶完成會員購買并激活后,發(fā)送私信通知、重置賬號重命名狀態(tài)等;
  2. 用戶完成評論后,異步更新評論計數(shù);
  3. 用戶對私家課收聽記錄上報后,異步更新最近收聽的小節(jié)、累積收聽時長、同步到其它系統(tǒng)等;
  4. 用戶記錄添加后,會同步至 Redis,為保證數(shù)據(jù)庫和 Redis 的數(shù)據(jù)最終一致性,會提前啟動一個延遲校驗(yàn)的任務(wù)(如 5s 后),檢查 Redis 中與數(shù)據(jù)庫記錄是否一致。

Beanstalkd 初識

特點(diǎn)

  1. 基于 TCP 并采用 ASCII 編碼的文本協(xié)議。詳細(xì)定義參見:protocol.txt
    1. 客戶端負(fù)責(zé)與服務(wù)端的交互:連接發(fā)送命令和數(shù)據(jù)、等待響應(yīng)、關(guān)閉連接
    2. 服務(wù)端串行處理每個客戶端連接
    3. 協(xié)議由兩個部分組成:文本行(用于客戶端命令和服務(wù)端響應(yīng))和非結(jié)構(gòu)化的數(shù)據(jù)塊(用于傳送任務(wù) body 和 stats 信息)
  1. 隊(duì)列消息是存儲在內(nèi)存中的,但用戶可以選擇開啟 WAL 機(jī)制(binlog),這樣重啟后可以回放任務(wù),提高了可用性
  2. 采用類似 Redis 的單線程模型(IO 多路復(fù)用機(jī)制),因此不必考慮多線程環(huán)境下線程同步、加鎖等,簡化實(shí)現(xiàn)

關(guān)鍵詞

  1. Tube:類似 Kafka 中的 Topic,或者其它隊(duì)列系統(tǒng)中的 Channel
  2. Job:客戶端生產(chǎn)和消費(fèi)的基本單元。每個任務(wù)都有特定的 id,可設(shè)定優(yōu)先級,超時時間,延遲執(zhí)行時間等
  3. WAL (Write Ahead Log):負(fù)責(zé) binlog 管理(寫入、壓縮、日志文件清理、任務(wù)恢復(fù)等)
  4. Server:Beanstalkd 服務(wù)端
  5. Conn:Beanstalkd 客戶端連接處理

任務(wù)狀態(tài)流轉(zhuǎn)

image

任務(wù)典型生命周期

image

工作方式描述

  1. 服務(wù)端會有一到多個 tubes(在數(shù)組中維護(hù))。每個 tube 都會包含一個就緒隊(duì)列(在最小堆維護(hù))以及一個延遲隊(duì)列(也在最小堆維護(hù))。每個任務(wù)都會在一個特定的 tube 中度過全部的生命周期
  2. 客戶端可以使用 watch 指令訂閱某個 tube,也可以使用 ignore 取消訂閱,消費(fèi)者可以同時訂閱多個 tube,當(dāng)消費(fèi)者 reserve 任務(wù)時,該任務(wù)可能來自其 watch list 中的任意一個 tube
  3. 當(dāng)客戶端連接時,默認(rèn)會使用 default tube,可以使用 use 切換 tube
  4. tube 是會根據(jù)需要隨時創(chuàng)建的,當(dāng)沒有客戶端引用時,就會被刪除

安裝

借助 Docker 啟動一個 Beanstalkd 服務(wù)非常輕松,請運(yùn)行下面的命令行即可:

docker run -d -p 11300:11300 schickling/beanstalkd

如果上述命令行執(zhí)行正常,則 Beanstalkd 服務(wù)應(yīng)該啟動了,其默認(rèn)監(jiān)聽的端口號為 11300,運(yùn)行 docker ps 可以查看服務(wù)是否正常啟動并運(yùn)行:

image

編譯 & 運(yùn)行 & 調(diào)試

首先,需要前往 beanstalkd 倉庫克隆 Master 分支源碼至本地。

為了方便管理 C 項(xiàng)目,這里使用了 JET BRAINS 家族的 Clion。當(dāng)然,你也可以使用自己喜歡的工具打開。

由于 Clion 使用了 CMake 管理 C&C++ 項(xiàng)目,所以打開項(xiàng)目時需要在其根目錄下創(chuàng)建一個 CMakeLists.txt 文件,并填寫如下內(nèi)容:

cmake_minimum_required(VERSION 3.13)
project(beanstalkd C)
set(BUILD_DIR .)
add_custom_target(beanstalkd ALL COMMAND make WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
add_custom_command(TARGET beanstalkd POST_BUILD
        COMMAND echo copy ${PROJECT_NAME} to ${CMAKE_CURRENT_BINARY_DIR}
        COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/beanstalkd ${CMAKE_CURRENT_BINARY_DIR}
)

?? 至此,準(zhǔn)備工作已經(jīng)做完啦。接下來,可以嘗試點(diǎn)擊「構(gòu)建」按鈕,進(jìn)行編譯。編譯結(jié)束后,就可以點(diǎn)擊運(yùn)行啟動 Beanstalkd 服務(wù)啦。哦,對了,如果需要調(diào)試支持的話,直接在需要的地方打上斷點(diǎn),并點(diǎn)擊「調(diào)試」按鈕即可開始。

image

關(guān)于 Makefile

查看 Makefile 文件,可以看到有如下幾個命令可以執(zhí)行:

  1. make all: 編譯、鏈接并生成可執(zhí)行的二進(jìn)制文件 beanstalkd。由于我們已經(jīng)將該命令放到 CMakeLists.txt 文件中,在使用 Clion 構(gòu)建時可自動觸發(fā)
  2. make install: 將生成的可執(zhí)行文件 beanstalkd 安裝到 BINDIR=$(DESTDIR)/usr/local/bin 目錄下
  3. make clean: 清理生成的 *.o 文件
  4. make bench: 跑 Benchmark 用

客戶端使用示例

下面看一個簡單的例子。生產(chǎn)者負(fù)責(zé)將一組待抓取的 URLs 放到隊(duì)列中,再由一組消費(fèi)者并發(fā)訪問隊(duì)列中的 URLs。主流程的示例代碼如下:

// 會首先啟動 NUM_WORKERS 個消費(fèi)者在不同的線程中監(jiān)聽
// 然后讓生產(chǎn)者向隊(duì)列中填充 URLs,供消費(fèi)者使用
fn main() {
    const NUM_WORKERS: isize = 5;

    let mut handles = vec![];
    for i in 0..NUM_WORKERS {
        // 啟動 NUM_WORKERS 個消費(fèi)者
        let hd = thread::spawn(move || consume_urls(i));
        handles.push(hd);
    }

    produce_urls();

    // 等待消費(fèi)者結(jié)束
    for hd in handles {
        hd.join().unwrap();
    }
}

生產(chǎn)者

fn produce_urls() {
    let mut client = Beanstalkd::localhost().unwrap();
    client.tube("urls").unwrap();

    let urls = vec![
        "https://github.com/iFaceless/ifaceless.github.io",
        "https://github.com/iFaceless/gic",
        "https://github.com/iFaceless/rust-exercises",
        "https://github.com/iFaceless/learning-rust",
        "https://github.com/iFaceless/fixture",
        "https://github.com/iFaceless/rest",
        "https://github.com/iFaceless/bigcache",
        "https://github.com/iFaceless/leetgogo",
        "https://github.com/iFaceless/freecache",
    ];

    for url in urls {
        client.put(url, 0, 0, 1000).unwrap();
    }
}

消費(fèi)者

fn consume_urls(id: isize) {
    println!("[Consumer {}] started...", id);
    let mut client = Beanstalkd::localhost().unwrap();
    client.watch("urls").unwrap();

    loop {
        let (job_id, url) = match client.reserve() {
            Ok(job) => job,
            Err(e) => {
                println!("[Consumer {}] error happens: {}", id, e);
                break;
            }
        };

        println!("[Consumer {}] got job <{}>: {}", id, job_id, url);
        client.delete(job_id).unwrap();
    }
}

源碼探索

模塊分類圖

為了方便閱讀源碼,粗略地根據(jù)自己的理解給各個文件做了簡單的分類:

image

模塊 UML 圖

雖說 Beanstalkd 的源碼是使用 C 編寫的,但是其中的設(shè)計思想依然可以從面向?qū)ο蟮慕嵌葋斫忉尅1热缒K化設(shè)計、接口設(shè)計、多態(tài)等。根據(jù)自己的理解,對其中的一些核心模塊做了梳理,并繪制了一個簡單的 UML 圖來加深理解:

image

基本數(shù)據(jù)結(jié)構(gòu)

最小堆

二叉堆(Heap) 是一種很常見的數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是一棵完全二叉樹。其分為最大堆(也叫大根堆)最小堆(也叫小根堆)

  1. 最大堆:根結(jié)點(diǎn)的鍵值是所有堆結(jié)點(diǎn)鍵值中最大者的堆
  2. 最小堆:根結(jié)點(diǎn)的鍵值是所有堆結(jié)點(diǎn)鍵值中最小者的堆

beanstalkd/heap.c 是對最小堆的實(shí)現(xiàn)。那么,beanstalkd 中哪些地方用到了最小堆呢?

  1. Tube 中的延遲任務(wù)隊(duì)列(最先到期的任務(wù)會在堆頂,這樣可以在 O(1) 時間復(fù)雜度獲取到)
  2. Tube 中的就緒任務(wù)隊(duì)列(基于優(yōu)先級排列,優(yōu)先級最高的任務(wù)會在堆頂)
  3. Server 中的客戶端連接隊(duì)列(基于 tickat 時間排列)

接下來,我們看看最小堆的實(shí)現(xiàn):

int
heapinsert(Heap *h, void *x)
{
    int k;

    // 擴(kuò)容策略:2 倍長度
    if (h->len == h->cap) {
        void **ndata;
        int ncap = (h->len+1) * 2; /* allocate twice what we need */

        ndata = malloc(sizeof(void*) * ncap);
        if (!ndata) {
            return 0;
        }

        memcpy(ndata, h->data, sizeof(void*)*h->len);
        free(h->data);
        // 指向新的位置
        h->data = ndata;
        // 更新容量
        h->cap = ncap;
    }

    k = h->len;
    h->len++;
    set(h, k, x);
    siftdown(h, k);
    return 1;
}

void *
heapremove(Heap *h, int k)
{
    void *x;

    if (k >= h->len) {
        return 0;
    }

    x = h->data[k];
    h->len--;
    // 用原來的數(shù)組最后一位覆蓋被刪除的位置
    set(h, k, h->data[h->len]);
    siftdown(h, k);
    siftup(h, k);
    h->rec(x, -1);
    return x;
}

static void
set(Heap *h, int k, void *x)
{
    h->data[k] = x;
    // 這里會去調(diào)用相應(yīng)的回調(diào)函數(shù)
    h->rec(x, k);
}

// 判斷位置 a 指向的對象是否小于 b 指向的對象
static int
less(Heap *h, int a, int b)
{
    // h->less 是一個判斷大小的回調(diào)
    // 其實(shí)要在面向?qū)ο蟮恼Z言中,完全可以自定一個實(shí)現(xiàn)了比較大小的接口
    // 比如在 Rust 中,可以使用 `PartialEq` 限定...
    return h->less(h->data[a], h->data[b]);
}

變長數(shù)組

在 C 語言標(biāo)準(zhǔn)庫中是沒有可變長度的數(shù)組實(shí)現(xiàn)的,所以在 beanstalkd/ms.c 實(shí)現(xiàn)了一種類似的數(shù)據(jù)結(jié)構(gòu),它具有如下特點(diǎn):

  1. ms 結(jié)構(gòu)體維護(hù)一個可動態(tài)擴(kuò)容的數(shù)組(**items)
  2. 擴(kuò)容策略很粗暴,直接擴(kuò)充為原來容量的兩倍
  3. 插入的平均時間復(fù)雜度為 O(1)
  4. 刪除的平均時間復(fù)雜度為 O(1)
  5. 由于刪除時,會將尾部 item 替換掉被刪除的 item,所以不能依賴數(shù)組中的元素順序(順序不保證和添加時一致)
  6. 刪除 item 后,其實(shí)數(shù)組占用的內(nèi)存空間還在(并沒有動態(tài)縮容的策略)

那具體在哪些地方用到了 ms 這種數(shù)據(jù)結(jié)構(gòu)呢?梳理后,主要發(fā)現(xiàn)以下幾處:

  1. 全局的 Tube 列表
  2. 客戶端連接的 Conn 中維護(hù)的 watch list
  3. 與 Tube 關(guān)聯(lián)的等待連接(conns)列表

下面看看其具體的實(shí)現(xiàn):

// 初始化數(shù)組,并注冊插入和移除的回調(diào)函數(shù)
void
ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove)
{
    a->used = a->cap = a->last = 0;
    a->items = NULL;
    a->oninsert = oninsert;
    a->onremove = onremove;
}

// 控制數(shù)組增長
static void
grow(ms a)
{
    void **nitems;
    // 倍速增長:1, 2, 4, ...
    size_t ncap = (a->cap << 1) ? : 1;

    nitems = malloc(ncap * sizeof(void *));
    if (!nitems) return;

    // 舊的數(shù)據(jù)拷貝到新開辟的空間
    memcpy(nitems, a->items, a->used * sizeof(void *));
    // 釋放舊的內(nèi)存空間
    free(a->items);
    // 指向新的位置
    a->items = nitems;
    // 更新數(shù)組容量
    a->cap = ncap;
}

// 在數(shù)組尾部插入新的 item
// O(1)
int
ms_append(ms a, void *item)
{
    // 按需擴(kuò)展容量
    if (a->used >= a->cap) grow(a);
    // 擴(kuò)容失敗,就返回,也就是不能再新增 item 了
    if (a->used >= a->cap) return 0;

    a->items[a->used++] = item;
    // 如果有回調(diào),則觸發(fā)回調(diào)函數(shù)
    if (a->oninsert) a->oninsert(a, item, a->used - 1);
    return 1;
}

// 刪除指定位置的 item
// O(1)
static int
ms_delete(ms a, size_t i)
{
    void *item;

    if (i >= a->used) return 0;
    item = a->items[i];
    // 相當(dāng)于把尾部 item 寫到被刪除的位置,并「縮容」
    a->items[i] = a->items[--a->used];

    /* it has already been removed now */
    if (a->onremove) a->onremove(a, item, i);
    return 1;
}

字典

image

beanstalkd/job.c 中,為了方便基于 job_id 快速定位到具體的任務(wù),作者實(shí)現(xiàn)了一個字典數(shù)據(jù)結(jié)構(gòu)。這里是和 job 耦合在一起實(shí)現(xiàn)的,根據(jù)對源碼的分析,可以得出該字典數(shù)據(jù)結(jié)構(gòu)的特點(diǎn)如下:

  1. 采用基于 job_id 哈希取模的方式計算 slot_id
  2. 使用鏈地址法解決哈希沖突
  3. 根據(jù)負(fù)載因子自動進(jìn)行 rehash(進(jìn)行擴(kuò)容或縮容),擴(kuò)容或者縮容的系數(shù)根據(jù) beanstalkd/primes.c 設(shè)置
  4. rehash 過程并沒有采用類似 Redis 中漸進(jìn)式 rehash 機(jī)制,而是阻塞式完成整個哈希表的 rehash 后才可以進(jìn)行后續(xù)操作

存放 job 及 rehash 的詳細(xì)源碼分析如下:


// 存放一個 job
static void
store_job(job j)
{
    int index = 0;

    index = _get_job_hash_index(j->r.id);

    j->ht_next = all_jobs[index]; // 如果存在沖突,就采用鏈地址法
    all_jobs[index] = j;
    all_jobs_used++;

    /* accept a load factor of 4 */
    // 負(fù)載因子設(shè)置為 4,超過閾值時就會進(jìn)行 rehash
    // 看起這里是阻塞的方式來進(jìn)行 rehash 了,如果 hash 表太大,會被阻塞
    // 并沒有使用類似 redis 那樣漸進(jìn)式 rehash 思路
    if (all_jobs_used > (all_jobs_cap << 2)) rehash(1);
}

// 支持?jǐn)U容和縮容
static void
rehash(int is_upscaling)
{
    job *old = all_jobs;
    // 記錄下舊的 hash 表容量,元素個數(shù)
    size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;
    int old_prime = cur_prime;
    int d = is_upscaling ? 1 : -1;

    if (cur_prime + d >= NUM_PRIMES) return;
    if (cur_prime + d < 0) return;
    if (is_upscaling && hash_table_was_oom) return;

    cur_prime += d;

    all_jobs_cap = primes[cur_prime];
    all_jobs = calloc(all_jobs_cap, sizeof(job));
    if (!all_jobs) { // 針對擴(kuò)容失敗的處理,恢復(fù)原來的不變,但是標(biāo)記 OOM
        twarnx("Failed to allocate %zu new hash buckets", all_jobs_cap);
        hash_table_was_oom = 1;
        cur_prime = old_prime;
        all_jobs = old;
        all_jobs_cap = old_cap;
        all_jobs_used = old_used;
        return;
    }
    // 重置 hash 表狀態(tài)
    all_jobs_used = 0;
    hash_table_was_oom = 0;

    // 其實(shí)就是把 Hash 表上所有的 jobs 全部映射到新的空間
    for (i = 0; i < old_cap; i++) {
        while (old[i]) {
            job j = old[i];
            old[i] = j->ht_next;
            j->ht_next = NULL;
            store_job(j);
        }
    }

    // 然后把原來的內(nèi)存空間釋放掉
    if (old != all_jobs_init) {
        free(old);
    }
}

鏈表

鏈表這種數(shù)據(jù)結(jié)構(gòu)在 Beanstalkd 實(shí)現(xiàn)中用得比較頻繁,比如

  1. beanstalkd/walg.c 中使用了單向鏈表的串聯(lián)了一些列的日志文件(參見三個游標(biāo)指針:head, cur, tail
  2. beanstalkd/conn.c 使用雙向鏈表連接了一些列被 reserve 的任務(wù)

beastalkd/job.c,可以看到任務(wù)雙向鏈表實(shí)現(xiàn):

int
job_list_any_p(job head)
{
    return head->next != head || head->prev != head;
}

job
job_remove(job j)
{
    if (!j) return NULL;
    if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */

    j->next->prev = j->prev;
    j->prev->next = j->next;

    j->prev = j->next = j;

    return j;
}

void
job_insert(job head, job j)
{
    if (job_list_any_p(j)) return; /* already in a linked list */

    j->prev = head->prev;
    j->next = head;
    head->prev->next = j;
    head->prev = j;
}

部分模塊源碼學(xué)習(xí)

main.c

int
main(int argc, char **argv) {
    int r;
    // 存放任務(wù)的鏈表
    struct job list = {};

    progname = argv[0];
    // 設(shè)置使用行緩存,表使用標(biāo)準(zhǔn)輸出作為打印目標(biāo)
    // 詳細(xì)文檔參見:https://linux.die.net/man/3/setlinebuf
    // 意思是,只有在滿足一行(換行)時才輸出
    setlinebuf(stdout);
    // 命令行處理
    optparse(&srv, argv + 1);

    if (verbose) {
        printf("pid %d\n", getpid());
    }

    // 服務(wù)器端 socket 初始化等,返回一個指向 socket 的 file descriptor
    r = make_server_socket(srv.addr, srv.port);
    if (r == -1) twarnx("make_server_socket()"), exit(111);
    srv.sock.fd = r;

    // 協(xié)議處理模塊初始化
    prot_init();

    if (srv.user) su(srv.user);
    set_sig_handlers();

    if (srv.wal.use) {
        // We want to make sure that only one beanstalkd tries
        // to use the wal directory at a time. So acquire a lock
        // now and never release it.
        // WAL 即 Write Ahead Log Directory,主要是記錄日志用
        // 這里是要保證每次只能有一個 beanstalkd 實(shí)例使用 WAL 目錄,防止相互寫入沖突
        // 那估計以后就沒法從 binlog 恢復(fù)任務(wù)了。。。
        if (!waldirlock(&srv.wal)) {
            twarnx("failed to lock wal dir %s", srv.wal.dir);
            exit(10);
        }

        // 初始化任務(wù)鏈表(雙向鏈表)
        list.prev = list.next = &list;
        // 初始化 WAL,如果 log 中有任務(wù),還要恢復(fù)回來,掛載到 job list
        walinit(&srv.wal, &list);
        // 回放任務(wù)執(zhí)行
        r = prot_replay(&srv, &list);
        if (!r) {
            twarnx("failed to replay log");
            return 1;
        }
    }

    // 正式啟動 server,并監(jiān)聽請求,處理請求了
    srvserve(&srv);
    return 0;
}
image

serv.c

void
srvserve(Server *s)
{
    int r;
    Socket *sock;
    int64 period;

    if (sockinit() == -1) {
        twarnx("sockinit");
        exit(1);
    }

    s->sock.x = s;
    // 指定回調(diào),當(dāng)接收到 `r` 事件后,就會觸發(fā)這個回調(diào)
    s->sock.f = (Handle)srvaccept;
    // Server 維護(hù)了關(guān)聯(lián)的客戶端連接(最小堆)
    s->conns.less = (Less)connless;
    s->conns.rec = (Record)connrec;

    r = listen(s->sock.fd, 1024);
    if (r == -1) {
        twarn("listen");
        return;
    }

    // 注冊
    r = sockwant(&s->sock, 'r');
    if (r == -1) {
        twarn("sockwant");
        exit(2);
    }


    for (;;) {
        // 執(zhí)行周期性的任務(wù)
        // 如果 tick 中執(zhí)行的任務(wù)時間過久,會阻塞后面的 socket connection 處理
        // 嚴(yán)重會導(dǎo)致超時,而如果客戶端重試過多,則回增加服務(wù)端負(fù)載
        period = prottick(s);

        // 輪詢是否有就緒的請求(rw),其實(shí)就是個適配器,將具體平臺下返回的狀態(tài)
        // 轉(zhuǎn)換成統(tǒng)一的 `r`, `w`, `h`
        // Linux 使用 epoll 封裝(參見 `linux.c`)
        // Unix 使用 kqueue 封裝(參見 `darwin.c`)
        int rw = socknext(&sock, period);
        if (rw == -1) {
            twarnx("socknext");
            exit(1);
        }

        if (rw) {
            // 如果輪詢到需要處理的請求,則執(zhí)行相應(yīng)的回調(diào) Handle
            // 注意,這里的回調(diào)依然在主線程中執(zhí)行的,所以如果主線程被阻塞,就呵呵噠了
            sock->f(sock->x, rw);
        }
    }
}

tube.c

// 新建 tube,這里需要給定 tube 名稱
// 這里可以看到一個 tube 有幾個比較重要的組成:
// 1. 維護(hù)就緒任務(wù)的堆
// 2. 維護(hù)延遲執(zhí)行任務(wù)的堆
// 3. 維護(hù)處于 buried 狀態(tài)的任務(wù)鏈表
// 4. 維護(hù)一個等待列表
tube
make_tube(const char *name)
{
    tube t;

    // 分配內(nèi)存空間,用于存儲 tube 結(jié)構(gòu)體值
    t = new(struct tube);
    if (!t) return NULL;

    // 初始化 tube 名稱
    t->name[MAX_TUBE_NAME_LEN - 1] = '\0';
    strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1);
    if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");

    // ready 堆維護(hù)著一些已經(jīng)就緒的 jobs,這里是指定按照 job 優(yōu)先級的方式比較大小
    // 這樣,這個堆頂就是優(yōu)先級最高的 job
    t->ready.less = job_pri_less;
    // delay 堆維護(hù)著一些被延遲執(zhí)行的 jobs,這里是按照 job delay 時間比較大小
    // 這樣,這個堆頂就是延遲時間最短的 job
    t->delay.less = job_delay_less;

    // 用于記錄 job 在堆上的位置
    t->ready.rec = job_setheappos;
    t->delay.rec = job_setheappos;
    t->buried = (struct job) { };
    // 使用鏈表維護(hù)著被 bury 掉的 job
    t->buried.prev = t->buried.next = &t->buried;
    // 初始化排隊(duì)列表
    ms_init(&t->waiting, NULL, NULL);

    return t;
}

// 釋放 tube
static void
tube_free(tube t)
{
    // 實(shí)際就是從全局的 tubes 列表中移除該 tube
    prot_remove_tube(t);
    // 釋放就緒的任務(wù)
    free(t->ready.data);
    // 釋放延遲的任務(wù)
    free(t->delay.data);
    // 清空等待列表
    ms_clear(&t->waiting);
    // 釋放 tube 指向的內(nèi)存
    free(t);
}

// 引用計數(shù):減引用
void
tube_dref(tube t)
{
    if (!t) return;
    if (t->refs < 1) return twarnx("refs is zero for tube: %s", t->name);

    --t->refs;
    // 沒有引用后就可以釋放該 tube 了
    if (t->refs < 1) tube_free(t);
}

// 引用計數(shù):增加引用
void
tube_iref(tube t)
{
    if (!t) return;
    ++t->refs;
}

// 新建一個 tube,然后注冊到全局 tubes 列表
static tube
make_and_insert_tube(const char *name)
{
    int r;
    tube t = NULL;

    t = make_tube(name);
    if (!t) return NULL;

    /* We want this global tube list to behave like "weak" refs, so don't
     * increment the ref count. */
    // 這里是想讓全局 tube 列表表現(xiàn)為弱引用,所這里并沒有做增加引用的操作
    r = ms_append(&tubes, t);
    // 如果注冊 tube 失敗,減引用,必要的話會被釋放掉
    if (!r) return tube_dref(t), (tube) 0;

    return t;
}

job.c

在 Tube 中有兩個最小堆數(shù)據(jù)結(jié)構(gòu)分別存放被延遲的任何和就緒的任務(wù),這兩種使用的排序方式是不同的。我們看到在上面的 Tube 初始化時,給兩個堆綁定了不同的比較大小的回調(diào):

t->ready.less = job_pri_less;
t->delay.less = job_delay_less;

以下可以看到具體的排序方式:

// 回調(diào)函數(shù),先基于優(yōu)先級比較哪個小,再基于 id 比較哪個小
int
job_pri_less(void *ax, void *bx)
{
    job a = ax, b = bx;
    if (a->r.pri < b->r.pri) return 1;
    if (a->r.pri > b->r.pri) return 0;
    return a->r.id < b->r.id;
}

// 回調(diào)函數(shù),先基于到期時間比較哪個小,再基于 id 比較哪個小
int
job_delay_less(void *ax, void *bx)
{
    job a = ax, b = bx;
    if (a->r.deadline_at < b->r.deadline_at) return 1;
    if (a->r.deadline_at > b->r.deadline_at) return 0;
    return a->r.id < b->r.id;
}

讀后感

整個 Beanstalkd 的核心的代碼不過五千行左右,但這就實(shí)現(xiàn)了一個生產(chǎn)級別的消息隊(duì)列,的確是很厲害。不過,也正因?yàn)槠鋵?shí)現(xiàn)比較簡單,所以也沒有提供類似 Redis 的主從機(jī)制等。當(dāng)然,在這篇文章中,并沒有完整地剖析所有的模塊實(shí)現(xiàn),只是列出了個人比較感興趣的模塊;關(guān)于 binlog 管理的源碼(比如垃圾回收,壓縮,預(yù)留空間申請,任務(wù)恢復(fù)等)只是粗略地閱讀了下,就不在此處獻(xiàn)丑啦~

總的來說,對于單機(jī)消息隊(duì)列實(shí)現(xiàn)感興趣的同學(xué)還是推薦閱讀下該系統(tǒng)的源碼,可以學(xué)習(xí)其中的一些設(shè)計思想,實(shí)現(xiàn)思路等~

參考

  1. beanstalkd 的一些看法
  2. beanstalkd repo
  3. beanstalkd docs
  4. 消息隊(duì)列 beanstalkd 源碼詳解
  5. 最大—最小堆

延伸閱讀

  1. Kqueue 與 Epoll 機(jī)制
  2. 從 0到 100——知乎架構(gòu)變遷史

聲明

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容