← Back
April 4, 2026Python6 min read
FastAPI BackgroundTasks: the right way to handle post-response work
Published April 4, 20266 min read
I was sending welcome emails synchronously in my signup endpoint and users were waiting 2-3 seconds for the email service to respond. FastAPI's BackgroundTasks lets you return the HTTP response immediately and run code afterward — the user sees success in 80ms and the email sends in the background. Here is the full pattern with proper error handling.
Basic BackgroundTasks usage
python
from fastapi import FastAPI, BackgroundTasks
import httpx
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
async def send_welcome_email(user_id: str, email: str) -> None:
"""Runs after the response is already sent to the client."""
try:
async with httpx.AsyncClient() as client:
await client.post(
"https://api.emailservice.com/send",
json={
"to": email,
"template": "welcome",
"variables": {"user_id": user_id},
},
timeout=10.0,
)
logger.info("Welcome email sent to %s", email)
except Exception as e:
# Don't crash the background task — just log
logger.error("Failed to send welcome email to %s: %s", email, e)
@app.post("/users/signup")
async def signup(
request: SignupRequest,
background_tasks: BackgroundTasks,
db: Database = Depends(get_db),
):
user = await db.users.create(email=request.email, password=hash(request.password))
# Schedule background task — runs AFTER this function returns
background_tasks.add_task(send_welcome_email, str(user.id), user.email)
# This response is sent immediately — user doesn't wait for email
return {"user_id": str(user.id), "status": "created"}
Dependency injection in background tasks
Background tasks do not automatically get FastAPI's Depends() — you need to pass dependencies explicitly:
python
from fastapi import Depends, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
async def track_user_event(
user_id: str,
event_type: str,
db: AsyncSession, # Passed explicitly, not injected
) -> None:
try:
await db.execute(
"INSERT INTO user_events (user_id, event_type) VALUES (:user_id, :event)",
{"user_id": user_id, "event": event_type},
)
await db.commit()
except Exception as e:
await db.rollback()
logger.error("Event tracking failed: %s", e)
finally:
await db.close() # IMPORTANT: close the session in the background task
@app.post("/products/{product_id}/view")
async def track_view(
product_id: str,
background_tasks: BackgroundTasks,
request: Request,
db: AsyncSession = Depends(get_async_session),
):
product = await get_product(product_id, db)
# Create a fresh session for the background task
# Don't reuse the request's session — it closes when the request ends
async_session_factory = request.app.state.async_session_factory
bg_db = async_session_factory()
background_tasks.add_task(
track_user_event,
request.headers.get("x-user-id"),
f"product_view:{product_id}",
bg_db,
)
return product
Multiple background tasks
python
@app.post("/orders")
async def create_order(
order_data: OrderCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
order = await db.orders.create(order_data, user_id=current_user.id)
# Multiple tasks run sequentially (not in parallel) after response
background_tasks.add_task(send_order_confirmation_email, order.id, current_user.email)
background_tasks.add_task(notify_inventory_service, order.items)
background_tasks.add_task(track_analytics_event, "order_created", order.id)
background_tasks.add_task(invalidate_user_cache, current_user.id)
return {"order_id": str(order.id), "status": "pending"}
When BackgroundTasks is not enough
BackgroundTasks has limitations:
- Tasks run sequentially (not in parallel)
- Tasks are lost if the process dies mid-execution
- No retry on failure
- No queue depth monitoring
python
# For more robust background work, use Celery or ARQ
# ARQ (async, Redis-backed, simpler than Celery)
import arq
async def send_email_job(ctx: dict, user_id: str, email: str) -> None:
"""ARQ job: retries on failure, persists across restarts."""
await send_welcome_email(user_id, email)
class WorkerSettings:
functions = [send_email_job]
redis_settings = arq.connections.RedisSettings(host='localhost')
# Enqueue from endpoint
@app.post("/users/signup")
async def signup(request: SignupRequest, arq_pool=Depends(get_arq_pool)):
user = await create_user(request)
await arq_pool.enqueue_job("send_email_job", str(user.id), user.email)
return {"user_id": str(user.id)}
Decision: BackgroundTasks vs Celery/ARQ
- BackgroundTasks: Send an email, update a counter, invalidate cache. Simple, fast, no infrastructure.
- ARQ/Celery: Tasks that must not be lost, tasks that need retry, tasks that take more than a few seconds, tasks you need to monitor.
BackgroundTasks is not a job queue — it is a convenience for fire-and-forget work that is acceptable to lose in a crash. For anything you cannot lose, use a proper queue.
Share this
← All Posts6 min read