Staff Prep 14: Auth & Authorization — JWT, OAuth2, RBAC vs ABAC
Authentication (who are you) and authorization (what can you do) are adjacent problems solved by different tools. JWT handles the first without a database lookup on every request. OAuth2 handles delegated authorization between services. RBAC and ABAC are two access control models that scale very differently. A staff engineer needs to know all four, and more importantly, where each one stops working.
JWT internals: what is in the token
A JWT is three base64url-encoded parts separated by dots: header.payload.signature.
The server signs the encoded header and payload with a secret key. Verification doesn't require a database lookup;
you just recompute the signature and compare. That's the whole appeal, and also the whole
problem, as we'll see in a minute.
import uuid
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT
from fastapi import HTTPException

SECRET_KEY = "your-secret-key-min-32-chars"  # placeholder: load the real key from a secret store
ALGORITHM = "HS256"

def create_access_token(user_id: int, role: str, expires_minutes: int = 15) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": str(user_id),  # subject: user identifier
        "role": role,
        "iat": int(now.timestamp()),  # issued at
        "exp": int((now + timedelta(minutes=expires_minutes)).timestamp()),
        "jti": str(uuid.uuid4()),  # JWT ID: unique per token (for revocation)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_access_token(token: str) -> dict:
    try:
        # Pin the accepted algorithms; never trust the "alg" value in the token header.
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

# JWT structure decoded:
# Header: {"alg": "HS256", "typ": "JWT"}
# Payload: {"sub": "42", "role": "admin", "exp": 1743710400}
# Signature: HMACSHA256(base64url(header) + "." + base64url(payload), secret)
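To make the three-part structure concrete, here is a minimal sketch that builds and checks an HS256 token using only the standard library. It is for illustration, not a replacement for a maintained JWT library: it skips `exp` validation and every other claim check. The helper names (`b64url_encode`, `sign_hs256`, `read_claims_unverified`, `signature_is_valid`) and the `demo-secret` key are mine, not part of any library.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    # JWTs use URL-safe base64 with the trailing "=" padding stripped.
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    # Restore the padding that was stripped during encoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(header: dict, payload: dict, secret: str) -> str:
    # The signature covers "<encoded header>.<encoded payload>".
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def read_claims_unverified(token: str) -> dict:
    # No key needed: anyone holding the token can read the payload.
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


def signature_is_valid(token: str, secret: str) -> bool:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(b64url_decode(signature), expected)


demo_token = sign_hs256({"alg": "HS256", "typ": "JWT"}, {"sub": "42", "role": "admin"}, "demo-secret")
```

Notice that `read_claims_unverified` never touches the secret. The signature protects integrity, not confidentiality, so treat everything in the payload as readable by whoever holds the token.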
The revocation problem and refresh token rotation
Here's the uncomfortable truth about JWTs. They're stateless, which means once issued, you can't revoke them before they expire unless you maintain a denylist (which puts you back in the database on every request, which was the whole thing you were trying to avoid). The workable answer is short-lived access tokens (15 minutes is my default) plus long-lived refresh tokens stored server-side.
import hashlib
import secrets

# datetime, timedelta, timezone, HTTPException, and create_access_token come from the JWT code above.
# db is assumed to be an asyncpg connection (the $1 placeholders and fetchrow follow its API).

def hash_token(token: str) -> str:
    # Assumed helper: store only a SHA-256 digest, so a leaked table does not expose usable tokens.
    return hashlib.sha256(token.encode()).hexdigest()

# Refresh token: stored in DB, one per user session
async def create_refresh_token(user_id: int, db) -> str:
    token = secrets.token_urlsafe(32)  # cryptographically random
    expires_at = datetime.now(timezone.utc) + timedelta(days=30)
    await db.execute(
        """INSERT INTO refresh_tokens (user_id, token_hash, expires_at, created_at)
           VALUES ($1, $2, $3, NOW())""",
        user_id, hash_token(token), expires_at
    )
    return token

async def rotate_refresh_token(old_token: str, db) -> tuple[str, str]:
    """Refresh token rotation: exchange old refresh token for new pair."""
    token_hash = hash_token(old_token)
    # Find and validate old token. The JOIN assumes a users table with id and role columns.
    row = await db.fetchrow(
        """SELECT rt.user_id, rt.used, rt.expires_at, u.role
           FROM refresh_tokens rt JOIN users u ON u.id = rt.user_id
           WHERE rt.token_hash = $1""",
        token_hash
    )
    if not row:
        raise HTTPException(401, "Invalid refresh token")
    if row["used"]:
        # Token reuse detected (possible theft): invalidate ALL tokens for this user
        await db.execute(
            "DELETE FROM refresh_tokens WHERE user_id = $1", row["user_id"]
        )
        raise HTTPException(401, "Refresh token reuse detected, all sessions invalidated")
    if row["expires_at"] < datetime.now(timezone.utc):
        raise HTTPException(401, "Refresh token expired")
    # Mark old token as used. The "AND used = false" guard makes the claim atomic,
    # so two concurrent requests cannot both rotate the same token.
    status = await db.execute(
        "UPDATE refresh_tokens SET used = true WHERE token_hash = $1 AND used = false",
        token_hash
    )
    if status != "UPDATE 1":  # asyncpg returns the command status string
        raise HTTPException(401, "Refresh token already used")
    # Issue new pair
    new_refresh_token = await create_refresh_token(row["user_id"], db)
    new_access_token = create_access_token(row["user_id"], role=row["role"])
    return new_access_token, new_refresh_token
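For comparison, the denylist option mentioned above can be sketched like this. It is a minimal in-memory version keyed on the `jti` claim; the `TokenDenylist` class and its method names are hypothetical, and in production the dictionary would be a shared store (Redis with a per-key TTL is a common choice) so every API instance sees the same revocations.

```python
import time
from typing import Optional


class TokenDenylist:
    """In-memory stand-in for a shared revocation store (an assumption for this sketch)."""

    def __init__(self) -> None:
        # Maps jti -> the token's own expiry time in epoch seconds.
        self._revoked: dict[str, float] = {}

    def revoke(self, jti: str, exp: float) -> None:
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        self._purge(now)
        return jti in self._revoked

    def _purge(self, now: float) -> None:
        # After a token expires on its own, signature verification already
        # rejects it, so its denylist entry is dead weight and can be dropped.
        expired = [jti for jti, exp in self._revoked.items() if exp <= now]
        for jti in expired:
            del self._revoked[jti]
```

The architectural cost is the `is_revoked` call itself: it has to run on every authenticated request, which is exactly the lookup that stateless tokens were supposed to remove. With 15-minute access tokens the list at least stays small, because entries only need to live until the token's `exp`.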
FastAPI JWT dependency
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession

# app, get_db, User, and list_all_users are assumed to be defined elsewhere in the project.
security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = verify_access_token(token)
    user_id = int(payload["sub"])
    user = await db.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(401, "User not found or inactive")
    return user

def require_role(role: str):
    """Factory for role-based dependencies.

    Must be a plain function: an async factory would hand Depends() a coroutine
    object instead of the check_role callable.
    """
    async def check_role(user: User = Depends(get_current_user)) -> User:
        if user.role != role:
            raise HTTPException(403, f"Requires {role} role")
        return user
    return check_role

@app.get("/admin/users")
async def admin_list_users(admin: User = Depends(require_role("admin"))):
    return await list_all_users()
OAuth2: delegated authorization flow
import httpx

# Assumptions: a shared httpx client, and CLIENT_ID / CLIENT_SECRET / REDIRECT_URI loaded from config.
http_client = httpx.AsyncClient()

# OAuth2 Authorization Code Flow (for user-facing apps)
# 1. Redirect user to authorization server:
#    GET /oauth/authorize?response_type=code&client_id=XYZ&redirect_uri=...&scope=read:profile
#
# 2. User logs in and approves; server redirects back:
#    GET /callback?code=AUTHORIZATION_CODE
#
# 3. Exchange code for tokens (server-to-server):
async def exchange_code_for_token(code: str) -> dict:
    response = await http_client.post(
        "https://auth.example.com/oauth/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "redirect_uri": REDIRECT_URI,
        }
    )
    response.raise_for_status()  # surface 4xx/5xx instead of parsing an error body as tokens
    return response.json()
    # Returns: {"access_token": "...", "refresh_token": "...", "expires_in": 3600}

# OAuth2 Client Credentials (for service-to-service)
async def get_service_token(client_id: str, client_secret: str) -> str:
    response = await http_client.post(
        "https://auth.example.com/oauth/token",
        data={
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": "read:orders write:fulfillment",
        }
    )
    response.raise_for_status()
    return response.json()["access_token"]
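Steps 1 and 2 of the Authorization Code flow only appear as comments above, so here is a rough sketch of what they might look like in code. It adds a `state` parameter, which the walkthrough leaves out; OAuth2 uses it to tie the callback to the browser session that started the flow. The `AUTHORIZE_ENDPOINT` value and the function names `build_authorize_url` and `extract_code` are assumptions for illustration.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit

# Hypothetical endpoint, chosen to mirror the token URL used above.
AUTHORIZE_ENDPOINT = "https://auth.example.com/oauth/authorize"


def build_authorize_url(client_id: str, redirect_uri: str, scope: str) -> tuple[str, str]:
    """Return (url, state). Store state in the user's session before redirecting."""
    state = secrets.token_urlsafe(16)
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
    })
    return f"{AUTHORIZE_ENDPOINT}?{query}", state


def extract_code(callback_url: str, expected_state: str) -> str:
    """Check state on the callback URL, then return the authorization code."""
    params = parse_qs(urlsplit(callback_url).query)
    returned_state = params.get("state", [""])[0]
    # Constant-time comparison on bytes, so non-ASCII input cannot raise TypeError.
    if not hmac.compare_digest(returned_state.encode(), expected_state.encode()):
        raise ValueError("state mismatch: possible CSRF, discard the callback")
    if "code" not in params:
        raise ValueError("callback is missing the authorization code")
    return params["code"][0]
```

The code returned by `extract_code` is what gets passed to the token exchange in step 3. If the state check fails, drop the callback without exchanging anything.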
RBAC vs ABAC
RBAC is simple. Users have roles, roles have permissions, and it's easy to reason about in code and in audits. It scales fine to hundreds of roles. It breaks the moment you need conditional access like "a user can edit their own posts but not anyone else's," because the role doesn't know which resource you're acting on.
ABAC lets you write richer policies that take attributes of the user and the resource into account. It's more expressive, and it's harder to audit. Answering "who can access resource X?" now means evaluating every policy against every user, which nobody enjoys at 2am during an incident.
PERMISSIONS = {
    "admin": {"read:any", "write:any", "delete:any"},
    "editor": {"read:any", "write:own", "delete:own"},
    "viewer": {"read:any"},
}

def has_permission(user: User, permission: str) -> bool:
    role_permissions = PERMISSIONS.get(user.role, set())
    return permission in role_permissions

# RBAC breaks on ownership: "write:own" is not expressive enough
# You need to know WHICH resource is "own"

# ABAC (Attribute-Based Access Control): evaluate policies with context
def can_edit_post(user: User, post: Post) -> bool:
    # Policy: user can edit if they own the post OR are an admin
    return user.id == post.author_id or user.role == "admin"

def can_access_tenant_data(user: User, resource: dict) -> bool:
    # Policy: user can access data if same tenant AND has required role
    return (
        user.tenant_id == resource["tenant_id"]
        and user.role in resource["allowed_roles"]
    )

# ABAC is more expressive but harder to audit
# "Who can access resource X?" requires evaluating policies for all users
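To see both the expressiveness and the audit cost in one place, here is a small sketch of a department-scoped approval policy. The `Employee` and `ApprovalRequest` types and the policy itself are hypothetical, made up for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Employee:
    id: int
    role: str
    department: str


@dataclass(frozen=True)
class ApprovalRequest:
    id: int
    department: str


def can_approve(user: Employee, request: ApprovalRequest) -> bool:
    # Hypothetical policy: admins approve anything, managers only within their own department.
    if user.role == "admin":
        return True
    return user.role == "manager" and user.department == request.department


def who_can_approve(users: list[Employee], request: ApprovalRequest) -> list[int]:
    # The audit question: there is no role-to-permission table to read,
    # so the policy has to be evaluated once per user.
    return [user.id for user in users if can_approve(user, request)]
```

A pure RBAC table can say "manager may approve" but has nowhere to record "only in their own department". The trade-off shows up in `who_can_approve`: the answer costs one policy evaluation per user per resource, where RBAC would be a single table lookup.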
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- A user's JWT access token is stolen. It expires in 15 minutes. What can you do right now to invalidate it before expiration? What does this cost architecturally?
- What is refresh token rotation? What happens when a stolen refresh token is detected being reused?
- What is the difference between OAuth2 Authorization Code flow and Client Credentials flow? When would you use each?
- Your RBAC has roles: admin, manager, viewer. A manager should be able to approve requests only within their department. How does pure RBAC fail here? How would ABAC solve it?
- JWT claims are base64url-encoded, not encrypted. What information should you never put in a JWT payload? Why?
Next up: Part 15: Python GIL Explained. What the GIL actually is, when it hurts, and what Python 3.13 changes.