Streams & Buffers in Node.js
Streams and buffers are fundamental concepts in Node.js for handling data efficiently, especially when working with large files, network requests, or real-time data. Understanding them is crucial for building high-performance applications.
What is a Buffer?
A Buffer is a temporary storage area for binary data. Think of it as a fixed-size chunk of memory allocated outside the V8 JavaScript engine. Buffers are used to handle raw binary data directly.
// Creating buffers
const buf1 = Buffer.from('Hello World');
const buf2 = Buffer.alloc(10); // 10 bytes initialized to 0
const buf3 = Buffer.allocUnsafe(10); // 10 bytes, faster but may contain old data
console.log(buf1); // <Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>
console.log(buf1.toString()); // 'Hello World'
console.log(buf1.length); // 11 bytes
Note: Buffer.allocUnsafe() is faster but may contain old data. Always initialize it before use if security is a concern.
Working with Buffers
const buf = Buffer.from('Hello');
// Reading from buffer
console.log(buf[0]); // 72 (ASCII code for 'H')
console.log(buf.toString()); // 'Hello'
console.log(buf.toString('hex')); // '48656c6c6f'
console.log(buf.toString('base64')); // 'SGVsbG8='
// Writing to buffer
buf[0] = 74; // Change 'H' to 'J' (ASCII 74)
console.log(buf.toString()); // 'Jello'
// Buffer operations
const buf1 = Buffer.from('Hello ');
const buf2 = Buffer.from('World');
const buf3 = Buffer.concat([buf1, buf2]);
console.log(buf3.toString()); // 'Hello World'
// Comparing buffers
const bufA = Buffer.from('ABC');
const bufB = Buffer.from('ABC');
const bufC = Buffer.from('DEF');
console.log(bufA.equals(bufB)); // true
console.log(bufA.equals(bufC)); // false
console.log(Buffer.compare(bufA, bufC)); // -1 (bufA comes before bufC)
What are Streams?
Streams are collections of data that might not be available all at once. They allow you to process data piece by piece (chunks) without loading everything into memory. This is especially useful for large files or real-time data.
Types of Streams:
- Readable: Streams you can read from (e.g., fs.createReadStream)
- Writable: Streams you can write to (e.g., fs.createWriteStream)
- Duplex: Streams that are both readable and writable (e.g., TCP socket)
- Transform: Duplex streams that modify data as it's read or written (e.g., compression)
Readable Streams
const fs = require('fs');
// Create a readable stream
const readStream = fs.createReadStream('./large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks (default is 64KB)
});
// Event: 'data' - fired when a chunk is available
readStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
console.log(chunk);
});
// Event: 'end' - fired when no more data
readStream.on('end', () => {
console.log('Finished reading');
});
// Event: 'error' - fired on error
readStream.on('error', (error) => {
console.error('Error:', error.message);
});
Writable Streams
const fs = require('fs');
// Create a writable stream
const writeStream = fs.createWriteStream('./output.txt');
// Write data
writeStream.write('First line\n');
writeStream.write('Second line\n');
writeStream.write('Third line\n');
// Close the stream
writeStream.end('Final line\n');
writeStream.on('finish', () => {
console.log('All data written');
});
writeStream.on('error', (error) => {
console.error('Error:', error.message);
});
Piping Streams
Piping is a mechanism to connect a readable stream to a writable stream automatically. It handles backpressure and errors for you.
const fs = require('fs');
const readStream = fs.createReadStream('./input.txt');
const writeStream = fs.createWriteStream('./output.txt');
// Pipe readable stream to writable stream
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('File copied successfully');
});
Tip: Piping is much more efficient than reading the entire file into memory and then writing it. It processes data in chunks.
Chaining Pipes
const fs = require('fs');
const zlib = require('zlib');
// Read file -> Compress -> Write compressed file
fs.createReadStream('./input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('./input.txt.gz'))
.on('finish', () => {
console.log('File compressed successfully');
});
// Decompress -> Read -> Write
fs.createReadStream('./input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('./decompressed.txt'))
.on('finish', () => {
console.log('File decompressed successfully');
});
Duplex Streams
Duplex streams implement both readable and writable interfaces. Network sockets are a common example:
const { Duplex } = require('stream');
const duplexStream = new Duplex({
read(size) {
this.push('Data from readable side\n');
this.push(null); // No more data
},
write(chunk, encoding, callback) {
console.log(`Writing: ${chunk.toString()}`);
callback();
}
});
// Write to stream
duplexStream.write('Hello');
duplexStream.end();
// Read from stream
duplexStream.on('data', (chunk) => {
console.log(`Reading: ${chunk.toString()}`);
});
Transform Streams
Transform streams are duplex streams that modify data as it passes through:
const { Transform } = require('stream');
// Transform to uppercase
const uppercaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// Use the transform stream
process.stdin
.pipe(uppercaseTransform)
.pipe(process.stdout);
// Type something and it will be printed in uppercase
// More practical example: CSV parser
const { Transform } = require('stream');
class CSVParser extends Transform {
constructor() {
super({ objectMode: true });
this.headers = null;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line, index) => {
if (!line.trim()) return;
if (!this.headers) {
this.headers = line.split(',');
} else {
const values = line.split(',');
const row = {};
this.headers.forEach((header, i) => {
row[header.trim()] = values[i]?.trim();
});
this.push(row);
}
});
callback();
}
}
// Usage
const fs = require('fs');
const parser = new CSVParser();
fs.createReadStream('./data.csv')
.pipe(parser)
.on('data', (row) => {
console.log('Parsed row:', row);
});
Stream Backpressure
Backpressure occurs when data is written to a stream faster than it can be consumed. Node.js handles this automatically, but you should be aware of it:
const fs = require('fs');
const writeStream = fs.createWriteStream('./output.txt');
function writeMillionLines() {
let i = 0;
function write() {
let canContinue = true;
while (i < 1000000 && canContinue) {
canContinue = writeStream.write(`Line ${i}\n`);
i++;
}
if (i < 1000000) {
// Wait for 'drain' event if buffer is full
writeStream.once('drain', write);
} else {
writeStream.end();
}
}
write();
}
writeMillionLines();
writeStream.on('finish', () => {
console.log('Finished writing 1 million lines');
});
Warning: Ignoring backpressure can lead to high memory usage. Always check the return value of write() and wait for the drain event if it returns false.
Practical Stream Examples
1. Processing Large Log Files
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('./access.log'),
crlfDelay: Infinity
});
let errorCount = 0;
rl.on('line', (line) => {
if (line.includes('ERROR')) {
errorCount++;
}
});
rl.on('close', () => {
console.log(`Found ${errorCount} errors`);
});
2. Streaming HTTP Response
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
if (req.url === '/video') {
res.writeHead(200, { 'Content-Type': 'video/mp4' });
// Stream video file instead of loading into memory
const videoStream = fs.createReadStream('./movie.mp4');
videoStream.pipe(res);
videoStream.on('error', (error) => {
res.statusCode = 500;
res.end('Error streaming video');
});
} else {
res.statusCode = 404;
res.end('Not Found');
}
});
server.listen(3000);
3. Upload Progress Tracking
const express = require('express');
const fs = require('fs');
const app = express();
app.post('/upload', (req, res) => {
const writeStream = fs.createWriteStream('./uploaded-file.dat');
let totalBytes = 0;
req.on('data', (chunk) => {
totalBytes += chunk.length;
console.log(`Received: ${totalBytes} bytes`);
});
req.pipe(writeStream);
writeStream.on('finish', () => {
res.json({
message: 'Upload complete',
size: totalBytes
});
});
writeStream.on('error', (error) => {
res.status(500).json({ error: error.message });
});
});
app.listen(3000);
Stream Performance Benefits
// BAD: Loading entire file into memory
const fs = require('fs');
fs.readFile('./huge-file.txt', (err, data) => {
if (err) throw err;
// If file is 1GB, this uses 1GB of RAM
console.log(data.toString());
});
// GOOD: Streaming the file
const readStream = fs.createReadStream('./huge-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // Process 64KB at a time
});
readStream.on('data', (chunk) => {
// Only 64KB in memory at a time
console.log(chunk);
});
Practice Exercise:
Create a Node.js application with the following features:
- Read a large CSV file using streams (don't load into memory)
- Transform each row: convert names to uppercase, calculate age from birthdate
- Filter rows: only include people older than 18
- Write results to a new CSV file using streams
- Track and display progress (lines processed, memory usage)
- Handle errors gracefully
Summary
In this lesson, you learned:
- What buffers are and how to work with binary data
- The four types of streams: Readable, Writable, Duplex, Transform
- Reading and writing streams with events
- Piping streams for efficient data processing
- Understanding and handling backpressure
- Creating custom transform streams
- Practical applications of streams for large files and real-time data
- Performance benefits of using streams over loading entire files