Advanced Laravel

Advanced Queue System

20 min Lesson 9 of 40

Advanced Queue System

Master Laravel's advanced queue features including job middleware, rate limiting, unique jobs, job batching, chaining, failure handling, and monitoring with Laravel Horizon.

Job Middleware

Apply middleware to jobs for cross-cutting concerns like rate limiting and authentication:

// app/Jobs/ProcessPodcast.php namespace App\Jobs; use App\Jobs\Middleware\RateLimited; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class ProcessPodcast implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; public function __construct( public Podcast $podcast ) {} // Define job middleware public function middleware(): array { return [ new RateLimited('process-podcast'), new WithoutOverlapping($this->podcast->id), ]; } public function handle(): void { // Process podcast } } // app/Jobs/Middleware/RateLimited.php namespace App\Jobs\Middleware; use Illuminate\Support\Facades\Redis; class RateLimited { public function __construct( protected string $key, protected int $maxAttempts = 60, protected int $decayMinutes = 1 ) {} public function handle($job, $next) { $key = "rate-limit:{$this->key}:{$job->podcast->id}"; if (Redis::throttle($key)->allow($this->maxAttempts)->every($this->decayMinutes * 60)->then( function () use ($job, $next) { $next($job); }, function () use ($job) { $job->release(60); // Release for 60 seconds } )) { return; } $job->delete(); } }
Pro Tip: Job middleware runs before the job is processed, allowing you to control execution flow, implement rate limiting, or skip execution based on conditions.

Rate Limiting Jobs

Control job execution rate using built-in rate limiters:

use Illuminate\Queue\Middleware\RateLimited as RateLimitMiddleware; use Illuminate\Queue\Middleware\ThrottlesExceptions; class ProcessApiCall implements ShouldQueue { public function middleware(): array { return [ // Limit to 10 jobs per minute (new RateLimitMiddleware('api-calls')) ->allow(10) ->everyMinute(), // Throttle exceptions - release job on exception (new ThrottlesExceptions(3, 5)) // 3 attempts over 5 minutes ->backoff(5), // Wait 5 minutes before retry ]; } public function handle(): void { // Make API call $response = Http::get('https://api.example.com/data'); if ($response->failed()) { throw new ApiCallException('API call failed'); } } } // Dynamic rate limiting class SendEmail implements ShouldQueue { public function middleware(): array { return [ (new RateLimitMiddleware('emails')) ->dontRelease() // Don't release back to queue ->allow($this->getAllowedRate()) ->everyMinute(), ]; } protected function getAllowedRate(): int { return $this->user->isPremium() ? 100 : 10; } }

Unique Jobs

Prevent duplicate jobs from being queued:

use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; // Unique for the job lifetime class ProcessReport implements ShouldQueue, ShouldBeUnique { public function __construct( public Report $report ) {} // Define uniqueness constraint public function uniqueId(): string { return $this->report->id; } // How long to maintain uniqueness (seconds) public $uniqueFor = 3600; // 1 hour public function handle(): void { // Process report } } // Unique only until processing starts class SendNotification implements ShouldQueue, ShouldBeUniqueUntilProcessing { public function __construct( public User $user, public string $message ) {} public function uniqueId(): string { return "notification:{$this->user->id}"; } public function handle(): void { $this->user->notify(new GenericNotification($this->message)); } } // Unique via attribute class ExportData implements ShouldQueue, ShouldBeUnique { public function __construct( public int $userId ) {} // Use via attribute instead of uniqueId() public function uniqueVia(): string { return Cache::driver('redis'); } }
Note: ShouldBeUnique prevents duplicate jobs during the entire uniqueFor period, while ShouldBeUniqueUntilProcessing allows duplicate jobs once processing starts.

Job Batching

Execute multiple jobs as a coordinated batch:

use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus; // Dispatch a batch of jobs $batch = Bus::batch([ new ProcessCsvRow($row1), new ProcessCsvRow($row2), new ProcessCsvRow($row3), ])->then(function (Batch $batch) { // All jobs completed successfully Log::info('Batch completed', ['batch_id' => $batch->id]); })->catch(function (Batch $batch, Throwable $e) { // First batch job failure detected Log::error('Batch failed', ['batch_id' => $batch->id, 'error' => $e->getMessage()]); })->finally(function (Batch $batch) { // Batch has finished executing Notification::send($batch->user, new BatchCompleted($batch)); })->name('CSV Import') ->onQueue('imports') ->dispatch(); // Add jobs to existing batch $batch->add([ new ProcessCsvRow($row4), new ProcessCsvRow($row5), ]); // Check batch status if ($batch->finished()) { // All jobs completed } if ($batch->hasFailures()) { // Some jobs failed foreach ($batch->failures() as $failure) { Log::error($failure->exception); } } // Cancel batch $batch->cancel(); // Accessing batch from within job class ProcessCsvRow implements ShouldQueue { use Batchable; public function handle(): void { if ($this->batch()->cancelled()) { return; } // Process row } }

Job Chaining

Execute jobs sequentially with dependencies:

use Illuminate\Support\Facades\Bus; // Simple chain Bus::chain([ new DownloadVideo($video), new ProcessVideo($video), new UploadToS3($video), new SendNotification($user, 'Video processed'), ])->dispatch(); // Chain with error handling Bus::chain([ new ImportUsers($file), new SendWelcomeEmails(), new GenerateReport(), ])->catch(function (Throwable $e) { Log::error('Chain failed: ' . $e->getMessage()); })->dispatch(); // Conditional chaining within job class ProcessOrder implements ShouldQueue { public function handle(): void { // Process order $order->process(); // Chain next job if ($order->requiresShipping()) { $this->chain([ new AllocateInventory($order), new GenerateShippingLabel($order), new NotifyWarehouse($order), ]); } else { $this->chain([ new SendDigitalProduct($order), ]); } } } // Prepend jobs to chain $chain = [new Job2(), new Job3()]; array_unshift($chain, new Job1()); Bus::chain($chain)->dispatch();
Warning: If any job in a chain fails, subsequent jobs won't execute unless you use catch() to handle failures gracefully.

Handling Failed Jobs

Implement comprehensive failure handling:

class ProcessPayment implements ShouldQueue { // Number of times the job may be attempted public $tries = 5; // Maximum seconds the job can run public $timeout = 120; // Seconds to wait before retrying after failure public $backoff = [10, 30, 60, 120, 300]; // Maximum exceptions allowed before failing public $maxExceptions = 3; // Delete job if models don't exist public $deleteWhenMissingModels = true; public function __construct( public Payment $payment ) {} public function handle(): void { // Attempt payment processing $result = PaymentGateway::charge($this->payment); if (!$result->successful()) { // Manually fail with custom message $this->fail(new PaymentException('Payment failed: ' . $result->error)); } } // Called when job fails after all retries public function failed(Throwable $exception): void { Log::error('Payment job failed', [ 'payment_id' => $this->payment->id, 'exception' => $exception->getMessage(), ]); // Update payment status $this->payment->update([ 'status' => 'failed', 'error' => $exception->getMessage(), ]); // Notify user $this->payment->user->notify( new PaymentFailedNotification($this->payment) ); } // Determine if job should be retried public function shouldRetry(Throwable $exception): bool { // Don't retry on validation errors if ($exception instanceof ValidationException) { return false; } // Retry on network errors return $exception instanceof NetworkException; } }

Monitoring with Laravel Horizon

Monitor and manage queues with Horizon dashboard:

// composer require laravel/horizon // config/horizon.php return [ 'environments' => [ 'production' => [ 'supervisor-1' => [ 'connection' => 'redis', 'queue' => ['default', 'emails', 'reports'], 'balance' => 'auto', 'processes' => 10, 'tries' => 3, 'timeout' => 60, ], 'supervisor-2' => [ 'connection' => 'redis', 'queue' => ['imports', 'exports'], 'balance' => 'simple', 'processes' => 5, 'tries' => 1, 'timeout' => 300, ], ], ], // Job trimming 'trim' => [ 'recent' => 60, // minutes 'pending' => 60, 'completed' => 60, 'recent_failed' => 10080, // 1 week 'failed' => 10080, 'monitored' => 10080, ], // Job metrics 'metrics' => [ 'default' => 60, // minutes ], ]; // Start Horizon php artisan horizon // Publish Horizon assets php artisan horizon:install // Authorization // app/Providers/HorizonServiceProvider.php protected function gate() { Gate::define('viewHorizon', function ($user) { return in_array($user->email, [ 'admin@example.com', ]); }); }

Job Events and Listeners

Listen to queue events for monitoring and debugging:

use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; // app/Providers/EventServiceProvider.php protected $listen = [ JobProcessing::class => [ LogJobProcessing::class, ], JobProcessed::class => [ LogJobProcessed::class, UpdateJobMetrics::class, ], JobFailed::class => [ LogJobFailure::class, NotifyAdminOfFailure::class, ], ]; // app/Listeners/LogJobProcessing.php class LogJobProcessing { public function handle(JobProcessing $event): void { Log::info('Job processing started', [ 'job' => $event->job->resolveName(), 'connection' => $event->connectionName, 'queue' => $event->job->getQueue(), ]); } } // app/Listeners/NotifyAdminOfFailure.php class NotifyAdminOfFailure { public function handle(JobFailed $event): void { if ($this->isCriticalJob($event->job)) { Notification::send( User::admins(), new CriticalJobFailed($event) ); } } protected function isCriticalJob($job): bool { return in_array($job->resolveName(), [ 'App\\Jobs\\ProcessPayment', 'App\\Jobs\\SendInvoice', ]); } }

Custom Queue Connectors

Create custom queue drivers for specialized needs:

// app/Queue/MongoConnector.php namespace App\Queue; use Illuminate\Queue\Connectors\ConnectorInterface; class MongoConnector implements ConnectorInterface { public function connect(array $config) { return new MongoQueue( $config['connection'], $config['collection'], $config['queue'] ?? 'default', $config['retry_after'] ?? 60 ); } } // Register in service provider public function boot() { Queue::extend('mongo', function () { return new MongoConnector; }); } // config/queue.php 'connections' => [ 'mongo' => [ 'driver' => 'mongo', 'connection' => 'mongodb', 'collection' => 'jobs', 'queue' => 'default', 'retry_after' => 90, ], ],
Exercise 1: Build a batch CSV import system that:
1. Accepts uploads up to 10,000 rows
2. Processes rows in batches of 100
3. Validates each row and tracks failures
4. Generates a report when batch completes
5. Sends email notification with import statistics
Use job batching with progress tracking and comprehensive error handling.
Exercise 2: Implement a video processing pipeline using job chaining:
1. Download video from URL
2. Extract audio track
3. Generate 3 thumbnail images
4. Transcode to multiple resolutions (480p, 720p, 1080p)
5. Upload all assets to S3
6. Update database and notify user
Handle failures at each stage with appropriate retry logic and notifications.
Exercise 3: Create a rate-limited API synchronization job that:
1. Fetches data from external API (rate limit: 100 requests/minute)
2. Processes and stores data in database
3. Ensures only one sync runs at a time per resource
4. Retries failed requests with exponential backoff
5. Logs all sync activities to Horizon
Implement using unique jobs, rate limiting middleware, and custom failure handling.