Queues Advanced & Job Batching

Laravel's queue system provides powerful features for managing complex, long-running tasks. In this lesson, we'll explore advanced queue concepts including job batching, chaining, rate limiting, unique jobs, middleware, failure handling, and retry strategies.

Job Batching

Job batching allows you to dispatch a group of jobs and take action when the entire batch completes, including tracking progress and handling failures:

// Create the batches table
// (on Laravel 11 and later the command is: php artisan make:queue-batches-table)
php artisan queue:batches-table
php artisan migrate

// Dispatching a batch of jobs
use App\Jobs\ProcessVideo;
use App\Models\User;
use App\Models\Video;
use App\Notifications\BatchCompleted;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Notification;

$videos = Video::where('status', 'pending')->get();

$batch = Bus::batch([
    new ProcessVideo($videos[0]),
    new ProcessVideo($videos[1]),
    new ProcessVideo($videos[2]),
])->then(function (Batch $batch) {
    // All jobs completed successfully
    logger()->info("Batch {$batch->id} completed successfully");

    // Send completion notification
    Notification::send(
        User::admins(),
        new BatchCompleted($batch)
    );
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected
    logger()->error("Batch {$batch->id} failed", [
        'exception' => $e->getMessage(),
    ]);
})->finally(function (Batch $batch) {
    // The batch has finished executing
    logger()->info("Batch {$batch->id} finished", [
        'total_jobs' => $batch->totalJobs,
        'processed' => $batch->processedJobs(),
        'failed' => $batch->failedJobs,
    ]);
})->allowFailures()->dispatch();

// Get batch ID
$batchId = $batch->id;

// Dynamically adding jobs to a batch
// (Laravel's documentation recommends doing this from inside a job
// that already belongs to the batch; see ProcessVideo below)
$batch->add([
    new ProcessVideo($videos[3]),
    new ProcessVideo($videos[4]),
]);

Batch Methods:
  • allowFailures() - Keep the batch running when a job fails, instead of marking the batch as cancelled
  • onConnection() - Specify queue connection
  • onQueue() - Specify queue name
  • name() - Give the batch a name
  • withOption() - Add custom options

Making Jobs Batchable

Jobs must use the Batchable trait to participate in batches:

// app/Jobs/ProcessVideo.php
namespace App\Jobs;

use App\Models\Video;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessVideo implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        public Video $video
    ) {}

    public function handle(): void
    {
        // Check if batch has been cancelled
        if ($this->batch()?->cancelled()) {
            return;
        }

        // Process the video
        $this->video->process();

        // No manual progress update is needed here: the batch records
        // each finished job itself, which is what progress() reports.

        // Add more jobs dynamically if needed
        if ($this->video->needsThumbnails()) {
            $this->batch()?->add([
                new GenerateThumbnails($this->video),
            ]);
        }
    }
}

Inspecting Batches

Retrieve and inspect batch status from anywhere in your application:

use Illuminate\Support\Facades\Bus;

// Find a batch by ID
$batch = Bus::findBatch($batchId);

if ($batch) {
    // Batch properties
    $batch->id;              // Batch ID
    $batch->name;            // Batch name
    $batch->totalJobs;       // Total jobs
    $batch->pendingJobs;     // Pending jobs
    $batch->processedJobs(); // Processed jobs
    $batch->failedJobs;      // Failed jobs count
    $batch->progress();      // Progress percentage (0-100)
    $batch->finished();      // All jobs finished?
    $batch->cancelled();     // Batch cancelled?
    $batch->createdAt;       // Creation timestamp
    $batch->finishedAt;      // Completion timestamp

    // Cancel a batch
    $batch->cancel();

    // Delete a batch
    $batch->delete();

    // Check specific states
    if ($batch->finished() && !$batch->cancelled()) {
        // Batch ran to completion without being cancelled
    }
}

// In controllers - show batch progress
public function show(string $batchId)
{
    $batch = Bus::findBatch($batchId);

    // findBatch() returns null when no batch has this ID
    abort_if($batch === null, 404);

    return response()->json([
        'id' => $batch->id,
        'name' => $batch->name,
        'progress' => $batch->progress(),
        'total' => $batch->totalJobs,
        'processed' => $batch->processedJobs(),
        'failed' => $batch->failedJobs,
        'finished' => $batch->finished(),
    ]);
}
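
The counters above relate to one another in a simple way. As a rough sketch in plain PHP (no framework required), the processed count is the total minus the pending count, and the progress is that share expressed as a rounded percentage. Note that batchProgress is a hypothetical helper written for illustration; it is not a Laravel function.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper showing how a batch's counters relate:
 * processed = total - pending, progress = processed / total as a percentage.
 */
function batchProgress(int $totalJobs, int $pendingJobs): int
{
    if ($totalJobs <= 0) {
        return 0; // An empty batch reports no progress.
    }

    $processed = $totalJobs - $pendingJobs;

    return (int) round(($processed / $totalJobs) * 100);
}

// 10 jobs in the batch, 4 of them still pending.
echo batchProgress(10, 4), "%\n";
```

A progress endpoint like the controller above can be polled by the front end, which only needs this one number to draw a progress bar.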

Job Chaining

Chain jobs to run sequentially, where each job runs only if the previous one succeeds:

use App\Jobs\ProcessVideo;
use App\Jobs\GenerateThumbnails;
use App\Jobs\OptimizeVideo;
use App\Jobs\PublishVideo;
use Illuminate\Support\Facades\Bus;

// Simple chain
Bus::chain([
    new ProcessVideo($video),
    new GenerateThumbnails($video),
    new OptimizeVideo($video),
    new PublishVideo($video),
])->dispatch();

// Chain with callbacks
Bus::chain([
    new ProcessVideo($video),
    new GenerateThumbnails($video),
    new OptimizeVideo($video),
    function () use ($video) {
        // This closure runs after all jobs succeed
        $video->update(['status' => 'published']);

        // Notify user
        $video->user->notify(new VideoPublished($video));
    },
])->onConnection('redis')
  ->onQueue('video-processing')
  ->catch(function (Throwable $e) use ($video) {
      // Handle chain failure
      $video->update(['status' => 'failed']);

      logger()->error("Video processing chain failed", [
          'video_id' => $video->id,
          'error' => $e->getMessage(),
      ]);
  })->dispatch();

// Prepend to existing chain (in job)
class ProcessVideo implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function handle(): void
    {
        // Process video...

        // Run this job immediately after the current one.
        // prependToChain() and appendToChain() come from the Queueable trait
        // in recent Laravel versions and take one job each. Calling
        // $this->chain([...]) instead would replace the rest of the chain.
        $this->prependToChain(new GenerateThumbnails($this->video));

        // Run this job at the end of the chain
        $this->appendToChain(new OptimizeVideo($this->video));
    }
}
Chain Failure: If any job in a chain fails, subsequent jobs will not execute. Use try-catch within jobs to handle errors gracefully, or use the catch() callback to handle chain-level failures.
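
To illustrate the rule above without a queue worker, here is a minimal plain-PHP sketch of chain semantics: steps run in order, the first exception stops the chain, and the failure callback receives it. runChain and the step names are hypothetical, and the sketch ignores retries, which a real job would exhaust before the chain is considered failed.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for a job chain: runs each step in order and
 * stops at the first exception, handing it to the $onFailure callback.
 *
 * @param array<string, callable> $steps     Step name => callable.
 * @param callable                $onFailure Receives the Throwable.
 * @return string[] Names of the steps that completed.
 */
function runChain(array $steps, callable $onFailure): array
{
    $completed = [];

    foreach ($steps as $name => $step) {
        try {
            $step();
            $completed[] = $name;
        } catch (Throwable $e) {
            $onFailure($e);
            break; // Later steps never run.
        }
    }

    return $completed;
}

$errors = [];

$completed = runChain(
    [
        'process' => function (): void {},
        'thumbnails' => function (): void {
            throw new RuntimeException('encoder crashed');
        },
        'publish' => function (): void {},
    ],
    function (Throwable $e) use (&$errors): void {
        $errors[] = $e->getMessage();
    }
);

// Only "process" completed; "publish" was skipped after the failure.
print_r($completed);
```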

Rate Limiting Jobs

Control how often jobs of a certain type may run, and prevent jobs that share a key from overlapping:

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProcessApiRequest implements ShouldQueue
{
    public function __construct(
        public int $userId
    ) {}

    /**
     * Get the middleware the job should pass through.
     */
    public function middleware(): array
    {
        return [
            // Apply the "api-requests" limiter defined below
            new RateLimited('api-requests'),

            // Prevent overlapping jobs that share the same key
            (new WithoutOverlapping($this->userId))
                ->releaseAfter(60)  // Retry an overlapping job after 60 seconds
                ->expireAfter(180), // Let the lock expire after 3 minutes

            // To delete overlapping jobs instead of retrying them, call
            // ->dontRelease() in place of ->releaseAfter(); the two options
            // contradict each other, so use only one.
        ];
    }
}

// Define the rate limiter in the boot() method of AppServiceProvider.
// The three definitions below are alternatives: registering the same
// name again replaces the earlier definition.
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

// Allow 10 jobs per minute per user
RateLimiter::for('api-requests', function ($job) {
    return Limit::perMinute(10)->by($job->userId);
});

// Different limits for different users
RateLimiter::for('api-requests', function ($job) {
    return $job->user->isPremium()
        ? Limit::perMinute(100)
        : Limit::perMinute(10);
});

// Multiple rate limits
RateLimiter::for('api-requests', function ($job) {
    return [
        Limit::perMinute(10),
        Limit::perHour(100),
    ];
});

Unique Jobs

Ensure only one instance of a job is on the queue at a time:

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPayment implements ShouldQueue, ShouldBeUnique
{
    public $orderId;

    /**
     * The number of seconds after which the unique lock expires.
     */
    public $uniqueFor = 3600; // 1 hour

    /**
     * The unique ID of the job.
     */
    public function uniqueId(): string
    {
        return (string) $this->orderId;
    }

    public function handle(): void
    {
        // Process payment
        // Only one payment processing job per order will run
    }
}

// Alternative: Unique until processing starts
class ProcessPayment implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    public $orderId;

    public function uniqueId(): string
    {
        return (string) $this->orderId;
    }
}

// Unique across all properties
class SendEmail implements ShouldQueue, ShouldBeUnique
{
    public function __construct(
        public string $email,
        public string $subject
    ) {}

    public function uniqueId(): string
    {
        // The separator keeps different email/subject pairs from
        // concatenating to the same string
        return $this->email . '|' . $this->subject;
    }
}
ShouldBeUnique vs ShouldBeUniqueUntilProcessing:
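  • ShouldBeUnique - The lock is held until the job finishes processing or fails its final attempt, so duplicates are rejected even while the job is running
  • ShouldBeUniqueUntilProcessing - The lock is released when the job starts processing, so a new instance can be queued while the first one runs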
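
The difference between the two contracts comes down to when the lock is released. The following plain-PHP model (PHP 8.0+) is only a sketch of that timing: UniqueLockModel and its methods are hypothetical, it keeps locks in memory rather than in the cache, and it ignores the uniqueFor expiry.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory model of the unique-job lock. It is not Laravel
 * code; it only mimics when the lock is taken and when it is released.
 */
final class UniqueLockModel
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function __construct(private bool $untilProcessing) {}

    /** Returns true if the job was queued, false if a duplicate was dropped. */
    public function dispatch(string $uniqueId): bool
    {
        if (isset($this->locks[$uniqueId])) {
            return false;
        }

        $this->locks[$uniqueId] = true;

        return true;
    }

    public function startProcessing(string $uniqueId): void
    {
        if ($this->untilProcessing) {
            unset($this->locks[$uniqueId]); // Released as work begins.
        }
    }

    public function finishProcessing(string $uniqueId): void
    {
        unset($this->locks[$uniqueId]); // Released once work is done.
    }
}

// ShouldBeUnique-style: a second dispatch while the job runs is dropped.
$unique = new UniqueLockModel(untilProcessing: false);
$unique->dispatch('order-1');
$unique->startProcessing('order-1');
$duringUnique = $unique->dispatch('order-1');

// ShouldBeUniqueUntilProcessing-style: a second dispatch is accepted.
$untilProcessing = new UniqueLockModel(untilProcessing: true);
$untilProcessing->dispatch('order-1');
$untilProcessing->startProcessing('order-1');
$duringUntilProcessing = $untilProcessing->dispatch('order-1');

var_dump($duringUnique, $duringUntilProcessing);
```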

Job Middleware

Create reusable middleware for jobs:

// app/Jobs/Middleware/RateLimitApi.php
namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimitApi
{
    public function handle(object $job, Closure $next): void
    {
        // Check rate limit
        $key = "api:rate_limit:{$job->apiKey}";
        $requests = Redis::incr($key);

        if ($requests === 1) {
            Redis::expire($key, 60); // 1 minute window
        }

        if ($requests > 100) {
            // Rate limit exceeded - release job back to queue
            $job->release(60); // Try again in 60 seconds
            return;
        }

        // Continue to next middleware or job
        $next($job);
    }
}

// app/Jobs/Middleware/LogJobExecution.php
namespace App\Jobs\Middleware;

use Closure;

class LogJobExecution
{
    public function handle(object $job, Closure $next): void
    {
        $startTime = microtime(true);

        logger()->info("Job started", [
            'job' => get_class($job),
            'queue' => $job->queue,
        ]);

        $next($job);

        $duration = microtime(true) - $startTime;

        logger()->info("Job completed", [
            'job' => get_class($job),
            'duration' => round($duration, 2) . 's',
        ]);
    }
}

// Use middleware in job
use App\Jobs\Middleware\LogJobExecution;
use App\Jobs\Middleware\RateLimitApi;

class ProcessApiRequest implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            new RateLimitApi,
            new LogJobExecution,
        ];
    }
}

Handling Job Failures

Implement robust failure handling and retry strategies:

use App\Models\User;
use App\Models\Video;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;

class ProcessVideo implements ShouldQueue
{
    use InteractsWithQueue;

    /**
     * The number of times the job may be attempted.
     */
    public $tries = 5;

    /**
     * The number of seconds to wait before each retry.
     */
    public $backoff = [10, 30, 60, 120, 300]; // Increasing backoff

    // Alternative to the $backoff property (define one or the other).
    // The value is read when the job is queued, so it cannot vary with
    // $this->attempts(); return the full schedule instead:
    //
    // public function backoff(): array
    // {
    //     return [10, 20, 30, 40, 50];
    // }

    /**
     * The number of seconds the job can run before timing out.
     */
    public $timeout = 120;

    /**
     * Indicate if the job should be marked as failed on timeout.
     */
    public $failOnTimeout = true;

    public function __construct(
        public Video $video
    ) {}

    /**
     * Determine the time at which the job should timeout.
     * When this method is defined, the deadline takes precedence over $tries.
     */
    public function retryUntil(): DateTime
    {
        return now()->addHours(2);
    }

    public function handle(): void
    {
        try {
            // Process video
            $this->video->process();
        } catch (\Exception $e) {
            // Log the error
            Log::error("Video processing failed", [
                'video_id' => $this->video->id,
                'attempt' => $this->attempts(),
                'error' => $e->getMessage(),
            ]);

            // Release job back to queue for retry
            $this->release(30); // Retry in 30 seconds

            // Or fail the job immediately
            // $this->fail($e);
        }
    }

    /**
     * Handle a job failure.
     */
    public function failed(Throwable $exception): void
    {
        // Update video status
        $this->video->update(['status' => 'failed']);

        // Send notification to admin (if one exists)
        $admin = User::admin()->first();
        $admin?->notify(new VideoProcessingFailed($this->video, $exception));

        // Log to external service
        report($exception);
    }
}

// Global failed job handling

// Create failed_jobs table
// (on Laravel 11 and later the command is: php artisan make:queue-failed-table)
php artisan queue:failed-table
php artisan migrate

// List failed jobs
php artisan queue:failed

// Retry specific failed job
php artisan queue:retry {job-id}

// Retry all failed jobs
php artisan queue:retry all

// Delete failed job
php artisan queue:forget {job-id}

// Flush all failed jobs
php artisan queue:flush

Retry Strategies:
  • tries - Maximum attempts before giving up
  • backoff - Delay between retries (seconds or array)
  • retryUntil - Absolute deadline for retries
  • timeout - Maximum execution time per attempt
  • release() - Manually retry with custom delay
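
To make the backoff option concrete, here is a small plain-PHP sketch of how an array of delays is consumed: each retry takes the next delay, and once the list runs out the last value is reused. delayBeforeRetry is a hypothetical helper written for illustration; it is not part of Laravel, and it ignores retryUntil and timeout.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: picks the delay (in seconds) before the next retry.
 *
 * @param int|int[] $backoff  A single delay, or one delay per attempt.
 * @param int       $attempts How many attempts have been made so far (>= 1).
 */
function delayBeforeRetry(int|array $backoff, int $attempts): int
{
    $delays = array_values((array) $backoff);

    if ($delays === []) {
        return 0;
    }

    // Past the end of the list, keep reusing the final delay.
    return $delays[$attempts - 1] ?? $delays[count($delays) - 1];
}

foreach ([1, 2, 3, 4] as $attempt) {
    echo "after attempt {$attempt}: ", delayBeforeRetry([10, 30, 60], $attempt), "s\n";
}
```

With the schedule [10, 30, 60], the fourth failure waits 60 seconds again, which is why a short array is enough even when tries is larger.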

Job Events and Monitoring

// Listen to queue events in AppServiceProvider
use App\Notifications\JobFailedAlert;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Notification;
use Illuminate\Support\Facades\Queue;

public function boot(): void
{
    Queue::before(function (JobProcessing $event) {
        // Job is about to be processed
        logger()->debug("Processing job", [
            'connection' => $event->connectionName,
            'queue' => $event->job->getQueue(),
        ]);
    });

    Queue::after(function (JobProcessed $event) {
        // Job has been processed
        Cache::increment('jobs_processed_today');
    });

    Queue::failing(function (JobFailed $event) {
        // Job has failed
        logger()->error("Job failed", [
            'connection' => $event->connectionName,
            'job' => $event->job->resolveName(),
            'exception' => $event->exception->getMessage(),
        ]);

        // Send alert to Slack
        Notification::route('slack', config('services.slack.webhook'))
            ->notify(new JobFailedAlert($event));
    });

    Queue::looping(function () {
        // Called on each iteration of the queue worker loop
        // Useful for clearing cache, checking config changes, etc.
        if (Cache::has('maintenance_mode')) {
            // Returning false makes the worker skip this iteration and
            // pause, without killing the process as exit(0) would
            return false;
        }
    });
}

Exercise 1: Implement Video Processing Pipeline

Create a video processing system with batching:

  1. Create jobs: UploadVideo, ProcessVideo, GenerateThumbnails, OptimizeVideo
  2. Implement job chaining for single video processing
  3. Implement batching for bulk video processing (10+ videos)
  4. Track batch progress in the database
  5. Send email notification when batch completes
  6. Add rate limiting (5 videos per minute)
  7. Implement retry logic with exponential backoff

Exercise 2: Build Report Generation System

Create a report generation system with unique jobs:

  1. Generate reports: UserReport, SalesReport, InventoryReport
  2. Make reports unique by user ID + date
  3. Make the unique lock expire after 1 hour
  4. Chain: FetchData -> ProcessData -> GeneratePDF -> SendEmail
  5. Batch multiple report requests from admin
  6. Handle failures gracefully (notify user, log error)

Exercise 3: Advanced Job Middleware

Create custom job middleware:

  1. ApiThrottle - Rate limit API calls (100/minute per user)
  2. RequireMembership - Only process for premium users
  3. LogPerformance - Track execution time and memory usage
  4. CheckMaintenanceMode - Skip jobs during maintenance
  5. Apply middleware to ImportDataJob
  6. Test middleware behavior with queue:work

Summary

In this lesson, you explored advanced queue features in Laravel. You learned how to batch jobs together, chain jobs sequentially, rate limit job execution, ensure job uniqueness, create custom job middleware, handle failures gracefully, and implement sophisticated retry strategies. These powerful tools enable you to build robust, scalable background processing systems that can handle complex workflows with confidence.

In the next lesson, we'll explore Laravel Pennant for feature flags and A/B testing.