FastAPI BackgroundTasks: the right way to handle post-response work
April 4, 2026 · Python · 6 min read

I was sending welcome emails synchronously in my signup endpoint and users were waiting 2-3 seconds for the email service to respond. FastAPI's BackgroundTasks lets you return the HTTP response immediately and run code afterward — the user sees success in 80ms and the email sends in the background. Here is the full pattern with proper error handling.

Basic BackgroundTasks usage

python
from fastapi import BackgroundTasks, Depends, FastAPI
import httpx
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

async def send_welcome_email(user_id: str, email: str) -> None:
    """Runs after the response is already sent to the client."""
    try:
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://api.emailservice.com/send",
                json={
                    "to": email,
                    "template": "welcome",
                    "variables": {"user_id": user_id},
                },
                timeout=10.0,
            )
        logger.info("Welcome email sent to %s", email)
    except Exception as e:
        # Don't crash the background task — just log
        logger.error("Failed to send welcome email to %s: %s", email, e)

# SignupRequest, Database, and get_db are your app's own model and DB helpers
@app.post("/users/signup")
async def signup(
    request: SignupRequest,
    background_tasks: BackgroundTasks,
    db: Database = Depends(get_db),
):
    # hash_password is your password hasher (bcrypt/argon2) —
    # never use the builtin hash() for passwords
    user = await db.users.create(email=request.email, password=hash_password(request.password))
    
    # Schedule background task — runs AFTER this function returns
    background_tasks.add_task(send_welcome_email, str(user.id), user.email)
    
    # This response is sent immediately — user doesn't wait for email
    return {"user_id": str(user.id), "status": "created"}

Dependency injection in background tasks

Background tasks do not automatically get FastAPI's Depends() — you need to pass dependencies explicitly:

python
from fastapi import BackgroundTasks, Depends, Request
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def track_user_event(
    user_id: str,
    event_type: str,
    db: AsyncSession,  # Passed explicitly, not injected
) -> None:
    try:
        # SQLAlchemy 1.4+ requires raw SQL strings to be wrapped in text()
        await db.execute(
            text("INSERT INTO user_events (user_id, event_type) VALUES (:user_id, :event)"),
            {"user_id": user_id, "event": event_type},
        )
        await db.commit()
    except Exception as e:
        await db.rollback()
        logger.error("Event tracking failed: %s", e)
    finally:
        await db.close()  # IMPORTANT: close the session in the background task

@app.post("/products/{product_id}/view")
async def track_view(
    product_id: str,
    background_tasks: BackgroundTasks,
    request: Request,
    db: AsyncSession = Depends(get_async_session),
):
    product = await get_product(product_id, db)
    
    # Create a fresh session for the background task
    # Don't reuse the request's session — it closes when the request ends
    async_session_factory = request.app.state.async_session_factory
    bg_db = async_session_factory()
    
    background_tasks.add_task(
        track_user_event,
        request.headers.get("x-user-id", "anonymous"),  # header may be absent
        f"product_view:{product_id}",
        bg_db,
    )
    
    return product

Multiple background tasks

python
@app.post("/orders")
async def create_order(
    order_data: OrderCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    order = await db.orders.create(order_data, user_id=current_user.id)
    
    # Multiple tasks run sequentially (not in parallel) after response
    background_tasks.add_task(send_order_confirmation_email, order.id, current_user.email)
    background_tasks.add_task(notify_inventory_service, order.items)
    background_tasks.add_task(track_analytics_event, "order_created", order.id)
    background_tasks.add_task(invalidate_user_cache, current_user.id)
    
    return {"order_id": str(order.id), "status": "pending"}

When BackgroundTasks is not enough

BackgroundTasks has limitations:

  • Tasks run sequentially (not in parallel)
  • Tasks are lost if the process dies mid-execution
  • No retry on failure
  • No queue depth monitoring

python
# For more robust background work, use Celery or ARQ

# ARQ (async, Redis-backed, simpler than Celery)
import arq

async def send_email_job(ctx: dict, user_id: str, email: str) -> None:
    """ARQ job: persists in Redis across restarts and can be re-run
    (raise arq's Retry exception to requeue)."""
    await send_welcome_email(user_id, email)

class WorkerSettings:
    functions = [send_email_job]
    redis_settings = arq.connections.RedisSettings(host='localhost')

# Enqueue from endpoint
@app.post("/users/signup")
async def signup(request: SignupRequest, arq_pool=Depends(get_arq_pool)):
    user = await create_user(request)
    await arq_pool.enqueue_job("send_email_job", str(user.id), user.email)
    return {"user_id": str(user.id)}
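
Assuming the job and `WorkerSettings` above live in a module named `worker.py` (the module name is illustrative), the worker process is started with ARQ's CLI against a running Redis:

```shell
arq worker.WorkerSettings
```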

Decision: BackgroundTasks vs Celery/ARQ

  • BackgroundTasks: Send an email, update a counter, invalidate cache. Simple, fast, no infrastructure.
  • ARQ/Celery: Tasks that must not be lost, tasks that need retry, tasks that take more than a few seconds, tasks you need to monitor.

BackgroundTasks is not a job queue — it is a convenience for fire-and-forget work that is acceptable to lose in a crash. For anything you cannot lose, use a proper queue.
