簡單易用的任務(wù)隊(duì)列-beanstalkd

更多技術(shù)文章,請關(guān)注我的個人博客 www.immaxfang.com 和小公眾號 Max的學(xué)習(xí)札記。

概述

beanstalkd 是一個簡單快速的分布式工作隊(duì)列系統(tǒng),協(xié)議基于 ASCII 編碼運(yùn)行在 TCP 上。其最初設(shè)計(jì)的目的是通過后臺異步執(zhí)行耗時任務(wù)的方式降低高容量 Web 應(yīng)用的頁面延時。其具有簡單、輕量、易用等特點(diǎn),也支持對任務(wù)優(yōu)先級、延時/超時重發(fā)等控制,同時還有眾多語言版本的客戶端支持,這些優(yōu)點(diǎn)使得它成為各種需要隊(duì)列系統(tǒng)場景的一種常見選擇。

beanstalkd 優(yōu)點(diǎn)

  • 如他官網(wǎng)的介紹,simple&fast,使用非常簡單,適合需要引入消息隊(duì)列又不想引入 kafka 這類重型的 mq,維護(hù)成本低;同時,它的性能非常高,大部分場景下都可以 cover 住。
  • 支持持久化
  • 支持消息優(yōu)先級,topic,延時消息,消息重試等
  • 主流語言客戶端都支持,還可以根據(jù) beanstalkd 協(xié)議自行實(shí)現(xiàn)。

beanstalkd 不足

  • 無最大內(nèi)存控制,當(dāng)業(yè)務(wù)消息極多時,服務(wù)可能會不穩(wěn)定。
  • 官方?jīng)]有提供集群故障切換方案(主從或哨兵等),需要自己解決。

beanstalkd 重點(diǎn)概念

  • job

任務(wù),隊(duì)列中的基本單元,每個 job 都會有 id 和優(yōu)先級。有點(diǎn)類似其他消息隊(duì)列中的 message 的概念。但 job 有各種狀態(tài),下文介紹生命周期部分會重點(diǎn)介紹。job 存放在 tube 中。

  • tube

管道,用來存儲同一類型的 job。有點(diǎn)類似其他消息隊(duì)列中的 topic 的概念。beanstalkd 通過 tube 來實(shí)現(xiàn)多任務(wù)隊(duì)列,beanstalkd 中可以有多個管道,每個管道有自己的 producer 和 consumer,管道之間互相不影響。

  • producer

job 生產(chǎn)者。通過 put 命令將一個 job 放入到一個 tube 中。

  • consumer

job 消費(fèi)者。通過 reserve 來獲取 job,通過 delete、release、bury 來改變 job 的狀態(tài)。

beanstalkd 生命周期

上文介紹到,beanstalkd 中 job 有狀態(tài)區(qū)分,在整個生命周期中,job 可能有四種狀態(tài):READY, RESERVED, DELAYED, BURIED。只有處于READY狀態(tài)的 job 才能被消費(fèi)。下圖介紹了各狀態(tài)之間的流轉(zhuǎn)情況。

beanstalkd-status.png

producer 在創(chuàng)建 job 的時候有兩種方式,put 和 put with delay(延時任務(wù))。
如果 producer 使用 put 直接創(chuàng)建一個 job 時,該 job 就處于 READY 狀態(tài),等待 consumer 處理。
如果 producer 使用 put with delay 方式創(chuàng)建 job,該 job 的初始狀態(tài)為 DELAYED 狀態(tài),等待延遲時間過后才變更為 READY 狀態(tài)。
以上兩種方式創(chuàng)建的 job 都會傳入一個 TTR(超時機(jī)制),當(dāng) job 處于 RESERVED 狀態(tài)時,TTR 開始倒計(jì)時,當(dāng) TTR 倒計(jì)時完,job 狀態(tài)還沒有改變,則會認(rèn)為該 job 處理失敗,會被重新放回到隊(duì)列中。

consumer 獲取到(reserve)一個 READY 狀態(tài)的 job 之后,該 job 的狀態(tài)就會變更為 RESERVED。此時,其他的 consumer 就不能再操作該 job 了。當(dāng) consumer 完成該 job 之后,可以選擇 delete,release,或 bury 操作。

  • delete ,job 被刪除,從 beanstalkd 中清除,以后也無法再獲取到,生命周期結(jié)束。
  • release ,可以把該 job 重新變更為 READY 狀態(tài),使得其他的 consumer 可以繼續(xù)獲取和執(zhí)行該 job,也可以使用 release with delay 延時操作,這樣會先進(jìn)入 DELAYED 狀態(tài),延遲時間到達(dá)后再變?yōu)?READY。
  • bury,可以將 job 休眠,等需要的時候,在將休眠的 job 通過 kick 命令變更回 READY 狀態(tài),也可以通過 delete 直接刪除 BURIED 狀態(tài)的 job 。

處于 BURIED 狀態(tài)的 job,可以通過 kick 重回 READY 狀態(tài),也可以通過 delete 刪除 job。

為什么設(shè)計(jì)這個 BURIED 狀態(tài)呢?
一般我們可以用這個狀態(tài)來做異常捕獲,例如執(zhí)行超時或者異常的 job,我們可以將其置為 BURIED 狀態(tài),這樣做有幾個好處:
1.可以便面這些異常的 job 直接被放回隊(duì)列重試,影響正常的隊(duì)列消費(fèi)(這些失敗一次的 job,很有可能再次失敗)。如果沒有這個 BURIED 狀態(tài),如果我們要單獨(dú)隔離,一般我們會使用一個新的 tube 單獨(dú)存放這些異常的 job,使用單獨(dú)的 consumer 消費(fèi)。這樣就不會影響正常的新消息消費(fèi)。特別是失敗率比較高的時候,會占用很多的正常資源。
2.便于人工排查,上面已經(jīng)講到,可以將異常的 job 置為 BURIED 狀態(tài),這樣人工排查時重點(diǎn)關(guān)注這個狀態(tài)就可以了。

beanstalkd 特性

持久化

通過 binlog 將 job 及其狀態(tài)記錄到本地文件,當(dāng) beanstalkd 重啟時,可以通過讀取 binlog 來恢復(fù)之前的 job 狀態(tài)。

分布式

在 beanstalkd 的文檔中,其實(shí)是支持分布式的,其設(shè)計(jì)思想和 Memcached 類似,beanstalkd 各個 server 之間并不知道彼此的存在,是通過 client 實(shí)現(xiàn)分布式以及根據(jù) tube 名稱去特定的 server 上獲取 job。貼一篇專門討論 beanstalkd 分布式的文章,Beanstalkd的一種分布式方案

任務(wù)延時

天然支持延時任務(wù),可以在創(chuàng)建 job 時指定延時時間,也可以當(dāng) job 被處理完后才能后,消費(fèi)者使用 release with delay 將 job 再次放入隊(duì)列延時執(zhí)行。

任務(wù)優(yōu)先級

producer 生成的 job 可以給他分配優(yōu)先級,支持 0 到 2^32 的優(yōu)先級,值越小,優(yōu)先級越高,默認(rèn)優(yōu)先級為 1024。優(yōu)先級高的 job 會被 consumer 優(yōu)先執(zhí)行。

超時機(jī)制

為了防止某個 consumer 長時間占用 job 但無法處理完成的情況,beanstalkd 的 reserve 操作支持設(shè)置 timeout 時間(TTR)。如果 consumer 不能在 TTR 內(nèi)發(fā)送 delete、release 或 bury 命令改變 job 狀態(tài),那么 beanstalkd 會認(rèn)為任務(wù)處理失敗,會將 job 重新置為 READY 狀態(tài)供其他 consumer 消費(fèi)。
如果消費(fèi)者已經(jīng)預(yù)知可能無法在 TTR 內(nèi)完成該 job,則可以發(fā)送 touch 命令,使得 beanstalkd 重新計(jì)算 TTR。

任務(wù)預(yù)留

有一個 BURIED 狀態(tài)可以作為緩沖,具體特點(diǎn)見上文生命周期中關(guān)于 BURIED 狀態(tài)的介紹。

安裝及配置

以下以 ubuntu 為例,安轉(zhuǎn) beanstalkd:

sudo apt-get update
sudo apt-get install beanstalkd
vi /etc/sysconfig/beanstalkd
# 添加如下內(nèi)容
BEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog

可以通過 beanstalkd 命令來運(yùn)行服務(wù),并且可以添加多種參數(shù)。命令的格式如下:

beanstalkd [OPTIONS]

 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

如下我們啟動一個 beanstalkd 服務(wù),并開啟 binlog:

nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ &

beanstalkd管理工具

官方推薦的一些管理工具:Tools
筆者常用的管理工具:https://github.com/ptrofimov/beanstalk_console
如果只是簡單的操作和查看 beanstalkd,可以使用 telnet 工具,然后執(zhí)行 stats,use,put,watch 等:

$ telnet 127.0.0.1 11300
stats

實(shí)際應(yīng)用

beansralkd 有很多語言版本的客戶端實(shí)現(xiàn),官方提供了一些客戶端列表beanstalkd客戶端列表。
如果現(xiàn)有的這些庫不滿足需求,也可以自行實(shí)現(xiàn),參考 beanstalkd協(xié)議。

以下以 go 為例,簡單演示下 beanstalkd 常用處理操作。

go get github.com/beanstalkd/go-beanstalk

生產(chǎn)者

向默認(rèn)的 tube 中投入 job:

id, err := conn.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
    panic(err)
}
fmt.Println("job", id)

向指定的 tube 中投入 job:

tube := &beanstalk.Tube{Conn: conn, Name: "mytube"}
id, err := tube.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
    panic(err)
}
fmt.Println("job", id)

消費(fèi)者

消費(fèi)默認(rèn)的 tube 中的 job:

id, body, err := conn.Reserve(5 * time.Second)
if err != nil {
    panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

消費(fèi)指定的 tube (此處指定多個) 中的 job:

tubeSet := beanstalk.NewTubeSet(conn, "mytube1", "mytube2")
id, body, err := tubeSet.Reserve(10 * time.Hour)
if err != nil {
    panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

beanstalkd 使用小 tips

  • 可以通過指定 tube ,在 put 的時候?qū)?job 放入指定的 tube 中,否則會放入 default 的 tube 中。
  • beanstalkd 支持持久化,在啟動時使用 -b參數(shù)來開啟binlog,通過binog可以將 job 及其狀態(tài)記錄到文件里。當(dāng)重新使用-b參數(shù)重啟 beanstalkd,將讀取binlog來恢復(fù)之前的 job 及狀態(tài)。

參考資料


更多技術(shù)文章,請關(guān)注我的個人博客 www.immaxfang.com 和小公眾號 Max的學(xué)習(xí)札記。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容