FastAPI 安全認(rèn)證 基礎(chǔ)使用

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}

這是官方提供的一個(gè)使用示例

Swagger中的標(biāo)識(shí)

點(diǎn)擊后

我們重新來觀察一下上面代碼中的細(xì)節(jié)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token"),我們可以發(fā)現(xiàn),關(guān)鍵的點(diǎn)在于參數(shù)tokenUrl="token",這仿佛是提供了一個(gè)path,讓我們點(diǎn)擊下Authorize

果不其然

這個(gè)參數(shù)聲明了token的認(rèn)證path,但是我們還沒有實(shí)現(xiàn)它的邏輯,所以會(huì)得到404。

async def read_items(token: str = Depends(oauth2_scheme)):,這里用到了依賴項(xiàng)Depends(oauth2_scheme)。返回值推測(cè)為認(rèn)證結(jié)果。我們稍后會(huì)說到。

官方又提供了一個(gè)案例

async def get_current_user(token: str = Depends(oauth2_scheme)):
    ```
    token驗(yàn)證邏輯
    ```
    return user

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

可以像這樣,使用子依賴,對(duì)token進(jìn)行處理,得到結(jié)果。通常會(huì)通過查數(shù)據(jù)庫返回用戶的model。

現(xiàn)在我們按照官方的完整案例修改一下

from datetime import datetime, timedelta
from typing import Optional
from starlette import status
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from pydantic import BaseModel

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


class User(BaseModel):
    username: str
    password: str


class Token(BaseModel):
    access_token: str
    token_type: str


USER_LIST = [
    User(username="test", password="test_pw")
]


def get_user(username: str) -> User:
    # 偽數(shù)據(jù)庫
    for user in USER_LIST:
        if user.username == username:
            return user


form_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)


def create_token(user: User, expires_delta: Optional[timedelta] = None):
    expire = datetime.utcnow() + expires_delta or timedelta(minutes=15)
    return jwt.encode(
        claims={"sub": user.username, "exp": expire},
        key=SECRET_KEY,
        algorithm=ALGORITHM
    )


@app.post("/token")
async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user: User = get_user(form_data.username)
    if not user or user.password != form_data.password:
        raise form_exception
    access_token = create_token(user=user, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    return {"access_token": access_token, "token_type": "bearer"}


credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)


def token_to_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username, expire = payload.get("sub"), payload.get("exp")
        user = get_user(username)
        if user is None:
            raise JWTError
    except JWTError:
        raise credentials_exception
    return user


@app.get("/items/", response_model=User)
async def read_items(user: User = Depends(token_to_user)):
    return user


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

以上代碼是一個(gè)簡(jiǎn)單的模擬demo,它包含了用戶登錄獲取token,與進(jìn)行token認(rèn)證兩種功能,下面我們將慢慢解釋這些內(nèi)容。

預(yù)定義內(nèi)容

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class User(BaseModel):
    username: str
    password: str

class Token(BaseModel):
    access_token: str
    token_type: str

這里包含了安全認(rèn)證類,秘鑰,算法,過期時(shí)間的定義,以及兩個(gè)驗(yàn)證model。

模擬數(shù)據(jù)庫

假設(shè)通過用戶名從數(shù)據(jù)庫中獲取user的過程

USER_LIST = [
    User(username="test", password="test_pw")
]

def get_user(username: str) -> User:
    # 偽數(shù)據(jù)庫
    for user in USER_LIST:
        if user.username == username:
            return user

登錄獲取token

form_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)


def create_token(user: User, expires_delta: Optional[timedelta] = None):
    expire = datetime.utcnow() + expires_delta or timedelta(minutes=15)
    return jwt.encode(
        claims={"sub": user.username, "exp": expire},
        key=SECRET_KEY,
        algorithm=ALGORITHM
    )


@app.post("/token")
async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user: User = get_user(form_data.username)
    if not user or user.password != form_data.password:
        raise form_exception
    access_token = create_token(user=user, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    return {"access_token": access_token, "token_type": "bearer"}

認(rèn)證token獲取用戶

credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)


def token_to_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username, expire = payload.get("sub"), payload.get("exp")
        user = get_user(username)
        if user is None:
            raise JWTError
    except JWTError:
        raise credentials_exception
    return user


@app.get("/items/", response_model=User)
async def read_items(user: User = Depends(token_to_user)):
    return user

實(shí)際效果

登錄成功獲取token

攜帶token訪問API

下一部分我們將通過源碼解析來了解OAuth2.0的安全認(rèn)證

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容