Staff Prep 19: Connection Pooling & SQLAlchemy — Pool Sizing & Leaks
Back to Part 18: Celery Task Queues. Every Postgres connection is a separate OS process consuming roughly 5-10MB of RAM. At 500 connections you have burned 2.5-5GB before a single row is read. Pool sizing is arithmetic, not vibes. Connection leaks are worse. They accumulate invisibly for weeks and then your database refuses everything at the exact moment you least want it to. (I learned this during a Black Friday I would rather not relive.)
SQLAlchemy async engine configuration
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# Standard configuration
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/mydb",
    pool_size=20,        # keep up to 20 connections open between requests
    max_overflow=10,     # allow up to 10 extra under burst (total: 30)
    pool_timeout=30,     # wait up to 30s for a connection from pool
    pool_recycle=3600,   # recycle connections hourly (prevents stale connections)
    pool_pre_ping=True,  # test connection before use (detects dropped connections)
    echo=False,          # set True for SQL query logging (dev only)
)

# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep loaded attributes usable after commit (no implicit lazy reload)
)

# FastAPI dependency
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # session.close() is called automatically by the context manager
Pool sizing math
The rule: max connections per engine = pool_size + max_overflow. Every worker process that creates an engine gets its own pool.
Total connections = (pool_size + max_overflow) × workers per instance × number of app instances.
"""
Scenario:
- 4 Uvicorn workers per app instance
- 3 app instances (3 pods/VMs)
- Each worker creates its own engine pool (pool_size=10, max_overflow=5)
Total connections = 4 workers × 3 instances × (10 + 5) = 180 connections
Postgres max_connections default: 100
Result: CONNECTION EXHAUSTED
Fix: reduce pool_size or route through PgBouncer
- With PgBouncer (transaction mode, default_pool_size=25):
    App connects to PgBouncer with pool_size=50
    PgBouncer maintains 25 connections to Postgres
    Total Postgres connections: 25 (not 180)
    PgBouncer max_client_conn must cover the client side: 12 engines × 50 = 600
"""
# With PgBouncer
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@pgbouncer:6432/mydb",  # connect to PgBouncer
    pool_size=50,
    max_overflow=0,     # no overflow: PgBouncer handles excess by queuing
    pool_recycle=1800,  # shorter recycle time (PgBouncer may close idle connections)
    connect_args={
        # Disable both prepared-statement caches (incompatible with PgBouncer transaction mode)
        "prepared_statement_cache_size": 0,  # SQLAlchemy asyncpg dialect cache
        "statement_cache_size": 0,           # asyncpg driver cache
    },
)
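The sizing arithmetic above is easy to get wrong by hand once worker counts change, so here is a minimal sketch of it as a helper. `PoolPlan`, `largest_pool_per_engine`, and the `reserved` parameter (headroom for migrations, monitoring, and admin sessions) are hypothetical names for illustration, not part of SQLAlchemy.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolPlan:
    """One deployment's pool settings (hypothetical helper, not a SQLAlchemy type)."""
    workers_per_instance: int
    instances: int
    pool_size: int
    max_overflow: int

    @property
    def per_engine(self) -> int:
        # Peak connections a single engine (one per worker process) can hold.
        return self.pool_size + self.max_overflow

    @property
    def peak_connections(self) -> int:
        # Every worker in every instance owns its own pool.
        return self.workers_per_instance * self.instances * self.per_engine

    def fits(self, max_connections: int, reserved: int = 0) -> bool:
        # `reserved` is an assumed safety margin for non-application sessions.
        return self.peak_connections <= max_connections - reserved


def largest_pool_per_engine(
    max_connections: int, workers_per_instance: int, instances: int, reserved: int = 0
) -> int:
    """Largest pool_size + max_overflow each engine can use without exhausting Postgres."""
    engines = workers_per_instance * instances
    return max((max_connections - reserved) // engines, 0)


plan = PoolPlan(workers_per_instance=4, instances=3, pool_size=10, max_overflow=5)
print(plan.peak_connections)                            # 180
print(plan.fits(max_connections=100))                   # False
print(largest_pool_per_engine(100, 4, 3, reserved=4))   # 8
```

With the scenario's numbers, each engine would have to stay at 8 connections or fewer (pool_size plus max_overflow) to fit under max_connections=100 without PgBouncer.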
pool_pre_ping: detecting stale connections
engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_pre_ping=True,
    # Before handing out a pooled connection, send a lightweight ping (e.g. SELECT 1)
    # If the connection is stale (Postgres restarted, network blip), it is discarded
    # and a fresh connection is established
    # Cost: one extra round-trip per connection checkout (~0.2ms)
    # Worth it in production environments where connections can go stale
    pool_recycle=3600,
    # Independently recycles connections older than 1 hour
    # Guards against idle timeouts imposed by Postgres settings, proxies, or firewalls
    # Does NOT test the connection; use pool_pre_ping for that
)
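The difference between the two settings is easier to see in a toy model. The sketch below is not SQLAlchemy's implementation; `FakeConnection` and `ToyPool` are made-up stand-ins that only mimic the checkout decision: recycle looks at age and never tests the connection, pre-ping tests the connection and ignores age.

```python
import time
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeConnection:
    """Stand-in for a DBAPI connection; `alive` simulates server-side state."""
    created_at: float
    alive: bool = True

    def ping(self) -> bool:
        return self.alive


class ToyPool:
    """Toy pool that models only the pre-ping and recycle checks at checkout."""

    def __init__(
        self,
        pre_ping: bool,
        recycle: Optional[float],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._pre_ping = pre_ping
        self._recycle = recycle
        self._clock = clock
        self._idle: List[FakeConnection] = []
        self.connects = 0  # how many real connections were opened

    def _connect(self) -> FakeConnection:
        self.connects += 1
        return FakeConnection(created_at=self._clock())

    def checkout(self) -> FakeConnection:
        while self._idle:
            conn = self._idle.pop()
            too_old = (
                self._recycle is not None
                and self._clock() - conn.created_at > self._recycle
            )
            if too_old:
                continue  # recycle: discard by age, without testing it
            if self._pre_ping and not conn.ping():
                continue  # pre-ping: discard because the test failed
            return conn
        return self._connect()

    def checkin(self, conn: FakeConnection) -> None:
        self._idle.append(conn)
```

In this model, a pool with only `recycle` set will still hand out a connection that died five minutes after it was opened, which is the case pool_pre_ping exists for.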
Detecting and preventing connection leaks
from sqlalchemy import event
import asyncio
import logging

# Connection leak: session not properly closed
async def get_user_leaky(user_id: int):
    session = AsyncSessionLocal()  # WRONG: no context manager
    user = await session.get(User, user_id)
    return user  # session never closed: its connection stays checked out instead of returning to the pool

# Proper: always use context manager
async def get_user_safe(user_id: int):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, user_id)
        return user
    # session auto-closed here

# Detecting leaks: monitor pool status
async def log_pool_status():
    pool = engine.pool
    logging.info(
        f"Pool: size={pool.size()}, checked_out={pool.checkedout()}, "
        f"overflow={pool.overflow()}, checked_in={pool.checkedin()}"
    )
    # If checkedout keeps growing over time: leak detected
    # Monitor this metric in your APM/Prometheus setup

# SQLAlchemy connection pool events for debugging
@event.listens_for(engine.sync_engine, "checkin")
def receive_checkin(dbapi_connection, connection_record):
    pass  # log when connections returned to pool

@event.listens_for(engine.sync_engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    pass  # log when connections checked out
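Those two listeners are empty, so here is one hedged idea for what could go inside them: a small tracker that remembers when each connection was checked out and from where, then reports the ones held suspiciously long. `CheckoutTracker` is a hypothetical helper written with the standard library only. How informative the captured stack is under the async engine is something to verify in your own setup.

```python
import time
import traceback
from typing import Callable, Dict, List, Tuple


class CheckoutTracker:
    """Records open checkouts so long-held connections can be traced to a call site."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (checkout time, formatted stack at checkout)
        self._open: Dict[int, Tuple[float, str]] = {}

    def on_checkout(self, key: int) -> None:
        # Drop the last frame (this method) so the stack ends at the caller.
        stack = "".join(traceback.format_stack(limit=8)[:-1])
        self._open[key] = (self._clock(), stack)

    def on_checkin(self, key: int) -> None:
        self._open.pop(key, None)

    def suspects(self, older_than: float) -> List[Tuple[float, str]]:
        """Return (seconds held, stack) for checkouts older than the threshold, longest first."""
        now = self._clock()
        held = [
            (now - started, stack)
            for started, stack in self._open.values()
            if now - started > older_than
        ]
        return sorted(held, key=lambda item: item[0], reverse=True)


tracker = CheckoutTracker()
tracker.on_checkout(key=1)
tracker.on_checkin(key=1)
print(len(tracker.suspects(older_than=60.0)))  # 0
```

To wire it up, call `tracker.on_checkout(id(connection_record))` from the checkout listener and `tracker.on_checkin(id(connection_record))` from the checkin listener, then log `tracker.suspects(older_than=60.0)` on the same schedule as the pool status. The 60-second threshold is an arbitrary starting point, not a recommendation.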
NullPool for short-lived processes
from sqlalchemy.pool import NullPool

# NullPool: no pooling; create a new connection for every request
# Use case: Celery workers, CLI scripts, Alembic migrations
# Why: worker processes may run for only seconds; keeping a pool alive wastes connections
celery_engine = create_async_engine(
    "postgresql+asyncpg://...",
    poolclass=NullPool,  # create/destroy connection per use
)

# For Alembic migrations (sync engine)
from sqlalchemy import create_engine

migration_engine = create_engine(
    "postgresql://...",
    poolclass=NullPool,  # migrations run once, no need for a pool
)
Connection timeout handling
import asyncio

from fastapi import HTTPException
from sqlalchemy.exc import TimeoutError as SATimeoutError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession

async def execute_with_timeout(session: AsyncSession, stmt, timeout: float = 5.0):
    try:
        return await asyncio.wait_for(session.execute(stmt), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Query exceeded `timeout` seconds; wait_for has cancelled the awaited call
        await session.rollback()
        raise HTTPException(503, "Database query timed out") from exc
    except OperationalError as exc:
        # Connection error (DB down, network issue)
        await session.rollback()
        raise HTTPException(503, "Database unavailable") from exc
    except SATimeoutError as exc:
        # Pool timeout: no connection available in pool_timeout seconds
        raise HTTPException(503, "Database connection pool exhausted") from exc
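It helps to know what asyncio.wait_for does to the awaited call when the timeout fires: it cancels it, then raises the timeout in the caller. The sketch below shows that with a fake slow query and no database. `slow_query` and the `state` dict are illustrative stand-ins, not part of the handler above.

```python
import asyncio
from typing import Dict


async def slow_query(state: Dict[str, bool]) -> None:
    """Pretend query that takes far longer than the caller is willing to wait."""
    try:
        await asyncio.sleep(10)
        state["finished"] = True
    except asyncio.CancelledError:
        # wait_for cancels the inner coroutine when the timeout expires.
        state["cancelled"] = True
        raise


async def main() -> Dict[str, bool]:
    state: Dict[str, bool] = {}
    try:
        await asyncio.wait_for(slow_query(state), timeout=0.05)
    except asyncio.TimeoutError:
        state["timed_out"] = True
    return state


print(asyncio.run(main()))
```

The inner call is interrupted partway through, which is why the handler rolls the session back before reusing it.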
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- You have 8 Uvicorn workers, pool_size=20, max_overflow=10. How many Postgres connections can your app hold at peak? What happens if Postgres max_connections is 100?
- What does pool_pre_ping=True do? What is the cost? When is it essential?
- You notice that pool.checkedout() grows by ~10 per hour and never decreases. What is happening? How do you find which code path is leaking?
- Why do Celery workers use NullPool instead of a connection pool? What would go wrong with a regular pool in a worker process that restarts frequently?
- You switch from direct Postgres connection to PgBouncer in transaction mode. What SQLAlchemy settings must you change, and why?
Next up: Part 20: API Design at Scale. Idempotency keys, ETags, API versioning, and bulk endpoints that do not destroy your database.