Job Queues with Redis
Job queues allow you to process time-consuming tasks asynchronously in the background. Redis provides fast, reliable queue functionality through libraries like Bull.
What are Job Queues?
Job queues decouple task execution from request handling. Instead of making users wait for slow operations (emails, image processing, reports), you add jobs to a queue and process them in the background.
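To make the decoupling concrete, the sketch below uses a plain in-memory array as the queue. The InMemoryQueue class and its add and drain methods are hypothetical names chosen for illustration; they are not part of Bull or Redis, and nothing here survives a process restart.

```javascript
// Minimal in-memory illustration of the producer/consumer split.
// Not durable: a real queue such as Bull persists jobs in Redis.
class InMemoryQueue {
  constructor() {
    this.jobs = [];
    this.nextId = 1;
  }

  // Producer side: record the work and return immediately.
  add(data) {
    const job = { id: this.nextId++, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: take jobs in FIFO order and hand each to the handler.
  drain(handler) {
    const results = [];
    while (this.jobs.length > 0) {
      const job = this.jobs.shift();
      results.push(handler(job));
    }
    return results;
  }
}

const queue = new InMemoryQueue();
queue.add({ to: 'user@example.com', subject: 'Welcome!' });
queue.add({ to: 'admin@example.com', subject: 'Report ready' });

// Later, typically in a separate worker, the jobs are processed.
const processed = queue.drain((job) => `sent to ${job.data.to}`);
console.log(processed);
// [ 'sent to user@example.com', 'sent to admin@example.com' ]
```

The request handler only pays the cost of add; the slow work happens whenever drain runs.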
Bull Queue Library
Bull is a widely used Node.js job queue library built on Redis. It supports job priorities, delayed and repeatable jobs, automatic retries with backoff, and job events.
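const Queue = require('bull');

// Create a queue backed by a local Redis instance
const emailQueue = new Queue('email-queue', {
  redis: {
    host: '127.0.0.1',
    port: 6379
  }
});

// Add a job to the queue. In CommonJS, await needs an async context,
// so the call is wrapped in an async function.
async function enqueueWelcomeEmail() {
  await emailQueue.add({
    to: 'user@example.com',
    subject: 'Welcome!',
    body: 'Thanks for signing up'
  });
  console.log('Job added to queue');
}

enqueueWelcomeEmail().catch(console.error);

Processing Jobs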
Define a processor to handle jobs from the queue. A processor can run in a different process from the code that adds jobs, so workers can be scaled independently of the web server.
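One way to keep a processor easy to test is to build it around an injected mail sender. The makeEmailProcessor factory and its sendMail parameter below are assumed names, not part of Bull; the sketch relies only on the processor receiving a job with a data property and signalling the outcome by returning or throwing.

```javascript
// Hypothetical factory: builds a processor around any sendMail implementation.
function makeEmailProcessor(sendMail) {
  return async function processEmail(job) {
    const { to, subject, body } = job.data;
    if (!to) {
      // Throwing (or rejecting) is how a processor reports failure.
      throw new Error('Job is missing a recipient');
    }
    await sendMail({ to, subject, text: body });
    // The resolved value becomes the job's result.
    return { sent: true, to };
  };
}

// Demo with a stub sender so the sketch runs without an SMTP server.
const outbox = [];
const processEmail = makeEmailProcessor(async (message) => {
  outbox.push(message);
});

processEmail({ data: { to: 'user@example.com', subject: 'Welcome!', body: 'Hi' } })
  .then((result) => console.log(result, outbox.length));
```

With a real queue, the same function could be registered as emailQueue.process(makeEmailProcessor((message) => transporter.sendMail(message))), assuming a configured nodemailer transporter.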
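// Processor file: emailProcessor.js
// Assumes ./emailQueue exports the Queue instance created earlier.
const emailQueue = require('./emailQueue');
const nodemailer = require('nodemailer');

// SMTP settings are placeholders read from the environment; adjust for your provider.
const transporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: Number(process.env.SMTP_PORT) || 587,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

emailQueue.process(async (job) => {
  const { to, subject, body } = job.data;

  // Send the email; a rejected promise marks the job as failed
  await transporter.sendMail({
    from: 'noreply@app.com',
    to,
    subject,
    text: body
  });

  // The return value is stored as the job's result
  return { sent: true, to };
});

Job Priorities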
Assign priorities to jobs to control processing order. Bull treats lower numbers as higher priority, so a job with priority 1 is processed before a job with priority 10.
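The ordering rule can be sketched without Redis. The orderByPriority function below is a hypothetical illustration of "lowest number first, insertion order for ties"; it is not how Bull stores or retrieves jobs internally.

```javascript
// Order jobs the way a priority queue would: lowest priority number first,
// falling back to insertion order when priorities are equal.
function orderByPriority(jobs) {
  return jobs
    .map((job, index) => ({ job, index }))
    .sort((a, b) => a.job.priority - b.job.priority || a.index - b.index)
    .map(({ job }) => job);
}

const pending = [
  { subject: 'Marketing', priority: 10 },
  { subject: 'Newsletter', priority: 5 },
  { subject: 'Critical Alert', priority: 1 },
  { subject: 'Digest', priority: 5 },
];

const ordered = orderByPriority(pending).map((job) => job.subject);
console.log(ordered);
// [ 'Critical Alert', 'Newsletter', 'Digest', 'Marketing' ]
```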
// High priority job (lower number = higher priority)
await emailQueue.add(
  { to: 'admin@app.com', subject: 'Critical Alert' },
  { priority: 1 }
);

// Normal priority
await emailQueue.add(
  { to: 'user@app.com', subject: 'Newsletter' },
  { priority: 5 }
);

// Low priority
await emailQueue.add(
  { to: 'user@app.com', subject: 'Marketing' },
  { priority: 10 }
);

Delayed Jobs
Schedule jobs to run after a specific delay or at a specific time.
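Scheduling for a specific time comes down to converting that time into a delay in milliseconds. The delayUntil helper below is a hypothetical name, not a Bull function; it clamps the result at zero so that a target already in the past yields no delay rather than a negative number.

```javascript
// Hypothetical helper: milliseconds from `now` until `target`, never negative.
function delayUntil(target, now = Date.now()) {
  const targetMs = target instanceof Date ? target.getTime() : new Date(target).getTime();
  if (Number.isNaN(targetMs)) {
    throw new RangeError('Invalid target time');
  }
  return Math.max(0, targetMs - now);
}

// A fixed reference time keeps the example deterministic.
const referenceNow = Date.parse('2026-02-17T09:00:00Z');

console.log(delayUntil('2026-02-17T10:00:00Z', referenceNow)); // 3600000 (one hour)
console.log(delayUntil('2026-02-17T08:00:00Z', referenceNow)); // 0 (already past)
```

The returned number is what would be passed as the delay option when adding a job.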
// Delay job by 1 hour (milliseconds)
await emailQueue.add(
  { to: 'user@app.com', subject: 'Reminder' },
  { delay: 60 * 60 * 1000 }
);

// Schedule for a specific time; clamp to 0 so a past time runs immediately
const scheduledTime = new Date('2026-02-17T10:00:00Z');
await emailQueue.add(
  { to: 'user@app.com', subject: 'Scheduled Email' },
  { delay: Math.max(0, scheduledTime.getTime() - Date.now()) }
);

Repeatable Jobs
Create recurring jobs that run on a schedule, using either a cron pattern or a fixed interval in milliseconds.
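For readers less familiar with cron syntax, a standard pattern has five space-separated fields. The describeCron function below is a hypothetical helper that only labels those fields; it does not validate values, and it assumes the five-field form even though some cron parsers also accept a leading seconds field.

```javascript
// Field names for a standard five-field cron expression, in order.
const CRON_FIELDS = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

// Hypothetical helper: map each field of a cron expression to its name.
function describeCron(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected ${CRON_FIELDS.length} fields, got ${parts.length}`);
  }
  return Object.fromEntries(CRON_FIELDS.map((name, i) => [name, parts[i]]));
}

// Minute 0 of hour 0, every day of every month: daily at midnight.
console.log(describeCron('0 0 * * *'));
// { minute: '0', hour: '0', dayOfMonth: '*', month: '*', dayOfWeek: '*' }
```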
// Run every day at midnight
await emailQueue.add(
  { type: 'daily-report' },
  {
    repeat: {
      cron: '0 0 * * *'
    }
  }
);

// Run every 15 minutes
await emailQueue.add(
  { type: 'health-check' },
  {
    repeat: {
      every: 15 * 60 * 1000
    }
  }
);

Job Events and Monitoring
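Listen to job events for monitoring, logging, and error handling. Listeners registered this way fire for jobs processed by the same queue instance; Bull also emits events prefixed with global: (for example, global:completed) for jobs handled in other processes.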
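Event listeners are also a convenient place to collect simple metrics. The trackOutcomes helper below is a hypothetical name; it assumes only that the queue exposes the standard on method and emits completed and failed events, and it uses a plain Node.js EventEmitter as a stand-in so the sketch runs without Redis.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: keeps running totals for any emitter that fires
// 'completed' and 'failed' events.
function trackOutcomes(queue) {
  const stats = { completed: 0, failed: 0, lastError: null };
  queue.on('completed', () => {
    stats.completed += 1;
  });
  queue.on('failed', (job, err) => {
    stats.failed += 1;
    stats.lastError = err.message;
  });
  return stats;
}

// Stand-in for a real queue.
const fakeQueue = new EventEmitter();
const stats = trackOutcomes(fakeQueue);

fakeQueue.emit('completed', { id: 1 }, { sent: true });
fakeQueue.emit('failed', { id: 2 }, new Error('SMTP timeout'));
fakeQueue.emit('completed', { id: 3 }, { sent: true });

console.log(stats);
// { completed: 2, failed: 1, lastError: 'SMTP timeout' }
```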
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err);
  // Send alert or retry logic
});

emailQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

Bull Board Dashboard
Bull Board provides a web UI to monitor and manage queues, jobs, and workers.
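const express = require('express');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
// Assumes ./emailQueue exports the Queue instance created earlier.
const emailQueue = require('./emailQueue');

const app = express();

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [new BullAdapter(emailQueue)],
  serverAdapter
});

// Mount the dashboard; protect this route with authentication in production
app.use('/admin/queues', serverAdapter.getRouter());

Retry Logic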
Configure automatic retries for failed jobs with exponential backoff.
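To see what an exponential schedule looks like in practice, the sketch below computes the wait before each retry for a generic doubling strategy. The backoffDelays function and its maxDelay cap are hypothetical, and the formula is the textbook one; Bull's built-in exponential strategy may compute slightly different values.

```javascript
// Generic doubling schedule: base, 2x base, 4x base, ... capped at maxDelay.
function backoffDelays(attempts, baseDelay, maxDelay = Infinity) {
  const delays = [];
  // The first attempt runs immediately, so only attempts - 1 retries wait.
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(Math.min(maxDelay, baseDelay * 2 ** retry));
  }
  return delays;
}

console.log(backoffDelays(3, 2000));       // [ 2000, 4000 ]
console.log(backoffDelays(5, 1000, 5000)); // [ 1000, 2000, 4000, 5000 ]
```

Capping the delay is a common design choice so that a job with many attempts does not end up waiting hours between retries.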
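// Retry up to 3 attempts with the built-in exponential backoff
await emailQueue.add(
  { to: 'user@app.com' },
  {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000 // Start with 2 seconds
    }
  }
);

// Custom backoff: job options are serialized, so a function cannot be passed
// per job. Register a named strategy in the queue settings instead.
// The queue name and strategy name below are illustrative.
const retryQueue = new Queue('retry-queue', {
  settings: {
    backoffStrategies: {
      doubling: (attemptsMade) => Math.pow(2, attemptsMade) * 1000
    }
  }
});

await retryQueue.add(
  { to: 'user@app.com' },
  {
    attempts: 5,
    backoff: { type: 'doubling' }
  }
);

Job Lifecycle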
Understanding the complete lifecycle of a job helps with debugging and optimization.
// Job states:
// waiting   - In queue, waiting to be processed
// active    - Currently being processed
// completed - Successfully finished
// failed    - Processing failed
// delayed   - Waiting for delay to expire
// paused    - Queue is paused

// Get jobs by state
const waiting = await emailQueue.getWaiting();
const active = await emailQueue.getActive();
const failed = await emailQueue.getFailed();

// Clean old jobs
await emailQueue.clean(24 * 3600 * 1000, 'completed');
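Per-state counts are often more useful for monitoring than the job lists themselves. The summarizeCounts function below is a hypothetical helper that condenses such counts into a few health indicators; it assumes a plain object keyed by state name, which is the shape Bull's getJobCounts method resolves to.

```javascript
// Hypothetical helper: condense per-state job counts into health indicators.
function summarizeCounts(counts) {
  const get = (state) => counts[state] || 0;
  const finished = get('completed') + get('failed');
  return {
    // Jobs that have not started yet.
    backlog: get('waiting') + get('delayed'),
    // Jobs currently being processed.
    inFlight: get('active'),
    // Share of finished jobs that failed (0 when nothing has finished).
    failureRate: finished === 0 ? 0 : get('failed') / finished,
  };
}

const sample = { waiting: 4, active: 1, delayed: 2, completed: 90, failed: 10 };
console.log(summarizeCounts(sample));
// { backlog: 6, inFlight: 1, failureRate: 0.1 }
```

With a real queue, the input could come from await emailQueue.getJobCounts() inside an async function.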