Staff Prep 18: Task Queues & Celery — Broker, Workers & Idempotency
Back to Part 17: FastAPI Internals. Celery is the standard Python task queue. It handles retries, scheduling, routing, monitoring and error handling in one package. It also causes more production incidents than it prevents when you leave the defaults alone, which most teams do. This post is about the settings I wish someone had screamed at me about on day one.
Celery architecture: broker vs backend
Celery has two storage layers with different responsibilities:
- The broker stores the task queue. Workers pull tasks from here (Redis or RabbitMQ). If the broker loses data, the tasks are gone.
- The backend, or result store, holds task results and status. You only need it if you poll task outcomes via AsyncResult. Skip it for fire-and-forget tasks.
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://redis:6379/0",   # task queue
    backend="redis://redis:6379/1",  # result store (optional)
    include=["myapp.tasks.email", "myapp.tasks.reports"],
)

app.conf.update(
    # Serialisation
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_compression="gzip",  # compress large payloads
    # Reliability
    task_acks_late=True,              # ack after the task finishes, not on receipt
    task_reject_on_worker_lost=True,  # re-queue if the worker process dies mid-task
    worker_prefetch_multiplier=1,     # critical: pull one task at a time
    # Timeouts
    task_soft_time_limit=300,  # raises SoftTimeLimitExceeded inside the task after 5 min
    task_time_limit=360,       # kills the process running the task after 6 min (hard limit)
    # Results
    result_expires=3600,      # auto-expire results after 1 hour
    task_ignore_result=True,  # skip result storage by default (fire-and-forget)
)
worker_prefetch_multiplier: the most important setting
The default worker_prefetch_multiplier=4 lets a worker reserve up to four tasks per concurrent process before it has run any of them.
For long-running tasks this is disastrous. A process stuck on one slow task sits on its other reserved tasks while the rest of your workers idle.
I have debugged this exact scenario at 2am on a launch weekend. Set it to 1 unless your tasks are genuinely short.
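The starvation described above can be made concrete with a toy model. This is not Celery's scheduler, only a sketch under simplifying assumptions: each idle worker reserves up to `prefetch` tasks in one go and runs them back to back, and the function name `completion_times` is made up for illustration.

```python
import heapq
from collections import deque


def completion_times(durations, workers, prefetch):
    """Return the time at which each task finishes, in submission order.

    Toy model: an idle worker reserves up to `prefetch` tasks at once and
    runs them sequentially. Reserved tasks cannot move to another worker.
    """
    queue = deque(enumerate(durations))
    # Heap of (time the worker becomes idle, worker id).
    idle_at = [(0, w) for w in range(workers)]
    heapq.heapify(idle_at)
    done_at = [0] * len(durations)
    while queue:
        now, w = heapq.heappop(idle_at)
        for _ in range(min(prefetch, len(queue))):
            idx, duration = queue.popleft()
            now += duration
            done_at[idx] = now
        heapq.heappush(idle_at, (now, w))
    return done_at


# One 600 s task followed by seven 10 s tasks, shared by two workers.
tasks = [600, 10, 10, 10, 10, 10, 10, 10]
print(completion_times(tasks, workers=2, prefetch=4))
# [600, 610, 620, 630, 10, 20, 30, 40]
print(completion_times(tasks, workers=2, prefetch=1))
# [600, 10, 20, 30, 40, 50, 60, 70]
```

With a multiplier of 4, three short tasks wait behind the slow one and finish after 610 to 630 seconds while the second worker goes idle at 40 seconds. With a multiplier of 1, every short task is done within 70 seconds.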
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# Critical configuration for long-running tasks
app.conf.worker_prefetch_multiplier = 1
# Each worker process reserves one task at a time instead of four,
# so distribution stays fair regardless of task duration.

# For fast, short tasks (< 100ms each):
# app.conf.worker_prefetch_multiplier = 4  # OK: overhead of pulling tasks matters more

# _generate_report_internal, save_partial_state and ReportDataError are
# application code defined elsewhere.
@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=30,  # 30 s default; overridden by the explicit countdown below
    queue="reports",
    soft_time_limit=300,
    time_limit=360,
)
def generate_report(self, report_id: int):
    try:
        logger.info(f"Starting report {report_id}, attempt {self.request.retries + 1}")
        return _generate_report_internal(report_id)
    except SoftTimeLimitExceeded:
        logger.warning(f"Report {report_id} timed out; saving partial state")
        save_partial_state(report_id)
        raise  # do not retry when the time limit is exceeded
    except ReportDataError as e:
        logger.error(f"Report {report_id} data error: {e}")
        # Linear backoff: 60s, 120s, 180s
        raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
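The countdown in generate_report grows linearly with the retry count. For comparison, here is a small sketch of that schedule next to a capped exponential one. The helper names, the 60 second base and the one hour cap are assumptions for illustration. Celery also offers `retry_backoff` and `retry_jitter` task options for use with `autoretry_for`, which may fit better than a hand-rolled helper.

```python
def linear_countdown(retries: int, base: int = 60) -> int:
    """Delay before the next attempt, growing by `base` seconds each retry."""
    return base * (retries + 1)


def exponential_countdown(retries: int, base: int = 60, cap: int = 3600) -> int:
    """Delay that doubles each retry, capped so late retries do not wait for hours."""
    return min(cap, base * 2 ** retries)


print([linear_countdown(n) for n in range(5)])
# [60, 120, 180, 240, 300]
print([exponential_countdown(n) for n in range(5)])
# [60, 120, 240, 480, 960]
```

Inside a bound task, either helper could feed the retry call, for example `raise self.retry(exc=e, countdown=exponential_countdown(self.request.retries))`.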
Idempotency: the rule every task must follow
# redis_client, send_email, db and charge_card are application helpers defined elsewhere.

# Non-idempotent: dangerous with acks_late
@app.task
def send_welcome_email_bad(user_id: int, email: str):
    send_email(email, "Welcome!")
    # If this task runs twice (worker crash + re-queue), user gets 2 emails


# Idempotent: safe to run multiple times
@app.task(acks_late=True)
def send_welcome_email_safe(user_id: int, email: str):
    # Check if already sent.
    # Note: get-then-setex is not atomic, so two concurrent runs can both pass this check.
    key = f"welcome_sent:{user_id}"
    if redis_client.get(key):
        logger.info(f"Welcome email already sent to user {user_id}, skipping")
        return
    send_email(email, "Welcome!")
    # Mark as sent (with TTL to handle edge cases)
    redis_client.setex(key, 86400, "1")  # expires after 24h


# Database-backed idempotency (stronger guarantee)
@app.task(acks_late=True)
def process_payment(order_id: int, amount: float):
    with db.transaction():
        # SELECT FOR UPDATE prevents concurrent processing
        order = db.execute(
            "SELECT * FROM orders WHERE id = $1 AND status = 'pending' FOR UPDATE",
            order_id,
        ).fetchone()
        if not order:
            logger.info(f"Order {order_id} already processed or not found")
            return
        # Process payment.
        # Note: a crash after the charge but before the commit leaves the order
        # 'pending', so a re-run would charge again unless the payment provider
        # deduplicates on an idempotency key such as the order id.
        charge_card(order["payment_method_id"], amount)
        # Update status in the same transaction
        db.execute(
            "UPDATE orders SET status = 'paid' WHERE id = $1", order_id
        )
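The Redis check in send_welcome_email_safe reads the key and writes it in two separate steps, which leaves a window where two deliveries of the same task both see "not sent". One common way to close it is to claim the key atomically before sending. The sketch below uses a hypothetical in-memory `FakeStore` so it runs without Redis. With redis-py the equivalent claim would be `redis_client.set(key, "1", nx=True, ex=86400)`, which only succeeds for the first caller.

```python
import threading


class FakeStore:
    """In-memory stand-in for Redis (hypothetical, for illustration only)."""

    def __init__(self):
        self._keys = set()
        self._lock = threading.Lock()

    def set_if_absent(self, key: str) -> bool:
        """Atomically claim `key`. Returns True only for the first caller."""
        with self._lock:
            if key in self._keys:
                return False
            self._keys.add(key)
            return True

    def delete(self, key: str) -> None:
        with self._lock:
            self._keys.discard(key)


store = FakeStore()
sent = []  # records every email "sent"


def send_welcome_email(user_id: int, email: str) -> bool:
    key = f"welcome_sent:{user_id}"
    if not store.set_if_absent(key):
        return False  # another run already claimed this user
    try:
        sent.append(email)  # stand-in for the real send_email call
    except Exception:
        store.delete(key)  # release the claim so a retry can send
        raise
    return True


# Simulate the same task being delivered to eight workers at once.
threads = [
    threading.Thread(target=send_welcome_email, args=(42, "a@example.com"))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(sent))  # 1
```

Claiming first trades one failure mode for another: if the worker dies between the claim and the send, the email is not sent until the key expires. Whether a lost email or a duplicate email is the lesser evil depends on the task.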
Queue routing and priority
from datetime import datetime, timedelta, timezone

app.conf.task_routes = {
    "myapp.tasks.email.*": {"queue": "email"},
    "myapp.tasks.reports.*": {"queue": "reports"},
    "myapp.tasks.payments.*": {"queue": "critical"},
}

# Start dedicated workers per queue
# celery -A myapp worker -Q critical --concurrency=4 --prefetch-multiplier=1
# celery -A myapp worker -Q email --concurrency=8 --prefetch-multiplier=4
# celery -A myapp worker -Q reports --concurrency=2 --prefetch-multiplier=1

# Dynamic routing per task call.
# Priority ordering is broker-specific: RabbitMQ treats higher numbers as more
# urgent, while the Redis transport reverses this and treats 0 as the highest.
process_payment.apply_async(args=[order_id, amount], queue="critical", priority=9)
send_newsletter.apply_async(args=[campaign_id], queue="email", countdown=60)

# ETA scheduling (timezone-aware; datetime.utcnow() is deprecated since Python 3.12)
send_reminder.apply_async(
    args=[user_id],
    eta=datetime.now(timezone.utc) + timedelta(hours=24),
)
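To see how the glob patterns in task_routes pick a queue, here is a rough stand-in written with the standard library. It is not Celery's router, only an approximation of first-match glob routing. The function name `queue_for` is made up, and the fallback assumes Celery's default queue name, "celery".

```python
from fnmatch import fnmatchcase

# Same patterns as the task_routes above, flattened to pattern -> queue name.
TASK_ROUTES = {
    "myapp.tasks.email.*": "email",
    "myapp.tasks.reports.*": "reports",
    "myapp.tasks.payments.*": "critical",
}


def queue_for(task_name: str, default: str = "celery") -> str:
    """Return the queue of the first matching pattern, else the default queue."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatchcase(task_name, pattern):
            return queue
    return default


print(queue_for("myapp.tasks.email.send_welcome_email_safe"))  # email
print(queue_for("myapp.tasks.payments.process_payment"))       # critical
print(queue_for("myapp.tasks.auth.cleanup_sessions"))          # celery
```

The last line is the case worth checking in a real deployment: a task with no matching route lands on the default queue, and workers started with only `-Q critical`, `-Q email` or `-Q reports` would not consume it.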
Celery beat: scheduled tasks
from celery.schedules import crontab

app.conf.beat_schedule = {
    "daily-report": {
        "task": "myapp.tasks.reports.daily_summary",
        "schedule": crontab(hour=7, minute=0),  # 7 AM daily
        "args": [],
    },
    "cleanup-expired-sessions": {
        "task": "myapp.tasks.auth.cleanup_sessions",
        "schedule": 3600,  # every hour (seconds)
    },
    "sync-analytics": {
        "task": "myapp.tasks.analytics.sync",
        "schedule": crontab(minute="*/5"),  # every 5 minutes
    },
}

# Start beat scheduler (run ONE instance only, not one per worker)
# celery -A myapp beat --loglevel=info
# celery -A myapp worker --beat  # combined (dev only)
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- What is the difference between Celery's broker and backend? Which is required? What happens if the backend is unavailable?
- You have a task that processes video files (takes 10 minutes each). You have 4 workers, prefetch_multiplier=4. A slow task is assigned to worker 1. What happens to the other 3 tasks prefetched by worker 1? How do you fix this?
- A payment task with acks_late=True charges a card, then the worker crashes before setting status to "paid". The task is re-queued and runs again. The card gets charged twice. How do you prevent this?
- What is the difference between task_soft_time_limit and task_time_limit? What can you do in the soft limit handler that you cannot do after the hard limit?
- You need to run a cleanup job every day at 3 AM UTC. How do you configure Celery Beat? What happens if you run two Celery Beat instances simultaneously?
Next up: Part 19: Connection Pooling & SQLAlchemy. pool_size math, async SQLAlchemy, and PgBouncer mode compatibility.