# Application instance for the introductory example.
app = FastAPI()
# Bearer-token security scheme; tokenUrl names the path where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    """Echo back the token supplied by the ``oauth2_scheme`` dependency."""
    response = {"token": token}
    return response
這是官方提供的一個使用示例

Swagger中的標識

點擊後
我們重新來觀察一下上面代碼中的細節
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token"),我們可以發現,關鍵的點在於參數tokenUrl="token",這仿佛是提供了一個path,讓我們點擊下Authorize

果不其然
這個參數聲明了token的認證path,但是我們還沒有實現它的邏輯,所以會得到404。
async def read_items(token: str = Depends(oauth2_scheme)):,這裡用到了依賴項Depends(oauth2_scheme)。返回值推測為認證結果。我們稍後會說到。
官方又提供了一個案例
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to the current user (illustrative skeleton).

    The token is injected by the ``oauth2_scheme`` dependency. The validation
    step is intentionally left as a placeholder in this example.
    """
    # Token validation logic goes here: verify ``token`` and bind the
    # matching user to ``user`` (not implemented in this sketch).
    return user
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Return the user produced by the ``get_current_user`` sub-dependency."""
    me = current_user
    return me
可以像這樣,使用子依賴,對token進行處理,得到結果。通常會通過查數據庫返回用戶的model。
現在我們按照官方的完整案例修改一下
from datetime import datetime, timedelta
from typing import Optional
from starlette import status
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from pydantic import BaseModel
# Application instance and bearer-token scheme; tokenUrl refers to the /token login route.
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Key and algorithm used to sign and verify JWTs.
# NOTE(review): the signing key is hard-coded for this demo; load it from
# configuration or the environment in a real deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
# Lifetime, in minutes, of tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class User(BaseModel):
    """User record held in the in-memory user list."""

    # Login name used to look the user up.
    username: str
    # Stored as plain text in this demo.
    password: str
class Token(BaseModel):
    """Token payload model: the access token string and its type.

    Not referenced by the other code visible in this example.
    """

    access_token: str
    token_type: str
# In-memory stand-in for a user table; searched by get_user.
USER_LIST = [
User(username="test", password="test_pw")
]
def get_user(username: str) -> Optional[User]:
    """Look up a user by name in the in-memory ``USER_LIST``.

    Args:
        username: Name to match against each entry's ``username``.

    Returns:
        The first matching ``User``, or ``None`` when no entry matches.
        Callers check for the ``None`` case, so it is part of the contract.
    """
    # Stand-in for a database query.
    for user in USER_LIST:
        if user.username == username:
            return user
    return None
# 401 error raised by the /token endpoint when the username or password does not match.
form_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def create_token(user: User, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT for ``user``.

    Args:
        user: The user whose ``username`` becomes the ``sub`` claim.
        expires_delta: Token lifetime; defaults to 15 minutes when ``None``.

    Returns:
        The value returned by ``jwt.encode`` for the ``sub``/``exp`` claims,
        signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Resolve the default before adding. The previous
    # ``utcnow() + expires_delta or timedelta(...)`` bound as
    # ``(utcnow() + expires_delta) or ...`` and raised TypeError for None.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    expire = datetime.utcnow() + expires_delta
    return jwt.encode(
        claims={"sub": user.username, "exp": expire},
        key=SECRET_KEY,
        algorithm=ALGORITHM,
    )
@app.post("/token")
async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate the submitted form credentials and issue an access token.

    Args:
        form_data: Login form carrying ``username`` and ``password``.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: ``form_exception`` (401) when the user is unknown or
            the password does not match.
    """
    import secrets

    user: Optional[User] = get_user(form_data.username)
    # Compare in constant time so response timing does not leak how much of
    # the password matched. Encoding to bytes keeps non-ASCII passwords valid
    # input for compare_digest.
    # NOTE(review): passwords are stored in plain text in this demo.
    if user is None or not secrets.compare_digest(
        user.password.encode("utf-8"), form_data.password.encode("utf-8")
    ):
        raise form_exception
    access_token = create_token(
        user=user,
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
# 401 error raised by token_to_user when the token cannot be decoded or names no known user.
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def token_to_user(token: str = Depends(oauth2_scheme)):
    """Decode a bearer token and return the user named in its ``sub`` claim.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The ``User`` found by ``get_user`` for the token's ``sub`` claim.

    Raises:
        HTTPException: ``credentials_exception`` (401) when decoding raises
            ``JWTError`` or no user matches the ``sub`` claim.
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    # A missing "sub" yields None, which matches no user and is rejected below.
    user = get_user(payload.get("sub"))
    if user is None:
        raise credentials_exception
    return user
@app.get("/items/", response_model=User)
async def read_items(user: User = Depends(token_to_user)):
    """Return the user resolved from the request's bearer token.

    NOTE(review): ``response_model=User`` appears to include the ``password``
    field in the response; confirm this is intended.
    """
    authenticated = user
    return authenticated
if __name__ == '__main__':
    # Start a local server only when the file is executed as a script.
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)
以上代碼是一個簡單的模擬demo,它包含了用戶登錄獲取token,與進行token認證兩種功能,下面我們將慢慢解釋這些內容。
預定義內容
# Bearer-token scheme; tokenUrl refers to the /token login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Key and algorithm used to sign and verify JWTs.
# NOTE(review): the signing key is hard-coded for this demo; load it from
# configuration or the environment in a real deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
# Lifetime, in minutes, of tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class User(BaseModel):
    """User record held in the in-memory user list."""

    # Login name used to look the user up.
    username: str
    # Stored as plain text in this demo.
    password: str
class Token(BaseModel):
    """Token payload model: the access token string and its type.

    Not referenced by the other code visible in this example.
    """

    access_token: str
    token_type: str
這裡包含了安全認證類,秘鑰,算法,過期時間的定義,以及兩個驗證model。
模擬數據庫
假設通過用戶名從數據庫中獲取user的過程
# In-memory stand-in for a user table; searched by get_user.
USER_LIST = [
User(username="test", password="test_pw")
]
def get_user(username: str) -> Optional[User]:
    """Look up a user by name in the in-memory ``USER_LIST``.

    Args:
        username: Name to match against each entry's ``username``.

    Returns:
        The first matching ``User``, or ``None`` when no entry matches.
        Callers check for the ``None`` case, so it is part of the contract.
    """
    # Stand-in for a database query.
    for user in USER_LIST:
        if user.username == username:
            return user
    return None
登錄獲取token
# 401 error raised by the /token endpoint when the username or password does not match.
form_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def create_token(user: User, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT for ``user``.

    Args:
        user: The user whose ``username`` becomes the ``sub`` claim.
        expires_delta: Token lifetime; defaults to 15 minutes when ``None``.

    Returns:
        The value returned by ``jwt.encode`` for the ``sub``/``exp`` claims,
        signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Resolve the default before adding. The previous
    # ``utcnow() + expires_delta or timedelta(...)`` bound as
    # ``(utcnow() + expires_delta) or ...`` and raised TypeError for None.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    expire = datetime.utcnow() + expires_delta
    return jwt.encode(
        claims={"sub": user.username, "exp": expire},
        key=SECRET_KEY,
        algorithm=ALGORITHM,
    )
@app.post("/token")
async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate the submitted form credentials and issue an access token.

    Args:
        form_data: Login form carrying ``username`` and ``password``.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: ``form_exception`` (401) when the user is unknown or
            the password does not match.
    """
    import secrets

    user: Optional[User] = get_user(form_data.username)
    # Compare in constant time so response timing does not leak how much of
    # the password matched. Encoding to bytes keeps non-ASCII passwords valid
    # input for compare_digest.
    # NOTE(review): passwords are stored in plain text in this demo.
    if user is None or not secrets.compare_digest(
        user.password.encode("utf-8"), form_data.password.encode("utf-8")
    ):
        raise form_exception
    access_token = create_token(
        user=user,
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
認證token獲取用戶
# 401 error raised by token_to_user when the token cannot be decoded or names no known user.
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def token_to_user(token: str = Depends(oauth2_scheme)):
    """Decode a bearer token and return the user named in its ``sub`` claim.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The ``User`` found by ``get_user`` for the token's ``sub`` claim.

    Raises:
        HTTPException: ``credentials_exception`` (401) when decoding raises
            ``JWTError`` or no user matches the ``sub`` claim.
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    # A missing "sub" yields None, which matches no user and is rejected below.
    user = get_user(payload.get("sub"))
    if user is None:
        raise credentials_exception
    return user
@app.get("/items/", response_model=User)
async def read_items(user: User = Depends(token_to_user)):
    """Return the user resolved from the request's bearer token.

    NOTE(review): ``response_model=User`` appears to include the ``password``
    field in the response; confirm this is intended.
    """
    authenticated = user
    return authenticated
實際效果

登錄成功獲取token

攜帶token訪問API
下一部分我們將通過源碼解析來了解OAuth2.0的安全認證