Flask: Implementing Asynchronous Tasks with Celery in Flask

Summary: Flask, Celery

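Introduction to Celery

Celery is a distributed task queue that focuses on real-time processing and also supports task scheduling. Common use cases include:

  • Web applications: when a user triggers an operation that takes a long time to finish, the work can be handed to Celery and executed asynchronously. The user does not have to wait, which raises the site's throughput and lowers response time.
  • Scheduled tasks: Celery makes it quick to set up different scheduled tasks on different machines.
  • Other work suited to asynchronous execution: any auxiliary work that does not have to complete synchronously can run asynchronously, such as sending SMS messages or email, pushing notifications, and clearing caches.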
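For the scheduled-task case, a periodic schedule is usually declared as a plain dictionary and assigned to Celery's beat_schedule setting. The fragment below is a minimal sketch of such a dictionary. The entry name and the task name tasks.clear_cache are hypothetical and are not defined anywhere in this article, and the dictionary still has to be attached to a Celery app (shown only in a comment) before Celery Beat uses it.

```python
from datetime import timedelta

# Hypothetical periodic-task table for Celery Beat.
# "tasks.clear_cache" is an assumed task name used only for illustration.
BEAT_SCHEDULE = {
    "clear-cache-every-10-minutes": {
        "task": "tasks.clear_cache",        # name of the registered task to send
        "schedule": timedelta(minutes=10),  # interval between two runs
        "args": (),                         # positional arguments for the task
    },
}

# With a Celery app object named `celery`, the table would be applied as:
#   celery.conf.beat_schedule = BEAT_SCHEDULE
# and the scheduler started with:
#   celery -A <module> beat
```

Each entry only tells Beat which task message to send and how often. The task itself still runs on a worker.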

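Celery Components

(1) Celery consists of the following components:
  • Celery Beat: the task scheduler, which periodically sends tasks that are due to the task queue
  • Celery Worker: the consumer, which executes tasks; multiple workers are usually deployed across several servers to improve throughput
  • Broker: the message broker (message middleware), which accepts task messages from producers and distributes them to consumers
  • Producer: the code that calls Celery's API, functions, or decorators to create tasks and send them to the broker
  • Result Backend: stores the state and result of each task after it finishes; Redis, MongoDB, RabbitMQ, and other stores are supported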

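The components work together as follows: producers and Celery Beat send task messages to the broker, workers take the messages from the broker and execute the tasks, and each task's state and result are written to the result backend.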
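The same flow can be imitated with the standard library alone. The sketch below is not Celery code: a queue.Queue stands in for the broker, a dictionary for the result backend, and a thread for the worker. All names in it (send_task, worker, run_demo, the add task) are made up for the illustration.

```python
import queue
import threading

broker = queue.Queue()   # stands in for the message broker
result_backend = {}      # stands in for the result backend


def add(x, y):
    return x + y


# Task registry: the worker looks tasks up by name, as Celery does.
TASKS = {"add": add}


def send_task(task_id, name, args):
    """Producer: put a task message on the broker."""
    broker.put({"id": task_id, "task": name, "args": args})


def worker():
    """Worker: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        func = TASKS[message["task"]]
        result_backend[message["id"]] = func(*message["args"])


def run_demo():
    thread = threading.Thread(target=worker)
    thread.start()
    send_task("task-1", "add", (2, 3))
    send_task("task-2", "add", (10, 20))
    broker.put(None)     # tell the worker to stop
    thread.join()
    return dict(result_backend)
```

Calling run_demo() returns the stored results keyed by task id. The producer never calls add itself. It only describes the call in a message, which is the property that lets a real worker run on another machine.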


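(2) Choosing a message broker

Celery recommends RabbitMQ or Redis as the broker. With Redis, queued messages can be lost if the server loses power or shuts down abruptly.

(3) Data serialization

Data passing through the message queue must be serialized and deserialized. Celery supports json, yaml, and msgpack, with json as the default. msgpack is a binary, JSON-like serialization format that produces smaller payloads than json and is faster to encode and decode.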
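One practical consequence of serialization is that task arguments do not always come back in exactly the form they were sent. The sketch below uses only the standard json module, not Celery's own serializer (which may handle additional types), to show a round trip. The argument names and values are invented for the example.

```python
import json

# Hypothetical task arguments, as a producer might send them.
task_args = {
    "message": "hello",
    "to_email": "user@example.com",
    "retry_delays": (1, 2, 3),   # a tuple on the producer side
}

# Producer side: encode to bytes before the message goes to the broker.
encoded = json.dumps(task_args).encode("utf-8")

# Worker side: decode the bytes back into Python objects.
decoded = json.loads(encoded.decode("utf-8"))

# JSON has no tuple type, so the tuple arrives as a list.
tuple_became_list = isinstance(decoded["retry_delays"], list)
```

Because of this, task functions are easiest to reason about when their arguments are plain strings, numbers, lists, and dictionaries.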


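Quick Start: Sending Email Asynchronously with Flask + Celery

(1) Installation

Redis serves as both the message broker and the result store. With Flask already installed, install Celery and the Redis Python client: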

pip install celery
pip install redis

The versions used here are:

celery --version
5.1.2 (sun-harmonics)

flask --version
Python 3.7.6
Flask 1.1.1
Werkzeug 1.0.0
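(2) The Flask application script

This example builds a web application that sends an email to an address entered by the user. The email is sent with Python's built-in smtplib and email modules, the front end is rendered from an HTML template, and the Flask back end reads the address from a POSTed form and sends the message. The response times of synchronous execution and asynchronous execution with Celery are then compared.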

import os
import sys
import traceback
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from flask import Flask, render_template, request, redirect, url_for, session
from celery import Celery

from settings import Config

app = Flask(__name__)
app.config.from_object(Config)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'  # message broker
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'  # task results are written to Redis
app.config['CELERY_TASK_SERIALIZER'] = 'msgpack'  # serialization format (requires the msgpack package)
app.config['CELERY_ACCEPT_CONTENT'] = ['msgpack', 'json']  # msgpack must be accepted when it is the serializer

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)  # reuse the Flask app's config

@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    email = request.form.get("email")
    if request.form.get("submit") == "Send":
        import time
        t = time.time()
        # Synchronous variant, for comparison:
        # send_async_email("Sent immediately", email)
        send_async_email.delay("Sent immediately", email)
        # TODO: other work that follows the send
        print(time.time() - t)
    elif request.form.get("submit") == "Send in 1 minute":
        send_async_email.apply_async(["Sent after a 1-minute delay", email], countdown=60)
    return redirect(url_for('index'))


@celery.task(name='main.send_async_email')
def send_async_email(message, to_email):
    send_mail(message, to_email)


def send_mail(message, to_email):
    conn = None
    try:
        conn = smtplib.SMTP_SSL(app.config["SMTP_HOST"], app.config["SMTP_PORT"])
        conn.login(app.config["FROM_EMAIL_ACCOUNT"], app.config["FROM_EMAIL_PASSWORD"])
        msg = MIMEMultipart()
        subject = Header('Test email', 'utf-8').encode()
        msg['Subject'] = subject
        msg['From'] = app.config["FROM_EMAIL_ACCOUNT"]
        msg['To'] = to_email
        text = MIMEText(message, 'plain', 'utf-8')
        msg.attach(text)
        conn.sendmail(app.config["FROM_EMAIL_ACCOUNT"], to_email, msg.as_string())
    except Exception:
        # Print the failure instead of propagating it to the caller
        traceback.print_exc()
    finally:
        if conn:
            conn.quit()


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5010)
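The script imports Config from a settings module that the article does not show. A minimal sketch of what such a settings.py could look like follows. Only the four attribute names are taken from the script above. The host, port, and account defaults are placeholders, and reading the values from environment variables is an assumption made here so that the password stays out of the source file.

```python
import os


class Config:
    """Hypothetical settings.py for the script above; all defaults are placeholders."""

    # SMTP server reached through smtplib.SMTP_SSL in send_mail()
    SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.example.com")
    SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))

    # Sender credentials, read from the environment rather than hard-coded
    FROM_EMAIL_ACCOUNT = os.environ.get("FROM_EMAIL_ACCOUNT", "sender@example.com")
    FROM_EMAIL_PASSWORD = os.environ.get("FROM_EMAIL_PASSWORD", "")
```

app.config.from_object(Config) copies only uppercase attributes, which is why the names are written in capitals.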

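The script defines and calls the Celery task in two places:

  • Creating the Celery task: the decorator @celery.task(name='main.send_async_email') wraps an ordinary Python function as a Celery task so that it can be queued and executed asynchronously. The name is set explicitly so that the Flask process and the worker agree on it. Without it, the automatically generated name can depend on how the module is loaded (for example main.send_async_email when the file is run directly, versus celery_test.main.send_async_email when the worker imports it), and the worker then reports the task as not registered.
  • Executing the Celery task: the view calls send_async_email.delay(). Calling the function without delay runs it as an ordinary Python function in the current process.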
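Why the name matters can be shown with a toy registry. The sketch below is not Celery's implementation. It only imitates the idea that a worker looks a task up by the name carried in the message. The task decorator, the registry dictionary, and the execute function are all hypothetical stand-ins.

```python
registry = {}


def task(name):
    """Register a function under an explicit name (toy stand-in for @celery.task)."""
    def decorator(func):
        registry[name] = func
        return func
    return decorator


@task(name="main.send_async_email")
def send_async_email(message, to_email):
    return f"{message} -> {to_email}"


def execute(message):
    """Worker side: find the task by the name in the message, then run it."""
    try:
        func = registry[message["task"]]
    except KeyError:
        raise LookupError(f"unregistered task: {message['task']}") from None
    return func(*message["args"])
```

A message naming main.send_async_email is executed, while one naming celery_test.main.send_async_email raises LookupError even though both refer to the same function in the source file.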

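Next, start Flask. The page served at / contains an email field and the two submit buttons handled by the view, Send and Send in 1 minute.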


(3) Starting the Celery worker

Tasks sent to the broker are executed only while a worker is running. Start the Celery worker from a command line in the project root directory:

root@ubuntu:~/myproject/CELERY_TEST# celery -A celery_test.main:celery worker -l info
/opt/anaconda3/lib/python3.7/site-packages/celery/platforms.py:835: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@ubuntu (sun-harmonics)
--- ***** ----- 
-- ******* ---- Linux-4.15.0-54-generic-x86_64-with-debian-buster-sid 2021-07-22 19:57:25
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_test.main:0x7f6430716690
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . main.send_async_email

[2021-07-22 19:57:25,863: WARNING/MainProcess] Please run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2021-07-22 19:57:26,144: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-07-22 19:57:26,169: INFO/MainProcess] mingle: searching for neighbors
[2021-07-22 19:57:27,192: INFO/MainProcess] mingle: all alone
[2021-07-22 19:57:27,231: INFO/MainProcess] celery@ubuntu ready.

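The final ready line indicates that the worker started successfully. The banner also shows the relevant Celery configuration:

  • app: the application lives in the celery_test.main module
  • transport: Redis is the message queue, at localhost:6379/0
  • results: task results are stored in Redis, at localhost:6379/0
  • concurrency: 4 worker processes (prefork pool), not threads
  • [tasks]: the registered task is main.send_async_email, the send_async_email function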
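The startup log also contains a warning asking for the settings to be upgraded, because the script passes uppercase, old-style keys to Celery. As a sketch, the four keys used in the script correspond to the lowercase names below. The helper to_new_style is hypothetical and covers only these four keys, not Celery's full settings table.

```python
# Old-style keys used in the script, mapped to Celery's lowercase setting names.
OLD_TO_NEW = {
    "CELERY_BROKER_URL": "broker_url",
    "CELERY_RESULT_BACKEND": "result_backend",
    "CELERY_TASK_SERIALIZER": "task_serializer",
    "CELERY_ACCEPT_CONTENT": "accept_content",
}


def to_new_style(config):
    """Return only the known Celery keys, renamed to their lowercase form."""
    return {OLD_TO_NEW[key]: value for key, value in config.items() if key in OLD_TO_NEW}


new_style = to_new_style({
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "redis://localhost:6379/0",
    "CELERY_TASK_SERIALIZER": "msgpack",
    "CELERY_ACCEPT_CONTENT": ["msgpack", "json"],
    "SMTP_HOST": "smtp.example.com",   # unrelated Flask key, dropped by the helper
})
```

The resulting dictionary could be passed to celery.conf.update() in place of the whole Flask config, which also keeps unrelated Flask keys out of the Celery configuration.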

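Celery can also run in the background and write its logs to a chosen directory. Use celery multi to start and stop workers, pass --logfile and --pidfile to set the log and pid file locations, and give the node a custom name, such as web: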

root@ubuntu# celery multi start web -A celery_test.main:celery -l info --logfile=logs/celery_%n.log --pidfile=celery_%n.pid
celery multi v5.1.2 (sun-harmonics)
> Starting nodes...
    > web@ubuntu: OK
root@ubuntu:~/myproject
root@ubuntu:~/myproject/CELERY_TEST# ls
celery_test  celery_web.pid  logs  __pycache__  settings.py
root@ubuntu:~/myproject/CELERY_TEST/logs# ls
celery_web.log

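The worker starts in the background, and a pid file and a logs directory are created in the project directory.

(4) Testing the asynchronous task

Enter an email address on the page and click Send. The view calls the task through delay, so the email is sent asynchronously. The view function times the call with time, on the assumption that other work follows the send, and prints the total time spent.
With the asynchronous call the view spends only about 8 ms, because it merely enqueues the task. The email itself is sent afterwards by the worker: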

127.0.0.1 - - [23/Jul/2021 14:32:44] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:32:44] "GET / HTTP/1.1" 200 -
0.009320974349975586
127.0.0.1 - - [23/Jul/2021 14:32:46] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:32:46] "GET / HTTP/1.1" 200 -
0.008226156234741211
127.0.0.1 - - [23/Jul/2021 14:32:48] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:32:48] "GET / HTTP/1.1" 200 -
0.007288694381713867

Calling send_async_email(...) directly as a plain Python function, without delay, takes about 800 ms instead, and the page is noticeably sluggish:

0.8022255897521973
127.0.0.1 - - [23/Jul/2021 14:35:00] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:35:00] "GET / HTTP/1.1" 200 -
0.7504754066467285
127.0.0.1 - - [23/Jul/2021 14:35:02] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:35:02] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2021 14:35:04] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:35:04] "GET / HTTP/1.1" 200 -
0.8088760375976562
0.8098959922790527
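The difference between the two measurements comes from where the slow work runs, not from the work becoming faster. The sketch below reproduces the effect with the standard library only. A ThreadPoolExecutor stands in for the Celery worker and time.sleep for the SMTP call, and the function names and the 0.3-second delay are arbitrary choices for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)   # stands in for the Celery worker


def slow_send(seconds=0.3):
    """Pretend to send an email; the sleep stands in for the SMTP round trip."""
    time.sleep(seconds)
    return "sent"


def handle_sync():
    """Like calling send_async_email(...) directly: blocks until done."""
    return slow_send()


def handle_async():
    """Like send_async_email.delay(...): hands the work off and returns at once."""
    return executor.submit(slow_send)


def timed(func):
    """Run func and return (its return value, elapsed seconds)."""
    start = time.perf_counter()
    value = func()
    return value, time.perf_counter() - start
```

Timing handle_sync gives roughly the full sleep duration, while timing handle_async gives only the cost of the hand-off. The Future it returns still needs the full duration before result() is available.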

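Next, the HTML is extended with a small calculator and the task body is replaced with a 10-second sleep. Because the view calls the function synchronously, the result of the calculation appears on the page only 10 seconds after the form is submitted: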

@celery.task(name='main.send_async_email')
def send_async_email(message, to_email):
    import time
    time.sleep(10)  # simulate a slow task


@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    email = request.form.get("email")
    val1 = request.form.get("value1")
    val2 = request.form.get("value2")
    if request.form.get("submit") == "Send":
        # Synchronous call: the request blocks for the full 10 seconds
        send_async_email("Sent immediately", email)
        # Asynchronous variant:
        # send_async_email.delay("Sent asynchronously", email)
        value3 = float(val1) + float(val2)
    # Pass all local variables (including value3, when set) to the template
    return render_template("index.html", **locals())

(5) Inspecting the queue and results stored in Redis

Change the broker URL to Redis database 1 and the result backend URL to Redis database 2, then inspect the data in Redis:
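In a Redis URL the database index is the last path segment. The sketch below shows the two URLs this step implies, which would replace the values of CELERY_BROKER_URL and CELERY_RESULT_BACKEND in the script. The helper redis_db_index is a hypothetical function added only to make the index visible.

```python
from urllib.parse import urlparse

BROKER_URL = "redis://localhost:6379/1"           # task queue in Redis database 1
RESULT_BACKEND_URL = "redis://localhost:6379/2"   # task results in Redis database 2


def redis_db_index(url):
    """Return the database number encoded in a redis:// URL (0 when absent)."""
    path = urlparse(url).path
    return int(path.lstrip("/") or 0)
```

Keeping the queue and the results in separate databases makes it easy to tell the two kinds of keys apart when browsing with redis-cli.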

root@ubuntu:~# redis-cli 
127.0.0.1:6379> select 1;
(error) ERR invalid DB index
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
1) "_kombu.binding.celery.pidbox"
2) "_kombu.binding.celeryev"
3) "_kombu.binding.celery"
127.0.0.1:6379[1]> select 2
OK
127.0.0.1:6379[2]> keys *
1) "celery-task-meta-4b64592e-bcc5-4dcf-b838-e0960b7c8e4b"
2) "celery-task-meta-80cb2a66-23f0-4e22-80af-7faa39f12094"
127.0.0.1:6379[2]> get celery-task-meta-80cb2a66-23f0-4e22-80af-7faa39f12094
"{\"status\": \"SUCCESS\", \"result\": null, \"traceback\": null, \"children\": [], \"date_done\": \"2021-07-23T07:19:06.224197\", \"task_id\": \"80cb2a66-23f0-4e22-80af-7faa39f12094\"}"

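The _kombu.* keys hold data that kombu, the messaging library Celery uses, maintains for the message queue. The celery-task-meta-* keys hold task results as JSON, including the task status, completion time, return value, and task ID.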
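The stored value is ordinary JSON, so it can be inspected with the standard library. The sketch below parses the exact string returned by the get command above. In a real program the string would come from a Redis client rather than being pasted in, and the variable names are chosen only for this example.

```python
import json
from datetime import datetime

# Value of celery-task-meta-80cb2a66-... as shown in the redis-cli session.
raw = (
    '{"status": "SUCCESS", "result": null, "traceback": null, "children": [], '
    '"date_done": "2021-07-23T07:19:06.224197", '
    '"task_id": "80cb2a66-23f0-4e22-80af-7faa39f12094"}'
)

meta = json.loads(raw)

status = meta["status"]                                # task state
result = meta["result"]                                # return value of the task
finished_at = datetime.fromisoformat(meta["date_done"])  # completion time
```

The result field is null because send_async_email does not return anything. A task that returns a value would have it stored there.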
