REST API Development

Real-time APIs with SSE & WebSockets

Understanding Real-time Communication

Traditional REST APIs follow a request-response pattern, so a client that wants fresh data must poll the server for it. Real-time APIs let the server push updates to clients as they happen, either one-way (Server-Sent Events) or in both directions (WebSockets). This is essential for applications like live dashboards, chat applications, collaborative tools, notifications, and real-time analytics.

Real-time Communication Technologies

Three Main Approaches:
  • Polling: Client repeatedly requests data at intervals (simple but inefficient)
  • Server-Sent Events (SSE): Server pushes data to client over HTTP (unidirectional)
  • WebSockets: Full-duplex bidirectional communication (most flexible but complex)

Server-Sent Events (SSE)

SSE provides a simple, standardized way for servers to push data to clients over HTTP. Unlike WebSockets, SSE is unidirectional (server to client only) and uses regular HTTP connections.

SSE Advantages

  • Built on standard HTTP/HTTPS (works through firewalls and proxies)
  • Automatic reconnection with configurable retry intervals
  • Event IDs for tracking and resuming streams
  • Simple implementation compared to WebSockets
  • Browser native support via EventSource API
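The event-ID bullet above is what makes resuming a stream possible: when EventSource reconnects, the browser sends the last id it received in a Last-Event-ID request header, and the server can replay whatever the client missed. Below is a minimal sketch of that server-side bookkeeping. It assumes increasing integer IDs and an in-memory buffer; EventBuffer and its methods are hypothetical names used for illustration, not part of any library.

```javascript
// Hypothetical helper: keeps the most recent events so a reconnecting
// client can be caught up. Assumes IDs are increasing integers.
class EventBuffer {
  constructor(capacity = 100) {
    this.capacity = capacity;
    this.events = [];
    this.nextId = 1;
  }

  // Store a payload and return the event with its assigned id.
  push(data) {
    const event = { id: this.nextId++, data };
    this.events.push(event);
    if (this.events.length > this.capacity) {
      this.events.shift(); // drop the oldest event
    }
    return event;
  }

  // Return events newer than the id from the Last-Event-ID header.
  // A missing or non-numeric header means "nothing to replay".
  since(lastEventIdHeader) {
    const lastId = Number.parseInt(lastEventIdHeader, 10);
    if (Number.isNaN(lastId)) return [];
    return this.events.filter((event) => event.id > lastId);
  }
}

const buffer = new EventBuffer(3);
['a', 'b', 'c', 'd'].forEach((payload) => buffer.push(payload));

// The client reconnects having last seen id 2. Event 'a' (id 1) has
// already been evicted, and 'b' (id 2) was delivered before the drop.
const missed = buffer.since('2');
console.log(missed.map((event) => event.data));
```

In a Node.js request handler the header would be read as req.headers['last-event-id'] and the returned events written to the response before any new ones. A bounded buffer means a client that stays offline too long can still miss events, so a real system may need a persistent store instead.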

SSE Implementation - Server Side (Node.js)

// Express.js SSE endpoint
const express = require('express');
const app = express();

// SSE endpoint for live notifications
app.get('/api/notifications/stream', (req, res) => {
  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('Access-Control-Allow-Origin', '*');

  // Send initial comment to establish connection
  res.write(': SSE connection established\n\n');

  // Get user ID from query params
  const userId = req.query.userId;

  // Send welcome message
  const welcomeData = {
    id: Date.now(),
    type: 'connected',
    message: 'Connected to notification stream',
    timestamp: new Date().toISOString()
  };
  res.write(`id: ${welcomeData.id}\n`);
  res.write(`event: connected\n`);
  res.write(`data: ${JSON.stringify(welcomeData)}\n\n`);

  // Send heartbeat every 30 seconds to keep connection alive
  const heartbeatInterval = setInterval(() => {
    res.write(': heartbeat\n\n');
  }, 30000);

  // Simulate sending notifications (one every 10 seconds)
  const notificationInterval = setInterval(() => {
    const notification = {
      id: Date.now(),
      userId: userId,
      type: 'info',
      title: 'New Update',
      message: `Update received at ${new Date().toLocaleTimeString()}`,
      timestamp: new Date().toISOString()
    };
    res.write(`id: ${notification.id}\n`);
    res.write(`event: notification\n`);
    res.write(`data: ${JSON.stringify(notification)}\n\n`);
  }, 10000);

  // Clean up on client disconnect
  req.on('close', () => {
    console.log(`Client ${userId} disconnected from SSE`);
    clearInterval(heartbeatInterval);
    clearInterval(notificationInterval);
    res.end();
  });
});

SSE Message Format: Each SSE message is a block of "field: value" lines terminated by a blank line. Key fields are id: (message ID, which the browser echoes back in the Last-Event-ID header when it reconnects), event: (event type), data: (message payload; repeat the field once per line for multi-line payloads), and retry: (reconnection delay in ms). A line that starts with a colon is a comment, which is how the heartbeat above keeps the connection alive without triggering a client event.
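Writing the fields by hand with several res.write calls is easy to get wrong, especially when a payload contains newlines. The following is a minimal sketch of a serializer for the message format described above. formatSseEvent is a hypothetical helper name, and sending objects as JSON is an assumption carried over from the server example, not something the SSE format requires.

```javascript
// Hypothetical helper: serialize one event into SSE wire format.
function formatSseEvent({ id, event, data = '', retry } = {}) {
  const lines = [];
  if (retry !== undefined) lines.push(`retry: ${retry}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);

  // Objects are sent as JSON; strings are sent as-is.
  const payload = typeof data === 'string' ? data : JSON.stringify(data);

  // A payload containing newlines needs one "data:" line per line of text;
  // the browser joins them back together with "\n".
  for (const line of payload.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // The trailing blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

const frame = formatSseEvent({
  id: 42,
  event: 'notification',
  data: { title: 'New Update' },
});
console.log(JSON.stringify(frame));

const multiline = formatSseEvent({ data: 'line one\nline two' });
console.log(JSON.stringify(multiline));
```

With a helper like this, the endpoint body reduces to res.write(formatSseEvent({ ... })) for each event.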

SSE Implementation - Client Side (JavaScript)

// Client-side SSE with EventSource API
class NotificationStream {
  constructor(userId) {
    this.userId = userId;
    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
  }

  connect() {
    const url = `/api/notifications/stream?userId=${encodeURIComponent(this.userId)}`;
    this.eventSource = new EventSource(url);

    // Listen for connection open
    this.eventSource.onopen = (event) => {
      console.log('SSE connection opened');
      this.reconnectAttempts = 0; // Reset on successful connection
    };

    // Listen for custom 'notification' events
    this.eventSource.addEventListener('notification', (event) => {
      const notification = JSON.parse(event.data);
      console.log('Notification received:', notification);
      this.handleNotification(notification);
    });

    // Listen for 'connected' events
    this.eventSource.addEventListener('connected', (event) => {
      const data = JSON.parse(event.data);
      console.log('Connection established:', data.message);
    });

    // Handle errors and reconnection.
    // While readyState is CONNECTING the browser is already retrying on its own;
    // CLOSED means it has given up, so we reconnect manually.
    this.eventSource.onerror = (error) => {
      console.error('SSE error:', error);
      if (this.eventSource.readyState === EventSource.CLOSED) {
        console.log('SSE connection closed');
        this.handleReconnect();
      }
    };

    // Listen for generic messages (messages without event type)
    this.eventSource.onmessage = (event) => {
      console.log('Generic message:', event.data);
    };
  }

  handleNotification(notification) {
    // Display notification to user
    if ('Notification' in window && Notification.permission === 'granted') {
      new Notification(notification.title, {
        body: notification.message,
        icon: '/icons/notification.png',
        timestamp: new Date(notification.timestamp).getTime()
      });
    }

    // Update UI with notification
    this.updateNotificationUI(notification);
  }

  updateNotificationUI(notification) {
    const notificationList = document.getElementById('notification-list');
    const notificationElement = document.createElement('div');
    notificationElement.className = 'notification-item';

    // Build the static skeleton, then fill in server-supplied values with
    // textContent so they are never interpreted as HTML (avoids XSS).
    notificationElement.innerHTML = `
      <div class="notification-header">
        <strong></strong>
        <span class="notification-time"></span>
      </div>
      <div class="notification-body"></div>
    `;
    notificationElement.querySelector('strong').textContent = notification.title;
    notificationElement.querySelector('.notification-time').textContent =
      new Date(notification.timestamp).toLocaleTimeString();
    notificationElement.querySelector('.notification-body').textContent =
      notification.message;

    notificationList.prepend(notificationElement);
  }

  handleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      // Exponential backoff: 2s, 4s, 8s, ... capped at 30s
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
      console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
      setTimeout(() => {
        console.log('Attempting to reconnect...');
        this.connect();
      }, delay);
    } else {
      console.error('Max reconnection attempts reached');
      this.showReconnectButton();
    }
  }

  showReconnectButton() {
    const button = document.getElementById('reconnect-button');
    button.style.display = 'block';
    button.onclick = () => {
      this.reconnectAttempts = 0;
      this.connect();
      button.style.display = 'none';
    };
  }

  disconnect() {
    if (this.eventSource) {
      this.eventSource.close();
      console.log('SSE connection closed by client');
    }
  }
}

// Usage
const notificationStream = new NotificationStream(123);
notificationStream.connect();

// Disconnect when user leaves page
window.addEventListener('beforeunload', () => {
  notificationStream.disconnect();
});

Advanced SSE: Broadcasting to Multiple Clients

// SSE with Redis pub/sub for multi-server broadcasting
const express = require('express');
const Redis = require('ioredis');

const app = express();
const redis = new Redis();      // used for publishing
const subscriber = new Redis(); // a subscribed connection cannot publish, so use a second one

// Store active SSE connections: userId -> array of response objects
const connections = new Map();

// Subscribe to Redis channel
subscriber.subscribe('notifications', (err, count) => {
  if (err) {
    console.error('Failed to subscribe:', err);
  } else {
    console.log(`Subscribed to ${count} channel(s)`);
  }
});

// Handle incoming messages from Redis
subscriber.on('message', (channel, message) => {
  if (channel === 'notifications') {
    const notification = JSON.parse(message);
    broadcastToUser(notification.userId, notification);
  }
});

// SSE endpoint
app.get('/api/notifications/stream', (req, res) => {
  const userId = req.query.userId;

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  // Send the headers now so the client's onopen fires before the first event
  res.flushHeaders();

  // Store connection
  if (!connections.has(userId)) {
    connections.set(userId, []);
  }
  connections.get(userId).push(res);
  console.log(`User ${userId} connected. Total connections: ${connections.get(userId).length}`);

  // Clean up on disconnect
  req.on('close', () => {
    const userConnections = connections.get(userId);
    const index = userConnections.indexOf(res);
    if (index !== -1) {
      userConnections.splice(index, 1);
    }
    if (userConnections.length === 0) {
      connections.delete(userId);
    }
    console.log(`User ${userId} disconnected`);
  });
});

// Function to broadcast to specific user
function broadcastToUser(userId, notification) {
  // Query-string IDs are strings, so normalize before the lookup
  const userConnections = connections.get(String(userId));
  if (!userConnections) return;

  const message = `id: ${notification.id}\nevent: notification\ndata: ${JSON.stringify(notification)}\n\n`;
  userConnections.forEach((res) => {
    try {
      res.write(message);
    } catch (error) {
      console.error('Error writing to connection:', error);
    }
  });
}

// API endpoint to publish notification
app.post('/api/notifications', express.json(), async (req, res) => {
  const notification = {
    id: Date.now(),
    userId: req.body.userId,
    title: req.body.title,
    message: req.body.message,
    timestamp: new Date().toISOString()
  };

  // Publish to Redis (will be received by all server instances)
  await redis.publish('notifications', JSON.stringify(notification));
  res.json({ success: true, notification });
});

SSE Limitations: Over HTTP/1.1, browsers cap the number of concurrent connections per domain (typically 6), and every open EventSource counts against that cap across all tabs. HTTP/2 multiplexes streams over a single connection, so the limit is much higher (it is negotiated between client and server and defaults to 100 streams). SSE is unidirectional; if you need client-to-server communication, consider WebSockets or combine SSE with regular HTTP requests.
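The broadcasting example above tracks one array of responses per user and removes entries with indexOf and splice. A Map of Sets expresses the same bookkeeping a little more directly. The sketch below is an alternative under stated assumptions, not the lesson's implementation: ConnectionRegistry is a hypothetical class, and the response objects only need a write method, so a stand-in object replaces the Express response here.

```javascript
// Hypothetical registry: userId (as string) -> Set of open response streams.
class ConnectionRegistry {
  constructor() {
    this.byUser = new Map();
  }

  add(userId, res) {
    const key = String(userId);
    if (!this.byUser.has(key)) this.byUser.set(key, new Set());
    this.byUser.get(key).add(res);
  }

  remove(userId, res) {
    const key = String(userId);
    const streams = this.byUser.get(key);
    if (!streams) return;
    streams.delete(res);
    if (streams.size === 0) this.byUser.delete(key);
  }

  // Write a pre-formatted SSE frame to every stream of one user.
  // Returns the number of streams written to.
  sendToUser(userId, frame) {
    const streams = this.byUser.get(String(userId));
    if (!streams) return 0;
    let delivered = 0;
    for (const res of streams) {
      try {
        res.write(frame);
        delivered++;
      } catch (error) {
        // Same try/catch as the broadcasting example; drop a stream whose write throws.
        this.remove(userId, res);
      }
    }
    return delivered;
  }
}

// Stand-in for an Express response: records what was written.
const fakeRes = () => ({
  written: [],
  write(chunk) {
    this.written.push(chunk);
  },
});

const registry = new ConnectionRegistry();
const tabA = fakeRes();
const tabB = fakeRes();
registry.add(123, tabA);   // numeric and string IDs map to the same key
registry.add('123', tabB);

const delivered = registry.sendToUser(123, 'data: hello\n\n');
registry.remove(123, tabA);
console.log(delivered, registry.byUser.get('123').size);
```

Normalizing the key in one place avoids the mismatch between string IDs from the query string and numeric IDs from a JSON body.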

WebSockets

WebSockets provide full-duplex communication channels over a single TCP connection. Unlike SSE, WebSockets support bidirectional data flow and can send both text and binary data.
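Binary frames are mostly useful for small, high-frequency messages where JSON overhead adds up. As a rough illustration, the sketch below packs a cursor position into five bytes with a DataView. The wire layout (one type byte followed by two 16-bit coordinates) and the names MSG_CURSOR, encodeCursor, and decodeCursor are invented for this example.

```javascript
// Hypothetical wire layout (5 bytes): [type: uint8][x: uint16][y: uint16]
const MSG_CURSOR = 1;

function encodeCursor(x, y) {
  const buffer = new ArrayBuffer(5);
  const view = new DataView(buffer);
  view.setUint8(0, MSG_CURSOR);
  view.setUint16(1, x); // DataView reads and writes big-endian by default
  view.setUint16(3, y);
  return buffer;
}

function decodeCursor(buffer) {
  const view = new DataView(buffer);
  if (view.getUint8(0) !== MSG_CURSOR) {
    throw new Error('Not a cursor message');
  }
  return { x: view.getUint16(1), y: view.getUint16(3) };
}

const encoded = encodeCursor(640, 360);
const decoded = decodeCursor(encoded);
console.log(encoded.byteLength, decoded);

// The same position as JSON text, for a size comparison.
const asJson = JSON.stringify({ type: 'cursor', x: 640, y: 360 });
console.log(asJson.length);
```

In the browser, passing the ArrayBuffer to ws.send() sends a binary frame, and setting ws.binaryType = 'arraybuffer' makes incoming binary frames arrive as ArrayBuffer instead of Blob.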

When to Use WebSockets

  • Chat applications requiring bidirectional real-time messaging
  • Collaborative editing (Google Docs-style applications)
  • Multiplayer gaming
  • Real-time trading platforms
  • Live video/audio streaming controls

WebSocket Implementation - Server Side (Node.js with ws)

// WebSocket server with authentication
const WebSocket = require('ws');
const http = require('http');
const express = require('express');
const jwt = require('jsonwebtoken');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ noServer: true });

// Store active connections: userId -> WebSocket
const clients = new Map();

// Handle WebSocket upgrade
server.on('upgrade', (request, socket, head) => {
  // Extract token from query string
  const url = new URL(request.url, `http://${request.headers.host}`);
  const token = url.searchParams.get('token');

  // Verify authentication (a missing token also throws and is rejected)
  try {
    const decoded = jwt.verify(token, process.env.JWT_SECRET);
    request.userId = decoded.userId;
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  } catch (error) {
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
    socket.destroy();
  }
});

// Handle new WebSocket connections
wss.on('connection', (ws, request) => {
  const userId = request.userId;
  console.log(`User ${userId} connected via WebSocket`);

  // Store connection
  clients.set(userId, ws);

  // Send welcome message
  ws.send(JSON.stringify({
    type: 'connected',
    message: 'WebSocket connection established',
    userId: userId,
    timestamp: new Date().toISOString()
  }));

  // Handle incoming messages
  ws.on('message', (data) => {
    try {
      // ws delivers a Buffer; convert it before parsing
      const message = JSON.parse(data.toString());
      handleMessage(userId, message, ws);
    } catch (error) {
      console.error('Error parsing message:', error);
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
    }
  });

  // Handle ping/pong for connection health
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // Handle connection errors
  ws.on('error', (error) => {
    console.error(`WebSocket error for user ${userId}:`, error);
  });

  // Handle disconnection
  ws.on('close', () => {
    console.log(`User ${userId} disconnected`);
    // Only remove the entry if it still points at this socket; the user
    // may already have reconnected on a newer one.
    if (clients.get(userId) === ws) {
      clients.delete(userId);
    }
  });
});

// Heartbeat to detect broken connections
const heartbeatInterval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

// Clean up on server shutdown
wss.on('close', () => {
  clearInterval(heartbeatInterval);
});

// Message handler
function handleMessage(userId, message, ws) {
  console.log(`Message from user ${userId}:`, message);
  switch (message.type) {
    case 'chat':
      handleChatMessage(userId, message);
      break;
    case 'typing':
      handleTypingIndicator(userId, message);
      break;
    case 'ping':
      ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
      break;
    default:
      ws.send(JSON.stringify({
        type: 'error',
        message: `Unknown message type: ${message.type}`
      }));
  }
}

// Broadcast chat message to room
function handleChatMessage(senderId, message) {
  const chatMessage = {
    type: 'chat',
    senderId: senderId,
    roomId: message.roomId,
    content: message.content,
    timestamp: new Date().toISOString()
  };

  // Get all users in the room
  const roomUsers = getRoomUsers(message.roomId);
  roomUsers.forEach((userId) => {
    const client = clients.get(userId);
    if (client && client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify(chatMessage));
    }
  });
}

// Broadcast typing indicator to everyone in the room except the sender
function handleTypingIndicator(userId, message) {
  const typingMessage = {
    type: 'typing',
    userId: userId,
    roomId: message.roomId,
    isTyping: message.isTyping
  };

  const roomUsers = getRoomUsers(message.roomId);
  roomUsers.forEach((recipientId) => {
    if (recipientId !== userId) {
      const client = clients.get(recipientId);
      if (client && client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify(typingMessage));
      }
    }
  });
}

// Helper to get room users (simplified: treats every connected user as a room member)
function getRoomUsers(roomId) {
  // In production, fetch from database
  return Array.from(clients.keys());
}

server.listen(3000, () => {
  console.log('WebSocket server running on port 3000');
});

WebSocket Implementation - Client Side

// Client-side WebSocket with reconnection logic
class WebSocketClient {
  constructor(url, token) {
    this.url = `${url}?token=${encodeURIComponent(token)}`;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectInterval = 1000;
    this.shouldReconnect = true; // set to false by disconnect()
    this.messageQueue = [];
    this.handlers = new Map();
  }

  connect() {
    this.shouldReconnect = true;
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.reconnectAttempts = 0;
      this.reconnectInterval = 1000;

      // Send queued messages
      this.flushMessageQueue();

      // Notify listeners
      this.trigger('connected');
    };

    this.ws.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);
        this.handleMessage(message);
      } catch (error) {
        console.error('Error parsing message:', error);
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      this.trigger('error', error);
    };

    this.ws.onclose = () => {
      console.log('WebSocket disconnected');
      this.trigger('disconnected');
      // Do not reconnect after an intentional disconnect()
      if (this.shouldReconnect) {
        this.handleReconnect();
      }
    };
  }

  send(type, data) {
    const message = { type, ...data, timestamp: new Date().toISOString() };
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      // Queue message if not connected
      this.messageQueue.push(message);
    }
  }

  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift();
      this.ws.send(JSON.stringify(message));
    }
  }

  handleMessage(message) {
    console.log('Message received:', message);
    // Trigger type-specific handlers
    this.trigger(message.type, message);
  }

  on(event, handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(handler);
  }

  trigger(event, data) {
    const handlers = this.handlers.get(event);
    if (handlers) {
      handlers.forEach((handler) => handler(data));
    }
  }

  handleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      console.log(`Reconnecting in ${this.reconnectInterval}ms (attempt ${this.reconnectAttempts})`);
      setTimeout(() => {
        this.connect();
      }, this.reconnectInterval);

      // Exponential backoff
      this.reconnectInterval = Math.min(this.reconnectInterval * 2, 30000);
    } else {
      console.error('Max reconnection attempts reached');
      this.trigger('reconnect_failed');
    }
  }

  disconnect() {
    this.shouldReconnect = false;
    if (this.ws) {
      this.ws.close();
    }
  }
}

// Usage example
const token = localStorage.getItem('authToken');
const wsClient = new WebSocketClient('ws://localhost:3000', token);

// Register event handlers
// (displayChatMessage and showTypingIndicator are UI functions your app provides)
wsClient.on('connected', () => {
  console.log('Successfully connected to chat');
  document.getElementById('connection-status').textContent = 'Connected';
});

wsClient.on('chat', (message) => {
  displayChatMessage(message);
});

wsClient.on('typing', (data) => {
  showTypingIndicator(data.userId, data.isTyping);
});

wsClient.on('disconnected', () => {
  document.getElementById('connection-status').textContent = 'Disconnected';
});

// Connect
wsClient.connect();

// Send chat message
function sendMessage(roomId, content) {
  wsClient.send('chat', { roomId, content });
}

// Send typing indicator; automatically clears after 2 seconds without input
let typingTimeout;
function handleTyping(roomId) {
  wsClient.send('typing', { roomId, isTyping: true });
  clearTimeout(typingTimeout);
  typingTimeout = setTimeout(() => {
    wsClient.send('typing', { roomId, isTyping: false });
  }, 2000);
}

WebSocket Best Practices: Always implement heartbeat/ping-pong mechanisms to detect dead connections. Use message queuing to handle offline periods. Implement exponential backoff for reconnection attempts. Always authenticate WebSocket connections.
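One refinement to the exponential backoff recommended above is to add random jitter. When a server restarts, every client disconnects at the same moment, and a fixed doubling schedule makes them all retry in synchronized waves. The sketch below shows a "full jitter" variant, which picks a random delay between zero and the exponential ceiling. The function names and the 1s/30s defaults are assumptions chosen to match the client examples in this lesson.

```javascript
// Hypothetical helper: upper bound for the delay before attempt N (1-based).
function backoffCeiling(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}

// "Full jitter": pick uniformly between 0 and the ceiling.
// `random` is injectable so the function can be tested deterministically.
function backoffDelay(attempt, random = Math.random) {
  return Math.floor(random() * backoffCeiling(attempt));
}

// Ceilings for attempts 1..6: doubling from 1s, capped at 30s.
const ceilings = [1, 2, 3, 4, 5, 6].map((n) => backoffCeiling(n));
console.log(ceilings);

// With the random source pinned to 0.5, attempt 3 waits half of 4000 ms.
const pinned = backoffDelay(3, () => 0.5);
console.log(pinned);
```

In a reconnecting client, the result of backoffDelay(this.reconnectAttempts) would be passed to setTimeout in place of the fixed interval.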

SSE vs WebSockets: Choosing the Right Technology

Feature         | Server-Sent Events (SSE)              | WebSockets
----------------|---------------------------------------|---------------------------------
Communication   | Unidirectional (server to client)     | Bidirectional (full-duplex)
Protocol        | HTTP/HTTPS                            | WS/WSS (WebSocket protocol)
Data Format     | Text only                             | Text and binary
Reconnection    | Automatic with EventSource            | Manual implementation required
Browser Support | All modern browsers (except IE)       | All modern browsers
Firewall/Proxy  | Works through most firewalls          | May be blocked by some firewalls
Message IDs     | Built-in support                      | Manual implementation
Complexity      | Simple to implement                   | More complex
Best For        | Notifications, live feeds, dashboards | Chat, gaming, collaboration

Hybrid Approach: Combining SSE and HTTP

For many use cases, combining SSE for server-to-client updates with regular HTTP POST/PUT requests for client-to-server communication is simpler and more reliable than WebSockets.

// Notification system with SSE + HTTP
class NotificationSystem {
  constructor(userId, token) {
    this.userId = userId;
    this.token = token;
    this.eventSource = null;
  }

  // Start receiving notifications via SSE.
  // EventSource cannot set custom headers, so the token travels in the query string.
  startListening() {
    const params = new URLSearchParams({ userId: this.userId, token: this.token });
    this.eventSource = new EventSource(`/api/notifications/stream?${params}`);

    this.eventSource.addEventListener('notification', (event) => {
      const notification = JSON.parse(event.data);
      this.displayNotification(notification);
    });
  }

  // Mark notification as read via HTTP
  async markAsRead(notificationId) {
    try {
      const response = await fetch(`/api/notifications/${notificationId}/read`, {
        method: 'PUT',
        headers: {
          'Authorization': `Bearer ${this.token}`,
          'Content-Type': 'application/json'
        }
      });
      if (response.ok) {
        console.log('Notification marked as read');
      }
    } catch (error) {
      console.error('Error marking notification as read:', error);
    }
  }

  // Delete notification via HTTP
  async deleteNotification(notificationId) {
    try {
      const response = await fetch(`/api/notifications/${notificationId}`, {
        method: 'DELETE',
        headers: { 'Authorization': `Bearer ${this.token}` }
      });
      if (response.ok) {
        console.log('Notification deleted');
      }
    } catch (error) {
      console.error('Error deleting notification:', error);
    }
  }

  displayNotification(notification) {
    // Update UI
    console.log('New notification:', notification);
  }

  stopListening() {
    if (this.eventSource) {
      this.eventSource.close();
    }
  }
}

Exercise: Build a real-time dashboard system with the following requirements:
  1. Server Side (Node.js):
    • Create an SSE endpoint /api/dashboard/stream that pushes updates every 5 seconds
    • Include metrics: active users, orders per minute, revenue, and system status
    • Implement authentication using JWT tokens
    • Support multiple simultaneous connections
    • Send heartbeat every 30 seconds
  2. Client Side (JavaScript):
    • Connect to SSE stream and display metrics in real-time
    • Implement automatic reconnection with exponential backoff (max 5 attempts)
    • Show connection status indicator (connected/disconnected/reconnecting)
    • Display timestamp of last update
    • Add error handling and user-friendly error messages
  3. Bonus:
    • Add a button to manually trigger data refresh via HTTP POST
    • Implement message queuing for handling offline periods
    • Add visual charts that update in real-time

Test your implementation by simulating network interruptions and verifying reconnection behavior.