WebSockets & Real-Time Apps
Message Queues for Real-Time Systems
Introduction to Message Queues
Message queues are essential for building robust, scalable real-time systems. Unlike Redis Pub/Sub, which is "fire and forget," message queues guarantee message delivery and provide advanced features such as retry logic, priority queuing, and dead-letter queues.
What is a Message Queue?
A message queue is a form of asynchronous communication where:
- Producers: Create and send messages to the queue
- Queue: Stores messages until they're consumed
- Consumers: Process messages from the queue
- Broker: Manages the queue infrastructure
Key Difference: Pub/Sub is push-based (messages are pushed to all subscribers). Queues are pull-based (consumers pull messages when ready).
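To make the pull model concrete, here is a minimal in-memory sketch (illustration only; a real broker adds persistence, acknowledgments, and network distribution):

```javascript
// Minimal in-memory queue: producers push, consumers pull when ready.
// Illustration only -- a real broker adds persistence, acks, and distribution.
class SimpleQueue {
  constructor() {
    this.messages = [];
  }
  // Producer: create and send a message to the queue
  send(msg) {
    this.messages.push(msg);
  }
  // Consumer: pull the next message when ready (FIFO)
  receive() {
    return this.messages.shift();
  }
}

const queue = new SimpleQueue();
queue.send({ task: 'send_email', userId: 1 });
queue.send({ task: 'send_email', userId: 2 });

// Unlike pub/sub, each message is delivered to exactly one consumer
console.log(queue.receive()); // { task: 'send_email', userId: 1 }
console.log(queue.receive()); // { task: 'send_email', userId: 2 }
```

Note how a message, once received, is gone from the queue: that single-consumer delivery is what makes queues suitable for load balancing work across workers.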
Why Use Message Queues?
Message queues provide critical benefits:
- Guaranteed Delivery: Messages persist until acknowledged
- Load Balancing: Multiple consumers share the workload
- Decoupling: Producers and consumers operate independently
- Retry Logic: Failed messages are automatically retried
- Rate Limiting: Control message processing speed
- Priority Handling: Process important messages first
RabbitMQ Basics
RabbitMQ is a popular message broker that implements AMQP protocol:
// Install dependencies
// npm install amqplib
const amqp = require('amqplib');

// Connect to RabbitMQ
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

// Declare a queue
const queueName = 'tasks';
await channel.assertQueue(queueName, {
  durable: true // Queue survives broker restart
});

// Send a message to the queue
const message = { task: 'send_email', userId: 123 };
channel.sendToQueue(
  queueName,
  Buffer.from(JSON.stringify(message)),
  { persistent: true } // Message survives broker restart
);
console.log('Message sent to queue');

// Consume messages from the queue
channel.consume(queueName, async (msg) => {
  if (msg === null) return; // Consumer was cancelled by the broker
  const data = JSON.parse(msg.content.toString());
  console.log('Processing:', data);
  try {
    // Process the message
    await processTask(data);
    // Acknowledge successful processing
    channel.ack(msg);
  } catch (error) {
    console.error('Error processing message:', error);
    // Reject and requeue the message
    channel.nack(msg, false, true);
  }
}, { noAck: false }); // Manual acknowledgment
Important: Always use manual acknowledgment (noAck: false) to ensure messages aren't lost if processing fails.
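Alongside manual acknowledgment, it is worth bounding how many unacknowledged messages one consumer holds at a time. With amqplib this is done via channel.prefetch; a sketch of the relevant consumer lines, assuming the connection, queueName, and processTask from the example above:

```javascript
// Limit this consumer to 10 unacknowledged messages at a time.
// Without a prefetch limit, the broker may flood one consumer
// while other workers sit idle.
await channel.prefetch(10);

channel.consume(queueName, async (msg) => {
  if (msg === null) return; // Consumer was cancelled by the broker
  await processTask(JSON.parse(msg.content.toString()));
  channel.ack(msg); // Frees a prefetch slot for the next message
}, { noAck: false });
```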
Bull Queue with Redis
Bull is a popular Node.js queue library built on Redis:
// Install dependencies
// npm install bull
const Queue = require('bull');

// Create a queue
const emailQueue = new Queue('email', {
  redis: {
    host: 'localhost',
    port: 6379
  }
});

// Add a job to the queue
await emailQueue.add('welcome-email', {
  to: 'user@example.com',
  name: 'John Doe',
  template: 'welcome'
}, {
  attempts: 3, // Retry up to 3 times
  backoff: {
    type: 'exponential',
    delay: 2000 // Start with 2 second delay
  },
  priority: 1 // In Bull, 1 is the highest priority; larger numbers are lower
});

// Process jobs from the queue (up to 5 concurrently)
emailQueue.process('welcome-email', 5, async (job) => {
  console.log(`Processing job ${job.id}:`, job.data);
  // Simulate email sending
  await sendEmail(job.data.to, job.data.template, {
    name: job.data.name
  });
  // Update progress
  job.progress(100);
  return { sent: true, timestamp: new Date() };
});

// Handle job completion
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

// Handle job failure
emailQueue.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed:`, error.message);
});
Event-Driven Architecture
Use queues to build an event-driven system:
const Queue = require('bull');

// Create specialized queues
const userQueue = new Queue('user-events');
const orderQueue = new Queue('order-events');
const notificationQueue = new Queue('notifications');

// User registration workflow
async function registerUser(userData) {
  // Save user to database
  const user = await db.users.create(userData);
  // Trigger downstream tasks via queues
  await userQueue.add('user-registered', {
    userId: user.id,
    email: user.email,
    name: user.name
  });
  return user;
}

// Process user registration events
userQueue.process('user-registered', async (job) => {
  const { userId, email, name } = job.data;
  // Send welcome email (separate queue)
  await notificationQueue.add('send-email', {
    type: 'welcome',
    to: email,
    data: { name }
  });
  // Create default settings
  await db.userSettings.create({
    userId,
    theme: 'light',
    notifications: true
  });
  // Track analytics
  await analytics.track('user_registered', { userId });
  return { processed: true };
});

// Process notifications (up to 10 concurrently)
notificationQueue.process('send-email', 10, async (job) => {
  const { type, to, data } = job.data;
  const template = await getEmailTemplate(type);
  await emailService.send(to, template, data);
  return { sent: true, emailType: type };
});
Best Practice: Use separate queues for different concerns (users, orders, notifications). This allows independent scaling and failure isolation.
Decoupling Services with Queues
Queues enable true service independence:
// Note: a Bull queue delivers each job to exactly one worker. To fan one
// event out to several services, the producer adds the job to one queue
// per consuming service.

// Service 1: Order Service (orders.js)
const Queue = require('bull');
const inventoryQueue = new Queue('inventory-events');
const emailQueue = new Queue('email-events');

async function createOrder(orderData) {
  // Save order to database
  const order = await db.orders.create(orderData);
  // Emit event - doesn't care how each consumer processes it
  const event = {
    orderId: order.id,
    userId: order.userId,
    total: order.total,
    items: order.items
  };
  await inventoryQueue.add('order-created', event);
  await emailQueue.add('order-created', event);
  return order;
}

// Service 2: Inventory Service (inventory.js)
const inventoryQueue = new Queue('inventory-events');

// Subscribe to order events
inventoryQueue.process('order-created', async (job) => {
  const { orderId, items } = job.data;
  // Reserve inventory
  for (const item of items) {
    await db.inventory.decrement(item.productId, item.quantity);
  }
  console.log(`Inventory reserved for order ${orderId}`);
});

// Service 3: Email Service (email.js)
const emailQueue = new Queue('email-events');

// Receives its own copy of the same event
emailQueue.process('order-created', async (job) => {
  const { userId } = job.data;
  const user = await db.users.findById(userId);
  // Send order confirmation
  await sendEmail(user.email, 'order-confirmation', job.data);
  console.log('Order confirmation sent');
});
Scalability: Each service can scale independently. Add more workers to any service without affecting others.
Guaranteed Message Delivery
Ensure no messages are lost:
const Queue = require('bull');

const criticalQueue = new Queue('critical-tasks', {
  redis: { host: 'localhost', port: 6379 },
  defaultJobOptions: {
    attempts: 5, // Retry 5 times
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    removeOnComplete: false, // Keep completed jobs
    removeOnFail: false // Keep failed jobs for inspection
  }
});

// Add critical job
const orderId = 123;
await criticalQueue.add('process-payment', {
  orderId,
  amount: 99.99,
  paymentMethod: 'credit_card'
}, {
  jobId: `payment-${orderId}`, // Unique ID prevents duplicates
  timeout: 30000 // Job must complete in 30 seconds
});

// Process with robust error handling
criticalQueue.process('process-payment', async (job) => {
  const { orderId, amount, paymentMethod } = job.data;
  try {
    // Process payment
    const result = await paymentGateway.charge({
      amount,
      method: paymentMethod
    });
    // Update order
    await db.orders.update(orderId, {
      status: 'paid',
      transactionId: result.transactionId
    });
    return result;
  } catch (error) {
    // Log error for monitoring
    console.error(`Payment failed for order ${orderId}:`, error);
    // If final attempt, send alert
    if (job.attemptsMade >= job.opts.attempts) {
      await alertService.sendCritical({
        message: `Payment failed after ${job.attemptsMade} attempts`,
        orderId
      });
    }
    throw error; // Trigger retry
  }
});

// Monitor failed jobs (fires once a job has exhausted its attempts)
const deadLetterQueue = new Queue('dead-letter');
criticalQueue.on('failed', async (job, error) => {
  console.error(`Job ${job.id} failed permanently:`, error);
  // Record in the dead-letter queue for manual inspection
  await deadLetterQueue.add('failed-job', {
    originalJob: job.data,
    error: error.message,
    attempts: job.attemptsMade
  });
});
Priority Queues
Process high-priority messages first:
const taskQueue = new Queue('tasks');

// Add jobs with different priorities
// (in Bull, a lower number means a higher priority)
await taskQueue.add('task', { type: 'normal' }, { priority: 10 });
await taskQueue.add('task', { type: 'important' }, { priority: 5 });
await taskQueue.add('task', { type: 'urgent' }, { priority: 1 });

// Jobs are processed in priority order: urgent → important → normal
taskQueue.process('task', async (job) => {
  console.log(`Processing ${job.data.type} task with priority ${job.opts.priority}`);
  await performTask(job.data);
});
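The ordering Bull applies can be sketched in plain JavaScript: jobs sort by ascending priority number, and the lowest number runs first:

```javascript
// Bull runs the job with the lowest priority number first.
const jobs = [
  { type: 'normal', priority: 10 },
  { type: 'important', priority: 5 },
  { type: 'urgent', priority: 1 }
];

const executionOrder = jobs
  .slice() // copy so the original list is untouched
  .sort((a, b) => a.priority - b.priority)
  .map((job) => job.type);

console.log(executionOrder); // [ 'urgent', 'important', 'normal' ]
```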
Delayed Jobs
Schedule jobs to run in the future:
// Send reminder in 24 hours
await reminderQueue.add('send-reminder', {
  userId: 123,
  message: 'Don\'t forget to complete your profile!'
}, {
  delay: 24 * 60 * 60 * 1000 // 24 hours in milliseconds
});

// Schedule recurring job
await reportQueue.add('daily-report', {
  reportType: 'sales'
}, {
  repeat: {
    cron: '0 9 * * *' // Every day at 9 AM
  }
});
Monitoring Queue Health
// Get queue statistics
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();

console.log('Queue Stats:', {
  waiting,
  active,
  completed,
  failed
});

// Get all failed jobs
const failedJobs = await queue.getFailed();
failedJobs.forEach(job => {
  console.log(`Failed job ${job.id}:`, job.failedReason);
});

// Clean old jobs
await queue.clean(5000, 'completed'); // Remove completed jobs older than 5 seconds
await queue.clean(10000, 'failed'); // Remove failed jobs older than 10 seconds
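These counts can feed a simple health check. A sketch with illustrative thresholds (the numbers are hypothetical; tune them for your workload):

```javascript
// Classify queue health from the counts gathered above.
// Thresholds here are illustrative, not recommendations.
function queueHealth({ waiting, active, failed }) {
  if (failed > 100) return 'critical';               // too many failures
  if (active === 0 && waiting > 0) return 'stalled'; // work queued, nothing running
  if (waiting > 1000) return 'backlogged';           // consumers falling behind
  return 'ok';
}

console.log(queueHealth({ waiting: 10, active: 2, failed: 0 }));  // 'ok'
console.log(queueHealth({ waiting: 500, active: 0, failed: 3 })); // 'stalled'
```

A function like this is a natural building block for the monitoring endpoint in the exercise below, or for wiring alerts into your existing metrics system.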
Exercise:
- Set up Bull queue with Redis locally
- Create an order processing system with three queues:
  - Order queue: Creates orders
  - Payment queue: Processes payments (with retry logic)
  - Fulfillment queue: Handles shipping
- Implement proper error handling and retry logic
- Add priority handling for expedited orders
- Create a monitoring endpoint that shows queue statistics
- Simulate failures and verify messages are retried
Summary
Message queues are critical for reliable real-time systems:
- Guarantee message delivery with acknowledgments
- Enable service decoupling and independence
- Provide automatic retry logic
- Support priority and delayed processing
- Allow independent scaling of producers and consumers
- Essential for event-driven architectures