FastAPI 认证指南
FastAPI 完整认证模式:JWT、OAuth2 密码流、HTTP Bearer 和 API 密钥方案。
1. JWT Token 创建与验证
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_token(data: dict, expires_in: timedelta) -> str:
payload = data.copy()
payload["exp"] = datetime.now(timezone.utc) + expires_in
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_access_token(user_id: int, role: str) -> str:
return create_token({"sub": str(user_id), "role": role}, timedelta(minutes=30))
2. OAuth2 密码流
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
@app.post("/auth/token")
async def login(form: OAuth2PasswordRequestForm = Depends(), db: DB = Depends(get_db)):
user = db.query(User).filter(User.email == form.username).first()
if not user or not pwd_ctx.verify(form.password, user.hashed_password):
raise HTTPException(status_code=401, detail="邮箱或密码错误")
return {
"access_token": create_access_token(user.id, user.role),
"token_type": "bearer",
}
3. 当前用户依赖
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: DB) -> User:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(status_code=401, detail="token 验证失败")
user = db.query(User).get(int(payload["sub"]))
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="用户不存在或已禁用")
return user
CurrentUser = Annotated[User, Depends(get_current_user)]
@app.get("/me")
async def me(user: CurrentUser):
return user
4. API 密钥认证
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def get_api_key(key: Annotated[str, Depends(api_key_header)], db: DB) -> APIKey:
api_key = db.query(APIKey).filter(APIKey.key == key, APIKey.is_active == True).first()
if not api_key:
raise HTTPException(status_code=403, detail="API 密钥无效")
return api_key
@app.get("/data", dependencies=[Depends(get_api_key)])
async def get_data():
return {"data": "..."}
5. 认证方案对比
| 方案 | 请求头 | 适用场景 |
|---|---|---|
| OAuth2 Bearer | Authorization: Bearer | Web/移动应用 |
| API Key (头部) | X-API-Key: key | 服务器对服务器 |
| API Key (查询) | ?api_key=key | 简单集成 |
| Basic Auth | Authorization: Basic | 内部工具 |