基于Flask+Celery+Redis的異步任務實現(xiàn)與容器化部署

背景

最近在項目中遇到一個情況,在我們的測試執(zhí)行結束時需要將測試結果更新到我們的缺陷跟蹤系統(tǒng)里面,但是同一時刻可能會有很多客戶端都有更新結果的需求。為了避免過度耦合并且盡量讓客戶端輕量級一些,我們抽取了測試結果更新相關的接口并將其以REST API的形式進行了封裝, 以便不同的客戶端程序能夠通過簡單的發(fā)送HTTP請求的方式完成相關任務,用戶也能使用自己喜歡的語言直接調(diào)用我們的服務。而我們實際在后端是通過調(diào)用Rally(我們的缺陷跟蹤系統(tǒng))提供的API進行更新的,單次更新結果的過程大概需要10-20s左右時間(中間包括一些資源占用趨勢圖的生成等操作),為了保證我們的web服務能夠處理此類耗時較長的任務并且在高并發(fā)的情況下不至于出現(xiàn)用戶體驗的大幅下降,于是我們決定將對應的后端任務轉為異步處理。下文就是筆者在使用Flask+Celery+Redis實現(xiàn)異步任務的一些總結。

應用

注:
項目的完整代碼放在GitHub上了,不想碼字的朋友可以直接下載:
git clone https://github.com/hdw868/async_flask.git

這里采用一個簡化的模型來說明如何實現(xiàn),首先是構建一個celery的應用, 這里我們使用Redis作為消息代理和存儲結果的后端,然后我們模擬一個啟動測試的任務,中間休眠30秒用來模擬長時間的測試執(zhí)行,整體的代碼沒有什么難點參考官方的教程即可:

tasks.py

import os
import time

from celery import Celery

celery_app = Celery(
    'celery_app',
    backend=os.getenv('REDIS_URI', 'redis://localhost:6379/0'),
    broker=os.getenv('REDIS_URI', 'redis://localhost:6379/0')
)


@celery_app.task()
def launch_new_test(tc_id):
    print(f'Provisioning environment for {tc_id}...')
    # Simulate the test execution process
    time.sleep(30)
    print(f'{tc_id} is completed!')
    return {"result": 'pass',
            "testCaseId": tc_id
            }

接著,我們編寫一個flask應用作為這個異步任務的前端,并且返回任務的id以便后續(xù)查詢?nèi)蝿諣顟B(tài)及結果,如下:

app.py

import os

from celery.result import AsyncResult
from flask import Flask, render_template, request, jsonify, url_for

from tasks import celery_app, launch_new_test

app = Flask(__name__)

app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'something hard to guess')


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')
    if request.form['submit'] == 'Launch':
        tc_id = request.form['test_case_id']
        result = launch_new_test.delay(tc_id)
        summary = {"taskId": result.id,
                   "location": str(url_for('get_task_state', task_id=result.id))
                   }
        return jsonify(summary), 202


@app.route('/tasks/<string:task_id>/state')
def get_task_state(task_id):
    result = AsyncResult(task_id, app=celery_app)
    summary = {
        "state": result.state,
        "result": result.result,
        "id": result.id,
    }
    return jsonify(summary), 200

注:

  • 為了演示方便,這里直接用了一個web前端來觸發(fā)任務,實際項目中應該使用的REST API形式來組織視圖函數(shù)。
  • 對于異步任務,返回的狀態(tài)碼應該使用202,Accepted;body里面最好包含一個location和id相關信息,以便用戶可以根據(jù)此類信息進一步查詢?nèi)蝿盏臓顟B(tài);

index.html

<html>
<head>
    <title>Flask + Celery Examples</title>
</head>
<body>
<h1>Flask + Celery Examples</h1>
{% for message in get_flashed_messages() %}
<p style="color: blue;">{{ message }}</p>
{% endfor %}
<form method="POST">
    <p>Launch new test:
        <input type="text" name="test_case_id" placeholder="test_case_id"></p>
    <input type="submit" name="submit" value="Launch">
</form>
</body>
</html>

容器化

這里我們希望通過容器的方式來進行部署,對于鏡像構建,我們使用的配置如下:

Dockerfile

FROM python:3.7

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
COPY . .
CMD [ "flask", "run"]

EXPOSE 5000

Tips:

  • 在安裝依賴的時候應該選擇用--no-cache-dir的方式安裝依賴的包。這是因為從pip 6.0+開始,pip在下載對應包時會緩存對應的包文件,以便需要時直接使用。但是對于docker鏡像構建,這顯然是沒有必要的,并且會增加鏡像的大小,所以我們再安裝依賴時一般都會帶上--no-cache-dir的選項;
  • 鑒于國內(nèi)訪問官方pip鏡像源的感人速率,如果pip安裝包時特別慢或者超時,可以帶上-i <pypi source site>??蛇x的站點網(wǎng)上搜一下,比如清華的 https://pypi.tuna.tsinghua.edu.cn/simple/。
  • 我們使用推薦的flask run命令來啟動flask應用,通過環(huán)境變量的方式指定對應的host、port以及debug模式參數(shù),便于在不同的環(huán)境上進行開發(fā)測試,下面的docker-compose文件可以看到這一點。
  • 如果你的項目文件夾下面還有一些不希望打包進image的文件,比如測試代碼等,可以使用.dockerignore文件來排除這些文件或者文件夾,用法類似于.gitignore文件

由于我們的應用需要多個服務之間的協(xié)作,我這邊通過docker-compose來控制多個服務的啟停與管理,配置如下:

docker-compose.yml

version: '3.7'

services:
  redis:
    image: "redis:alpine"
    hostname: redis
    networks:
      - redis-net
      
  flask:
    build: .
    ports:
      - 5000:5000
    env_file:
      - ~/.env
    depends_on:
      - redis
    networks:
      - redis-net
    volumes:
      - HelloQA:/usr/src/app/public
      
  celery:
    build: .
    command: celery -A tasks.celery_app worker -l Info
    env_file:
      - ~/.env
    depends_on:
      - redis
    networks:
      - redis-net
    volumes:
      - HelloQA:/usr/src/app/public      
      
networks:
  redis-net:
  
volumes:
  HelloQA:

需要注意的是這里指定了Redis服務的hostname,以便同一網(wǎng)絡的其他容器可以通過hostname訪問該容器, 在指定Redis的URI時我們只要指定其使用該hostname即可,例如我們得環(huán)境變量文件可以寫成下面的樣子:

~/.env

FLASK_DEBUG=FALSE
REDIS_URI="redis://redis:6379/0"
FLASK_APP=app.py
FLASK_RUN_HOST=0.0.0.0

部署

一切配置完畢后,在docker-compose.yml所在目錄下輸入如下命令就可以啟動整個應用了:

docker-compose up -d

一切順利的話服務應該全部起來了,這時通過如下命令就能查看各個容器的狀態(tài)了:

docker-compose ps

測試

確認一切OK以后,我們就可以訪問我們的web網(wǎng)頁,通過觸發(fā)幾個異步任務來測試一下功能是否正常。當用戶訪問flask的網(wǎng)站并且提交表單后會立即(不會被阻塞)返回類似如下的結果:

{
"location": "/tasks/cf103488-64e3-4868-aff6-abbbf79b4f31/state",
"taskId": "cf103488-64e3-4868-aff6-abbbf79b4f31"
}

用戶可以根據(jù)返回的JSON結果中的location字段進一步查詢具體的task狀態(tài),比如通過訪問http://127.0.0.1:5000/tasks/cf103488-64e3-4868-aff6-abbbf79b4f31/state 就會得到類似如下的JSON結果:

{
"id": "cf103488-64e3-4868-aff6-abbbf79b4f31",
"result": null,
"state": "PENDING"
}

而等到任務結束以后再次發(fā)送GET請求,則會得到類似如下的結果 :

{
"id": "cf103488-64e3-4868-aff6-abbbf79b4f31",
"result": {
"result": "pass",
"testCaseId": "11111"
},
"state": "SUCCESS"
}

至此,我們就完成了一個簡單的異步任務從構建到部署的全部過程。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容