Task Queues and Background Jobs
Background jobs are essential for handling time-consuming tasks without blocking your main application. Instead of making users wait for operations like sending emails, processing images, generating reports, or calling slow APIs, you queue these tasks and process them asynchronously. This improves user experience, application responsiveness, and scalability.
Why Use Task Queues?
The Problem: Imagine a user uploads a video that needs to be transcoded. If you process it synchronously, the user waits 10+ minutes for a response. With a queue, you return immediately with "Processing started" and handle transcoding in the background.
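The difference can be sketched without any queue library. The following is a minimal in-memory illustration of "respond first, work later", not how Bull works internally. The names `handleUpload`, `pendingJobs`, `finishedJobs`, and `drainJobs` are hypothetical and exist only for this example; a real queue would persist the jobs in Redis and run the background side in another process.

```javascript
// In-memory stand-in for a real queue (hypothetical; a real queue persists jobs in Redis).
const pendingJobs = [];
const finishedJobs = [];

// Request handler: record the work and respond right away.
function handleUpload(videoPath) {
  const job = { id: pendingJobs.length + finishedJobs.length + 1, videoPath };
  pendingJobs.push(job);
  return { status: 'Processing started', jobId: job.id };
}

// Background side: take jobs off the list one at a time, in arrival order.
function drainJobs(processJob) {
  while (pendingJobs.length > 0) {
    const job = pendingJobs.shift();
    finishedJobs.push({ ...job, result: processJob(job) });
  }
}

const response = handleUpload('/uploads/video.mp4');
console.log(response.status);    // the user sees this before any transcoding happens
console.log(pendingJobs.length); // the job is still waiting at this point

drainJobs((job) => `transcoded ${job.videoPath}`);
console.log(finishedJobs[0].result);
```

The handler never calls the slow function itself. It only records what needs to be done, which is why it can return in milliseconds regardless of how long the transcoding takes.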
Use Cases for Background Jobs:
- Sending emails (welcome emails, newsletters, notifications)
- Image/video processing (resizing, compression, transcoding)
- Report generation (PDFs, spreadsheets, analytics)
- Data synchronization between systems
- API integrations with third-party services
- Database maintenance (cleanup, archiving, indexing)
- Scheduled tasks (daily summaries, periodic backups)
Introducing Bull and BullMQ
Bull and BullMQ are job queue libraries for Node.js backed by Redis. BullMQ is the successor to Bull, rewritten in TypeScript.
# Install Bull (for Redis-backed queues)
npm install bull
# Or install BullMQ (recommended for new projects)
npm install bullmq
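# Both libraries need a running Redis server to connect to.
# The ioredis client is installed automatically as a dependency of Bull and BullMQ;
# install it yourself only if your own code imports it
npm install ioredis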
Bull vs BullMQ: Bull is mature and stable, and is in maintenance mode (bug fixes only), so it suits existing projects. BullMQ is the modern rewrite with better TypeScript support, flows (parent/child job dependencies), and performance. For new projects, use BullMQ.
Creating Your First Queue (Bull)
Let's create a simple email queue:
// queues/emailQueue.js
const Queue = require('bull');
// Create a queue
const emailQueue = new Queue('email', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
}
});
// Define job processor
emailQueue.process(async (job) => {
const { to, subject, body } = job.data;
console.log(`Processing email job ${job.id} to ${to}`);
// Simulate sending email
await sendEmail(to, subject, body);
console.log(`Email job ${job.id} completed`);
return { sent: true, to };
});
// Event listeners
emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
emailQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
emailQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled`);
});
module.exports = emailQueue;
Adding jobs to the queue:
// In your Express route or controller
const emailQueue = require('./queues/emailQueue');
app.post('/register', async (req, res) => {
const { email, name } = req.body;
// Create user in database
const user = await User.create({ email, name });
// Add job to queue (waits only for the job to be stored in Redis, not for the email to be sent)
await emailQueue.add({
to: email,
subject: 'Welcome to Our App!',
body: `Hi ${name}, thanks for signing up!`
});
// Return immediately
res.json({
success: true,
message: 'Registration successful! Check your email.'
});
});
Job Options and Priorities
Configure job behavior with options:
// Job with options
await emailQueue.add(
{ to: 'user@example.com', subject: 'Test', body: 'Hello' },
{
// Job priority (lower number = higher priority)
priority: 1,
// Delay job execution (in milliseconds)
delay: 5000, // Start after 5 seconds
// Job attempts (retries on failure)
attempts: 3,
// Backoff strategy for retries
backoff: {
type: 'exponential',
delay: 2000 // Base delay of 2s; the wait grows exponentially with each retry
},
// Remove job when completed
removeOnComplete: true,
// Remove job when failed
removeOnFail: false,
// Job timeout
timeout: 30000, // 30 seconds
// Job ID (for deduplication)
jobId: `email-${userId}-welcome`
}
);
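The `jobId` option is what makes deduplication possible: Bull is documented to skip an `add` whose ID already exists in the queue. The sketch below models that rule with a plain `Map`. It is an illustration under that assumption, not Bull's implementation, and `createDedupingStore` is a hypothetical helper. It also shows a consequence worth knowing: once a job is removed (for example through `removeOnComplete`), its ID is free to be used again.

```javascript
// In-memory model of jobId-based deduplication (hypothetical helper, not Bull's code).
function createDedupingStore() {
  const jobs = new Map();
  return {
    // Returns true if the job was stored, false if the ID was already present.
    add(jobId, data) {
      if (jobs.has(jobId)) return false;
      jobs.set(jobId, data);
      return true;
    },
    // Models removeOnComplete: the ID becomes free again once the job is gone.
    complete(jobId) {
      return jobs.delete(jobId);
    },
    size() {
      return jobs.size;
    }
  };
}

const store = createDedupingStore();
const userId = 42; // example value
const jobId = `email-${userId}-welcome`;

const first = store.add(jobId, { to: 'user@example.com' });
const second = store.add(jobId, { to: 'user@example.com' }); // duplicate, ignored
store.complete(jobId);
const third = store.add(jobId, { to: 'user@example.com' });  // accepted again after removal

console.log(first, second, third);
```

If a job must run at most once ever, rather than at most once at a time, keep the completed job around or record the fact elsewhere, such as in your database.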
Job priorities example:
// High priority - password reset (send immediately)
await emailQueue.add(
{ to: user.email, subject: 'Password Reset', body: resetLink },
{ priority: 1 }
);
// Medium priority - order confirmation
await emailQueue.add(
{ to: user.email, subject: 'Order Confirmation', body: orderDetails },
{ priority: 5 }
);
// Low priority - newsletter (can wait)
await emailQueue.add(
{ to: user.email, subject: 'Weekly Newsletter', body: newsletter },
{ priority: 10 }
);
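To make the ordering rule concrete, here is a small sketch of how the jobs above would be handed out if all of them were waiting at the same moment. It assumes lower numbers go first and that ties are served in the order they were added. `orderByPriority` is a hypothetical helper for illustration, not part of Bull.

```javascript
// Orders jobs the way a priority queue would hand them out:
// lowest priority number first, and oldest first among equal priorities.
function orderByPriority(jobs) {
  return jobs
    .map((job, index) => ({ job, index }))
    .sort((a, b) => a.job.priority - b.job.priority || a.index - b.index)
    .map(({ job }) => job);
}

// Jobs listed in the order they were added.
const added = [
  { subject: 'Weekly Newsletter', priority: 10 },
  { subject: 'Order Confirmation', priority: 5 },
  { subject: 'Password Reset', priority: 1 },
  { subject: 'Second Order Confirmation', priority: 5 }
];

const processingOrder = orderByPriority(added).map((job) => job.subject);
console.log(processingOrder);
```

The password reset jumps ahead of everything even though it was added third, and the newsletter waits until the end.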
BullMQ - Modern Alternative
BullMQ splits the two roles into separate classes: a Queue adds jobs and a Worker processes them. The same email queue looks like this:
// queues/emailQueue.js (BullMQ version)
const { Queue, Worker } = require('bullmq');
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
};
// Create queue
const emailQueue = new Queue('email', { connection });
// Create worker (processes jobs)
const emailWorker = new Worker(
'email',
async (job) => {
const { to, subject, body } = job.data;
console.log(`Processing email job ${job.id} to ${to}`);
await sendEmail(to, subject, body);
return { sent: true, to, sentAt: Date.now() };
},
{ connection }
);
// Event listeners
emailWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
emailWorker.on('failed', (job, err) => {
// job can be undefined in BullMQ (for example if it was removed), so guard the access
console.error(`Job ${job?.id} failed:`, err.message);
});
module.exports = { emailQueue, emailWorker };
Adding jobs with BullMQ:
const { emailQueue } = require('./queues/emailQueue');
// Add job
await emailQueue.add('welcome-email', {
to: 'user@example.com',
subject: 'Welcome!',
body: 'Thanks for joining'
});
// Add with options
await emailQueue.add(
'password-reset',
{ to: 'user@example.com', token: resetToken },
{
priority: 1,
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 1000 // Keep last 1000 failed jobs
}
);
Scheduled and Recurring Jobs
Schedule jobs to run at specific times or intervals:
// Run job once at specific time
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(9, 0, 0, 0); // 9 AM tomorrow
await emailQueue.add(
{ to: 'user@example.com', subject: 'Reminder', body: 'Your appointment is tomorrow' },
{ delay: tomorrow.getTime() - Date.now() }
);
// Repeatable jobs (cron-style)
await emailQueue.add(
{ subject: 'Daily Report', body: 'Here is your daily report' },
{
repeat: {
cron: '0 9 * * *', // Every day at 9 AM
tz: 'America/New_York'
}
}
);
// Weekly report
await emailQueue.add(
{ subject: 'Weekly Summary' },
{
repeat: {
cron: '0 9 * * 1', // Every Monday at 9 AM
limit: 52 // Stop after 1 year
}
}
);
// Every 5 minutes
await emailQueue.add(
{ task: 'health-check' },
{
repeat: {
every: 5 * 60 * 1000 // 5 minutes in milliseconds
}
}
);
Cron Patterns:
0 9 * * * - Every day at 9 AM
*/5 * * * * - Every 5 minutes
0 0 * * 0 - Every Sunday at midnight
0 0 1 * * - First day of every month
0 12 * * 1-5 - Weekdays at noon
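Each pattern above has five space-separated fields: minute, hour, day of month, month, and day of week. The sketch below only labels those fields so a pattern is easier to read. It does not validate ranges or compute run times, and `labelCronFields` is a hypothetical helper. It assumes the plain five-field form; some parsers also accept an optional seconds field.

```javascript
// Names of the five fields in a standard cron expression, in order.
const CRON_FIELDS = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

// Splits a five-field cron expression into a labeled object.
// This only labels the fields; it does not validate ranges or compute run times.
function labelCronFields(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected 5 fields, got ${parts.length}: "${expression}"`);
  }
  return Object.fromEntries(CRON_FIELDS.map((name, i) => [name, parts[i]]));
}

const weekdaysAtNoon = labelCronFields('0 12 * * 1-5');
console.log(weekdaysAtNoon);
```

Read this way, `0 12 * * 1-5` says: minute 0, hour 12, any day of the month, any month, days 1 through 5 of the week (Monday to Friday).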
Job Retries and Error Handling
Handle failures gracefully with retry strategies:
// Define processor with error handling
emailQueue.process(async (job) => {
const { to, subject, body } = job.data;
try {
// Attempt to send email
await sendEmail(to, subject, body);
return { success: true };
} catch (error) {
// Log error details
console.error(`Failed to send email to ${to}:`, error.message);
// Check if we should retry.
// attemptsMade counts attempts that have already finished, so add 1 for the current one
const attemptNumber = job.attemptsMade + 1;
if (attemptNumber < job.opts.attempts) {
throw error; // Bull will retry automatically
}
// Max attempts reached - handle permanently failed job
await notifyAdmin({
subject: 'Email Delivery Failed',
message: `Failed to send email to ${to} after ${attemptNumber} attempts`,
error: error.message,
jobData: job.data
});
throw error; // Still throw to mark job as failed
}
});
// Configure retry strategy
await emailQueue.add(
emailData,
{
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000
}
}
);
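// Custom backoff: job options are serialized to Redis, so a function cannot be
// passed per job. Register a named strategy where the queue is created instead
// (queues/emailQueue.js), alongside the redis settings:
const emailQueue = new Queue('email', {
  settings: {
    backoffStrategies: {
      // Custom logic: 1s, 5s, 30s
      stepped: (attemptsMade) => [1000, 5000, 30000][attemptsMade - 1] || 60000
    }
  }
});
// Then refer to the strategy by name when adding a job
await emailQueue.add(emailData, {
  attempts: 3,
  backoff: { type: 'stepped' }
});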
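When choosing `attempts` and `delay`, it helps to see how long a job can stay in the retry cycle in the worst case. The sketch below computes the waits for a simple doubling schedule. It is illustrative only: the library's built-in `exponential` formula may differ in detail, and `doublingBackoff` is a hypothetical helper.

```javascript
// Delay before each retry under a simple doubling schedule:
// baseDelay, 2 * baseDelay, 4 * baseDelay, ...
// Illustrative only; the library's built-in 'exponential' formula may differ.
function doublingBackoff(baseDelayMs, attempts) {
  const retries = Math.max(attempts - 1, 0); // the first attempt is not a retry
  return Array.from({ length: retries }, (_, i) => baseDelayMs * 2 ** i);
}

const delays = doublingBackoff(2000, 5);
const totalWaitMs = delays.reduce((sum, d) => sum + d, 0);

console.log(delays);      // wait before retry 1, 2, 3, 4
console.log(totalWaitMs); // time spent waiting if every attempt fails
```

With 5 attempts and a 2 second base delay, a job that keeps failing spends 30 seconds waiting between attempts before it is finally marked as failed. That is worth comparing against how long the downstream service is typically unavailable.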
Multiple Workers for Concurrency
Process jobs faster by running multiple workers:
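// Concurrency within a single process (Bull)
emailQueue.process(5, async (job) => {
  // Process up to 5 jobs concurrently
  const { to, subject, body } = job.data;
  return await sendEmail(to, subject, body);
});
// Dedicated worker script (worker.js, BullMQ)
const { Worker } = require('bullmq');
const connection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379
};
const worker = new Worker(
  'email',
  async (job) => {
    const { to, subject, body } = job.data;
    return await sendEmail(to, subject, body);
  },
  {
    connection,
    concurrency: 10 // Process 10 jobs at once per worker
  }
);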
// Run multiple worker processes
// Terminal 1: node worker.js
// Terminal 2: node worker.js
// Terminal 3: node worker.js
// Now you have 3 workers processing 10 jobs each = 30 concurrent jobs
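A rough way to reason about how much concurrency helps is to estimate how long a backlog takes to drain with a given number of slots. The sketch below simulates that with made-up job durations. `estimateDrainTime` is a hypothetical helper, and it ignores Redis round trips and any limits of the downstream service, so treat the result as an upper bound on the benefit.

```javascript
// Estimates how long a batch of jobs takes when at most `concurrency`
// jobs run at the same time. Each job goes to whichever slot frees up first.
function estimateDrainTime(durationsMs, concurrency) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  const slotFreeAt = new Array(concurrency).fill(0);
  for (const duration of durationsMs) {
    const earliest = slotFreeAt.indexOf(Math.min(...slotFreeAt));
    slotFreeAt[earliest] += duration;
  }
  return Math.max(...slotFreeAt);
}

const sixEmails = [300, 300, 300, 300, 300, 300]; // made-up durations in ms

console.log(estimateDrainTime(sixEmails, 1)); // one job at a time
console.log(estimateDrainTime(sixEmails, 3)); // three slots
```

Concurrency pays off for I/O-bound jobs such as sending email, where the process mostly waits. For CPU-bound jobs such as transcoding, a single Node.js process cannot run them in parallel, so additional worker processes are the more useful lever.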
Job Progress Tracking
Report job progress for long-running tasks:
// Processor with progress updates
videoQueue.process(async (job) => {
const { videoPath } = job.data;
// Start processing
await job.progress(0);
// Download video
await downloadVideo(videoPath);
await job.progress(25);
// Transcode to 720p
await transcodeVideo(videoPath, '720p');
await job.progress(50);
// Transcode to 1080p
await transcodeVideo(videoPath, '1080p');
await job.progress(75);
// Upload results
await uploadResults();
await job.progress(100);
return { success: true, formats: ['720p', '1080p'] };
});
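// Monitor progress from the code that added the job
const job = await videoQueue.add({ videoPath: '/uploads/video.mp4' });
// Poll for progress
const interval = setInterval(async () => {
  // Re-fetch the job: the local object does not see updates made by the worker
  const current = await videoQueue.getJob(job.id);
  if (!current) {
    // The job no longer exists (for example, removed by removeOnComplete)
    clearInterval(interval);
    return;
  }
  const jobState = await current.getState();
  const progress = current.progress();
  console.log(`Job ${current.id}: ${jobState} - ${progress}%`);
  if (jobState === 'completed' || jobState === 'failed') {
    clearInterval(interval);
  }
}, 1000);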
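The processor above reports progress in equal 25% steps, which is misleading when stages take very different amounts of time. One option is to weight each stage by its expected cost and derive the percentage from the weights. The sketch below assumes made-up weights, and `progressAfter` is a hypothetical helper. The values it returns are what you would pass to `job.progress(...)`.

```javascript
// Stages of the pipeline with relative weights (made-up numbers for illustration).
const stages = [
  { name: 'download', weight: 1 },
  { name: 'transcode-720p', weight: 3 },
  { name: 'transcode-1080p', weight: 5 },
  { name: 'upload', weight: 1 }
];

// Returns the percentage (0-100) to report after `completedStages` stages have finished.
function progressAfter(stageList, completedStages) {
  const total = stageList.reduce((sum, s) => sum + s.weight, 0);
  const done = stageList
    .slice(0, completedStages)
    .reduce((sum, s) => sum + s.weight, 0);
  return Math.round((done / total) * 100);
}

const checkpoints = stages.map((_, i) => progressAfter(stages, i + 1));
console.log(checkpoints);
```

With these weights the bar moves only a little after the download and most of the way during the 1080p transcode, which matches where the time is actually spent.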
Bull Board - Job Dashboard
Visualize and manage your queues with Bull Board:
# Install dashboard
npm install @bull-board/express @bull-board/api @bull-board/ui
// Setup dashboard (app.js)
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const emailQueue = require('./queues/emailQueue');
const videoQueue = require('./queues/videoQueue');
const reportQueue = require('./queues/reportQueue');
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullAdapter(emailQueue),
new BullAdapter(videoQueue),
new BullAdapter(reportQueue)
],
serverAdapter
});
// Mount dashboard
app.use('/admin/queues', serverAdapter.getRouter());
// Now visit: http://localhost:3000/admin/queues
Bull Board Features:
- View all queues and their job counts
- Monitor job status (waiting, active, completed, failed)
- Inspect job data and logs
- Retry failed jobs manually
- Clean completed/failed jobs
- Pause/resume queues
Production Best Practices
// 1. Graceful shutdown
const gracefulShutdown = async () => {
console.log('Shutting down workers gracefully...');
// BullMQ workers shown here; with Bull, call close() on the queue instead
await emailWorker.close();
await videoWorker.close();
process.exit(0);
};
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
// 2. Rate limiting (in Bull this is configured where the queue is created, not per job)
const emailQueue = new Queue('email', {
  limiter: {
    max: 100, // Max 100 jobs
    duration: 60000 // Per minute
  }
});
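// 3. Job deduplication
await emailQueue.add(
  emailData,
  {
    jobId: `welcome-email-${userId}`, // An add with an ID that already exists is ignored
    // With removeOnComplete the ID is freed once the job finishes,
    // so this only blocks duplicates while the job is still in the queue
    removeOnComplete: true
  }
);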
// 4. Dead letter queue for failed jobs
emailQueue.on('failed', async (job, err) => {
if (job.attemptsMade >= job.opts.attempts) {
// Move to dead letter queue
await deadLetterQueue.add({
originalQueue: 'email',
jobId: job.id,
data: job.data,
error: err.message,
failedAt: Date.now()
});
}
});
// 5. Monitor queue health
setInterval(async () => {
const waiting = await emailQueue.getWaitingCount();
const active = await emailQueue.getActiveCount();
const failed = await emailQueue.getFailedCount();
if (waiting > 1000) {
console.warn(`High queue backlog: ${waiting} jobs waiting`);
// Alert operations team
}
if (failed > 100) {
console.error(`High failed-job count: ${failed} jobs in the failed set`);
// Alert development team
}
}, 60000); // Check every minute
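The threshold checks in the health monitor are easier to test and reuse when they are separated from the timer and the Redis calls. The sketch below pulls them into a pure function that takes the counts and returns a list of alerts. `evaluateQueueHealth` and `DEFAULT_LIMITS` are hypothetical names; the limits mirror the numbers used above and would need tuning for a real queue.

```javascript
// Default limits mirror the numbers used above; tune them per queue.
const DEFAULT_LIMITS = { maxWaiting: 1000, maxFailed: 100 };

// Pure function: takes queue counts, returns a list of alerts (empty when healthy).
function evaluateQueueHealth(counts, limits = DEFAULT_LIMITS) {
  const alerts = [];
  if (counts.waiting > limits.maxWaiting) {
    alerts.push({ level: 'warn', message: `High queue backlog: ${counts.waiting} jobs waiting` });
  }
  if (counts.failed > limits.maxFailed) {
    alerts.push({ level: 'error', message: `High failed-job count: ${counts.failed} jobs failed` });
  }
  return alerts;
}

const healthy = evaluateQueueHealth({ waiting: 12, active: 5, failed: 3 });
const backedUp = evaluateQueueHealth({ waiting: 2500, active: 10, failed: 150 });

console.log(healthy.length);
console.log(backedUp.map((a) => a.level));
```

Inside the `setInterval` callback you would gather the counts from the queue, pass them to this function, and forward any alerts to whatever notification channel your team uses.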
Exercise: Build a Multi-Stage Image Processing Pipeline
- Create an upload endpoint that accepts images
- Queue image processing job with these stages:
- Validate image format and size
- Generate thumbnails (small, medium, large)
- Optimize images (compress)
- Upload to cloud storage (simulate)
- Update database with URLs
- Report progress at each stage
- Implement retry logic for network failures
- Add Bull Board dashboard to monitor jobs
- Create a webhook endpoint to notify clients when processing completes
Common Pitfalls:
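- Don't process jobs in your web server process. Put the processor (queue.process or new Worker) in a dedicated worker script and let the web process only add jobs
- Set appropriate job timeouts to prevent hanging jobs
- Clean up completed jobs regularly (or use removeOnComplete) so they don't fill Redis memory
- Monitor Redis memory usage, since failed jobs are kept by default and can accumulate
- Always handle errors, because uncaught errors can crash workers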
Task queues are essential for building scalable, responsive applications. With Bull or BullMQ you can move slow work out of the request path, retry it when it fails, and monitor it while it runs.