Staff Prep 20: API Design at Scale — Idempotency, ETags & Versioning
Back to Part 19: Connection Pooling.
APIs at scale deal with flaky clients and flakier networks. Idempotency keys save you the day a customer gets charged twice. ETags save bandwidth you did not realise you were burning. And a real versioning strategy is the difference between shipping a breaking change as a quiet migration and shipping it as an angry Slack thread at 11pm. Most of what follows is boring by design, which is the whole point.
Idempotency keys: preventing duplicate mutations
A client sends a payment request. The server processes it, then the network drops before the response gets back. The client retries. Without idempotency, the payment runs twice and your support queue lights up. Keys make mutations safe to retry by giving each attempt a stable identifier the server can deduplicate on.
import json
from typing import Optional

from fastapi import Depends, Header
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

# app, get_redis, get_db, PaymentCreate and process_payment are assumed
# to be defined elsewhere in the application.

async def idempotency_check(
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
    redis_client = Depends(get_redis),
):
    """Dependency for idempotent endpoints.

    Returns (key, cached_response); both are None when no key was sent.
    """
    if not idempotency_key:
        return None, None  # no key: proceed normally
    cached = await redis_client.get(f"idem:{idempotency_key}")
    # A hit means this key was already processed; the endpoint replays it
    return idempotency_key, (json.loads(cached) if cached else None)

@app.post("/payments", status_code=201)
async def create_payment(
    payment: PaymentCreate,
    idempotency = Depends(idempotency_check),
    db: AsyncSession = Depends(get_db),
    redis_client = Depends(get_redis),
):
    idempotency_key, cached = idempotency
    if cached:
        # Replay the stored response unchanged. Raising HTTPException here
        # would wrap the body in {"detail": ...} and change its shape.
        return JSONResponse(
            status_code=cached["status_code"],
            content=cached["body"],
            headers={"X-Idempotent-Replay": "true"},
        )
    # Process payment
    result = await process_payment(payment, db)
    # Store result in Redis (TTL: 24 hours)
    if idempotency_key:
        await redis_client.setex(
            f"idem:{idempotency_key}",
            86400,
            json.dumps({"status_code": 201, "body": result.dict()}),
        )
    return result
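One gap worth knowing about: a check followed by a later write is not atomic, so two retries that arrive at the same moment can both miss the cache and both run the payment. A common way to close that gap is to claim the key before doing any work. The sketch below shows the claim, complete, replay flow against an in-memory store so it runs on its own. Everything in it is an assumption for illustration: IdempotencyStore, body_fingerprint and handle are hypothetical names, and the 409 and 422 status codes are one reasonable choice rather than a fixed rule. In production the claim step would typically map to a single Redis SET with the NX option.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Record:
    fingerprint: str           # hash of the body that first claimed the key
    response: Optional[dict]   # None while the first attempt is still running


class IdempotencyStore:
    """In-memory stand-in for Redis (hypothetical helper, not a library class)."""

    def __init__(self) -> None:
        self._records: dict = {}

    def claim(self, key: str, fingerprint: str) -> Optional[Record]:
        """Claim the key if it is free.

        Returns None when this caller now owns the key, otherwise the record
        left by the earlier attempt.
        """
        existing = self._records.get(key)
        if existing is None:
            self._records[key] = Record(fingerprint, None)
        return existing

    def complete(self, key: str, response: dict) -> None:
        self._records[key].response = response

    def release(self, key: str) -> None:
        self._records.pop(key, None)


def body_fingerprint(body: dict) -> str:
    # Canonical JSON so the same payload always produces the same hash
    canonical = json.dumps(body, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


def handle(store: IdempotencyStore, key: str, body: dict,
           process: Callable[[dict], dict]) -> tuple:
    """Run `process` at most once per key and return (status_code, body)."""
    fp = body_fingerprint(body)
    existing = store.claim(key, fp)
    if existing is None:
        try:
            response = process(body)
        except Exception:
            store.release(key)  # let a later retry run the mutation again
            raise
        store.complete(key, response)
        return 201, response
    if existing.fingerprint != fp:
        # Same key, different payload: almost certainly a client bug
        return 422, {"error": "idempotency_key_reused"}
    if existing.response is None:
        # The first attempt has not finished yet
        return 409, {"error": "request_in_progress"}
    return 201, existing.response
```

Storing a fingerprint of the request body alongside the key is what lets the server tell a genuine retry apart from a client that reused a key for a different payment.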
ETags: conditional requests and cache validation
import hashlib
import json

from fastapi import Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

# app, get_db and Product are assumed to be defined elsewhere.

def compute_etag(data: dict) -> str:
    # Sorted keys give a canonical form, so equal payloads hash identically.
    # MD5 is only a cheap fingerprint here, not a security measure.
    content = json.dumps(data, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

@app.get("/products/{id}")
async def get_product(
    id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    product = await db.get(Product, id)
    if not product:
        raise HTTPException(status_code=404)
    etag = f'"{compute_etag(product.dict())}"'  # ETag values are quoted strings
    cache_control = "max-age=60, must-revalidate"
    # Check If-None-Match header (client's cached ETag)
    if request.headers.get("If-None-Match") == etag:
        # Not Modified: no body, but a 304 still carries the validator and
        # cache headers a 200 would have sent, so the client can refresh them
        return Response(
            status_code=304,
            headers={"ETag": etag, "Cache-Control": cache_control},
        )
    response = JSONResponse(content=product.dict())
    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = cache_control
    return response

# Client flow:
# GET /products/42 → 200 with ETag: "abc123"
# GET /products/42 with If-None-Match: "abc123" → 304 if unchanged (no body sent)
# GET /products/42 with If-None-Match: "abc123" → 200 with new ETag if product changed
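A plain string comparison on If-None-Match covers the common case, but the header may also carry a comma-separated list of ETags, a weak validator prefixed with W/, or the wildcard *. The following is a minimal sketch of a more tolerant matcher. The function names are hypothetical, and it assumes ETag values contain no commas, which holds for the hex digests produced above but not for every possible ETag.

```python
from typing import Optional


def strip_weak(tag: str) -> str:
    """Drop the W/ prefix so weak and strong tags compare on the opaque value."""
    tag = tag.strip()
    return tag[2:] if tag.startswith("W/") else tag


def if_none_match_matches(header: Optional[str], current_etag: str) -> bool:
    """Return True when the client's cached copy is still valid (send 304)."""
    if not header:
        return False
    if header.strip() == "*":
        return True  # wildcard matches any current representation
    current = strip_weak(current_etag)
    # Assumption: no commas inside ETag values, so a simple split is enough
    return any(strip_weak(candidate) == current for candidate in header.split(","))
```

If-None-Match uses weak comparison, which is why the W/ prefix is ignored on both sides before the values are compared.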
API versioning strategies
from fastapi import APIRouter, FastAPI, Header, Response

app = FastAPI()

# get_user_v1_impl and get_user_v2_impl are assumed helpers that return
# the v1 and v2 response shapes shown below.

# Strategy 1: URL versioning (most common, most visible)
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")

@v1_router.get("/users/{id}")
async def get_user_v1(id: int):
    return {"id": id, "name": "John Doe"}  # old response shape

@v2_router.get("/users/{id}")
async def get_user_v2(id: int):
    return {"id": id, "full_name": "John Doe", "avatar_url": "..."}  # new shape

app.include_router(v1_router)
app.include_router(v2_router)

# Strategy 2: Header versioning (cleaner URLs, harder to test in browser)
@app.get("/users/{id}")
async def get_user_versioned(
    id: int,
    api_version: str = Header(default="1", alias="API-Version"),
):
    if api_version == "2":
        return await get_user_v2_impl(id)
    return await get_user_v1_impl(id)

# Strategy 3: Deprecation headers (communicate sunset dates)
# This handler takes the place of get_user_v1 above. Routes must be declared
# on a router before app.include_router() is called for them to be served.
@v1_router.get("/users/{id}")
async def get_user_v1_deprecated(id: int, response: Response):
    # Headers set on the injected Response are merged into the final reply
    response.headers["Deprecation"] = "true"
    response.headers["Sunset"] = "Fri, 01 Jan 2027 00:00:00 GMT"
    # A complete Link value puts the successor's URL in angle brackets
    # in front of the rel parameter
    response.headers["Link"] = '; rel="successor-version"'
    return await get_user_v1_impl(id)
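Whichever strategy carries the version, running two versions in parallel is easier when only one internal shape exists and older versions are derived from it. Here is a minimal sketch of that idea using the v1 and v2 user shapes from the handlers above. The names user_v2_to_v1, ADAPTERS and render_user are hypothetical, and treating v2 as the canonical shape is an assumption of this example.

```python
from typing import Any, Callable, Dict

User = Dict[str, Any]


def user_v2_to_v1(user: User) -> User:
    """Downgrade the v2 shape to v1: full_name becomes name, avatar_url is dropped."""
    return {"id": user["id"], "name": user["full_name"]}


# One adapter per supported version; the newest version is the identity
ADAPTERS: Dict[str, Callable[[User], User]] = {
    "1": user_v2_to_v1,
    "2": lambda user: user,
}


def render_user(user: User, api_version: str = "1") -> User:
    """Shape a canonical (v2) user for the version the client asked for."""
    adapter = ADAPTERS.get(api_version)
    if adapter is None:
        raise ValueError(f"unsupported API version: {api_version}")
    return adapter(user)
```

With this layout the business logic only ever produces the newest shape, and retiring v1 means deleting one adapter rather than a second copy of every handler.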
Bulk endpoints: preventing N+1 at the API level
from fastapi import Depends
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# app, get_db, User, OrderCreate and create_order are assumed to be
# defined elsewhere.

class BulkUserRequest(BaseModel):
    # Cap the batch so one request cannot ask for an unbounded IN (...) list
    ids: list[int] = Field(min_length=1, max_length=100)

class BulkOperationResult(BaseModel):
    succeeded: list[dict]
    failed: list[dict]

@app.post("/users/batch")
async def get_users_bulk(
    request: BulkUserRequest,
    db: AsyncSession = Depends(get_db),
) -> BulkOperationResult:
    # Single query for all IDs (N+1 prevention)
    users = await db.execute(
        select(User).where(User.id.in_(request.ids))
    )
    user_map = {u.id: u for u in users.scalars().all()}
    succeeded = []
    failed = []
    # Walk the requested IDs so results keep the caller's order
    for id_ in request.ids:
        if id_ in user_map:
            succeeded.append(user_map[id_].dict())
        else:
            failed.append({"id": id_, "error": "not_found"})
    return BulkOperationResult(succeeded=succeeded, failed=failed)

# Bulk create with partial success
@app.post("/orders/bulk")
async def create_orders_bulk(orders: list[OrderCreate], db: AsyncSession = Depends(get_db)):
    results = []
    # One transaction: orders that were created commit together on exit
    async with db.begin():
        for order in orders:
            try:
                new_order = await create_order(order, db)
                results.append({"status": "created", "id": new_order.id})
            except ValidationError as e:
                results.append({"status": "failed", "error": str(e)})
                # Continue processing remaining orders
    return {"results": results}
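The 100-ID cap on the batch endpoint pushes a small job onto the client: a caller with more IDs has to split them into several requests. A minimal sketch of that client-side step follows. The helper name batches and the MAX_BATCH constant are hypothetical, and deduplicating before splitting is a choice made for this example rather than something the endpoint requires.

```python
from typing import Iterator, List

MAX_BATCH = 100  # mirrors max_length=100 on BulkUserRequest.ids


def batches(ids: List[int], size: int = MAX_BATCH) -> Iterator[List[int]]:
    """Yield deduplicated IDs in chunks of at most `size`, keeping first-seen order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    unique = list(dict.fromkeys(ids))  # dicts preserve insertion order
    for start in range(0, len(unique), size):
        yield unique[start:start + size]
```

For 250 distinct IDs this turns 250 single-user calls into three batch calls, and each of those maps to one IN query on the server.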
Rate limit headers: communicating limits to clients
from fastapi import Depends, Response

# app, User, get_current_user, get_rate_limit_info and fetch_data are
# assumed to be defined elsewhere.

@app.get("/data")
async def get_data(response: Response, user: User = Depends(get_current_user)):
    # FastAPI injects `response`; headers set on it are added to the reply
    rate_info = await get_rate_limit_info(user.id)
    response.headers.update({
        "X-RateLimit-Limit": str(rate_info["limit"]),
        "X-RateLimit-Remaining": str(rate_info["remaining"]),
        "X-RateLimit-Reset": str(rate_info["reset_at"]),  # Unix timestamp
        "X-RateLimit-Window": "60",  # seconds
    })
    return await fetch_data()
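The handler above reads its numbers from get_rate_limit_info without showing where they come from. As a rough sketch, this is how the same headers could be derived from a fixed-window counter, with a 429 and a Retry-After header once the client is over its limit. The function name rate_limit_response and its parameters are hypothetical, and a fixed window is only one of several possible limiting schemes.

```python
import math
from typing import Dict, Tuple


def rate_limit_response(
    limit: int,
    used: int,               # requests counted in this window, including this one
    window_start: float,     # Unix timestamp when the current window opened
    window_seconds: int,
    now: float,
) -> Tuple[int, Dict[str, str]]:
    """Return (status_code, headers) for a fixed-window rate limiter."""
    reset_at = window_start + window_seconds
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(max(limit - used, 0)),
        "X-RateLimit-Reset": str(int(reset_at)),  # Unix timestamp
        "X-RateLimit-Window": str(window_seconds),
    }
    if used > limit:
        # Tell the client how many seconds to wait before retrying
        headers["Retry-After"] = str(max(math.ceil(reset_at - now), 0))
        return 429, headers
    return 200, headers
```

Sending the headers on successful responses as well as on 429s lets well-behaved clients slow down before they hit the limit.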
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- A mobile client sends a payment request. The server processes it but the response is lost. The client retries 3 times. Without idempotency keys, what happens? How do keys prevent it?
- What HTTP status code do you return when an ETag matches and the resource has not changed? What body do you send?
- Compare URL versioning (/v1, /v2) vs header versioning. What are the operational trade-offs for each when running two versions in parallel?
- A client needs data for 100 users. They call GET /users/{id} 100 times. How do you design a bulk endpoint that solves this? What are the limits and why?
- Your API has GET /users/{id} returning v1 format. You need to add a required field full_name that replaces name. How do you make this change without breaking existing clients?
Next up: Part 21: Postgres MVCC & Concurrency. MVCC internals, visibility rules, write skew, and the transaction isolation model in depth.