Queues & Job Processing in Laravel
Laravel's queue system provides a unified API across various queue backends, allowing you to defer time-consuming tasks like sending emails, processing images, or generating reports to be processed in the background, significantly improving your application's response time.
Queue Configuration
Configure your queue driver in .env:
# Sync driver (runs immediately, for development)
QUEUE_CONNECTION=sync
# Database driver (stores jobs in database)
QUEUE_CONNECTION=database
# Redis driver (recommended for production)
QUEUE_CONNECTION=redis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379
# Amazon SQS
QUEUE_CONNECTION=sqs
AWS_ACCESS_KEY_ID=your-key-id
AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_DEFAULT_REGION=us-east-1
SQS_QUEUE=your-queue-url
# Beanstalkd
QUEUE_CONNECTION=beanstalkd
BEANSTALKD_HOST=127.0.0.1
BEANSTALKD_QUEUE=default
For database queue, create the jobs table:
php artisan queue:table
php artisan migrate
For failed jobs tracking, create the failed jobs table:
php artisan queue:failed-table
php artisan migrate
Note: Redis is recommended for production environments due to its speed and reliability. Install the predis/predis package: composer require predis/predis
Creating Jobs
Generate a job class using Artisan:
php artisan make:job ProcessPodcast
php artisan make:job SendEmailNotification
php artisan make:job GenerateInvoice
Example job class:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $podcast;
// Number of times job may be attempted
public $tries = 3;
// Maximum time in seconds before timeout
public $timeout = 120;
// Maximum number of exceptions before failing
public $maxExceptions = 3;
// Delete job if models are missing
public $deleteWhenMissingModels = true;
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
public function handle(AudioProcessor $processor)
{
// Process the podcast audio
$processor->process($this->podcast);
// Update status
$this->podcast->update([
'status' => 'processed',
'processed_at' => now(),
]);
}
// Handle job failure
public function failed(\Throwable $exception)
{
$this->podcast->update(['status' => 'failed']);
// Notify admin
\Log::error('Podcast processing failed', [
'podcast_id' => $this->podcast->id,
'error' => $exception->getMessage(),
]);
}
}
Dispatching Jobs
Dispatch jobs to the queue using multiple methods:
use App\Jobs\ProcessPodcast;
// Dispatch to default queue
ProcessPodcast::dispatch($podcast);
// Dispatch if condition is true
ProcessPodcast::dispatchIf($podcast->should_process, $podcast);
// Dispatch unless condition is true
ProcessPodcast::dispatchUnless($podcast->already_processed, $podcast);
// Dispatch after database transaction commits
ProcessPodcast::dispatch($podcast)->afterCommit();
// Delay job execution
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));
// Specify queue name
ProcessPodcast::dispatch($podcast)->onQueue('processing');
// Specify queue connection
ProcessPodcast::dispatch($podcast)->onConnection('redis');
// Chain jobs (run sequentially)
ProcessPodcast::withChain([
new OptimizePodcast($podcast),
new ReleasePodcast($podcast),
])->dispatch($podcast);
// Dispatch synchronously (not queued)
ProcessPodcast::dispatchSync($podcast);
// Dispatch after response sent to user
ProcessPodcast::dispatchAfterResponse($podcast);
Job Middleware
Job middleware allows you to wrap custom logic around job execution:
php artisan make:middleware RateLimited
Create rate limiter middleware:
<?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)
->allow(10)
->every(60)
->then(function () use ($job, $next) {
$next($job);
}, function () use ($job) {
// Release job back to queue with delay
$job->release(10);
});
}
}
Apply middleware to job:
<?php
use App\Jobs\Middleware\RateLimited;
class ProcessPodcast implements ShouldQueue
{
public function middleware()
{
return [new RateLimited];
}
// Or use built-in middleware
public function middleware()
{
return [
(new WithoutOverlapping($this->podcast->id))
->releaseAfter(60)
->expireAfter(180),
];
}
}
Job Batching
Process multiple jobs as a batch and track their collective progress:
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
$batch = Bus::batch([
new ProcessPodcast($podcast1),
new ProcessPodcast($podcast2),
new ProcessPodcast($podcast3),
])->then(function (Batch $batch) {
// All jobs completed successfully
\Log::info('All podcasts processed');
})->catch(function (Batch $batch, \Throwable $e) {
// First batch job failure detected
\Log::error('Batch processing failed', ['error' => $e->getMessage()]);
})->finally(function (Batch $batch) {
// The batch has finished executing
\Log::info('Batch finished');
})->name('Process Podcasts')
->onConnection('redis')
->onQueue('processing')
->dispatch();
// Get batch ID
$batchId = $batch->id;
// Check batch status
$batch = Bus::findBatch($batchId);
$progress = $batch->progress(); // Percentage complete
$pending = $batch->pendingJobs;
$failed = $batch->failedJobs;
$finished = $batch->finished();
$cancelled = $batch->cancelled();
// Cancel batch
$batch->cancel();
// Add jobs to existing batch
$batch->add([
new ProcessPodcast($podcast4),
new ProcessPodcast($podcast5),
]);
Tip: Create the job_batches table before using batching: php artisan queue:batches-table && php artisan migrate
Failed Jobs
Manage failed jobs using Artisan commands:
# List all failed jobs
php artisan queue:failed
# Retry specific failed job
php artisan queue:retry 5
# Retry all failed jobs
php artisan queue:retry all
# Delete failed job
php artisan queue:forget 5
# Delete all failed jobs
php artisan queue:flush
# Prune failed jobs older than 48 hours
php artisan queue:prune-failed --hours=48
Monitor failed jobs programmatically:
use Illuminate\Support\Facades\Queue;
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
// Notify admin
\Log::critical('Job failed', [
'job' => $event->job->getName(),
'connection' => $event->connectionName,
'exception' => $event->exception->getMessage(),
]);
});
Queue Workers
Start queue workers to process jobs:
# Process jobs on default queue
php artisan queue:work
# Process specific connection
php artisan queue:work redis
# Process specific queue
php artisan queue:work redis --queue=emails,processing,default
# Process one job and stop
php artisan queue:work --once
# Process jobs for 60 seconds
php artisan queue:work --max-time=60
# Process 100 jobs and stop
php artisan queue:work --max-jobs=100
# Stop after processing current job
php artisan queue:work --stop-when-empty
# Set memory limit
php artisan queue:work --memory=128
# Set job timeout
php artisan queue:work --timeout=60
# Delay failed jobs
php artisan queue:work --backoff=3
# Set sleep duration when no jobs available
php artisan queue:work --sleep=3
# Number of jobs to process simultaneously
php artisan queue:work --tries=3
Warning: Queue workers are long-lived processes. They won't pick up code changes without restarting. Use php artisan queue:restart to gracefully restart all workers after deploying code.
Supervisor Configuration
Use Supervisor to keep queue workers running in production:
# /etc/supervisor/conf.d/laravel-worker.conf
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work redis --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/var/www/html/storage/logs/worker.log
stopwaitsecs=3600
Supervisor commands:
# Reload configuration
sudo supervisorctl reread
sudo supervisorctl update
# Start workers
sudo supervisorctl start laravel-worker:*
# Stop workers
sudo supervisorctl stop laravel-worker:*
# Restart workers
sudo supervisorctl restart laravel-worker:*
# Check status
sudo supervisorctl status
Laravel Horizon
Horizon provides a dashboard and configuration system for Redis queues:
# Install Horizon
composer require laravel/horizon
# Publish assets
php artisan horizon:install
# Start Horizon
php artisan horizon
# Pause Horizon
php artisan horizon:pause
# Continue Horizon
php artisan horizon:continue
# Terminate Horizon
php artisan horizon:terminate
Configure Horizon in config/horizon.php:
'environments' => [
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default', 'emails', 'notifications'],
'balance' => 'auto',
'maxProcesses' => 10,
'maxTime' => 0,
'maxJobs' => 0,
'memory' => 128,
'tries' => 3,
'timeout' => 60,
],
],
'local' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default'],
'balance' => 'auto',
'maxProcesses' => 3,
'tries' => 3,
],
],
],
Exercise 1: Create a job that processes uploaded images (resize, optimize, generate thumbnails). Implement proper error handling, retry logic, and progress tracking. Chain jobs to upload processed images to S3.
Exercise 2: Build a bulk email system using job batching. Create a job to send individual emails, batch them together, and provide a progress dashboard showing how many emails have been sent, failed, or are pending.
Exercise 3: Implement a rate-limited API scraper using job middleware. The job should fetch data from an external API, respect rate limits (10 requests per minute), and handle failures gracefully with exponential backoff.
Queues are essential for building scalable, performant Laravel applications. In the next lesson, we'll explore task scheduling for running automated tasks at specified intervals.