在項(xiàng)目中使用celery
項(xiàng)目結(jié)構(gòu):
proj/__init__.py # 注意這個(gè)文件 django是不帶的默認(rèn) 對(duì)應(yīng)django的項(xiàng)目
/celery.py
/tasks.py
proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('proj',
broker='amqp://', # 使用的消息隊(duì)列
backend='amqp://', # 使用的結(jié)果存儲(chǔ)
include=['proj.tasks']) # 任務(wù)們
if __name__ == '__main__':
app.start()
proj/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
啟動(dòng)工作進(jìn)程:
celery -A proj worker -l info
-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: main:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
broker就是配置中的消息隊(duì)列,concurrency就是工作進(jìn)程數(shù)量(默認(rèn)是cpu數(shù)量)如果所有進(jìn)程都被占用了那么新的任務(wù)需要等待,events是指定celery是否發(fā)送監(jiān)控信息,queue就是隊(duì)列。
終止工作進(jìn)程
直接control -c(按照上述命令在前臺(tái)啟動(dòng)的),當(dāng)然了如果在后臺(tái)啟動(dòng)和停止:
celery multi start w1 -A proj -l info
celery multi restart w1 -A proj -l info
# 異步關(guān)閉 立即返回
celery multi stop w1 -A proj -l info
# 等待關(guān)閉操作完成
celery multi stopwait w1 -A proj -l info
默認(rèn)會(huì)在當(dāng)前目錄下創(chuàng)建pid和log文件,指定路徑和名稱:
celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
關(guān)于-A(--app)選項(xiàng):
指定celery app,可以用 module.path:attribute的形式指定,也可直接指定package,如--app=proj,然后會(huì)搜索其中的proj.app,proj.celery等。
操作已被創(chuàng)建的worker時(shí),不需要參數(shù)什么的完全一樣,只要pidfile和logfile一樣即可。
調(diào)用
add.delay(1, 1)
add.apply_async((1, 1))
add.apply_async((2, 2), queue='lopri', countdown=10)
# 指定要發(fā)送到哪個(gè)隊(duì)列 運(yùn)行時(shí)間延遲countdown
# 結(jié)果獲得
res = add.delay(1, 1)
res.get()
celery默認(rèn)不產(chǎn)生結(jié)果的原因:因?yàn)楦鞣N應(yīng)用的需求不同,而大多數(shù)任務(wù)保存返回值沒有什么意義,而且產(chǎn)生結(jié)果并不是用來監(jiān)控任務(wù)和工作進(jìn)程,應(yīng)該是使用event消息的專門的監(jiān)控模塊。
res.id # 任務(wù)id uuid
# 任務(wù)出錯(cuò)會(huì)raise 錯(cuò)誤 可指定propagate
res.get(propagate=False)
res.failed()
res.successful()
res.state
# PENDING -> STARTED -> SUCCESS(FAILURE)
# STARTED 需要設(shè)置 task_track_started 或在任務(wù)級(jí)別設(shè)置@task(track_started=True)
# 重試的情況也有
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
工作流
有時(shí)想把一個(gè)任務(wù)的簽名(包含了實(shí)參和任務(wù)的執(zhí)行選項(xiàng))發(fā)送到另一個(gè)進(jìn)程或把參數(shù)發(fā)送到另一個(gè)函數(shù)。
add.signature((2, 2), countdown=10) # with args and kwargs
add.s(2, 2) # with args only
然后獲得的簽名就可以調(diào)用:
s1 = add.s(2, 2)
res = s1.delay()
res.get()
# 也可以部分指定參數(shù) 然后在調(diào)用時(shí)補(bǔ)全參數(shù)
s2 = add.s(2)
res = s2.delay(8)
res.get()
調(diào)用方式
之前一直使用的是delay(*args, **kwargs)的方式來調(diào)用,還有apply_async的調(diào)用方式,支持一些執(zhí)行時(shí)的選項(xiàng)
Groups
group是同時(shí)調(diào)用的一系列任務(wù),返回特殊的結(jié)果可以獲得組內(nèi)所有的執(zhí)行結(jié)果,可以按照任務(wù)的順序從該結(jié)果中獲得對(duì)應(yīng)任務(wù)的結(jié)果。
group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 組也支持部分參數(shù)化
g = group(add.s(i) for i in xrange(10))
g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
把任務(wù)連接起來,執(zhí)行完一個(gè)后把結(jié)果傳入另一個(gè)再調(diào)用。
chain(add.s(4, 4) | mul.s(8))().get()
g = chain(add.s(4) | mul.s(8))
g(4).get()
# 也支持部分參數(shù)化
chords
chord是一個(gè)帶callback的group
from celery import chord
from proj.tasks import add, xsum
chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
# 而group的結(jié)果發(fā)送到另一個(gè)任務(wù)則自動(dòng)轉(zhuǎn)換為chord
(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
# 可以把單任務(wù)的結(jié)果發(fā)送到組中
upload_doc.s(file) | group(apply_filter.s() for filter in filters)
Routing
在app層面配置task到某個(gè)隊(duì)列:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
在調(diào)用時(shí)指定使用某個(gè)隊(duì)列:
add.apply_async((2, 2), queue='hipri')
啟動(dòng)工作進(jìn)程時(shí)指定該進(jìn)程處理的隊(duì)列:
celery -A proj worker -Q hipri
# 指定多個(gè)處理的隊(duì)列 默認(rèn)隊(duì)列名稱:celery
celery -A proj worker -Q hipri,celery
遠(yuǎn)程控制
查看工作進(jìn)程當(dāng)前的工作:
celery -A proj inspect active
這個(gè)命令是一個(gè)廣播,所有工作進(jìn)程都會(huì)收到。(需要啟用event)
celery -A proj inspect active --destination=celery@example.com
讓工作進(jìn)程啟用event
celery -A proj control enable_events