基于:http://www.itdecent.cn/p/e2f9eef96f53
手動驅(qū)動任務(wù)示例,相關(guān)的文件主要涉及:
celeryconfig.py # 任務(wù)配置文件
__init__.py #實(shí)例化celery對象
task1.py #具體的任務(wù)實(shí)體
task2.py #具體的任務(wù)實(shí)體
client.py #任務(wù)的生產(chǎn)提交客戶端
圖示:

image.png
init.py 文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:00
@version: v1.0.0
@Contact: 308711822@qq.com
@File: __init__.py.py
@文件功能描述:
"""
from celery import Celery
app = Celery('demo') # 創(chuàng)建 Celery 實(shí)例
app.config_from_object('celery_app.celeryconfig') # 通過 Celery 實(shí)例加載配置模塊
celeryconfig.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:01
@version: v1.0.0
@Contact: 308711822@qq.com
@File: celeryconfig.py
@文件功能描述:
"""
BROKER_URL = "redis://localhost:6379/0" # 指定 Broker
CELERY_RESULT_BACKEND = "redis://localhost:6379/1" # 指定 Backend
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區(qū),默認(rèn)是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = ( # 指定導(dǎo)入的任務(wù)模塊
'celery_app.task1',
'celery_app.task2'
)
task1.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:53
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task1.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
task2.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 15:01
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task2.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def multiply(x, y):
time.sleep(2)
return x * y
client.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + +
"""
Author = zyx
@Create_Time: 2018/1/11 15:02
@version: v1.0.0
@Contact: 308711822@qq.com
@File: client.py.py
@文件功能描述:
"""
import datetime
from datetime import timedelta
from celery_app import task1
from celery_app import task2
task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8)
task2.multiply.apply_async(args=[3, 7]) # 也可用 task2.multiply.delay(3, 7)
print('hello world')
# countdown:指定多少秒后執(zhí)行任務(wù)
task1.add.apply_async(args=(2, 23), countdown=5) # 5 秒后執(zhí)行任務(wù)
task1.add.apply_async(args=[6, 7], expires=10) # 10 秒后過期
# 當(dāng)前 UTC 時間再加 10 秒后執(zhí)行任務(wù)
# task1.add.multiply.apply_async(args=[8, 7], eta=datetime.utcnow() + timedelta(seconds=10))
# task1.add.apply_async(args=[8, 7], eta=datetime.utcnow() + timedelta(seconds=10))
# expires:任務(wù)過期時間,參數(shù)類型可以是 int,也可以是 datetime
# task1.add.multiply.apply_async(args=[6, 7], expires=10) # 10 秒后過期
# task1.add.apply_async(args=[6, 7], expires=10) # 10 秒后過期
運(yùn)行的方式:
1:進(jìn)入到目錄:
cd celery_demo2
2: 啟動 Celery Worker 進(jìn)程,在項(xiàng)目的根目錄下執(zhí)行下面命令:
celery_demo2 $ celery -A celery_app worker --loglevel=info
3: 運(yùn)行客戶端:
運(yùn)行client.py,查看對應(yīng)的調(diào)式的信息
bast定時任務(wù)驅(qū)動任務(wù)示例,相關(guān)的文件主要涉及:
celeryconfig.py # 任務(wù)配置文件
task1.py #具體的任務(wù)實(shí)體
task2.py #具體的任務(wù)實(shí)體

image.png
celeryconfig.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 15:35
@version: v1.0.0
@Contact: 308711822@qq.com
@File: celeryconfig.py.py
@文件功能描述:
"""
from datetime import timedelta
from celery.schedules import crontab
# Broker and Backend
BROKER_URL = "redis://localhost:6379/2" # 指定 Broker
# CELERY_RESULT_BACKEND = "redis://localhost:6379/3" # 指定 Backend
# Timezone
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區(qū),不指定默認(rèn)為 'UTC'
# CELERY_TIMEZONE='UTC'
# import
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
# schedules
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'celery_app.task1.add',
'schedule': timedelta(seconds=3), # 每 30 秒執(zhí)行一次
'args': (5, 8) # 任務(wù)函數(shù)參數(shù)
},
'multiply-at-some-time': {
'task': 'celery_app.task2.multiply',
'schedule': crontab(hour=9, minute=50), # 每天早上 9 點(diǎn) 50 分執(zhí)行一次
'args': (3, 7) # 任務(wù)函數(shù)參數(shù)
}
}
task1.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 15:39
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task1.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
task2.py文件內(nèi)容
#!/usr/bin/evn python
# coding=utf-8
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +
# ┏┓ ┏┓+ +
# ┏┛┻━━━┛┻┓ + +
# ┃ ┃
# ┃ ━ ┃ ++ + + +
# ████━████ ┃+
# ┃ ┃ +
# ┃ ┻ ┃
# ┃ ┃ + +
# ┗━┓ ┏━┛
# ┃ ┃
# ┃ ┃ + + + +
# ┃ ┃ Codes are far away from bugs with the animal protecting
# ┃ ┃ + 神獸保佑,代碼無bug
# ┃ ┃
# ┃ ┃ +
# ┃ ┗━━━┓ + +
# ┃ ┣┓
# ┃ ┏┛
# ┗┓┓┏━┳┓┏┛ + + + +
# ┃┫┫ ┃┫┫
# ┗┻┛ ┗┻┛+ + + +
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +"""
"""
Author = zyx
@Create_Time: 2018/1/11 15:39
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task2.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def multiply(x, y):
time.sleep(2)
return x * y
運(yùn)行的方式:
1:進(jìn)入到目錄
cd celery_periodic_tasks
2: 啟動 Celery Beat 進(jìn)程,定時將任務(wù)發(fā)送到 Broker,在項(xiàng)目根目錄下執(zhí)行下面命令:
celery_periodic_tasks $ celery beat -A celery_app
3: 啟動 Celery Worker 進(jìn)程,在項(xiàng)目的根目錄下執(zhí)行下面命令:
celery_periodic_tasks $ celery -A celery_app worker --loglevel=info