需用場景
celery功能強(qiáng)大,但對服務(wù)器的性能消耗也大,其次維護(hù)成本也不小,所以先搞清楚什么樣的場景使用使用celery。
- 并發(fā)需求大,需要做異步的延時隊列。比如說:下單任務(wù),如果體量小的應(yīng)用,1秒并發(fā)也就不到10筆訂單,那就可以忽略異步延時操作。但是如果可以達(dá)到1秒2000筆訂單,那么就得上異步延時隊列了。大體上的操作思路就是:客戶端發(fā)起下單請求,服務(wù)把下單操作放到一個異步的延時隊列去操作,然后返回客戶端下單操作已接受,等待下單成功的通知。
- 需要定時執(zhí)行任務(wù)或者計時執(zhí)行任務(wù)隊列。比如:30分鐘取消待支付訂單,每隔1個小時刷新Redis緩存數(shù)據(jù)等。
使用依賴
# 筆者的開發(fā)環(huán)境是:python 3.6 django 2.2.6
pip install celery
pip install django-celery-results # 支持將任務(wù)結(jié)果存儲到Django的ORM中
pip install django-celery-beat # 支持定時任務(wù)
你還需要搭建好RabbitMQ服務(wù)。
lucky-mall
├── celerybeat-schedule.db
├── lucky
│ ├── __init__.py
│ ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── mall
│ ├── __init__.py
│ ├── admin.py
│ ├── api
│ ├── apps.py
│ ├── migrations
│ ├── model
│ ├── tasks.py
│ └── urls.py
└──
代碼實現(xiàn)
在setting.py里添加配置
INSTALLED_APPS = [
...
'django_celery_beat',
'django_celery_results'
]
# Broker配置,使用RabbitMQ作為消息中間件
CELERY_BROKER_URL = 'amqp://user:password@host:port'
# 使用django orm 作為結(jié)果存儲
CELERY_RESULT_BACKEND = 'django-db'
# 結(jié)果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
在項目管理目前下lucky-mall/lucky 里添加celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lucky.settings') # 設(shè)置django環(huán)境 lucky是筆者的項目,開發(fā)者填寫自己的項目名
app = Celery('lucky') # lucky是筆者的自定義的APP名,開發(fā)者也可以自定義
app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作為前綴,在settings中寫配置
app.autodiscover_tasks() # 發(fā)現(xiàn)任務(wù)文件每個app下的tasks.py
在項目管理目前下lucky-mall/lucky/__init__.py 里添加
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
按需在app目錄下面建tasks.py文件,并創(chuàng)建任務(wù)
from celery import shared_task
from .model.order.order import Order
# 檢查還有多少條待支付的訂單,返回其數(shù)量
@shared_task
def check_cancel_order():
order_data = Order.objects.filter(order_status=Order.order_status_unpaid)
return {'order_cancel_length': order_data.count()}
執(zhí)行驗證
執(zhí)行migrate命令,完成數(shù)據(jù)庫模型的遷移
python manage.py migrate
啟動celery,celery需要額外的命令啟動(生產(chǎn)環(huán)境部署時,最好使用守護(hù)線程來啟動,比如:supervisor)
# 啟動定時執(zhí)行調(diào)度 其中 lucky 是celery app名,開發(fā)者改為自己的
celery -A lucky beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 啟動worker 其中 lucky 是celery app名,開發(fā)者改為自己的
celery -A lucky worker -l info
# 啟動django
python manage.py runserver
執(zhí)行完,啟動admin頁面,可以看到如下模型

Celery Results.png

Periodic Tasks.png
下一步
- 如何在業(yè)務(wù)代碼中將延時任務(wù)加入隊列中執(zhí)行?
- 如何設(shè)置延時任務(wù)指定時間后執(zhí)行?
- 如何設(shè)置定時任務(wù),并執(zhí)行?