Flask框架——基于Celery的后臺任務(wù)

上篇文章我們學(xué)習(xí)了Flask框架——MongoEngine使用MongoDB數(shù)據(jù)庫,這篇文章我們學(xué)習(xí)Flask框架——基于Celery的后臺任務(wù)。

Celery

在Web開發(fā)中,我們經(jīng)常會遇到一些耗時的操作,例如:上傳/下載數(shù)據(jù)、發(fā)送郵件/短信,執(zhí)行各種任務(wù)等等。這時我們可以使用分布式異步消息任務(wù)隊列去執(zhí)行這些任務(wù)。

Celery是一款非常簡單、靈活、可靠的分布式異步消息隊列工具,可以用于處理大量消息、實(shí)時數(shù)據(jù)以及任務(wù)調(diào)度。

Celery通過消息機(jī)制進(jìn)行通信,一般使用中間人(Broker)作為客戶端和職程(Worker)調(diào)節(jié)。

其工作流程如下圖所示:



客戶端發(fā)送消息任務(wù)給中間人(Broker),任務(wù)執(zhí)行單元(Celery Worker)監(jiān)控中間人中的任務(wù)隊列,當(dāng)中間人有消息任務(wù)時就分配任務(wù)給任務(wù)執(zhí)行單元,任務(wù)執(zhí)行單元在后臺運(yùn)行任務(wù)并返回請求。

注意:Celery可以有多個職程(Worker)和中間人(Broker),用來提高Celery的高可用性以及橫向擴(kuò)展能力。

Celery優(yōu)點(diǎn)

  • 簡單:上手比較簡單,不需要配置文件就可以直接運(yùn)行;
  • 高可用:如果出現(xiàn)丟失連接或連接失敗,職程(Worker)和客戶端會自動重試,并且中間人通過 主/主 主/從 的方式來進(jìn)行提高可用性;
  • 快速:單個 Celery 進(jìn)行每分鐘可以處理數(shù)以百萬的任務(wù),而且延遲僅為亞毫秒(使用 RabbitMQ、 librabbitmq 在優(yōu)化過后);
  • 靈活:Celery 的每個部分幾乎都可以自定義擴(kuò)展和單獨(dú)使用,例如自定義連接池、序列化方式、壓縮方式、日志記錄方式、任務(wù)調(diào)度、生產(chǎn)者、消費(fèi)者、中間人(Broker)等。

安裝

Celery安裝方式很簡單,執(zhí)行如下命令即可:

pip install celery

這里我們使用redis作為中間人,執(zhí)行如下代碼安裝redis:

pip install redis

創(chuàng)建Celery程序

對比說明

(1)不使用Celery執(zhí)行耗時任務(wù),創(chuàng)建一個名為test.py文件,其示例代碼如下:

import time

def add(a,b):                   
    time.sleep(5)               #休眠5秒
    return a+b      

if __name__ == '__main__':
    print('開始執(zhí)行')
    result=add(2,3)             #調(diào)用add函數(shù)
    print('執(zhí)行結(jié)束')
    print(result)

運(yùn)行test.py文件,運(yùn)行結(jié)果如下圖:


(2)使用Celery執(zhí)行耗時任務(wù),創(chuàng)建一個名為tasks.py文件,示例代碼如下:

import time
from celery import Celery

celery = Celery(                          #實(shí)例化Celery對象
    'tasks',                            #當(dāng)前模塊名
    broker='redis://localhost:6379/1',      #使用redis為中間人
    backend='redis://localhost:6379/2'      #結(jié)果存儲
)

@celery.task()              #使用異步任務(wù)裝飾器task
def add(a,b):
    time.sleep(5)           #休眠5秒
    return a+b

if __name__ == '__main__':
    print('開始執(zhí)行')
    result=add.delay(2,3)           #調(diào)用add方法并使用delay延時函數(shù)    
    print('執(zhí)行結(jié)束')
    print(result)

實(shí)例化Celery對象,其中第一個參數(shù)為當(dāng)前模塊名,第二個參數(shù)為中間人(Broker)的URL鏈接,第三個參數(shù)為中間人結(jié)果放回的存儲URL鏈接,再調(diào)用add()方法時,需要使用delay延時函數(shù)。

運(yùn)行tasks.py文件,運(yùn)行結(jié)果如下圖所示:


當(dāng)我們運(yùn)行tasks.py文件時,發(fā)現(xiàn)程序一下子就運(yùn)行結(jié)束并返回任務(wù)id,

在終端執(zhí)行如下代碼運(yùn)行Celery職程(Worker)服務(wù):

celery -A tasks worker -l info

如下圖所示:



雖然職程已經(jīng)收到任務(wù)并且在分配到子進(jìn)程運(yùn)行了,但是發(fā)現(xiàn)該任務(wù)沒有運(yùn)行結(jié)束,這時因為Celery不支持在windows下運(yùn)行任務(wù),需要借助eventlet來完成,執(zhí)行如下安裝eventlet:

pip install eventlet 

安裝成功后,執(zhí)行如下代碼運(yùn)行Celery職程(Worker)服務(wù):

celery -A tasks worker -l info -P eventlet  -c 10

運(yùn)行結(jié)果如下:


Celery配置

大多數(shù)情況下,使用默認(rèn)的配置即可滿足我們的開發(fā),不需要修改配置,當(dāng)我們需要修改配置時,可以通過update進(jìn)行配置,在上面的tasks.py添加如下代碼:

celery.conf.update(
    task_serializer='json',
    accept_content=['json'],   
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

其中:

  • accept_content:允許的內(nèi)容類型/序列化程序的白名單,如果收到不在此列表中的消息,則該消息將被丟棄并出現(xiàn)錯誤,默認(rèn)只為json;

  • task_serializer:標(biāo)識要使用的默認(rèn)序列化方法的字符串,默認(rèn)值為json;

  • result_serializer:結(jié)果序列化格式,默認(rèn)值為json;

  • timezone:配置Celery以使用自定義時區(qū);

  • enable_utc:啟用消息中的日期和時間,將轉(zhuǎn)換為使用 UTC 時區(qū),與timezone連用,當(dāng)設(shè)置為 false 時,將使用系統(tǒng)本地時區(qū)。

除了上面的配置參數(shù),Celery還提供了很多很多配置參數(shù),大家可以在官方配置文檔中查看

Celery的配置信息比較多,通常情況下,我們會在tasks.py同級目錄下為創(chuàng)建Celery的配置文件, 這里命名為celeryconfig.py,示例代碼如下:

broker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

執(zhí)行如下代碼加載配置:

import celeryconfig
app.config_from_object('celeryconfig')

在Flask項目中使用Celery

首先創(chuàng)建一個名為mycelery.py文件,該文件用來實(shí)例化Celery對象,示例代碼如下:

from celery import Celery

def make_celery(app):
    celery = Celery(                        #實(shí)例化Celery
        'tasks',
        broker='redis://localhost:6379/1',      #使用redis為中間人
        backend='redis://localhost:6379/2'      #結(jié)果存儲
    )
    class ContextTask(celery.Task):             #創(chuàng)建ContextTask類并繼承Celery.Task子類
        def __call__(self, *args, **kwargs):    
            with app.app_context():                 #和Flask中的app建立關(guān)系
                return self.run(*args, **kwargs)    #返回任務(wù)
    celery.Task = ContextTask                   #異步任務(wù)實(shí)例化ContextTask
    return celery                               #返回celery對象

首先自定義一個名為make_celery()方法,該方法傳入Flask程序中的app,在方法中實(shí)例化Celery,并創(chuàng)建一個名為ContextTask類用來和Flask中的app建立關(guān)系,最后返回celery。

創(chuàng)建名為tasks.py文件,該文件用來存放我們的耗時任務(wù),示例代碼如下:

import time
from app import celery
@celery.task            #使用異步任務(wù)裝飾器task
def add(x, y):
    time.sleep(5)       #休眠5秒
    return x + y

這里我們通過休眠的方式來模擬耗時的下載任務(wù)。

Flask程序app.py文件示例代碼如下:

from flask import Flask
import tasks
from mycelery import make_celery

app = Flask(__name__)
celery = make_celery(app)               #調(diào)用make_celery方法并傳入app使celery和app進(jìn)行關(guān)聯(lián)

@app.route('/')
def hello():
    tasks.add.delay(1,2)                #調(diào)用tasks文件中的add()異步任務(wù)方法
    return '請求正在后臺處理中,您可以去處理其他事情'

if __name__ == '__main__':
    app.run(debug=True)

app.py文件很簡單,就調(diào)用make_celery方法使celery和app進(jìn)行關(guān)聯(lián),并在視圖函數(shù)中使用tasks中的異步任務(wù)方法。

在終端執(zhí)行如下代碼運(yùn)行Celery職程(Worker)服務(wù):

celery -A tasks worker -l info -P eventlet  -c 10

啟動Flask程序,訪問http://127.0.0.1:5000/后在終端查Worker服務(wù),如下圖所示:


這樣就成功使用Celery把耗時任務(wù)交給后臺來處理,避免了不必要的耗時等待(如下載數(shù)據(jù)任務(wù))。

當(dāng)我們不使用Celery時,用戶在執(zhí)行耗時任務(wù)時,用戶可能要等耗時任務(wù)完成后,才能進(jìn)行其他操作。

好了,F(xiàn)lask框架——基于Celery的后臺任務(wù)就講到這里了,感謝觀看,下篇文章繼續(xù)學(xué)習(xí)Flask框架的其他知識。
公眾號:白巧克力LIN

該公眾號發(fā)布Python、數(shù)據(jù)庫、Linux、Flask、自動化測試、Git等相關(guān)文章!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容