** 背景 **
后臺(tái)開(kāi)發(fā)過(guò)程中,一個(gè)請(qǐng)求比較耗時(shí),大致過(guò)程如下:
- 根據(jù)前端傳入的兩個(gè)img_url去第三方下載圖片。
- 將獲取到的圖片進(jìn)行適當(dāng)縮放,并解析成base64編碼。
- 再將解析好的base64編碼的圖片拿去請(qǐng)求第三方。
** 處理過(guò)程 **
明顯的,以上三步均比較耗時(shí)。
所以,將其處理為異步是必然的。況且用的是tornado這個(gè)單進(jìn)程框架,不異步豈不是作死。
因?yàn)檎?qǐng)求會(huì)相對(duì)比較頻繁,單純的異步還不能完美解決。這個(gè)時(shí)候,初出茅廬的我當(dāng)然是想到了Celery(只想到了它)。
具體實(shí)施為:
- 當(dāng)接收到請(qǐng)求時(shí),項(xiàng)目對(duì)參數(shù)作詳細(xì)檢查,然后發(fā)送一個(gè)task任務(wù)到Celery的BROKER中,然后立即返回“請(qǐng)求成功”的結(jié)果至前端(結(jié)果僅僅表示前端的請(qǐng)求,后端接收成功,并開(kāi)始處理)。
- 事先啟動(dòng)好的worker進(jìn)程(可放在其他服務(wù)器上)從BROKER中讀取task任務(wù),并執(zhí)行。
- Celery的worker執(zhí)行完任務(wù)后,將得到的數(shù)據(jù)寫入數(shù)據(jù)庫(kù)。
這樣處理的好處是:
- 實(shí)現(xiàn)了異步,用戶不用“耐心”的等待。
- 分布式任務(wù)處理,解決的單機(jī)的cpu處理能力不足的問(wèn)題。
這樣處理的缺點(diǎn)是:
- 用戶不能立馬得到結(jié)果。
- 需要額外再次請(qǐng)求另一個(gè)查詢接口才能獲取數(shù)據(jù)。
** 解決的問(wèn)題 **
在執(zhí)行task的時(shí)候,task中需要讀寫數(shù)據(jù)庫(kù),但celery的worker是額外的進(jìn)程,并不能繼續(xù)使用項(xiàng)目中的數(shù)據(jù)庫(kù)對(duì)象(用的pony orm)來(lái)讀寫數(shù)據(jù)庫(kù)。所以,導(dǎo)致了調(diào)試過(guò)程中出現(xiàn)MySQL server has gone away或其他類似的數(shù)據(jù)庫(kù)連接或映射的錯(cuò)誤。
** 解決辦法 **
既然是因?yàn)閿?shù)據(jù)庫(kù)連接問(wèn)題,那么簡(jiǎn)單,直接再連接一次就行了。剛開(kāi)始的時(shí)候,懵逼般的在每次任務(wù)的時(shí)候去連接一次。后來(lái)在后續(xù)的代碼閱讀的過(guò)程中發(fā)現(xiàn)這樣處理有些不妥啊。任務(wù)很多的話豈不是會(huì)頻繁的去連接?后來(lái)發(fā)現(xiàn),在啟動(dòng)worker的命令中,需要指定一個(gè)task任務(wù)文件(就是被celery的task裝飾器修飾的任務(wù)函數(shù)),所以,可以在task所在文件的最外層作一個(gè)數(shù)據(jù)庫(kù)的連接。這樣的話,最終實(shí)現(xiàn)的就是每一個(gè)worker均有一個(gè)自己的數(shù)據(jù)庫(kù)連接對(duì)象,就像這樣的task文件:
# coding=utf-8
from celery import Celery, platforms
from configs.config import BROKER, DATABASE
from apps.ylzh.ylzh import PylzhProcess
from apps.zs.handler import FaceOneToOneProcess
from apps.commons.database import PonyDatabase
# 加try是因?yàn)轫?xiàng)目(不是worker)在執(zhí)行時(shí),已經(jīng)有了一個(gè)PonyDatabase
(這是一個(gè)我處理過(guò)的pony的Database的單例類)數(shù)據(jù)庫(kù)操作對(duì)象,
而項(xiàng)目因?yàn)橐l(fā)送task,所以會(huì)引入這個(gè)類,會(huì)導(dǎo)致db再次bind而發(fā)生異常
所以,用了try
try:
db = PonyDatabase()
db.bind('mysql', host=DATABASE['host'], db=DATABASE['db_name'], user=DATABASE['username'], passwd=DATABASE['password'] )
db.generate_mapping()
except Exception:
pass
app = Celery("celery_model", broker=BROKER) platforms.C_FORCE_ROOT = True
app.conf.update(CELERY_DEFAULT_QUEUE='default_queue')
@app.task()
def ylzh(user_data):
PylzhProcess().request_data(user_data)
...