FastAPI Authentication Guide
Complete authentication patterns for FastAPI including JWT, OAuth2 password flow, HTTP Bearer, and API key schemes.
1. JWT Token Creation & Verification
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
import os
# Key used to sign and verify every JWT in this guide. It is read from the
# environment; a missing JWT_SECRET raises KeyError when this line executes.
SECRET_KEY = os.environ["JWT_SECRET"]
# Algorithm passed to both jwt.encode and jwt.decode.
ALGORITHM = "HS256"
# Lifetimes applied to access and refresh tokens respectively.
ACCESS_TOKEN_EXPIRE = timedelta(minutes=30)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)
# passlib context configured for bcrypt; used by the login route to verify passwords.
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_token(data: dict, expires_in: timedelta) -> str:
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The mapping is not mutated; an ``exp`` key
            already present is overwritten.
        expires_in: How long from now (UTC) the token stays valid.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    expires_at = datetime.now(timezone.utc) + expires_in
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def create_access_token(user_id: int, role: str) -> str:
    """Return an access token for ``user_id`` that also carries ``role``.

    The subject is stored as a string and the token is tagged
    ``"type": "access"``; it expires after ``ACCESS_TOKEN_EXPIRE``.
    """
    claims = {"sub": str(user_id), "role": role, "type": "access"}
    return create_token(claims, ACCESS_TOKEN_EXPIRE)
def create_refresh_token(user_id: int) -> str:
    """Return a refresh token for ``user_id``.

    The subject is stored as a string and the token is tagged
    ``"type": "refresh"``; it expires after ``REFRESH_TOKEN_EXPIRE``.
    """
    claims = {"sub": str(user_id), "type": "refresh"}
    return create_token(claims, REFRESH_TOKEN_EXPIRE)
def decode_token(token: str) -> dict:
    """Verify ``token`` and return its claims.

    Args:
        token: Encoded JWT to check against ``SECRET_KEY`` / ``ALGORITHM``.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError``. The response
            carries a ``WWW-Authenticate: Bearer`` challenge, matching the
            401 raised by the login route.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Chain the original error so server-side tracebacks keep the cause.
        raise HTTPException(
            status_code=401,
            detail=str(exc),
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
2. OAuth2 Password Flow
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
# Bearer-token scheme used as a dependency by get_current_user; tokenUrl
# points at the login route declared below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
class TokenResponse(BaseModel):
    # Response body of POST /auth/token.
    # access_token: value produced by create_access_token.
    access_token: str
    # refresh_token: value produced by create_refresh_token.
    refresh_token: str
    # token_type: defaults to "bearer" when not supplied.
    token_type: str = "bearer"
@app.post("/auth/token", response_model=TokenResponse)
async def login(form: OAuth2PasswordRequestForm = Depends(), db: DB = Depends(get_db)):
    # OAuth2 password flow: look the user up by e-mail (sent in the form's
    # `username` field), verify the password, and return an access/refresh
    # token pair. Any failure yields the same 401 so the response body does
    # not reveal whether the e-mail exists.
    user = db.query(User).filter(User.email == form.username).first()
    if user is None:
        # Spend about as long as a real hash verification so response timing
        # does not reveal which e-mails are registered.
        pwd_ctx.dummy_verify()
        authenticated = False
    else:
        authenticated = pwd_ctx.verify(form.password, user.hashed_password)
    if not authenticated:
        raise HTTPException(
            status_code=401,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return TokenResponse(
        access_token=create_access_token(user.id, user.role),
        refresh_token=create_refresh_token(user.id),
    )
@app.post("/auth/refresh")
async def refresh(refresh_token: str, db: DB = Depends(get_db)):
    # Exchange a valid refresh token for a new access token.
    # Responds 401 when the token fails verification, is not a refresh token,
    # has an unusable subject, or belongs to a missing/inactive user.
    # NOTE(review): a bare `str` parameter is read from the query string, so
    # the refresh token ends up in URLs and access logs - consider moving it
    # to the request body.
    payload = decode_token(refresh_token)
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Not a refresh token")
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        # A missing or non-numeric subject is a bad credential, not a server error.
        raise HTTPException(status_code=401, detail="Invalid token subject") from None
    user = db.query(User).get(user_id)
    # Same rule as get_current_user: deleted or deactivated accounts must not
    # be able to mint fresh access tokens.
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="Inactive or missing user")
    return {"access_token": create_access_token(user.id, user.role), "token_type": "bearer"}
3. Current User Dependency
from typing import Annotated
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DB,
) -> User:
    """Resolve the bearer token on the request to an active ``User``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session.

    Returns:
        The user whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token fails verification, is not an
            access token, has a missing or non-numeric subject, or refers
            to a missing or inactive user.
    """
    # NOTE(review): `db: DB` has no Depends(...) here, while the login route
    # declares `db: DB = Depends(get_db)`. Confirm how DB is defined so the
    # session is injected consistently.
    payload = decode_token(token)
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token type")
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        # A missing or non-numeric subject is a bad credential, not a server error.
        raise HTTPException(status_code=401, detail="Invalid token subject") from None
    user = db.query(User).get(user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="Inactive or missing user")
    return user
# Shorthand annotation: a parameter typed `CurrentUser` receives the result of
# get_current_user.
CurrentUser = Annotated[User, Depends(get_current_user)]
def require_role(*roles: str):
    """Build a dependency that admits only users holding one of ``roles``.

    Args:
        *roles: Role names that are allowed through.

    Returns:
        An async callable that returns the current user when their ``role``
        is in ``roles`` and raises ``HTTPException`` (403) otherwise.
    """

    async def checker(current_user: CurrentUser) -> User:
        if current_user.role in roles:
            return current_user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return checker
@app.get("/me")
async def me(user: CurrentUser):
    # Return the authenticated user resolved by the CurrentUser dependency.
    # NOTE(review): the User object is returned as-is with no response_model;
    # confirm this does not serialize sensitive columns such as
    # hashed_password.
    return user
@app.delete("/admin/users/{user_id}")
async def admin_delete(user_id: int, _: Annotated[User, Depends(require_role("admin"))], db: DB):
    # Delete the user with the given id. The `_` parameter exists only to run
    # the require_role("admin") check; its value is not used.
    matching_users = db.query(User).filter(User.id == user_id)
    matching_users.delete()
    db.commit()
4. API Key Authentication
from fastapi.security import APIKeyHeader, APIKeyQuery
# The API key may arrive in the X-API-Key header or the api_key query
# parameter. auto_error=False lets get_api_key receive None for an absent key
# and raise its own error.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)
async def get_api_key(
    header_key: Annotated[str | None, Depends(api_key_header)],
    query_key: Annotated[str | None, Depends(api_key_query)],
    db: DB,
) -> APIKey:
    """Authenticate the request by API key and record when it was used.

    Args:
        header_key: Key from the ``X-API-Key`` header, or ``None``.
        query_key: Key from the ``api_key`` query parameter, or ``None``.
        db: Database session.

    Returns:
        The active ``APIKey`` row matching the supplied key.

    Raises:
        HTTPException: 403 when no key is supplied or no active key matches.
    """
    # The header takes precedence when both locations carry a key.
    token = header_key or query_key
    if not token:
        raise HTTPException(status_code=403, detail="API key required")
    key = (
        db.query(APIKey)
        # `== True` is intentional: it builds a SQL expression, not a Python comparison.
        .filter(APIKey.key == token, APIKey.is_active == True)  # noqa: E712
        .first()
    )
    if not key:
        raise HTTPException(status_code=403, detail="Invalid API key")
    # Timezone-aware UTC, matching create_token; datetime.utcnow() is naive
    # and deprecated since Python 3.12.
    key.last_used_at = datetime.now(timezone.utc)
    db.commit()
    return key
@app.get("/data", dependencies=[Depends(get_api_key)])
async def get_data():
    # get_api_key runs as a route-level dependency; its return value is not
    # passed to this handler.
    body = {"data": "..."}
    return body
5. HTTPBearer Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# auto_error=False lets verify_bearer receive None for absent credentials and
# raise its own 401.
bearer_scheme = HTTPBearer(auto_error=False)
async def verify_bearer(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_scheme)],
) -> dict:
    """Validate the request's bearer credentials and return the token claims.

    Args:
        credentials: Credentials extracted by ``bearer_scheme``, or ``None``
            when none were supplied.

    Returns:
        The decoded claims of the access token.

    Raises:
        HTTPException: 401 when credentials are absent, the token fails
            verification, or the token is not an access token.
    """
    if not credentials:
        raise HTTPException(
            status_code=401,
            detail="Bearer token required",
            headers={"WWW-Authenticate": "Bearer"},
        )
    payload = decode_token(credentials.credentials)
    # Same rule as get_current_user: a refresh token must not be accepted as
    # an API credential.
    if payload.get("type") != "access":
        raise HTTPException(
            status_code=401,
            detail="Invalid token type",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return payload
@app.get("/protected")
async def protected(payload: Annotated[dict, Depends(verify_bearer)]):
    # Echo the subject claim of the verified bearer token.
    subject = payload["sub"]
    return {"user_id": subject}
6. Auth Schemes Comparison
| Scheme | Header | Best For | FastAPI Class |
|---|---|---|---|
| OAuth2 Bearer | Authorization: Bearer token | Web/mobile apps | OAuth2PasswordBearer |
| HTTP Bearer | Authorization: Bearer token | Microservices | HTTPBearer |
| API Key (header) | X-API-Key: key | Server-to-server | APIKeyHeader |
| API Key (query) | ?api_key=key | Simple integrations | APIKeyQuery |
| Basic Auth | Authorization: Basic credentials | Internal tools | HTTPBasic |