I use Claude to design data pipelines from rough specs in one afternoon
April 4, 2026 · 7 min read


A product manager dropped a two-paragraph spec on my desk: "We need to ingest daily sales data from three CSV sources and make it queryable in Postgres by 8am." Previously I would spend a day designing the schema, error handling, and monitoring strategy. Now I spend an hour with Claude and the rest of the day implementing. Here is the exact conversation structure that works.

The three-phase Claude workflow

Trying to get the full pipeline design in one prompt produces generic output. The iterative approach produces something you can actually implement.

Phase 1: Clarify requirements

text
Prompt 1:
I need to design an ETL pipeline with this requirement:

"Daily CSV ingestion from three sales sources (Shopify, WooCommerce, Stripe)
into Postgres. Must complete by 8am. Historical backfill needed for 2 years."

Before we design anything, what questions do you need answered about:
- Data volume and frequency
- Error handling requirements
- Idempotency needs
- Schema requirements
- Monitoring and alerting

Claude will ask 8-10 clarifying questions. Answer them — this forces you to think through requirements you might otherwise discover during implementation.

Phase 2: Schema design

text
Prompt 2 (after answering clarifying questions):
Design the Postgres schema for this pipeline.

Requirements from Phase 1:
- ~50,000 orders/day across all sources
- Must be idempotent (safe to re-run if it fails midway)
- Need to track ingestion history and detect anomalies
- Must support historical queries by date and source

Design:
1. The core order schema (normalized, source-agnostic)
2. Source-specific staging tables
3. An ingestion metadata table to track runs
4. Any useful indexes

Claude produces:

sql
-- Staging table (one per source, temporary)
CREATE TABLE staging_orders_shopify (
    raw_id TEXT NOT NULL,
    source_data JSONB NOT NULL,
    ingested_at TIMESTAMPTZ DEFAULT NOW(),
    run_id UUID NOT NULL  -- Links to ingestion_runs
);

-- Core normalized orders
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source TEXT NOT NULL,           -- 'shopify', 'woocommerce', 'stripe'
    source_order_id TEXT NOT NULL,
    customer_email TEXT,
    amount_cents INTEGER NOT NULL,
    currency CHAR(3) NOT NULL,
    ordered_at TIMESTAMPTZ NOT NULL,
    raw_data JSONB,                 -- Preserve original for reprocessing
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (source, source_order_id)  -- Idempotency key
);

-- Ingestion run tracking
CREATE TABLE ingestion_runs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source TEXT NOT NULL,
    run_date DATE NOT NULL,
    status TEXT NOT NULL DEFAULT 'running',  -- running, completed, failed
    rows_ingested INTEGER,
    rows_failed INTEGER,
    error_message TEXT,
    started_at TIMESTAMPTZ DEFAULT NOW(),
    completed_at TIMESTAMPTZ,
    UNIQUE (source, run_date)  -- Prevent double runs
);

CREATE INDEX idx_orders_source_date ON orders (source, ordered_at DESC);
CREATE INDEX idx_orders_email ON orders (customer_email) WHERE customer_email IS NOT NULL;
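
The ingestion_runs table also makes the "detect anomalies" requirement from Phase 1 cheap to satisfy: compare today's rows_ingested against recent history for the same source. A minimal sketch of that check (the function and the z-score threshold are my own additions, not Claude's output; in production the history list would come from a SELECT on ingestion_runs):

```python
from statistics import mean, stdev


def is_anomalous(history: list[int], today: int, z_threshold: float = 3.0) -> bool:
    """Flag today's row count if it deviates more than z_threshold
    standard deviations from the trailing rows_ingested history."""
    if len(history) < 7:
        return False  # Not enough history to judge
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return today != mu  # Perfectly flat history: any change is suspicious
    return abs(today - mu) / sigma > z_threshold
```

For example, `is_anomalous([50_000] * 14, 4_000)` flags a day where most of the feed went missing, while normal day-to-day variation passes quietly.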

Phase 3: Pipeline implementation

text
Prompt 3:
Now write the Python ETL pipeline for the Shopify source using:
- The schema we designed
- pandas for transformation
- asyncpg for database writes
- Error handling that marks failed rows without stopping the run
- Progress logging at every major step
python
import logging
import uuid
from datetime import date

import asyncpg
import pandas as pd

logger = logging.getLogger(__name__)


async def ingest_shopify(
    csv_path: str,
    run_date: date,
    db_url: str,
) -> dict:
    run_id = uuid.uuid4()
    conn = await asyncpg.connect(db_url)
    
    try:
        # Register this run; on a re-run the INSERT conflicts, so RETURNING id
        # gives us the existing row's id and later status updates hit the right record
        run_id = await conn.fetchval(
            "INSERT INTO ingestion_runs (id, source, run_date) VALUES ($1, $2, $3)"
            " ON CONFLICT (source, run_date) DO UPDATE SET status = 'running', started_at = NOW()"
            " RETURNING id",
            run_id, 'shopify', run_date
        )
        
        # Load and validate CSV
        df = pd.read_csv(csv_path)
        required_cols = {'id', 'email', 'total_price', 'currency', 'created_at'}
        missing = required_cols - set(df.columns)
        if missing:
            raise ValueError(f"CSV missing columns: {missing}")
        
        logger.info("Loaded %d rows from %s", len(df), csv_path)
        
        # Transform
        orders = []
        failed = []
        
        for _, row in df.iterrows():
            try:
                orders.append((
                    'shopify',
                    str(row['id']),
                    row['email'] if pd.notna(row['email']) else None,  # NaN -> NULL
                    round(float(row['total_price']) * 100),  # To cents (round, don't truncate)
                    row['currency'].upper()[:3],
                    pd.to_datetime(row['created_at'], utc=True),  # tz-aware for TIMESTAMPTZ
                    row.to_json(),
                ))
            except Exception as e:
                failed.append({'row': row.to_dict(), 'error': str(e)})
        
        # Bulk upsert (idempotent); executemany returns None, so count via len(orders)
        await conn.executemany(
            """INSERT INTO orders
               (source, source_order_id, customer_email, amount_cents, currency, ordered_at, raw_data)
               VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
               ON CONFLICT (source, source_order_id) DO UPDATE
               SET amount_cents = EXCLUDED.amount_cents""",
            orders
        )
        
        # Mark run complete
        await conn.execute(
            """UPDATE ingestion_runs SET status = 'completed', 
               rows_ingested = $2, rows_failed = $3, completed_at = NOW()
               WHERE id = $1""",
            run_id, len(orders), len(failed)
        )
        
        logger.info("Ingested %d rows, %d failed", len(orders), len(failed))
        return {"ingested": len(orders), "failed": len(failed), "errors": failed}
        
    except Exception as e:
        await conn.execute(
            "UPDATE ingestion_runs SET status = 'failed', error_message = $2 WHERE id = $1",
            run_id, str(e)
        )
        raise
    finally:
        await conn.close()
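
The two-year historical backfill from the spec is then just a loop over run dates feeding this function. A sketch of a driver (the CSV naming scheme, the concurrency limit, and the helper names are my assumptions, not part of the original conversation; `ingest_shopify` is the function above):

```python
import asyncio
from datetime import date, timedelta


def backfill_dates(start: date, end: date) -> list[date]:
    """All run dates from start to end inclusive, oldest first."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


async def backfill_shopify(csv_dir: str, start: date, end: date, db_url: str,
                           concurrency: int = 4) -> None:
    """Re-run ingest_shopify for each historical date. The UNIQUE
    (source, run_date) constraint makes re-running a partial backfill safe."""
    sem = asyncio.Semaphore(concurrency)

    async def one(d: date) -> None:
        async with sem:
            # Assumed file layout: one CSV per day per source
            await ingest_shopify(f"{csv_dir}/shopify_{d.isoformat()}.csv", d, db_url)

    await asyncio.gather(*(one(d) for d in backfill_dates(start, end)))
```

Because each (source, run_date) pair is tracked in ingestion_runs, a backfill that dies halfway can simply be restarted with the same date range.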

The design quality difference

What makes the Claude-designed pipeline better than what I would have produced alone in an afternoon: the idempotency design (the ON CONFLICT DO UPDATE upsert plus the unique constraint on source + run_date), the ingestion_runs table that lets you diagnose what ran when, and the separation of raw JSONB storage from the normalized columns, which keeps reprocessing possible.

These are all things I would eventually add — after the first production incident. The iterative Claude conversation surfaces them before you start writing code.
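
The value of the (source, source_order_id) key is easy to see in miniature: re-running the same batch must converge to the same state, never duplicate it. A toy in-memory model of the upsert semantics (my own illustration, not the production code path):

```python
def upsert(store: dict, rows: list[tuple]) -> dict:
    """Mimic ON CONFLICT (source, source_order_id) DO UPDATE:
    the key wins, later values overwrite, nothing duplicates."""
    for source, order_id, amount_cents in rows:
        store[(source, order_id)] = amount_cents
    return store


batch = [("shopify", "1001", 1999), ("shopify", "1002", 4250)]
store = upsert({}, batch)
store = upsert(store, batch)  # Re-run: same state, no duplicates
assert len(store) == 2

store = upsert(store, [("shopify", "1001", 1899)])  # Corrected amount wins
assert store[("shopify", "1001")] == 1899
```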
