WebSockets & Real-Time Apps
Redis Pub/Sub for Real-Time Communication
Introduction to Redis Pub/Sub
Redis Pub/Sub (Publish/Subscribe) is a messaging pattern that enables communication between different parts of your application or between multiple services. It's essential for building scalable real-time systems.
What is Pub/Sub?
The Pub/Sub pattern has three main components:
- Publishers: Send messages to channels
- Subscribers: Listen to channels and receive messages
- Channels: Named communication pathways
Key Concept: Publishers and subscribers are decoupled. Publishers don't know who subscribes, and subscribers don't know who publishes.
Why Redis Pub/Sub?
Redis Pub/Sub is ideal for real-time applications because:
- Extremely fast (in-memory operations)
- Simple API and easy to implement
- Supports pattern-based subscriptions
- Perfect for cross-service communication
- Works seamlessly with Socket.io and other WebSocket libraries
Basic Redis Pub/Sub Example
const redis = require('redis');
// Create publisher client
const publisher = redis.createClient({
url: 'redis://localhost:6379'
});
// Create subscriber client
const subscriber = redis.createClient({
url: 'redis://localhost:6379'
});
// Connect both clients
await publisher.connect();
await subscriber.connect();
// Subscribe to a channel
await subscriber.subscribe('news', (message) => {
console.log('Received message:', message);
});
// Publish a message
await publisher.publish('news', 'Breaking news: Redis is awesome!');
// Output: Received message: Breaking news: Redis is awesome!
Important: You need separate Redis client instances for publishing and subscribing. A client in subscribe mode cannot execute other Redis commands.
Publishing Messages
Publishing is straightforward with Redis:
// Simple text message
await publisher.publish('channel-name', 'Hello World');
// JSON data
const data = { userId: 123, action: 'login', timestamp: Date.now() };
await publisher.publish('user-events', JSON.stringify(data));
// Publish returns the number of subscribers who received the message
const subscriberCount = await publisher.publish('notifications', 'New message');
console.log(`Message delivered to ${subscriberCount} subscribers`);
Subscribing to Channels
Subscribers listen to one or more channels:
// Subscribe to a single channel
await subscriber.subscribe('chat', (message) => {
console.log('Chat message:', message);
});
// Subscribe to multiple channels
await subscriber.subscribe('chat', (message, channel) => {
console.log(`Message from ${channel}:`, message);
});
await subscriber.subscribe('notifications', (message, channel) => {
console.log(`Notification from ${channel}:`, message);
});
// Unsubscribe from a channel
await subscriber.unsubscribe('chat');
// Unsubscribe from all channels
await subscriber.unsubscribe();
Pattern-Based Subscriptions
Subscribe to multiple channels matching a pattern:
// Subscribe to all channels starting with "user:"
await subscriber.pSubscribe('user:*', (message, channel) => {
console.log(`Message from ${channel}:`, message);
});
// Matches: user:login, user:logout, user:register, etc.
// Subscribe to all chat room channels
await subscriber.pSubscribe('chat:room:*', (message, channel) => {
const roomId = channel.split(':')[2];
console.log(`Message in room ${roomId}:`, message);
});
// Matches: chat:room:1, chat:room:2, chat:room:lobby, etc.
// Unsubscribe from patterns
await subscriber.pUnsubscribe('user:*');
Best Practice: Use descriptive channel naming conventions like "resource:action" or "service:event:id" to make pattern subscriptions powerful.
Integrating Redis Pub/Sub with Socket.io
Combine Redis Pub/Sub with Socket.io for scalable real-time apps:
const express = require('express');
const { Server } = require('socket.io');
const redis = require('redis');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// Redis clients
const publisher = redis.createClient({ url: 'redis://localhost:6379' });
const subscriber = redis.createClient({ url: 'redis://localhost:6379' });
await publisher.connect();
await subscriber.connect();
// Subscribe to Redis channels
await subscriber.subscribe('broadcast', (message) => {
const data = JSON.parse(message);
// Broadcast to all Socket.io clients
io.emit('message', data);
});
await subscriber.subscribe('notifications', (message) => {
const notification = JSON.parse(message);
// Send to specific user
io.to(`user-${notification.userId}`).emit('notification', notification);
});
// Socket.io connection
io.on('connection', (socket) => {
socket.on('send_message', (data) => {
// Publish to Redis instead of emitting directly
publisher.publish('broadcast', JSON.stringify({
userId: socket.userId,
message: data.message,
timestamp: new Date()
}));
});
socket.on('join_room', (roomId) => {
socket.join(`room-${roomId}`);
// Subscribe to room-specific Redis channel
subscriber.subscribe(`room:${roomId}`, (message) => {
io.to(`room-${roomId}`).emit('room_message', JSON.parse(message));
});
});
});
Cross-Service Communication
Use Redis Pub/Sub to communicate between different services:
// Service 1: User Service (users.js)
const redis = require('redis');
const publisher = redis.createClient({ url: 'redis://localhost:6379' });
await publisher.connect();
// When user registers
async function registerUser(userData) {
// Save to database
const user = await db.users.create(userData);
// Publish event
await publisher.publish('user:registered', JSON.stringify({
userId: user.id,
email: user.email,
name: user.name,
timestamp: new Date()
}));
return user;
}
// Service 2: Email Service (email.js)
const subscriber = redis.createClient({ url: 'redis://localhost:6379' });
await subscriber.connect();
// Listen for user registration events
await subscriber.subscribe('user:registered', async (message) => {
const user = JSON.parse(message);
// Send welcome email
await sendWelcomeEmail(user.email, user.name);
console.log(`Welcome email sent to ${user.email}`);
});
// Service 3: Analytics Service (analytics.js)
const analyticsSubscriber = redis.createClient({ url: 'redis://localhost:6379' });
await analyticsSubscriber.connect();
// Listen for all user events
await analyticsSubscriber.pSubscribe('user:*', async (message, channel) => {
const event = channel.split(':')[1]; // registered, login, logout, etc.
const data = JSON.parse(message);
// Track event in analytics
await trackEvent(event, data);
});
Event-Driven Architecture with Pub/Sub
// Event publisher utility
class EventBus {
constructor(redisUrl) {
this.publisher = redis.createClient({ url: redisUrl });
}
async connect() {
await this.publisher.connect();
}
async emit(event, data) {
const message = JSON.stringify({
event,
data,
timestamp: new Date(),
id: generateUniqueId()
});
await this.publisher.publish(event, message);
}
}
// Event subscriber utility
class EventListener {
constructor(redisUrl) {
this.subscriber = redis.createClient({ url: redisUrl });
this.handlers = new Map();
}
async connect() {
await this.subscriber.connect();
}
async on(event, handler) {
this.handlers.set(event, handler);
await this.subscriber.subscribe(event, async (message) => {
const { data } = JSON.parse(message);
await handler(data);
});
}
async onPattern(pattern, handler) {
await this.subscriber.pSubscribe(pattern, async (message, channel) => {
const { data } = JSON.parse(message);
await handler(data, channel);
});
}
}
// Usage
const eventBus = new EventBus('redis://localhost:6379');
await eventBus.connect();
const listener = new EventListener('redis://localhost:6379');
await listener.connect();
// Emit events
await eventBus.emit('order:created', { orderId: 123, userId: 456 });
await eventBus.emit('order:shipped', { orderId: 123, trackingNumber: 'ABC123' });
// Listen to events
await listener.on('order:created', async (data) => {
console.log('New order:', data);
await processOrder(data.orderId);
});
await listener.onPattern('order:*', async (data, channel) => {
const action = channel.split(':')[1];
console.log(`Order ${action}:`, data);
});
Scalability: This pattern allows you to add new services that react to events without modifying existing services. Just subscribe to the events you need.
Handling Message Delivery
Redis Pub/Sub is "fire and forget" - messages are not persisted:
// Messages are lost if:
// 1. No subscribers are listening when published
// 2. A subscriber disconnects temporarily
// 3. Redis server restarts
// Check if message was delivered
const count = await publisher.publish('channel', 'message');
if (count === 0) {
console.log('No subscribers - message was not delivered');
// Consider using Redis Streams or a message queue instead
}
Limitation: Redis Pub/Sub does not guarantee message delivery. For critical messages that require guaranteed delivery, use Redis Streams or message queues like RabbitMQ.
Monitoring Pub/Sub
// Get active channels
const channels = await publisher.pubSubChannels();
console.log('Active channels:', channels);
// Get number of subscribers for a channel
const numSubs = await publisher.pubSubNumSub('chat');
console.log('Subscribers to chat:', numSubs);
// Get number of pattern subscriptions
const numPat = await publisher.pubSubNumPat();
console.log('Active pattern subscriptions:', numPat);
Exercise:
- Create a multi-service application with three services:
- API service: Handles HTTP requests and publishes events
- Notification service: Subscribes to events and sends notifications
- Logger service: Subscribes to all events and logs them
- Implement at least 3 different event types (user.created, order.placed, payment.completed)
- Use pattern subscriptions in the logger service to capture all events
- Test that events are received by multiple subscribers simultaneously
- Monitor active channels and subscriber counts
Summary
Redis Pub/Sub is a powerful pattern for real-time communication:
- Decouples publishers and subscribers
- Supports pattern-based subscriptions
- Perfect for cross-service communication
- Integrates seamlessly with Socket.io
- Enables event-driven architecture
- Fast and simple, but doesn't guarantee delivery