背景
最近在項目中遇到一個情況,在我們的測試執(zhí)行結束時需要將測試結果更新到我們的缺陷跟蹤系統(tǒng)里面,但是同一時刻可能會有很多客戶端都有更新結果的需求。為了避免過度耦合并且盡量讓客戶端輕量級一些,我們抽取了測試結果更新相關的接口并將其以REST API的形式進行了封裝, 以便不同的客戶端程序能夠通過簡單的發(fā)送HTTP請求的方式完成相關任務,用戶也能使用自己喜歡的語言直接調(diào)用我們的服務。而我們實際在后端是通過調(diào)用Rally(我們的缺陷跟蹤系統(tǒng))提供的API進行更新的,單次更新結果的過程大概需要10-20s左右時間(中間包括一些資源占用趨勢圖的生成等操作),為了保證我們的web服務能夠處理此類耗時較長的任務并且在高并發(fā)的情況下不至于出現(xiàn)用戶體驗的大幅下降,于是我們決定將對應的后端任務轉為異步處理。下文就是筆者在使用Flask+Celery+Redis實現(xiàn)異步任務的一些總結。
應用
注:
項目的完整代碼放在GitHub上了,不想碼字的朋友可以直接下載:
git clone https://github.com/hdw868/async_flask.git
這里采用一個簡化的模型來說明如何實現(xiàn),首先是構建一個celery的應用, 這里我們使用Redis作為消息代理和存儲結果的后端,然后我們模擬一個啟動測試的任務,中間休眠30秒用來模擬長時間的測試執(zhí)行,整體的代碼沒有什么難點參考官方的教程即可:
tasks.py
import os
import time
from celery import Celery
celery_app = Celery(
'celery_app',
backend=os.getenv('REDIS_URI', 'redis://localhost:6379/0'),
broker=os.getenv('REDIS_URI', 'redis://localhost:6379/0')
)
@celery_app.task()
def launch_new_test(tc_id):
print(f'Provisioning environment for {tc_id}...')
# Simulate the test execution process
time.sleep(30)
print(f'{tc_id} is completed!')
return {"result": 'pass',
"testCaseId": tc_id
}
接著,我們編寫一個flask應用作為這個異步任務的前端,并且返回任務的id以便后續(xù)查詢?nèi)蝿諣顟B(tài)及結果,如下:
app.py
import os
from celery.result import AsyncResult
from flask import Flask, render_template, request, jsonify, url_for
from tasks import celery_app, launch_new_test
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'something hard to guess')
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html')
if request.form['submit'] == 'Launch':
tc_id = request.form['test_case_id']
result = launch_new_test.delay(tc_id)
summary = {"taskId": result.id,
"location": str(url_for('get_task_state', task_id=result.id))
}
return jsonify(summary), 202
@app.route('/tasks/<string:task_id>/state')
def get_task_state(task_id):
result = AsyncResult(task_id, app=celery_app)
summary = {
"state": result.state,
"result": result.result,
"id": result.id,
}
return jsonify(summary), 200
注:
- 為了演示方便,這里直接用了一個web前端來觸發(fā)任務,實際項目中應該使用的REST API形式來組織視圖函數(shù)。
- 對于異步任務,返回的狀態(tài)碼應該使用202,Accepted;body里面最好包含一個location和id相關信息,以便用戶可以根據(jù)此類信息進一步查詢?nèi)蝿盏臓顟B(tài);
index.html
<html>
<head>
<title>Flask + Celery Examples</title>
</head>
<body>
<h1>Flask + Celery Examples</h1>
{% for message in get_flashed_messages() %}
<p style="color: blue;">{{ message }}</p>
{% endfor %}
<form method="POST">
<p>Launch new test:
<input type="text" name="test_case_id" placeholder="test_case_id"></p>
<input type="submit" name="submit" value="Launch">
</form>
</body>
</html>
容器化
這里我們希望通過容器的方式來進行部署,對于鏡像構建,我們使用的配置如下:
Dockerfile
FROM python:3.7
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
COPY . .
CMD [ "flask", "run"]
EXPOSE 5000
Tips:
- 在安裝依賴的時候應該選擇用
--no-cache-dir的方式安裝依賴的包。這是因為從pip 6.0+開始,pip在下載對應包時會緩存對應的包文件,以便需要時直接使用。但是對于docker鏡像構建,這顯然是沒有必要的,并且會增加鏡像的大小,所以我們再安裝依賴時一般都會帶上--no-cache-dir的選項;- 鑒于國內(nèi)訪問官方pip鏡像源的感人速率,如果pip安裝包時特別慢或者超時,可以帶上
-i <pypi source site>??蛇x的站點網(wǎng)上搜一下,比如清華的 https://pypi.tuna.tsinghua.edu.cn/simple/。- 我們使用推薦的
flask run命令來啟動flask應用,通過環(huán)境變量的方式指定對應的host、port以及debug模式參數(shù),便于在不同的環(huán)境上進行開發(fā)測試,下面的docker-compose文件可以看到這一點。- 如果你的項目文件夾下面還有一些不希望打包進image的文件,比如測試代碼等,可以使用.dockerignore文件來排除這些文件或者文件夾,用法類似于.gitignore文件
由于我們的應用需要多個服務之間的協(xié)作,我這邊通過docker-compose來控制多個服務的啟停與管理,配置如下:
docker-compose.yml
version: '3.7'
services:
redis:
image: "redis:alpine"
hostname: redis
networks:
- redis-net
flask:
build: .
ports:
- 5000:5000
env_file:
- ~/.env
depends_on:
- redis
networks:
- redis-net
volumes:
- HelloQA:/usr/src/app/public
celery:
build: .
command: celery -A tasks.celery_app worker -l Info
env_file:
- ~/.env
depends_on:
- redis
networks:
- redis-net
volumes:
- HelloQA:/usr/src/app/public
networks:
redis-net:
volumes:
HelloQA:
需要注意的是這里指定了Redis服務的hostname,以便同一網(wǎng)絡的其他容器可以通過hostname訪問該容器, 在指定Redis的URI時我們只要指定其使用該hostname即可,例如我們得環(huán)境變量文件可以寫成下面的樣子:
~/.env
FLASK_DEBUG=FALSE
REDIS_URI="redis://redis:6379/0"
FLASK_APP=app.py
FLASK_RUN_HOST=0.0.0.0
部署
一切配置完畢后,在docker-compose.yml所在目錄下輸入如下命令就可以啟動整個應用了:
docker-compose up -d
一切順利的話服務應該全部起來了,這時通過如下命令就能查看各個容器的狀態(tài)了:
docker-compose ps
測試
確認一切OK以后,我們就可以訪問我們的web網(wǎng)頁,通過觸發(fā)幾個異步任務來測試一下功能是否正常。當用戶訪問flask的網(wǎng)站并且提交表單后會立即(不會被阻塞)返回類似如下的結果:
{
"location": "/tasks/cf103488-64e3-4868-aff6-abbbf79b4f31/state",
"taskId": "cf103488-64e3-4868-aff6-abbbf79b4f31"
}
用戶可以根據(jù)返回的JSON結果中的location字段進一步查詢具體的task狀態(tài),比如通過訪問http://127.0.0.1:5000/tasks/cf103488-64e3-4868-aff6-abbbf79b4f31/state 就會得到類似如下的JSON結果:
{
"id": "cf103488-64e3-4868-aff6-abbbf79b4f31",
"result": null,
"state": "PENDING"
}
而等到任務結束以后再次發(fā)送GET請求,則會得到類似如下的結果 :
{
"id": "cf103488-64e3-4868-aff6-abbbf79b4f31",
"result": {
"result": "pass",
"testCaseId": "11111"
},
"state": "SUCCESS"
}
至此,我們就完成了一個簡單的異步任務從構建到部署的全部過程。