Laravel Framework

Queues & Job Processing

18 min Lesson 18 of 45

Queues & Job Processing in Laravel

Laravel's queue system provides a unified API across a variety of queue backends. It lets you defer time-consuming tasks, such as sending emails, processing images, or generating reports, to background workers, significantly improving your application's response time.
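As a first taste of the API, even a plain closure can be pushed onto the queue with the global dispatch() helper (a minimal sketch; $podcast stands in for any Eloquent model you already have in scope):

```php
use App\Models\Podcast;

// Queued closures are serialized and later executed by a queue worker
dispatch(function () use ($podcast) {
    $podcast->update(['status' => 'queued']);
});
```

For anything non-trivial, a dedicated job class (covered below) is easier to test, retry, and monitor.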

Queue Configuration

Configure your queue driver in .env:

# Sync driver (runs jobs immediately; for local development)
QUEUE_CONNECTION=sync

# Database driver (stores jobs in a database table)
QUEUE_CONNECTION=database

# Redis driver (recommended for production)
QUEUE_CONNECTION=redis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379

# Amazon SQS
QUEUE_CONNECTION=sqs
AWS_ACCESS_KEY_ID=your-key-id
AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_DEFAULT_REGION=us-east-1
SQS_QUEUE=your-queue-url

# Beanstalkd
QUEUE_CONNECTION=beanstalkd
BEANSTALKD_HOST=127.0.0.1
BEANSTALKD_QUEUE=default

For the database driver, create the jobs table:

php artisan queue:table
php artisan migrate

To track failed jobs, create the failed-jobs table:

php artisan queue:failed-table
php artisan migrate
Note: Redis is recommended for production environments due to its speed and reliability. Install the predis/predis package (or the phpredis PHP extension): composer require predis/predis
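Each connection also has an entry in config/queue.php. The database connection's retry_after setting, for example, controls how long a worker may hold a job before it is considered stalled and handed out again (the excerpt below mirrors the framework's default values):

```php
// config/queue.php (excerpt)
'connections' => [
    'database' => [
        'driver' => 'database',
        'table' => 'jobs',
        'queue' => 'default',
        // Seconds a job may run before it is assumed stalled and retried;
        // keep this larger than your longest job's timeout to avoid duplicates
        'retry_after' => 90,
    ],
],
```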

Creating Jobs

Generate a job class using Artisan:

php artisan make:job ProcessPodcast
php artisan make:job SendEmailNotification
php artisan make:job GenerateInvoice

Example job class:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $podcast;

    // Number of times the job may be attempted
    public $tries = 3;

    // Maximum time in seconds before the job times out
    public $timeout = 120;

    // Maximum number of unhandled exceptions before failing
    public $maxExceptions = 3;

    // Delete the job if its models no longer exist
    public $deleteWhenMissingModels = true;

    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    public function handle(AudioProcessor $processor)
    {
        // Process the podcast audio
        $processor->process($this->podcast);

        // Update status
        $this->podcast->update([
            'status' => 'processed',
            'processed_at' => now(),
        ]);
    }

    // Handle job failure
    public function failed(\Throwable $exception)
    {
        $this->podcast->update(['status' => 'failed']);

        // Notify admin
        \Log::error('Podcast processing failed', [
            'podcast_id' => $this->podcast->id,
            'error' => $exception->getMessage(),
        ]);
    }
}

Dispatching Jobs

Jobs can be dispatched to the queue in several ways:

use App\Jobs\ProcessPodcast;

// Dispatch to the default queue
ProcessPodcast::dispatch($podcast);

// Dispatch if the condition is true
ProcessPodcast::dispatchIf($podcast->should_process, $podcast);

// Dispatch unless the condition is true
ProcessPodcast::dispatchUnless($podcast->already_processed, $podcast);

// Dispatch after the current database transaction commits
ProcessPodcast::dispatch($podcast)->afterCommit();

// Delay job execution
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));

// Specify the queue name
ProcessPodcast::dispatch($podcast)->onQueue('processing');

// Specify the queue connection
ProcessPodcast::dispatch($podcast)->onConnection('redis');

// Chain jobs (run sequentially)
ProcessPodcast::withChain([
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
])->dispatch($podcast);

// Dispatch synchronously (not queued)
ProcessPodcast::dispatchSync($podcast);

// Dispatch after the response is sent to the user
ProcessPodcast::dispatchAfterResponse($podcast);
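On recent Laravel versions, the Bus facade offers an equivalent, more explicit way to build a chain, including a catch callback that runs if any link fails (a sketch reusing the job classes above):

```php
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
])->catch(function (\Throwable $e) {
    // Runs when a job in the chain fails; the remaining jobs are skipped
    \Log::error('Chain failed', ['error' => $e->getMessage()]);
})->onQueue('processing')->dispatch();
```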

Job Middleware

Job middleware lets you wrap custom logic around the execution of queued jobs, keeping boilerplate such as rate limiting out of the jobs themselves.

Job middleware are plain PHP classes with no required base class, so you can simply create one under app/Jobs/Middleware. (Laravel 11+ ships a generator, php artisan make:job-middleware RateLimited; note that php artisan make:middleware generates HTTP middleware, which is unrelated.)

Create a rate-limiter middleware:

<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    public function handle($job, $next)
    {
        Redis::throttle('key')
            ->block(0)
            ->allow(10)
            ->every(60)
            ->then(function () use ($job, $next) {
                // Lock obtained; process the job
                $next($job);
            }, function () use ($job) {
                // Could not obtain lock; release the job back to the queue with a delay
                $job->release(10);
            });
    }
}

Apply the middleware to a job by returning it from the job's middleware() method:

<?php

namespace App\Jobs;

use App\Jobs\Middleware\RateLimited;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProcessPodcast implements ShouldQueue
{
    public function middleware()
    {
        return [new RateLimited];

        // Or use the built-in overlap-prevention middleware:
        // return [
        //     (new WithoutOverlapping($this->podcast->id))
        //         ->releaseAfter(60)
        //         ->expireAfter(180),
        // ];
    }
}

Job Batching

Process multiple jobs as a batch and track their collective progress. Each job placed in a batch should use the Illuminate\Bus\Batchable trait:

use App\Jobs\ProcessPodcast;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

$batch = Bus::batch([
    new ProcessPodcast($podcast1),
    new ProcessPodcast($podcast2),
    new ProcessPodcast($podcast3),
])->then(function (Batch $batch) {
    // All jobs completed successfully
    \Log::info('All podcasts processed');
})->catch(function (Batch $batch, \Throwable $e) {
    // First batch job failure detected
    \Log::error('Batch processing failed', ['error' => $e->getMessage()]);
})->finally(function (Batch $batch) {
    // The batch has finished executing
    \Log::info('Batch finished');
})->name('Process Podcasts')
  ->onConnection('redis')
  ->onQueue('processing')
  ->dispatch();

// Get the batch ID
$batchId = $batch->id;

// Check batch status
$batch = Bus::findBatch($batchId);
$progress = $batch->progress();   // Percentage complete
$pending = $batch->pendingJobs;
$failed = $batch->failedJobs;
$finished = $batch->finished();
$cancelled = $batch->cancelled();

// Cancel the batch
$batch->cancel();

// Add jobs to an existing batch (typically from within a batched job)
$batch->add([
    new ProcessPodcast($podcast4),
    new ProcessPodcast($podcast5),
]);
Tip: Create the job_batches table before using batching: php artisan queue:batches-table && php artisan migrate
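By default, the first failed job cancels the entire batch. If the remaining jobs should keep running regardless, mark the batch as tolerating failures (a sketch building on the example above):

```php
use Illuminate\Support\Facades\Bus;

$batch = Bus::batch([
    new ProcessPodcast($podcast1),
    new ProcessPodcast($podcast2),
])->allowFailures() // failed jobs are recorded, but the batch keeps going
  ->dispatch();
```

Failed jobs in such a batch still appear in $batch->failedJobs and can be retried later with php artisan queue:retry-batch.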

Failed Jobs

Manage failed jobs using Artisan commands:

# List all failed jobs
php artisan queue:failed

# Retry a specific failed job by its ID
php artisan queue:retry 5

# Retry all failed jobs
php artisan queue:retry all

# Delete a failed job
php artisan queue:forget 5

# Delete all failed jobs
php artisan queue:flush

# Prune failed jobs older than 48 hours
php artisan queue:prune-failed --hours=48

Monitor failed jobs programmatically:

use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\Facades\Queue;

// Register in a service provider's boot() method
Queue::failing(function (JobFailed $event) {
    // $event->connectionName
    // $event->job
    // $event->exception

    // Notify admin
    \Log::critical('Job failed', [
        'job' => $event->job->getName(),
        'connection' => $event->connectionName,
        'exception' => $event->exception->getMessage(),
    ]);
});
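Retry timing can also be tuned per job rather than through the worker's --backoff flag. A sketch of the relevant job-class options:

```php
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    // Exponential-style backoff: wait 10s before the second attempt,
    // 60s before the third, then 120s for any further attempts
    public $backoff = [10, 60, 120];

    // Stop retrying entirely 10 minutes after the job was first dispatched
    public function retryUntil()
    {
        return now()->addMinutes(10);
    }
}
```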

Queue Workers

Start queue workers to process jobs:

# Process jobs on the default connection and queue
php artisan queue:work

# Process a specific connection
php artisan queue:work redis

# Process specific queues, in priority order
php artisan queue:work redis --queue=emails,processing,default

# Process one job and stop
php artisan queue:work --once

# Stop after running for 60 seconds
php artisan queue:work --max-time=60

# Stop after processing 100 jobs
php artisan queue:work --max-jobs=100

# Stop once the queue is empty
php artisan queue:work --stop-when-empty

# Restart when memory usage exceeds 128 MB
php artisan queue:work --memory=128

# Set the per-job timeout in seconds
php artisan queue:work --timeout=60

# Wait 3 seconds before retrying a failed job
php artisan queue:work --backoff=3

# Sleep 3 seconds when no jobs are available
php artisan queue:work --sleep=3

# Attempt each job up to 3 times before marking it failed
php artisan queue:work --tries=3
Warning: Queue workers are long-lived processes. They won't pick up code changes without restarting. Use php artisan queue:restart to gracefully restart all workers after deploying code.
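In practice, queue:restart is the last step of a deployment. A hypothetical deploy script sketch (paths and the exact sequence of steps are assumptions for your project):

```shell
#!/usr/bin/env sh
set -e

cd /var/www/html
git pull
composer install --no-dev --optimize-autoloader
php artisan migrate --force
php artisan config:cache

# Workers finish their current job, then exit;
# a process monitor such as Supervisor restarts them with the new code
php artisan queue:restart
```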

Supervisor Configuration

Use Supervisor to keep queue workers running in production:

# /etc/supervisor/conf.d/laravel-worker.conf
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work redis --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/var/www/html/storage/logs/worker.log
stopwaitsecs=3600

Supervisor commands:

# Reload configuration
sudo supervisorctl reread
sudo supervisorctl update

# Start workers
sudo supervisorctl start laravel-worker:*

# Stop workers
sudo supervisorctl stop laravel-worker:*

# Restart workers
sudo supervisorctl restart laravel-worker:*

# Check status
sudo supervisorctl status

Laravel Horizon

Horizon provides a dashboard and configuration system for Redis queues:

# Install Horizon
composer require laravel/horizon

# Publish assets and configuration
php artisan horizon:install

# Start Horizon
php artisan horizon

# Pause job processing
php artisan horizon:pause

# Resume job processing
php artisan horizon:continue

# Gracefully terminate Horizon
php artisan horizon:terminate

Configure Horizon in config/horizon.php:

'environments' => [
    'production' => [
        'supervisor-1' => [
            'connection' => 'redis',
            'queue' => ['default', 'emails', 'notifications'],
            'balance' => 'auto',
            'maxProcesses' => 10,
            'maxTime' => 0,
            'maxJobs' => 0,
            'memory' => 128,
            'tries' => 3,
            'timeout' => 60,
        ],
    ],

    'local' => [
        'supervisor-1' => [
            'connection' => 'redis',
            'queue' => ['default'],
            'balance' => 'auto',
            'maxProcesses' => 3,
            'tries' => 3,
        ],
    ],
],
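Horizon's dashboard can also group and search jobs by tags. A job declares its own tags with a tags() method (a sketch; the tag strings themselves are arbitrary labels of your choosing):

```php
class ProcessPodcast implements ShouldQueue
{
    public function tags()
    {
        // Shown in the Horizon dashboard and searchable there
        return ['podcast', 'podcast:' . $this->podcast->id];
    }
}
```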
Exercise 1: Create a job that processes uploaded images (resize, optimize, generate thumbnails). Implement proper error handling, retry logic, and progress tracking. Chain jobs to upload processed images to S3.
Exercise 2: Build a bulk email system using job batching. Create a job to send individual emails, batch them together, and provide a progress dashboard showing how many emails have been sent, failed, or are pending.
Exercise 3: Implement a rate-limited API scraper using job middleware. The job should fetch data from an external API, respect rate limits (10 requests per minute), and handle failures gracefully with exponential backoff.

Queues are essential for building scalable, performant Laravel applications. In the next lesson, we'll explore task scheduling for running automated tasks at specified intervals.