WebSockets & Real-Time Apps

Redis Pub/Sub for Real-Time Communication


Introduction to Redis Pub/Sub

Redis Pub/Sub (Publish/Subscribe) is a messaging feature that lets different parts of your application, or separate services, exchange messages through named channels. It is a common building block for real-time systems that run on more than one server process.

What is Pub/Sub?

The Pub/Sub pattern has three main components:

  • Publishers: Send messages to channels
  • Subscribers: Listen to channels and receive messages
  • Channels: Named communication pathways
Key Concept: Publishers and subscribers are decoupled. Publishers don't know who subscribes, and subscribers don't know who publishes.
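
To make the decoupling concrete, here is a minimal in-memory sketch of the pattern in plain JavaScript. It does not use Redis at all; `TinyPubSub` and its method names are hypothetical and exist only for this illustration.

```javascript
// Minimal in-memory illustration of Pub/Sub (no Redis involved).
// TinyPubSub is a hypothetical name used only for this sketch.
class TinyPubSub {
  constructor() {
    this.channels = new Map(); // channel name -> Set of listener functions
  }

  subscribe(channel, listener) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set());
    }
    this.channels.get(channel).add(listener);
  }

  unsubscribe(channel, listener) {
    const listeners = this.channels.get(channel);
    if (listeners) listeners.delete(listener);
  }

  // Returns how many listeners were subscribed to the channel.
  publish(channel, message) {
    const listeners = this.channels.get(channel);
    if (!listeners) return 0;
    for (const listener of listeners) listener(message, channel);
    return listeners.size;
  }
}

const bus = new TinyPubSub();
const received = [];

// Two independent subscribers; neither knows about the other or the publisher.
bus.subscribe('news', (message) => received.push(`A got: ${message}`));
bus.subscribe('news', (message) => received.push(`B got: ${message}`));

const delivered = bus.publish('news', 'hello'); // both listeners receive it
const dropped = bus.publish('sports', 'goal');  // nobody is listening

console.log(received, delivered, dropped);
```

The publisher only names a channel. Adding or removing a subscriber never requires a change on the publishing side, which is the property Redis provides across processes and machines.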

Why Redis Pub/Sub?

Redis Pub/Sub is ideal for real-time applications because:

  • Extremely fast (in-memory operations)
  • Simple API and easy to implement
  • Supports pattern-based subscriptions
  • Well suited to lightweight cross-service communication
  • Pairs well with Socket.io and other WebSocket libraries, which can relay Redis messages to connected clients

Basic Redis Pub/Sub Example

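    // Uses the node-redis client (v4 or later)
    const redis = require('redis');

    async function main() {
      // Create publisher client
      const publisher = redis.createClient({ url: 'redis://localhost:6379' });

      // Create subscriber client
      const subscriber = redis.createClient({ url: 'redis://localhost:6379' });

      // Log connection errors instead of letting them crash the process
      publisher.on('error', (err) => console.error('Publisher error:', err));
      subscriber.on('error', (err) => console.error('Subscriber error:', err));

      // Connect both clients
      await publisher.connect();
      await subscriber.connect();

      // Subscribe to a channel
      await subscriber.subscribe('news', (message) => {
        console.log('Received message:', message);
      });

      // Publish a message
      await publisher.publish('news', 'Breaking news: Redis is awesome!');
      // Output: Received message: Breaking news: Redis is awesome!
    }

    // CommonJS modules do not support top-level await, so run inside an async function
    main().catch(console.error);

Important: You need separate Redis client instances for publishing and subscribing. Under the RESP2 protocol, a connection in subscriber mode accepts only subscription-related commands such as SUBSCRIBE, UNSUBSCRIBE, and PING, so it cannot also publish or read keys.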

Publishing Messages

Publishing is straightforward with Redis:

    // Simple text message
    await publisher.publish('channel-name', 'Hello World');

    // JSON data
    const data = { userId: 123, action: 'login', timestamp: Date.now() };
    await publisher.publish('user-events', JSON.stringify(data));

    // publish() resolves to the number of subscribers that received the message
    const subscriberCount = await publisher.publish('notifications', 'New message');
    console.log(`Message delivered to ${subscriberCount} subscribers`);
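
Because channels carry plain strings, publishers and subscribers have to agree on a message format, and a subscriber should not crash on a malformed message. The following is a minimal sketch of one way to do that; `encodeEvent` and `decodeEvent` are hypothetical helpers, not part of the redis package, and the envelope fields are an assumption.

```javascript
// Hypothetical helpers for this sketch; they are not part of the redis package.

// Wraps a payload in a small JSON envelope before publishing.
function encodeEvent(type, payload) {
  return JSON.stringify({ type, payload, timestamp: Date.now() });
}

// Returns the parsed event, or null if the message is not a valid event.
function decodeEvent(message) {
  try {
    const parsed = JSON.parse(message);
    if (parsed === null || typeof parsed !== 'object' || typeof parsed.type !== 'string') {
      return null;
    }
    return parsed;
  } catch (err) {
    return null; // malformed JSON: ignore it rather than crash the listener
  }
}

const wire = encodeEvent('login', { userId: 123 });
const decoded = decodeEvent(wire);
const bad = decodeEvent('not json');

console.log(decoded.type, decoded.payload.userId, bad);
```

In practice the string returned by `encodeEvent` would be passed to `publisher.publish(...)`, and `decodeEvent` would be called at the top of each subscribe listener.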

Subscribing to Channels

Subscribers listen to one or more channels:

    // Subscribe to a single channel
    await subscriber.subscribe('chat', (message) => {
      console.log('Chat message:', message);
    });

    // Subscribe to multiple channels with one listener;
    // the second argument identifies the channel the message arrived on
    await subscriber.subscribe(['alerts', 'notifications'], (message, channel) => {
      console.log(`Message from ${channel}:`, message);
    });

    // Unsubscribe from a channel
    await subscriber.unsubscribe('chat');

    // Unsubscribe from all channels
    await subscriber.unsubscribe();

Pattern-Based Subscriptions

Subscribe to multiple channels matching a pattern:

    // Subscribe to all channels starting with "user:"
    await subscriber.pSubscribe('user:*', (message, channel) => {
      console.log(`Message from ${channel}:`, message);
    });
    // Matches: user:login, user:logout, user:register, etc.

    // Subscribe to all chat room channels
    await subscriber.pSubscribe('chat:room:*', (message, channel) => {
      const roomId = channel.split(':')[2];
      console.log(`Message in room ${roomId}:`, message);
    });
    // Matches: chat:room:1, chat:room:2, chat:room:lobby, etc.

    // Unsubscribe from patterns
    await subscriber.pUnsubscribe('user:*');

Best Practice: Use descriptive channel naming conventions like "resource:action" or "service:event:id" to make pattern subscriptions powerful.
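
To see how a naming convention and a pattern interact, here is a small sketch that builds channel names and checks them against a glob pattern locally. Both `channelName` and `matchesPattern` are hypothetical helpers. The matcher is a simplification that handles only `*` and `?`; Redis itself performs the real matching on the server and supports more syntax, such as `[...]` character sets.

```javascript
// Hypothetical helpers illustrating a "service:event:id" naming convention.

// Joins name parts with ":" so every caller builds channel names the same way.
function channelName(...parts) {
  return parts.join(':');
}

// Simplified glob matcher: supports only `*` (any run of characters) and
// `?` (exactly one character). Redis patterns also support [...] sets and
// backslash escapes, which this sketch deliberately leaves out.
function matchesPattern(pattern, channel) {
  // Escape regex metacharacters other than * and ?
  const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, '\\$&');
  const source = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
  return new RegExp(`^${source}$`).test(channel);
}

const lobby = channelName('chat', 'room', 'lobby'); // 'chat:room:lobby'

console.log(matchesPattern('chat:room:*', lobby)); // true
console.log(matchesPattern('user:*', lobby));      // false
```

Note that `*` is not limited to a single segment: `user:*` also matches a name such as `user:profile:updated`, so it helps to pick prefixes that do not overlap by accident.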

Integrating Redis Pub/Sub with Socket.io

Combine Redis Pub/Sub with Socket.io for scalable real-time apps:

    const express = require('express');
    const { Server } = require('socket.io');
    const redis = require('redis');

    async function main() {
      const app = express();
      const server = app.listen(3000);
      const io = new Server(server);

      // Redis clients
      const publisher = redis.createClient({ url: 'redis://localhost:6379' });
      const subscriber = redis.createClient({ url: 'redis://localhost:6379' });
      await publisher.connect();
      await subscriber.connect();

      // Subscribe to Redis channels
      await subscriber.subscribe('broadcast', (message) => {
        const data = JSON.parse(message);
        // Broadcast to all Socket.io clients
        io.emit('message', data);
      });

      await subscriber.subscribe('notifications', (message) => {
        const notification = JSON.parse(message);
        // Send to specific user (that user's sockets must have joined `user-<id>`)
        io.to(`user-${notification.userId}`).emit('notification', notification);
      });

      // Subscribe once to every room channel. Calling subscribe() inside the
      // 'join_room' handler would register a new listener for each socket that
      // joins, so every room message would be emitted multiple times.
      await subscriber.pSubscribe('room:*', (message, channel) => {
        const roomId = channel.slice('room:'.length);
        io.to(`room-${roomId}`).emit('room_message', JSON.parse(message));
      });

      // Socket.io connection
      io.on('connection', (socket) => {
        socket.on('send_message', (data) => {
          // Publish to Redis instead of emitting directly
          publisher.publish('broadcast', JSON.stringify({
            userId: socket.userId, // assumed to be set by your authentication middleware
            message: data.message,
            timestamp: new Date()
          })).catch((err) => console.error('Publish failed:', err));
        });

        socket.on('join_room', (roomId) => {
          socket.join(`room-${roomId}`);
        });
      });
    }

    main().catch(console.error);
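
When room channels are subscribed on demand (for example from a `join_room` handler), the Redis subscription should be created once per room, not once per socket. Otherwise each message is relayed as many times as there are members. One possible approach is reference counting, sketched below. `ChannelRefCounter` is a hypothetical helper and only does the bookkeeping; the actual `subscribe` and `unsubscribe` calls are left to the caller.

```javascript
// Hypothetical helper: counts how many local sockets are interested in each channel.
class ChannelRefCounter {
  constructor() {
    this.counts = new Map(); // channel -> number of interested sockets
  }

  // Returns true when this is the first interested socket,
  // meaning the caller should now subscribe to the Redis channel.
  acquire(channel) {
    const next = (this.counts.get(channel) || 0) + 1;
    this.counts.set(channel, next);
    return next === 1;
  }

  // Returns true when the last interested socket has left,
  // meaning the caller should now unsubscribe from the Redis channel.
  release(channel) {
    const current = this.counts.get(channel) || 0;
    if (current === 0) return false; // nothing to release
    if (current === 1) {
      this.counts.delete(channel);
      return true;
    }
    this.counts.set(channel, current - 1);
    return false;
  }
}

const rooms = new ChannelRefCounter();
const decisions = [
  rooms.acquire('room:1'), // first socket joins  -> subscribe
  rooms.acquire('room:1'), // second socket joins -> already subscribed
  rooms.release('room:1'), // one socket leaves   -> keep subscription
  rooms.release('room:1'), // last socket leaves  -> unsubscribe
];

console.log(decisions);
```

A handler would call `subscriber.subscribe(...)` only when `acquire` returns true, and `subscriber.unsubscribe(...)` only when `release` returns true (including on socket disconnect). This sketch assumes each socket acquires a given room at most once; tracking rooms per socket would be needed to enforce that.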

Cross-Service Communication

Use Redis Pub/Sub to communicate between different services:

    // Each service below is a separate file and process.
    // Top-level await requires an ES module; in CommonJS, wrap the
    // connect and subscribe calls in an async function.

    // Service 1: User Service (users.js)
    const redis = require('redis');
    const publisher = redis.createClient({ url: 'redis://localhost:6379' });
    await publisher.connect();

    // When user registers
    async function registerUser(userData) {
      // Save to database (`db` stands for your own data-access layer)
      const user = await db.users.create(userData);

      // Publish event
      await publisher.publish('user:registered', JSON.stringify({
        userId: user.id,
        email: user.email,
        name: user.name,
        timestamp: new Date()
      }));

      return user;
    }

    // Service 2: Email Service (email.js)
    const redis = require('redis');
    const subscriber = redis.createClient({ url: 'redis://localhost:6379' });
    await subscriber.connect();

    // Listen for user registration events
    await subscriber.subscribe('user:registered', async (message) => {
      const user = JSON.parse(message);

      // Send welcome email (`sendWelcomeEmail` is your own mail helper)
      await sendWelcomeEmail(user.email, user.name);
      console.log(`Welcome email sent to ${user.email}`);
    });

    // Service 3: Analytics Service (analytics.js)
    const redis = require('redis');
    const analyticsSubscriber = redis.createClient({ url: 'redis://localhost:6379' });
    await analyticsSubscriber.connect();

    // Listen for all user events
    await analyticsSubscriber.pSubscribe('user:*', async (message, channel) => {
      const event = channel.split(':')[1]; // registered, login, logout, etc.
      const data = JSON.parse(message);

      // Track event in analytics (`trackEvent` is your own analytics helper)
      await trackEvent(event, data);
    });

Event-Driven Architecture with Pub/Sub

    const redis = require('redis');
    const { randomUUID } = require('crypto');

    // Event publisher utility
    class EventBus {
      constructor(redisUrl) {
        this.publisher = redis.createClient({ url: redisUrl });
      }

      async connect() {
        await this.publisher.connect();
      }

      async emit(event, data) {
        const message = JSON.stringify({
          event,
          data,
          timestamp: new Date(),
          id: randomUUID()
        });
        await this.publisher.publish(event, message);
      }
    }

    // Event subscriber utility
    class EventListener {
      constructor(redisUrl) {
        this.subscriber = redis.createClient({ url: redisUrl });
        this.handlers = new Map();
      }

      async connect() {
        await this.subscriber.connect();
      }

      async on(event, handler) {
        this.handlers.set(event, handler);
        await this.subscriber.subscribe(event, async (message) => {
          const { data } = JSON.parse(message);
          await handler(data);
        });
      }

      async onPattern(pattern, handler) {
        await this.subscriber.pSubscribe(pattern, async (message, channel) => {
          const { data } = JSON.parse(message);
          await handler(data, channel);
        });
      }
    }

    // Usage (inside an async function or ES module)
    const eventBus = new EventBus('redis://localhost:6379');
    await eventBus.connect();

    const listener = new EventListener('redis://localhost:6379');
    await listener.connect();

    // Register listeners first: events published before a listener
    // has subscribed are never delivered to it
    await listener.on('order:created', async (data) => {
      console.log('New order:', data);
      await processOrder(data.orderId); // `processOrder` is your own business logic
    });

    await listener.onPattern('order:*', async (data, channel) => {
      const action = channel.split(':')[1];
      console.log(`Order ${action}:`, data);
    });

    // Emit events
    await eventBus.emit('order:created', { orderId: 123, userId: 456 });
    await eventBus.emit('order:shipped', { orderId: 123, trackingNumber: 'ABC123' });

Scalability: This pattern allows you to add new services that react to events without modifying existing services. Just subscribe to the events you need.

Handling Message Delivery

Redis Pub/Sub is "fire and forget": messages are not persisted, so a message reaches only the subscribers that are connected at the moment it is published.

    // Messages are lost if:
    // 1. No subscribers are listening when published
    // 2. A subscriber disconnects temporarily
    // 3. Redis server restarts

    // Check if message was delivered
    const count = await publisher.publish('channel', 'message');
    if (count === 0) {
      console.log('No subscribers - message was not delivered');
      // Consider using Redis Streams or a message queue instead
    }

Limitation: Redis Pub/Sub does not guarantee message delivery. For critical messages that require guaranteed delivery, use Redis Streams or message queues like RabbitMQ.
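
The subscriber-count check can be wrapped in a small helper that hands undelivered messages to a fallback. The sketch below is one possible shape, not a delivery guarantee: `publishWithFallback` and `storeForLater` are hypothetical names, the publish function is injected so the example runs with a stub instead of a real Redis connection, and a count above zero only means a subscriber was connected, not that it processed the message.

```javascript
// Hypothetical helper. `publish` is any async function (channel, message) => count,
// for example (c, m) => publisher.publish(c, m) with a connected node-redis client.
async function publishWithFallback(publish, channel, message, onUndelivered) {
  const count = await publish(channel, message);
  if (count === 0) {
    // Nobody was subscribed: hand the message to a durable fallback
    // (a database table, a Redis Stream, a message queue, ...).
    await onUndelivered(channel, message);
  }
  return count;
}

// Demonstration with stubs instead of a real Redis connection.
const undelivered = [];
const fakePublish = async (channel) => (channel === 'busy' ? 2 : 0);
const storeForLater = async (channel, message) => {
  undelivered.push({ channel, message });
};

const demo = (async () => {
  const a = await publishWithFallback(fakePublish, 'busy', 'hi', storeForLater);
  const b = await publishWithFallback(fakePublish, 'empty', 'hi', storeForLater);
  return { a, b };
})();

demo.then(({ a, b }) => console.log(a, b, undelivered));
```

This covers only the first loss case listed above (no subscribers at publish time). A subscriber that disconnects right after the count is computed still misses the message, which is why a persistent log or queue is the safer choice for critical events.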

Monitoring Pub/Sub

    // Get active channels (channels with at least one direct subscriber)
    const channels = await publisher.pubSubChannels();
    console.log('Active channels:', channels);

    // Get number of subscribers for a channel
    // (the result is an object keyed by channel name)
    const numSubs = await publisher.pubSubNumSub('chat');
    console.log('Subscribers to chat:', numSubs.chat);

    // Get number of pattern subscriptions
    const numPat = await publisher.pubSubNumPat();
    console.log('Active pattern subscriptions:', numPat);

Exercise:
  1. Create a multi-service application with three services:
    • API service: Handles HTTP requests and publishes events
    • Notification service: Subscribes to events and sends notifications
    • Logger service: Subscribes to all events and logs them
  2. Implement at least 3 different event types (user:created, order:placed, payment:completed)
  3. Use pattern subscriptions in the logger service to capture all events
  4. Test that events are received by multiple subscribers simultaneously
  5. Monitor active channels and subscriber counts

Summary

Redis Pub/Sub is a powerful pattern for real-time communication:

  • Decouples publishers and subscribers
  • Supports pattern-based subscriptions
  • Well suited to lightweight cross-service communication
  • Pairs well with Socket.io to relay messages across server instances
  • Enables event-driven architecture
  • Fast and simple, but doesn't guarantee delivery