I use Claude to design data pipelines from rough specs in one afternoon
A product manager dropped a two-paragraph spec on my desk: "We need to ingest daily sales data from three CSV sources and make it queryable in Postgres by 8am." Previously I would spend a day designing the schema, error handling, and monitoring strategy. Now I spend an hour with Claude and the rest of the day implementing. Here is the exact conversation structure that works.
The three-phase Claude workflow
Trying to get the full pipeline design in one prompt produces generic output. The iterative approach produces something you can actually implement.
Phase 1: Clarify requirements
Prompt 1:
```
I need to design an ETL pipeline with this requirement:

"Daily CSV ingestion from three sales sources (Shopify, WooCommerce, Stripe)
into Postgres. Must complete by 8am. Historical backfill needed for 2 years."

Before we design anything, what questions do you need answered about:

- Data volume and frequency
- Error handling requirements
- Idempotency needs
- Schema requirements
- Monitoring and alerting
```
Claude will typically ask 8-10 clarifying questions. Answer all of them — this forces you to think through requirements you would otherwise discover mid-implementation.
Phase 2: Schema design
Prompt 2 (after answering clarifying questions):
```
Design the Postgres schema for this pipeline.

Requirements from Phase 1:
- ~50,000 orders/day across all sources
- Must be idempotent (safe to re-run if it fails midway)
- Need to track ingestion history and detect anomalies
- Must support historical queries by date and source

Design:
1. The core order schema (normalized, source-agnostic)
2. Source-specific staging tables
3. An ingestion metadata table to track runs
4. Any useful indexes
```
Claude produces:
```sql
-- Staging table (one per source, temporary)
CREATE TABLE staging_orders_shopify (
    raw_id      TEXT NOT NULL,
    source_data JSONB NOT NULL,
    ingested_at TIMESTAMPTZ DEFAULT NOW(),
    run_id      UUID NOT NULL  -- Links to ingestion_runs
);

-- Core normalized orders
CREATE TABLE orders (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source          TEXT NOT NULL,  -- 'shopify', 'woocommerce', 'stripe'
    source_order_id TEXT NOT NULL,
    customer_email  TEXT,
    amount_cents    INTEGER NOT NULL,
    currency        CHAR(3) NOT NULL,
    ordered_at      TIMESTAMPTZ NOT NULL,
    raw_data        JSONB,  -- Preserve original for reprocessing
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (source, source_order_id)  -- Idempotency key
);

-- Ingestion run tracking
CREATE TABLE ingestion_runs (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source        TEXT NOT NULL,
    run_date      DATE NOT NULL,
    status        TEXT NOT NULL DEFAULT 'running',  -- running, completed, failed
    rows_ingested INTEGER,
    rows_failed   INTEGER,
    error_message TEXT,
    started_at    TIMESTAMPTZ DEFAULT NOW(),
    completed_at  TIMESTAMPTZ,
    UNIQUE (source, run_date)  -- Prevent double runs
);

CREATE INDEX idx_orders_source_date ON orders (source, ordered_at DESC);
CREATE INDEX idx_orders_email ON orders (customer_email)
    WHERE customer_email IS NOT NULL;
```
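The `UNIQUE (source, source_order_id)` constraint is what makes re-runs safe: replayed rows collapse into updates instead of duplicates. A minimal pure-Python sketch of that upsert semantics (a toy in-memory table, not asyncpg — just to show why the conflict key matters):

```python
def upsert_orders(table: dict, rows: list[tuple]) -> dict:
    """Toy model of INSERT ... ON CONFLICT (source, source_order_id) DO UPDATE.

    `table` maps (source, source_order_id) -> row dict. Re-applying the
    same batch overwrites existing keys rather than duplicating them.
    """
    for source, order_id, amount_cents in rows:
        table[(source, order_id)] = {"amount_cents": amount_cents}
    return table


batch = [("shopify", "1001", 1999), ("shopify", "1002", 550)]
table = upsert_orders({}, batch)
table = upsert_orders(table, batch)  # Re-run after a failure: no duplicates
```

If a run dies halfway through, you simply run it again: rows already written are updated in place, rows not yet written are inserted.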
Phase 3: Pipeline implementation
Prompt 3:
```
Now write the Python ETL pipeline for the Shopify source using:

- The schema we designed
- pandas for transformation
- asyncpg for database writes
- Error handling that marks failed rows without stopping the run
- Progress logging at every major step
```
```python
import logging
import uuid
from datetime import date

import asyncpg
import pandas as pd

logger = logging.getLogger(__name__)


async def ingest_shopify(
    csv_path: str,
    run_date: date,
    db_url: str,
) -> dict:
    run_id = uuid.uuid4()
    conn = await asyncpg.connect(db_url)
    try:
        # Register this run. RETURNING id matters: on a re-run, ON CONFLICT
        # keeps the existing row, so we must adopt its id for the updates below.
        run_id = await conn.fetchval(
            "INSERT INTO ingestion_runs (id, source, run_date) VALUES ($1, $2, $3)"
            " ON CONFLICT (source, run_date) DO UPDATE"
            " SET status = 'running', started_at = NOW()"
            " RETURNING id",
            run_id, 'shopify', run_date,
        )

        # Load and validate CSV
        df = pd.read_csv(csv_path)
        required_cols = {'id', 'email', 'total_price', 'currency', 'created_at'}
        missing = required_cols - set(df.columns)
        if missing:
            raise ValueError(f"CSV missing columns: {missing}")
        logger.info("Loaded %d rows from %s", len(df), csv_path)

        # Transform row by row so one bad row doesn't abort the run
        orders = []
        failed = []
        for _, row in df.iterrows():
            try:
                email = row.get('email')
                orders.append((
                    'shopify',
                    str(row['id']),
                    email if pd.notna(email) else None,  # NaN -> NULL
                    # round() avoids float truncation: 19.99 -> 1999, not 1998
                    int(round(float(row['total_price']) * 100)),
                    str(row['currency']).upper()[:3],
                    pd.to_datetime(row['created_at']),
                    row.to_json(),
                ))
            except Exception as e:
                failed.append({'row': row.to_dict(), 'error': str(e)})

        # Bulk upsert (idempotent). executemany returns no rowcount,
        # so report counts from the transform step instead.
        await conn.executemany(
            """INSERT INTO orders
                   (source, source_order_id, customer_email, amount_cents,
                    currency, ordered_at, raw_data)
               VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
               ON CONFLICT (source, source_order_id) DO UPDATE
                   SET amount_cents = EXCLUDED.amount_cents""",
            orders,
        )

        # Mark run complete
        await conn.execute(
            """UPDATE ingestion_runs SET status = 'completed',
                   rows_ingested = $2, rows_failed = $3, completed_at = NOW()
               WHERE id = $1""",
            run_id, len(orders), len(failed),
        )
        logger.info("Ingested %d rows, %d failed", len(orders), len(failed))
        return {"ingested": len(orders), "failed": len(failed), "errors": failed}
    except Exception as e:
        await conn.execute(
            "UPDATE ingestion_runs SET status = 'failed', error_message = $2 WHERE id = $1",
            run_id, str(e),
        )
        raise
    finally:
        await conn.close()
```
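The per-row transform is the piece most worth unit-testing in isolation. A standalone sketch — the helper name `normalize_shopify_row` is mine, and it mirrors the loop body above on a plain dict instead of a pandas row:

```python
import json


def normalize_shopify_row(row: dict) -> tuple:
    """One Shopify CSV row -> one tuple ready for the orders upsert."""
    return (
        "shopify",
        str(row["id"]),
        row.get("email"),
        int(round(float(row["total_price"]) * 100)),  # Dollars -> cents
        str(row["currency"]).upper()[:3],
        row["created_at"],  # Parsed to a timestamp in the real pipeline
        json.dumps(row),    # Preserve the raw row for reprocessing
    )


row = {"id": 1001, "email": "a@b.com", "total_price": "19.99",
       "currency": "usd", "created_at": "2024-06-01T08:00:00Z"}
normalize_shopify_row(row)
# → ('shopify', '1001', 'a@b.com', 1999, 'USD', ...)
```

Note the `round()` before `int()`: `19.99 * 100` is `1998.999…` in floating point, so plain truncation silently loses a cent on real money values.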
The design quality difference
What makes the Claude-designed pipeline better than what I would have produced alone in an afternoon: the idempotency design (ON CONFLICT DO UPDATE on orders, plus the unique constraints on (source, source_order_id) and (source, run_date)), the ingestion_runs table that lets you diagnose what ran when and how it ended, and the separation of raw JSONB storage from the normalized columns for future reprocessing.
These are all things I would eventually add — after the first production incident. The iterative Claude conversation surfaces them before you start writing code.
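The ingestion_runs history also pays for itself on the "detect anomalies" requirement from Phase 1. One cheap check, sketched here with a hypothetical 50% tolerance (the function and threshold are mine, not part of the pipeline above): feed it `rows_ingested` from recent completed runs for one source and flag days that deviate sharply from the trailing mean.

```python
def volume_anomaly(history: list[int], today: int, tolerance: float = 0.5) -> bool:
    """Flag today's row count if it deviates from the trailing mean by more
    than `tolerance` (0.5 = 50%). `history` is rows_ingested from the last
    N completed ingestion_runs for one source."""
    if not history:
        return False  # No baseline yet
    mean = sum(history) / len(history)
    return abs(today - mean) > tolerance * mean


recent = [48_000, 51_000, 50_000, 49_500]
volume_anomaly(recent, 50_200)  # → False: normal daily variation
volume_anomaly(recent, 12_000)  # → True: likely a truncated upstream export
```

A run that "succeeds" with a quarter of the usual rows is the failure mode that otherwise goes unnoticed until someone queries last week's revenue.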