引言
Beanstalkd 是一個比較輕量級的消息隊(duì)列服務(wù),對于性能和穩(wěn)定性要求不是特別高(相對于 RabbitMQ, Redis, Kafka 等),并且需要延遲執(zhí)行任務(wù)的場景非常合適;此外,它也支持給任務(wù)設(shè)置不同的優(yōu)先級、執(zhí)行超時時間等。
在我們的業(yè)務(wù)中,經(jīng)常會借助 Beanstalkd 執(zhí)行隊(duì)列任務(wù),常見的用例如下:
- 用戶完成會員購買并激活后,發(fā)送私信通知、重置賬號重命名狀態(tài)等;
- 用戶完成評論后,異步更新評論計數(shù);
- 用戶對私家課收聽記錄上報后,異步更新最近收聽的小節(jié)、累積收聽時長、同步到其它系統(tǒng)等;
- 用戶記錄添加后,會同步至 Redis,為保證數(shù)據(jù)庫和 Redis 的數(shù)據(jù)最終一致性,會提前啟動一個延遲校驗(yàn)的任務(wù)(如 5s 后),檢查 Redis 中與數(shù)據(jù)庫記錄是否一致。
Beanstalkd 初識
特點(diǎn)
- 基于 TCP 并采用 ASCII 編碼的文本協(xié)議。詳細(xì)定義參見:protocol.txt:
- 客戶端負(fù)責(zé)與服務(wù)端的交互:連接、發(fā)送命令和數(shù)據(jù)、等待響應(yīng)、關(guān)閉連接
- 服務(wù)端串行處理每個客戶端連接
- 協(xié)議由兩個部分組成:文本行(用于客戶端命令和服務(wù)端響應(yīng))和非結(jié)構(gòu)化的數(shù)據(jù)塊(用于傳送任務(wù) body 和 stats 信息)
- 隊(duì)列消息是存儲在內(nèi)存中的,但用戶可以選擇開啟 WAL 機(jī)制(binlog),這樣重啟后可以回放任務(wù),提高了可用性
- 采用類似 Redis 的單線程模型(IO 多路復(fù)用機(jī)制),因此不必考慮多線程環(huán)境下線程同步、加鎖等,簡化實(shí)現(xiàn)
關(guān)鍵詞
- Tube:類似 Kafka 中的 Topic,或者其它隊(duì)列系統(tǒng)中的 Channel
- Job:客戶端生產(chǎn)和消費(fèi)的基本單元。每個任務(wù)都有特定的 id,可設(shè)定優(yōu)先級,超時時間,延遲執(zhí)行時間等
- WAL (Write Ahead Log):負(fù)責(zé) binlog 管理(寫入、壓縮、日志文件清理、任務(wù)恢復(fù)等)
- Server:Beanstalkd 服務(wù)端
- Conn:Beanstalkd 客戶端連接處理
任務(wù)狀態(tài)流轉(zhuǎn)

任務(wù)典型生命周期

工作方式描述
- 服務(wù)端會有一到多個 tubes(在數(shù)組中維護(hù))。每個 tube 都會包含一個就緒隊(duì)列(在最小堆維護(hù))以及一個延遲隊(duì)列(也在最小堆維護(hù))。每個任務(wù)都會在一個特定的 tube 中度過全部的生命周期
-
客戶端可以使用
watch指令訂閱某個 tube,也可以使用ignore取消訂閱,消費(fèi)者可以同時訂閱多個 tube,當(dāng)消費(fèi)者reserve任務(wù)時,該任務(wù)可能來自其watch list中的任意一個 tube - 當(dāng)客戶端連接時,默認(rèn)會使用
defaulttube,可以使用use切換 tube - tube 是會根據(jù)需要隨時創(chuàng)建的,當(dāng)沒有客戶端引用時,就會被刪除
安裝
借助 Docker 啟動一個 Beanstalkd 服務(wù)非常輕松,請運(yùn)行下面的命令行即可:
docker run -d -p 11300:11300 schickling/beanstalkd
如果上述命令行執(zhí)行正常,則 Beanstalkd 服務(wù)應(yīng)該啟動了,其默認(rèn)監(jiān)聽的端口號為 11300,運(yùn)行 docker ps 可以查看服務(wù)是否正常啟動并運(yùn)行:

編譯 & 運(yùn)行 & 調(diào)試
首先,需要前往 beanstalkd 倉庫克隆 Master 分支源碼至本地。
為了方便管理 C 項(xiàng)目,這里使用了 JET BRAINS 家族的 Clion。當(dāng)然,你也可以使用自己喜歡的工具打開。
由于 Clion 使用了 CMake 管理 C&C++ 項(xiàng)目,所以打開項(xiàng)目時需要在其根目錄下創(chuàng)建一個 CMakeLists.txt 文件,并填寫如下內(nèi)容:
cmake_minimum_required(VERSION 3.13)
project(beanstalkd C)
set(BUILD_DIR .)
add_custom_target(beanstalkd ALL COMMAND make WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
add_custom_command(TARGET beanstalkd POST_BUILD
COMMAND echo copy ${PROJECT_NAME} to ${CMAKE_CURRENT_BINARY_DIR}
COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/beanstalkd ${CMAKE_CURRENT_BINARY_DIR}
)
?? 至此,準(zhǔn)備工作已經(jīng)做完啦。接下來,可以嘗試點(diǎn)擊「構(gòu)建」按鈕,進(jìn)行編譯。編譯結(jié)束后,就可以點(diǎn)擊運(yùn)行啟動 Beanstalkd 服務(wù)啦。哦,對了,如果需要調(diào)試支持的話,直接在需要的地方打上斷點(diǎn),并點(diǎn)擊「調(diào)試」按鈕即可開始。

關(guān)于 Makefile
查看 Makefile 文件,可以看到有如下幾個命令可以執(zhí)行:
-
make all: 編譯、鏈接并生成可執(zhí)行的二進(jìn)制文件beanstalkd。由于我們已經(jīng)將該命令放到CMakeLists.txt文件中,在使用 Clion 構(gòu)建時可自動觸發(fā) -
make install: 將生成的可執(zhí)行文件beanstalkd安裝到BINDIR=$(DESTDIR)/usr/local/bin目錄下 -
make clean: 清理生成的*.o文件 -
make bench: 跑 Benchmark 用
客戶端使用示例
下面看一個簡單的例子。生產(chǎn)者負(fù)責(zé)將一組待抓取的 URLs 放到隊(duì)列中,再由一組消費(fèi)者并發(fā)訪問隊(duì)列中的 URLs。主流程的示例代碼如下:
// 會首先啟動 NUM_WORKERS 個消費(fèi)者在不同的線程中監(jiān)聽
// 然后讓生產(chǎn)者向隊(duì)列中填充 URLs,供消費(fèi)者使用
fn main() {
const NUM_WORKERS: isize = 5;
let mut handles = vec![];
for i in 0..NUM_WORKERS {
// 啟動 NUM_WORKERS 個消費(fèi)者
let hd = thread::spawn(move || consume_urls(i));
handles.push(hd);
}
produce_urls();
// 等待消費(fèi)者結(jié)束
for hd in handles {
hd.join().unwrap();
}
}
生產(chǎn)者
fn produce_urls() {
let mut client = Beanstalkd::localhost().unwrap();
client.tube("urls").unwrap();
let urls = vec![
"https://github.com/iFaceless/ifaceless.github.io",
"https://github.com/iFaceless/gic",
"https://github.com/iFaceless/rust-exercises",
"https://github.com/iFaceless/learning-rust",
"https://github.com/iFaceless/fixture",
"https://github.com/iFaceless/rest",
"https://github.com/iFaceless/bigcache",
"https://github.com/iFaceless/leetgogo",
"https://github.com/iFaceless/freecache",
];
for url in urls {
client.put(url, 0, 0, 1000).unwrap();
}
}
消費(fèi)者
fn consume_urls(id: isize) {
println!("[Consumer {}] started...", id);
let mut client = Beanstalkd::localhost().unwrap();
client.watch("urls").unwrap();
loop {
let (job_id, url) = match client.reserve() {
Ok(job) => job,
Err(e) => {
println!("[Consumer {}] error happens: {}", id, e);
break;
}
};
println!("[Consumer {}] got job <{}>: {}", id, job_id, url);
client.delete(job_id).unwrap();
}
}
源碼探索
模塊分類圖
為了方便閱讀源碼,粗略地根據(jù)自己的理解給各個文件做了簡單的分類:

模塊 UML 圖
雖說 Beanstalkd 的源碼是使用 C 編寫的,但是其中的設(shè)計思想依然可以從面向?qū)ο蟮慕嵌葋斫忉尅1热缒K化設(shè)計、接口設(shè)計、多態(tài)等。根據(jù)自己的理解,對其中的一些核心模塊做了梳理,并繪制了一個簡單的 UML 圖來加深理解:

基本數(shù)據(jù)結(jié)構(gòu)
最小堆
二叉堆(Heap) 是一種很常見的數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是一棵完全二叉樹。其分為最大堆(也叫大根堆) 和 最小堆(也叫小根堆):
- 最大堆:根結(jié)點(diǎn)的鍵值是所有堆結(jié)點(diǎn)鍵值中最大者的堆
- 最小堆:根結(jié)點(diǎn)的鍵值是所有堆結(jié)點(diǎn)鍵值中最小者的堆
在 beanstalkd/heap.c 是對最小堆的實(shí)現(xiàn)。那么,beanstalkd 中哪些地方用到了最小堆呢?
- Tube 中的延遲任務(wù)隊(duì)列(最先到期的任務(wù)會在堆頂,這樣可以在 O(1) 時間復(fù)雜度獲取到)
- Tube 中的就緒任務(wù)隊(duì)列(基于優(yōu)先級排列,優(yōu)先級最高的任務(wù)會在堆頂)
- Server 中的客戶端連接隊(duì)列(基于 tickat 時間排列)
接下來,我們看看最小堆的實(shí)現(xiàn):
int
heapinsert(Heap *h, void *x)
{
int k;
// 擴(kuò)容策略:2 倍長度
if (h->len == h->cap) {
void **ndata;
int ncap = (h->len+1) * 2; /* allocate twice what we need */
ndata = malloc(sizeof(void*) * ncap);
if (!ndata) {
return 0;
}
memcpy(ndata, h->data, sizeof(void*)*h->len);
free(h->data);
// 指向新的位置
h->data = ndata;
// 更新容量
h->cap = ncap;
}
k = h->len;
h->len++;
set(h, k, x);
siftdown(h, k);
return 1;
}
void *
heapremove(Heap *h, int k)
{
void *x;
if (k >= h->len) {
return 0;
}
x = h->data[k];
h->len--;
// 用原來的數(shù)組最后一位覆蓋被刪除的位置
set(h, k, h->data[h->len]);
siftdown(h, k);
siftup(h, k);
h->rec(x, -1);
return x;
}
static void
set(Heap *h, int k, void *x)
{
h->data[k] = x;
// 這里會去調(diào)用相應(yīng)的回調(diào)函數(shù)
h->rec(x, k);
}
// 判斷位置 a 指向的對象是否小于 b 指向的對象
static int
less(Heap *h, int a, int b)
{
// h->less 是一個判斷大小的回調(diào)
// 其實(shí)要在面向?qū)ο蟮恼Z言中,完全可以自定一個實(shí)現(xiàn)了比較大小的接口
// 比如在 Rust 中,可以使用 `PartialEq` 限定...
return h->less(h->data[a], h->data[b]);
}
變長數(shù)組
在 C 語言標(biāo)準(zhǔn)庫中是沒有可變長度的數(shù)組實(shí)現(xiàn)的,所以在 beanstalkd/ms.c 實(shí)現(xiàn)了一種類似的數(shù)據(jù)結(jié)構(gòu),它具有如下特點(diǎn):
-
ms結(jié)構(gòu)體維護(hù)一個可動態(tài)擴(kuò)容的數(shù)組(**items) - 擴(kuò)容策略很粗暴,直接擴(kuò)充為原來容量的兩倍
- 插入的平均時間復(fù)雜度為 O(1)
- 刪除的平均時間復(fù)雜度為 O(1)
- 由于刪除時,會將尾部 item 替換掉被刪除的 item,所以不能依賴數(shù)組中的元素順序(順序不保證和添加時一致)
- 刪除 item 后,其實(shí)數(shù)組占用的內(nèi)存空間還在(并沒有動態(tài)縮容的策略)
那具體在哪些地方用到了 ms 這種數(shù)據(jù)結(jié)構(gòu)呢?梳理后,主要發(fā)現(xiàn)以下幾處:
- 全局的 Tube 列表
- 客戶端連接的
Conn中維護(hù)的 watch list - 與 Tube 關(guān)聯(lián)的等待連接(conns)列表
下面看看其具體的實(shí)現(xiàn):
// 初始化數(shù)組,并注冊插入和移除的回調(diào)函數(shù)
void
ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove)
{
a->used = a->cap = a->last = 0;
a->items = NULL;
a->oninsert = oninsert;
a->onremove = onremove;
}
// 控制數(shù)組增長
static void
grow(ms a)
{
void **nitems;
// 倍速增長:1, 2, 4, ...
size_t ncap = (a->cap << 1) ? : 1;
nitems = malloc(ncap * sizeof(void *));
if (!nitems) return;
// 舊的數(shù)據(jù)拷貝到新開辟的空間
memcpy(nitems, a->items, a->used * sizeof(void *));
// 釋放舊的內(nèi)存空間
free(a->items);
// 指向新的位置
a->items = nitems;
// 更新數(shù)組容量
a->cap = ncap;
}
// 在數(shù)組尾部插入新的 item
// O(1)
int
ms_append(ms a, void *item)
{
// 按需擴(kuò)展容量
if (a->used >= a->cap) grow(a);
// 擴(kuò)容失敗,就返回,也就是不能再新增 item 了
if (a->used >= a->cap) return 0;
a->items[a->used++] = item;
// 如果有回調(diào),則觸發(fā)回調(diào)函數(shù)
if (a->oninsert) a->oninsert(a, item, a->used - 1);
return 1;
}
// 刪除指定位置的 item
// O(1)
static int
ms_delete(ms a, size_t i)
{
void *item;
if (i >= a->used) return 0;
item = a->items[i];
// 相當(dāng)于把尾部 item 寫到被刪除的位置,并「縮容」
a->items[i] = a->items[--a->used];
/* it has already been removed now */
if (a->onremove) a->onremove(a, item, i);
return 1;
}
字典

在 beanstalkd/job.c 中,為了方便基于 job_id 快速定位到具體的任務(wù),作者實(shí)現(xiàn)了一個字典數(shù)據(jù)結(jié)構(gòu)。這里是和 job 耦合在一起實(shí)現(xiàn)的,根據(jù)對源碼的分析,可以得出該字典數(shù)據(jù)結(jié)構(gòu)的特點(diǎn)如下:
- 采用基于
job_id哈希取模的方式計算slot_id - 使用鏈地址法解決哈希沖突
- 根據(jù)負(fù)載因子自動進(jìn)行 rehash(進(jìn)行擴(kuò)容或縮容),擴(kuò)容或者縮容的系數(shù)根據(jù) beanstalkd/primes.c 設(shè)置
- rehash 過程并沒有采用類似 Redis 中漸進(jìn)式 rehash 機(jī)制,而是阻塞式完成整個哈希表的 rehash 后才可以進(jìn)行后續(xù)操作
存放 job 及 rehash 的詳細(xì)源碼分析如下:
// 存放一個 job
static void
store_job(job j)
{
int index = 0;
index = _get_job_hash_index(j->r.id);
j->ht_next = all_jobs[index]; // 如果存在沖突,就采用鏈地址法
all_jobs[index] = j;
all_jobs_used++;
/* accept a load factor of 4 */
// 負(fù)載因子設(shè)置為 4,超過閾值時就會進(jìn)行 rehash
// 看起這里是阻塞的方式來進(jìn)行 rehash 了,如果 hash 表太大,會被阻塞
// 并沒有使用類似 redis 那樣漸進(jìn)式 rehash 思路
if (all_jobs_used > (all_jobs_cap << 2)) rehash(1);
}
// 支持?jǐn)U容和縮容
static void
rehash(int is_upscaling)
{
job *old = all_jobs;
// 記錄下舊的 hash 表容量,元素個數(shù)
size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;
int old_prime = cur_prime;
int d = is_upscaling ? 1 : -1;
if (cur_prime + d >= NUM_PRIMES) return;
if (cur_prime + d < 0) return;
if (is_upscaling && hash_table_was_oom) return;
cur_prime += d;
all_jobs_cap = primes[cur_prime];
all_jobs = calloc(all_jobs_cap, sizeof(job));
if (!all_jobs) { // 針對擴(kuò)容失敗的處理,恢復(fù)原來的不變,但是標(biāo)記 OOM
twarnx("Failed to allocate %zu new hash buckets", all_jobs_cap);
hash_table_was_oom = 1;
cur_prime = old_prime;
all_jobs = old;
all_jobs_cap = old_cap;
all_jobs_used = old_used;
return;
}
// 重置 hash 表狀態(tài)
all_jobs_used = 0;
hash_table_was_oom = 0;
// 其實(shí)就是把 Hash 表上所有的 jobs 全部映射到新的空間
for (i = 0; i < old_cap; i++) {
while (old[i]) {
job j = old[i];
old[i] = j->ht_next;
j->ht_next = NULL;
store_job(j);
}
}
// 然后把原來的內(nèi)存空間釋放掉
if (old != all_jobs_init) {
free(old);
}
}
鏈表
鏈表這種數(shù)據(jù)結(jié)構(gòu)在 Beanstalkd 實(shí)現(xiàn)中用得比較頻繁,比如
-
beanstalkd/walg.c 中使用了單向鏈表的串聯(lián)了一些列的日志文件(參見三個游標(biāo)指針:
head,cur,tail) -
beanstalkd/conn.c 使用雙向鏈表連接了一些列被
reserve的任務(wù)
在 beastalkd/job.c,可以看到任務(wù)雙向鏈表實(shí)現(xiàn):
int
job_list_any_p(job head)
{
return head->next != head || head->prev != head;
}
job
job_remove(job j)
{
if (!j) return NULL;
if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */
j->next->prev = j->prev;
j->prev->next = j->next;
j->prev = j->next = j;
return j;
}
void
job_insert(job head, job j)
{
if (job_list_any_p(j)) return; /* already in a linked list */
j->prev = head->prev;
j->next = head;
head->prev->next = j;
head->prev = j;
}
部分模塊源碼學(xué)習(xí)
main.c
int
main(int argc, char **argv) {
int r;
// 存放任務(wù)的鏈表
struct job list = {};
progname = argv[0];
// 設(shè)置使用行緩存,表使用標(biāo)準(zhǔn)輸出作為打印目標(biāo)
// 詳細(xì)文檔參見:https://linux.die.net/man/3/setlinebuf
// 意思是,只有在滿足一行(換行)時才輸出
setlinebuf(stdout);
// 命令行處理
optparse(&srv, argv + 1);
if (verbose) {
printf("pid %d\n", getpid());
}
// 服務(wù)器端 socket 初始化等,返回一個指向 socket 的 file descriptor
r = make_server_socket(srv.addr, srv.port);
if (r == -1) twarnx("make_server_socket()"), exit(111);
srv.sock.fd = r;
// 協(xié)議處理模塊初始化
prot_init();
if (srv.user) su(srv.user);
set_sig_handlers();
if (srv.wal.use) {
// We want to make sure that only one beanstalkd tries
// to use the wal directory at a time. So acquire a lock
// now and never release it.
// WAL 即 Write Ahead Log Directory,主要是記錄日志用
// 這里是要保證每次只能有一個 beanstalkd 實(shí)例使用 WAL 目錄,防止相互寫入沖突
// 那估計以后就沒法從 binlog 恢復(fù)任務(wù)了。。。
if (!waldirlock(&srv.wal)) {
twarnx("failed to lock wal dir %s", srv.wal.dir);
exit(10);
}
// 初始化任務(wù)鏈表(雙向鏈表)
list.prev = list.next = &list;
// 初始化 WAL,如果 log 中有任務(wù),還要恢復(fù)回來,掛載到 job list
walinit(&srv.wal, &list);
// 回放任務(wù)執(zhí)行
r = prot_replay(&srv, &list);
if (!r) {
twarnx("failed to replay log");
return 1;
}
}
// 正式啟動 server,并監(jiān)聽請求,處理請求了
srvserve(&srv);
return 0;
}

serv.c
void
srvserve(Server *s)
{
int r;
Socket *sock;
int64 period;
if (sockinit() == -1) {
twarnx("sockinit");
exit(1);
}
s->sock.x = s;
// 指定回調(diào),當(dāng)接收到 `r` 事件后,就會觸發(fā)這個回調(diào)
s->sock.f = (Handle)srvaccept;
// Server 維護(hù)了關(guān)聯(lián)的客戶端連接(最小堆)
s->conns.less = (Less)connless;
s->conns.rec = (Record)connrec;
r = listen(s->sock.fd, 1024);
if (r == -1) {
twarn("listen");
return;
}
// 注冊
r = sockwant(&s->sock, 'r');
if (r == -1) {
twarn("sockwant");
exit(2);
}
for (;;) {
// 執(zhí)行周期性的任務(wù)
// 如果 tick 中執(zhí)行的任務(wù)時間過久,會阻塞后面的 socket connection 處理
// 嚴(yán)重會導(dǎo)致超時,而如果客戶端重試過多,則回增加服務(wù)端負(fù)載
period = prottick(s);
// 輪詢是否有就緒的請求(rw),其實(shí)就是個適配器,將具體平臺下返回的狀態(tài)
// 轉(zhuǎn)換成統(tǒng)一的 `r`, `w`, `h`
// Linux 使用 epoll 封裝(參見 `linux.c`)
// Unix 使用 kqueue 封裝(參見 `darwin.c`)
int rw = socknext(&sock, period);
if (rw == -1) {
twarnx("socknext");
exit(1);
}
if (rw) {
// 如果輪詢到需要處理的請求,則執(zhí)行相應(yīng)的回調(diào) Handle
// 注意,這里的回調(diào)依然在主線程中執(zhí)行的,所以如果主線程被阻塞,就呵呵噠了
sock->f(sock->x, rw);
}
}
}
tube.c
// 新建 tube,這里需要給定 tube 名稱
// 這里可以看到一個 tube 有幾個比較重要的組成:
// 1. 維護(hù)就緒任務(wù)的堆
// 2. 維護(hù)延遲執(zhí)行任務(wù)的堆
// 3. 維護(hù)處于 buried 狀態(tài)的任務(wù)鏈表
// 4. 維護(hù)一個等待列表
tube
make_tube(const char *name)
{
tube t;
// 分配內(nèi)存空間,用于存儲 tube 結(jié)構(gòu)體值
t = new(struct tube);
if (!t) return NULL;
// 初始化 tube 名稱
t->name[MAX_TUBE_NAME_LEN - 1] = '\0';
strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1);
if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");
// ready 堆維護(hù)著一些已經(jīng)就緒的 jobs,這里是指定按照 job 優(yōu)先級的方式比較大小
// 這樣,這個堆頂就是優(yōu)先級最高的 job
t->ready.less = job_pri_less;
// delay 堆維護(hù)著一些被延遲執(zhí)行的 jobs,這里是按照 job delay 時間比較大小
// 這樣,這個堆頂就是延遲時間最短的 job
t->delay.less = job_delay_less;
// 用于記錄 job 在堆上的位置
t->ready.rec = job_setheappos;
t->delay.rec = job_setheappos;
t->buried = (struct job) { };
// 使用鏈表維護(hù)著被 bury 掉的 job
t->buried.prev = t->buried.next = &t->buried;
// 初始化排隊(duì)列表
ms_init(&t->waiting, NULL, NULL);
return t;
}
// 釋放 tube
static void
tube_free(tube t)
{
// 實(shí)際就是從全局的 tubes 列表中移除該 tube
prot_remove_tube(t);
// 釋放就緒的任務(wù)
free(t->ready.data);
// 釋放延遲的任務(wù)
free(t->delay.data);
// 清空等待列表
ms_clear(&t->waiting);
// 釋放 tube 指向的內(nèi)存
free(t);
}
// 引用計數(shù):減引用
void
tube_dref(tube t)
{
if (!t) return;
if (t->refs < 1) return twarnx("refs is zero for tube: %s", t->name);
--t->refs;
// 沒有引用后就可以釋放該 tube 了
if (t->refs < 1) tube_free(t);
}
// 引用計數(shù):增加引用
void
tube_iref(tube t)
{
if (!t) return;
++t->refs;
}
// 新建一個 tube,然后注冊到全局 tubes 列表
static tube
make_and_insert_tube(const char *name)
{
int r;
tube t = NULL;
t = make_tube(name);
if (!t) return NULL;
/* We want this global tube list to behave like "weak" refs, so don't
* increment the ref count. */
// 這里是想讓全局 tube 列表表現(xiàn)為弱引用,所這里并沒有做增加引用的操作
r = ms_append(&tubes, t);
// 如果注冊 tube 失敗,減引用,必要的話會被釋放掉
if (!r) return tube_dref(t), (tube) 0;
return t;
}
job.c
在 Tube 中有兩個最小堆數(shù)據(jù)結(jié)構(gòu)分別存放被延遲的任何和就緒的任務(wù),這兩種使用的排序方式是不同的。我們看到在上面的 Tube 初始化時,給兩個堆綁定了不同的比較大小的回調(diào):
t->ready.less = job_pri_less;
t->delay.less = job_delay_less;
以下可以看到具體的排序方式:
// 回調(diào)函數(shù),先基于優(yōu)先級比較哪個小,再基于 id 比較哪個小
int
job_pri_less(void *ax, void *bx)
{
job a = ax, b = bx;
if (a->r.pri < b->r.pri) return 1;
if (a->r.pri > b->r.pri) return 0;
return a->r.id < b->r.id;
}
// 回調(diào)函數(shù),先基于到期時間比較哪個小,再基于 id 比較哪個小
int
job_delay_less(void *ax, void *bx)
{
job a = ax, b = bx;
if (a->r.deadline_at < b->r.deadline_at) return 1;
if (a->r.deadline_at > b->r.deadline_at) return 0;
return a->r.id < b->r.id;
}
讀后感
整個 Beanstalkd 的核心的代碼不過五千行左右,但這就實(shí)現(xiàn)了一個生產(chǎn)級別的消息隊(duì)列,的確是很厲害。不過,也正因?yàn)槠鋵?shí)現(xiàn)比較簡單,所以也沒有提供類似 Redis 的主從機(jī)制等。當(dāng)然,在這篇文章中,并沒有完整地剖析所有的模塊實(shí)現(xiàn),只是列出了個人比較感興趣的模塊;關(guān)于 binlog 管理的源碼(比如垃圾回收,壓縮,預(yù)留空間申請,任務(wù)恢復(fù)等)只是粗略地閱讀了下,就不在此處獻(xiàn)丑啦~
總的來說,對于單機(jī)消息隊(duì)列實(shí)現(xiàn)感興趣的同學(xué)還是推薦閱讀下該系統(tǒng)的源碼,可以學(xué)習(xí)其中的一些設(shè)計思想,實(shí)現(xiàn)思路等~
參考
延伸閱讀
聲明
- 本文鏈接: http://ifaceless.space/2019/02/04/learning-beanstalkd-source-code/
- 版權(quán)聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 3.0 許可協(xié)議。轉(zhuǎn)載請注明出處!