實現(xiàn)簡單的python3+flask+celery+redis框架

詳解python3+flask+celery+redis

Celery是什么?

Celery是個異步分布式任務(wù)隊列。

通過Celery在后臺跑任務(wù)并不像用線程那么的簡單,但是用Celery的話,能夠使應(yīng)用有較好的可擴(kuò)展性,因為Celery是個分布式架構(gòu)。下面介紹Celery的三個核心組件。

生產(chǎn)者(Celery client)。生產(chǎn)者(Celery client)發(fā)送消息。在Flask上工作時,生產(chǎn)者(Celery client)在Flask應(yīng)用內(nèi)運行。

消費者(Celery workers)。消費者用于處理后臺任務(wù)。消費者(Celery client)可以是本地的也可以是遠(yuǎn)程的。我們可以在運行Flask的server上運行一個單一的消費者(Celery workers),當(dāng)業(yè)務(wù)量上漲之后再去添加更多消費者(Celery workers)。

消息傳遞者(message broker)。生產(chǎn)者(Celery client)和消費者(Celery workers)的信息的交互使用的是消息隊列(message queue)。Celery支持若干方式的消息隊列,其中最常用的是RabbitMQRedis.


話不多說上代碼先。


一、基本框架結(jié)構(gòu)

二、重要文件配置如下

在Flask中集成celery需要做到兩點:

創(chuàng)建celery的實例對象的名字必須是flask應(yīng)用程序app的名字,否則celery啟動會失??;

celery必須能順利加載初始化文件

1、__init__.py文件 (初始化flask與celery)

from flask import Flask

from flask_sqlalchemy import SQLAlchemy

from config import *

import pymysql

pymysql.install_as_MySQLdb()

db= SQLAlchemy()

from celery import Celery

# Celery相關(guān)配置

CELERY_RESULT_BACKEND= "redis://localhost:6379/0"

CELERY_BROKER_URL= "redis://localhost:6379/0"

def create_app(config_name):

? ? app= Flask(__name__)

????app.config.from_object(config[config_name])

????config[config_name].init_app(app)

????db.init_app(app)

????register_blueprint(app)

????return app

def make_celery(app=None):

? ? app= app or create_app(os.getenv('FLASK_CONFIG')or 'default')

? ? ##在第一階段的基礎(chǔ)上 開始使用celery 任務(wù)調(diào)度,我這使用 redis 做為緩存服務(wù)器,安裝配置redis 這? ? ? 里不再贅述

????celery= Celery(__name__,broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)

????celery.conf.update(app.config)

????TaskBase= celery.Task

????class ContextTask(TaskBase):

? ? ? ? abstract= True

? ? ? ? def __call__(self,*args,**kwargs):

? ? ? ? ? ? with app.app_context():

? ? ? ? ? ? ? ? return TaskBase.__call__(self,*args,**kwargs)

????celery.Task= ContextTask

????return celery

def register_blueprint(app):

? ? from app.mainimport main

????app.register_blueprint(main)

????from app.mailimport mail

????app.register_blueprint(mail)

????from app.testsimport tests

????app.register_blueprint(tests)


2、tasks.py文件

"""

執(zhí)行的任務(wù)文件

"""

from .import make_celery

celery= make_celery(app=None)

@celery.task()

def add_together(a,b):

? ? return a + b

@celery.task()

def print_hello():

? ? print('Hello World!')


3.config.py 項目的配置文件

import os

basedir= os.path.abspath(os.path.dirname(__file__))

class config:

? ? SECRET_KEY= os.environ.get('SECRET_KEY')or 'this is a secret string'

? ? SQLALCHEMY_TRACK_MODIFICATIONS= True

? ? @staticmethod

? ? def init_app(app):

? ? ? ? pass

class DevelopmentConfig(config):

? ? DEBUG= True

? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'

class TestingConfig(config):

? ? TESTING= True

? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'

class ProductionConfig(config):

? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'

config= {

'development': DevelopmentConfig,

'testing': TestingConfig,

'production': ProductionConfig,

'default': DevelopmentConfig

}


4、manage.py (flask框架項目啟動運行文件)

import os

from appimport create_app, db

from flask_scriptimport Manager, Shell

from flask_migrateimport Migrate, MigrateCommand

app= create_app(os.getenv('FLASK_CONFIG')or 'default')

manager= Manager(app)

migrate= Migrate(app, db)

def make_shell_context():

? ? return dict(app=app,db=db)

manager.add_command("shell",Shell(make_context=make_shell_context))

manager.add_command('db', MigrateCommand)

if __name__== '__main__':

? ? manager.run()


5、views.py 文件 (flask主要接口業(yè)務(wù))

from .import main

from flaskimport Flask,request, jsonify

from app.tasksimport *

@main.route("/api/task_start",methods=['POST'])

def task_start():

? ? result= add_together.delay(10,20)

????print(result.wait())

????return jsonify({"msg":"Welcome to my app!"})


6、啟動項目步驟:

1)啟動falsk框架:python manage.py runserver -h 127.0.0.1 -p 8090

2)?啟動 Celery Worker::??celery -A app.tasks worker --loglevel=info

注意:如果flask和 celery 聯(lián)合用的時候發(fā)現(xiàn)報了個錯誤:

NotImplementedError: No result backend is configured.

用的時候是這么用的:

CELERY_RESULT_BACKEND= "redis://localhost:6379/0"

CELERY_BROKER_URL= "redis://localhost:6379/0"

celery= Celery(__name__,broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容