Claude's Batch API cuts my AI processing costs by 50% for overnight jobs
I was running a nightly job to classify 3,000 support tickets using Claude — sending requests one by one, waiting for each response. The job took 45 minutes and cost more than I wanted to pay. Then I switched to the Message Batches API and the same job now costs half as much and I do not need to babysit it. Here is how batch processing works and when to use it.
What the Batches API does
Instead of sending 3,000 individual API requests, you send one batch request containing all 3,000. Anthropic processes them asynchronously (up to 24 hours) at 50% of the standard token price. You poll for completion or check back the next morning.
The trade-off: latency. Individual requests get responses in seconds. Batch requests get results in minutes to hours. If you don't need real-time responses, this is a no-brainer for cost savings.
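Conceptually, the payload is just a list of (custom_id, params) pairs. The Python SDK's request types are TypedDicts, so plain dicts work too — a minimal sketch of the shape, using a hypothetical `build_batch_requests` helper:

```python
def build_batch_requests(tickets: list[dict], model: str = "claude-haiku-4-5") -> list[dict]:
    """Build the list of per-ticket requests that one batch submission carries."""
    return [
        {
            "custom_id": str(t["id"]),  # your key for matching results later
            "params": {
                "model": model,
                "max_tokens": 50,
                "messages": [{"role": "user", "content": t["subject"]}],
            },
        }
        for t in tickets
    ]

requests = build_batch_requests([{"id": 1, "subject": "Refund?"}, {"id": 2, "subject": "Login bug"}])
# One API call submits them all:
# batch = client.messages.batches.create(requests=requests)
```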
Building a batch classification job
import anthropic
import json
import time
from pathlib import Path
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()

def classify_tickets_batch(tickets: list[dict]) -> str:
    """Submit a batch classification job. Returns the batch ID."""
    requests = []
    for ticket in tickets:
        requests.append(
            Request(
                custom_id=str(ticket["id"]),  # Your ID to match results later
                params=MessageCreateParamsNonStreaming(
                    model="claude-haiku-4-5",  # Haiku is even cheaper for classification
                    max_tokens=50,
                    system=(
                        "Classify the support ticket into exactly one category.\n"
                        "Categories: billing, technical, account, feedback, other\n"
                        "Return only the category name, lowercase."
                    ),
                    messages=[
                        {
                            "role": "user",
                            "content": f"Subject: {ticket['subject']}\n\n{ticket['body'][:500]}",
                        }
                    ],
                ),
            )
        )
    batch = client.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id}")
    print(f"Status: {batch.processing_status}")
    return batch.id
def wait_for_batch(batch_id: str, poll_interval: int = 60) -> None:
    """Poll until the batch completes."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        print(
            f"Status: {batch.processing_status} | "
            f"Succeeded: {batch.request_counts.succeeded} | "
            f"Processing: {batch.request_counts.processing}"
        )
        if batch.processing_status == "ended":
            break
        time.sleep(poll_interval)
def collect_results(batch_id: str) -> dict[str, str]:
    """Collect results from a completed batch."""
    results = {}
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            category = result.result.message.content[0].text.strip()
            results[result.custom_id] = category
        else:
            # Covers "errored", "canceled", and "expired" results
            results[result.custom_id] = "error"
    return results
A complete nightly pipeline
#!/usr/bin/env python3
# classify_tickets.py — runs as a nightly cron job
import json
from datetime import date
from pathlib import Path

# classify_tickets_batch, wait_for_batch, and collect_results are the
# functions defined in the previous section.

def run_nightly_classification():
    # Load tickets from your database / file
    tickets = load_unclassified_tickets()
    if not tickets:
        print("No tickets to classify")
        return
    print(f"Classifying {len(tickets)} tickets")

    # Submit batch
    batch_id = classify_tickets_batch(tickets)

    # Save batch ID for later retrieval (in case the process restarts)
    state_file = Path(f"/tmp/batch_{date.today()}.json")
    state_file.write_text(json.dumps({"batch_id": batch_id}))

    # Wait for completion
    wait_for_batch(batch_id)

    # Collect and save results
    results = collect_results(batch_id)
    print(f"Classified {len(results)} tickets")

    # Update your database
    update_ticket_categories(results)
    print("Done!")

def load_unclassified_tickets() -> list[dict]:
    # Your DB query here
    pass

def update_ticket_categories(results: dict[str, str]) -> None:
    # Your DB update here
    pass

if __name__ == "__main__":
    run_nightly_classification()
Resumable batches
One underrated feature: batch jobs survive process restarts. If your script crashes, you can retrieve the batch ID from your state file and pick up where you left off:
from pathlib import Path
import json
from datetime import date

state_file = Path(f"/tmp/batch_{date.today()}.json")

if state_file.exists():
    # Resume the existing batch
    state = json.loads(state_file.read_text())
    batch_id = state["batch_id"]
    print(f"Resuming batch {batch_id}")
else:
    # Start a new batch
    batch_id = classify_tickets_batch(tickets)
    state_file.write_text(json.dumps({"batch_id": batch_id}))

wait_for_batch(batch_id)
results = collect_results(batch_id)
Cost comparison
For my support ticket job with claude-haiku-4-5:
Standard API: 3,000 requests × ~400 input tokens ≈ 1.2M tokens
  Input: $0.80/MTok × 1.2 MTok = $0.96/night
Batch API: same 3,000 requests, same 1.2M tokens
  Input: $0.40/MTok × 1.2 MTok = $0.48/night (50% discount)
Savings: $0.48/night ≈ $175/year for one job
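The arithmetic above as a quick sanity check (rates are the per-MTok input prices used in this post; output tokens are negligible here since each response is only a few tokens):

```python
TOKENS_PER_REQUEST = 400       # approximate input tokens per ticket
REQUESTS_PER_NIGHT = 3_000
STANDARD_RATE = 0.80           # $/MTok input, rate used in the comparison above
BATCH_DISCOUNT = 0.50          # batch requests bill at half the standard rate

mtok_per_night = TOKENS_PER_REQUEST * REQUESTS_PER_NIGHT / 1_000_000  # 1.2 MTok
standard_cost = STANDARD_RATE * mtok_per_night                        # $0.96/night
batch_cost = standard_cost * (1 - BATCH_DISCOUNT)                     # $0.48/night
annual_savings = (standard_cost - batch_cost) * 365                   # ≈ $175/year
```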
When to use batch vs real-time
Use batch for: document classification, content moderation at scale, generating descriptions for a product catalog, overnight report generation, bulk data enrichment.
Use real-time for: user-facing features, anything that needs a response in under a few seconds, interactive coding assistance, live chat.
The 50% cost saving on non-latency-sensitive workloads is compelling. If you have any regular AI processing job, converting it to use the Batch API is usually a two-hour project with a multi-hundred-dollar annual return.