Staff Prep 13: Caching Strategies — Redis Patterns & Invalidation
Back to Part 12: Rate Limiting.

Caching is the most effective single lever for read-heavy system performance. A Redis GET takes ~0.1ms; a Postgres query takes 5–50ms. At 1,000 req/s, caching can reduce database load by 90% or more. But caching has traps: stale data, thundering herds, memory leaks, and silent corruption. This is the complete caching playbook.
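To make those numbers concrete, here is a quick back-of-envelope calculation. The hit rate and latencies are illustrative values matching the figures above, not measurements:

```python
req_per_s = 1_000
hit_rate = 0.90      # fraction of reads served from Redis
t_cache_ms = 0.1     # a typical Redis GET
t_db_ms = 20.0       # a mid-range Postgres query

# Only cache misses reach the database
db_qps = req_per_s * (1 - hit_rate)  # ~100 queries/s hit Postgres

# Blended read latency: hits are fast, misses pay the DB price
blended_ms = hit_rate * t_cache_ms + (1 - hit_rate) * t_db_ms  # ~2.1 ms average
```

The same arithmetic also shows why hit rate matters so much more than cache speed: going from 90% to 99% hits cuts DB load another 10x, while a faster cache barely moves the blended latency.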
Pattern 1: cache-aside (lazy loading)
```python
import json

import redis.asyncio as redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# `db` is assumed to be an asyncpg-style pool/connection available in scope.

async def get_user_profile(user_id: int) -> dict | None:
    key = f"user:{user_id}:profile"

    # 1. Check cache
    cached = await r.get(key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss — query DB
    user = await db.fetchrow(
        "SELECT id, name, email, role FROM users WHERE id = $1", user_id
    )
    if not user:
        return None

    # 3. Write to cache with TTL
    await r.setex(key, 300, json.dumps(dict(user)))  # 5-minute TTL
    return dict(user)

async def update_user_profile(user_id: int, data: dict):
    await db.execute(
        "UPDATE users SET name = $1, email = $2 WHERE id = $3",
        data["name"], data["email"], user_id
    )
    # Explicit invalidation on write — cache will rebuild on next read
    await r.delete(f"user:{user_id}:profile")
```
Pattern 2: write-through
```python
async def update_user_write_through(user_id: int, data: dict):
    # Write to DB
    await db.execute(
        "UPDATE users SET name = $1 WHERE id = $2", data["name"], user_id
    )
    # Write to cache immediately (keeps cache warm, no staleness window)
    user = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    await r.setex(f"user:{user_id}:profile", 300, json.dumps(dict(user)))
```

- When to use: read-heavy APIs where the cache MUST be fresh after a write.
- Cost: every write does two hops (DB + cache).
- Risk: cache and DB can drift if the DB write succeeds but the cache write fails.
- Mitigation: wrap both in a try/except and invalidate on failure.
The thundering herd: distributed lock solution
```python
import asyncio

LOCK_TTL = 5  # seconds

async def get_with_stampede_protection(key: str, rebuild_fn, ttl: int = 300):
    """Cache-aside with a distributed lock to prevent the thundering herd."""
    # Fast path: cache hit
    cached = await r.get(key)
    if cached:
        return json.loads(cached)

    lock_key = f"lock:{key}"
    # Try to acquire the distributed lock (SET NX = only if key does not exist)
    acquired = await r.set(lock_key, "1", nx=True, ex=LOCK_TTL)
    if acquired:
        # This worker rebuilds the cache
        try:
            value = await rebuild_fn()
            await r.setex(key, ttl, json.dumps(value))
            return value
        finally:
            # Note: a production version stores a unique token in the lock and
            # deletes it only if the token still matches, so a slow rebuild
            # cannot delete a lock that expired and was re-acquired elsewhere.
            await r.delete(lock_key)
    else:
        # Another worker is rebuilding — wait briefly and retry
        for _ in range(10):  # up to 500ms total wait
            await asyncio.sleep(0.05)
            cached = await r.get(key)
            if cached:
                return json.loads(cached)
        # Fallback: if the lock holder is taking too long, rebuild anyway
        return await rebuild_fn()

# Usage
async def get_expensive_report(report_id: int):
    key = f"report:{report_id}"
    return await get_with_stampede_protection(
        key=key,
        rebuild_fn=lambda: generate_report(report_id),
        ttl=600,
    )
```
TTL jitter: prevent cache avalanche
If 10,000 cache keys all have the same TTL and are all set at the same time (e.g., after a cache flush), they all expire at the same time. Every key becomes a thundering herd simultaneously. Add random jitter to TTLs to spread expirations.
```python
import random

def jittered_ttl(base_ttl: int, jitter_fraction: float = 0.2) -> int:
    """Add up to +/-20% jitter to the base TTL."""
    jitter = int(base_ttl * jitter_fraction)
    return base_ttl + random.randint(-jitter, jitter)

# Instead of:
await r.setex(key, 300, value)

# Use:
await r.setex(key, jittered_ttl(300), value)
# TTL will be between 240 and 360 seconds, spreading cache expirations
# across a 2-minute window instead of all at once
```
Cache key design
```python
import hashlib
import json

# Namespace keys to avoid collisions
def user_key(user_id: int) -> str:
    return f"v1:user:{user_id}:profile"  # versioned namespace

def query_key(sql: str, params: dict) -> str:
    # sort_keys makes the key independent of dict insertion order;
    # md5 is fine here: we need a short, stable key, not cryptographic strength
    payload = json.dumps({"sql": sql, "params": params}, sort_keys=True)
    hash_ = hashlib.md5(payload.encode()).hexdigest()
    return f"v1:qcache:{hash_}"

# Pattern invalidation: delete all keys for a user.
# Use a set to track related keys.
async def set_user_cache(user_id: int, key: str, value: str, ttl: int):
    await r.setex(key, ttl, value)
    # Track the key in a set for bulk invalidation
    await r.sadd(f"user:{user_id}:cache_keys", key)
    await r.expire(f"user:{user_id}:cache_keys", ttl * 2)

async def invalidate_all_user_cache(user_id: int):
    keys_set = f"user:{user_id}:cache_keys"
    keys = await r.smembers(keys_set)
    if keys:
        await r.delete(*keys, keys_set)  # delete all tracked keys + the set
```
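To see the bookkeeping in isolation, here is the same track-then-invalidate flow run against a tiny in-memory stand-in for the four Redis commands it uses. `FakeRedis` is hypothetical scaffolding for this sketch, not the real client:

```python
import asyncio

class FakeRedis:
    """Minimal in-memory stand-in for SETEX / SADD / SMEMBERS / DELETE."""
    def __init__(self):
        self.kv, self.sets = {}, {}
    async def setex(self, key, ttl, value):
        self.kv[key] = value           # TTLs elided in the stand-in
    async def sadd(self, key, member):
        self.sets.setdefault(key, set()).add(member)
    async def smembers(self, key):
        return self.sets.get(key, set())
    async def delete(self, *keys):
        for k in keys:
            self.kv.pop(k, None)
            self.sets.pop(k, None)

async def demo():
    r = FakeRedis()
    # Cache two entries for user 42 and record each one in the tracking set
    for key, val in [("v1:user:42:profile", "{...}"), ("v1:qcache:abc", "[...]")]:
        await r.setex(key, 300, val)
        await r.sadd("user:42:cache_keys", key)
    # Bulk invalidation: read the set, delete every tracked key plus the set
    keys = await r.smembers("user:42:cache_keys")
    await r.delete(*keys, "user:42:cache_keys")
    return r

r = asyncio.run(demo())
# Both cached entries and the tracking set itself are gone
```

One caveat carries over to the real client: tracked keys may already have expired by the time you invalidate, which is harmless since DELETE on a missing key is a no-op.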
Redis fallback: graceful degradation
```python
async def get_user_resilient(user_id: int) -> dict | None:
    key = f"v1:user:{user_id}:profile"
    try:
        cached = await r.get(key)
        if cached:
            return json.loads(cached)
    except redis.RedisError:
        # Redis is down — fall through to the DB.
        # Log this for monitoring.
        pass

    user = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    if not user:
        return None

    try:
        await r.setex(key, jittered_ttl(300), json.dumps(dict(user)))
    except redis.RedisError:
        pass  # Cannot write to cache — DB serves all traffic until Redis recovers
    return dict(user)
```
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- A popular product page has TTL=300 seconds. At exactly T=300, 500 concurrent requests all miss the cache. Walk through the thundering herd problem and the distributed lock solution.
- You set `order_count = 150` in cache with TTL=60s. A new order is placed at T=45. What are two approaches to keep the cache consistent? Trade-offs of each?
- What is cache avalanche? How does TTL jitter prevent it?
- Your Redis instance goes down. Your API is using cache-aside. What should happen for users? How do you implement this graceful fallback?
- You never set a TTL when writing to Redis. Six months later, what do you observe in production? What is the mechanism?
Next up — Part 14: Auth & Authorization. JWT internals, refresh token rotation, OAuth2 flows, and RBAC vs ABAC.