python huey中文文檔(二)

緊接著上回繼續(xù)翻譯吧。有關(guān)huey這個(gè)python寫的的輕量級(jí)消息隊(duì)列

個(gè)人才疏學(xué)淺,可能很多英文都要借助翻譯軟件,但盡量做到能夠易于理解。


教程指導(dǎo)

這個(gè)文檔的目的是為了幫助人盡可能快速使用huey。

簡(jiǎn)單入門

使用huey需要注意有如下三個(gè)主要的組成(或者過程):

  • 生產(chǎn)者,例如web應(yīng)用等。
  • 消費(fèi)者,運(yùn)行放置在消息隊(duì)列中的任務(wù)(jobs)。
  • 隊(duì)列,存放任務(wù)。例如Redis等。

底下的截圖展示了上述三個(gè)不同的過程。左邊是生產(chǎn)者:一個(gè)簡(jiǎn)單的程序詢問用戶要輸入多少的“豆子”。右上角消費(fèi)者一直運(yùn)行,它正在做“計(jì)算”,舉例如圖中所示打印了有多少“豆子”被計(jì)數(shù)。右下角是是一個(gè)隊(duì)列,圖中使用的是Redis。我們可以看到任務(wù)被添加(LPUSH)入隊(duì)列和從數(shù)據(jù)庫(kù)中讀取(BRPOP)任務(wù)。

截圖

自我嘗試

假定你安裝了huey,讓我們來看一下代碼例子。
第一步先配置隊(duì)列。消費(fèi)者需要指定一個(gè)Huey實(shí)例,這代表了使用的后端類型。

# config.py
from huey import RedisHuey

huey = RedisHuey()

huey對(duì)象封裝了隊(duì)列。隊(duì)列負(fù)責(zé)存儲(chǔ)和取回消息,你的應(yīng)用程序代碼使用huey實(shí)例來聯(lián)系函數(shù)調(diào)用。我們來看一下怎么使用huey來連接一個(gè)計(jì)算豆子的函數(shù):

# tasks.py
from config import huey # import the huey we instantiated in config.py


@huey.task()
def count_beans(num):
    print('-- counted %s beans --' % num)

上述代碼展現(xiàn)了如何用API定義最終被消費(fèi)者執(zhí)行的“任務(wù)”——用task()裝飾器簡(jiǎn)單裝飾你想要讓消費(fèi)者運(yùn)行的函數(shù)任務(wù)。而當(dāng)它被調(diào)用時(shí)候,主進(jìn)程將立即返回而不是進(jìn)入函數(shù)內(nèi)部。在消費(fèi)者進(jìn)程中會(huì)看到這個(gè)新消息并運(yùn)行這個(gè)函數(shù)。
我們的主程序很簡(jiǎn)單。它導(dǎo)入了配置和任務(wù)——這確保了在我們根據(jù)指定的配置運(yùn)行消費(fèi)者時(shí),所有任務(wù)都會(huì)被加載入內(nèi)存。

# main.py
from config import huey  # import our "huey" object
from tasks import count_beans  # import our task


if __name__ == '__main__':
    beans = raw_input('How many beans? ')
    count_beans(int(beans))
    print('Enqueued job to count %s beans' % beans)

啟動(dòng)腳本需要依次進(jìn)行以下步驟:

  1. 確保本地運(yùn)行Redis
  2. 確保安裝了huey
  3. 啟動(dòng)消費(fèi)者: huey_consumer.py main.huey(注意是"main.huey"而不是"config.huey",這里提示一下huey_consumer.py需要自己從huey腳本的bin下拷貝到當(dāng)前的路徑,這樣才能用該命令來啟動(dòng)。)
  4. 運(yùn)行主程序: python main.py

獲取結(jié)果

上面的例子實(shí)現(xiàn)了一個(gè)“發(fā)送并且忘記”的方法,但是如果你的應(yīng)用程序需要對(duì)任務(wù)的結(jié)果做些什么呢?要從你的任務(wù)中獲取結(jié)果,只需返回任務(wù)函數(shù)中的值即可。

如果你得到了存儲(chǔ)結(jié)果但不使用它們,那么可能會(huì)浪費(fèi)大量空間,特別是如果你的任務(wù)量很高。要禁用存儲(chǔ)功能,可以在初始化Huey實(shí)例時(shí)返回None或指定result_store = False。

為了更好地說明獲取結(jié)果的代碼,我們還將修改tasks.py模塊以返回一個(gè)字符串,而不是打印結(jié)果到標(biāo)準(zhǔn)輸出窗口:

from config import huey


@huey.task()
def count_beans(num):
    print('-- counted %s beans --' % num)
    return 'Counted %s beans' % num

我們準(zhǔn)備向消費(fèi)者輸入大量任務(wù)。不是簡(jiǎn)單地執(zhí)行主程序,我們這回將啟動(dòng)一個(gè)解釋器并運(yùn)行以下操作:

>>> from main import count_beans
>>> res = count_beans(100)
>>> print(res)                      # What is "res" ?
<huey.api.TaskResultWrapper object at 0xb7471a4c>

>>> res()                          # Get the result of this task
'Counted 100 beans'

按照與上一個(gè)例子相同的布局,下面是三個(gè)主要工作流程的截圖:

  1. 左邊,解釋器生成任務(wù)并詢問結(jié)果
  2. 右上角, 消費(fèi)者執(zhí)行任務(wù)并存儲(chǔ)結(jié)果
  3. 右下角是Redis數(shù)據(jù)庫(kù),我們能夠看到它儲(chǔ)存結(jié)果然后在我們?nèi)』財(cái)?shù)據(jù)后刪除它們
結(jié)果

延遲執(zhí)行任務(wù)

將某個(gè)特定任務(wù)排入任意時(shí)間來執(zhí)行通常很用的,例如,標(biāo)記在某個(gè)時(shí)間發(fā)布的博客條目。
這在huey中很容易完成。返回上一節(jié)描述的在解釋器中執(zhí)行的代碼,讓我們安排一個(gè)一分鐘后執(zhí)行的數(shù)豆子的任務(wù),然后看看huey怎么處理它。執(zhí)行以下代碼:

>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> print(res)
<huey.api.TaskResultWrapper object at 0xb72915ec>

>>> res()  # This returns None, no data is ready.

>>> res()  # A couple seconds later.

>>> res(blocking=True)  # OK, let's just block until its ready
'Counted 100 beans'

你也可以使用datetime類型來指定“預(yù)計(jì)到達(dá)時(shí)間”:

>>> in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
>>> res = count_beans.schedule(args=(100,), eta=in_a_minute)

默認(rèn)情況下,Huey實(shí)例以UTC時(shí)間運(yùn)行。這對(duì)計(jì)劃任務(wù)的影響是,當(dāng)使用本地的時(shí)間時(shí),它們必須與datetime.utcnow()相關(guān)聯(lián)。

在上述代碼中我們不采用utcnow()的原因是schedule()包含默認(rèn)值為True的第三個(gè)參數(shù)叫做convert_utc。所以在上面的代碼中,datetime在被發(fā)送到隊(duì)列之前從本地時(shí)間轉(zhuǎn)換為了UTC。

當(dāng)你想要以本地時(shí)間模式運(yùn)行(-o),你需要總是指定schedule()的第三個(gè)參數(shù)convert_utc=False,包括在指定delay參數(shù)時(shí)。

redis的輸出,我們看到以下(簡(jiǎn)化內(nèi)容):

+1325563365.910640 "LPUSH" count_beans(100)
+1325563365.911912 "BRPOP" wait for next job
+1325563365.912435 "HSET" store 'Counted 100 beans'
+1325563366.393236 "HGET" retrieve result from task
+1325563366.393464 "HDEL" delete result after reading

截圖也展示了相同的內(nèi)容:

安排

重試失敗任務(wù)

Huey支持有限次重試失敗任務(wù)。如果在執(zhí)行任務(wù)期間引發(fā)了異常,但是你已經(jīng)指定了retries參數(shù),則任務(wù)將重新入隊(duì)并再次嘗試,直到指定的重試次數(shù)。
如下顯示的一個(gè)任務(wù),將重試3次,每次都會(huì)拋出異常:

# tasks.py
from config import huey


@huey.task()
def count_beans(num):
    print('-- counted %s beans --' % num)
    return 'Counted %s beans' % num

@huey.task(retries=3)
def try_thrice():
    print('trying....')
    raise Exception('nope')

控制臺(tái)輸出顯示我們的任務(wù)在解釋器會(huì)話中被調(diào)用,然后當(dāng)消費(fèi)者接收然后執(zhí)行它時(shí),我們看到它失敗但進(jìn)行了重試:

重試

在重試之間等待一定的時(shí)間間隔通常是個(gè)好的主意。你可以指定重試之間的delay參數(shù)(以秒為單位),這會(huì)是任務(wù)重試之前等待的最短時(shí)間。這里我們修改代碼使它包含delay參數(shù),并打印當(dāng)前時(shí)間來顯示它的工作。

# tasks.py
from datetime import datetime

from config import huey

@huey.task(retries=3, retry_delay=10)
def try_thrice():
    print('trying....%s' % datetime.now())
    raise Exception('nope')

下面的控制臺(tái)輸出顯示正在重試的任務(wù),同時(shí)在重試過程中,我還在添加了“數(shù)豆子”的任務(wù)——處于正常執(zhí)行狀態(tài)。

添加延遲

定期任務(wù)

huey支持的另一個(gè)使用模式是定期執(zhí)行任務(wù)。依照crontab行為,同時(shí)遵循類似的語法。定期執(zhí)行的任務(wù),不應(yīng)返回有意義的結(jié)果,也不應(yīng)接受任何參數(shù)。
讓我們添加一個(gè)每分鐘打印一次字符竄的新任務(wù)——我們將使用它來測(cè)試消費(fèi)者是否正在按計(jì)劃執(zhí)行任務(wù)。

# tasks.py
from datetime import datetime
from huey import crontab

from config import huey

@huey.periodic_task(crontab(minute='*'))
def print_time():
    print(datetime.now())

現(xiàn)在,當(dāng)我們運(yùn)行消費(fèi)者時(shí),它將每分鐘開始打印時(shí)間:

定時(shí)任務(wù)

取消或暫停任務(wù)

huey可以停止任務(wù)執(zhí)行。這適用于正常任務(wù),延遲任務(wù)和定期任務(wù)。
為了“revoke(撤銷)”任務(wù),你需要在實(shí)例化Huey對(duì)象時(shí)指定一個(gè)result_store參數(shù)。
如果消費(fèi)者沒有開始執(zhí)行任務(wù),你可以取消正常的任務(wù):

# count some beans
res = count_beans(10000000)

# provided the command has not started executing yet, you can
# cancel it by calling revoke() on the TaskResultWrapper object
res.revoke()

這同樣適用于延遲任務(wù):

res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()

# and you can actually change your mind and restore it, provided
# it has not already been "skipped" by the consumer
res.restore()

要撤消給定任務(wù)的所有實(shí)例,請(qǐng)對(duì)任務(wù)本身使用revoke()restore()方法:

count_beans.revoke()
assert count_beans.is_revoked() is True

res = count_beans(100)
assert res.is_revoked() is True

count_beans.restore()
assert count_beans.is_revoked() is False

取消或暫停定期任務(wù)

當(dāng)我們開始處理定期任務(wù)時(shí),撤銷選項(xiàng)會(huì)更有意思。
我們將使用打印時(shí)間代碼作為示例:

@huey.periodic_task(crontab(minute='*'))
def print_time():
    print(datetime.now())

我們可以防止周期性的任務(wù)在下一個(gè)循環(huán)中執(zhí)行:

# only prevent it from running once
print_time.revoke(revoke_once=True)

由于上述任務(wù)每分鐘執(zhí)行一次,我們將看到輸出將跳過下一次的一分鐘,然后才恢復(fù)正常。
我們也可以防止任務(wù)執(zhí)行直到指定時(shí)間:

# prevent printing time for 10 minutes
now = datetime.datetime.utcnow()
in_10 = now + datetime.timedelta(seconds=600)

print_time.revoke(revoke_until=in_10)

當(dāng)指定revoke_until設(shè)置,如果消費(fèi)者以UTC默認(rèn)時(shí)間模式運(yùn)行,本地時(shí)間需要關(guān)聯(lián)到datetime.utcnow()。如果消費(fèi)者以本地時(shí)間(-o參數(shù)指定),則可以使用datetime.now()。

最后,我們可以防止任務(wù)無限期地運(yùn)行:

# will not print time until we call revoke() again with
# different parameters or restore the task
print_time.revoke()
assert print_time.is_revoked() is True

我們隨時(shí)可以恢復(fù)任務(wù),它將會(huì)正常執(zhí)行:

print_time.restore()

查看更多

這總結(jié)了huey的基本使用模式。以下是有關(guān)API其他方面的詳細(xì)信息的鏈接:

  • Huey——負(fù)責(zé)協(xié)調(diào)可執(zhí)行任務(wù)和隊(duì)列后端
  • Huey.task()——裝飾器來指示可執(zhí)行任務(wù)
  • Huey.periodic_task() ——裝飾器以指示以周期性間隔執(zhí)行的任務(wù)
  • TaskResultWrapper.get() ——從任務(wù)獲取返回值
  • crontab() ——用于定義執(zhí)行周期性任務(wù)的間隔時(shí)間

結(jié)束

有一周混過去了,繼續(xù)啃爬蟲去了。。。。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,536評(píng)論 19 139
  • 本文章是 Vert.x 藍(lán)圖系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待...
    sczyh30閱讀 2,186評(píng)論 0 10
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,512評(píng)論 2 34
  • 41.多用派發(fā)隊(duì)列,少用同步鎖 在Objective-C中,如果有多個(gè)線程要執(zhí)行同一份代碼,那么有時(shí)可能會(huì)出問題。...
    Code_Ninja閱讀 1,251評(píng)論 1 13
  • GCD調(diào)度隊(duì)列是執(zhí)行任務(wù)的強(qiáng)大工具。調(diào)度隊(duì)列允許您相對(duì)于調(diào)度者異步或者同步的執(zhí)行任意代碼塊。您能夠使用調(diào)度隊(duì)列來執(zhí)...
    坤坤同學(xué)閱讀 6,745評(píng)論 1 3

友情鏈接更多精彩內(nèi)容