Node.js & Express

Task Queues and Background Jobs

45 min Lesson 23 of 40


Background jobs are essential for handling time-consuming tasks without blocking your main application. Instead of making users wait for operations like sending emails, processing images, generating reports, or calling slow APIs, you queue these tasks and process them asynchronously. This improves user experience, application responsiveness, and scalability.

Why Use Task Queues?

The Problem: Imagine a user uploads a video that needs to be transcoded. If you process it synchronously, the user waits 10+ minutes for a response. With a queue, you return immediately with "Processing started" and handle transcoding in the background.

Use Cases for Background Jobs:

  • Sending emails (welcome emails, newsletters, notifications)
  • Image/video processing (resizing, compression, transcoding)
  • Report generation (PDFs, spreadsheets, analytics)
  • Data synchronization between systems
  • API integrations with third-party services
  • Database maintenance (cleanup, archiving, indexing)
  • Scheduled tasks (daily summaries, periodic backups)

Introducing Bull and BullMQ

Bull and BullMQ are powerful job queue libraries for Node.js backed by Redis. BullMQ is the successor to Bull with better TypeScript support, more features, and improved performance.

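# Install Bull (Redis-backed queues)
npm install bull

# Or install BullMQ (recommended for new projects)
npm install bullmq

# Both libraries need a running Redis server to connect to.
# The ioredis client is pulled in as a dependency of each; install it
# explicitly only if you want to create Redis connections yourself.
npm install ioredis
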
Bull vs BullMQ: Bull is mature and stable (good for existing projects). BullMQ is the modern rewrite with better TypeScript support, flows, and performance. For new projects, use BullMQ.

Creating Your First Queue (Bull)

Let's create a simple email queue:

// queues/emailQueue.js
const Queue = require('bull');
// sendEmail is assumed to be your own mail helper; adjust the path to your project
const { sendEmail } = require('../services/email');

// Create a queue
const emailQueue = new Queue('email', {
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    port: Number(process.env.REDIS_PORT) || 6379, // env vars are strings
    password: process.env.REDIS_PASSWORD
  }
});

// Define job processor
emailQueue.process(async (job) => {
  const { to, subject, body } = job.data;
  console.log(`Processing email job ${job.id} to ${to}`);

  // Send the email
  await sendEmail(to, subject, body);

  console.log(`Email job ${job.id} completed`);
  return { sent: true, to };
});

// Event listeners
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

emailQueue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled`);
});

module.exports = emailQueue;

Adding jobs to the queue:

// In your Express route or controller
const emailQueue = require('./queues/emailQueue');

app.post('/register', async (req, res) => {
  const { email, name } = req.body;

  // Create user in database
  const user = await User.create({ email, name });

  // Add job to queue (non-blocking)
  await emailQueue.add({
    to: email,
    subject: 'Welcome to Our App!',
    body: `Hi ${name}, thanks for signing up!`
  });

  // Return immediately
  res.json({
    success: true,
    message: 'Registration successful! Check your email.'
  });
});

Job Options and Priorities

Configure job behavior with options:

// Job with options (userId is assumed to be in scope)
await emailQueue.add(
  { to: 'user@example.com', subject: 'Test', body: 'Hello' },
  {
    // Job priority (lower number = higher priority)
    priority: 1,

    // Delay job execution (in milliseconds)
    delay: 5000, // Start after 5 seconds

    // Job attempts (total tries, including the first one)
    attempts: 3,

    // Backoff strategy for retries
    backoff: {
      type: 'exponential',
      delay: 2000 // Base delay of 2s; the wait grows with each retry
    },

    // Remove job when completed
    removeOnComplete: true,

    // Keep failed jobs so they can be inspected
    removeOnFail: false,

    // Job timeout
    timeout: 30000, // 30 seconds

    // Job ID (for deduplication)
    jobId: `email-${userId}-welcome`
  }
);

Job priorities example:

// High priority - password reset (send immediately)
await emailQueue.add(
  { to: user.email, subject: 'Password Reset', body: resetLink },
  { priority: 1 }
);

// Medium priority - order confirmation
await emailQueue.add(
  { to: user.email, subject: 'Order Confirmation', body: orderDetails },
  { priority: 5 }
);

// Low priority - newsletter (can wait)
await emailQueue.add(
  { to: user.email, subject: 'Weekly Newsletter', body: newsletter },
  { priority: 10 }
);
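
To make the ordering rule concrete, here is a minimal in-memory sketch. It is not how Bull works internally (Bull keeps the ordering in Redis). It only assumes what the examples above state: a lower priority number runs first. The sketch additionally assumes that jobs with equal priority keep their insertion order. `sortByPriority` and the sample jobs are hypothetical names used for illustration.

```javascript
// In-memory illustration only: the real queue keeps this ordering in Redis.
function sortByPriority(jobs) {
  // Copy before sorting so the caller's array is left untouched.
  // Array.prototype.sort is stable, so equal priorities keep insertion order.
  return [...jobs].sort((a, b) => a.priority - b.priority);
}

const pending = [
  { name: 'newsletter', priority: 10 },
  { name: 'password-reset', priority: 1 },
  { name: 'order-confirmation', priority: 5 }
];

const order = sortByPriority(pending).map((job) => job.name);
console.log(order); // [ 'password-reset', 'order-confirmation', 'newsletter' ]
```

The newsletter was queued first but runs last, which is the behavior you want when urgent mail shares a queue with bulk mail.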

BullMQ - Modern Alternative

BullMQ offers better TypeScript support and more features:

// queues/emailQueue.js (BullMQ version)
const { Queue, Worker } = require('bullmq');
// sendEmail is assumed to be your own mail helper; adjust the path to your project
const { sendEmail } = require('../services/email');

const connection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379, // env vars are strings
  password: process.env.REDIS_PASSWORD
};

// Create queue
const emailQueue = new Queue('email', { connection });

// Create worker (processes jobs)
const emailWorker = new Worker(
  'email',
  async (job) => {
    const { to, subject, body } = job.data;
    console.log(`Processing email job ${job.id} to ${to}`);

    await sendEmail(to, subject, body);

    return { sent: true, to, sentAt: Date.now() };
  },
  { connection }
);

// Event listeners
emailWorker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

emailWorker.on('failed', (job, err) => {
  // job can be undefined if it could not be loaded
  console.error(`Job ${job?.id} failed:`, err.message);
});

module.exports = { emailQueue, emailWorker };

Adding jobs with BullMQ:

const { emailQueue } = require('./queues/emailQueue');

// Add job
await emailQueue.add('welcome-email', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for joining'
});

// Add with options
await emailQueue.add(
  'password-reset',
  { to: 'user@example.com', token: resetToken },
  {
    priority: 1,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 1000 // Keep last 1000 failed jobs
  }
);

Scheduled and Recurring Jobs

Schedule jobs to run at specific times or intervals:

// These examples use the Bull API. In BullMQ, add() takes a job name as its
// first argument, and recent versions name the cron option `pattern`.

// Run job once at specific time
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(9, 0, 0, 0); // 9 AM tomorrow

await emailQueue.add(
  { to: 'user@example.com', subject: 'Reminder', body: 'Your appointment is tomorrow' },
  { delay: tomorrow.getTime() - Date.now() }
);

// Repeatable jobs (cron-style)
await emailQueue.add(
  { subject: 'Daily Report', body: 'Here is your daily report' },
  {
    repeat: {
      cron: '0 9 * * *', // Every day at 9 AM
      tz: 'America/New_York'
    }
  }
);

// Weekly report
await emailQueue.add(
  { subject: 'Weekly Summary' },
  {
    repeat: {
      cron: '0 9 * * 1', // Every Monday at 9 AM
      limit: 52 // Stop after 52 runs (1 year)
    }
  }
);

// Every 5 minutes
await emailQueue.add(
  { task: 'health-check' },
  {
    repeat: {
      every: 5 * 60 * 1000 // 5 minutes in milliseconds
    }
  }
);

Cron Patterns:
  • 0 9 * * * - Every day at 9 AM
  • */5 * * * * - Every 5 minutes
  • 0 0 * * 0 - Every Sunday at midnight
  • 0 0 1 * * - First day of every month
  • 0 12 * * 1-5 - Weekdays at noon
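
The patterns above all have five space-separated fields: minute, hour, day of month, month, and day of week. The following sketch simply labels those fields so a pattern is easier to read. It assumes the standard five-field form and rejects anything else (some cron parsers also accept an optional seconds field, which this sketch does not handle). `splitCron` is a hypothetical helper, not part of Bull or BullMQ, and it does not validate the field values.

```javascript
// Field names in the order they appear in a five-field cron pattern.
const CRON_FIELDS = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

function splitCron(pattern) {
  const parts = pattern.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected 5 fields, got ${parts.length}`);
  }
  // Pair each field name with the raw text found in that position.
  const fields = {};
  CRON_FIELDS.forEach((name, index) => {
    fields[name] = parts[index];
  });
  return fields;
}

const weekdaysAtNoon = splitCron('0 12 * * 1-5');
console.log(weekdaysAtNoon);
// { minute: '0', hour: '12', dayOfMonth: '*', month: '*', dayOfWeek: '1-5' }
```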

Job Retries and Error Handling

Handle failures gracefully with retry strategies:

// Define processor with error handling
// (notifyAdmin is assumed to be your own alerting helper)
emailQueue.process(async (job) => {
  const { to, subject, body } = job.data;

  try {
    // Attempt to send email
    await sendEmail(to, subject, body);
    return { success: true };
  } catch (error) {
    // Log error details
    console.error(`Failed to send email to ${to}:`, error.message);

    // attemptsMade counts earlier failed attempts, so add 1 for the current one
    const attemptNumber = job.attemptsMade + 1;
    const maxAttempts = job.opts.attempts || 1;

    // Check if we should retry
    if (attemptNumber < maxAttempts) {
      throw error; // Bull will retry automatically
    }

    // Max attempts reached - handle permanently failed job
    await notifyAdmin({
      subject: 'Email Delivery Failed',
      message: `Failed to send email to ${to} after ${attemptNumber} attempts`,
      error: error.message,
      jobData: job.data
    });

    throw error; // Still throw to mark job as failed
  }
});

// Configure retry strategy
await emailQueue.add(
  emailData,
  {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
);

// Custom backoff: Bull does not accept a function in the job options.
// Register a named strategy in the queue settings, then refer to it by name.
const retryQueue = new Queue('email', {
  settings: {
    backoffStrategies: {
      // Custom logic: 1s, 5s, 30s, then 60s
      stepped: (attemptsMade) => [1000, 5000, 30000][attemptsMade - 1] || 60000
    }
  }
});

await retryQueue.add(emailData, { attempts: 3, backoff: { type: 'stepped' } });
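
To see how quickly exponential backoff grows, the sketch below computes a retry schedule by hand. It assumes plain doubling from the base delay. The exact formula is library-specific and may differ between Bull and BullMQ, so treat the numbers as an illustration of the shape, not as what either library guarantees. `doublingBackoff` and `backoffSchedule` are hypothetical helpers.

```javascript
// Delay before a given retry, assuming the wait doubles each time.
// retryNumber is 1 for the first retry, 2 for the second, and so on.
function doublingBackoff(baseDelayMs, retryNumber) {
  return baseDelayMs * 2 ** (retryNumber - 1);
}

// attempts includes the first try, so there are (attempts - 1) retries.
function backoffSchedule(baseDelayMs, attempts) {
  const delays = [];
  for (let retry = 1; retry < attempts; retry += 1) {
    delays.push(doublingBackoff(baseDelayMs, retry));
  }
  return delays;
}

const schedule = backoffSchedule(2000, 5);
const totalWaitMs = schedule.reduce((sum, delay) => sum + delay, 0);

console.log(schedule);    // [ 2000, 4000, 8000, 16000 ]
console.log(totalWaitMs); // 30000
```

With `attempts: 5` and a 2 second base, a job that keeps failing spends about 30 seconds waiting between tries under this assumption. That total is worth checking against any timeout or user-facing deadline before raising `attempts`.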

Multiple Workers for Concurrency

Process jobs faster by running multiple workers:

// Single process, concurrent jobs (Bull)
emailQueue.process(5, async (job) => {
  // Process up to 5 jobs concurrently
  const { to, subject, body } = job.data;
  return sendEmail(to, subject, body);
});

// Multiple worker processes (worker.js, BullMQ)
const { Worker } = require('bullmq');

const connection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379
};

const worker = new Worker(
  'email',
  async (job) => {
    const { to, subject, body } = job.data;
    return sendEmail(to, subject, body);
  },
  {
    connection,
    concurrency: 10 // Process 10 jobs at once per worker
  }
);

// Run multiple worker processes
// Terminal 1: node worker.js
// Terminal 2: node worker.js
// Terminal 3: node worker.js
// Now you have 3 workers processing 10 jobs each = 30 concurrent jobs

Job Progress Tracking

Report job progress for long-running tasks:

// Processor with progress updates (Bull)
videoQueue.process(async (job) => {
  const { videoPath } = job.data;

  // Start processing
  await job.progress(0);

  // Download video
  await downloadVideo(videoPath);
  await job.progress(25);

  // Transcode to 720p
  await transcodeVideo(videoPath, '720p');
  await job.progress(50);

  // Transcode to 1080p
  await transcodeVideo(videoPath, '1080p');
  await job.progress(75);

  // Upload results
  await uploadResults();
  await job.progress(100);

  return { success: true, formats: ['720p', '1080p'] };
});

// Monitor progress from client
const job = await videoQueue.add({ videoPath: '/uploads/video.mp4' });

// Poll for progress. Re-fetch the job each time: calling progress() on the
// original object only returns the value cached locally, not the one in Redis.
const interval = setInterval(async () => {
  const current = await videoQueue.getJob(job.id);
  if (!current) {
    clearInterval(interval); // Job was removed from the queue
    return;
  }

  const jobState = await current.getState();
  const progress = current.progress();

  console.log(`Job ${current.id}: ${jobState} - ${progress}%`);

  if (jobState === 'completed' || jobState === 'failed') {
    clearInterval(interval);
  }
}, 1000);
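
The percentages in the processor above are hard-coded, which is easy to get wrong when a stage is added or removed. One option, sketched here under the assumption that every stage counts equally, is to derive the value from the stage list. `progressAfterStage` and the stage names are hypothetical. A real pipeline might weight stages by expected duration instead.

```javascript
// Percentage complete after finishing `completedStages` of `totalStages`,
// assuming every stage contributes equally.
function progressAfterStage(completedStages, totalStages) {
  if (totalStages <= 0) {
    throw new RangeError('totalStages must be positive');
  }
  // Clamp so a miscounted stage can never report below 0% or above 100%.
  const clamped = Math.min(Math.max(completedStages, 0), totalStages);
  return Math.round((clamped / totalStages) * 100);
}

const stages = ['download', 'transcode-720p', 'transcode-1080p', 'upload'];
const checkpoints = stages.map((_, index) => progressAfterStage(index + 1, stages.length));

console.log(checkpoints); // [ 25, 50, 75, 100 ]
```

Inside a processor, the result would be passed to `job.progress(...)` after each stage finishes.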

Bull Board - Job Dashboard

Visualize and manage your queues with Bull Board:

# Install dashboard
npm install @bull-board/express @bull-board/api @bull-board/ui

// Setup dashboard (app.js)
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const emailQueue = require('./queues/emailQueue');
const videoQueue = require('./queues/videoQueue');
const reportQueue = require('./queues/reportQueue');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullAdapter(emailQueue),
    new BullAdapter(videoQueue),
    new BullAdapter(reportQueue)
  ],
  serverAdapter
});

// Mount dashboard
app.use('/admin/queues', serverAdapter.getRouter());

// Now visit: http://localhost:3000/admin/queues

Bull Board Features:
  • View all queues and their job counts
  • Monitor job status (waiting, active, completed, failed)
  • Inspect job data and logs
  • Retry failed jobs manually
  • Clean completed/failed jobs
  • Pause/resume queues

Production Best Practices

// 1. Graceful shutdown (BullMQ workers; with Bull, call queue.close() instead)
const gracefulShutdown = async () => {
  console.log('Shutting down workers gracefully...');
  await emailWorker.close();
  await videoWorker.close();
  process.exit(0);
};

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);

// 2. Rate limiting
// The limiter is configured on the queue (Bull) or on the worker (BullMQ),
// not on individual jobs.
const limitedEmailQueue = new Queue('email', {
  limiter: {
    max: 100, // Max 100 jobs
    duration: 60000 // Per minute
  }
});

// 3. Job deduplication
await emailQueue.add(
  emailData,
  {
    jobId: `welcome-email-${userId}`, // A job whose ID already exists is not added again
    removeOnComplete: true // Once removed, the same ID can be queued again
  }
);

// 4. Dead letter queue for failed jobs
// (deadLetterQueue is assumed to be a second queue you have created)
emailQueue.on('failed', async (job, err) => {
  if (job.attemptsMade >= job.opts.attempts) {
    // Move to dead letter queue
    await deadLetterQueue.add({
      originalQueue: 'email',
      jobId: job.id,
      data: job.data,
      error: err.message,
      failedAt: Date.now()
    });
  }
});

// 5. Monitor queue health
setInterval(async () => {
  const waiting = await emailQueue.getWaitingCount();
  const active = await emailQueue.getActiveCount();
  const failed = await emailQueue.getFailedCount();

  if (waiting > 1000) {
    console.warn(`High queue backlog: ${waiting} jobs waiting (${active} active)`);
    // Alert operations team
  }

  if (failed > 100) {
    console.error(`High failure count: ${failed} jobs failed`);
    // Alert development team
  }
}, 60000); // Check every minute
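
The health check above mixes Redis calls with threshold logic, which makes the thresholds hard to test. A minimal sketch of one way to separate them is a pure function that takes the counts and returns the alerts to raise. `assessQueueHealth`, the `limits` shape, and the alert objects are hypothetical. The default thresholds are the ones used in the example above.

```javascript
// Decide which alerts to raise for a set of queue counts.
// Pure function: no Redis access, so it can be unit tested directly.
function assessQueueHealth(counts, limits = { maxWaiting: 1000, maxFailed: 100 }) {
  const alerts = [];

  if (counts.waiting > limits.maxWaiting) {
    alerts.push({
      level: 'warn',
      message: `High queue backlog: ${counts.waiting} jobs waiting`
    });
  }

  if (counts.failed > limits.maxFailed) {
    alerts.push({
      level: 'error',
      message: `High failure count: ${counts.failed} jobs failed`
    });
  }

  return alerts;
}

const healthy = assessQueueHealth({ waiting: 12, active: 3, failed: 0 });
const overloaded = assessQueueHealth({ waiting: 2500, active: 10, failed: 140 });

console.log(healthy.length);                      // 0
console.log(overloaded.map((alert) => alert.level)); // [ 'warn', 'error' ]
```

In the monitoring loop, the counts would come from `getWaitingCount()`, `getActiveCount()`, and `getFailedCount()`, and each returned alert would be forwarded to whatever alerting channel the team uses.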

Exercise: Build a Multi-Stage Image Processing Pipeline

  1. Create an upload endpoint that accepts images
  2. Queue image processing job with these stages:
    • Validate image format and size
    • Generate thumbnails (small, medium, large)
    • Optimize images (compress)
    • Upload to cloud storage (simulate)
    • Update database with URLs
  3. Report progress at each stage
  4. Implement retry logic for network failures
  5. Add Bull Board dashboard to monitor jobs
  6. Create a webhook endpoint to notify clients when processing completes
Common Pitfalls:
  • Don't process jobs in your web server process - use dedicated workers
  • Set appropriate job timeouts to prevent hanging jobs
  • Clean up completed jobs regularly to prevent memory issues
  • Monitor Redis memory usage - failed jobs can accumulate
  • Always handle errors - uncaught errors can crash workers

Task queues are essential for building scalable, responsive applications. With Bull or BullMQ you can move slow or failure-prone work out of the request path and retry, schedule, and monitor it independently of your web server.