WebSockets & Real-Time Apps

Message Queues for Real-Time Systems


Introduction to Message Queues

Message queues are essential for building robust, scalable real-time systems. Unlike Redis Pub/Sub, which is "fire and forget," a message queue stores each message until a consumer acknowledges it, and adds features such as retry logic, priority queuing, and dead-letter queues.

What is a Message Queue?

A message queue is a form of asynchronous communication where:

  • Producers: Create and send messages to the queue
  • Queue: Stores messages until they're consumed
  • Consumers: Process messages from the queue
  • Broker: Manages the queue infrastructure
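Key Difference: Pub/Sub broadcasts each message to every subscriber connected at that moment, and a subscriber that is offline misses it. A queue stores each message until one consumer takes it, so consumers work through the backlog at their own pace.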
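To make the roles above concrete, here is a minimal in-memory sketch of a queue with acknowledgments. It is an illustration only, not how a real broker is implemented; the class name InMemoryQueue and its methods are hypothetical and chosen for this example.

```javascript
// Minimal in-memory queue: illustrative only, not a real broker.
class InMemoryQueue {
  constructor() {
    this.ready = [];          // messages waiting for a consumer
    this.unacked = new Map(); // messages delivered but not yet acknowledged
    this.nextId = 1;
  }

  // Producer side: store the message until a consumer takes it.
  send(body) {
    const id = this.nextId++;
    this.ready.push({ id, body });
    return id;
  }

  // Consumer side: take the oldest ready message, or null if there is none.
  receive() {
    const msg = this.ready.shift();
    if (!msg) return null;
    this.unacked.set(msg.id, msg);
    return msg;
  }

  // Acknowledge: processing succeeded, so the message can be forgotten.
  ack(id) {
    return this.unacked.delete(id);
  }

  // Negative acknowledge: put the message back at the front of the queue.
  nack(id) {
    const msg = this.unacked.get(id);
    if (!msg) return false;
    this.unacked.delete(id);
    this.ready.unshift(msg);
    return true;
  }
}

const queue = new InMemoryQueue();
queue.send({ task: 'send_email', userId: 123 });

const first = queue.receive();  // consumer takes the message
queue.nack(first.id);           // processing failed, so requeue it
const second = queue.receive(); // the same message is delivered again
queue.ack(second.id);           // processing succeeded

console.log(second.body.task, queue.ready.length, queue.unacked.size);
// prints: send_email 0 0
```

The key design point is the unacked map: a message leaves the system only after an explicit ack, which is what lets a queue redeliver work when a consumer fails.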

Why Use Message Queues?

Message queues provide critical benefits:

  • Guaranteed Delivery: Messages persist until acknowledged
  • Load Balancing: Multiple consumers share the workload
  • Decoupling: Producers and consumers operate independently
  • Retry Logic: Failed messages are automatically retried
  • Rate Limiting: Control message processing speed
  • Priority Handling: Process important messages first

RabbitMQ Basics

RabbitMQ is a popular message broker that implements the AMQP 0-9-1 protocol:

    // Install dependencies:
    //   npm install amqplib
    const amqp = require('amqplib');

    const queueName = 'tasks';

    async function main() {
      // Connect to RabbitMQ
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();

      // Declare a queue
      await channel.assertQueue(queueName, {
        durable: true // Queue survives broker restart
      });

      // Send a message to the queue
      const message = { task: 'send_email', userId: 123 };
      channel.sendToQueue(
        queueName,
        Buffer.from(JSON.stringify(message)),
        { persistent: true } // Message survives broker restart
      );
      console.log('Message sent to queue');

      // Consume messages from the queue
      await channel.consume(queueName, async (msg) => {
        if (msg === null) return; // Consumer was cancelled by the broker

        try {
          const data = JSON.parse(msg.content.toString());
          console.log('Processing:', data);

          // Process the message (processTask is your own handler)
          await processTask(data);

          // Acknowledge successful processing
          channel.ack(msg);
        } catch (error) {
          console.error('Error processing message:', error);
          // Reject and requeue the message. A message that always fails
          // will be redelivered indefinitely, so cap retries in production.
          channel.nack(msg, false, true);
        }
      }, { noAck: false }); // Manual acknowledgment
    }

    main().catch(console.error);
Important: Always use manual acknowledgment (noAck: false) to ensure messages aren't lost if processing fails.

Bull Queue with Redis

Bull is a popular Node.js queue library built on Redis:

    // Install dependencies:
    //   npm install bull
    const Queue = require('bull');

    // Create a queue
    const emailQueue = new Queue('email', {
      redis: { host: 'localhost', port: 6379 }
    });

    // Add a job to the queue
    async function enqueueWelcomeEmail() {
      await emailQueue.add('welcome-email', {
        to: 'user@example.com',
        name: 'John Doe',
        template: 'welcome'
      }, {
        attempts: 3, // Up to 3 attempts in total
        backoff: {
          type: 'exponential',
          delay: 2000 // Start with a 2 second delay
        },
        priority: 1 // 1 is the highest priority; larger numbers run later
      });
    }

    // Process jobs from the queue (up to 5 at a time)
    emailQueue.process('welcome-email', 5, async (job) => {
      console.log(`Processing job ${job.id}:`, job.data);

      // Send the email (sendEmail is your own helper)
      await sendEmail(job.data.to, job.data.template, {
        name: job.data.name
      });

      // Update progress
      await job.progress(100);

      return { sent: true, timestamp: new Date() };
    });

    // Handle job completion
    emailQueue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed:`, result);
    });

    // Handle job failure
    emailQueue.on('failed', (job, error) => {
      console.error(`Job ${job.id} failed:`, error.message);
    });

    enqueueWelcomeEmail().catch(console.error);
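The job options above combine a fixed number of attempts with exponential backoff. As a rough sketch of what that schedule looks like, the hypothetical helper below doubles the wait before each retry. This is a generic illustration; the exact formula a given library uses may differ, so treat the numbers as approximate.

```javascript
// Delay before each retry, doubling every time (illustrative formula).
function retryDelays(attempts, baseDelayMs) {
  const delays = [];
  // The first attempt runs immediately, so there are attempts - 1 retries.
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

console.log(retryDelays(3, 2000)); // [ 2000, 4000 ]
console.log(retryDelays(5, 1000)); // [ 1000, 2000, 4000, 8000 ]
```

Spacing retries out this way gives a struggling downstream service (a mail provider, for example) time to recover instead of being hit again immediately.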

Event-Driven Architecture

Use queues to build an event-driven system:

    const Queue = require('bull');

    // Create specialized queues
    const userQueue = new Queue('user-events');
    const orderQueue = new Queue('order-events');
    const notificationQueue = new Queue('notifications');

    // User registration workflow
    async function registerUser(userData) {
      // Save user to database
      const user = await db.users.create(userData);

      // Trigger downstream tasks via queues
      await userQueue.add('user-registered', {
        userId: user.id,
        email: user.email,
        name: user.name
      });

      return user;
    }

    // Process user registration events
    userQueue.process('user-registered', async (job) => {
      const { userId, email, name } = job.data;

      // Send welcome email (separate queue)
      await notificationQueue.add('send-email', {
        type: 'welcome',
        to: email,
        data: { name }
      });

      // Create default settings
      await db.userSettings.create({
        userId,
        theme: 'light',
        notifications: true
      });

      // Track analytics
      await analytics.track('user_registered', { userId });

      return { processed: true };
    });

    // Process notifications (up to 10 at a time)
    notificationQueue.process('send-email', 10, async (job) => {
      const { type, to, data } = job.data;

      const template = await getEmailTemplate(type);
      await emailService.send(to, template, data);

      return { sent: true, emailType: type };
    });
Best Practice: Use separate queues for different concerns (users, orders, notifications). This allows independent scaling and failure isolation.

Decoupling Services with Queues

Queues enable true service independence:

    // Service 1: Order Service (orders.js)
    const Queue = require('bull');

    // One queue per downstream service. Bull hands each job to a single
    // worker, so services sharing one queue would compete for the same jobs.
    const inventoryQueue = new Queue('order-inventory');
    const orderEmailQueue = new Queue('order-emails');

    async function createOrder(orderData) {
      // Save order to database
      const order = await db.orders.create(orderData);

      // Emit the event to each interested service's queue
      const event = {
        orderId: order.id,
        userId: order.userId,
        total: order.total,
        items: order.items
      };
      await Promise.all([
        inventoryQueue.add('order-created', event),
        orderEmailQueue.add('order-created', event)
      ]);

      return order;
    }

    // Service 2: Inventory Service (inventory.js)
    const Queue = require('bull');
    const inventoryQueue = new Queue('order-inventory');

    // Subscribe to order events
    inventoryQueue.process('order-created', async (job) => {
      const { orderId, items } = job.data;

      // Reserve inventory
      for (const item of items) {
        await db.inventory.decrement(item.productId, item.quantity);
      }

      console.log(`Inventory reserved for order ${orderId}`);
    });

    // Service 3: Email Service (email.js)
    const Queue = require('bull');
    const orderEmailQueue = new Queue('order-emails');

    // Receives its own copy of the same events
    orderEmailQueue.process('order-created', async (job) => {
      const { userId } = job.data;
      const user = await db.users.findById(userId);

      // Send order confirmation
      await sendEmail(user.email, 'order-confirmation', job.data);

      console.log('Order confirmation sent');
    });
Scalability: Each service can scale independently. Add more workers to any service without affecting others.
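One point worth keeping in mind when several services react to the same event: a work queue normally delivers each job to one worker, so if every service must see every event, each service needs its own copy. The sketch below shows that fan-out idea with plain in-memory arrays; createFanout and the service names are hypothetical and stand in for real queues.

```javascript
// Hypothetical in-memory fan-out: one queue (array) per subscribing service.
function createFanout(serviceNames) {
  const queues = new Map(serviceNames.map((name) => [name, []]));

  return {
    // Copy the event into every service's queue.
    publish(event) {
      for (const jobs of queues.values()) {
        jobs.push({ ...event });
      }
    },

    // Each service takes jobs only from its own queue.
    take(serviceName) {
      const jobs = queues.get(serviceName);
      if (!jobs || jobs.length === 0) return null;
      return jobs.shift();
    }
  };
}

const orderEvents = createFanout(['inventory', 'email']);
orderEvents.publish({ orderId: 42, total: 99.99 });

console.log(orderEvents.take('inventory')); // { orderId: 42, total: 99.99 }
console.log(orderEvents.take('email'));     // { orderId: 42, total: 99.99 }
console.log(orderEvents.take('inventory')); // null
```

Because each service drains its own queue, a slow or failing email service does not hold up inventory reservation.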

Guaranteed Message Delivery

Ensure no messages are lost:

    const Queue = require('bull');

    const criticalQueue = new Queue('critical-tasks', {
      redis: { host: 'localhost', port: 6379 },
      defaultJobOptions: {
        attempts: 5, // Up to 5 attempts in total
        backoff: {
          type: 'exponential',
          delay: 1000
        },
        removeOnComplete: false, // Keep completed jobs
        removeOnFail: false // Keep failed jobs for inspection
      }
    });

    // Dead-letter queue for jobs that exhaust their attempts
    const deadLetterQueue = new Queue('dead-letter');

    // Add critical job
    async function enqueuePayment(orderId, amount) {
      await criticalQueue.add('process-payment', {
        orderId,
        amount,
        paymentMethod: 'credit_card'
      }, {
        jobId: `payment-${orderId}`, // Unique ID prevents duplicates
        timeout: 30000 // Job must complete in 30 seconds
      });
    }

    // Process with robust error handling
    criticalQueue.process('process-payment', async (job) => {
      const { orderId, amount, paymentMethod } = job.data;

      try {
        // Process payment
        const result = await paymentGateway.charge({
          amount,
          method: paymentMethod
        });

        // Update order
        await db.orders.update(orderId, {
          status: 'paid',
          transactionId: result.transactionId
        });

        return result;
      } catch (error) {
        // Log error for monitoring
        console.error(`Payment failed for order ${orderId}:`, error);

        // attemptsMade counts earlier attempts, so add 1 for the current one
        if (job.attemptsMade + 1 >= job.opts.attempts) {
          await alertService.sendCritical({
            message: `Payment failed after ${job.attemptsMade + 1} attempts`,
            orderId
          });
        }

        throw error; // Rethrow so the failure is recorded and the job retried
      }
    });

    // Monitor failed jobs
    criticalQueue.on('failed', async (job, error) => {
      // 'failed' fires after each failed attempt; act only on the last one
      if (job.attemptsMade < job.opts.attempts) return;

      console.error(`Job ${job.id} failed permanently:`, error);

      // Move to dead-letter queue for manual inspection
      await deadLetterQueue.add('failed-job', {
        originalJob: job.data,
        error: error.message,
        attempts: job.attemptsMade
      });
    });

    enqueuePayment(123, 99.99).catch(console.error);
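The retry-then-dead-letter flow can be easier to follow without a broker in the picture. The sketch below is a simplified, synchronous model of it: processWithRetry, the handlers, and the deadLetters array are all hypothetical names for this example, and a real queue would add delays between attempts.

```javascript
// Run a handler up to maxAttempts times; if every attempt fails,
// park the job in a dead-letter list for manual inspection.
function processWithRetry(job, handler, maxAttempts, deadLetters) {
  let lastError = null;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = handler(job, attempt);
      return { ok: true, attempts: attempt, result };
    } catch (error) {
      lastError = error; // remember why this attempt failed, then try again
    }
  }

  deadLetters.push({
    originalJob: job,
    error: lastError.message,
    attempts: maxAttempts
  });
  return { ok: false, attempts: maxAttempts };
}

const deadLetters = [];

// Hypothetical handler that fails on its first two attempts.
const flaky = (job, attempt) => {
  if (attempt < 3) throw new Error('gateway timeout');
  return `charged order ${job.orderId}`;
};

// Hypothetical handler that never succeeds.
const alwaysFails = () => {
  throw new Error('card declined');
};

console.log(processWithRetry({ orderId: 123 }, flaky, 5, deadLetters).attempts); // 3
console.log(processWithRetry({ orderId: 456 }, alwaysFails, 5, deadLetters).ok); // false
console.log(deadLetters.length); // 1
```

Note that retrying is only safe when the handler is idempotent. For payments in particular, a retry after a timeout could charge twice unless the gateway call carries a deduplication key.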

Priority Queues

Process high-priority messages first:

    const Queue = require('bull');
    const taskQueue = new Queue('tasks');

    // Add jobs with different priorities.
    // In Bull, 1 is the highest priority and larger numbers run later.
    async function addTasks() {
      await taskQueue.add('task', { type: 'normal' }, { priority: 10 });
      await taskQueue.add('task', { type: 'important' }, { priority: 5 });
      await taskQueue.add('task', { type: 'urgent' }, { priority: 1 });
    }

    // Jobs are processed in priority order: urgent → important → normal
    taskQueue.process('task', async (job) => {
      console.log(`Processing ${job.data.type} task with priority ${job.opts.priority}`);
      await performTask(job.data);
    });

    addTasks().catch(console.error);
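A common point of confusion is which direction the priority number runs. Bull documents its priority option as ranging from 1 (highest) upward, so smaller numbers are processed first. The sketch below models that ordering in plain JavaScript; orderByPriority is a hypothetical helper, not part of any library, and it keeps insertion order for jobs with equal priority.

```javascript
// Order jobs so that the smallest priority number comes first.
// Jobs with the same priority keep the order in which they were added.
function orderByPriority(jobs) {
  return jobs
    .map((job, index) => ({ job, index }))
    .sort((a, b) => a.job.priority - b.job.priority || a.index - b.index)
    .map(({ job }) => job);
}

const pending = [
  { type: 'normal', priority: 10 },
  { type: 'important', priority: 5 },
  { type: 'urgent', priority: 1 },
  { type: 'also-urgent', priority: 1 }
];

console.log(orderByPriority(pending).map((job) => job.type));
// [ 'urgent', 'also-urgent', 'important', 'normal' ]
```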

Delayed Jobs

Schedule jobs to run in the future:

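    const Queue = require('bull');
    const reminderQueue = new Queue('reminders');
    const reportQueue = new Queue('reports');

    async function scheduleJobs() {
      // Send reminder in 24 hours
      await reminderQueue.add('send-reminder', {
        userId: 123,
        message: 'Don\'t forget to complete your profile!'
      }, {
        delay: 24 * 60 * 60 * 1000 // 24 hours in milliseconds
      });

      // Schedule recurring job
      await reportQueue.add('daily-report', {
        reportType: 'sales'
      }, {
        repeat: {
          cron: '0 9 * * *' // Every day at 9 AM
        }
      });
    }

    scheduleJobs().catch(console.error);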
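The delay option is a duration in milliseconds, so scheduling a job for a specific moment means converting that moment into a delay first. A small sketch of that conversion follows; delayUntil is a hypothetical helper, and it clamps past dates to zero so the job would run immediately.

```javascript
// Milliseconds from `now` until `targetDate`, never negative.
function delayUntil(targetDate, now = new Date()) {
  return Math.max(0, targetDate.getTime() - now.getTime());
}

const now = new Date('2024-01-01T09:00:00Z');
const tomorrow = new Date('2024-01-02T09:00:00Z');

console.log(delayUntil(tomorrow, now)); // 86400000 (24 hours)
console.log(delayUntil(now, tomorrow)); // 0 (target is in the past)
```

The result can then be passed as the delay value when adding a job.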

Monitoring Queue Health

    // queue is a Bull Queue instance
    async function reportQueueHealth(queue) {
      // Get queue statistics
      const waiting = await queue.getWaitingCount();
      const active = await queue.getActiveCount();
      const completed = await queue.getCompletedCount();
      const failed = await queue.getFailedCount();

      console.log('Queue Stats:', { waiting, active, completed, failed });

      // Get all failed jobs
      const failedJobs = await queue.getFailed();
      failedJobs.forEach(job => {
        console.log(`Failed job ${job.id}:`, job.failedReason);
      });

      // Clean old jobs
      await queue.clean(5000, 'completed'); // Remove completed jobs older than 5 seconds
      await queue.clean(10000, 'failed'); // Remove failed jobs older than 10 seconds
    }
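Raw counts are more useful once they are turned into a health verdict. The sketch below shows one possible way to do that from the four counters above. The function name assessQueueHealth and the threshold values are assumptions made for this example; suitable limits depend entirely on your workload.

```javascript
// Turn queue counters into a simple health summary.
// The default limits are placeholder values, not recommendations.
function assessQueueHealth(
  { waiting, active, completed, failed },
  limits = { maxWaiting: 1000, maxFailureRate: 0.05 }
) {
  const finished = completed + failed;
  const failureRate = finished === 0 ? 0 : failed / finished;

  const problems = [];
  if (waiting > limits.maxWaiting) problems.push('backlog');
  if (failureRate > limits.maxFailureRate) problems.push('failures');

  return { healthy: problems.length === 0, active, failureRate, problems };
}

console.log(assessQueueHealth({ waiting: 10, active: 2, completed: 98, failed: 2 }).healthy);
// true
console.log(assessQueueHealth({ waiting: 5000, active: 0, completed: 50, failed: 50 }).problems);
// [ 'backlog', 'failures' ]
```

A monitoring endpoint could return this object as JSON so that dashboards or alerting rules can act on it.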
Exercise:
  1. Set up a Bull queue with Redis locally
  2. Create an order processing system with three queues:
    • Order queue: Creates orders
    • Payment queue: Processes payments (with retry logic)
    • Fulfillment queue: Handles shipping
  3. Implement proper error handling and retry logic
  4. Add priority handling for expedited orders
  5. Create a monitoring endpoint that shows queue statistics
  6. Simulate failures and verify messages are retried

Summary

Message queues are critical for reliable real-time systems:

  • Guarantee message delivery with acknowledgments
  • Enable service decoupling and independence
  • Provide automatic retry logic
  • Support priority and delayed processing
  • Allow independent scaling of producers and consumers
  • Form the foundation of event-driven architectures