Staff Prep 24: Redis Data Structures & Caching Patterns Beyond GET/SET
ArchitectureStaff

Staff Prep 24: Redis Data Structures & Caching Patterns Beyond GET/SET

April 4, 202610 min readPART 02 / 06

Back to Part 23: VACUUM & Bloat. Redis is far more than a cache. Its native data structures solve problems that would require complex database schemas otherwise. Leaderboards, rate limiters, real-time feeds, distributed locks, and approximate unique counts — all solvable with the right Redis primitive. This is the guide to using Redis as a data structure server, not just a cache.

Sorted sets (ZSET): leaderboards and ranked data

python
import redis.asyncio as redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Leaderboard: ZADD user_id score
async def update_score(user_id: int, points: int):
    await r.zincrby("leaderboard:global", points, str(user_id))

async def get_top_10():
    # ZREVRANGE: highest scores first, with scores
    entries = await r.zrevrange("leaderboard:global", 0, 9, withscores=True)
    return [{"user_id": int(uid), "score": score} for uid, score in entries]

async def get_user_rank(user_id: int) -> int:
    rank = await r.zrevrank("leaderboard:global", str(user_id))
    return rank + 1 if rank is not None else None  # 1-indexed

async def get_user_score(user_id: int) -> float:
    return await r.zscore("leaderboard:global", str(user_id)) or 0

# Range query: get users with score between 1000 and 5000
async def get_users_in_score_range(min_score: float, max_score: float):
    return await r.zrangebyscore("leaderboard:global", min_score, max_score, withscores=True)

# Weekly leaderboard: key per week, TTL for auto-expiration
import datetime

def this_week_key() -> str:
    week = datetime.date.today().isocalendar()
    return f"leaderboard:{week.year}:w{week.week}"

async def update_weekly_score(user_id: int, points: int):
    key = this_week_key()
    await r.zincrby(key, points, str(user_id))
    await r.expire(key, 7 * 24 * 3600)  # auto-expire after 7 days

Redis streams: persistent message queue

python
import asyncio

# Producer: append events to stream
async def publish_event(event_type: str, data: dict):
    stream_id = await r.xadd(
        "events:user_activity",
        {"type": event_type, "data": json.dumps(data)},
        maxlen=100000,  # cap at 100k entries (FIFO eviction)
        approximate=True,  # approximate maxlen (faster)
    )
    return stream_id

# Consumer group: multiple workers share the stream
async def setup_consumer_group():
    try:
        await r.xgroup_create("events:user_activity", "analytics_workers", id="0", mkstream=True)
    except redis.ResponseError:
        pass  # group already exists

# Worker: pull and process events
async def process_events_worker(worker_id: str):
    await setup_consumer_group()

    while True:
        # Read up to 10 new messages (blocking for up to 2 seconds)
        messages = await r.xreadgroup(
            "analytics_workers",
            f"worker_{worker_id}",
            {"events:user_activity": ">"},  # ">" means undelivered messages
            count=10,
            block=2000,
        )

        if not messages:
            continue

        for stream_name, entries in messages:
            for message_id, data in entries:
                try:
                    await handle_event(data)
                    await r.xack("events:user_activity", "analytics_workers", message_id)
                except Exception as e:
                    # Leave unacknowledged — will be redelivered
                    print(f"Failed to process {message_id}: {e}")

Pub/sub: real-time notifications

python
import asyncio

# Publisher: broadcast event
async def notify_user(user_id: int, message: str):
    channel = f"notifications:{user_id}"
    await r.publish(channel, json.dumps({"message": message, "timestamp": time.time()}))

# Subscriber: WebSocket handler
async def websocket_handler(websocket, user_id: int):
    channel = f"notifications:{user_id}"
    pubsub = r.pubsub()
    await pubsub.subscribe(channel)

    try:
        async for message in pubsub.listen():
            if message["type"] == "message":
                await websocket.send_text(message["data"])
    finally:
        await pubsub.unsubscribe(channel)

# Pattern subscribe: all notifications
async def subscribe_all_notifications():
    pubsub = r.pubsub()
    await pubsub.psubscribe("notifications:*")
    async for message in pubsub.listen():
        if message["type"] == "pmessage":
            user_id = message["channel"].split(":")[1]
            print(f"Notification for user {user_id}: {message['data']}")

HyperLogLog: approximate unique counts at scale

python
import time

# Count unique page visitors — without storing every visitor ID
async def track_page_view(page_id: str, user_id: str):
    today = time.strftime("%Y-%m-%d")
    key = f"unique_visitors:{page_id}:{today}"
    await r.pfadd(key, user_id)  # PFADD for HyperLogLog
    await r.expire(key, 7 * 24 * 3600)

async def get_unique_visitors(page_id: str, date: str) -> int:
    key = f"unique_visitors:{page_id}:{date}"
    return await r.pfcount(key)  # returns approximate count (0.81% std error)
    # Accurate enough for analytics — uses only 12 KB regardless of cardinality
    # Counting 1 million unique users: 12 KB in Redis vs 8 MB for a SET

# Merge multiple HyperLogLogs: total unique across pages
async def total_unique_visitors(page_ids: list[str], date: str) -> int:
    keys = [f"unique_visitors:{pid}:{date}" for pid in page_ids]
    return await r.pfcount(*keys)  # pfcount with multiple keys = merged count

Distributed lock with redlock

python
import asyncio
import secrets

# Simple single-node distributed lock
async def acquire_lock(lock_name: str, ttl_seconds: int = 30) -> str | None:
    token = secrets.token_hex(16)  # unique token per lock acquisition
    key = f"lock:{lock_name}"

    # SET NX PX: set if not exists, with millisecond TTL
    acquired = await r.set(key, token, nx=True, px=ttl_seconds * 1000)
    return token if acquired else None

async def release_lock(lock_name: str, token: str) -> bool:
    key = f"lock:{lock_name}"
    # Lua: only release if we own the lock (prevents accidental release of another holder's lock)
    script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = await r.eval(script, 1, key, token)
    return bool(result)

# Usage
async def run_exclusive_job():
    token = await acquire_lock("daily_report", ttl_seconds=60)
    if not token:
        print("Another worker is running this job")
        return

    try:
        await generate_daily_report()
    finally:
        await release_lock("daily_report", token)

Quiz: test your understanding

Before moving on, answer these in your head (or out loud):

  1. You need a real-time leaderboard for a game with 1M players, updated multiple times per second. Why is a Postgres table a bad choice? How does a Redis sorted set solve it? What is the time complexity of getting rank 1-10?
  2. What is the difference between Redis Pub/Sub and Redis Streams? When would you choose streams over pub/sub?
  3. You need to count unique daily active users across 50 million users. Storing user IDs in a Redis SET vs HyperLogLog: compare memory usage and accuracy trade-offs.
  4. In the distributed lock implementation, why do you store a unique token in the lock value and check it before releasing? What attack does this prevent?
  5. Redis Pub/Sub does not persist messages. If a subscriber is offline when a message is published, it never receives it. How do you design a reliable notification system that handles offline users?

Next up — Part 25: Message Queues — Kafka vs SQS. Consumer groups, exactly-once semantics, and dead letter queues.

← PREV
Designing a Real-Time Cinema Seat Booking System
← All Architecture Posts