代碼框架
- 整個(gè)項(xiàng)目包含api層、業(yè)務(wù)邏輯層和service層,在api層進(jìn)行入?yún)⒏袷叫r?yàn),業(yè)務(wù)層處理具體的業(yè)務(wù)信息,在service層進(jìn)行返回格式校驗(yàn),使用的框架為FastApi
Api層
基于pydantic設(shè)置參數(shù)的類型,對(duì)入?yún)⑦M(jìn)行格式校驗(yàn)
- 入?yún)⑿r?yàn)示例
from typing import Optional
from pydantic import BaseModel, ValidationError, validator
from datetime import datetime
class DatetimeMixin(BaseModel):
startDatetime: Optional[datetime] = '2000-01-01T00:00:00' # 設(shè)定時(shí)間的默認(rèn)值
endDatetime: Optional[datetime] = '2100-01-01T00:00:00'
class PageInfo(BaseModel):
page: int = 1 # 設(shè)定參數(shù)為int類型
size: int = 10
@validator('page', 'size')
def check_page(cls, v): # 檢驗(yàn)字段名是否符合要求
if v > 0:
return v
else:
raise ValidationError
- 出參校驗(yàn)示例
from pydantic import BaseModel
class ResponseModel(BaseModel): # 設(shè)置狀態(tài)碼和信息,直接繼承
statusCode: int
message: str
接口實(shí)現(xiàn)
fastapi使用 '.' 操作符獲取參數(shù),應(yīng)該在這一層獲取所有具體參數(shù),傳給service
- get實(shí)現(xiàn),入?yún)⑹褂肈epends注入依賴,表示將類對(duì)象RarioParams傳遞給query
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Depends
router = APIRouter()
@router.get('/ratio', response_model=RatioResponse, tags=Tag.Analysis.value)
def ratio(query: RatioParams = Depends()):
key = query.key
start = query.startDatetime
end = query.endDatetime
response = ratio_service.ratio_analysis(key, start, end)
return response
- post實(shí)現(xiàn)
@router.post('/aggregation', response_model=AggregationResponse, tags=Tag.Analysis.value) # tag是swagger的顯示欄標(biāo)簽
def aggregation(query: AggregationQuery):
aggs = query.agg
filters = query.filter
page_info = query.pageInfo
order = query.order
aggregation_service = AggregationService(aggs, page_info, filters, order)
response = aggregation_service.aggregation_analysis()
return response
- 后臺(tái)任務(wù)
@router.post('/upload', response_model=ResponseModel, tags=Tag.DataManagement.value)
def upload(background_tasks: BackgroundTasks, file: UploadFile = File(...)): # ...Ellipsis對(duì)象,相當(dāng)于占位符
background_tasks.add_task(service.upload_task, file.file, file.filename)
return service.response()
寫著寫著發(fā)現(xiàn)知識(shí)太零碎沒法寫了,還是過幾天專門記錄一下fastapi的學(xué)習(xí)吧