一. celery 簡介
Celery 是一個專注于實時處理和任務(wù)調(diào)度的分布式任務(wù)隊列, 同時提供操作和維護分布式系統(tǒng)所需的工具.. 所謂任務(wù)就是消息, 消息中的有效載荷中包含要執(zhí)行任務(wù)需要的全部數(shù)據(jù).
Celery 是一個分布式隊列的管理工具, 可以用 Celery 提供的接口快速實現(xiàn)并管理一個分布式的任務(wù)隊列.
Celery 本身不是任務(wù)隊列, 是管理分布式任務(wù)隊列的工具. 它封裝了操作常見任務(wù)隊列的各種操作, 我們使用它可以快速進行任務(wù)隊列的使用與管理.
Celery 特性 :
- 方便查看定時任務(wù)的執(zhí)行情況, 如 是否成功, 當(dāng)前狀態(tài), 執(zhí)行任務(wù)花費的時間等.
- 使用功能齊備的管理后臺或命令行添加,更新,刪除任務(wù).
- 方便把任務(wù)和配置管理相關(guān)聯(lián).
- 可選 多進程, Eventlet 和 Gevent 三種模型并發(fā)執(zhí)行.
- 提供錯誤處理機制.
- 提供多種任務(wù)原語, 方便實現(xiàn)任務(wù)分組,拆分,和調(diào)用鏈.
- 支持多種消息代理和存儲后端.
- Celery 是語言無關(guān)的.它提供了python 等常見語言的接口支持.
二. celery 組件
1. Celery 扮演生產(chǎn)者和消費者的角色,
- Celery Beat : 任務(wù)調(diào)度器. Beat 進程會讀取配置文件的內(nèi)容, 周期性的將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊列.
- Celery Worker : 執(zhí)行任務(wù)的消費者, 通常會在多臺服務(wù)器運行多個消費者, 提高運行效率.
- Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務(wù)生產(chǎn)者發(fā)送過來的任務(wù)消息, 存進隊列再按序分發(fā)給任務(wù)消費方(通常是消息隊列或者數(shù)據(jù)庫).
- Producer : 任務(wù)生產(chǎn)者. 調(diào)用 Celery API , 函數(shù)或者裝飾器, 而產(chǎn)生任務(wù)并交給任務(wù)隊列處理的都是任務(wù)生產(chǎn)者.
- Result Backend : 任務(wù)處理完成之后保存狀態(tài)信息和結(jié)果, 以供查詢.
Celery架構(gòu)圖
2. 產(chǎn)生任務(wù)的方式 :
- 發(fā)布者發(fā)布任務(wù)(WEB 應(yīng)用)
- 任務(wù)調(diào)度按期發(fā)布任務(wù)(定時任務(wù))
3. celery 依賴三個庫: 這三個庫, 都由 Celery 的開發(fā)者開發(fā)和維護.
- billiard : 基于 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高性能和穩(wěn)定性.
- librabbitmp : C 語言實現(xiàn)的 Python 客戶端,
- kombu : Celery 自帶的用來收發(fā)消息的庫, 提供了符合 Python 語言習(xí)慣的, 使用 AMQP 協(xié)議的高級借口.
三. 選擇消息代理
使用于生產(chǎn)環(huán)境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.
四. Celery 序列化
在客戶端和消費者之間傳輸數(shù)據(jù)需要 序列化和反序列化. Celery 支出的序列化方案如下所示:
| 方案 | 說明 |
|---|---|
| pickle | pickle 是Python 標(biāo)準庫中的一個模塊, 支持 Pyuthon 內(nèi)置的數(shù)據(jù)結(jié)構(gòu), 但他是 Python 的專有協(xié)議. Celery 官方不推薦. |
| json | json 支持多種語言, 可用于跨語言方案. |
| yaml | yaml 表達能力更強, 支持的數(shù)據(jù)類型較 json 多, 但是 python 客戶端的性能不如 json |
| msgpack | 二進制的類 json 序列化方案, 但比 json 的數(shù)據(jù)結(jié)構(gòu)更小, 更快. |
五. 安裝,配置與簡單示例
Celery 配置參數(shù)匯總
| 配置項 | 說明 |
|---|---|
| CELERY_DEFAULT_QUEUE | 默認隊列 |
| CELERY_BROKER_URL | Broker 地址 |
| CELERY_RESULT_BACKEND | 結(jié)果存儲地址 |
| CELERY_TASK_SERIALIZER | 任務(wù)序列化方式 |
| CELERY_RESULT_SERIALIZER | 任務(wù)執(zhí)行結(jié)果序列化方式 |
| CELERY_TASK_RESULT_EXPIRES | 任務(wù)過期時間 |
| CELERY_ACCEPT_CONTENT | 指定任務(wù)接受的內(nèi)容類型(序列化) |
代碼示例 :
# 安裝
$ pip install celery, redis, msgpack
# 配置文件 celeryconfig.py
CELERY_BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過期時間
CELERY_ACCEPT_CONTENT = ["json"] # 指定任務(wù)接受的內(nèi)容類型.
# 初始化文件 celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj', include=["proj.tasks"])
app.config_from_object("proj.celeryconfig")
if __name__ == "__main__":
app.start()
# 任務(wù)文件 tasks.py
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
return x + y
# 啟動消費者
$ celery -A proj worker -l info
# 在終端中測試
> from proj.tasks import add
> r = add.delay(2,4)
> r.result
6
> r.status
u"SUCCESS"
> r.successful()
True
> r.ready() # 返回布爾值, 任務(wù)執(zhí)行完成, 返回 True, 否則返回 False.
> r.wait() # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果.
> r.get() # 獲取任務(wù)執(zhí)行結(jié)果
> r.result # 任務(wù)執(zhí)行結(jié)果.
> r.state # PENDING, START, SUCCESS
> r.status # PENDING, START, SUCCESS
# 使用 AsyncResult 方式獲取執(zhí)行結(jié)果.
# AsyncResult 主要用來存儲任務(wù)執(zhí)行信息與執(zhí)行結(jié)果(類似 js 中的 Promise 對象),
> from celery.result import AsyncResult
> AsyncResult(task_id).get()
4
六. 調(diào)用任務(wù)的方法 :
1. delay
task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
2. apply_async
delay 實際上是 apply_async 的別名, 還可以使用如下方法調(diào)用, 但是 apply_async 支持更多的參數(shù):
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
支持的參數(shù) :
-
countdown : 等待一段時間再執(zhí)行.
add.apply_async((2,3), countdown=5) -
eta : 定義任務(wù)的開始時間.
add.apply_async((2,3), eta=now+tiedelta(second=10)) -
expires : 設(shè)置超時時間.
add.apply_async((2,3), expires=60) -
retry : 定時如果任務(wù)失敗后, 是否重試.
add.apply_async((2,3), retry=False) -
retry_policy : 重試策略.
- max_retries : 最大重試次數(shù), 默認為 3 次.
- interval_start : 重試等待的時間間隔秒數(shù), 默認為 0 , 表示直接重試不等待.
- interval_step : 每次重試讓重試間隔增加的秒數(shù), 可以是數(shù)字或浮點數(shù), 默認為 0.2
- interval_max : 重試間隔最大的秒數(shù), 即 通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點數(shù), 默認為 0.2 .
自定義發(fā)布者,交換機,路由鍵, 隊列, 優(yōu)先級,序列方案和壓縮方法:
task.apply_async((2,2), compression='zlib',
serialize='json',
queue='priority.high',
routing_key='web.add',
priority=0,
exchange='web_exchange')
七. 指定隊列 :
Celery 默認使用名為 celery 的隊列 (可以通過 CELERY_DEFAULT_QUEUE 修改) 來存放任務(wù). 我們可以使用 優(yōu)先級不同的隊列 來確保高優(yōu)先級的任務(wù)優(yōu)先執(zhí)行.
# 修改配置文件, 保證隊列優(yōu)先級
from kombu import Queue
CELERY_QUEUE = ( # 定義任務(wù)隊列.
Queue('default', routing_key="task.#"), # 路由鍵 以 "task." 開頭的消息都進入 default 隊列.
Queue('web_tasks', routing_key="web.#") # 路由鍵 以 "web." 開頭的消息都進入 web_tasks 隊列.
)
CELERY_DEFAULT_EXCHANGE = 'tasks' # 默認的交換機名字為 tasks
CELERY_DEFAULT_EXCHANGE_KEY = 'topic' # 默認的交換機類型為 topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默認的路由鍵是 task.default , 這個路由鍵符合上面的 default 隊列.
CELERY_ROUTES = {
'proj.tasks.add': {
'queue': 'web_tasks',
'routing_key': 'web.add',
}
}
# 使用指定隊列的方式啟動消費者進程.
$ celery -A proj worker -Q web_tasks -l info # 該 worker 只會執(zhí)行 web_tasks 中任務(wù), 我們可以合理安排消費者數(shù)量, 讓 web_tasks 中任務(wù)的優(yōu)先級更高.
閱后即焚模式(transient):
from kombu import Queue
Queue('transient', routing_key='transient', delivery_mode=1)
八. 使用任務(wù)調(diào)度
使用 Beat 進程自動生成任務(wù).
# 修改配置文件,
# 下面的任務(wù)指定 tasks.add 任務(wù) 每 10s 跑一次, 任務(wù)參數(shù)為 (16,16).
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add': {
'task': 'proj.tasks.add',
'schedule': timedelta(seconds=10),
'args': (16, 16)
}
}
# crontab 風(fēng)格
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
"add": {
"task": "tasks.add",
"schedule": crontab(hour="*/3", minute=12),
"args": (16, 16),
}
}
# 啟動 Beat 程序
$ celery beat -A proj
# 之后啟動 worker 進程.
$ celery -A proj worker -l info
或者
$ celery -B -A proj worker -l info
使用自定義調(diào)度類還可以實現(xiàn)動態(tài)添加任務(wù). 使用 Django 可以通過 Django-celery 實現(xiàn)在管理后臺創(chuàng)建,刪除,更新任務(wù), 是因為他使用了自定義的 調(diào)度類 djcelery.schedulers.DatabaseScheduler .
九. 任務(wù)綁定, 記錄日志, 重試
# 修改 tasks.py 文件.
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def div(self, x, y):
logger.info(('Executing task id {0.id}, args: {0.args!r}'
'kwargs: {0.kwargs!r}').format(self.request))
try:
result = x/y
except ZeroDivisionError as e:
raise self.retry(exc=e, countdown=5, max_retries=3) # 發(fā)生 ZeroDivisionError 錯誤時, 每 5s 重試一次, 最多重試 3 次.
return result
當(dāng)使用 bind=True 參數(shù)之后, 函數(shù)的參數(shù)發(fā)生變化, 多出了參數(shù) self, 這這相當(dāng)于把 div 編程了一個已綁定的方法, 通過 self 可以獲得任務(wù)的上下文.
十. 信號系統(tǒng) :
信號可以幫助我們了解任務(wù)執(zhí)行情況, 分析任務(wù)運行的瓶頸. Celery 支持 7 種信號類型.
- 任務(wù)信號
- before_task_publish : 任務(wù)發(fā)布前
- after_task_publish : 任務(wù)發(fā)布后
- task_prerun : 任務(wù)執(zhí)行前
- task_postrun : 任務(wù)執(zhí)行后
- task_retry : 任務(wù)重試時
- task_success : 任務(wù)成功時
- task_failure : 任務(wù)失敗時
- task_revoked : 任務(wù)被撤銷或終止時
- 應(yīng)用信號
- Worker 信號
- Beat 信號
- Eventlet 信號
- 日志信號
- 命令信號
不同的信號參數(shù)格式不同, 具體格式參見官方文檔
代碼示例 :
# 在執(zhí)行任務(wù) add 之后, 打印一些信息.
@after_task_publish
def task_send_handler(sender=None, body=None, **kwargs):
print 'after_task_publish: task_id: {body[id]}; sender: {sender}'.format(body=body, sender=sender)
十一. 子任務(wù)與工作流:
可以把任務(wù) 通過簽名的方法傳給其他任務(wù), 成為一個子任務(wù).
from celery import signature
task = signature('task.add', args=(2,2), countdown=10)
task
task.add(2,2) # 通過簽名生成任務(wù)
task.apply_async()
還可以通過如下方式生成子任務(wù) :
from proj.task import add
task = add.subtask((2,2), countdown=10) # 快捷方式 add.s((2,2), countdown-10)
task.apply_async()
自任務(wù)實現(xiàn)片函數(shù)的方式非常有用, 這種方式可以讓任務(wù)在傳遞過程中財傳入?yún)?shù).
partial = add.s(2)
partial.apply_async((4,))
子任務(wù)支持如下 5 種原語,實現(xiàn)工作流. 原語表示由若干指令組成的, 用于完成一定功能的過程.
-
chain : 調(diào)用連, 前面的執(zhí)行結(jié)果, 作為參數(shù)傳給后面的任務(wù), 直到全部完成, 類似管道.
from celery import chain res = chain(add.s(2,2), add.s(4), add.s(8))() res.get() 管道式: (add.s(2,2) | add.s(4) | add.s(8))().get() -
group : 一次創(chuàng)建多個(一組)任務(wù).
from celery import group res = group(add.s(i,i) for i in range(10))() res.get() -
chord : 等待任務(wù)全部完成時添加一個回調(diào)任務(wù).
res = chord((add.s(i,i) for i in range(10)), add.s(['a']))() res.get() # 執(zhí)行完前面的循環(huán), 把結(jié)果拼成一個列表之后, 再對這個列表 添加 'a'. [0,2,4,6,8,10,12,14,16,18,u'a'] -
map/starmap : 每個參數(shù)都作為任務(wù)的參數(shù)執(zhí)行一遍, map 的參數(shù)只有一個, starmap 支持多個參數(shù).
add.starmap(zip(range(10), range(10))) 相當(dāng)于: @app.task def temp(): return [add(i,i) for i in range(10)] -
chunks : 將任務(wù)分塊.
res = add.chunks(zip(range(50), range(50)),10)() res.get()
在生成任務(wù)的時候, 應(yīng)該充分利用 group/chain/chunks 這些原語.
十二. 其他
關(guān)閉不想要的功能 :
@app.task(ignore_result=True) # 關(guān)閉任務(wù)執(zhí)行結(jié)果.
def func():
pass
CELERY_DISABLE_RATE_LIMITS=True # 關(guān)閉限速.
根據(jù)任務(wù)狀態(tài)執(zhí)行不同操作 :
# tasks.py
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print 'task done: {0}'.format(retval)
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print 'task fail, reason: {0}'.format(exc)
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
# 正確函數(shù), 執(zhí)行 MyTask.on_success() :
@app.task(base=MyTask)
def add(x, y):
return x + y
# 錯誤函數(shù), 執(zhí)行 MyTask.on_failure() :
@app.task #普通函數(shù)裝飾為 celery task
def add(x, y):
raise KeyError
return x + y
十三. Celery 管理命令
任務(wù)狀態(tài)回調(diào) :
| 參數(shù) | 說明 |
|---|---|
| PENDING | 任務(wù)等待中 |
| STARTED | 任務(wù)已開始 |
| SUCCESS | 任務(wù)執(zhí)行成功 |
| FAILURE | 任務(wù)執(zhí)行失敗 |
| RETRY | 任務(wù)將被 |
| REVOKED | 任務(wù)取消 |
| PROGRESS | 任務(wù)進行中 |
普通啟動命令 :
$ celery -A proj worker -l info
使用 daemon 方式 multi :
$ celery multi start web -A proj -l info --pidfile=/path/to/celery_%n.pid --logfile=/path/to/celery_%n.log
# web 是對項目啟動的標(biāo)識,
# %n 是對節(jié)點的格式化用法.
%n : 只包含主機名
%h : 包含域名的主機
%d : 只包含域名
%i : Prefork 類型的進程索引,如果是主進程, 則為 0.
%I : 帶分隔符的 Prefork 類型的進程索引. 假設(shè)主進程為 worker1, 那么進程池的第一個進程則為 worker1-1
常用 multi 相關(guān)命令:
$ celery multi show web # 查看 web 啟動時的命令
$ celery multi names web # 獲取 web 的節(jié)點名字
$ celery multi stop web # 停止 web 進程
$ celery multi restart web # 重啟 web
$ celery multi kill web # 殺掉 web 進程
常用監(jiān)控和管理命令 :
-
shell : 交互時環(huán)境, 內(nèi)置了 Celery 應(yīng)用實例和全部已注冊的任務(wù), 支持 默認解釋器,IPython,BPython .
$ celery shell -A proj -
result : 通過 task_id 在命令行獲得任務(wù)執(zhí)行結(jié)果
$ celery -A proj result TASK_ID -
inspect active : 列出當(dāng)前正在執(zhí)行的任務(wù)
$ celery -A proj inspect active -
inspect stats : 列出 worker 的統(tǒng)計數(shù)據(jù), 常用來查看配置是否正確以及系統(tǒng)的使用情況.
$ celery -A proj inspect stats
Flower web 監(jiān)控工具
- 查看任務(wù)歷史,任務(wù)具體參數(shù),開始時間等信息;
- 提供圖表和統(tǒng)計數(shù)據(jù)
- 實現(xiàn)全面的遠程控制功能, 包括但不限于 撤銷/終止任務(wù), 關(guān)閉重啟 worker, 查看正在運行任務(wù)
- 提供一個 HTTP API , 方便集成.
Flower 的 supervisor 管理配置文件:
[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3
Celery 自帶的事件監(jiān)控工具顯示任務(wù)歷史等信息.
$ celery -A proj event
** 需要把 CELERY_SEND_TASK_SEND_EVENT = True 設(shè)置, 才可以獲取時間.
使用自動擴展 :
$ celery -A proj worker -l info --autoscale=6,3 # 平時保持 3 個進程, 最大時可以達到 6 個.
Celery 命令匯總
$ celery --help
-A APP, --app APP
-b BROKER, --broker BROKER
--loader LOADER
--config CONFIG
--workdir WORKDIR
--no-color, -C
--quiet, -q
$ celery <command> --help
+ Main:
| celery worker
| celery events
| celery beat
| celery shell
| celery multi
| celery amqp
+ Remote Control:
| celery status
| celery inspect --help
| celery inspect active
| celery inspect active_queues
| celery inspect clock
| celery inspect conf [include_defaults=False]
| celery inspect memdump [n_samples=10]
| celery inspect memsample
| celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
| celery inspect ping
| celery inspect query_task [id1 [id2 [... [idN]]]]
| celery inspect registered [attr1 [attr2 [... [attrN]]]]
| celery inspect report
| celery inspect reserved
| celery inspect revoked
| celery inspect scheduled
| celery inspect stats
| celery control --help
| celery control add_consumer <queue> [exchange [type [routing_key]]]
| celery control autoscale [max [min]]
| celery control cancel_consumer <queue>
| celery control disable_events
| celery control election
| celery control enable_events
| celery control heartbeat
| celery control pool_grow [N=1]
| celery control pool_restart
| celery control pool_shrink [N=1]
| celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
| celery control revoke [id1 [id2 [... [idN]]]]
| celery control shutdown
| celery control terminate <signal> [id1 [id2 [... [idN]]]]
| celery control time_limit <task_name> <soft_secs> [hard_secs]
+ Utils:
| celery purge
| celery list
| celery call
| celery result
| celery migrate
| celery graph
| celery upgrade
+ Debugging:
| celery report
| celery logtool
+ Extensions:
| celery flower
十四. 在 Flask 中使用 Celery
Flask 文檔: 基于 Celery 的后臺任務(wù)
在 Flask 中使用 Celery