
Streams & Buffers in Node.js

20 min Lesson 29 of 40


Streams and buffers are fundamental concepts in Node.js for handling data efficiently, especially when working with large files, network requests, or real-time data. Understanding them is crucial for building high-performance applications.

What is a Buffer?

A Buffer is a temporary storage area for binary data. Think of it as a fixed-size chunk of memory allocated outside the V8 JavaScript engine. Buffers are used to handle raw binary data directly.

```javascript
// Creating buffers
const buf1 = Buffer.from('Hello World');
const buf2 = Buffer.alloc(10); // 10 bytes initialized to 0
const buf3 = Buffer.allocUnsafe(10); // 10 bytes, faster but may contain old data

console.log(buf1); // <Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>
console.log(buf1.toString()); // 'Hello World'
console.log(buf1.length); // 11 bytes
```
Note: Buffer.allocUnsafe() is faster because it skips zero-filling, so the returned memory may contain old data from previously freed buffers. Always overwrite it before use if leaking stale data is a concern.
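One safe pattern, as a small sketch: zero the unsafe buffer immediately after allocating it, before any reads.

```javascript
// allocUnsafe() returns uninitialized memory; fill it (or fully
// overwrite it) before reading so no stale data can leak out
const unsafe = Buffer.allocUnsafe(10).fill(0);

// After fill(0), the result is indistinguishable from Buffer.alloc(10)
console.log(unsafe.equals(Buffer.alloc(10))); // true
```

This gives you alloc()-like safety while keeping allocUnsafe() available for hot paths where you overwrite the whole buffer anyway.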

Working with Buffers

```javascript
const buf = Buffer.from('Hello');

// Reading from a buffer
console.log(buf[0]); // 72 (ASCII code for 'H')
console.log(buf.toString()); // 'Hello'
console.log(buf.toString('hex')); // '48656c6c6f'
console.log(buf.toString('base64')); // 'SGVsbG8='

// Writing to a buffer
buf[0] = 74; // Change 'H' to 'J' (ASCII 74)
console.log(buf.toString()); // 'Jello'

// Concatenating buffers
const buf1 = Buffer.from('Hello ');
const buf2 = Buffer.from('World');
const buf3 = Buffer.concat([buf1, buf2]);
console.log(buf3.toString()); // 'Hello World'

// Comparing buffers
const bufA = Buffer.from('ABC');
const bufB = Buffer.from('ABC');
const bufC = Buffer.from('DEF');
console.log(bufA.equals(bufB)); // true
console.log(bufA.equals(bufC)); // false
console.log(Buffer.compare(bufA, bufC)); // -1 (bufA sorts before bufC)
```

What are Streams?

Streams are collections of data that might not be available all at once. They allow you to process data piece by piece (chunks) without loading everything into memory. This is especially useful for large files or real-time data.

Types of Streams:

  • Readable: Streams you can read from (e.g., fs.createReadStream)
  • Writable: Streams you can write to (e.g., fs.createWriteStream)
  • Duplex: Streams that are both readable and writable (e.g., TCP socket)
  • Transform: Duplex streams that modify data as it's read or written (e.g., compression)

Readable Streams

```javascript
const fs = require('fs');

// Create a readable stream
const readStream = fs.createReadStream('./large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks (the default for fs streams)
});

// 'data' - fired when a chunk is available
// (because an encoding is set, each chunk is a string, not a Buffer)
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} characters`);
  console.log(chunk);
});

// 'end' - fired when there is no more data
readStream.on('end', () => {
  console.log('Finished reading');
});

// 'error' - fired on failure
readStream.on('error', (error) => {
  console.error('Error:', error.message);
});
```

Writable Streams

```javascript
const fs = require('fs');

// Create a writable stream
const writeStream = fs.createWriteStream('./output.txt');

// Write data
writeStream.write('First line\n');
writeStream.write('Second line\n');
writeStream.write('Third line\n');

// Close the stream (end() can also write one final chunk)
writeStream.end('Final line\n');

writeStream.on('finish', () => {
  console.log('All data written');
});

writeStream.on('error', (error) => {
  console.error('Error:', error.message);
});
```

Piping Streams

Piping is a mechanism to connect a readable stream to a writable stream automatically. It manages backpressure for you. Note, however, that pipe() does not forward errors between streams, so attach an 'error' handler to each stream in the chain (or use stream.pipeline(), which propagates errors to a single callback).

```javascript
const fs = require('fs');

const readStream = fs.createReadStream('./input.txt');
const writeStream = fs.createWriteStream('./output.txt');

// Pipe the readable stream into the writable stream
readStream.pipe(writeStream);

writeStream.on('finish', () => {
  console.log('File copied successfully');
});
```
Tip: Piping is much more efficient than reading the entire file into memory and then writing it. It processes data in chunks.

Chaining Pipes

```javascript
const fs = require('fs');
const zlib = require('zlib');

// Read file -> compress -> write compressed file
fs.createReadStream('./input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./input.txt.gz'))
  .on('finish', () => {
    console.log('File compressed successfully');

    // Read compressed file -> decompress -> write plain file
    // (started here so it only runs once compression has finished)
    fs.createReadStream('./input.txt.gz')
      .pipe(zlib.createGunzip())
      .pipe(fs.createWriteStream('./decompressed.txt'))
      .on('finish', () => {
        console.log('File decompressed successfully');
      });
  });
```

Duplex Streams

Duplex streams implement both readable and writable interfaces. Network sockets are a common example:

```javascript
const { Duplex } = require('stream');

const duplexStream = new Duplex({
  read(size) {
    this.push('Data from readable side\n');
    this.push(null); // Signal that there is no more data
  },
  write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    callback();
  }
});

// Write to the stream (writable side)
duplexStream.write('Hello');
duplexStream.end();

// Read from the stream (readable side)
duplexStream.on('data', (chunk) => {
  console.log(`Reading: ${chunk.toString()}`);
});
```

Transform Streams

Transform streams are duplex streams that modify data as it passes through:

```javascript
const { Transform } = require('stream');

// Transform that uppercases everything passing through
const uppercaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Type something and it will be echoed back in uppercase
process.stdin
  .pipe(uppercaseTransform)
  .pipe(process.stdout);
```
```javascript
// A more practical example: a CSV parser
const fs = require('fs');
const { Transform } = require('stream');

class CSVParser extends Transform {
  constructor() {
    super({ objectMode: true }); // push parsed objects, not bytes
    this.headers = null;
    this.leftover = ''; // partial line carried over between chunks
  }

  _transform(chunk, encoding, callback) {
    // A chunk can end mid-line, so keep the trailing partial line
    // and prepend it to the next chunk
    const lines = (this.leftover + chunk.toString()).split('\n');
    this.leftover = lines.pop();

    for (const line of lines) {
      if (!line.trim()) continue;
      if (!this.headers) {
        this.headers = line.split(',');
      } else {
        const values = line.split(',');
        const row = {};
        this.headers.forEach((header, i) => {
          row[header.trim()] = values[i]?.trim();
        });
        this.push(row);
      }
    }
    callback();
  }

  _flush(callback) {
    // Handle a final line that has no trailing newline
    if (this.headers && this.leftover.trim()) {
      const values = this.leftover.split(',');
      const row = {};
      this.headers.forEach((header, i) => {
        row[header.trim()] = values[i]?.trim();
      });
      this.push(row);
    }
    callback();
  }
}

// Usage
const parser = new CSVParser();
fs.createReadStream('./data.csv')
  .pipe(parser)
  .on('data', (row) => {
    console.log('Parsed row:', row);
  });
```

Stream Backpressure

Backpressure occurs when data is written to a stream faster than it can be consumed. Node.js handles this automatically, but you should be aware of it:

```javascript
const fs = require('fs');

const writeStream = fs.createWriteStream('./output.txt');

function writeMillionLines() {
  let i = 0;

  function write() {
    let canContinue = true;
    while (i < 1000000 && canContinue) {
      // write() returns false when the internal buffer is full
      canContinue = writeStream.write(`Line ${i}\n`);
      i++;
    }
    if (i < 1000000) {
      // Wait for the buffer to drain before writing more
      writeStream.once('drain', write);
    } else {
      writeStream.end();
    }
  }

  write();
}

writeMillionLines();

writeStream.on('finish', () => {
  console.log('Finished writing 1 million lines');
});
```
Warning: Ignoring backpressure can lead to high memory usage. Always check the return value of write() and wait for the drain event if it returns false.

Practical Stream Examples

1. Processing Large Log Files

```javascript
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('./access.log'),
  crlfDelay: Infinity
});

let errorCount = 0;

rl.on('line', (line) => {
  if (line.includes('ERROR')) {
    errorCount++;
  }
});

rl.on('close', () => {
  console.log(`Found ${errorCount} errors`);
});
```

2. Streaming HTTP Response

```javascript
const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  if (req.url === '/video') {
    res.writeHead(200, { 'Content-Type': 'video/mp4' });

    // Stream the video file instead of loading it into memory
    const videoStream = fs.createReadStream('./movie.mp4');
    videoStream.pipe(res);

    videoStream.on('error', (error) => {
      // The 200 header has already been sent, so we can no longer
      // change the status code; just terminate the response
      console.error('Error streaming video:', error.message);
      res.destroy();
    });
  } else {
    res.statusCode = 404;
    res.end('Not Found');
  }
});

server.listen(3000);
```

3. Upload Progress Tracking

```javascript
const express = require('express');
const fs = require('fs');

const app = express();

app.post('/upload', (req, res) => {
  const writeStream = fs.createWriteStream('./uploaded-file.dat');
  let totalBytes = 0;

  // req is itself a readable stream; count bytes as they arrive
  req.on('data', (chunk) => {
    totalBytes += chunk.length;
    console.log(`Received: ${totalBytes} bytes`);
  });

  req.pipe(writeStream);

  writeStream.on('finish', () => {
    res.json({ message: 'Upload complete', size: totalBytes });
  });

  writeStream.on('error', (error) => {
    res.status(500).json({ error: error.message });
  });
});

app.listen(3000);
```

Stream Performance Benefits

```javascript
// BAD: loading the entire file into memory
const fs = require('fs');

fs.readFile('./huge-file.txt', (err, data) => {
  if (err) throw err;
  // If the file is 1GB, this uses 1GB of RAM
  console.log(data.toString());
});

// GOOD: streaming the file
const readStream = fs.createReadStream('./huge-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // Process 64KB at a time
});

readStream.on('data', (chunk) => {
  // Only about 64KB is in memory at a time
  console.log(chunk);
});
```

Practice Exercise:

Create a Node.js application with the following features:

  • Read a large CSV file using streams (don't load into memory)
  • Transform each row: convert names to uppercase, calculate age from birthdate
  • Filter rows: only include people older than 18
  • Write results to a new CSV file using streams
  • Track and display progress (lines processed, memory usage)
  • Handle errors gracefully

Summary

In this lesson, you learned:

  • What buffers are and how to work with binary data
  • The four types of streams: Readable, Writable, Duplex, Transform
  • Reading and writing streams with events
  • Piping streams for efficient data processing
  • Understanding and handling backpressure
  • Creating custom transform streams
  • Practical applications of streams for large files and real-time data
  • Performance benefits of using streams over loading entire files