Managing Claude API rate limits in production
The first time your Claude-powered feature hits a rate limit in production, it will happen at the worst possible time — during a spike, when a customer is watching, at 2am. The Anthropic API has rate limits on requests per minute and tokens per minute. If you have not built a strategy around them, you will hit them. Here is the complete strategy I use: request queuing, exponential backoff, prompt caching, and graceful degradation — with code you can drop in today.
Understanding the limits
Anthropic rate limits have two dimensions:
- Requests per minute (RPM): How many API calls you can make per minute
- Tokens per minute (TPM): How many input + output tokens you can consume per minute
The TPM limit is the one that bites most applications. When you exceed either limit, the API returns a 429 response with a retry-after header telling you exactly how long to wait. Retrying without honoring that header, or on a fixed schedule, creates a thundering herd: every throttled request wakes up at the same moment and gets throttled again.
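You do not have to wait for a 429 to learn where you stand. Every response carries anthropic-ratelimit-* headers with your remaining budget. A minimal sketch reading them through the Python SDK's with_raw_response wrapper (header names as documented by Anthropic; verify the exact set available on your tier):

import anthropic

client = anthropic.Anthropic()

# with_raw_response exposes the HTTP layer alongside the parsed Message
raw = client.messages.with_raw_response.create(
    model="claude-opus-4-5",
    max_tokens=256,
    messages=[{"role": "user", "content": "ping"}],
)
message = raw.parse()  # the usual Message object

# Remaining budget in the current window, straight from the headers
print(raw.headers.get("anthropic-ratelimit-requests-remaining"))
print(raw.headers.get("anthropic-ratelimit-tokens-remaining"))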
Exponential backoff with jitter
The minimum viable approach: catch 429 errors and retry with exponential backoff plus random jitter to spread retries across time.
import time
import random
import logging

import anthropic

logger = logging.getLogger(__name__)


def call_with_backoff(
    client: anthropic.Anthropic,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs,
) -> anthropic.types.Message:
    """Call client.messages.create with exponential backoff on rate limits."""
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)
        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            # Honor the server's retry-after header when present
            retry_after = None
            retry_after_header = e.response.headers.get("retry-after")
            if retry_after_header:
                retry_after = float(retry_after_header)
            if retry_after is not None:
                delay = retry_after
            else:
                # Exponential backoff with jitter: sleep 50-100% of the
                # doubled delay so concurrent clients spread their retries
                delay = min(
                    max_delay,
                    base_delay * (2 ** attempt) * (0.5 + random.random() * 0.5),
                )
            logger.warning(
                "Rate limited (attempt %d/%d). Waiting %.1fs",
                attempt + 1, max_retries, delay,
            )
            time.sleep(delay)
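Usage is a drop-in replacement for client.messages.create, with the same keyword arguments passed through:

client = anthropic.Anthropic()

response = call_with_backoff(
    client,
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize the incident below..."}],
)
print(response.content[0].text)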
Request queue with token budget tracking
For applications with burst traffic, backoff alone is not enough — you need to proactively manage your token budget. A token-aware queue prevents rate limit errors instead of just recovering from them.
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TokenBucket:
    """Token bucket for tracking API token usage."""
    capacity: int  # max tokens per minute
    tokens: float = 0.0
    last_refill: float = field(default_factory=time.monotonic)

    def __post_init__(self) -> None:
        # Start full so the first requests of the minute are not delayed
        self.tokens = float(self.capacity)

    def refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed / 60.0) * self.capacity,
        )
        self.last_refill = now

    def consume(self, amount: int) -> bool:
        self.refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

    def seconds_until_available(self, amount: int) -> float:
        self.refill()
        deficit = amount - self.tokens
        if deficit <= 0:
            return 0.0
        return (deficit / self.capacity) * 60.0


class RateLimitedQueue:
    """Async queue that respects token and request rate limits."""

    def __init__(self, tokens_per_minute: int, requests_per_minute: int):
        self.token_bucket = TokenBucket(capacity=tokens_per_minute)
        self.rpm_bucket = TokenBucket(capacity=requests_per_minute)
        self._queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, request: dict, estimated_tokens: int) -> asyncio.Future:
        """Add a request to the queue. Returns a future for the result."""
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        await self._queue.put((request, estimated_tokens, future))
        return future

    async def process(self, client: Any) -> None:
        """Process the queue. Run as a background task."""
        while True:
            request, estimated_tokens, future = await self._queue.get()
            # Wait until both buckets have budget
            while True:
                token_wait = self.token_bucket.seconds_until_available(estimated_tokens)
                rpm_wait = self.rpm_bucket.seconds_until_available(1)
                wait = max(token_wait, rpm_wait)
                if wait <= 0:
                    break
                await asyncio.sleep(wait)
            # Single worker, so consume succeeds after the wait above
            self.token_bucket.consume(estimated_tokens)
            self.rpm_bucket.consume(1)
            try:
                # Run the sync SDK call in a worker thread so the loop stays free
                result = await asyncio.to_thread(client.messages.create, **request)
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)
            finally:
                self._queue.task_done()
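To wire it up, run process as a background task and estimate tokens per request. A sketch, continuing from the classes above; the TPM/RPM numbers are placeholders for whatever your Anthropic console shows, and the estimate uses the messages.count_tokens endpoint (a characters-divided-by-four heuristic also works if you want to avoid the extra call):

import anthropic


async def main() -> None:
    client = anthropic.Anthropic()
    queue = RateLimitedQueue(tokens_per_minute=80_000, requests_per_minute=50)
    worker = asyncio.create_task(queue.process(client))

    request = {
        "model": "claude-opus-4-5",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": "Summarize this report..."}],
    }
    # Estimate input tokens, and budget for output too: TPM counts both
    count = client.messages.count_tokens(
        model=request["model"], messages=request["messages"]
    )
    estimated = count.input_tokens + request["max_tokens"]

    future = await queue.enqueue(request, estimated_tokens=estimated)
    response = await future
    print(response.content[0].text)
    worker.cancel()


asyncio.run(main())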
Prompt caching to reduce token usage
The fastest way to avoid hitting token limits is to use fewer tokens. Claude's prompt caching feature caches a prefix of your prompt, everything up to a cache_control breakpoint. On subsequent calls, those tokens come back as cache reads billed at a fraction of the base input rate, and on some models cache reads do not count against your input tokens-per-minute limit at all.
import anthropic

client = anthropic.Anthropic()

# Long system prompt — cache it
SYSTEM_PROMPT = """You are a senior code reviewer for a fintech company...
[2000 tokens of detailed review guidelines]
"""


def review_code(code: str) -> str:
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},  # cache this block
            }
        ],
        messages=[{"role": "user", "content": f"Review this code:\n\n{code}"}],
    )
    return response.content[0].text
# First call: full token cost, plus a cache-write premium on the cached block
# Subsequent calls: the system prompt comes back as a cache read billed at a
# fraction of the base input rate; only the code snippet is regular input
# The cache has a 5-minute TTL, refreshed each time the prefix is read
If your system prompt is 2000 tokens and you process 100 reviews in a batch, roughly 200,000 system-prompt tokens become cache reads instead of full-price input: close to a 90% reduction in cost on that portion of your usage, and far less pressure on your TPM budget.
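A quick back-of-the-envelope, using Anthropic's published multipliers for the 5-minute cache (writes billed at 1.25x the base input rate, reads at 0.1x):

SYSTEM_TOKENS = 2_000
REVIEWS = 100

uncached = SYSTEM_TOKENS * REVIEWS  # 200,000 full-price input tokens
cached = SYSTEM_TOKENS * 1.25 + SYSTEM_TOKENS * 0.1 * (REVIEWS - 1)
# = 2,500 (one cache write) + 19,800 (99 cache reads) = 22,300

print(f"effective cost reduction: {1 - cached / uncached:.0%}")  # -> 89%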
Graceful degradation
For user-facing features, build a fallback when the API is rate limited:
import asyncio
import logging

import anthropic

logger = logging.getLogger(__name__)


async def generate_summary(text: str) -> str:
    try:
        response = await asyncio.wait_for(
            call_claude(text),  # your async wrapper around the queue/backoff above
            timeout=10.0,
        )
        return response
    except anthropic.RateLimitError:
        logger.warning("Claude rate limited — returning truncated text as fallback")
        # Simple truncation as fallback
        return (text[:500] + "...") if len(text) > 500 else text
    except asyncio.TimeoutError:
        logger.warning("Claude timed out — returning fallback")
        return (text[:500] + "...") if len(text) > 500 else text
Monitoring your usage
Add usage logging to every API call so you can see where you are relative to your limits before you hit them:
import logging

logger = logging.getLogger(__name__)


def call_and_log(client, **kwargs):
    response = client.messages.create(**kwargs)
    usage = response.usage
    logger.info(
        "Claude API usage: input_tokens=%d, output_tokens=%d, cache_read=%d, cache_write=%d",
        usage.input_tokens,
        usage.output_tokens,
        # Cache fields may be absent on older SDKs or None when caching is off
        getattr(usage, "cache_read_input_tokens", 0) or 0,
        getattr(usage, "cache_creation_input_tokens", 0) or 0,
    )
    return response
Ship these metrics to your observability platform. Set an alert at 70% of your TPM budget. That gives you time to reduce batch sizes or enable caching before you hit the wall.
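What that looks like depends on your stack. Here is a minimal sketch assuming Prometheus via the prometheus_client library; the metric names and the alert expression are illustrative, not a standard:

from prometheus_client import Counter

CLAUDE_INPUT_TOKENS = Counter(
    "claude_input_tokens_total", "Input tokens sent to the Claude API"
)
CLAUDE_OUTPUT_TOKENS = Counter(
    "claude_output_tokens_total", "Output tokens from the Claude API"
)


def call_log_and_count(client, **kwargs):
    # Wraps call_and_log from above and ships the same numbers as counters
    response = call_and_log(client, **kwargs)
    CLAUDE_INPUT_TOKENS.inc(response.usage.input_tokens)
    CLAUDE_OUTPUT_TOKENS.inc(response.usage.output_tokens)
    return response

# Alert rule idea: rate(claude_input_tokens_total[1m]) * 60 > 0.7 * your TPM limit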
The full strategy in one sentence
Cache your system prompts, queue your requests with a token budget, use exponential backoff with jitter on 429s, and fall back gracefully instead of erroring. Apply all four and you will ship a Claude-powered feature that handles production traffic without waking you up at 2am.