Celery 使用 +

在項(xiàng)目中使用celery

項(xiàng)目結(jié)構(gòu):

proj/__init__.py  # 注意這個(gè)文件 django是不帶的默認(rèn) 對(duì)應(yīng)django的項(xiàng)目
      /celery.py
      /tasks.py

proj/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
         broker='amqp://',  # 使用的消息隊(duì)列
         backend='amqp://',  # 使用的結(jié)果存儲(chǔ)
         include=['proj.tasks'])  # 任務(wù)們

if __name__ == '__main__':
    app.start()

proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y): 
    return x + y

啟動(dòng)工作進(jìn)程:

celery -A proj worker -l info

-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//

  • ** ---------- . app: main:0x1012d8590
  • ** ---------- . concurrency: 8 (processes)
  • ** ---------- . events: OFF (enable -E to monitor this worker)
  • ** ----------
  • *** --- * --- [Queues]
    -- ******* ---- . celery: exchange:celery(direct) binding:celery
    --- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

broker就是配置中的消息隊(duì)列,concurrency就是工作進(jìn)程數(shù)量(默認(rèn)是cpu數(shù)量)如果所有進(jìn)程都被占用了那么新的任務(wù)需要等待,events是指定celery是否發(fā)送監(jiān)控信息,queue就是隊(duì)列。

終止工作進(jìn)程

直接control -c(按照上述命令在前臺(tái)啟動(dòng)的),當(dāng)然了如果在后臺(tái)啟動(dòng)和停止:

celery multi start w1 -A proj -l info
celery  multi restart w1 -A proj -l info

# 異步關(guān)閉 立即返回
celery multi stop w1 -A proj -l info
# 等待關(guān)閉操作完成
celery multi stopwait w1 -A proj -l info

默認(rèn)會(huì)在當(dāng)前目錄下創(chuàng)建pid和log文件,指定路徑和名稱:

celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                    --logfile=/var/log/celery/%n%I.log

關(guān)于-A(--app)選項(xiàng):
指定celery app,可以用 module.path:attribute的形式指定,也可直接指定package,如--app=proj,然后會(huì)搜索其中的proj.app,proj.celery等。

操作已被創(chuàng)建的worker時(shí),不需要參數(shù)什么的完全一樣,只要pidfile和logfile一樣即可。

調(diào)用

add.delay(1, 1)
add.apply_async((1, 1))

add.apply_async((2, 2), queue='lopri', countdown=10)
# 指定要發(fā)送到哪個(gè)隊(duì)列 運(yùn)行時(shí)間延遲countdown

# 結(jié)果獲得
res  = add.delay(1, 1)
res.get()

celery默認(rèn)不產(chǎn)生結(jié)果的原因:因?yàn)楦鞣N應(yīng)用的需求不同,而大多數(shù)任務(wù)保存返回值沒有什么意義,而且產(chǎn)生結(jié)果并不是用來監(jiān)控任務(wù)和工作進(jìn)程,應(yīng)該是使用event消息的專門的監(jiān)控模塊。

res.id  # 任務(wù)id uuid

# 任務(wù)出錯(cuò)會(huì)raise 錯(cuò)誤 可指定propagate
res.get(propagate=False)
res.failed()
res.successful()
res.state

# PENDING -> STARTED -> SUCCESS(FAILURE)
# STARTED 需要設(shè)置 task_track_started 或在任務(wù)級(jí)別設(shè)置@task(track_started=True)
# 重試的情況也有
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

工作流

有時(shí)想把一個(gè)任務(wù)的簽名(包含了實(shí)參和任務(wù)的執(zhí)行選項(xiàng))發(fā)送到另一個(gè)進(jìn)程或把參數(shù)發(fā)送到另一個(gè)函數(shù)。

add.signature((2, 2), countdown=10)  # with args and kwargs
add.s(2, 2)  # with args only

然后獲得的簽名就可以調(diào)用:

s1 = add.s(2, 2)
res = s1.delay()
res.get()

# 也可以部分指定參數(shù) 然后在調(diào)用時(shí)補(bǔ)全參數(shù)    
s2 = add.s(2)
res = s2.delay(8)
res.get()

調(diào)用方式

之前一直使用的是delay(*args, **kwargs)的方式來調(diào)用,還有apply_async的調(diào)用方式,支持一些執(zhí)行時(shí)的選項(xiàng)

Groups

group是同時(shí)調(diào)用的一系列任務(wù),返回特殊的結(jié)果可以獲得組內(nèi)所有的執(zhí)行結(jié)果,可以按照任務(wù)的順序從該結(jié)果中獲得對(duì)應(yīng)任務(wù)的結(jié)果。

group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# 組也支持部分參數(shù)化
g = group(add.s(i) for i in xrange(10))
g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

把任務(wù)連接起來,執(zhí)行完一個(gè)后把結(jié)果傳入另一個(gè)再調(diào)用。

chain(add.s(4, 4) | mul.s(8))().get()
g = chain(add.s(4) | mul.s(8))
g(4).get()
# 也支持部分參數(shù)化

chords

chord是一個(gè)帶callback的group

from celery import chord
from proj.tasks import add, xsum

chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
# 而group的結(jié)果發(fā)送到另一個(gè)任務(wù)則自動(dòng)轉(zhuǎn)換為chord
(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()

# 可以把單任務(wù)的結(jié)果發(fā)送到組中
upload_doc.s(file) | group(apply_filter.s() for filter in filters)

Routing

在app層面配置task到某個(gè)隊(duì)列:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

在調(diào)用時(shí)指定使用某個(gè)隊(duì)列:

add.apply_async((2, 2), queue='hipri')

啟動(dòng)工作進(jìn)程時(shí)指定該進(jìn)程處理的隊(duì)列:

celery -A proj worker -Q hipri

# 指定多個(gè)處理的隊(duì)列 默認(rèn)隊(duì)列名稱:celery
celery -A proj worker -Q hipri,celery

遠(yuǎn)程控制

查看工作進(jìn)程當(dāng)前的工作:

celery -A proj inspect active

這個(gè)命令是一個(gè)廣播,所有工作進(jìn)程都會(huì)收到。(需要啟用event)

celery -A proj inspect active --destination=celery@example.com

讓工作進(jìn)程啟用event

celery -A proj control enable_events
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.定義: Celery是一個(gè)異步的任務(wù)隊(duì)列(也叫做分布式任務(wù)隊(duì)列) 2.工作結(jié)構(gòu) Celery分為3個(gè)部...
    四號(hào)公園_2016閱讀 28,950評(píng)論 5 60
  • 應(yīng)用 需要一個(gè)celery實(shí)例,即應(yīng)用。這個(gè)應(yīng)用是使用所有東西的進(jìn)入點(diǎn),例如創(chuàng)建任務(wù)、管理工作進(jìn)程,必須可被其他模...
    xncode閱讀 1,466評(píng)論 0 1
  • 序言第1章 并行和分布式計(jì)算介紹第2章 異步編程第3章 Python的并行計(jì)算第4章 Celery分布式應(yīng)用第5章...
    SeanCheney閱讀 12,789評(píng)論 3 35
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • 粉絲,這個(gè)帶來無限可能的群體,無疑是每個(gè)商家夢(mèng)寐以求的東西。誠(chéng)然,有了粉絲,你就不必要為產(chǎn)品的銷量而擔(dān)憂,也...
    鍛鎏閱讀 203評(píng)論 0 0

友情鏈接更多精彩內(nèi)容