異步神器celery

一. celery 簡介

Celery 是一個專注于實時處理和任務(wù)調(diào)度的分布式任務(wù)隊列, 同時提供操作和維護分布式系統(tǒng)所需的工具.. 所謂任務(wù)就是消息, 消息中的有效載荷中包含要執(zhí)行任務(wù)需要的全部數(shù)據(jù).

Celery 是一個分布式隊列的管理工具, 可以用 Celery 提供的接口快速實現(xiàn)并管理一個分布式的任務(wù)隊列.

Celery 本身不是任務(wù)隊列, 是管理分布式任務(wù)隊列的工具. 它封裝了操作常見任務(wù)隊列的各種操作, 我們使用它可以快速進行任務(wù)隊列的使用與管理.

Celery 特性 :

  • 方便查看定時任務(wù)的執(zhí)行情況, 如 是否成功, 當(dāng)前狀態(tài), 執(zhí)行任務(wù)花費的時間等.
  • 使用功能齊備的管理后臺或命令行添加,更新,刪除任務(wù).
  • 方便把任務(wù)和配置管理相關(guān)聯(lián).
  • 可選 多進程, Eventlet 和 Gevent 三種模型并發(fā)執(zhí)行.
  • 提供錯誤處理機制.
  • 提供多種任務(wù)原語, 方便實現(xiàn)任務(wù)分組,拆分,和調(diào)用鏈.
  • 支持多種消息代理和存儲后端.
  • Celery 是語言無關(guān)的.它提供了python 等常見語言的接口支持.

二. celery 組件

1. Celery 扮演生產(chǎn)者和消費者的角色,

  • Celery Beat : 任務(wù)調(diào)度器. Beat 進程會讀取配置文件的內(nèi)容, 周期性的將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊列.
  • Celery Worker : 執(zhí)行任務(wù)的消費者, 通常會在多臺服務(wù)器運行多個消費者, 提高運行效率.
  • Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務(wù)生產(chǎn)者發(fā)送過來的任務(wù)消息, 存進隊列再按序分發(fā)給任務(wù)消費方(通常是消息隊列或者數(shù)據(jù)庫).
  • Producer : 任務(wù)生產(chǎn)者. 調(diào)用 Celery API , 函數(shù)或者裝飾器, 而產(chǎn)生任務(wù)并交給任務(wù)隊列處理的都是任務(wù)生產(chǎn)者.
  • Result Backend : 任務(wù)處理完成之后保存狀態(tài)信息和結(jié)果, 以供查詢.

Celery架構(gòu)圖


celery架構(gòu)圖

2. 產(chǎn)生任務(wù)的方式 :

  1. 發(fā)布者發(fā)布任務(wù)(WEB 應(yīng)用)
  2. 任務(wù)調(diào)度按期發(fā)布任務(wù)(定時任務(wù))

3. celery 依賴三個庫: 這三個庫, 都由 Celery 的開發(fā)者開發(fā)和維護.

  • billiard : 基于 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高性能和穩(wěn)定性.
  • librabbitmp : C 語言實現(xiàn)的 Python 客戶端,
  • kombu : Celery 自帶的用來收發(fā)消息的庫, 提供了符合 Python 語言習(xí)慣的, 使用 AMQP 協(xié)議的高級借口.

三. 選擇消息代理

使用于生產(chǎn)環(huán)境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.

四. Celery 序列化

在客戶端和消費者之間傳輸數(shù)據(jù)需要 序列化和反序列化. Celery 支出的序列化方案如下所示:

方案 說明
pickle pickle 是Python 標(biāo)準庫中的一個模塊, 支持 Pyuthon 內(nèi)置的數(shù)據(jù)結(jié)構(gòu), 但他是 Python 的專有協(xié)議. Celery 官方不推薦.
json json 支持多種語言, 可用于跨語言方案.
yaml yaml 表達能力更強, 支持的數(shù)據(jù)類型較 json 多, 但是 python 客戶端的性能不如 json
msgpack 二進制的類 json 序列化方案, 但比 json 的數(shù)據(jù)結(jié)構(gòu)更小, 更快.

五. 安裝,配置與簡單示例

Celery 配置參數(shù)匯總

配置項 說明
CELERY_DEFAULT_QUEUE 默認隊列
CELERY_BROKER_URL Broker 地址
CELERY_RESULT_BACKEND 結(jié)果存儲地址
CELERY_TASK_SERIALIZER 任務(wù)序列化方式
CELERY_RESULT_SERIALIZER 任務(wù)執(zhí)行結(jié)果序列化方式
CELERY_TASK_RESULT_EXPIRES 任務(wù)過期時間
CELERY_ACCEPT_CONTENT 指定任務(wù)接受的內(nèi)容類型(序列化)

代碼示例 :

# 安裝
$ pip install celery, redis, msgpack

# 配置文件 celeryconfig.py

    CELERY_BROKER_URL = 'redis://localhost:6379/1'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任務(wù)過期時間
    CELERY_ACCEPT_CONTENT = ["json"]            # 指定任務(wù)接受的內(nèi)容類型.

# 初始化文件 celery.py

    from __future__ import absolute_import
    from celery import Celery

    app = Celery('proj', include=["proj.tasks"])
    app.config_from_object("proj.celeryconfig")

    if __name__ == "__main__":
        app.start()     

# 任務(wù)文件 tasks.py

    from __future__ import absolute_import
    from proj.celery import app

    @app.task
    def add(x, y):
        return x + y    

# 啟動消費者
    $ celery -A proj worker -l info

# 在終端中測試
    > from proj.tasks import add
    > r = add.delay(2,4)
    > r.result
      6
    > r.status
      u"SUCCESS"
    > r.successful()
      True

    > r.ready()     # 返回布爾值,  任務(wù)執(zhí)行完成, 返回 True, 否則返回 False.
    > r.wait()      # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果.
    > r.get()       # 獲取任務(wù)執(zhí)行結(jié)果
    > r.result      # 任務(wù)執(zhí)行結(jié)果.
    > r.state       # PENDING, START, SUCCESS
    > r.status      # PENDING, START, SUCCESS

    # 使用 AsyncResult 方式獲取執(zhí)行結(jié)果.
    # AsyncResult 主要用來存儲任務(wù)執(zhí)行信息與執(zhí)行結(jié)果(類似 js 中的 Promise 對象), 
    > from celery.result import AsyncResult
    > AsyncResult(task_id).get()
      4

六. 調(diào)用任務(wù)的方法 :

1. delay

task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)

2. apply_async

delay 實際上是 apply_async 的別名, 還可以使用如下方法調(diào)用, 但是 apply_async 支持更多的參數(shù):

task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

支持的參數(shù) :

  • countdown : 等待一段時間再執(zhí)行.

    add.apply_async((2,3), countdown=5)
    
  • eta : 定義任務(wù)的開始時間.

    add.apply_async((2,3), eta=now+tiedelta(second=10))
    
  • expires : 設(shè)置超時時間.

    add.apply_async((2,3), expires=60)
    
  • retry : 定時如果任務(wù)失敗后, 是否重試.

    add.apply_async((2,3), retry=False)
    
  • retry_policy : 重試策略.

    • max_retries : 最大重試次數(shù), 默認為 3 次.
    • interval_start : 重試等待的時間間隔秒數(shù), 默認為 0 , 表示直接重試不等待.
    • interval_step : 每次重試讓重試間隔增加的秒數(shù), 可以是數(shù)字或浮點數(shù), 默認為 0.2
    • interval_max : 重試間隔最大的秒數(shù), 即 通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點數(shù), 默認為 0.2 .

自定義發(fā)布者,交換機,路由鍵, 隊列, 優(yōu)先級,序列方案和壓縮方法:

task.apply_async((2,2), compression='zlib',
    serialize='json',
    queue='priority.high',
    routing_key='web.add',
    priority=0,
    exchange='web_exchange')

七. 指定隊列 :

Celery 默認使用名為 celery 的隊列 (可以通過 CELERY_DEFAULT_QUEUE 修改) 來存放任務(wù). 我們可以使用 優(yōu)先級不同的隊列 來確保高優(yōu)先級的任務(wù)優(yōu)先執(zhí)行.

# 修改配置文件, 保證隊列優(yōu)先級

from kombu import Queue

CELERY_QUEUE = (        # 定義任務(wù)隊列.
    Queue('default', routing_key="task.#"),     # 路由鍵 以 "task." 開頭的消息都進入 default 隊列.
    Queue('web_tasks', routing_key="web.#")     # 路由鍵 以 "web." 開頭的消息都進入 web_tasks 隊列.
)

CELERY_DEFAULT_EXCHANGE = 'tasks'               # 默認的交換機名字為 tasks
CELERY_DEFAULT_EXCHANGE_KEY = 'topic'           # 默認的交換機類型為 topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default'     # 默認的路由鍵是 task.default , 這個路由鍵符合上面的 default 隊列.

CELERY_ROUTES = {
    'proj.tasks.add': {
        'queue': 'web_tasks',
        'routing_key': 'web.add',
    }
}

# 使用指定隊列的方式啟動消費者進程.
$ celery -A proj worker -Q web_tasks -l info    # 該 worker 只會執(zhí)行 web_tasks 中任務(wù), 我們可以合理安排消費者數(shù)量, 讓 web_tasks 中任務(wù)的優(yōu)先級更高.

閱后即焚模式(transient):

from kombu import Queue
Queue('transient', routing_key='transient', delivery_mode=1)

八. 使用任務(wù)調(diào)度

使用 Beat 進程自動生成任務(wù).

# 修改配置文件, 
# 下面的任務(wù)指定 tasks.add 任務(wù) 每 10s 跑一次, 任務(wù)參數(shù)為 (16,16).

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'proj.tasks.add',
        'schedule': timedelta(seconds=10),
        'args': (16, 16)
    }
}

# crontab 風(fēng)格

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
        "add": {
                "task": "tasks.add",
                "schedule": crontab(hour="*/3", minute=12),
                "args": (16, 16),
                }
            }

# 啟動 Beat 程序
$ celery beat -A proj

# 之后啟動 worker 進程.
$ celery -A proj worker -l info

或者
$ celery -B -A proj worker -l info

使用自定義調(diào)度類還可以實現(xiàn)動態(tài)添加任務(wù). 使用 Django 可以通過 Django-celery 實現(xiàn)在管理后臺創(chuàng)建,刪除,更新任務(wù), 是因為他使用了自定義的 調(diào)度類 djcelery.schedulers.DatabaseScheduler .

九. 任務(wù)綁定, 記錄日志, 重試

# 修改 tasks.py 文件.

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r}'
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x/y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)     # 發(fā)生 ZeroDivisionError 錯誤時, 每 5s 重試一次, 最多重試 3 次.

    return result

當(dāng)使用 bind=True 參數(shù)之后, 函數(shù)的參數(shù)發(fā)生變化, 多出了參數(shù) self, 這這相當(dāng)于把 div 編程了一個已綁定的方法, 通過 self 可以獲得任務(wù)的上下文.

十. 信號系統(tǒng) :

信號可以幫助我們了解任務(wù)執(zhí)行情況, 分析任務(wù)運行的瓶頸. Celery 支持 7 種信號類型.

  1. 任務(wù)信號
    • before_task_publish : 任務(wù)發(fā)布前
    • after_task_publish : 任務(wù)發(fā)布后
    • task_prerun : 任務(wù)執(zhí)行前
    • task_postrun : 任務(wù)執(zhí)行后
    • task_retry : 任務(wù)重試時
    • task_success : 任務(wù)成功時
    • task_failure : 任務(wù)失敗時
    • task_revoked : 任務(wù)被撤銷或終止時
  2. 應(yīng)用信號
  3. Worker 信號
  4. Beat 信號
  5. Eventlet 信號
  6. 日志信號
  7. 命令信號

不同的信號參數(shù)格式不同, 具體格式參見官方文檔

代碼示例 :

# 在執(zhí)行任務(wù) add 之后, 打印一些信息.

@after_task_publish
def task_send_handler(sender=None, body=None, **kwargs):
    print 'after_task_publish: task_id: {body[id]}; sender: {sender}'.format(body=body, sender=sender)

十一. 子任務(wù)與工作流:

可以把任務(wù) 通過簽名的方法傳給其他任務(wù), 成為一個子任務(wù).

from celery import signature
task = signature('task.add', args=(2,2), countdown=10)
task
task.add(2,2)   # 通過簽名生成任務(wù)
task.apply_async()

還可以通過如下方式生成子任務(wù) :

from proj.task import add
task = add.subtask((2,2), countdown=10)     # 快捷方式 add.s((2,2), countdown-10) 
task.apply_async()

自任務(wù)實現(xiàn)片函數(shù)的方式非常有用, 這種方式可以讓任務(wù)在傳遞過程中財傳入?yún)?shù).

partial = add.s(2)
partial.apply_async((4,))

子任務(wù)支持如下 5 種原語,實現(xiàn)工作流. 原語表示由若干指令組成的, 用于完成一定功能的過程.

  1. chain : 調(diào)用連, 前面的執(zhí)行結(jié)果, 作為參數(shù)傳給后面的任務(wù), 直到全部完成, 類似管道.

    from celery import chain
    res = chain(add.s(2,2), add.s(4), add.s(8))()
    res.get()
    
    管道式:
    
    (add.s(2,2) | add.s(4) | add.s(8))().get()
    
  2. group : 一次創(chuàng)建多個(一組)任務(wù).

    from celery import group
    
    res = group(add.s(i,i) for i in range(10))()
    res.get()
    
  3. chord : 等待任務(wù)全部完成時添加一個回調(diào)任務(wù).

    res = chord((add.s(i,i) for i in range(10)), add.s(['a']))()
    res.get()    # 執(zhí)行完前面的循環(huán), 把結(jié)果拼成一個列表之后, 再對這個列表 添加 'a'.
    [0,2,4,6,8,10,12,14,16,18,u'a']
    
  4. map/starmap : 每個參數(shù)都作為任務(wù)的參數(shù)執(zhí)行一遍, map 的參數(shù)只有一個, starmap 支持多個參數(shù).

    add.starmap(zip(range(10), range(10)))
    
    相當(dāng)于:
    
    @app.task
    def temp():
        return [add(i,i) for i in range(10)]
    
  5. chunks : 將任務(wù)分塊.

    res = add.chunks(zip(range(50), range(50)),10)()
    res.get()
    

在生成任務(wù)的時候, 應(yīng)該充分利用 group/chain/chunks 這些原語.

十二. 其他

關(guān)閉不想要的功能 :

@app.task(ignore_result=True)   # 關(guān)閉任務(wù)執(zhí)行結(jié)果.
def func():
    pass

CELERY_DISABLE_RATE_LIMITS=True     # 關(guān)閉限速.

根據(jù)任務(wù)狀態(tài)執(zhí)行不同操作 :

# tasks.py
class MyTask(Task):

    def on_success(self, retval, task_id, args, kwargs):
        print 'task done: {0}'.format(retval)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print 'task fail, reason: {0}'.format(exc)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

# 正確函數(shù), 執(zhí)行 MyTask.on_success() :
@app.task(base=MyTask)
def add(x, y):
    return x + y

# 錯誤函數(shù), 執(zhí)行 MyTask.on_failure() : 
@app.task  #普通函數(shù)裝飾為 celery task
def add(x, y):
    raise KeyError
    return x + y

十三. Celery 管理命令

任務(wù)狀態(tài)回調(diào) :

參數(shù) 說明
PENDING 任務(wù)等待中
STARTED 任務(wù)已開始
SUCCESS 任務(wù)執(zhí)行成功
FAILURE 任務(wù)執(zhí)行失敗
RETRY 任務(wù)將被
REVOKED 任務(wù)取消
PROGRESS 任務(wù)進行中

普通啟動命令 :

$ celery -A proj worker -l info

使用 daemon 方式 multi :

$ celery multi start web -A proj -l info --pidfile=/path/to/celery_%n.pid --logfile=/path/to/celery_%n.log 

# web 是對項目啟動的標(biāo)識, 
# %n 是對節(jié)點的格式化用法.
    %n : 只包含主機名
    %h : 包含域名的主機
    %d : 只包含域名
    %i : Prefork 類型的進程索引,如果是主進程, 則為 0.
    %I : 帶分隔符的 Prefork 類型的進程索引. 假設(shè)主進程為 worker1, 那么進程池的第一個進程則為 worker1-1

常用 multi 相關(guān)命令:

$ celery multi show web     # 查看 web 啟動時的命令
$ celery multi names web    # 獲取 web 的節(jié)點名字
$ celery multi stop web     # 停止 web 進程
$ celery multi restart web  # 重啟 web
$ celery multi kill web     # 殺掉 web 進程

常用監(jiān)控和管理命令 :

  • shell : 交互時環(huán)境, 內(nèi)置了 Celery 應(yīng)用實例和全部已注冊的任務(wù), 支持 默認解釋器,IPython,BPython .

    $ celery shell -A proj
    
  • result : 通過 task_id 在命令行獲得任務(wù)執(zhí)行結(jié)果

    $ celery -A proj result TASK_ID
    
  • inspect active : 列出當(dāng)前正在執(zhí)行的任務(wù)

    $ celery -A proj inspect active
    
  • inspect stats : 列出 worker 的統(tǒng)計數(shù)據(jù), 常用來查看配置是否正確以及系統(tǒng)的使用情況.

    $ celery -A proj inspect stats
    

Flower web 監(jiān)控工具

  • 查看任務(wù)歷史,任務(wù)具體參數(shù),開始時間等信息;
  • 提供圖表和統(tǒng)計數(shù)據(jù)
  • 實現(xiàn)全面的遠程控制功能, 包括但不限于 撤銷/終止任務(wù), 關(guān)閉重啟 worker, 查看正在運行任務(wù)
  • 提供一個 HTTP API , 方便集成.

Flower 的 supervisor 管理配置文件:

[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555 
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3 
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3

Celery 自帶的事件監(jiān)控工具顯示任務(wù)歷史等信息.

$ celery -A proj event
** 需要把 CELERY_SEND_TASK_SEND_EVENT = True 設(shè)置, 才可以獲取時間.

使用自動擴展 :

$ celery -A proj worker -l info --autoscale=6,3     # 平時保持 3 個進程, 最大時可以達到 6 個.

Celery 命令匯總

$ celery --help
    -A APP, --app APP
    -b BROKER, --broker BROKER
    --loader LOADER
    --config CONFIG
    --workdir WORKDIR
    --no-color, -C
    --quiet, -q

$ celery <command> --help

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status

|    celery inspect --help
|    celery inspect active 
|    celery inspect active_queues 
|    celery inspect clock 
|    celery inspect conf [include_defaults=False]
|    celery inspect memdump [n_samples=10]
|    celery inspect memsample 
|    celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
|    celery inspect ping 
|    celery inspect query_task [id1 [id2 [... [idN]]]]
|    celery inspect registered [attr1 [attr2 [... [attrN]]]]
|    celery inspect report 
|    celery inspect reserved 
|    celery inspect revoked 
|    celery inspect scheduled 
|    celery inspect stats 

|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max [min]]
|    celery control cancel_consumer <queue>
|    celery control disable_events 
|    celery control election 
|    celery control enable_events 
|    celery control heartbeat 
|    celery control pool_grow [N=1]
|    celery control pool_restart 
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
|    celery control revoke [id1 [id2 [... [idN]]]]
|    celery control shutdown 
|    celery control terminate <signal> [id1 [id2 [... [idN]]]]
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery call
|    celery result
|    celery migrate
|    celery graph
|    celery upgrade

+ Debugging: 
|    celery report
|    celery logtool

+ Extensions: 
|    celery flower

十四. 在 Flask 中使用 Celery

Flask 文檔: 基于 Celery 的后臺任務(wù)
在 Flask 中使用 Celery

十五. 參考鏈接

Python Web開發(fā)實戰(zhàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容