在上一篇文章爬蟲架構(gòu)|Celery+RabbitMQ快速入門(一)中簡單介紹了Celery和RabbitMQ的使用以及它們之間的合作流程。本篇文章將繼續(xù)講解它們是如何配合工作的。
一、Celery介紹和基本使用
Celery是一個基于Python開發(fā)的分布式異步消息任務(wù)隊列,它簡單、靈活、可靠,是一個專注于實時處理的任務(wù)隊列,同時也支持任務(wù)調(diào)度。通過它可以輕松的實現(xiàn)任務(wù)的異步處理,如果你的業(yè)務(wù)場景中需要用到異步任務(wù),就可以考慮使用Celery。舉幾個適用場景:
1)可以在 Request-Response 循環(huán)之外執(zhí)行的操作:發(fā)送郵件、推送消息。
2)耗時的操作:調(diào)用第三方 API、視頻處理(前端通過 AJAX 展示進度和結(jié)果)。
3)周期性任務(wù):取代 crontab。
Celery有以下幾個優(yōu)點:
簡單:一旦熟悉了Celery的工作流程后,配置和使用是比較簡單的。
高可用:當任務(wù)執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,Celery 會自動嘗試重新執(zhí)行任務(wù)。
快速:一個單進程的Celery每分鐘可處理上百萬個任務(wù)。
靈活: Celery的大部分組件都可以被擴展及自定制。
二、選擇Broker
Celery的基本架構(gòu)和工作流程如下圖2-1所示:

常用的Broker有RabbitMQ、Redis、數(shù)據(jù)庫等,我們這里使用的是RabbitMQ,如下圖2-2所示:

三、Celery安裝使用
Celery是一個Python的應(yīng)用,而且已經(jīng)上傳到了PyPi,所以可以使用pip或easy_install安裝:
pip install celery
安裝完成后會在PATH(或virtualenv的bin目錄)添加幾個命令:celery、celerybeat、celeryd 和celeryd-multi。我們這里只使用 celery 命令。
四、創(chuàng)建Application和Task
Celery的默認broker是RabbitMQ,僅需配置一行就可以:
broker_url = 'amqp://guest:guest@localhost:5672//'
rabbitMQ 沒裝的話請裝一下,安裝看這里http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
創(chuàng)建一個Celery Application用來定義任務(wù)列表。
實例化一個Celery對象app,然后通過@app.task?裝飾器注冊一個 task。任務(wù)文件就叫tasks.py:
from celery import Celery
app = Celery(__name__, broker='amqp://guest:guest@localhost:5672//')
@app.task
def add(x, y):? ?
????????return x + y
五、運行 worker,啟動Celery Worker來開始監(jiān)聽并執(zhí)行任務(wù)
在 tasks.py 文件所在目錄運行
$ celery worker -A tasks.app -l INFO
這個命令會開啟一個在前臺運行的 worker,解釋這個命令的意義:
worker: 運行 worker 模塊。
-A: –app=APP, 指定使用的 Celery 實例。
-l: –loglevel=INFO, 指定日志級別,可選:DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
其它常用的選項:
-P: –pool=prefork, 并發(fā)模型,可選:prefork (默認,multiprocessing), eventlet, gevent, threads.
-c: –concurrency=10, 并發(fā)級別,prefork 模型下就是子進程數(shù)量,默認等于 CPU 核心數(shù)
完整的命令行選項可以這樣查看:
$ celery worker --help
六、調(diào)用Task
再打開一個終端, 進行命令行模式,調(diào)用任務(wù)。
from tasks import add
add.delay(1,2)
add.apply_async(args=(1,2))
上面兩種調(diào)用方式等價,delay()?方法是?apply_async()?方法的簡寫。這個調(diào)用會把?add?操作放入到隊列里,然后立即返回一個?AsyncResult?對象。如果關(guān)心處理結(jié)果,需要給?app?配置?CELERY_RESULT_BACKEND,指定一個存儲后端保存任務(wù)的返回值。
七、在項目中的簡單使用流程
1)RabbitMQ所在服務(wù)器,啟動crontab設(shè)置? crontable -user user -e設(shè)置定時執(zhí)行celery application應(yīng)用。
python tasks.py day?
2)在task.py文件里面啟動一個叫做app的Celery Application,編寫一個app.task函數(shù)來produce 任務(wù)到rabbitmq。
app = Celery()
app.config_from_object(celeryconfig)
3)在每個worker里面通過命令啟動worker消費任務(wù)
$ celery worker -A tasks.app -l INFO