FastAPI處理OAuth2

最近在重新學習FastAPI的文檔,認證這部分相對獨立,簡單做個demo,基本都是官網的內容,稍作修改。

官網鏈接:https://fastapi.tiangolo.com/zh/tutorial/security/oauth2-jwt/

一、基本概念

JWT(JSON Web Token),基本的概念可以在網上搜到很多,目前認證的主流都是采用token的方式。

一個典型的JWT的結構是xxxxx.yyyyy.zzzzz

其中第一部分是header,第二部分是payload,第三部分是signature。核心內容保存在payload中。payload中有一些固定參數名稱的意義:

  • iss 【issuer】發布者的url地址

  • sub 【subject】該JWT所面向的用戶,用于處理特定應用,不是常用的字段

  • aud 【audience】接受者的url地址

  • exp 【expiration】 該jwt銷毀(過期)的時間;unix時間戳

  • nbf 【not before】 該jwt的使用時間不能早于該時間;unix時間戳

  • iat 【issued at】 該jwt的發布時間;unix時間戳

  • jti 【JWT ID】 該jwt的唯一ID編號

當然也支持在payload中自定義參數。

二、demo

第一步,實現用戶密碼的哈希與校驗(bcrypt是單向哈希,不存在「解密」),模擬數據庫返回用戶信息

from passlib.context import CryptContext
from pydantic import BaseModel
from typing import Optional

# Schemas used throughout the demo.
# Token: response body returned by the token endpoint.
class Token(BaseModel):
    access_token: str  # the encoded JWT string
    token_type: str  # token scheme reported to the client ("bearer" in this demo)

# TokenData: identity information taken out of a decoded JWT payload.
class TokenData(BaseModel):
    username: Optional[str] = None  # value of the payload's "username" entry

# User: account fields without any password material.
class User(BaseModel):
    username: str
    email: Optional[str] = None
    disabled: Optional[bool] = None  # a truthy value makes the account count as inactive

# UserInDB: the stored user record, i.e. the User fields plus the password hash.
class UserInDB(User):
    hashed_password: str  # hash compared by verify_password; never the plain password

# In-memory stand-in for a user table, keyed by username.
# `hashed_password` holds the hashed form of the password, not the plain text.
fake_users_db = {
    "guodabao": {
        "username": "guodabao",
        "email": "guodabao@example.com",
        "hashed_password": "$2b$12$HQfuwwNR857Pp4ySQDvXNOAZkblQuU4wBcsIJVxqsf3sYoiMf7W42",
        "disabled": False,
    }
}
# passlib context used to hash passwords and to verify them against stored hashes.
# bcrypt is a one-way hash, so nothing is ever "decrypted".
# `schemes` selects the hash algorithm; `deprecated="auto"` matches the
# configuration used by the complete listing of this demo.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Check a plain-text password against a stored hash.
def verify_password(plain_password, hashed_password):
    """Return the outcome of ``pwd_context.verify`` for the given pair.

    Args:
        plain_password: Password as typed by the user.
        hashed_password: Stored hash to compare against.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

# Produce the hash of a password.
def get_password_hash(password):
    """Return ``pwd_context.hash(password)``, the hashed form of ``password``."""
    digest = pwd_context.hash(password)
    return digest

# Simulate fetching one user record from the database.
def get_user(db, username: str):
    """Return the ``UserInDB`` built from ``db[username]``, or ``None`` if absent.

    Args:
        db: Mapping of username to a dict of user fields.
        username: Key to look up.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])

# Authenticate by username and password and return the user record on success.
def authenticate_user(fake_db, username: str, password: str):
    """Look up ``username`` and check ``password`` against its stored hash.

    Args:
        fake_db: Mapping of username to user fields, as accepted by ``get_user``.
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The ``UserInDB`` record when the credentials match, otherwise ``False``.
    """
    user = get_user(fake_db, username)
    if not user:
        # Spend about as long as a real hash check so that an unknown username
        # cannot be told apart from a wrong password by response time.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

第二步,實現認證

from datetime import datetime, timedelta
from jose import JWTError, jwt
from starlette.status import HTTP_401_UNAUTHORIZED

# Basic settings needed to issue and validate JWTs.
# The secret key was generated with `openssl rand -hex 32`.
# NOTE(review): the key is committed in source for the demo; a real deployment
# should load it from configuration instead.
SECRET_KEY = "d6f72340d4afe840c036ba5d593f7fd36c3c811602f77abebdef8127b74ddce2" 
ALGORITHM = "HS256" # signing algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # token lifetime, in minutes
TOKEN_URL = "access-token"  # URL from which a token is obtained


# Core logic for building a signed JWT.
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT whose payload is ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to place in the payload. The dict is not modified.
        expires_delta: Lifetime of the token. ``None`` means 15 minutes; an
            explicit zero delta is honoured rather than replaced by the default.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Local import: the surrounding import lines only bring in datetime/timedelta.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # Timezone-aware "now" replaces the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}  # copy of the payload with exp set
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# Decode the bearer token and resolve it to a stored user.
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Return the user named by the ``username`` entry of the token payload.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            fails to decode, carries no ``username``, or names no user in
            ``fake_users_db``.
    """
    credentials_exception = HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    # This demo stores the login name under the "username" key of the payload.
    name = claims.get("username")
    if name is None:
        raise credentials_exception
    token_data = TokenData(username=name)
    account = get_user(fake_users_db, username=token_data.username)
    if account is None:
        raise credentials_exception
    return account

第三步,實現API

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# Application instance that the route decorators below attach to.
app = FastAPI()

# Bearer-token dependency; TOKEN_URL is handed over as the URL where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=TOKEN_URL)

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Pass the authenticated user through unless the account is disabled.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")

# Token endpoint: checks the submitted form credentials and answers with a bearer JWT.
# The path is derived from TOKEN_URL so that it cannot drift away from the URL
# given to OAuth2PasswordBearer.
# Responds 401 "Incorrect username or password" when authentication fails.
@app.post(f"/{TOKEN_URL}", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The login name travels in the payload under the "username" key, which is
    # the key get_current_user reads back.
    access_token = create_access_token(
        data={"username": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}

# Returns the authenticated, active user; the response is shaped by the User model.
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

# Returns the demo item list, with the authenticated active user as owner.
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    owner_name = current_user.username
    return [{"item_id": "Foo", "owner": owner_name}]

# Helper endpoint: returns the hash of `password`, e.g. to produce a
# `hashed_password` value for fake_users_db.
# NOTE(review): presumably FastAPI reads this bare `str` parameter from the
# query string, which would put the password in the URL; confirm this
# endpoint is meant for local use only.
@app.post("/password/")
def get_hashed_password(password: str):
    hashed = get_password_hash(password)
    return hashed

三、完整代碼

from datetime import datetime, timedelta
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from starlette.status import HTTP_401_UNAUTHORIZED
from typing import Optional


# Settings needed to issue and validate JWTs.
# NOTE(review): the key is committed in source for the demo; a real deployment
# should load it from configuration instead.
SECRET_KEY = "d6f72340d4afe840c036ba5d593f7fd36c3c811602f77abebdef8127b74ddce2"
ALGORITHM = "HS256"  # signing algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # token lifetime, in minutes
TOKEN_URL = "access-token"  # URL from which a token is obtained


# In-memory stand-in for a user table, keyed by username.
fake_users_db = {
    "guodabao": {
        "username": "guodabao",
        "email": "guodabao@example.com",
        "hashed_password": "$2b$12$HQfuwwNR857Pp4ySQDvXNOAZkblQuU4wBcsIJVxqsf3sYoiMf7W42",
        "disabled": False,
    }
}


# Token: response body returned by the token endpoint.
class Token(BaseModel):
    access_token: str  # the encoded JWT string
    token_type: str  # token scheme reported to the client ("bearer" in this demo)


# TokenData: identity information taken out of a decoded JWT payload.
class TokenData(BaseModel):
    username: Optional[str] = None  # value of the payload's "username" entry


# User: account fields without any password material.
class User(BaseModel):
    username: str
    email: Optional[str] = None
    disabled: Optional[bool] = None  # a truthy value makes the account count as inactive


# UserInDB: the stored user record, i.e. the User fields plus the password hash.
class UserInDB(User):
    hashed_password: str  # hash compared by verify_password; never the plain password


# passlib context used to hash passwords and to verify them against stored hashes.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Bearer-token dependency; TOKEN_URL is handed over as the URL where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=TOKEN_URL)

# Application instance that the route decorators below attach to.
app = FastAPI()


def verify_password(plain_password, hashed_password):
    """Return the outcome of ``pwd_context.verify`` for the given pair.

    Args:
        plain_password: Password as typed by the user.
        hashed_password: Stored hash to compare against.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def get_password_hash(password):
    """Return ``pwd_context.hash(password)``, the hashed form of ``password``."""
    digest = pwd_context.hash(password)
    return digest


def get_user(db, username: str):
    """Return the ``UserInDB`` built from ``db[username]``, or ``None`` if absent.

    Args:
        db: Mapping of username to a dict of user fields.
        username: Key to look up.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])


def authenticate_user(fake_db, username: str, password: str):
    """Look up ``username`` and check ``password`` against its stored hash.

    Args:
        fake_db: Mapping of username to user fields, as accepted by ``get_user``.
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The ``UserInDB`` record when the credentials match, otherwise ``False``.
    """
    user = get_user(fake_db, username)
    if not user:
        # Spend about as long as a real hash check so that an unknown username
        # cannot be told apart from a wrong password by response time.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT whose payload is ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to place in the payload. The dict is not modified.
        expires_delta: Lifetime of the token. ``None`` means 15 minutes; an
            explicit zero delta is honoured rather than replaced by the default.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # Timezone-aware "now" replaces the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}  # copy of the payload with exp set
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Return the user named by the ``username`` entry of the token payload.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            fails to decode, carries no ``username``, or names no user in
            ``fake_users_db``.
    """
    credentials_exception = HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    # This demo stores the login name under the "username" key of the payload.
    name = claims.get("username")
    if name is None:
        raise credentials_exception
    token_data = TokenData(username=name)
    account = get_user(fake_users_db, username=token_data.username)
    if account is None:
        raise credentials_exception
    return account


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Pass the authenticated user through unless the account is disabled.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")


# Token endpoint: checks the submitted form credentials and answers with a bearer JWT.
# The path is derived from TOKEN_URL so that it cannot drift away from the URL
# given to OAuth2PasswordBearer.
# Responds 401 "Incorrect username or password" when authentication fails.
@app.post(f"/{TOKEN_URL}", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The login name travels in the payload under the "username" key, which is
    # the key get_current_user reads back.
    access_token = create_access_token(
        data={"username": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}


# Returns the authenticated, active user; the response is shaped by the User model.
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


# Returns the demo item list, with the authenticated active user as owner.
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    owner_name = current_user.username
    return [{"item_id": "Foo", "owner": owner_name}]


# Helper endpoint: returns the hash of `password`, e.g. to produce a
# `hashed_password` value for fake_users_db.
# NOTE(review): presumably FastAPI reads this bare `str` parameter from the
# query string, which would put the password in the URL; confirm this
# endpoint is meant for local use only.
@app.post("/password/")
def get_hashed_password(password: str):
    hashed = get_password_hash(password)
    return hashed
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容