Celery 是什么?
- 異步任務(wù)隊(duì)列工具,主要解決 realtime 事件的異步操作,但也支持定時(shí)任務(wù)。
- 什么是異步?那要先理解什么是同步,比如我去麥當(dāng)勞吃飯,如果麥當(dāng)勞前一個(gè)顧客點(diǎn)完單,拿到餐,吃完走人之后才能接待下一個(gè)顧客,就是同步。反過(guò)來(lái)我點(diǎn)完單,它馬上就接下一個(gè)客人的單,我的流程雖然還沒(méi)有走完(time-consuming),但也不影響下一個(gè)顧客點(diǎn)單(blocked),這就是異步。
Celery 安裝
-
pip install celery,由于 celery 自己并不帶隊(duì)列存儲(chǔ),所以根據(jù)官方推薦,還需要安裝 RabbitMQ 或者 Redis 來(lái)存儲(chǔ)隊(duì)列。方便起見(jiàn),本文用 Redis。
Celery 機(jī)制
- celery 是一個(gè)裝飾器類,本質(zhì)上是把一個(gè)函數(shù)變成一個(gè)可以異步調(diào)用的函數(shù)。
- 開(kāi)啟 celery 進(jìn)程。
- 在另一段程序中導(dǎo)入這個(gè)函數(shù),當(dāng)這個(gè)函數(shù)被以 delay 模式調(diào)用時(shí):
xxx.delay(),celery 會(huì)把這個(gè)任務(wù)寫(xiě)到任務(wù)隊(duì)列然后返回,原程序不會(huì)被阻塞可以往下跑。注意這里需要用 delay 模式去跑這個(gè)函數(shù),同時(shí)注意不要把xxx.delay()的結(jié)果賦值給一個(gè)變量,否則依然會(huì)被阻塞。 - celery 的另一個(gè)進(jìn)程會(huì)去這個(gè)任務(wù)隊(duì)列里取任務(wù),完成之后寫(xiě)到 result 隊(duì)列里面。或者多數(shù)情況下,這個(gè)被異步調(diào)用的函數(shù)不需要返回結(jié)果,比如發(fā)送一個(gè)郵件,提醒之類的,連 result 隊(duì)列都不用。
Celery first blood
開(kāi)一個(gè) python 腳本,比如
tasks.py。-
生成一個(gè) Celery 對(duì)象實(shí)例,是一個(gè)裝飾器。
from celery import Celery app = Celery('__name__', broker='redis://localhost:6379') -
定義一個(gè)你想異步操作的函數(shù),并加上 celery 裝飾器
@app.task。@app.task def add(x, y): return x + y -
保存,退出,然后在 terminal 啟動(dòng) celery 服務(wù)。
celery -A tasks worker --loglevel=info
做個(gè)定時(shí)任務(wù):每天發(fā)問(wèn)候
接下來(lái),我要搭配釘釘機(jī)器人了,我希望小仙女每天早上7點(diǎn)給我發(fā)個(gè)問(wèn)候,然后在7點(diǎn)半的時(shí)候確認(rèn)我有沒(méi)有開(kāi)始干活了。
-
先寫(xiě)一個(gè)簡(jiǎn)單的釘釘提醒程序,命名
celery_worker.py:#! /usr/bin/env python # coding: utf-8 import requests import json import time from config import HOST_IP, NOTIFY_URL, MOBILE_NUMBER def notify_dingding(msg): headers = {"Content-Type": "application/json; charset=utf-8"} post_data = { "msgtype": "text", "text": { "content": msg }, "at": { "atMobiles": [MOBILE_NUMBER] } } r = requests.post(NOTIFY_URL, headers=headers, data=json.dumps(post_data)) print(r.content)注意這里從 config 里導(dǎo)入了一些參數(shù),所以要在這個(gè)程序的同一層寫(xiě)一個(gè)
config.py的配置文件。# config.py NOTIFY_URL = ("https://oapi.dingtalk.com/robot/send?access_token=" "c6d5a2936381dfc29394f3c336bea5fad962d90ffd31809e92d95a1xxxxxxxx") MOBILE_NUMBER = "176xxxxx619" HOST_IP = "127.0.0.1" -
導(dǎo)入 celery 包,給函數(shù)加上裝飾器:
from celery import Celery BROKER_URI = 'redis://%s:6379/6' % HOST_IP BACKEND_URI = 'redis://%s:6379/5' % HOST_IP worker = Celery('celery_worker', broker=BROKER_URI, backend=BACKEND_URI) @worker.task def notify_dingding(msg): ... 簡(jiǎn)單學(xué)習(xí)一下celery 的 crontab 定時(shí)任務(wù)。
-
給 worker 加上定時(shí)任務(wù)
from celery.schedules import crontab worker.conf.update( timezone='Asia/Shanghai', enable_utc=True, beat_schedule={ "morning_msg_1": { "task": "celery_worker.notify_dingding", "schedule": crontab(minute=0, hour=7), "args": ("早,起床了喲,先去做個(gè)早飯吧",) }, "morning_msg_2": { "task": "celery_worker.notify_dingding", "schedule": crontab(minute=30, hour=7), "args": ("我就問(wèn)問(wèn)你在干活咩?",) } } ) -
最后程序末尾加一個(gè)小測(cè)試,看看服務(wù)是不是起來(lái)了:
notify_dingding("小仙女上線啦") -
開(kāi)啟 redis-server
nohup redis-server & -
開(kāi)啟我們的 celery worker,這里的
-B是 celery 的 beat 服務(wù),可以理解為一個(gè)周期任務(wù)。celery -A celery_worker worker -B 服務(wù)起來(lái)嘍:

QQ20170818-163110.jpg

QQ20170818-162958.jpg