詳解python3+flask+celery+redis
Celery是什么?
Celery是個異步分布式任務(wù)隊列。
通過Celery在后臺跑任務(wù)并不像用線程那么的簡單,但是用Celery的話,能夠使應(yīng)用有較好的可擴(kuò)展性,因為Celery是個分布式架構(gòu)。下面介紹Celery的三個核心組件。
生產(chǎn)者(Celery client)。生產(chǎn)者(Celery client)發(fā)送消息。在Flask上工作時,生產(chǎn)者(Celery client)在Flask應(yīng)用內(nèi)運行。
消費者(Celery workers)。消費者用于處理后臺任務(wù)。消費者(Celery client)可以是本地的也可以是遠(yuǎn)程的。我們可以在運行Flask的server上運行一個單一的消費者(Celery workers),當(dāng)業(yè)務(wù)量上漲之后再去添加更多消費者(Celery workers)。
消息傳遞者(message broker)。生產(chǎn)者(Celery client)和消費者(Celery workers)的信息的交互使用的是消息隊列(message queue)。Celery支持若干方式的消息隊列,其中最常用的是RabbitMQ和Redis.

話不多說上代碼先。
一、基本框架結(jié)構(gòu)

二、重要文件配置如下
在Flask中集成celery需要做到兩點:
創(chuàng)建celery的實例對象的名字必須是flask應(yīng)用程序app的名字,否則celery啟動會失??;
celery必須能順利加載初始化文件
1、__init__.py文件 (初始化flask與celery)
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config import *
import pymysql
pymysql.install_as_MySQLdb()
db= SQLAlchemy()
from celery import Celery
# Celery相關(guān)配置
CELERY_RESULT_BACKEND= "redis://localhost:6379/0"
CELERY_BROKER_URL= "redis://localhost:6379/0"
def create_app(config_name):
? ? app= Flask(__name__)
????app.config.from_object(config[config_name])
????config[config_name].init_app(app)
????db.init_app(app)
????register_blueprint(app)
????return app
def make_celery(app=None):
? ? app= app or create_app(os.getenv('FLASK_CONFIG')or 'default')
? ? ##在第一階段的基礎(chǔ)上 開始使用celery 任務(wù)調(diào)度,我這使用 redis 做為緩存服務(wù)器,安裝配置redis 這? ? ? 里不再贅述
????celery= Celery(__name__,broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)
????celery.conf.update(app.config)
????TaskBase= celery.Task
????class ContextTask(TaskBase):
? ? ? ? abstract= True
? ? ? ? def __call__(self,*args,**kwargs):
? ? ? ? ? ? with app.app_context():
? ? ? ? ? ? ? ? return TaskBase.__call__(self,*args,**kwargs)
????celery.Task= ContextTask
????return celery
def register_blueprint(app):
? ? from app.mainimport main
????app.register_blueprint(main)
????from app.mailimport mail
????app.register_blueprint(mail)
????from app.testsimport tests
????app.register_blueprint(tests)
2、tasks.py文件
"""
執(zhí)行的任務(wù)文件
"""
from .import make_celery
celery= make_celery(app=None)
@celery.task()
def add_together(a,b):
? ? return a + b
@celery.task()
def print_hello():
? ? print('Hello World!')
3.config.py 項目的配置文件
import os
basedir= os.path.abspath(os.path.dirname(__file__))
class config:
? ? SECRET_KEY= os.environ.get('SECRET_KEY')or 'this is a secret string'
? ? SQLALCHEMY_TRACK_MODIFICATIONS= True
? ? @staticmethod
? ? def init_app(app):
? ? ? ? pass
class DevelopmentConfig(config):
? ? DEBUG= True
? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'
class TestingConfig(config):
? ? TESTING= True
? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'
class ProductionConfig(config):
? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'
config= {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
4、manage.py (flask框架項目啟動運行文件)
import os
from appimport create_app, db
from flask_scriptimport Manager, Shell
from flask_migrateimport Migrate, MigrateCommand
app= create_app(os.getenv('FLASK_CONFIG')or 'default')
manager= Manager(app)
migrate= Migrate(app, db)
def make_shell_context():
? ? return dict(app=app,db=db)
manager.add_command("shell",Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)
if __name__== '__main__':
? ? manager.run()
5、views.py 文件 (flask主要接口業(yè)務(wù))
from .import main
from flaskimport Flask,request, jsonify
from app.tasksimport *
@main.route("/api/task_start",methods=['POST'])
def task_start():
? ? result= add_together.delay(10,20)
????print(result.wait())
????return jsonify({"msg":"Welcome to my app!"})
6、啟動項目步驟:
1)啟動falsk框架:python manage.py runserver -h 127.0.0.1 -p 8090
2)?啟動 Celery Worker::??celery -A app.tasks worker --loglevel=info
注意:如果flask和 celery 聯(lián)合用的時候發(fā)現(xiàn)報了個錯誤:
NotImplementedError: No result backend is configured.
用的時候是這么用的:
CELERY_RESULT_BACKEND= "redis://localhost:6379/0"
CELERY_BROKER_URL= "redis://localhost:6379/0"
celery= Celery(__name__,broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)