異步神器celery

一. celery 簡(jiǎn)介

Celery 是一個(gè)專注于實(shí)時(shí)處理和任務(wù)調(diào)度的分布式任務(wù)隊(duì)列, 同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具.. 所謂任務(wù)就是消息, 消息中的有效載荷中包含要執(zhí)行任務(wù)需要的全部數(shù)據(jù).

Celery 是一個(gè)分布式隊(duì)列的管理工具, 可以用 Celery 提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列.

Celery 本身不是任務(wù)隊(duì)列, 是管理分布式任務(wù)隊(duì)列的工具. 它封裝了操作常見(jiàn)任務(wù)隊(duì)列的各種操作, 我們使用它可以快速進(jìn)行任務(wù)隊(duì)列的使用與管理.

Celery 特性 :

  • 方便查看定時(shí)任務(wù)的執(zhí)行情況, 如 是否成功, 當(dāng)前狀態(tài), 執(zhí)行任務(wù)花費(fèi)的時(shí)間等.
  • 使用功能齊備的管理后臺(tái)或命令行添加,更新,刪除任務(wù).
  • 方便把任務(wù)和配置管理相關(guān)聯(lián).
  • 可選 多進(jìn)程, Eventlet 和 Gevent 三種模型并發(fā)執(zhí)行.
  • 提供錯(cuò)誤處理機(jī)制.
  • 提供多種任務(wù)原語(yǔ), 方便實(shí)現(xiàn)任務(wù)分組,拆分,和調(diào)用鏈.
  • 支持多種消息代理和存儲(chǔ)后端.
  • Celery 是語(yǔ)言無(wú)關(guān)的.它提供了python 等常見(jiàn)語(yǔ)言的接口支持.

二. celery 組件

1. Celery 扮演生產(chǎn)者和消費(fèi)者的角色,

  • Celery Beat : 任務(wù)調(diào)度器. Beat 進(jìn)程會(huì)讀取配置文件的內(nèi)容, 周期性的將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊(duì)列.
  • Celery Worker : 執(zhí)行任務(wù)的消費(fèi)者, 通常會(huì)在多臺(tái)服務(wù)器運(yùn)行多個(gè)消費(fèi)者, 提高運(yùn)行效率.
  • Broker : 消息代理, 隊(duì)列本身. 也稱為消息中間件. 接受任務(wù)生產(chǎn)者發(fā)送過(guò)來(lái)的任務(wù)消息, 存進(jìn)隊(duì)列再按序分發(fā)給任務(wù)消費(fèi)方(通常是消息隊(duì)列或者數(shù)據(jù)庫(kù)).
  • Producer : 任務(wù)生產(chǎn)者. 調(diào)用 Celery API , 函數(shù)或者裝飾器, 而產(chǎn)生任務(wù)并交給任務(wù)隊(duì)列處理的都是任務(wù)生產(chǎn)者.
  • Result Backend : 任務(wù)處理完成之后保存狀態(tài)信息和結(jié)果, 以供查詢.

Celery架構(gòu)圖


celery架構(gòu)圖

2. 產(chǎn)生任務(wù)的方式 :

  1. 發(fā)布者發(fā)布任務(wù)(WEB 應(yīng)用)
  2. 任務(wù)調(diào)度按期發(fā)布任務(wù)(定時(shí)任務(wù))

3. celery 依賴三個(gè)庫(kù): 這三個(gè)庫(kù), 都由 Celery 的開(kāi)發(fā)者開(kāi)發(fā)和維護(hù).

  • billiard : 基于 Python2.7 的 multisuprocessing 而改進(jìn)的庫(kù), 主要用來(lái)提高性能和穩(wěn)定性.
  • librabbitmp : C 語(yǔ)言實(shí)現(xiàn)的 Python 客戶端,
  • kombu : Celery 自帶的用來(lái)收發(fā)消息的庫(kù), 提供了符合 Python 語(yǔ)言習(xí)慣的, 使用 AMQP 協(xié)議的高級(jí)借口.

三. 選擇消息代理

使用于生產(chǎn)環(huán)境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.

四. Celery 序列化

在客戶端和消費(fèi)者之間傳輸數(shù)據(jù)需要 序列化和反序列化. Celery 支出的序列化方案如下所示:

方案 說(shuō)明
pickle pickle 是Python 標(biāo)準(zhǔn)庫(kù)中的一個(gè)模塊, 支持 Pyuthon 內(nèi)置的數(shù)據(jù)結(jié)構(gòu), 但他是 Python 的專有協(xié)議. Celery 官方不推薦.
json json 支持多種語(yǔ)言, 可用于跨語(yǔ)言方案.
yaml yaml 表達(dá)能力更強(qiáng), 支持的數(shù)據(jù)類型較 json 多, 但是 python 客戶端的性能不如 json
msgpack 二進(jìn)制的類 json 序列化方案, 但比 json 的數(shù)據(jù)結(jié)構(gòu)更小, 更快.

五. 安裝,配置與簡(jiǎn)單示例

Celery 配置參數(shù)匯總

配置項(xiàng) 說(shuō)明
CELERY_DEFAULT_QUEUE 默認(rèn)隊(duì)列
CELERY_BROKER_URL Broker 地址
CELERY_RESULT_BACKEND 結(jié)果存儲(chǔ)地址
CELERY_TASK_SERIALIZER 任務(wù)序列化方式
CELERY_RESULT_SERIALIZER 任務(wù)執(zhí)行結(jié)果序列化方式
CELERY_TASK_RESULT_EXPIRES 任務(wù)過(guò)期時(shí)間
CELERY_ACCEPT_CONTENT 指定任務(wù)接受的內(nèi)容類型(序列化)

代碼示例 :

# 安裝
$ pip install celery, redis, msgpack

# 配置文件 celeryconfig.py

    CELERY_BROKER_URL = 'redis://localhost:6379/1'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任務(wù)過(guò)期時(shí)間
    CELERY_ACCEPT_CONTENT = ["json"]            # 指定任務(wù)接受的內(nèi)容類型.

# 初始化文件 celery.py

    from __future__ import absolute_import
    from celery import Celery

    app = Celery('proj', include=["proj.tasks"])
    app.config_from_object("proj.celeryconfig")

    if __name__ == "__main__":
        app.start()     

# 任務(wù)文件 tasks.py

    from __future__ import absolute_import
    from proj.celery import app

    @app.task
    def add(x, y):
        return x + y    

# 啟動(dòng)消費(fèi)者
    $ celery -A proj worker -l info

# 在終端中測(cè)試
    > from proj.tasks import add
    > r = add.delay(2,4)
    > r.result
      6
    > r.status
      u"SUCCESS"
    > r.successful()
      True

    > r.ready()     # 返回布爾值,  任務(wù)執(zhí)行完成, 返回 True, 否則返回 False.
    > r.wait()      # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果.
    > r.get()       # 獲取任務(wù)執(zhí)行結(jié)果
    > r.result      # 任務(wù)執(zhí)行結(jié)果.
    > r.state       # PENDING, START, SUCCESS
    > r.status      # PENDING, START, SUCCESS

    # 使用 AsyncResult 方式獲取執(zhí)行結(jié)果.
    # AsyncResult 主要用來(lái)存儲(chǔ)任務(wù)執(zhí)行信息與執(zhí)行結(jié)果(類似 js 中的 Promise 對(duì)象), 
    > from celery.result import AsyncResult
    > AsyncResult(task_id).get()
      4

六. 調(diào)用任務(wù)的方法 :

1. delay

task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)

2. apply_async

delay 實(shí)際上是 apply_async 的別名, 還可以使用如下方法調(diào)用, 但是 apply_async 支持更多的參數(shù):

task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

支持的參數(shù) :

  • countdown : 等待一段時(shí)間再執(zhí)行.

    add.apply_async((2,3), countdown=5)
    
  • eta : 定義任務(wù)的開(kāi)始時(shí)間.

    add.apply_async((2,3), eta=now+tiedelta(second=10))
    
  • expires : 設(shè)置超時(shí)時(shí)間.

    add.apply_async((2,3), expires=60)
    
  • retry : 定時(shí)如果任務(wù)失敗后, 是否重試.

    add.apply_async((2,3), retry=False)
    
  • retry_policy : 重試策略.

    • max_retries : 最大重試次數(shù), 默認(rèn)為 3 次.
    • interval_start : 重試等待的時(shí)間間隔秒數(shù), 默認(rèn)為 0 , 表示直接重試不等待.
    • interval_step : 每次重試讓重試間隔增加的秒數(shù), 可以是數(shù)字或浮點(diǎn)數(shù), 默認(rèn)為 0.2
    • interval_max : 重試間隔最大的秒數(shù), 即 通過(guò) interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點(diǎn)數(shù), 默認(rèn)為 0.2 .

自定義發(fā)布者,交換機(jī),路由鍵, 隊(duì)列, 優(yōu)先級(jí),序列方案和壓縮方法:

task.apply_async((2,2), compression='zlib',
    serialize='json',
    queue='priority.high',
    routing_key='web.add',
    priority=0,
    exchange='web_exchange')

七. 指定隊(duì)列 :

Celery 默認(rèn)使用名為 celery 的隊(duì)列 (可以通過(guò) CELERY_DEFAULT_QUEUE 修改) 來(lái)存放任務(wù). 我們可以使用 優(yōu)先級(jí)不同的隊(duì)列 來(lái)確保高優(yōu)先級(jí)的任務(wù)優(yōu)先執(zhí)行.

# 修改配置文件, 保證隊(duì)列優(yōu)先級(jí)

from kombu import Queue

CELERY_QUEUE = (        # 定義任務(wù)隊(duì)列.
    Queue('default', routing_key="task.#"),     # 路由鍵 以 "task." 開(kāi)頭的消息都進(jìn)入 default 隊(duì)列.
    Queue('web_tasks', routing_key="web.#")     # 路由鍵 以 "web." 開(kāi)頭的消息都進(jìn)入 web_tasks 隊(duì)列.
)

CELERY_DEFAULT_EXCHANGE = 'tasks'               # 默認(rèn)的交換機(jī)名字為 tasks
CELERY_DEFAULT_EXCHANGE_KEY = 'topic'           # 默認(rèn)的交換機(jī)類型為 topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default'     # 默認(rèn)的路由鍵是 task.default , 這個(gè)路由鍵符合上面的 default 隊(duì)列.

CELERY_ROUTES = {
    'proj.tasks.add': {
        'queue': 'web_tasks',
        'routing_key': 'web.add',
    }
}

# 使用指定隊(duì)列的方式啟動(dòng)消費(fèi)者進(jìn)程.
$ celery -A proj worker -Q web_tasks -l info    # 該 worker 只會(huì)執(zhí)行 web_tasks 中任務(wù), 我們可以合理安排消費(fèi)者數(shù)量, 讓 web_tasks 中任務(wù)的優(yōu)先級(jí)更高.

閱后即焚模式(transient):

from kombu import Queue
Queue('transient', routing_key='transient', delivery_mode=1)

八. 使用任務(wù)調(diào)度

使用 Beat 進(jìn)程自動(dòng)生成任務(wù).

# 修改配置文件, 
# 下面的任務(wù)指定 tasks.add 任務(wù) 每 10s 跑一次, 任務(wù)參數(shù)為 (16,16).

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'proj.tasks.add',
        'schedule': timedelta(seconds=10),
        'args': (16, 16)
    }
}

# crontab 風(fēng)格

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
        "add": {
                "task": "tasks.add",
                "schedule": crontab(hour="*/3", minute=12),
                "args": (16, 16),
                }
            }

# 啟動(dòng) Beat 程序
$ celery beat -A proj

# 之后啟動(dòng) worker 進(jìn)程.
$ celery -A proj worker -l info

或者
$ celery -B -A proj worker -l info

使用自定義調(diào)度類還可以實(shí)現(xiàn)動(dòng)態(tài)添加任務(wù). 使用 Django 可以通過(guò) Django-celery 實(shí)現(xiàn)在管理后臺(tái)創(chuàng)建,刪除,更新任務(wù), 是因?yàn)樗褂昧俗远x的 調(diào)度類 djcelery.schedulers.DatabaseScheduler .

九. 任務(wù)綁定, 記錄日志, 重試

# 修改 tasks.py 文件.

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r}'
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x/y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)     # 發(fā)生 ZeroDivisionError 錯(cuò)誤時(shí), 每 5s 重試一次, 最多重試 3 次.

    return result

當(dāng)使用 bind=True 參數(shù)之后, 函數(shù)的參數(shù)發(fā)生變化, 多出了參數(shù) self, 這這相當(dāng)于把 div 編程了一個(gè)已綁定的方法, 通過(guò) self 可以獲得任務(wù)的上下文.

十. 信號(hào)系統(tǒng) :

信號(hào)可以幫助我們了解任務(wù)執(zhí)行情況, 分析任務(wù)運(yùn)行的瓶頸. Celery 支持 7 種信號(hào)類型.

  1. 任務(wù)信號(hào)
    • before_task_publish : 任務(wù)發(fā)布前
    • after_task_publish : 任務(wù)發(fā)布后
    • task_prerun : 任務(wù)執(zhí)行前
    • task_postrun : 任務(wù)執(zhí)行后
    • task_retry : 任務(wù)重試時(shí)
    • task_success : 任務(wù)成功時(shí)
    • task_failure : 任務(wù)失敗時(shí)
    • task_revoked : 任務(wù)被撤銷(xiāo)或終止時(shí)
  2. 應(yīng)用信號(hào)
  3. Worker 信號(hào)
  4. Beat 信號(hào)
  5. Eventlet 信號(hào)
  6. 日志信號(hào)
  7. 命令信號(hào)

不同的信號(hào)參數(shù)格式不同, 具體格式參見(jiàn)官方文檔

代碼示例 :

# 在執(zhí)行任務(wù) add 之后, 打印一些信息.

@after_task_publish
def task_send_handler(sender=None, body=None, **kwargs):
    print 'after_task_publish: task_id: {body[id]}; sender: {sender}'.format(body=body, sender=sender)

十一. 子任務(wù)與工作流:

可以把任務(wù) 通過(guò)簽名的方法傳給其他任務(wù), 成為一個(gè)子任務(wù).

from celery import signature
task = signature('task.add', args=(2,2), countdown=10)
task
task.add(2,2)   # 通過(guò)簽名生成任務(wù)
task.apply_async()

還可以通過(guò)如下方式生成子任務(wù) :

from proj.task import add
task = add.subtask((2,2), countdown=10)     # 快捷方式 add.s((2,2), countdown-10) 
task.apply_async()

自任務(wù)實(shí)現(xiàn)片函數(shù)的方式非常有用, 這種方式可以讓任務(wù)在傳遞過(guò)程中財(cái)傳入?yún)?shù).

partial = add.s(2)
partial.apply_async((4,))

子任務(wù)支持如下 5 種原語(yǔ),實(shí)現(xiàn)工作流. 原語(yǔ)表示由若干指令組成的, 用于完成一定功能的過(guò)程.

  1. chain : 調(diào)用連, 前面的執(zhí)行結(jié)果, 作為參數(shù)傳給后面的任務(wù), 直到全部完成, 類似管道.

    from celery import chain
    res = chain(add.s(2,2), add.s(4), add.s(8))()
    res.get()
    
    管道式:
    
    (add.s(2,2) | add.s(4) | add.s(8))().get()
    
  2. group : 一次創(chuàng)建多個(gè)(一組)任務(wù).

    from celery import group
    
    res = group(add.s(i,i) for i in range(10))()
    res.get()
    
  3. chord : 等待任務(wù)全部完成時(shí)添加一個(gè)回調(diào)任務(wù).

    res = chord((add.s(i,i) for i in range(10)), add.s(['a']))()
    res.get()    # 執(zhí)行完前面的循環(huán), 把結(jié)果拼成一個(gè)列表之后, 再對(duì)這個(gè)列表 添加 'a'.
    [0,2,4,6,8,10,12,14,16,18,u'a']
    
  4. map/starmap : 每個(gè)參數(shù)都作為任務(wù)的參數(shù)執(zhí)行一遍, map 的參數(shù)只有一個(gè), starmap 支持多個(gè)參數(shù).

    add.starmap(zip(range(10), range(10)))
    
    相當(dāng)于:
    
    @app.task
    def temp():
        return [add(i,i) for i in range(10)]
    
  5. chunks : 將任務(wù)分塊.

    res = add.chunks(zip(range(50), range(50)),10)()
    res.get()
    

在生成任務(wù)的時(shí)候, 應(yīng)該充分利用 group/chain/chunks 這些原語(yǔ).

十二. 其他

關(guān)閉不想要的功能 :

@app.task(ignore_result=True)   # 關(guān)閉任務(wù)執(zhí)行結(jié)果.
def func():
    pass

CELERY_DISABLE_RATE_LIMITS=True     # 關(guān)閉限速.

根據(jù)任務(wù)狀態(tài)執(zhí)行不同操作 :

# tasks.py
class MyTask(Task):

    def on_success(self, retval, task_id, args, kwargs):
        print 'task done: {0}'.format(retval)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print 'task fail, reason: {0}'.format(exc)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

# 正確函數(shù), 執(zhí)行 MyTask.on_success() :
@app.task(base=MyTask)
def add(x, y):
    return x + y

# 錯(cuò)誤函數(shù), 執(zhí)行 MyTask.on_failure() : 
@app.task  #普通函數(shù)裝飾為 celery task
def add(x, y):
    raise KeyError
    return x + y

十三. Celery 管理命令

任務(wù)狀態(tài)回調(diào) :

參數(shù) 說(shuō)明
PENDING 任務(wù)等待中
STARTED 任務(wù)已開(kāi)始
SUCCESS 任務(wù)執(zhí)行成功
FAILURE 任務(wù)執(zhí)行失敗
RETRY 任務(wù)將被
REVOKED 任務(wù)取消
PROGRESS 任務(wù)進(jìn)行中

普通啟動(dòng)命令 :

$ celery -A proj worker -l info

使用 daemon 方式 multi :

$ celery multi start web -A proj -l info --pidfile=/path/to/celery_%n.pid --logfile=/path/to/celery_%n.log 

# web 是對(duì)項(xiàng)目啟動(dòng)的標(biāo)識(shí), 
# %n 是對(duì)節(jié)點(diǎn)的格式化用法.
    %n : 只包含主機(jī)名
    %h : 包含域名的主機(jī)
    %d : 只包含域名
    %i : Prefork 類型的進(jìn)程索引,如果是主進(jìn)程, 則為 0.
    %I : 帶分隔符的 Prefork 類型的進(jìn)程索引. 假設(shè)主進(jìn)程為 worker1, 那么進(jìn)程池的第一個(gè)進(jìn)程則為 worker1-1

常用 multi 相關(guān)命令:

$ celery multi show web     # 查看 web 啟動(dòng)時(shí)的命令
$ celery multi names web    # 獲取 web 的節(jié)點(diǎn)名字
$ celery multi stop web     # 停止 web 進(jìn)程
$ celery multi restart web  # 重啟 web
$ celery multi kill web     # 殺掉 web 進(jìn)程

常用監(jiān)控和管理命令 :

  • shell : 交互時(shí)環(huán)境, 內(nèi)置了 Celery 應(yīng)用實(shí)例和全部已注冊(cè)的任務(wù), 支持 默認(rèn)解釋器,IPython,BPython .

    $ celery shell -A proj
    
  • result : 通過(guò) task_id 在命令行獲得任務(wù)執(zhí)行結(jié)果

    $ celery -A proj result TASK_ID
    
  • inspect active : 列出當(dāng)前正在執(zhí)行的任務(wù)

    $ celery -A proj inspect active
    
  • inspect stats : 列出 worker 的統(tǒng)計(jì)數(shù)據(jù), 常用來(lái)查看配置是否正確以及系統(tǒng)的使用情況.

    $ celery -A proj inspect stats
    

Flower web 監(jiān)控工具

  • 查看任務(wù)歷史,任務(wù)具體參數(shù),開(kāi)始時(shí)間等信息;
  • 提供圖表和統(tǒng)計(jì)數(shù)據(jù)
  • 實(shí)現(xiàn)全面的遠(yuǎn)程控制功能, 包括但不限于 撤銷(xiāo)/終止任務(wù), 關(guān)閉重啟 worker, 查看正在運(yùn)行任務(wù)
  • 提供一個(gè) HTTP API , 方便集成.

Flower 的 supervisor 管理配置文件:

[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555 
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3 
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3

Celery 自帶的事件監(jiān)控工具顯示任務(wù)歷史等信息.

$ celery -A proj event
** 需要把 CELERY_SEND_TASK_SEND_EVENT = True 設(shè)置, 才可以獲取時(shí)間.

使用自動(dòng)擴(kuò)展 :

$ celery -A proj worker -l info --autoscale=6,3     # 平時(shí)保持 3 個(gè)進(jìn)程, 最大時(shí)可以達(dá)到 6 個(gè).

Celery 命令匯總

$ celery --help
    -A APP, --app APP
    -b BROKER, --broker BROKER
    --loader LOADER
    --config CONFIG
    --workdir WORKDIR
    --no-color, -C
    --quiet, -q

$ celery <command> --help

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status

|    celery inspect --help
|    celery inspect active 
|    celery inspect active_queues 
|    celery inspect clock 
|    celery inspect conf [include_defaults=False]
|    celery inspect memdump [n_samples=10]
|    celery inspect memsample 
|    celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
|    celery inspect ping 
|    celery inspect query_task [id1 [id2 [... [idN]]]]
|    celery inspect registered [attr1 [attr2 [... [attrN]]]]
|    celery inspect report 
|    celery inspect reserved 
|    celery inspect revoked 
|    celery inspect scheduled 
|    celery inspect stats 

|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max [min]]
|    celery control cancel_consumer <queue>
|    celery control disable_events 
|    celery control election 
|    celery control enable_events 
|    celery control heartbeat 
|    celery control pool_grow [N=1]
|    celery control pool_restart 
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
|    celery control revoke [id1 [id2 [... [idN]]]]
|    celery control shutdown 
|    celery control terminate <signal> [id1 [id2 [... [idN]]]]
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery call
|    celery result
|    celery migrate
|    celery graph
|    celery upgrade

+ Debugging: 
|    celery report
|    celery logtool

+ Extensions: 
|    celery flower

十四. 在 Flask 中使用 Celery

Flask 文檔: 基于 Celery 的后臺(tái)任務(wù)
在 Flask 中使用 Celery

十五. 參考鏈接

Python Web開(kāi)發(fā)實(shí)戰(zhàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容