Database sharding patterns — when and how to shard
Sharding is one of the most consequential architectural decisions you can make. Get it right and you can scale a database to billions of rows with linear cost growth. Get it wrong and you end up with hotspots, cross-shard queries that span every node, or a resharding operation that takes weeks and requires a maintenance window. Here is what you actually need to know.
When NOT to shard
Sharding adds enormous complexity. Before considering it, exhaust these options:
- Read replicas: If reads are your bottleneck, add replicas before sharding
- Vertical scaling: A 32-core machine with 256GB RAM can handle more than you think
- Partitioning: Postgres table partitioning handles hundreds of millions of rows without application-level sharding
- Caching: Many "database is too slow" problems are actually "we are missing a cache layer" problems
Shard when you genuinely cannot fit data or write throughput on a single node. That threshold is higher than most teams think.
Shard key selection — the most important decision
The shard key determines how data is distributed. A bad shard key creates hotspots (one shard handles 90% of traffic) or makes common queries cross-shard. Good shard key properties:
- High cardinality: Many distinct values (user_id: good, country_code: bad)
- Even distribution: Values distributed uniformly across shards
- Query locality: Common queries access only one shard
- Immutability: The key never changes (email address: bad choice — users change emails)
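Before committing to a key, it is worth simulating the distribution. A quick sketch (synthetic keys, md5 as the stand-in hash function) that measures how far the busiest shard drifts from a perfectly even split:

```python
import hashlib
from collections import Counter

def shard_skew(keys, num_shards: int) -> float:
    """Hash each candidate key into a shard and return the ratio of the
    busiest shard's count to a perfectly even split (1.0 = no skew)."""
    counts = Counter(
        int(hashlib.md5(k.encode()).hexdigest(), 16) % num_shards for k in keys
    )
    ideal = len(keys) / num_shards
    return max(counts.values()) / ideal

# High-cardinality key: skew stays close to 1.0
user_ids = [f"user-{i}" for i in range(100_000)]
print(shard_skew(user_ids, 8))

# Low-cardinality key: one shard absorbs a whole country's traffic
countries = ["US"] * 90_000 + ["DE"] * 10_000
print(shard_skew(countries, 8))
```

Running this against a sample of your real keys (rather than synthetic ones) is the cheapest way to catch a hotspot before it ships.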
Hash sharding
Route each record to a shard based on the hash of the shard key:
```python
# Hash sharding implementation
import hashlib

class HashShardRouter:
    def __init__(self, shard_connections: list):
        self.shards = shard_connections
        self.num_shards = len(shard_connections)

    def get_shard(self, shard_key: str):
        """Return the database connection for this shard key."""
        # md5 is used for a stable, well-distributed hash, not for security
        hash_value = int(hashlib.md5(shard_key.encode()).hexdigest(), 16)
        shard_index = hash_value % self.num_shards
        return self.shards[shard_index]

    def execute(self, shard_key: str, query: str, params: tuple = ()):
        db = self.get_shard(shard_key)
        cursor = db.cursor()
        cursor.execute(query, params)
        return cursor.fetchall()

# Usage
router = HashShardRouter([shard_0, shard_1, shard_2, shard_3])

# User queries always go to the same shard
def get_user(user_id: str):
    return router.execute(
        user_id,
        "SELECT * FROM users WHERE id = %s",
        (user_id,)
    )

# Insert also goes to the correct shard
def create_order(user_id: str, order_data: dict):
    return router.execute(
        user_id,  # shard on user_id for locality with user's other orders
        "INSERT INTO orders (user_id, ...) VALUES (%s, ...)",
        (user_id, ...)
    )
```
Pros: Even distribution if the key has high cardinality.
Cons: Range queries cross all shards ("orders created in the last 7 days" requires querying all shards).
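A second drawback, which matters for the resharding section later: with plain modulo hashing, changing the shard count remaps most keys. A quick sketch with synthetic keys:

```python
import hashlib

def shard_of(key: str, num_shards: int) -> int:
    """Modulo hash routing, as in the router above."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

keys = [f"user-{i}" for i in range(10_000)]
moved = sum(shard_of(k, 4) != shard_of(k, 5) for k in keys)
print(f"{moved / len(keys):.0%} of keys change shard going from 4 to 5 shards")
# roughly 1 - 1/5 of all keys, i.e. about 80%, land on a different shard
```

Consistent hashing reduces that movement to roughly 1/N of keys, which is why it comes up again when we discuss resharding.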
Range sharding
Route based on ranges of the shard key value. Good for time-series data:
```python
# Range sharding for time-series data
from datetime import date

# (range_start, range_end, shard_name), inclusive on both ends
SHARD_RANGES = [
    (date(2020, 1, 1), date(2022, 12, 31), "shard_0"),
    (date(2023, 1, 1), date(2024, 12, 31), "shard_1"),
    (date(2025, 1, 1), date(2026, 12, 31), "shard_2"),
]

def get_shard_for_date(event_date: date) -> str:
    for start, end, shard in SHARD_RANGES:
        if start <= event_date <= end:
            return shard
    raise ValueError(f"No shard for date {event_date}")
```
Pros: Range queries on the shard key hit only one or a few shards.
Cons: Hotspot risk — recent data all goes to the newest shard (the "write hotspot" problem with time-based sharding). Must plan new shards in advance.
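The pro is easy to make concrete: a date-range query only needs the shards whose ranges overlap the window. A sketch reusing the hypothetical layout above:

```python
from datetime import date

# Same hypothetical layout as above: (range_start, range_end, shard_name)
SHARD_RANGES = [
    (date(2020, 1, 1), date(2022, 12, 31), "shard_0"),
    (date(2023, 1, 1), date(2024, 12, 31), "shard_1"),
    (date(2025, 1, 1), date(2026, 12, 31), "shard_2"),
]

def shards_for_window(start: date, end: date) -> list:
    """Return only the shards whose ranges overlap [start, end]."""
    return [shard for lo, hi, shard in SHARD_RANGES if start <= hi and lo <= end]

# A 7-day window touches a single shard instead of all three
print(shards_for_window(date(2024, 6, 1), date(2024, 6, 7)))  # ['shard_1']
```

Under hash sharding the same 7-day query would fan out to every shard.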
Directory-based sharding
A lookup table maps shard keys to shards. Maximum flexibility, requires an additional lookup:
```python
# Directory-based sharding with Redis lookup
import redis

shard_directory = redis.Redis()

def get_shard_for_tenant(tenant_id: str) -> str:
    shard = shard_directory.get(f"shard:{tenant_id}")
    if shard is None:
        # First sight of this tenant: assign it to a shard (e.g., least-loaded)
        shard_name = assign_tenant_to_shard(tenant_id)
        shard_directory.set(f"shard:{tenant_id}", shard_name)
        return shard_name
    return shard.decode()  # redis-py returns bytes; decode only cached entries

def assign_tenant_to_shard(tenant_id: str) -> str:
    # Balance across shards by row count or tenant tier
    shard_loads = get_shard_loads()  # hypothetical: {shard_name: current_load}
    return min(shard_loads, key=shard_loads.get)
```
Pros: Easy to rebalance (update the directory, migrate data, update directory again). Can assign VIP tenants to dedicated shards.
Cons: Extra lookup for every query. Directory is a single point of failure (mitigate with replication/caching).
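One common mitigation for both cons is a short-TTL in-process cache in front of the directory. A minimal sketch, where the injected `lookup` function stands in for the Redis-backed directory above:

```python
import time

class CachedDirectory:
    """Wrap a shard directory with a short-TTL in-process cache so the
    directory is not hit on every query and brief outages are survivable."""

    def __init__(self, lookup, ttl_seconds: float = 30.0):
        self.lookup = lookup     # function: tenant_id -> shard name
        self.ttl = ttl_seconds
        self._cache = {}         # tenant_id -> (shard, expires_at)

    def get(self, tenant_id: str) -> str:
        entry = self._cache.get(tenant_id)
        now = time.monotonic()
        if entry and entry[1] > now:
            return entry[0]
        shard = self.lookup(tenant_id)
        self._cache[tenant_id] = (shard, now + self.ttl)
        return shard

# Usage with a hypothetical lookup; a Redis-backed one plugs in the same way
calls = []
def directory_lookup(tenant_id):
    calls.append(tenant_id)
    return "shard_1"

cached = CachedDirectory(directory_lookup, ttl_seconds=30)
cached.get("tenant-42")
cached.get("tenant-42")  # served from the local cache; the directory saw one call
```

The TTL bounds how stale a routing decision can be, so keep it shorter than your tenant-migration cutover window.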
Resharding — the hard part
Adding shards is painful because existing data must be redistributed. The safest pattern combines consistent hashing (so only a fraction of keys move) with a two-phase, dual-write migration.
```python
# Phase 1: dual-write to old and new shard configuration
class ReshardingRouter:
    def __init__(self, old_router, new_router, migrated_keys: set):
        self.old_router = old_router
        self.new_router = new_router
        self.migrated_keys = migrated_keys  # track which keys have been migrated

    def read(self, key: str, query: str, params):
        if key in self.migrated_keys:
            return self.new_router.execute(key, query, params)
        return self.old_router.execute(key, query, params)

    def write(self, key: str, query: str, params):
        # Write to both configurations until the key is fully migrated
        self.new_router.execute(key, query, params)
        if key not in self.migrated_keys:
            self.old_router.execute(key, query, params)
```
Migrate in batches: copy data to new shards, mark keys as migrated, cut over reads, stop dual-writes. This achieves zero-downtime resharding at the cost of temporary storage overhead and write latency.
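That batch loop can be sketched as follows; `old_store` and `new_store` are hypothetical dict-like stand-ins for the per-shard copy step, and `migrated_keys` is the same set the router consults:

```python
def migrate_in_batches(keys, old_store, new_store, migrated_keys: set,
                       batch_size: int = 1000):
    """Copy rows for each key from the old shards to the new ones, then mark
    the keys migrated so reads cut over (stores are dict-like stand-ins)."""
    batch = []
    for key in keys:
        batch.append(key)
        if len(batch) >= batch_size:
            _flush(batch, old_store, new_store, migrated_keys)
            batch = []
    if batch:
        _flush(batch, old_store, new_store, migrated_keys)

def _flush(batch, old_store, new_store, migrated_keys):
    for key in batch:
        new_store[key] = old_store[key]  # copy; dual-writes keep it fresh meanwhile
    migrated_keys.update(batch)          # reads for these keys now hit the new shards

# Usage with dict-backed stand-ins for the old and new shard sets
old = {f"user-{i}": {"total": i} for i in range(10)}
new, migrated = {}, set()
migrate_in_batches(list(old), old, new, migrated, batch_size=4)
```

Marking keys migrated only after the copy lands is what makes the cutover safe: a key is never read from the new shards before its data is there.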
The cross-shard query problem
Some queries are inherently cross-shard: "Find all orders across all users placed in the last 24 hours." Options:
- Denormalize: maintain an analytics table on a separate unsharded database, updated via events
- Scatter-gather: query all shards in parallel, merge results in the application
- Accept the limitation: route some queries through a batch or event-streaming pipeline instead of real-time SQL
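The scatter-gather option can be sketched with a thread pool; the per-shard query function and the fake shard data here are stand-ins for real SQL calls:

```python
from concurrent.futures import ThreadPoolExecutor

def scatter_gather(shards, run_query, merge):
    """Run the same query against every shard in parallel and merge the
    partial results in the application."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(run_query, shards))
    return merge(partials)

# Usage: "top orders in the last 24h" against three fake shards of (id, total)
fake_shards = [[("o1", 5)], [("o2", 3), ("o3", 9)], []]
result = scatter_gather(
    fake_shards,
    run_query=lambda shard: shard,  # stand-in for a real per-shard SQL query
    merge=lambda parts: sorted(
        (row for part in parts for row in part), key=lambda r: r[1], reverse=True
    ),
)
print(result)  # [('o3', 9), ('o1', 5), ('o2', 3)]
```

Note that latency is set by the slowest shard, which is why scatter-gather belongs in reporting paths rather than user-facing ones.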
Cross-shard queries are the hidden cost of sharding. Design your shard key so that 99% of your actual queries are single-shard. The remaining 1% should be analytics/reporting that can tolerate scatter-gather latency.