Advanced Queues & Job Batching
Laravel's queue system provides powerful features for managing complex, long-running tasks. In this lesson, we'll explore advanced queue concepts including job batching, chaining, rate limiting, unique jobs, middleware, failure handling, and retry strategies.
Job Batching
Job batching allows you to dispatch a group of jobs and take action when the entire batch completes, including tracking progress and handling failures:
// Create the batches table (on Laravel 11+, use: php artisan make:queue-batches-table)
php artisan queue:batches-table
php artisan migrate
// Dispatching a batch of jobs
use App\Jobs\ProcessVideo;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
$videos = Video::where('status', 'pending')->get();
$batch = Bus::batch([
new ProcessVideo($videos[0]),
new ProcessVideo($videos[1]),
new ProcessVideo($videos[2]),
])->then(function (Batch $batch) {
// All jobs completed successfully
logger()->info("Batch {$batch->id} completed successfully");
// Send completion notification
Notification::send(
User::admins(),
new BatchCompleted($batch)
);
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected
logger()->error("Batch {$batch->id} failed", [
'exception' => $e->getMessage()
]);
})->finally(function (Batch $batch) {
// The batch has finished executing
logger()->info("Batch {$batch->id} finished", [
'total_jobs' => $batch->totalJobs,
'processed' => $batch->processedJobs(),
'failed' => $batch->failedJobs,
]);
})->allowFailures()->dispatch();
// Get batch ID
$batchId = $batch->id;
// Dynamically adding jobs to a batch
$batch->add([
new ProcessVideo($videos[3]),
new ProcessVideo($videos[4]),
]);
Batch Methods:
allowFailures() - Do not automatically cancel the batch when a job fails
onConnection() - Specify queue connection
onQueue() - Specify queue name
name() - Give the batch a name
withOption() - Add custom options
Making Jobs Batchable
Jobs must use the Batchable trait to participate in batches:
// app/Jobs/ProcessVideo.php
namespace App\Jobs;
use App\Models\Video;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessVideo implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
public Video $video
) {}
public function handle(): void
{
// Check if batch has been cancelled
if ($this->batch()?->cancelled()) {
return;
}
// Process the video
$this->video->process();
// Batch progress is tracked automatically: the pending-job count drops when this job succeeds
// (the Batch class has no increment() method, so no manual counter update is needed here)
// Add more jobs dynamically if needed
if ($this->video->needsThumbnails()) {
$this->batch()?->add([
new GenerateThumbnails($this->video)
]);
}
}
}
Inspecting Batches
Retrieve and inspect batch status from anywhere in your application:
use Illuminate\Support\Facades\Bus;
// Find a batch by ID
$batch = Bus::findBatch($batchId);
if ($batch) {
// Batch properties
$batch->id; // Batch ID
$batch->name; // Batch name
$batch->totalJobs; // Total jobs
$batch->pendingJobs; // Pending jobs
$batch->processedJobs(); // Processed jobs
$batch->failedJobs; // Failed jobs count
$batch->progress(); // Progress percentage (0-100)
$batch->finished(); // All jobs finished?
$batch->cancelled(); // Batch cancelled?
$batch->createdAt; // Creation timestamp
$batch->finishedAt; // Completion timestamp
// Cancel a batch
$batch->cancel();
// Delete a batch
$batch->delete();
// Check specific states
if ($batch->finished() && ! $batch->cancelled() && ! $batch->hasFailures()) {
// Batch completed with no cancelled or failed jobs
}
}
// In controllers - show batch progress
public function show(string $batchId)
{
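$batch = Bus::findBatch($batchId);
// findBatch() returns null for an unknown ID, so respond with a 404 instead of erroring
abort_if(is_null($batch), 404);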
return response()->json([
'id' => $batch->id,
'name' => $batch->name,
'progress' => $batch->progress(),
'total' => $batch->totalJobs,
'processed' => $batch->processedJobs(),
'failed' => $batch->failedJobs,
'finished' => $batch->finished(),
]);
}
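To make the numbers returned by the endpoint above easier to interpret, here is a minimal plain-PHP sketch of how the progress percentage relates to the job counts. It assumes that processed jobs equal total minus pending and that progress is that share rounded to a whole percent. The helper name batchProgress() is hypothetical and not part of Laravel.

```php
<?php

/**
 * Hypothetical helper (not a Laravel API): derive a 0-100 progress value
 * from a batch's total and pending job counts.
 */
function batchProgress(int $totalJobs, int $pendingJobs): int
{
    if ($totalJobs <= 0) {
        return 0; // An empty batch has no progress to report
    }

    // Assumption: processed = total - pending
    $processed = $totalJobs - $pendingJobs;

    return (int) round(($processed / $totalJobs) * 100);
}

// Ten jobs dispatched, four still pending
echo batchProgress(10, 4), "%\n";
```

If failed jobs remain in the pending count, as this sketch assumes, a batch that allows failures may finish without ever reporting 100, so it is worth showing the failed count next to the percentage.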
Job Chaining
Chain jobs to run sequentially, where each job runs only if the previous one succeeds:
use App\Jobs\ProcessVideo;
use App\Jobs\GenerateThumbnails;
use App\Jobs\OptimizeVideo;
use App\Jobs\PublishVideo;
use Illuminate\Support\Facades\Bus;
// Simple chain
Bus::chain([
new ProcessVideo($video),
new GenerateThumbnails($video),
new OptimizeVideo($video),
new PublishVideo($video),
])->dispatch();
// Chain with callbacks
Bus::chain([
new ProcessVideo($video),
new GenerateThumbnails($video),
new OptimizeVideo($video),
function () use ($video) {
// This closure runs after all jobs succeed
$video->update(['status' => 'published']);
// Notify user
$video->user->notify(new VideoPublished($video));
},
])->onConnection('redis')
->onQueue('video-processing')
->catch(function (Throwable $e) use ($video) {
// Handle chain failure
$video->update(['status' => 'failed']);
logger()->error("Video processing chain failed", [
'video_id' => $video->id,
'error' => $e->getMessage()
]);
})->dispatch();
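// Add jobs to the current chain from inside a job
class ProcessVideo implements ShouldQueue
{
use Queueable; // Illuminate\Bus\Queueable provides prependToChain() and appendToChain()
public function handle(): void
{
// Process video...
// Note: calling $this->chain([...]) here would replace the remaining chain rather than extend it
// Run GenerateThumbnails immediately after this job
$this->prependToChain(new GenerateThumbnails($this->video));
// Run OptimizeVideo at the very end of the chain
$this->appendToChain(new OptimizeVideo($this->video));
}
}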
Chain Failure: If any job in a chain fails, subsequent jobs will not execute. Use try-catch within jobs to handle errors gracefully, or use the catch() callback to handle chain-level failures.
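The stop-on-first-failure behavior described above can be sketched in plain PHP. This is a synchronous stand-in, not how the framework runs chains: real chained jobs execute on queue workers and may be retried before they count as failed. The function name runChain() and the step names are hypothetical.

```php
<?php

/**
 * Hypothetical synchronous stand-in for a job chain: run each step in order,
 * stop at the first failure, and hand the exception to the failure callback.
 *
 * @param callable[] $steps
 * @return int number of steps that completed successfully
 */
function runChain(array $steps, callable $onFailure): int
{
    $completed = 0;

    foreach ($steps as $step) {
        try {
            $step();
        } catch (Throwable $e) {
            $onFailure($e);
            break; // Later steps are skipped once one step fails
        }
        $completed++;
    }

    return $completed;
}

$ran = [];
$error = null;

$completed = runChain(
    [
        function () use (&$ran) { $ran[] = 'process'; },
        function () use (&$ran) {
            $ran[] = 'thumbnails';
            throw new RuntimeException('thumbnail generation failed');
        },
        function () use (&$ran) { $ran[] = 'publish'; }, // Never reached
    ],
    function (Throwable $e) use (&$error) { $error = $e->getMessage(); }
);
```

Here the third step never runs, and the failure callback receives the exception from the second step, which is the role the catch() callback plays for a chain.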
Rate Limiting Jobs
Control how many jobs of a certain type may run within a time window, and prevent jobs that share a key from running at the same time:
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProcessApiRequest implements ShouldQueue
{
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
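// Throttle with the 'api-requests' limiter (defined separately, e.g. 10 jobs per minute)
new RateLimited('api-requests'),
// Prevent jobs with the same key from overlapping
(new WithoutOverlapping($this->userId))
->releaseAfter(60) // Release an overlapping job back onto the queue with a 60-second delay
->expireAfter(180), // Expire the lock after 3 minutes in case a worker crashes mid-job
// Alternative: call ->dontRelease() instead of ->releaseAfter() to delete overlapping jobs rather than retry them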
];
}
}
// Define the rate limiter in the boot() method of AppServiceProvider
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
RateLimiter::for('api-requests', function ($job) {
return Limit::perMinute(10)->by($job->userId);
});
// Different limits for different users
RateLimiter::for('api-requests', function ($job) {
return $job->user->isPremium()
? Limit::perMinute(100)
: Limit::perMinute(10);
});
// Multiple rate limits
RateLimiter::for('api-requests', function ($job) {
return [
Limit::perMinute(10),
Limit::perHour(100),
];
});
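To show what a limit such as "10 per minute, per user" means in practice, here is a minimal in-memory fixed-window limiter in plain PHP. It is an illustration only: Laravel's RateLimiter keeps its counters in the cache, and the class name FixedWindowLimiter and its attempt() method are hypothetical.

```php
<?php

/**
 * Hypothetical in-memory fixed-window limiter, for illustration only.
 * Each key gets its own window, so one user cannot exhaust another's quota.
 */
final class FixedWindowLimiter
{
    /** @var array<string, array{start: int, count: int}> */
    private array $windows = [];

    public function __construct(
        private int $maxAttempts,
        private int $windowSeconds,
    ) {}

    /** Returns true if the attempt is allowed, false if the key is over its limit. */
    public function attempt(string $key, int $now): bool
    {
        $window = $this->windows[$key] ?? null;

        // Start a fresh window when none exists or the previous one has elapsed
        if ($window === null || $now - $window['start'] >= $this->windowSeconds) {
            $window = ['start' => $now, 'count' => 0];
        }

        $allowed = $window['count'] < $this->maxAttempts;
        if ($allowed) {
            $window['count']++;
        }

        $this->windows[$key] = $window;

        return $allowed;
    }
}

// Ten attempts per 60 seconds, keyed by user
$limiter = new FixedWindowLimiter(maxAttempts: 10, windowSeconds: 60);

$allowed = 0;
for ($i = 0; $i < 12; $i++) {
    $allowed += $limiter->attempt('user:1', now: 1000) ? 1 : 0;
}
```

Of the twelve attempts made at the same timestamp, only the first ten are allowed. In a queued job, a rejected attempt corresponds to the job being released back onto the queue to try again later.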
Unique Jobs
Ensure only one instance of a job is on the queue at a time:
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPayment implements ShouldQueue, ShouldBeUnique
{
public $orderId;
/**
* The unique ID of the job.
*/
public function uniqueId(): string
{
return (string) $this->orderId; // Cast so an integer ID satisfies the string return type
}
/**
* The number of seconds after which unique lock expires.
*/
public $uniqueFor = 3600; // 1 hour
public function handle(): void
{
// Process payment
// Only one payment processing job per order will run
}
}
// Alternative: Unique until processing starts (import Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing)
class ProcessPayment implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
public function uniqueId(): string
{
return (string) $this->orderId;
}
}
// Unique across all properties
class SendEmail implements ShouldQueue, ShouldBeUnique
{
public function __construct(
public string $email,
public string $subject
) {}
public function uniqueId(): string
{
return $this->email . '|' . $this->subject; // Delimiter keeps different email/subject pairs from producing the same ID
}
}
ShouldBeUnique vs ShouldBeUniqueUntilProcessing:
ShouldBeUnique - Job remains unique even while processing
ShouldBeUniqueUntilProcessing - Lock released when job starts, allowing new instances to queue
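When a unique ID is built from several properties, plain concatenation can make two different jobs look identical. The sketch below shows the collision and one way to avoid it. The helper uniqueJobId() is hypothetical; its result is the kind of value a job's uniqueId() method could return.

```php
<?php

/**
 * Hypothetical helper: build a unique ID from several parts.
 * JSON-encoding preserves the boundaries between parts, and hashing
 * keeps the resulting key short and of fixed length.
 */
function uniqueJobId(string ...$parts): string
{
    return hash('sha256', json_encode($parts, JSON_THROW_ON_ERROR));
}

// Two different (email, subject) pairs that concatenate to the same string
$naiveA = 'a@x.co' . 'mReport';
$naiveB = 'a@x.com' . 'Report';

// The same pairs stay distinct once the part boundaries are preserved
$safeA = uniqueJobId('a@x.co', 'mReport');
$safeB = uniqueJobId('a@x.com', 'Report');
```

The two naive IDs are equal, so the second job would be treated as a duplicate of the first, while the two hashed IDs differ.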
Job Middleware
Create reusable middleware for jobs:
// app/Jobs/Middleware/RateLimitApi.php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimitApi
{
public function handle(object $job, \Closure $next): void
{
// Check rate limit
$key = "api:rate_limit:{$job->apiKey}";
$requests = Redis::incr($key);
if ($requests === 1) {
Redis::expire($key, 60); // 1 minute window
}
if ($requests > 100) {
// Rate limit exceeded - release job back to queue
$job->release(60); // Try again in 60 seconds
return;
}
// Continue to next middleware or job
$next($job);
}
}
// app/Jobs/Middleware/LogJobExecution.php
namespace App\Jobs\Middleware;
class LogJobExecution
{
public function handle(object $job, \Closure $next): void
{
$startTime = microtime(true);
logger()->info("Job started", [
'job' => get_class($job),
'queue' => $job->queue,
]);
$next($job);
$duration = microtime(true) - $startTime;
logger()->info("Job completed", [
'job' => get_class($job),
'duration' => round($duration, 2) . 's',
]);
}
}
// Use middleware in job
class ProcessApiRequest implements ShouldQueue
{
public function middleware(): array
{
return [
new RateLimitApi,
new LogJobExecution,
];
}
}
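The order in which middleware runs is easier to see in a small plain-PHP sketch. Each middleware wraps the next one, so the first entry returned from middleware() becomes the outermost layer. The pipeline below is a simplified stand-in for the framework's own, and RecordingMiddleware and runThroughMiddleware() are hypothetical names.

```php
<?php

/** Hypothetical middleware that records when it runs, standing in for LogJobExecution. */
final class RecordingMiddleware
{
    public function __construct(private string $name, private ArrayObject $log) {}

    public function handle(object $job, callable $next): void
    {
        $this->log->append("{$this->name}: before");
        $next($job); // Skipping this call would stop the job from running
        $this->log->append("{$this->name}: after");
    }
}

/**
 * Simplified stand-in for the framework's pipeline: wrap the job handler in
 * each middleware so the first middleware in the list is the outermost layer.
 */
function runThroughMiddleware(object $job, array $middleware, callable $handler): void
{
    $pipeline = array_reduce(
        array_reverse($middleware),
        fn (callable $next, object $layer): callable =>
            fn (object $job) => $layer->handle($job, $next),
        $handler
    );

    $pipeline($job);
}

$log = new ArrayObject();

runThroughMiddleware(
    new stdClass(),
    [new RecordingMiddleware('first', $log), new RecordingMiddleware('second', $log)],
    function (object $job) use ($log): void {
        $log->append('job handled');
    }
);
```

The log shows both "before" entries, then the job, then the "after" entries in reverse order. This is why a middleware like RateLimitApi can return without calling $next to prevent the job from running at all.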
Handling Job Failures
Implement robust failure handling and retry strategies:
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;
class ProcessVideo implements ShouldQueue
{
use InteractsWithQueue;
/**
* The number of times the job may be attempted.
*/
public $tries = 5;
/**
* The number of seconds to wait before each retry.
*/
public $backoff = [10, 30, 60, 120, 300]; // Increasing delay between attempts
/**
* Alternative to the $backoff property: return the delays from a method.
* The schedule is read when the job is dispatched, so return the full list
* rather than a single value based on $this->attempts().
*/
public function backoff(): array
{
return [10, 20, 30, 40, 50];
}
/**
* The number of seconds the job can run before timing out.
*/
public $timeout = 120;
/**
* Indicate if the job should be marked as failed on timeout.
*/
public $failOnTimeout = true;
/**
* Determine the time after which the job should no longer be retried.
* When both $tries and retryUntil() are defined, retryUntil() takes precedence.
*/
public function retryUntil(): \DateTime
{
return now()->addHours(2);
}
public function handle(): void
{
try {
// Process video
$this->video->process();
} catch (\Exception $e) {
// Log the error
Log::error("Video processing failed", [
'video_id' => $this->video->id,
'attempt' => $this->attempts(),
'error' => $e->getMessage(),
]);
// Release job back to queue for retry
$this->release(30); // Retry in 30 seconds
// Or fail the job immediately
// $this->fail($e);
}
}
/**
* Handle a job failure.
*/
public function failed(Throwable $exception): void
{
// Update video status
$this->video->update(['status' => 'failed']);
// Send notification to admin
$admin = User::admin()->first();
$admin?->notify(new VideoProcessingFailed($this->video, $exception)); // Skip if no admin exists
// Log to external service
report($exception);
}
}
// Global failed job handling
// Create failed_jobs table (on Laravel 11+, use: php artisan make:queue-failed-table)
php artisan queue:failed-table
php artisan migrate
// List failed jobs
php artisan queue:failed
// Retry specific failed job
php artisan queue:retry {job-id}
// Retry all failed jobs
php artisan queue:retry all
// Delete failed job
php artisan queue:forget {job-id}
// Flush all failed jobs
php artisan queue:flush
Retry Strategies:
- tries - Maximum attempts before giving up
- backoff - Delay between retries (seconds or array)
- retryUntil - Absolute deadline for retries
- timeout - Maximum execution time per attempt
- release() - Manually retry with custom delay
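The relationship between tries and backoff can be worked through in plain PHP. The sketch below builds a capped exponential schedule and looks up the delay for a given attempt. It assumes that the delay for attempt n is the nth array entry, with the last entry reused once the attempts outnumber the entries. Both helper names are hypothetical.

```php
<?php

/**
 * Hypothetical helper: build a capped exponential backoff schedule.
 * A job with $tries attempts can be retried at most $tries - 1 times.
 *
 * @return int[] delays in seconds, one per retry
 */
function exponentialBackoff(int $baseSeconds, int $tries, int $capSeconds): array
{
    $delays = [];

    for ($retry = 0; $retry < $tries - 1; $retry++) {
        // Double the delay on each retry, but never exceed the cap
        $delays[] = min($capSeconds, $baseSeconds * (2 ** $retry));
    }

    return $delays;
}

/**
 * Assumption: attempt n uses the nth entry, falling back to the last
 * entry when there are more attempts than entries.
 *
 * @param int[] $backoff
 */
function delayForAttempt(array $backoff, int $attempt): int
{
    if ($backoff === []) {
        return 0; // No schedule means retry immediately
    }

    return $backoff[$attempt - 1] ?? $backoff[array_key_last($backoff)];
}

// Five tries leave room for four retries: 10s, 20s, 40s, 80s
$schedule = exponentialBackoff(baseSeconds: 10, tries: 5, capSeconds: 300);
```

A schedule like this could be assigned to a job's $backoff property. The cap keeps later retries from drifting past a deadline set with retryUntil().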
Job Events and Monitoring
// Listen to queue events in AppServiceProvider
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Notification;
use Illuminate\Support\Facades\Queue;
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// Job is about to be processed
logger()->debug("Processing job", [
'connection' => $event->connectionName,
'queue' => $event->job->getQueue(),
]);
});
Queue::after(function (JobProcessed $event) {
// Job has been processed
Cache::increment('jobs_processed_today');
});
Queue::failing(function (JobFailed $event) {
// Job has failed
logger()->error("Job failed", [
'connection' => $event->connectionName,
'job' => $event->job->resolveName(),
'exception' => $event->exception->getMessage(),
]);
// Send alert to Slack
Notification::route('slack', config('services.slack.webhook'))
->notify(new JobFailedAlert($event));
});
Queue::looping(function () {
// Called on each iteration of the queue worker loop
// Useful for clearing cache, checking config changes, etc.
if (Cache::has('maintenance_mode')) {
// Stop processing jobs
exit(0);
}
});
}
Exercise 1: Implement Video Processing Pipeline
Create a video processing system with batching:
- Create jobs: UploadVideo, ProcessVideo, GenerateThumbnails, OptimizeVideo
- Implement job chaining for single video processing
- Implement batching for bulk video processing (10+ videos)
- Track batch progress in the database
- Send email notification when batch completes
- Add rate limiting (5 videos per minute)
- Implement retry logic with exponential backoff
Exercise 2: Build Report Generation System
Create a report generation system with unique jobs:
- Generate reports: UserReport, SalesReport, InventoryReport
- Make reports unique by user ID + date
- Unique lock expires after 1 hour
- Chain: FetchData -> ProcessData -> GeneratePDF -> SendEmail
- Batch multiple report requests from admin
- Handle failures gracefully (notify user, log error)
Exercise 3: Advanced Job Middleware
Create custom job middleware:
- ApiThrottle - Rate limit API calls (100/minute per user)
- RequireMembership - Only process for premium users
- LogPerformance - Track execution time and memory usage
- CheckMaintenanceMode - Skip jobs during maintenance
- Apply middleware to ImportDataJob
- Test middleware behavior with queue:work
Summary
In this lesson, you explored advanced queue features in Laravel. You learned how to batch jobs together, chain jobs sequentially, rate limit job execution, ensure job uniqueness, create custom job middleware, handle failures gracefully, and implement sophisticated retry strategies. These powerful tools enable you to build robust, scalable background processing systems that can handle complex workflows with confidence.
In the next lesson, we'll explore Laravel Pennant for feature flags and A/B testing.