原文鏈接:Django 2.1.7 Celery 4.3.0 在項目中使用Celery
相關篇章:
- Django 2.1.7 Celery 4.3.0 示例,解決Task handler raised error: ValueError('not enough values to unp...
- Django 2.1.7 集成Celery 4.3.0 從介紹到入門
- Django 2.1.7 Celery 4.3.0 配置
上一篇在講解Celery的配置中,提到將Celery的配置抽出到一個獨立的文件進行管理,如下:

下面我們再來看看,當task任務越來越多的時候,也應該要抽出來。
整理Celery模塊的目錄
在前面的目錄基礎上,再創(chuàng)建一個celery.py文件,然后我們的文件結(jié)構(gòu)如下:

調(diào)整各模塊文件的代碼
- celery.py內(nèi)容如下:
from celery import Celery
from celery_tasks import celeryconfig
## 使用增加配置的方式創(chuàng)建celery app
app = Celery('celery_tasks.tasks')
# 從單獨的配置模塊中加載配置
app.config_from_object(celeryconfig)
# 自動搜索任務
app.autodiscover_tasks(['celery_tasks'])
- celeryconfig.py模塊內(nèi)容如下:
from kombu import Exchange, Queue
# 設置結(jié)果存儲
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/9'
# 設置代理人broker
BROKER_URL = 'redis://127.0.0.1:6379/8'
- tasks.py模塊內(nèi)容如下:
from celery_tasks.celery import app as celery_app
# 創(chuàng)建任務函數(shù)
@celery_app.task
def my_task1():
print("任務函數(shù)(my_task1)正在執(zhí)行....")
@celery_app.task
def my_task2():
print("任務函數(shù)(my_task2)正在執(zhí)行....")
@celery_app.task
def my_task3():
print("任務函數(shù)(my_task3)正在執(zhí)行....")
- 啟動worker:
celery -A celery_tasks worker -l info -P eventlet
要注意:這里是以Celery的包進行啟動,之前是使用tasks進行啟動的。
celery -A celery_tasks.tasks worker -l info -P eventlet
啟動日志如下:
(venv) F:\pythonProject\django-pratice>celery -A celery_tasks worker -l info -P eventlet
-------------- celery@USC2VG2F9NPB650 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Windows-10-10.0.17763-SP0 2019-08-03 17:26:52
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_tasks.tasks:0x2a98a1b7320
- ** ---------- .> transport: redis://127.0.0.1:6379/8 # 設置的 broker
- ** ---------- .> results: redis://127.0.0.1:6379/9 # 設置的結(jié)果存儲
- *** --- * --- .> concurrency: 12 (eventlet) # 并發(fā)的數(shù)量
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks] # 注冊的任務
. celery_tasks.tasks.my_task1
. celery_tasks.tasks.my_task2
. celery_tasks.tasks.my_task3
[2019-08-03 17:26:52,822: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
[2019-08-03 17:26:52,872: INFO/MainProcess] mingle: searching for neighbors
[2019-08-03 17:26:54,073: INFO/MainProcess] mingle: all alone
[2019-08-03 17:26:54,196: INFO/MainProcess] celery@USC2VG2F9NPB650 ready.
[2019-08-03 17:26:54,230: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/8.
下面來調(diào)用一下任務測試一下,打開另一個終端,如下:
# 導入三個task任務
In [1]: from celery_tasks.tasks import my_task1,my_task2,my_task3
# 下面使用delay方法調(diào)用各個task任務
In [2]: my_task1.delay()
Out[2]: <AsyncResult: 20d3ee91-d101-477a-9326-6dba394d8bda>
In [3]: my_task1.delay()
Out[3]: <AsyncResult: b069c036-ede8-4061-a490-5a63131871ab>
In [4]: my_task2.delay()
Out[4]: <AsyncResult: 44e9570a-d5c1-445b-92cc-885d0752a5a7>
In [5]: my_task3.delay()
Out[5]: <AsyncResult: 71ded829-2270-418c-afbf-72fb39a5169a>
In [6]:
回到celery的日志終端,查看任務執(zhí)行信息,如下:

