Staff Prep 09: async vs sync in Python — When Async Actually Helps
Back to Part 08: FastAPI Lifecycle. "Just make it async" is cargo-cult engineering. Async Python is a concurrency mechanism, not a speed mechanism. It helps exactly one class of problem: I/O-bound waiting. Applied to CPU-bound work, it makes things strictly worse. This is the most important distinction in Python backend architecture.
The event loop: one thread, cooperative multitasking
Python's asyncio event loop runs on a single OS thread. It executes coroutines cooperatively: when
a coroutine hits an await, it suspends and yields control back to the event loop, which
then runs another coroutine. No OS context switch. No thread overhead. Pure user-space scheduling.
The benefit: thousands of concurrent I/O operations can be in-flight simultaneously, all on one thread, without the overhead of thousands of OS threads. A single uvicorn worker can handle 10,000 concurrent slow HTTP requests if they are all waiting on I/O.
The constraint: while a coroutine is running CPU-heavy work (no await), the event loop
is blocked. No other coroutines run. A 200ms CPU computation blocks all other requests for 200ms.
import asyncio
import time
# Concurrent I/O: async shines here
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # simulates 100ms database query
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users_concurrent():
start = time.perf_counter()
# Run 10 "queries" concurrently — total time ~100ms, not 1000ms
users = await asyncio.gather(*[fetch_user(i) for i in range(10)])
elapsed = time.perf_counter() - start
print(f"Fetched {len(users)} users in {elapsed:.2f}s") # ~0.10s
asyncio.run(fetch_all_users_concurrent())
# CPU-bound: async does NOT help
import hashlib
async def hash_password_wrong(password: str) -> str:
# This runs synchronously — blocks the event loop for its entire duration
# All other requests wait while this runs
return hashlib.pbkdf2_hmac("sha256", password.encode(), b"salt", 200000).hex()
The fundamental rule
Use async when:
- You are waiting on a network request (database query, HTTP call, Redis GET)
- You are waiting on file I/O (reading/writing files)
- You want to run multiple I/O operations concurrently
Do NOT use async for (or run in a thread pool):
- CPU-bound computation (hashing, image processing, ML inference, data transformation)
- Synchronous library calls that block (any library without async support)
- Long-running calculations
run_in_executor: offloading blocking work
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import hashlib
# Thread pool: for blocking I/O (sync library calls, file operations)
thread_pool = ThreadPoolExecutor(max_workers=10)
async def hash_password_safe(password: str) -> str:
loop = asyncio.get_event_loop()
# Offload to thread pool — event loop stays free
result = await loop.run_in_executor(
thread_pool,
lambda: hashlib.pbkdf2_hmac("sha256", password.encode(), b"salt", 200000).hex()
)
return result
# Process pool: for CPU-bound work (releases the GIL entirely)
process_pool = ProcessPoolExecutor(max_workers=4)
def cpu_bound_task(data: list) -> int:
return sum(x * x for x in data) # pure CPU work
async def run_cpu_task(data: list) -> int:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(process_pool, cpu_bound_task, data)
return result
# FastAPI integration
from fastapi import FastAPI
app = FastAPI()
@app.post("/hash")
async def hash_endpoint(password: str):
hashed = await hash_password_safe(password)
return {"hash": hashed}
Gather vs wait: choosing the right tool
import asyncio
# asyncio.gather: run tasks concurrently, collect all results
# Raises on first exception by default
async def fetch_dashboard_data(user_id: int):
# These three queries run concurrently — total time = max of three, not sum
user, orders, notifications = await asyncio.gather(
get_user(user_id),
get_recent_orders(user_id),
get_notifications(user_id),
)
return {"user": user, "orders": orders, "notifications": notifications}
# gather with return_exceptions=True: collect errors instead of raising
async def fetch_with_partial_failure(user_id: int):
results = await asyncio.gather(
get_user(user_id),
get_recent_orders(user_id),
get_notifications(user_id),
return_exceptions=True, # errors become results, not exceptions
)
return {
"user": results[0] if not isinstance(results[0], Exception) else None,
"orders": results[1] if not isinstance(results[1], Exception) else [],
"notifications": results[2] if not isinstance(results[2], Exception) else [],
}
# asyncio.wait: more control — first_completed, first_exception, all_completed
async def fetch_with_timeout(user_id: int):
tasks = [
asyncio.create_task(get_user(user_id)),
asyncio.create_task(get_orders_expensive(user_id)),
]
done, pending = await asyncio.wait(tasks, timeout=1.0)
for task in pending:
task.cancel() # cancel tasks that did not complete in time
return [task.result() for task in done]
When async hurts: the overhead cost
import time
# Sync: fast for CPU-bound sequences
def compute_sync(n: int) -> int:
return sum(i * i for i in range(n))
# Async: overhead from coroutine machinery for no benefit
async def compute_async(n: int) -> int:
return sum(i * i for i in range(n)) # no await = no concurrency benefit
# Just added coroutine overhead: frame allocation, event loop scheduling
# Benchmark: compute_sync(1_000_000) is faster than compute_async(1_000_000)
# Async overhead: ~5-15% slower for CPU-bound work with no I/O
# FastAPI: sync route functions run in a thread pool automatically
# Use sync for pure CPU work, async for I/O-bound
from fastapi import FastAPI
app = FastAPI()
@app.post("/compute")
def compute_route(n: int): # sync: FastAPI runs this in thread pool
return {"result": compute_sync(n)}
@app.get("/user/{id}")
async def get_user_route(id: int, db=Depends(get_db)): # async: awaiting DB
return await db.get(User, id)
Common async pitfalls
import asyncio
import time
# Pitfall 1: time.sleep() in async code — blocks entire event loop
async def bad_sleep():
time.sleep(5) # WRONG: blocks event loop for 5 seconds
# All other requests are frozen
async def good_sleep():
await asyncio.sleep(5) # CORRECT: yields to event loop
# Pitfall 2: Calling sync DB driver from async context
import psycopg2 # sync driver
async def bad_db_query():
conn = psycopg2.connect(...) # blocks event loop during connection
cursor = conn.cursor()
cursor.execute("SELECT * FROM users") # blocks event loop during query
# Fix: use async driver (asyncpg, aiopg, databases)
import asyncpg
async def good_db_query(pool):
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM users") # non-blocking
# Pitfall 3: Missing await (silent bug)
async def silent_bug():
result = some_async_function() # missing await!
# result is a coroutine object, not the actual result
# No error raised, just wrong behavior
print(result) #
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- You have a FastAPI endpoint that calls a function doing 500ms of CPU work (no I/O). What happens to all other concurrent requests while this runs? How do you fix it?
- What is the difference between
asyncio.gatherandasyncio.wait? When would you choose one over the other? - A developer uses
run_in_executorwith a ThreadPoolExecutor for a CPU-intensive task. Why does this not help as much as expected in Python? What should they use instead? - FastAPI allows both
async defanddefroute handlers. What does FastAPI do differently for each? - You see
result = some_async_fn()(missing await) in a codebase. What is the runtime behavior? Will Python raise an error?
Next up — Part 10: Background Processing. In-process background tasks, Celery architecture, and when each is the right tool.