Node.js streams: processing gigabytes of data without loading it into memory
April 4, 2026 · NodeJS · 7 min read

I inherited a script that loaded a 2GB CSV into memory, processed it, and wrote the output to another file. It crashed with OOM on most machines. Rewriting it with Node.js streams took the memory usage from 2GB to under 20MB. Streams are one of the most powerful and underused APIs in Node.js.

The core concept: piping

typescript
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

// Compress a 2GB file without loading it into memory
await pipeline(
  createReadStream('large-file.csv'),      // Read chunks
  createGzip(),                             // Compress each chunk
  createWriteStream('large-file.csv.gz')   // Write compressed chunks
);
// Chunks are ~64KB each (the default read buffer size for file streams),
// so memory use stays small and roughly constant regardless of file size
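
To see the chunking for yourself, here is a minimal sketch that reads a file chunk by chunk and records how large each chunk is. The helper names `measureChunks` and `makeSampleFile` are made up for this illustration, and a 1 MiB temporary file stands in for the large CSV. It assumes the default buffer size for `createReadStream`, so no chunk should exceed 64 KiB.

```typescript
import { createReadStream, mkdtempSync, writeFileSync } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

interface ChunkStats {
  chunks: number;   // how many chunks were delivered
  bytes: number;    // total bytes read
  maxChunk: number; // size of the largest single chunk
}

// Reads a file one chunk at a time and reports chunk statistics.
async function measureChunks(path: string): Promise<ChunkStats> {
  const stats: ChunkStats = { chunks: 0, bytes: 0, maxChunk: 0 };
  // Readable streams are async iterables, so only one chunk is held at a time
  for await (const chunk of createReadStream(path)) {
    stats.chunks++;
    stats.bytes += chunk.length;
    stats.maxChunk = Math.max(stats.maxChunk, chunk.length);
  }
  return stats;
}

// Hypothetical helper: writes a throwaway file of the requested size
function makeSampleFile(size: number): string {
  const dir = mkdtempSync(join(tmpdir(), 'stream-demo-'));
  const path = join(dir, 'sample.bin');
  writeFileSync(path, Buffer.alloc(size, 'a'));
  return path;
}

measureChunks(makeSampleFile(1024 * 1024)).then(stats => console.log(stats));
```

The total byte count matches the file size, while the largest chunk stays bounded by the stream's buffer size no matter how big the file is.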

Custom Transform streams

typescript
import { Transform, TransformCallback } from 'stream';
import { StringDecoder } from 'string_decoder';

class CSVParser extends Transform {
  private buffer = '';
  private headers: string[] = [];
  private isFirstLine = true;
  // Decodes UTF-8 safely when a multi-byte character spans two chunks
  private decoder = new StringDecoder('utf8');

  constructor() {
    super({ readableObjectMode: true }); // Input is Buffers, output is objects
  }

  _transform(chunk: Buffer, _encoding: string, callback: TransformCallback) {
    this.buffer += this.decoder.write(chunk);
    const lines = this.buffer.split('\n');

    // Keep the incomplete last line in buffer
    this.buffer = lines.pop() ?? '';

    for (const line of lines) {
      this.processLine(line);
    }

    callback();
  }

  _flush(callback: TransformCallback) {
    // Process any remaining buffer content (a final line without a trailing newline)
    this.processLine(this.buffer + this.decoder.end());
    this.buffer = '';
    callback();
  }

  // Naive CSV handling: splits on commas, so quoted fields that contain
  // commas or newlines are not supported
  private processLine(line: string) {
    if (!line.trim()) return;

    if (this.isFirstLine) {
      this.headers = line.split(',').map(h => h.trim());
      this.isFirstLine = false;
      return;
    }

    const values = line.split(',');
    const row = Object.fromEntries(
      this.headers.map((h, i) => [h, values[i]?.trim() ?? ''])
    );

    this.push(row); // Emit one object per line
  }
}

class RecordFilter extends Transform {
  constructor(private predicate: (row: Record<string, string>) => boolean) {
    super({ objectMode: true });
  }

  _transform(row: Record<string, string>, _enc: string, callback: TransformCallback) {
    if (this.predicate(row)) {
      this.push(row);
    }
    callback();
  }
}

class JSONLinesSerializer extends Transform {
  constructor() {
    super({ writableObjectMode: true }); // Input is objects, output is text
  }

  _transform(row: Record<string, string>, _enc: string, callback: TransformCallback) {
    this.push(JSON.stringify(row) + '\n');
    callback();
  }
}
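
The part of the parser that is easiest to get wrong is the chunk boundary: a line can start in one chunk and end in the next. A quick way to check that behavior is to feed a transform hand-made chunks from memory instead of a file. The sketch below does that with a stripped-down `LineSplitter` (a hypothetical class written for this illustration, not part of the pipeline above) that uses the same keep-the-last-piece buffering.

```typescript
import { Readable, Transform, TransformCallback } from 'stream';

// Hypothetical minimal transform: emits complete lines and buffers any partial line
class LineSplitter extends Transform {
  private tail = '';

  constructor() {
    super({ readableObjectMode: true }); // Buffers in, one string per line out
  }

  _transform(chunk: Buffer, _enc: string, callback: TransformCallback) {
    const lines = (this.tail + chunk.toString()).split('\n');
    this.tail = lines.pop() ?? ''; // the last piece may be an incomplete line
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback: TransformCallback) {
    if (this.tail) this.push(this.tail); // final line without a trailing newline
    callback();
  }
}

// Feeds the given strings through LineSplitter as separate chunks
async function collectLines(chunks: string[]): Promise<string[]> {
  const out: string[] = [];
  // Readable.from emits each array element as its own chunk
  const source = Readable.from(chunks.map(c => Buffer.from(c)));
  for await (const line of source.pipe(new LineSplitter())) {
    out.push(line);
  }
  return out;
}

// The second line is deliberately split across two chunks
collectLines(['id,status\n1,comp', 'leted\n2,failed']).then(lines => console.log(lines));
// prints [ 'id,status', '1,completed', '2,failed' ]
```

Because the input is just an array of strings, the same approach works in a unit test for any custom Transform.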

Putting it all together

typescript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';

async function processCSV(inputPath: string, outputPath: string): Promise<void> {
  let processedCount = 0;
  
  const counter = new Transform({
    objectMode: true,
    transform(row, _enc, callback) {
      processedCount++;
      if (processedCount % 10000 === 0) {
        console.log(`Processed ${processedCount} rows...`);
      }
      this.push(row);
      callback();
    }
  });
  
  await pipeline(
    createReadStream(inputPath),
    new CSVParser(),
    new RecordFilter(row => row.status === 'completed'),
    counter,
    new JSONLinesSerializer(),
    createWriteStream(outputPath)
  );
  
  console.log(`Done: ${processedCount} rows processed`);
}

Streaming HTTP responses

typescript
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { createGzip } from 'zlib';
import express from 'express';

const app = express();

app.get('/download/report', (req, res) => {
  // Stream a large file directly to the response
  // Never loads the whole file into memory
  res.setHeader('Content-Type', 'application/gzip');
  res.setHeader('Content-Disposition', 'attachment; filename="report.csv.gz"');
  
  pipeline(
    createReadStream('/data/large-report.csv'),
    createGzip(),
    res
  ).catch(err => {
    if (!res.headersSent) {
      res.status(500).json({ error: 'Stream failed' });
    }
  });
});

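// Streaming with an async generator (Node 16+)
// `db` stands in for your data-access layer; fetchPage(page, size) is
// assumed to return an array of rows (empty when there are no more pages)
async function* generateRows() {
  let page = 0;
  while (true) {
    const rows = await db.fetchPage(page++, 1000);
    if (rows.length === 0) break;
    for (const row of rows) yield JSON.stringify(row) + '\n';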
  }
}

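app.get('/export', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  try {
    // pipeline accepts an async iterable as its source. Unlike a bare
    // res.write() loop, it waits for 'drain' when the client is slow,
    // and it ends the response when the generator finishes.
    await pipeline(generateRows(), res);
  } catch (err) {
    if (!res.headersSent) {
      res.status(500).json({ error: 'Export failed' });
    }
  }
});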

Streams are the right tool whenever you process data that does not all fit in memory, or when you want to start sending results to a client before all the data is ready. The pipeline API (in stream/promises) handles backpressure automatically: it pauses upstream sources when downstream consumers are slow, which prevents memory buildup, and it destroys every stream in the chain if one of them fails. This is the "free" performance and memory management that most Node.js developers leave on the table.
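
To make "handles backpressure automatically" concrete, here is a rough sketch of the bookkeeping you would otherwise do by hand: check the return value of `write()` and wait for the `'drain'` event before writing more. The names `createSlowSink` and `writeWithBackpressure` are invented for this example, and the sink's tiny `highWaterMark` is chosen only so that the pauses show up with a small input.

```typescript
import { Writable } from 'stream';
import { once } from 'events';

// Hypothetical slow consumer: takes about 1 ms per chunk and has a tiny buffer
function createSlowSink(received: string[]): Writable {
  return new Writable({
    highWaterMark: 4,     // deliberately tiny so the buffer fills immediately
    decodeStrings: false, // keep chunks as strings
    write(chunk, _enc, callback) {
      received.push(String(chunk));
      setTimeout(callback, 1); // simulate a slow disk or network
    },
  });
}

// Writes every item, pausing whenever the sink reports a full buffer.
// Returns how many times the producer had to wait.
async function writeWithBackpressure(items: string[], sink: Writable): Promise<number> {
  let pauses = 0;
  for (const item of items) {
    // write() returns false once buffered data reaches highWaterMark
    if (!sink.write(item)) {
      pauses++;
      await once(sink, 'drain'); // resume only after the buffer has emptied
    }
  }
  sink.end();
  await once(sink, 'finish'); // all data has been handed to the consumer
  return pauses;
}

const received: string[] = [];
const items = Array.from({ length: 20 }, (_, i) => `row-${i}\n`);
writeWithBackpressure(items, createSlowSink(received)).then(pauses => {
  console.log(`wrote ${received.length} rows, paused ${pauses} times`);
});
```

With `pipeline`, none of this code is needed, and you also get error propagation and cleanup that the sketch leaves out.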
