Designing multi-agent pipelines with Claude
The single-prompt approach to complex AI tasks has a ceiling. Once your prompt reaches a certain complexity — extract data, validate it, enrich it, format it, check for policy violations, and output structured JSON — the quality degrades. The model tries to do too many things at once and drops subtle requirements. Multi-agent pipelines solve this by giving each step to a focused agent with a specific role, a tight context window, and a defined output contract.
When to use multi-agent vs single-prompt
Single-prompt is simpler and cheaper. Use it when:
- The task is one coherent operation (summarise this document)
- The steps are not independently reviewable or testable
- Latency matters and you cannot run agents in parallel
Multi-agent adds value when:
- Different steps require different expertise or personas
- You need to validate the output of one step before passing it to the next
- Some steps can run in parallel
- You want to swap out one step without rebuilding the whole pipeline
- You need an audit trail of each step's output
The pipeline data structure
Each agent in the pipeline receives a typed input, produces a typed output, and can append to a shared trace log. Here is the base class:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, TypeVar, Generic
import anthropic
T_in = TypeVar("T_in")
T_out = TypeVar("T_out")
@dataclass
class PipelineContext:
"""Shared context passed through the pipeline."""
trace: list[dict] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
def log(self, agent_name: str, input_data: Any, output_data: Any) -> None:
self.trace.append({
"agent": agent_name,
"input_summary": str(input_data)[:200],
"output_summary": str(output_data)[:200],
})
class Agent(ABC, Generic[T_in, T_out]):
def __init__(self, name: str, model: str = "claude-opus-4-5"):
self.name = name
self.client = anthropic.Anthropic()
self.model = model
def run(self, input_data: T_in, ctx: PipelineContext) -> T_out:
output = self._run(input_data, ctx)
ctx.log(self.name, input_data, output)
return output
@abstractmethod
def _run(self, input_data: T_in, ctx: PipelineContext) -> T_out:
pass
def call_claude(self, system: str, user: str, max_tokens: int = 2048) -> str:
response = self.client.messages.create(
model=self.model,
max_tokens=max_tokens,
system=system,
messages=[{"role": "user", "content": user}],
)
return response.content[0].text
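Before wiring in real model calls, the run/log contract is worth exercising with a toy agent. This is a minimal sketch (UppercaseAgent is my own stand-in, not part of the pipeline); it repeats the PipelineContext definition so it runs standalone and skips the API client entirely:

```python
from dataclasses import dataclass, field
from typing import Any

# PipelineContext repeated from above so this sketch runs on its own.
@dataclass
class PipelineContext:
    trace: list[dict] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    def log(self, agent_name: str, input_data: Any, output_data: Any) -> None:
        self.trace.append({
            "agent": agent_name,
            "input_summary": str(input_data)[:200],
            "output_summary": str(output_data)[:200],
        })

# Toy agent: same run-then-log shape, but _run is pure Python.
class UppercaseAgent:
    name = "upper"

    def run(self, input_data: str, ctx: PipelineContext) -> str:
        output = input_data.upper()
        ctx.log(self.name, input_data, output)
        return output

ctx = PipelineContext()
result = UppercaseAgent().run("hello", ctx)
```

After the call, `ctx.trace` holds one entry recording the agent name and truncated input/output, which is the audit-trail behaviour the real agents inherit.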
A real example: job application screener
Three agents: extract structured data from a CV, score it against requirements, and generate a recruiter summary. Each is testable in isolation.
import json
from dataclasses import dataclass
@dataclass
class CVData:
raw_text: str
@dataclass
class ExtractedProfile:
name: str
years_experience: int
skills: list[str]
education: str
previous_roles: list[str]
@dataclass
class ScoredProfile:
profile: ExtractedProfile
score: int # 0-100
strengths: list[str]
gaps: list[str]
@dataclass
class RecruiterSummary:
scored: ScoredProfile
recommendation: str # "advance" | "hold" | "reject"
summary: str # 2-3 sentences for the recruiter
class ExtractionAgent(Agent[CVData, ExtractedProfile]):
def _run(self, input_data: CVData, ctx: PipelineContext) -> ExtractedProfile:
raw = self.call_claude(
system="Extract structured information from CVs. Always respond with valid JSON.",
user=f"""Extract the following from this CV. Respond with JSON only.
Fields: name (string), years_experience (int), skills (list of strings),
education (string, highest degree), previous_roles (list of job titles).
CV:
{input_data.raw_text}""",
)
data = json.loads(raw)
return ExtractedProfile(**data)
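One failure mode worth guarding against here: models sometimes wrap JSON in a markdown code fence despite the "JSON only" instruction, which would make `json.loads` throw. A small helper can make the parse more forgiving (a sketch; `strip_json_fences` is my own name, not part of the pipeline above):

```python
import json

FENCE = "`" * 3  # a literal triple backtick

def strip_json_fences(raw: str) -> str:
    """Remove a leading/trailing markdown code fence if the model added one."""
    text = raw.strip()
    if text.startswith(FENCE):
        lines = text.splitlines()
        lines = lines[1:]                        # drop the opening fence line
        if lines and lines[-1].strip() == FENCE:
            lines = lines[:-1]                   # drop the closing fence line
        text = "\n".join(lines)
    return text

# Both fenced and bare model output now parse cleanly
fenced = FENCE + 'json\n{"name": "Alice"}\n' + FENCE
parsed = json.loads(strip_json_fences(fenced))
```

Calling `strip_json_fences(raw)` before `json.loads(raw)` in each agent keeps the failure mode contained without changing the happy path.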
JOB_REQUIREMENTS = """
Required: 3+ years Python, REST API design, PostgreSQL, AWS
Preferred: FastAPI, Docker, experience with financial systems
Nice to have: Kafka, TypeScript
"""
class ScoringAgent(Agent[ExtractedProfile, ScoredProfile]):
def _run(self, input_data: ExtractedProfile, ctx: PipelineContext) -> ScoredProfile:
raw = self.call_claude(
system="You are an objective technical recruiter. Respond with valid JSON.",
user=f"""Score this candidate against our requirements.
Job Requirements:
{JOB_REQUIREMENTS}
Candidate Profile:
{json.dumps(input_data.__dict__, indent=2)}
Respond with JSON: {{ "score": 0-100, "strengths": [...], "gaps": [...] }}""",
)
data = json.loads(raw)
return ScoredProfile(
profile=input_data,
score=data["score"],
strengths=data["strengths"],
gaps=data["gaps"],
)
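Because `score` comes straight from the model, it is worth checking against the 0-100 contract before the next step trusts it. A hedged sketch of such a check (`validate_score_payload` is my name, not part of the pipeline above):

```python
def validate_score_payload(data: dict) -> dict:
    """Check the scoring agent's JSON against its output contract."""
    score = data.get("score")
    if not isinstance(score, int) or not 0 <= score <= 100:
        raise ValueError(f"score out of contract: {score!r}")
    for key in ("strengths", "gaps"):
        if not isinstance(data.get(key), list):
            raise ValueError(f"{key} must be a list")
    return data

# An in-contract payload passes through unchanged
checked = validate_score_payload(
    {"score": 82, "strengths": ["Python"], "gaps": []}
)
```

Running this between `json.loads` and the `ScoredProfile` constructor turns a silently wrong score into a loud, catchable error.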
class SummaryAgent(Agent[ScoredProfile, RecruiterSummary]):
def _run(self, input_data: ScoredProfile, ctx: PipelineContext) -> RecruiterSummary:
raw = self.call_claude(
system="Write clear, concise recruiter summaries. Respond with valid JSON.",
user=f"""Write a recruiter summary for this candidate.
Score: {input_data.score}/100
Strengths: {input_data.strengths}
Gaps: {input_data.gaps}
Determine: recommendation ("advance" if score >= 70, "hold" if 50-69, "reject" if below 50)
Write a 2-3 sentence summary explaining the recommendation in plain language.
Respond with JSON: {{ "recommendation": "...", "summary": "..." }}""",
)
data = json.loads(raw)
return RecruiterSummary(
scored=input_data,
recommendation=data["recommendation"],
summary=data["summary"],
)
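Each agent's only external dependency is `call_claude`, so the surrounding parsing logic is unit-testable by overriding that one method with a canned response. A standalone sketch (it re-declares a minimal stand-in rather than importing the real classes):

```python
import json
from dataclasses import dataclass

@dataclass
class SummaryOut:  # minimal stand-in for RecruiterSummary
    recommendation: str
    summary: str

class FakeSummaryAgent:
    """The parsing half of SummaryAgent, with call_claude stubbed out."""

    def call_claude(self, system: str, user: str) -> str:
        # Canned response: no API call, no network, deterministic tests
        return json.dumps({
            "recommendation": "advance",
            "summary": "Strong match for the role.",
        })

    def run(self, score: int, strengths: list, gaps: list) -> SummaryOut:
        raw = self.call_claude("summariser system prompt", f"Score: {score}")
        data = json.loads(raw)
        return SummaryOut(data["recommendation"], data["summary"])

out = FakeSummaryAgent().run(82, ["Python"], [])
```

The same pattern works for the extraction and scoring agents: fake the model, assert on the structured output.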
Wiring the pipeline
def screen_candidate(cv_text: str) -> RecruiterSummary:
ctx = PipelineContext()
extractor = ExtractionAgent("extractor")
scorer = ScoringAgent("scorer")
summariser = SummaryAgent("summariser")
cv = CVData(raw_text=cv_text)
profile = extractor.run(cv, ctx)
scored = scorer.run(profile, ctx)
summary = summariser.run(scored, ctx)
    # ctx.trace now holds a full audit log of every step; persist it here if needed
return summary
# Usage
with open("cv.txt") as f:
    result = screen_candidate(f.read())
print(result.recommendation) # "advance"
print(result.summary) # "Alice has 6 years of Python experience..."
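Any step in this chain can fail with malformed JSON, and a linear pipeline makes that easy to contain: retry the single failing step instead of the whole run. A thin wrapper sketch (`run_with_retries` is my own name, not part of the pipeline above):

```python
import json

def run_with_retries(call, parse=json.loads, attempts=3):
    """Run a model-backed step, retrying when its output fails to parse.

    `call` stands in for any zero-argument function returning raw model text.
    """
    last_err = None
    for _ in range(attempts):
        try:
            return parse(call())
        except (json.JSONDecodeError, KeyError, TypeError) as err:
            last_err = err
    raise RuntimeError(f"step failed after {attempts} attempts") from last_err

# A flaky fake step: invalid JSON once, then a valid payload
responses = iter(['not json', '{"score": 71}'])
result = run_with_retries(lambda: next(responses))
```

Wrapping each agent's `call_claude` + `json.loads` pair this way means one bad completion costs one extra API call, not a failed candidate.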
Parallel agents
When steps are independent, run them in parallel using asyncio.gather:
import asyncio
async def run_parallel_checks(cv_text: str) -> dict:
async def run_agent(agent, input_data, ctx):
return await asyncio.to_thread(agent.run, input_data, ctx)
ctx = PipelineContext()
cv = CVData(raw_text=cv_text)
    # Run technical and culture-fit checks in parallel.
    # TechnicalFitAgent and CultureFitAgent are further Agent subclasses
    # following the same pattern as above (definitions not shown).
    tech_agent = TechnicalFitAgent("tech")
    culture_agent = CultureFitAgent("culture")
tech_result, culture_result = await asyncio.gather(
run_agent(tech_agent, cv, ctx),
run_agent(culture_agent, cv, ctx),
)
return {"technical": tech_result, "culture": culture_result, "trace": ctx.trace}
What makes a good agent boundary
The single most important decision in pipeline design is where to draw agent boundaries. Good boundaries have:
- A clear, testable output contract: The output is structured (JSON, dataclass) and can be validated before passing to the next step
- A single responsibility: The agent does one thing — extract, score, summarise, validate, enrich
- Minimal context requirements: The agent should not need to know what came before it in the pipeline
Bad agent boundaries: splitting a single coherent reasoning task in half, creating agents so narrow they need constant back-and-forth with neighbours, or putting validation logic in multiple agents.
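One concrete way to enforce the first property, a testable output contract, is a gate function that runs between steps and rejects out-of-contract data before the next agent sees it. A minimal sketch (the `Summary` dataclass and `gate` are my own illustrative names):

```python
from dataclasses import dataclass

@dataclass
class Summary:  # minimal stand-in for a pipeline output type
    recommendation: str
    summary: str

VALID_RECOMMENDATIONS = {"advance", "hold", "reject"}

def gate(output: Summary) -> Summary:
    """Enforce the output contract before the next step sees the data."""
    if output.recommendation not in VALID_RECOMMENDATIONS:
        raise ValueError(f"bad recommendation: {output.recommendation!r}")
    if not output.summary.strip():
        raise ValueError("empty summary")
    return output

checked = gate(Summary("advance", "Strong Python background."))
```

Because the gate lives between agents rather than inside them, validation logic stays in one place, which is exactly the failure the last bullet above warns about.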
The pipeline above screens hundreds of CVs daily in production. Each agent runs in 2-4 seconds. The extraction agent was the first one we improved — because it had a clear input (raw CV text), a clear output (structured JSON), and a clear failure mode (invalid JSON). That is the kind of problem you can iterate on quickly.