上篇文章我們學(xué)習(xí)了Flask框架——MongoEngine使用MongoDB數(shù)據(jù)庫,這篇文章我們學(xué)習(xí)Flask框架——基于Celery的后臺任務(wù)。
Celery
在Web開發(fā)中,我們經(jīng)常會遇到一些耗時的操作,例如:上傳/下載數(shù)據(jù)、發(fā)送郵件/短信,執(zhí)行各種任務(wù)等等。這時我們可以使用分布式異步消息任務(wù)隊列去執(zhí)行這些任務(wù)。
Celery是一款非常簡單、靈活、可靠的分布式異步消息隊列工具,可以用于處理大量消息、實(shí)時數(shù)據(jù)以及任務(wù)調(diào)度。
Celery通過消息機(jī)制進(jìn)行通信,一般使用中間人(Broker)作為客戶端和職程(Worker)調(diào)節(jié)。
其工作流程如下圖所示:
客戶端發(fā)送消息任務(wù)給中間人(Broker),任務(wù)執(zhí)行單元(Celery Worker)監(jiān)控中間人中的任務(wù)隊列,當(dāng)中間人有消息任務(wù)時就分配任務(wù)給任務(wù)執(zhí)行單元,任務(wù)執(zhí)行單元在后臺運(yùn)行任務(wù)并返回請求。
注意:Celery可以有多個職程(Worker)和中間人(Broker),用來提高Celery的高可用性以及橫向擴(kuò)展能力。
Celery優(yōu)點(diǎn):
- 簡單:上手比較簡單,不需要配置文件就可以直接運(yùn)行;
- 高可用:如果出現(xiàn)丟失連接或連接失敗,職程(Worker)和客戶端會自動重試,并且中間人通過 主/主 主/從 的方式來進(jìn)行提高可用性;
- 快速:單個 Celery 進(jìn)行每分鐘可以處理數(shù)以百萬的任務(wù),而且延遲僅為亞毫秒(使用 RabbitMQ、 librabbitmq 在優(yōu)化過后);
- 靈活:Celery 的每個部分幾乎都可以自定義擴(kuò)展和單獨(dú)使用,例如自定義連接池、序列化方式、壓縮方式、日志記錄方式、任務(wù)調(diào)度、生產(chǎn)者、消費(fèi)者、中間人(Broker)等。
安裝
Celery安裝方式很簡單,執(zhí)行如下命令即可:
pip install celery
這里我們使用redis作為中間人,執(zhí)行如下代碼安裝redis:
pip install redis
創(chuàng)建Celery程序
對比說明
(1)不使用Celery執(zhí)行耗時任務(wù),創(chuàng)建一個名為test.py文件,其示例代碼如下:
import time
def add(a,b):
time.sleep(5) #休眠5秒
return a+b
if __name__ == '__main__':
print('開始執(zhí)行')
result=add(2,3) #調(diào)用add函數(shù)
print('執(zhí)行結(jié)束')
print(result)
運(yùn)行test.py文件,運(yùn)行結(jié)果如下圖:
(2)使用Celery執(zhí)行耗時任務(wù),創(chuàng)建一個名為tasks.py文件,示例代碼如下:
import time
from celery import Celery
celery = Celery( #實(shí)例化Celery對象
'tasks', #當(dāng)前模塊名
broker='redis://localhost:6379/1', #使用redis為中間人
backend='redis://localhost:6379/2' #結(jié)果存儲
)
@celery.task() #使用異步任務(wù)裝飾器task
def add(a,b):
time.sleep(5) #休眠5秒
return a+b
if __name__ == '__main__':
print('開始執(zhí)行')
result=add.delay(2,3) #調(diào)用add方法并使用delay延時函數(shù)
print('執(zhí)行結(jié)束')
print(result)
實(shí)例化Celery對象,其中第一個參數(shù)為當(dāng)前模塊名,第二個參數(shù)為中間人(Broker)的URL鏈接,第三個參數(shù)為中間人結(jié)果放回的存儲URL鏈接,再調(diào)用add()方法時,需要使用delay延時函數(shù)。
運(yùn)行tasks.py文件,運(yùn)行結(jié)果如下圖所示:
當(dāng)我們運(yùn)行tasks.py文件時,發(fā)現(xiàn)程序一下子就運(yùn)行結(jié)束并返回任務(wù)id,
在終端執(zhí)行如下代碼運(yùn)行Celery職程(Worker)服務(wù):
celery -A tasks worker -l info
如下圖所示:
雖然職程已經(jīng)收到任務(wù)并且在分配到子進(jìn)程運(yùn)行了,但是發(fā)現(xiàn)該任務(wù)沒有運(yùn)行結(jié)束,這時因為Celery不支持在windows下運(yùn)行任務(wù),需要借助eventlet來完成,執(zhí)行如下安裝eventlet:
pip install eventlet
安裝成功后,執(zhí)行如下代碼運(yùn)行Celery職程(Worker)服務(wù):
celery -A tasks worker -l info -P eventlet -c 10
運(yùn)行結(jié)果如下:
Celery配置
大多數(shù)情況下,使用默認(rèn)的配置即可滿足我們的開發(fā),不需要修改配置,當(dāng)我們需要修改配置時,可以通過update進(jìn)行配置,在上面的tasks.py添加如下代碼:
celery.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
其中:
accept_content:允許的內(nèi)容類型/序列化程序的白名單,如果收到不在此列表中的消息,則該消息將被丟棄并出現(xiàn)錯誤,默認(rèn)只為json;
task_serializer:標(biāo)識要使用的默認(rèn)序列化方法的字符串,默認(rèn)值為json;
result_serializer:結(jié)果序列化格式,默認(rèn)值為json;
timezone:配置Celery以使用自定義時區(qū);
enable_utc:啟用消息中的日期和時間,將轉(zhuǎn)換為使用 UTC 時區(qū),與timezone連用,當(dāng)設(shè)置為 false 時,將使用系統(tǒng)本地時區(qū)。
除了上面的配置參數(shù),Celery還提供了很多很多配置參數(shù),大家可以在官方配置文檔中查看
Celery的配置信息比較多,通常情況下,我們會在tasks.py同級目錄下為創(chuàng)建Celery的配置文件, 這里命名為celeryconfig.py,示例代碼如下:
broker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
執(zhí)行如下代碼加載配置:
import celeryconfig
app.config_from_object('celeryconfig')
在Flask項目中使用Celery
首先創(chuàng)建一個名為mycelery.py文件,該文件用來實(shí)例化Celery對象,示例代碼如下:
from celery import Celery
def make_celery(app):
celery = Celery( #實(shí)例化Celery
'tasks',
broker='redis://localhost:6379/1', #使用redis為中間人
backend='redis://localhost:6379/2' #結(jié)果存儲
)
class ContextTask(celery.Task): #創(chuàng)建ContextTask類并繼承Celery.Task子類
def __call__(self, *args, **kwargs):
with app.app_context(): #和Flask中的app建立關(guān)系
return self.run(*args, **kwargs) #返回任務(wù)
celery.Task = ContextTask #異步任務(wù)實(shí)例化ContextTask
return celery #返回celery對象
首先自定義一個名為make_celery()方法,該方法傳入Flask程序中的app,在方法中實(shí)例化Celery,并創(chuàng)建一個名為ContextTask類用來和Flask中的app建立關(guān)系,最后返回celery。
創(chuàng)建名為tasks.py文件,該文件用來存放我們的耗時任務(wù),示例代碼如下:
import time
from app import celery
@celery.task #使用異步任務(wù)裝飾器task
def add(x, y):
time.sleep(5) #休眠5秒
return x + y
這里我們通過休眠的方式來模擬耗時的下載任務(wù)。
Flask程序app.py文件示例代碼如下:
from flask import Flask
import tasks
from mycelery import make_celery
app = Flask(__name__)
celery = make_celery(app) #調(diào)用make_celery方法并傳入app使celery和app進(jìn)行關(guān)聯(lián)
@app.route('/')
def hello():
tasks.add.delay(1,2) #調(diào)用tasks文件中的add()異步任務(wù)方法
return '請求正在后臺處理中,您可以去處理其他事情'
if __name__ == '__main__':
app.run(debug=True)
app.py文件很簡單,就調(diào)用make_celery方法使celery和app進(jìn)行關(guān)聯(lián),并在視圖函數(shù)中使用tasks中的異步任務(wù)方法。
在終端執(zhí)行如下代碼運(yùn)行Celery職程(Worker)服務(wù):
celery -A tasks worker -l info -P eventlet -c 10
啟動Flask程序,訪問http://127.0.0.1:5000/后在終端查Worker服務(wù),如下圖所示:
這樣就成功使用Celery把耗時任務(wù)交給后臺來處理,避免了不必要的耗時等待(如下載數(shù)據(jù)任務(wù))。
當(dāng)我們不使用Celery時,用戶在執(zhí)行耗時任務(wù)時,用戶可能要等耗時任務(wù)完成后,才能進(jìn)行其他操作。
好了,F(xiàn)lask框架——基于Celery的后臺任務(wù)就講到這里了,感謝觀看,下篇文章繼續(xù)學(xué)習(xí)Flask框架的其他知識。
公眾號:白巧克力LIN
該公眾號發(fā)布Python、數(shù)據(jù)庫、Linux、Flask、自動化測試、Git等相關(guān)文章!