Managing Claude API rate limits in production
April 4, 2026 · Claude · 8 min read

The first time your Claude-powered feature hits a rate limit in production, it will happen at the worst possible time — during a spike, when a customer is watching, at 2am. The Anthropic API has rate limits on requests per minute and tokens per minute. If you have not built a strategy around them, you will hit them. Here is the complete strategy I use: request queuing, exponential backoff, prompt caching, and graceful degradation — with code you can drop in today.

Understanding the limits

Anthropic rate limits have two dimensions:

  • Requests per minute (RPM): How many API calls you can make per minute
  • Tokens per minute (TPM): How many input + output tokens you can consume per minute
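Budgeting against the TPM limit requires a rough token estimate before you make a call. The heuristic below (roughly 4 characters per token for English text) is an approximation, not Anthropic's tokenizer; rounding up keeps the budget on the safe side:

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~4 characters per token for English text.

    Rounds up so the budget errs on the conservative side.
    """
    return max(1, len(text) // 4 + 1)
```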

The TPM limit is the one that bites most applications. A 429 response includes a retry-after header telling you exactly how long to wait. Naive retry without checking this header leads to thundering herd problems when multiple requests retry simultaneously.

Exponential backoff with jitter

The minimum viable approach: catch 429 errors and retry with exponential backoff plus random jitter to spread retries across time.

python
import time
import random
import logging
import anthropic

logger = logging.getLogger(__name__)


def call_with_backoff(
    client: anthropic.Anthropic,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs,
) -> anthropic.types.Message:
    """Call client.messages.create with exponential backoff on rate limits."""
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)
        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            # Honor the retry-after header when the server provides one
            retry_after = None
            if hasattr(e, 'response') and e.response is not None:
                retry_after_header = e.response.headers.get('retry-after')
                if retry_after_header:
                    try:
                        retry_after = float(retry_after_header)
                    except ValueError:
                        pass  # header may be an HTTP-date; fall back to backoff

            if retry_after:
                delay = retry_after
            else:
                # Exponential backoff with jitter (50-100% of each step)
                delay = min(
                    max_delay,
                    base_delay * (2 ** attempt) * (0.5 + random.random() * 0.5)
                )

            logger.warning(
                "Rate limited (attempt %d/%d). Waiting %.1fs",
                attempt + 1, max_retries, delay
            )
            time.sleep(delay)
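To sanity-check the delay schedule without hitting the API, the jitter formula can be reproduced on its own. With the defaults above, attempt n waits between 50% and 100% of 2^n seconds, capped at max_delay:

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Same formula as call_with_backoff: exponential step scaled by 50-100% jitter."""
    return min(max_delay, base_delay * (2 ** attempt) * (0.5 + random.random() * 0.5))


# Attempts 0..4 wait roughly 0.5-1s, 1-2s, 2-4s, 4-8s, 8-16s
for attempt in range(5):
    delay = backoff_delay(attempt)
    assert 0.5 * 2 ** attempt <= delay <= 2 ** attempt
```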

Request queue with token budget tracking

For applications with burst traffic, backoff alone is not enough — you need to proactively manage your token budget. A token-aware queue prevents rate limit errors instead of just recovering from them.

python
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TokenBucket:
    """Token bucket for tracking API token usage."""
    capacity: int           # max tokens per minute
    tokens: float = 0.0     # current budget; set to capacity in __post_init__
    last_refill: float = field(default_factory=time.monotonic)

    def __post_init__(self) -> None:
        # Start full so the first burst of requests is not throttled
        self.tokens = float(self.capacity)

    def refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed / 60.0) * self.capacity
        )
        self.last_refill = now

    def consume(self, amount: int) -> bool:
        self.refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

    def seconds_until_available(self, amount: int) -> float:
        self.refill()
        deficit = amount - self.tokens
        if deficit <= 0:
            return 0.0
        return (deficit / self.capacity) * 60.0


class RateLimitedQueue:
    """Async queue that respects token and request rate limits."""

    def __init__(self, tokens_per_minute: int, requests_per_minute: int):
        self.token_bucket = TokenBucket(capacity=tokens_per_minute)
        self.rpm_bucket = TokenBucket(capacity=requests_per_minute)
        self._queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, request: dict, estimated_tokens: int) -> asyncio.Future:
        """Add a request to the queue. Returns a future for the result."""
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        await self._queue.put((request, estimated_tokens, future))
        return future

    async def process(self, client: Any) -> None:
        """Process the queue. Run as a background task."""
        while True:
            request, estimated_tokens, future = await self._queue.get()

            # Wait until we have budget
            while True:
                token_wait = self.token_bucket.seconds_until_available(estimated_tokens)
                rpm_wait = self.rpm_bucket.seconds_until_available(1)
                wait = max(token_wait, rpm_wait)
                if wait <= 0:
                    break
                await asyncio.sleep(wait)

            self.token_bucket.consume(estimated_tokens)
            self.rpm_bucket.consume(1)

            try:
                result = await asyncio.to_thread(
                    client.messages.create, **request
                )
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)
            finally:
                self._queue.task_done()

Prompt caching to reduce token usage

The fastest way to avoid hitting token limits is to use fewer tokens. Claude's prompt caching lets you mark a stable prefix of your prompt (such as a long system prompt) with a cache breakpoint; for the cache's lifetime, subsequent calls read that prefix from cache at a fraction of the normal input-token cost, and for many models cache reads do not count against your input-tokens-per-minute limit (check the rate-limit docs for your tier).

python
import anthropic

client = anthropic.Anthropic()

# Long system prompt — cache it
SYSTEM_PROMPT = """You are a senior code reviewer for a fintech company...
[2000 tokens of detailed review guidelines]
"""

def review_code(code: str) -> str:
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},  # cache this block
            }
        ],
        messages=[{"role": "user", "content": f"Review this code:\n\n{code}"}],
    )
    return response.content[0].text

# First call: full token cost
# Subsequent calls: only the code snippet is counted as input tokens
# Cache lasts 5 minutes and resets with each use

If your system prompt is 2,000 tokens and you process 100 reviews in a batch, 99 of those calls read the prompt from cache, saving roughly 198,000 input tokens: a 90%+ reduction on the system-prompt portion of your token usage.
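A quick sanity check on that arithmetic, assuming the batch keeps the cache warm (calls less than five minutes apart):

```python
system_tokens = 2_000
reviews = 100

# Without caching, every call processes the full system prompt
without_cache = system_tokens * reviews

# With caching, only the first call processes it; the other 99 read the cache
with_cache = system_tokens * 1

saved = without_cache - with_cache
print(saved)  # 198000
```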

Graceful degradation

For user-facing features, build a fallback when the API is rate limited:

python
async def generate_summary(text: str) -> str:
    def fallback(reason: str) -> str:
        logger.warning("Claude %s — returning truncated text as fallback", reason)
        # Simple truncation as fallback
        return text[:500] + "..." if len(text) > 500 else text

    try:
        return await asyncio.wait_for(call_claude(text), timeout=10.0)
    except anthropic.RateLimitError:
        return fallback("rate limited")
    except asyncio.TimeoutError:
        return fallback("timed out")

Monitoring your usage

Add usage logging to every API call so you can see where you are relative to your limits before you hit them:

python
def call_and_log(client, **kwargs):
    response = client.messages.create(**kwargs)
    usage = response.usage
    logger.info(
        "Claude API usage: input_tokens=%d, output_tokens=%d, cache_read=%d, cache_write=%d",
        usage.input_tokens,
        usage.output_tokens,
        getattr(usage, 'cache_read_input_tokens', 0),
        getattr(usage, 'cache_creation_input_tokens', 0),
    )
    return response

Ship these metrics to your observability platform. Set an alert at 70% of your TPM budget. That gives you time to reduce batch sizes or enable caching before you hit the wall.

The full strategy in one sentence

Cache your system prompts, queue your requests with a token budget, use exponential backoff with jitter on 429s, and fall back gracefully instead of erroring. Apply all four and you will ship a Claude-powered feature that handles production traffic without waking you up at 2am.
