April 4, 2026 · Node.js · 7 min read
Node.js streams: processing gigabytes of data without loading it into memory
I inherited a script that loaded a 2GB CSV into memory, processed it, and wrote the output to another file. It crashed with OOM on most machines. Rewriting it with Node.js streams took the memory usage from 2GB to under 20MB. Streams are one of the most powerful and underused APIs in Node.js.
The core concept: piping
typescript
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

// Compress a 2GB file without loading it into memory
await pipeline(
  createReadStream('large-file.csv'),    // Read chunks
  createGzip(),                          // Compress each chunk
  createWriteStream('large-file.csv.gz') // Write compressed chunks
);
// Memory used: ~64KB (the buffer size per chunk)
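The ~64KB figure is just the default `highWaterMark` of `fs` read streams, and it is tunable. A small self-contained sketch (it writes a throwaway 1 MiB file to the OS temp directory so it runs anywhere) showing how the option changes the number and size of chunks:

```typescript
import { createReadStream, writeFileSync } from 'fs';
import { once } from 'events';
import { tmpdir } from 'os';
import { join } from 'path';

// Count how many 'data' events it takes to read a file at a given chunk size
async function countChunks(path: string, highWaterMark: number): Promise<number> {
  const stream = createReadStream(path, { highWaterMark });
  let chunks = 0;
  stream.on('data', () => { chunks++; });
  await once(stream, 'end'); // resolve once the file is fully read
  return chunks;
}

const path = join(tmpdir(), 'sample.bin');
writeFileSync(path, Buffer.alloc(1024 * 1024)); // 1 MiB throwaway file

console.log(await countChunks(path, 64 * 1024));  // default-sized chunks: 16
console.log(await countChunks(path, 256 * 1024)); // larger buffer, fewer chunks: 4
```

Larger chunks mean fewer syscalls at the cost of more memory per chunk; for most pipelines the default is fine.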
Custom Transform streams
typescript
import { Transform, TransformCallback } from 'stream';

class CSVParser extends Transform {
  private buffer = '';
  private headers: string[] = [];
  private isFirstLine = true;

  constructor() {
    super({ objectMode: true }); // Output objects, not Buffers
  }

  _transform(chunk: Buffer, _encoding: string, callback: TransformCallback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    // Keep the incomplete last line in the buffer
    this.buffer = lines.pop() ?? '';

    for (const line of lines) {
      if (!line.trim()) continue;
      if (this.isFirstLine) {
        this.headers = line.split(',').map(h => h.trim());
        this.isFirstLine = false;
        continue;
      }
      const values = line.split(',');
      const row = Object.fromEntries(
        this.headers.map((h, i) => [h, values[i]?.trim() ?? ''])
      );
      this.push(row); // Emit one object per line
    }
    callback();
  }

  _flush(callback: TransformCallback) {
    // Process any remaining buffer content
    if (this.buffer.trim()) {
      const values = this.buffer.split(',');
      const row = Object.fromEntries(
        this.headers.map((h, i) => [h, values[i]?.trim() ?? ''])
      );
      this.push(row);
    }
    callback();
  }
}

class RecordFilter extends Transform {
  constructor(private predicate: (row: Record<string, string>) => boolean) {
    super({ objectMode: true });
  }

  _transform(row: Record<string, string>, _enc: string, callback: TransformCallback) {
    if (this.predicate(row)) {
      this.push(row);
    }
    callback();
  }
}

class JSONLinesSerializer extends Transform {
  constructor() {
    super({ writableObjectMode: true }); // Input is objects, output is text
  }

  _transform(row: Record<string, string>, _enc: string, callback: TransformCallback) {
    this.push(JSON.stringify(row) + '\n');
    callback();
  }
}
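Object-mode transforms like these are easy to exercise in isolation: `Readable.from` turns any iterable into a stream, and the readable side of a Transform is async-iterable. A sketch of the pattern with a minimal line-splitting transform standing in for `CSVParser` (same shape, trivial logic, so the example stays self-contained):

```typescript
import { Readable, Transform } from 'stream';

// Minimal stand-in for CSVParser: emits one trimmed line per input line
const lineSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, _enc, callback) {
    for (const line of chunk.toString().split('\n')) {
      if (line.trim()) this.push(line.trim());
    }
    callback();
  }
});

const lines: string[] = [];
// Feed in-memory chunks instead of a file, then consume with for-await
for await (const line of Readable.from(['a\nb\n', 'c\n']).pipe(lineSplitter)) {
  lines.push(line);
}
console.log(lines); // ['a', 'b', 'c']
```

The same harness works for `CSVParser` itself, including deliberately splitting a chunk mid-line to verify the buffering logic.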
Putting it all together
typescript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';

async function processCSV(inputPath: string, outputPath: string): Promise<void> {
  let processedCount = 0;

  const counter = new Transform({
    objectMode: true,
    transform(row, _enc, callback) {
      processedCount++;
      if (processedCount % 10000 === 0) {
        console.log(`Processed ${processedCount} rows...`);
      }
      this.push(row);
      callback();
    }
  });

  await pipeline(
    createReadStream(inputPath),
    new CSVParser(),
    new RecordFilter(row => row.status === 'completed'),
    counter,
    new JSONLinesSerializer(),
    createWriteStream(outputPath)
  );

  console.log(`Done: ${processedCount} rows processed`);
}
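A key reason to prefer `pipeline` over chained `.pipe()` calls here: when any stage fails, every stream in the chain is destroyed and the returned promise rejects, so file handles don't leak. A self-contained sketch with a deliberately failing transform (the row values and error message are made up for illustration):

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Transform } from 'stream';

// A stage that rejects one specific row partway through the stream
const failing = new Transform({
  objectMode: true,
  transform(row, _enc, callback) {
    row === 'bad' ? callback(new Error(`rejected row: ${row}`)) : callback(null, row);
  }
});

let caught = '';
try {
  await pipeline(
    Readable.from(['ok', 'bad', 'never-reached']),
    failing,
    // An async function as the last stage acts as the consumer
    async (source: AsyncIterable<string>) => {
      for await (const _ of source) { /* drain */ }
    }
  );
} catch (err) {
  caught = (err as Error).message;
}
console.log(caught); // 'rejected row: bad'
```

With raw `.pipe()`, an error in one stream does not automatically destroy the others; that cleanup is exactly what `pipeline` adds.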
Streaming HTTP responses
typescript
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { createGzip } from 'zlib';
import express from 'express';

const app = express();

app.get('/download/report', (req, res) => {
  // Stream a large file directly to the response.
  // Never loads the whole file into memory.
  res.setHeader('Content-Type', 'application/gzip');
  res.setHeader('Content-Disposition', 'attachment; filename="report.csv.gz"');

  pipeline(
    createReadStream('/data/large-report.csv'),
    createGzip(),
    res
  ).catch(err => {
    if (!res.headersSent) {
      res.status(500).json({ error: 'Stream failed' });
    }
  });
});

// Streaming with an async generator (Node 16+).
// Assumes a `db` client exposing fetchPage(page, pageSize).
async function* generateRows() {
  let page = 0;
  while (true) {
    const rows = await db.fetchPage(page++, 1000);
    if (rows.length === 0) break;
    for (const row of rows) yield JSON.stringify(row) + '\n';
  }
}

app.get('/export', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  for await (const line of generateRows()) {
    // Respect backpressure: pause until the socket buffer drains
    if (!res.write(line)) {
      await new Promise(resolve => res.once('drain', resolve));
    }
  }
  res.end();
});
Streams are the right tool whenever you process data that does not all fit in memory, or when you want to start sending results to a client before all the data is ready. The pipeline API (in stream/promises) handles backpressure automatically — it pauses upstream sources when downstream consumers are slow, preventing memory buildup. This is the "free" performance and memory management that most Node.js developers leave on the table.
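That backpressure behavior is observable directly. In this toy sketch, a fast generator feeds a slow `Writable` with a tiny buffer; `pipeline` pulls from the generator only as the sink drains, so only one small chunk is ever pending at a time:

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Writable } from 'stream';

// Fast source: 10,000 rows, produced lazily on demand
function* rows() {
  for (let i = 0; i < 10_000; i++) yield `row-${i}\n`;
}

let inFlight = 0;
let maxInFlight = 0;

// Slow sink: defers each write to the next tick and tracks pending bytes
const slowSink = new Writable({
  highWaterMark: 16, // tiny buffer so backpressure kicks in immediately
  write(chunk, _enc, callback) {
    inFlight += chunk.length;
    maxInFlight = Math.max(maxInFlight, inFlight);
    setImmediate(() => { inFlight -= chunk.length; callback(); });
  }
});

await pipeline(Readable.from(rows()), slowSink);
// ~90KB passed through in total, yet at no point was more than one
// ~10-byte chunk pending in the sink: the generator was paused between writes
console.log(`max bytes in flight: ${maxInFlight}`);
```

Swap `pipeline` for an unthrottled loop of `slowSink.write(...)` calls and the writable's internal buffer absorbs the entire output instead.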