緊接著上回繼續(xù)翻譯吧。有關(guān)huey這個(gè)python寫的的輕量級(jí)消息隊(duì)列
個(gè)人才疏學(xué)淺,可能很多英文都要借助翻譯軟件,但盡量做到能夠易于理解。
教程指導(dǎo)
這個(gè)文檔的目的是為了幫助人盡可能快速使用huey。
簡(jiǎn)單入門
使用huey需要注意有如下三個(gè)主要的組成(或者過程):
- 生產(chǎn)者,例如
web應(yīng)用等。 - 消費(fèi)者,運(yùn)行放置在消息隊(duì)列中的任務(wù)(
jobs)。 - 隊(duì)列,存放任務(wù)。例如
Redis等。
底下的截圖展示了上述三個(gè)不同的過程。左邊是生產(chǎn)者:一個(gè)簡(jiǎn)單的程序詢問用戶要輸入多少的“豆子”。右上角消費(fèi)者一直運(yùn)行,它正在做“計(jì)算”,舉例如圖中所示打印了有多少“豆子”被計(jì)數(shù)。右下角是是一個(gè)隊(duì)列,圖中使用的是Redis。我們可以看到任務(wù)被添加(LPUSH)入隊(duì)列和從數(shù)據(jù)庫(kù)中讀取(BRPOP)任務(wù)。

自我嘗試
假定你安裝了huey,讓我們來看一下代碼例子。
第一步先配置隊(duì)列。消費(fèi)者需要指定一個(gè)Huey實(shí)例,這代表了使用的后端類型。
# config.py
from huey import RedisHuey
huey = RedisHuey()
huey對(duì)象封裝了隊(duì)列。隊(duì)列負(fù)責(zé)存儲(chǔ)和取回消息,你的應(yīng)用程序代碼使用huey實(shí)例來聯(lián)系函數(shù)調(diào)用。我們來看一下怎么使用huey來連接一個(gè)計(jì)算豆子的函數(shù):
# tasks.py
from config import huey # import the huey we instantiated in config.py
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
上述代碼展現(xiàn)了如何用API定義最終被消費(fèi)者執(zhí)行的“任務(wù)”——用task()裝飾器簡(jiǎn)單裝飾你想要讓消費(fèi)者運(yùn)行的函數(shù)任務(wù)。而當(dāng)它被調(diào)用時(shí)候,主進(jìn)程將立即返回而不是進(jìn)入函數(shù)內(nèi)部。在消費(fèi)者進(jìn)程中會(huì)看到這個(gè)新消息并運(yùn)行這個(gè)函數(shù)。
我們的主程序很簡(jiǎn)單。它導(dǎo)入了配置和任務(wù)——這確保了在我們根據(jù)指定的配置運(yùn)行消費(fèi)者時(shí),所有任務(wù)都會(huì)被加載入內(nèi)存。
# main.py
from config import huey # import our "huey" object
from tasks import count_beans # import our task
if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)
啟動(dòng)腳本需要依次進(jìn)行以下步驟:
- 確保本地運(yùn)行
Redis - 確保安裝了
huey - 啟動(dòng)消費(fèi)者:
huey_consumer.py main.huey(注意是"main.huey"而不是"config.huey",這里提示一下huey_consumer.py需要自己從huey腳本的bin下拷貝到當(dāng)前的路徑,這樣才能用該命令來啟動(dòng)。) - 運(yùn)行主程序:
python main.py
獲取結(jié)果
上面的例子實(shí)現(xiàn)了一個(gè)“發(fā)送并且忘記”的方法,但是如果你的應(yīng)用程序需要對(duì)任務(wù)的結(jié)果做些什么呢?要從你的任務(wù)中獲取結(jié)果,只需返回任務(wù)函數(shù)中的值即可。
如果你得到了存儲(chǔ)結(jié)果但不使用它們,那么可能會(huì)浪費(fèi)大量空間,特別是如果你的任務(wù)量很高。要禁用存儲(chǔ)功能,可以在初始化
Huey實(shí)例時(shí)返回None或指定result_store = False。
為了更好地說明獲取結(jié)果的代碼,我們還將修改tasks.py模塊以返回一個(gè)字符串,而不是打印結(jié)果到標(biāo)準(zhǔn)輸出窗口:
from config import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Counted %s beans' % num
我們準(zhǔn)備向消費(fèi)者輸入大量任務(wù)。不是簡(jiǎn)單地執(zhí)行主程序,我們這回將啟動(dòng)一個(gè)解釋器并運(yùn)行以下操作:
>>> from main import count_beans
>>> res = count_beans(100)
>>> print(res) # What is "res" ?
<huey.api.TaskResultWrapper object at 0xb7471a4c>
>>> res() # Get the result of this task
'Counted 100 beans'
按照與上一個(gè)例子相同的布局,下面是三個(gè)主要工作流程的截圖:
- 左邊,解釋器生成任務(wù)并詢問結(jié)果
- 右上角, 消費(fèi)者執(zhí)行任務(wù)并存儲(chǔ)結(jié)果
- 右下角是
Redis數(shù)據(jù)庫(kù),我們能夠看到它儲(chǔ)存結(jié)果然后在我們?nèi)』財(cái)?shù)據(jù)后刪除它們

延遲執(zhí)行任務(wù)
將某個(gè)特定任務(wù)排入任意時(shí)間來執(zhí)行通常很用的,例如,標(biāo)記在某個(gè)時(shí)間發(fā)布的博客條目。
這在huey中很容易完成。返回上一節(jié)描述的在解釋器中執(zhí)行的代碼,讓我們安排一個(gè)一分鐘后執(zhí)行的數(shù)豆子的任務(wù),然后看看huey怎么處理它。執(zhí)行以下代碼:
>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> print(res)
<huey.api.TaskResultWrapper object at 0xb72915ec>
>>> res() # This returns None, no data is ready.
>>> res() # A couple seconds later.
>>> res(blocking=True) # OK, let's just block until its ready
'Counted 100 beans'
你也可以使用datetime類型來指定“預(yù)計(jì)到達(dá)時(shí)間”:
>>> in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
>>> res = count_beans.schedule(args=(100,), eta=in_a_minute)
默認(rèn)情況下,
Huey實(shí)例以UTC時(shí)間運(yùn)行。這對(duì)計(jì)劃任務(wù)的影響是,當(dāng)使用本地的時(shí)間時(shí),它們必須與datetime.utcnow()相關(guān)聯(lián)。在上述代碼中我們不采用
utcnow()的原因是schedule()包含默認(rèn)值為True的第三個(gè)參數(shù)叫做convert_utc。所以在上面的代碼中,datetime在被發(fā)送到隊(duì)列之前從本地時(shí)間轉(zhuǎn)換為了UTC。當(dāng)你想要以本地時(shí)間模式運(yùn)行(-o),你需要總是指定
schedule()的第三個(gè)參數(shù)convert_utc=False,包括在指定delay參數(shù)時(shí)。
瞧redis的輸出,我們看到以下(簡(jiǎn)化內(nèi)容):
+1325563365.910640 "LPUSH" count_beans(100)
+1325563365.911912 "BRPOP" wait for next job
+1325563365.912435 "HSET" store 'Counted 100 beans'
+1325563366.393236 "HGET" retrieve result from task
+1325563366.393464 "HDEL" delete result after reading
截圖也展示了相同的內(nèi)容:

重試失敗任務(wù)
Huey支持有限次重試失敗任務(wù)。如果在執(zhí)行任務(wù)期間引發(fā)了異常,但是你已經(jīng)指定了retries參數(shù),則任務(wù)將重新入隊(duì)并再次嘗試,直到指定的重試次數(shù)。
如下顯示的一個(gè)任務(wù),將重試3次,每次都會(huì)拋出異常:
# tasks.py
from config import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Counted %s beans' % num
@huey.task(retries=3)
def try_thrice():
print('trying....')
raise Exception('nope')
控制臺(tái)輸出顯示我們的任務(wù)在解釋器會(huì)話中被調(diào)用,然后當(dāng)消費(fèi)者接收然后執(zhí)行它時(shí),我們看到它失敗但進(jìn)行了重試:

在重試之間等待一定的時(shí)間間隔通常是個(gè)好的主意。你可以指定重試之間的delay參數(shù)(以秒為單位),這會(huì)是任務(wù)重試之前等待的最短時(shí)間。這里我們修改代碼使它包含delay參數(shù),并打印當(dāng)前時(shí)間來顯示它的工作。
# tasks.py
from datetime import datetime
from config import huey
@huey.task(retries=3, retry_delay=10)
def try_thrice():
print('trying....%s' % datetime.now())
raise Exception('nope')
下面的控制臺(tái)輸出顯示正在重試的任務(wù),同時(shí)在重試過程中,我還在添加了“數(shù)豆子”的任務(wù)——處于正常執(zhí)行狀態(tài)。

定期任務(wù)
huey支持的另一個(gè)使用模式是定期執(zhí)行任務(wù)。依照crontab行為,同時(shí)遵循類似的語法。定期執(zhí)行的任務(wù),不應(yīng)返回有意義的結(jié)果,也不應(yīng)接受任何參數(shù)。
讓我們添加一個(gè)每分鐘打印一次字符竄的新任務(wù)——我們將使用它來測(cè)試消費(fèi)者是否正在按計(jì)劃執(zhí)行任務(wù)。
# tasks.py
from datetime import datetime
from huey import crontab
from config import huey
@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())
現(xiàn)在,當(dāng)我們運(yùn)行消費(fèi)者時(shí),它將每分鐘開始打印時(shí)間:

取消或暫停任務(wù)
huey可以停止任務(wù)執(zhí)行。這適用于正常任務(wù),延遲任務(wù)和定期任務(wù)。
為了“revoke(撤銷)”任務(wù),你需要在實(shí)例化Huey對(duì)象時(shí)指定一個(gè)result_store參數(shù)。
如果消費(fèi)者沒有開始執(zhí)行任務(wù),你可以取消正常的任務(wù):
# count some beans
res = count_beans(10000000)
# provided the command has not started executing yet, you can
# cancel it by calling revoke() on the TaskResultWrapper object
res.revoke()
這同樣適用于延遲任務(wù):
res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()
# and you can actually change your mind and restore it, provided
# it has not already been "skipped" by the consumer
res.restore()
要撤消給定任務(wù)的所有實(shí)例,請(qǐng)對(duì)任務(wù)本身使用revoke()和restore()方法:
count_beans.revoke()
assert count_beans.is_revoked() is True
res = count_beans(100)
assert res.is_revoked() is True
count_beans.restore()
assert count_beans.is_revoked() is False
取消或暫停定期任務(wù)
當(dāng)我們開始處理定期任務(wù)時(shí),撤銷選項(xiàng)會(huì)更有意思。
我們將使用打印時(shí)間代碼作為示例:
@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())
我們可以防止周期性的任務(wù)在下一個(gè)循環(huán)中執(zhí)行:
# only prevent it from running once
print_time.revoke(revoke_once=True)
由于上述任務(wù)每分鐘執(zhí)行一次,我們將看到輸出將跳過下一次的一分鐘,然后才恢復(fù)正常。
我們也可以防止任務(wù)執(zhí)行直到指定時(shí)間:
# prevent printing time for 10 minutes
now = datetime.datetime.utcnow()
in_10 = now + datetime.timedelta(seconds=600)
print_time.revoke(revoke_until=in_10)
當(dāng)指定
revoke_until設(shè)置,如果消費(fèi)者以UTC默認(rèn)時(shí)間模式運(yùn)行,本地時(shí)間需要關(guān)聯(lián)到datetime.utcnow()。如果消費(fèi)者以本地時(shí)間(-o參數(shù)指定),則可以使用datetime.now()。
最后,我們可以防止任務(wù)無限期地運(yùn)行:
# will not print time until we call revoke() again with
# different parameters or restore the task
print_time.revoke()
assert print_time.is_revoked() is True
我們隨時(shí)可以恢復(fù)任務(wù),它將會(huì)正常執(zhí)行:
print_time.restore()
查看更多
這總結(jié)了huey的基本使用模式。以下是有關(guān)API其他方面的詳細(xì)信息的鏈接:
- Huey——負(fù)責(zé)協(xié)調(diào)可執(zhí)行任務(wù)和隊(duì)列后端
- Huey.task()——裝飾器來指示可執(zhí)行任務(wù)
- Huey.periodic_task() ——裝飾器以指示以周期性間隔執(zhí)行的任務(wù)
- TaskResultWrapper.get() ——從任務(wù)獲取返回值
- crontab() ——用于定義執(zhí)行周期性任務(wù)的間隔時(shí)間
結(jié)束
有一周混過去了,繼續(xù)啃爬蟲去了。。。。