Celery基礎,django-celery在django如何使用和一些問題如何解決

Celery(https://www.celerycn.io/v/4.4.0/ru-men/celery-jin-jie-shi-yong):

? ?一:特點:

? ? ? ? 高可用:如果出現(xiàn)丟失連接或連接失敗,職程(Worker)和客戶端會自動重試,并且中間人通過 主/主 主/從 的方式來進行提高可用性

? ? ? ? 快速: 單個 Celery 進行每分鐘可以處理數(shù)以百萬的任務,而且延遲僅為亞毫秒(使用 RabbitMQ、 librabbitmq 在優(yōu)化過后)

? ? ? ? 靈活:Celery 的每個部分幾乎都可以自定義擴展和單獨使用,例如自定義連接池、序列化方式、壓縮方式、日志記錄方式、任務調度、生產(chǎn)者、消費者、中間人(Broker)等

? ? 功能:

? ? ? ? 監(jiān)控:可以針對整個流程進行監(jiān)控,內置的工具或可以實時說明當前集群的概況

? ? ? ? 調度:可以通過調度功能在一段時間內指定任務的執(zhí)行時間 datetime,也可以根據(jù)簡單每隔一段時間進行執(zhí)行重復的任務,支持分鐘、小時、星期幾,也支持某一天或某一年的Crontab表達式

? ? ? ? 工作流:可以通過“canvas“進行組成工作流,其中包含分組、鏈接、分塊等等。簡單和復雜的工作流程可以使用一組“canvas“組成,其中包含分組、鏈接、分塊等

? ? ? ? 資源(內存)泄漏保護: --max-tasks-per-child 參數(shù)適用于可能會出現(xiàn)資源泄漏(例如:內存泄漏)的任務

? ? ? ? 時間和速率的限制:您可以控制每秒/分鐘/小時執(zhí)行任務的次數(shù),或者任務執(zhí)行的最長時間,也將這些設置為默認值,針對特定的任務或程序進行定制化配置

? ? ? ? 自定義組件:開發(fā)者可以定制化每一個職程(Worker)以及額外的組件。職程(Worker)是用 “bootsteps” 構建的-一個依賴關系圖,可以對職程(Worker)的內部進行細粒度控制


? ? 二: 用法

? ? 2.1創(chuàng)建一個文件夾proj, 在proj目錄下創(chuàng)建3個py文件__init__.py, celery.py, tasks.py

? ? 在celery.py代碼:

? ? ? ? from __future__ import absolute_import, unicode_literals

? ? ? ? from celery import Celery

? ? ? ? # app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks'])

? ? ? ? app = Celery('proj', backend='redis://localhost', broker='redis://localhost:6379', include=['proj.tasks'])

? ? ? ? # Optional configuration, see the application user guide.

? ? ? ? app.conf.update(

? ? ? ? ? ? result_expires=3600,

? ? ? ? )

? ? ? ? if __name__ == '__main__':

? ? ? ? ? ? app.start()

? ? 采用的是redis作為中間件(broker)sudo apt-get install redis-server? ? sudo service redis-server restart

? ? task.py:

? ? ? ? from __future__ import absolute_import, unicode_literals

? ? ? ? from .celery import app

? ? ? ? @app.task

? ? ? ? def add(x, y):

? ? ? ? ? ? return x + y

? ? ? ? @app.task

? ? ? ? def mul(x, y):

? ? ? ? ? ? return x * y

? ? ? ? @app.task

? ? ? ? def xsum(numbers):

? ? ? ? ? ? return sum(numbers)

? ? 運行:celery -A tasks proj --loglevel=info?

? ? 在proj目錄外面新建一個腳本proj_test.py:

?from proj.tasks import *

from celery import group

from celery import chain, chord

# delay() 實際上為 apply_async() 的快捷使用, apply_async() 可以指定調用時執(zhí)行的參數(shù),例如運行的時間,使用的任務隊列等

# res 參數(shù):res.state, res.successful(), res.failed()

res = add.delay(2, 2)? # add.apply_async((2, 2), queue='lopri', countdown=10)

# 任務執(zhí)行引發(fā)異常,可以進行檢查異常以及溯源,默認情況下 result.get() 會拋出異常,

# 如果不希望 Celery 拋出異常,可以通過設置 propagate 來進行禁用

result_add = res.get(timeout=1)? # 或者res.get(propagate=False)

# r_add2 = res.get(propagate=False)

print(res.failed())

print(result_add)

# print(add(2, 2))

print(res.id)? # 每一個任務都有一個id, 獲取任務的ID

# 一個任務只能有當前只能有一個狀態(tài),但他的執(zhí)行過程可以為多個狀態(tài),一個典型的階段是:

# PENDING -> STARTED -> SUCCESS

# 重試任務比較復雜,為了證明,一個任務會重試兩次,任務的階段為:

# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

from proj.celery import app

res2 = app.AsyncResult('this-id-does-not-exist')

print(res2.state)

2.2:# Canvas:設計工作流程

s1 = add.signature((2, 2), countdown=1)? # add.s(2, 2)

res3 = s1.delay()

print(res3.get())

s2 = add.s(2)

res4 = s2.delay(8)

print(res4.get())

# 組:Groups

# 一個 group 并行調用任務列表,返回一個特殊的結果實例,可以將結果作為一個列表進行查看,并且通過索引進去獲取返回值。

g1 = group(add.s(i, i) for i in range(10))().get()

g2 = group(add.s(i) for i in range(10))

res5 = g2(10).get()

print(res5)

2.3# 鏈:Chains可以將任務鏈接在一起,在一個人返回后進行調用另外一個任務

c1 = chain(add.s(4, 4) | mul.s(8))().get()

print(c1)

2.4:# 和弦:Chords 和弦是一個帶有回調的組:

c2 = chord((add.s(i, i) for i in range(10)), xsum.s())().get()

print(c2)

c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()

# c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()

print(c3)


三:在django中使用celery

環(huán)境: python3.7 ,django3.0.6

安裝: pip install django-celery

在settings.py中配置:

import djcelery

djcelery.setup_loader()

CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_IMPORTS = ('app.tasks', )

BROKER_URL = 'redis://127.0.0.1:6379/8'

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

INSTALLED_APPS = [

? ? ...

? ? 'djcelery',

]

當djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標記為task的方法,將它們注冊為celery task。

BROKER_URL ='redis://127.0.0.1:6379/6'

broker是代理人,它負責分發(fā)任務給worker去執(zhí)行。我使用的是Redis作為broker,當然你也可以用其它的broker,比如官方就比較推薦使用RabbitMQ.

有的博客中提到要配置關鍵字:CELERY_RESULT_BACKEND,例如:

CELERY_RESULT_BACKEND='amqp://guest@localhost//'#可以不用寫

我沒有配置這個關鍵字。因為如果沒有配置,此時Django會使用默認的數(shù)據(jù)庫(也是你指定的orm數(shù)據(jù)庫),作為它的結果作為它的backend。因此你也可以不用寫,使用Django默認設置的數(shù)據(jù)庫就很好。

CELERY_IMPORTS = ('app.tasks', )

CELERY_TIMEZONE = TIME_ZONE

CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'

上面第一句是導入目標任務文件,第二句是設置時區(qū),第三句表示使用了django-celery默認的數(shù)據(jù)庫調度模型,任務執(zhí)行周期都被存在默認指定的orm數(shù)據(jù)庫中.

更深入的Celery配置:(http://www.cnblogs.com/ajianbeyourself/p/4950758.html)

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {

#定時任務一: 每24小時周期執(zhí)行任務(del_redis_data)

u'刪除過期的redis數(shù)據(jù)': {

????"task":"app.tasks.del_redis_data","schedule": crontab(hour='*/24'),"args": (), },

上面是設置定時的時間配置,關于crontab的具體用法,celery的官方文檔講解的十分詳盡(表格):

http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

我選的三個任務,是我特意挑選的,非常有代表性。第一個是周期任務,它會每隔一個固定時間周期去執(zhí)行一次相應的task,比如隔1分鐘,隔1小時等; 第二個和第三個都是定時任務,定時在每個時間點,比如每天的6點,或者定時在每個月的1號。

周期任務和定時任務有小小的差別,這也是crontab的強大之處,它同時支持這兩種。

同步數(shù)據(jù)庫:

? ? python manage.py makemigrations

? ? python manage.py migrate

在app路徑下創(chuàng)建task.py腳本, 內容如下:

from __future__ import absolute_import

from celery import task

from celery import shared_task

from data_draw.script.database_operation import *

from django.conf import settings

import os

@task()

def select_evaluation(data_name_dict):

? ? oss_data_name_list = select_dir(settings.EVALUATION_DIR, 0)

? ? # print(f"oss_data_name_list:{oss_data_name_list}")

? ? print("*"*200)

? ? print("異步執(zhí)行 select_evaluation 方法")

? ? for data_name in oss_data_name_list:

? ? ? ? oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name)

? ? ? ? eval_oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name, 'evaluation/')

? ? ? ? item = {}

? ? ? ? mp4_name = select_video(oss_data_path)

? ? ? ? if mp4_name:

? ? ? ? ? ? mp4 = True

? ? ? ? ? ? video_name = mp4_name[0]

? ? ? ? else:

? ? ? ? ? ? mp4 = False

? ? ? ? ? ? video_name = ''

? ? ? ? file_name_list = select_dir(eval_oss_data_path, 1)

? ? ? ? for key, value in settings.MEED_EVALUATION_FILE.items():

? ? ? ? ? ? if value in file_name_list:

? ? ? ? ? ? ? ? item[key] = True

? ? ? ? ? ? else:

? ? ? ? ? ? ? ? item[key] = False

? ? ? ? try:

? ? ? ? ? ? value = data_name_dict[data_name]

? ? ? ? ? ? update_data_result(data_name, mp4, video_name, item)

? ? ? ? except:

? ? ? ? ? ? add_data_result(data_name, mp4, video_name, item)

在view.py腳本引用:

from data_draw.task import select_evaluation

select_evaluation.delay(data_name_dict)

運行:

python manage.py runserver 0.0.0.0:8001#啟動django的應用,可以動態(tài)的使用django-admin來管理任務

python manage.py celery beat #應該是用來監(jiān)控任務變化的

python manage.py celery worker -c 6 -l debug #任務執(zhí)行進程,worker進程

運行時報的第一個錯誤, async導入錯誤:

from . import async, base

? ? ? ? ? ? ? ? ? ? ? ^

SyntaxError: invalid syntax

這是因為在 python 3.7 中將 async 作為了關鍵字,所以當 py 文件中出現(xiàn)類似 from . import async, base 這類不符合python語法的語句時,Python會報錯

解決:

在 celery 官方的提議下,建議將 async.py 文件的文件名改成 asynchronous。

所以我們只需要將 celery\backends\async.py 改成 celery\backends\asynchronous.py,并且把 celery下代碼中的所有 async 改成 asynchronous 就可以了

運行時報的第二個錯誤:

File "/home/python/.virtualenvs/django_class/lib/python3.5/site-packages/redis/_compat.py", line 123, in iteritems

? ? return iter(x.items())

AttributeError: 'str' object has no attribute 'items'

這是因為以前版本的redis太高(3.0.1),所以重新加載了redis

解決:

? ? pip install redis==2.10.6

參考文檔:

https://www.celerycn.io/v/4.4.0/ru-men/celery-chu-ci-shi-yong

http://www.itdecent.cn/p/e97ca5315c90

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容