Understanding Real-time Communication
Traditional REST APIs follow a request-response pattern where clients must poll the server for updates. Real-time APIs enable bidirectional communication, allowing servers to push updates to clients instantly. This is essential for applications like live dashboards, chat applications, collaborative tools, notifications, and real-time analytics.
Real-time Communication Technologies
Three Main Approaches:
- Polling: Client repeatedly requests data at intervals (simple but inefficient)
- Server-Sent Events (SSE): Server pushes data to client over HTTP (unidirectional)
- WebSockets: Full-duplex bidirectional communication (most flexible but complex)
Server-Sent Events (SSE)
SSE provides a simple, standardized way for servers to push data to clients over HTTP. Unlike WebSockets, SSE is unidirectional (server to client only) and uses regular HTTP connections.
SSE Advantages
- Built on standard HTTP/HTTPS (works through firewalls and proxies)
- Automatic reconnection with configurable retry intervals
- Event IDs for tracking and resuming streams
- Simple implementation compared to WebSockets
- Browser native support via EventSource API
SSE Implementation - Server Side (Node.js)
// Express.js SSE endpoint
const express = require('express');
const app = express();
// SSE endpoint for live notifications
app.get('/api/notifications/stream', (req, res) => {
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// Send initial comment to establish connection
res.write(': SSE connection established\n\n');
// Get user ID from query params
const userId = req.query.userId;
// Send welcome message
const welcomeData = {
id: Date.now(),
type: 'connected',
message: 'Connected to notification stream',
timestamp: new Date().toISOString()
};
res.write(`id: ${welcomeData.id}\n`);
res.write(`event: connected\n`);
res.write(`data: ${JSON.stringify(welcomeData)}\n\n`);
// Send heartbeat every 30 seconds to keep connection alive
const heartbeatInterval = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// Simulate sending notifications
const notificationInterval = setInterval(() => {
const notification = {
id: Date.now(),
userId: userId,
type: 'info',
title: 'New Update',
message: `Update received at ${new Date().toLocaleTimeString()}`,
timestamp: new Date().toISOString()
};
res.write(`id: ${notification.id}\n`);
res.write(`event: notification\n`);
res.write(`data: ${JSON.stringify(notification)}\n\n`);
}, 10000); // Send notification every 10 seconds
// Clean up on client disconnect
req.on('close', () => {
console.log(`Client ${userId} disconnected from SSE`);
clearInterval(heartbeatInterval);
clearInterval(notificationInterval);
res.end();
});
});
SSE Message Format: Each SSE message consists of fields separated by newlines. Key fields include id: (unique message ID), event: (event type), data: (message payload), and retry: (reconnection time in ms).
SSE Implementation - Client Side (JavaScript)
// Client-side SSE with EventSource API
class NotificationStream {
constructor(userId) {
this.userId = userId;
this.eventSource = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
const url = `/api/notifications/stream?userId=${this.userId}`;
this.eventSource = new EventSource(url);
// Listen for connection open
this.eventSource.onopen = (event) => {
console.log('SSE connection opened');
this.reconnectAttempts = 0; // Reset on successful connection
};
// Listen for custom 'notification' events
this.eventSource.addEventListener('notification', (event) => {
const notification = JSON.parse(event.data);
console.log('Notification received:', notification);
this.handleNotification(notification);
});
// Listen for 'connected' events
this.eventSource.addEventListener('connected', (event) => {
const data = JSON.parse(event.data);
console.log('Connection established:', data.message);
});
// Handle errors and reconnection
this.eventSource.onerror = (error) => {
console.error('SSE error:', error);
if (this.eventSource.readyState === EventSource.CLOSED) {
console.log('SSE connection closed');
this.handleReconnect();
}
};
// Listen for generic messages (messages without event type)
this.eventSource.onmessage = (event) => {
console.log('Generic message:', event.data);
};
}
handleNotification(notification) {
// Display notification to user
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.message,
icon: '/icons/notification.png',
timestamp: new Date(notification.timestamp).getTime()
});
}
// Update UI with notification
this.updateNotificationUI(notification);
}
updateNotificationUI(notification) {
const notificationList = document.getElementById('notification-list');
const notificationElement = document.createElement('div');
notificationElement.className = 'notification-item';
notificationElement.innerHTML = `
<div class="notification-header">
<strong>${notification.title}</strong>
<span class="notification-time">${new Date(notification.timestamp).toLocaleTimeString()}</span>
</div>
<div class="notification-body">${notification.message}</div>
`;
notificationList.prepend(notificationElement);
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
console.log('Attempting to reconnect...');
this.connect();
}, delay);
} else {
console.error('Max reconnection attempts reached');
this.showReconnectButton();
}
}
showReconnectButton() {
const button = document.getElementById('reconnect-button');
button.style.display = 'block';
button.onclick = () => {
this.reconnectAttempts = 0;
this.connect();
button.style.display = 'none';
};
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
console.log('SSE connection closed by client');
}
}
}
// Usage
const notificationStream = new NotificationStream(123);
notificationStream.connect();
// Disconnect when user leaves page
window.addEventListener('beforeunload', () => {
notificationStream.disconnect();
});
Advanced SSE: Broadcasting to Multiple Clients
// SSE with Redis pub/sub for multi-server broadcasting
const express = require('express');
const Redis = require('ioredis');
const app = express();
const redis = new Redis();
const subscriber = new Redis();
// Store active SSE connections
const connections = new Map();
// Subscribe to Redis channel
subscriber.subscribe('notifications', (err, count) => {
if (err) {
console.error('Failed to subscribe:', err);
} else {
console.log(`Subscribed to ${count} channel(s)`);
}
});
// Handle incoming messages from Redis
subscriber.on('message', (channel, message) => {
if (channel === 'notifications') {
const notification = JSON.parse(message);
broadcastToUser(notification.userId, notification);
}
});
// SSE endpoint
app.get('/api/notifications/stream', (req, res) => {
const userId = req.query.userId;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Store connection
if (!connections.has(userId)) {
connections.set(userId, []);
}
connections.get(userId).push(res);
console.log(`User ${userId} connected. Total connections: ${connections.get(userId).length}`);
// Clean up on disconnect
req.on('close', () => {
const userConnections = connections.get(userId);
const index = userConnections.indexOf(res);
if (index !== -1) {
userConnections.splice(index, 1);
}
if (userConnections.length === 0) {
connections.delete(userId);
}
console.log(`User ${userId} disconnected`);
});
});
// Function to broadcast to specific user
function broadcastToUser(userId, notification) {
const userConnections = connections.get(userId.toString());
if (!userConnections) return;
const message = `id: ${notification.id}\nevent: notification\ndata: ${JSON.stringify(notification)}\n\n`;
userConnections.forEach((res) => {
try {
res.write(message);
} catch (error) {
console.error('Error writing to connection:', error);
}
});
}
// API endpoint to publish notification
app.post('/api/notifications', express.json(), async (req, res) => {
const notification = {
id: Date.now(),
userId: req.body.userId,
title: req.body.title,
message: req.body.message,
timestamp: new Date().toISOString()
};
// Publish to Redis (will be received by all server instances)
await redis.publish('notifications', JSON.stringify(notification));
res.json({ success: true, notification });
});
SSE Limitations: SSE has a maximum connection limit (typically 6 per browser for HTTP/1.1). Use HTTP/2 for higher limits. SSE is unidirectional; if you need client-to-server communication, consider WebSockets or combine SSE with regular HTTP requests.
WebSockets
WebSockets provide full-duplex communication channels over a single TCP connection. Unlike SSE, WebSockets support bidirectional data flow and can send both text and binary data.
When to Use WebSockets
- Chat applications requiring bidirectional real-time messaging
- Collaborative editing (Google Docs-style applications)
- Multiplayer gaming
- Real-time trading platforms
- Live video/audio streaming controls
WebSocket Implementation - Server Side (Node.js with ws)
// WebSocket server with authentication
const WebSocket = require('ws');
const http = require('http');
const express = require('express');
const jwt = require('jsonwebtoken');
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ noServer: true });
// Store active connections
const clients = new Map();
// Handle WebSocket upgrade
server.on('upgrade', (request, socket, head) => {
// Extract token from query string
const url = new URL(request.url, `http://${request.headers.host}`);
const token = url.searchParams.get('token');
// Verify authentication
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
request.userId = decoded.userId;
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
} catch (error) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
}
});
// Handle new WebSocket connections
wss.on('connection', (ws, request) => {
const userId = request.userId;
console.log(`User ${userId} connected via WebSocket`);
// Store connection
clients.set(userId, ws);
// Send welcome message
ws.send(JSON.stringify({
type: 'connected',
message: 'WebSocket connection established',
userId: userId,
timestamp: new Date().toISOString()
}));
// Handle incoming messages
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleMessage(userId, message, ws);
} catch (error) {
console.error('Error parsing message:', error);
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid message format'
}));
}
});
// Handle ping/pong for connection health
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
// Handle connection errors
ws.on('error', (error) => {
console.error(`WebSocket error for user ${userId}:`, error);
});
// Handle disconnection
ws.on('close', () => {
console.log(`User ${userId} disconnected`);
clients.delete(userId);
});
});
// Heartbeat to detect broken connections
const heartbeatInterval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
// Clean up on server shutdown
wss.on('close', () => {
clearInterval(heartbeatInterval);
});
// Message handler
function handleMessage(userId, message, ws) {
console.log(`Message from user ${userId}:`, message);
switch (message.type) {
case 'chat':
handleChatMessage(userId, message);
break;
case 'typing':
handleTypingIndicator(userId, message);
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
break;
default:
ws.send(JSON.stringify({
type: 'error',
message: `Unknown message type: ${message.type}`
}));
}
}
// Broadcast chat message to room
function handleChatMessage(senderId, message) {
const chatMessage = {
type: 'chat',
senderId: senderId,
roomId: message.roomId,
content: message.content,
timestamp: new Date().toISOString()
};
// Get all users in the room
const roomUsers = getRoomUsers(message.roomId);
roomUsers.forEach(userId => {
const client = clients.get(userId);
if (client && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(chatMessage));
}
});
}
// Broadcast typing indicator
function handleTypingIndicator(userId, message) {
const typingMessage = {
type: 'typing',
userId: userId,
roomId: message.roomId,
isTyping: message.isTyping
};
const roomUsers = getRoomUsers(message.roomId);
roomUsers.forEach(recipientId => {
if (recipientId !== userId) {
const client = clients.get(recipientId);
if (client && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(typingMessage));
}
}
});
}
// Helper to get room users (simplified)
function getRoomUsers(roomId) {
// In production, fetch from database
return Array.from(clients.keys());
}
server.listen(3000, () => {
console.log('WebSocket server running on port 3000');
});
WebSocket Implementation - Client Side
// Client-side WebSocket with reconnection logic
class WebSocketClient {
constructor(url, token) {
this.url = `${url}?token=${token}`;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectInterval = 1000;
this.messageQueue = [];
this.handlers = new Map();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.reconnectInterval = 1000;
// Send queued messages
this.flushMessageQueue();
// Notify listeners
this.trigger('connected');
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
this.handleMessage(message);
} catch (error) {
console.error('Error parsing message:', error);
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
this.trigger('error', error);
};
this.ws.onclose = () => {
console.log('WebSocket disconnected');
this.trigger('disconnected');
this.handleReconnect();
};
}
send(type, data) {
const message = {
type,
...data,
timestamp: new Date().toISOString()
};
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
// Queue message if not connected
this.messageQueue.push(message);
}
}
flushMessageQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
this.ws.send(JSON.stringify(message));
}
}
handleMessage(message) {
console.log('Message received:', message);
// Trigger type-specific handlers
this.trigger(message.type, message);
}
on(event, handler) {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event).push(handler);
}
trigger(event, data) {
const handlers = this.handlers.get(event);
if (handlers) {
handlers.forEach(handler => handler(data));
}
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`Reconnecting in ${this.reconnectInterval}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
this.connect();
}, this.reconnectInterval);
// Exponential backoff
this.reconnectInterval = Math.min(this.reconnectInterval * 2, 30000);
} else {
console.error('Max reconnection attempts reached');
this.trigger('reconnect_failed');
}
}
disconnect() {
if (this.ws) {
this.ws.close();
}
}
}
// Usage example
const token = localStorage.getItem('authToken');
const wsClient = new WebSocketClient('ws://localhost:3000', token);
// Register event handlers
wsClient.on('connected', () => {
console.log('Successfully connected to chat');
document.getElementById('connection-status').textContent = 'Connected';
});
wsClient.on('chat', (message) => {
displayChatMessage(message);
});
wsClient.on('typing', (data) => {
showTypingIndicator(data.userId, data.isTyping);
});
wsClient.on('disconnected', () => {
document.getElementById('connection-status').textContent = 'Disconnected';
});
// Connect
wsClient.connect();
// Send chat message
function sendMessage(roomId, content) {
wsClient.send('chat', { roomId, content });
}
// Send typing indicator
let typingTimeout;
function handleTyping(roomId) {
wsClient.send('typing', { roomId, isTyping: true });
clearTimeout(typingTimeout);
typingTimeout = setTimeout(() => {
wsClient.send('typing', { roomId, isTyping: false });
}, 2000);
}
WebSocket Best Practices: Always implement heartbeat/ping-pong mechanisms to detect dead connections. Use message queuing to handle offline periods. Implement exponential backoff for reconnection attempts. Always authenticate WebSocket connections.
SSE vs WebSockets: Choosing the Right Technology
| Feature |
Server-Sent Events (SSE) |
WebSockets |
| Communication |
Unidirectional (server to client) |
Bidirectional (full-duplex) |
| Protocol |
HTTP/HTTPS |
WS/WSS (WebSocket protocol) |
| Data Format |
Text only |
Text and binary |
| Reconnection |
Automatic with EventSource |
Manual implementation required |
| Browser Support |
All modern browsers (except IE) |
All modern browsers |
| Firewall/Proxy |
Works through most firewalls |
May be blocked by some firewalls |
| Message IDs |
Built-in support |
Manual implementation |
| Complexity |
Simple to implement |
More complex |
| Best For |
Notifications, live feeds, dashboards |
Chat, gaming, collaboration |
Hybrid Approach: Combining SSE and HTTP
For many use cases, combining SSE for server-to-client updates with regular HTTP POST/PUT requests for client-to-server communication is simpler and more reliable than WebSockets.
// Notification system with SSE + HTTP
class NotificationSystem {
constructor(userId, token) {
this.userId = userId;
this.token = token;
this.eventSource = null;
}
// Start receiving notifications via SSE
startListening() {
this.eventSource = new EventSource(
`/api/notifications/stream?userId=${this.userId}&token=${this.token}`
);
this.eventSource.addEventListener('notification', (event) => {
const notification = JSON.parse(event.data);
this.displayNotification(notification);
});
}
// Mark notification as read via HTTP
async markAsRead(notificationId) {
try {
const response = await fetch(`/api/notifications/${notificationId}/read`, {
method: 'PUT',
headers: {
'Authorization': `Bearer ${this.token}`,
'Content-Type': 'application/json'
}
});
if (response.ok) {
console.log('Notification marked as read');
}
} catch (error) {
console.error('Error marking notification as read:', error);
}
}
// Delete notification via HTTP
async deleteNotification(notificationId) {
try {
const response = await fetch(`/api/notifications/${notificationId}`, {
method: 'DELETE',
headers: {
'Authorization': `Bearer ${this.token}`
}
});
if (response.ok) {
console.log('Notification deleted');
}
} catch (error) {
console.error('Error deleting notification:', error);
}
}
displayNotification(notification) {
// Update UI
console.log('New notification:', notification);
}
stopListening() {
if (this.eventSource) {
this.eventSource.close();
}
}
}
Exercise: Build a real-time dashboard system with the following requirements:
- Server Side (Node.js):
- Create an SSE endpoint
/api/dashboard/stream that pushes updates every 5 seconds
- Include metrics: active users, orders per minute, revenue, and system status
- Implement authentication using JWT tokens
- Support multiple simultaneous connections
- Send heartbeat every 30 seconds
- Client Side (JavaScript):
- Connect to SSE stream and display metrics in real-time
- Implement automatic reconnection with exponential backoff (max 5 attempts)
- Show connection status indicator (connected/disconnected/reconnecting)
- Display timestamp of last update
- Add error handling and user-friendly error messages
- Bonus:
- Add a button to manually trigger data refresh via HTTP POST
- Implement message queuing for handling offline periods
- Add visual charts that update in real-time
Test your implementation by simulating network interruptions and verifying reconnection behavior.