Advanced Laravel
Advanced Queue System
Master Laravel's advanced queue features including job middleware, rate limiting, unique jobs, job batching, chaining, failure handling, and monitoring with Laravel Horizon.
Job Middleware
Apply middleware to jobs for cross-cutting concerns like rate limiting and authentication:
// app/Jobs/ProcessPodcast.php
namespace App\Jobs;

use App\Jobs\Middleware\RateLimited;
use App\Models\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        public Podcast $podcast
    ) {}

    // Define the middleware the job should pass through
    public function middleware(): array
    {
        return [
            new RateLimited('process-podcast'),
            new WithoutOverlapping($this->podcast->id),
        ];
    }

    public function handle(): void
    {
        // Process podcast
    }
}
// app/Jobs/Middleware/RateLimited.php
namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    public function __construct(
        protected string $key,
        protected int $maxAttempts = 60,
        protected int $decayMinutes = 1
    ) {}

    public function handle($job, $next): void
    {
        Redis::throttle("rate-limit:{$this->key}")
            ->allow($this->maxAttempts)
            ->every($this->decayMinutes * 60)
            ->then(
                fn () => $next($job),      // lock obtained: run the job
                fn () => $job->release(60) // limit hit: retry in 60 seconds
            );
    }
}
Pro Tip: Job middleware wraps job execution, so you can control whether and when a job runs: rate-limit it, prevent overlapping runs, or skip execution entirely before its handle() method is invoked.
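As a sketch of the "skip execution" case, a small custom middleware can delete or release the job before it runs. The cancelled flag below is a hypothetical attribute on the Podcast model:

```php
// app/Jobs/Middleware/SkipIfCancelled.php (illustrative sketch)
namespace App\Jobs\Middleware;

class SkipIfCancelled
{
    public function handle($job, $next): void
    {
        // 'cancelled' is an assumed attribute for this example
        if ($job->podcast->cancelled) {
            $job->delete(); // drop the job without processing it

            return;
        }

        $next($job);
    }
}
```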
Rate Limiting Jobs
Control job execution rate with the built-in RateLimited middleware. It references a named limiter, which you define first via RateLimiter::for(), typically in a service provider:
// app/Providers/AppServiceProvider.php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

public function boot(): void
{
    // Limit to 10 jobs per minute
    RateLimiter::for('api-calls', function (object $job) {
        return Limit::perMinute(10);
    });
}

// app/Jobs/ProcessApiCall.php
use Illuminate\Queue\Middleware\RateLimited as RateLimitMiddleware;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Support\Facades\Http;

class ProcessApiCall implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            // Release the job back to the queue when the 'api-calls' limit is hit
            new RateLimitMiddleware('api-calls'),
            // Circuit-break on repeated exceptions: after 3 exceptions, delay
            // further attempts (the second argument is the decay window;
            // check your Laravel version for whether it is minutes or seconds)
            (new ThrottlesExceptions(3, 5))
                ->backoff(5), // wait 5 minutes before retrying
        ];
    }

    public function handle(): void
    {
        // Make API call
        $response = Http::get('https://api.example.com/data');

        if ($response->failed()) {
            throw new ApiCallException('API call failed');
        }
    }
}

// Dynamic rate limiting: vary the limit per job inside the limiter definition
RateLimiter::for('emails', function (SendEmail $job) {
    return $job->user->isPremium()
        ? Limit::perMinute(100)
        : Limit::perMinute(10);
});

class SendEmail implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            (new RateLimitMiddleware('emails'))
                ->dontRelease(), // discard rate-limited jobs instead of re-queueing
        ];
    }
}
Unique Jobs
Prevent duplicate jobs from being queued:
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

// Unique for the job lifetime
class ProcessReport implements ShouldQueue, ShouldBeUnique
{
    // How long to maintain the unique lock (seconds)
    public $uniqueFor = 3600; // 1 hour

    public function __construct(
        public Report $report
    ) {}

    // Define the uniqueness constraint
    public function uniqueId(): string
    {
        return (string) $this->report->id;
    }

    public function handle(): void
    {
        // Process report
    }
}

// Unique only until processing starts
class SendNotification implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    public function __construct(
        public User $user,
        public string $message
    ) {}

    public function uniqueId(): string
    {
        return "notification:{$this->user->id}";
    }

    public function handle(): void
    {
        $this->user->notify(new GenericNotification($this->message));
    }
}
// Use a specific cache store for the unique lock
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class ExportData implements ShouldQueue, ShouldBeUnique
{
    public function __construct(
        public int $userId
    ) {}

    public function uniqueId(): string
    {
        return (string) $this->userId;
    }

    // Choose the cache repository that holds the unique lock
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}
Note:
ShouldBeUnique holds the unique lock for the entire uniqueFor period (or until the job completes or fails), while ShouldBeUniqueUntilProcessing releases the lock as soon as a worker begins processing the job, so a fresh copy can be queued while the first one runs.
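To make the difference concrete, dispatching the ProcessReport job from above twice while the lock is held results in a single queued job:

```php
// The second dispatch is silently discarded because a job with the
// same uniqueId() already holds the unique lock.
ProcessReport::dispatch($report);
ProcessReport::dispatch($report); // ignored while the lock is held

// With ShouldBeUniqueUntilProcessing, a duplicate may be queued again
// as soon as a worker picks up the first job.
```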
Job Batching
Execute multiple jobs as a coordinated batch. Batching stores metadata in a job_batches database table, created with the batches-table Artisan command (queue:batches-table on older releases, make:queue-batches-table on recent ones):
use App\Jobs\ProcessCsvRow;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

// Dispatch a batch of jobs
$batch = Bus::batch([
    new ProcessCsvRow($row1),
    new ProcessCsvRow($row2),
    new ProcessCsvRow($row3),
])->then(function (Batch $batch) {
    // All jobs completed successfully
    Log::info('Batch completed', ['batch_id' => $batch->id]);
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected
    Log::error('Batch failed', ['batch_id' => $batch->id, 'error' => $e->getMessage()]);
})->finally(function (Batch $batch) {
    // Batch has finished executing
    Log::info('Batch finished', ['batch_id' => $batch->id, 'progress' => $batch->progress()]);
})->name('CSV Import')
  ->onQueue('imports')
  ->dispatch();

// Add jobs to an existing batch (typically from within a batched job)
$batch->add([
    new ProcessCsvRow($row4),
    new ProcessCsvRow($row5),
]);

// Check batch status
if ($batch->finished()) {
    // All jobs completed
}

if ($batch->hasFailures()) {
    // Some jobs failed; the batch records their count and IDs
    Log::warning('Batch had failures', [
        'failed_jobs' => $batch->failedJobs,
        'failed_job_ids' => $batch->failedJobIds,
    ]);
}

// Cancel the batch
$batch->cancel();

// Accessing the batch from within a job (requires the Batchable trait)
use Illuminate\Bus\Batchable;

class ProcessCsvRow implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            return; // the batch was cancelled; skip this row
        }

        // Process row
    }
}
Job Chaining
Execute jobs sequentially with dependencies:
use Illuminate\Support\Facades\Bus;

// Simple chain
Bus::chain([
    new DownloadVideo($video),
    new ProcessVideo($video),
    new UploadToS3($video),
    new SendNotification($user, 'Video processed'),
])->dispatch();

// Chain with error handling
Bus::chain([
    new ImportUsers($file),
    new SendWelcomeEmails(),
    new GenerateReport(),
])->catch(function (Throwable $e) {
    Log::error('Chain failed: ' . $e->getMessage());
})->dispatch();

// Conditional chaining from within a job
class ProcessOrder implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        public Order $order
    ) {}

    public function handle(): void
    {
        // Process the order
        $this->order->process();

        // Queue follow-up jobs to run after this job completes
        if ($this->order->requiresShipping()) {
            $this->chain([
                new AllocateInventory($this->order),
                new GenerateShippingLabel($this->order),
                new NotifyWarehouse($this->order),
            ]);
        } else {
            $this->chain([
                new SendDigitalProduct($this->order),
            ]);
        }
    }
}

// Build the chain array before dispatching
$chain = [new Job2(), new Job3()];
array_unshift($chain, new Job1()); // prepend Job1 so it runs first

Bus::chain($chain)->dispatch();
Warning: If any job in a chain fails, the remaining jobs won't execute. A catch() callback lets you react to the failure (logging, cleanup, notifications), but it does not resume the chain.
Handling Failed Jobs
Implement comprehensive failure handling:
use Throwable;

class ProcessPayment implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Number of times the job may be attempted
    public $tries = 5;

    // Maximum seconds the job can run
    public $timeout = 120;

    // Seconds to wait before each retry
    public $backoff = [10, 30, 60, 120, 300];

    // Maximum unhandled exceptions allowed before failing
    public $maxExceptions = 3;

    // Delete the job if its models no longer exist
    public $deleteWhenMissingModels = true;

    public function __construct(
        public Payment $payment
    ) {}

    public function handle(): void
    {
        // Attempt payment processing
        $result = PaymentGateway::charge($this->payment);

        if (! $result->successful()) {
            // Fail immediately with a custom exception, skipping retries
            $this->fail(new PaymentException('Payment failed: ' . $result->error));

            return;
        }
    }

    // Called once the job has exhausted all retries (or failed manually)
    public function failed(Throwable $exception): void
    {
        Log::error('Payment job failed', [
            'payment_id' => $this->payment->id,
            'exception' => $exception->getMessage(),
        ]);

        // Update payment status
        $this->payment->update([
            'status' => 'failed',
            'error' => $exception->getMessage(),
        ]);

        // Notify the user
        $this->payment->user->notify(
            new PaymentFailedNotification($this->payment)
        );
    }

    // Stop retrying after a deadline, regardless of remaining attempts.
    // To skip retries for specific exception types (e.g. validation errors),
    // catch them in handle() and call $this->fail($e) instead of rethrowing.
    public function retryUntil(): \DateTime
    {
        return now()->addMinutes(30);
    }
}
Monitoring with Laravel Horizon
Monitor and manage queues with Horizon dashboard:
// composer require laravel/horizon
// config/horizon.php
return [
    'environments' => [
        'production' => [
            'supervisor-1' => [
                'connection' => 'redis',
                'queue' => ['default', 'emails', 'reports'],
                'balance' => 'auto',
                'minProcesses' => 1,
                'maxProcesses' => 10,
                'tries' => 3,
                'timeout' => 60,
            ],
            'supervisor-2' => [
                'connection' => 'redis',
                'queue' => ['imports', 'exports'],
                'balance' => 'simple',
                'processes' => 5,
                'tries' => 1,
                'timeout' => 300,
            ],
        ],
    ],

    // How long (in minutes) to retain job data
    'trim' => [
        'recent' => 60,
        'pending' => 60,
        'completed' => 60,
        'recent_failed' => 10080, // 1 week
        'failed' => 10080,
        'monitored' => 10080,
    ],

    // Metric snapshot retention in hours (snapshots are captured by
    // scheduling the horizon:snapshot command)
    'metrics' => [
        'trim_snapshots' => [
            'job' => 24,
            'queue' => 24,
        ],
    ],
];
// Publish Horizon assets (run once after installing)
php artisan horizon:install

// Start Horizon
php artisan horizon
// Authorization
// app/Providers/HorizonServiceProvider.php
protected function gate()
{
    Gate::define('viewHorizon', function ($user) {
        return in_array($user->email, [
            'admin@example.com',
        ]);
    });
}
Job Events and Listeners
Listen to queue events for monitoring and debugging:
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
// app/Providers/EventServiceProvider.php
protected $listen = [
    JobProcessing::class => [
        LogJobProcessing::class,
    ],
    JobProcessed::class => [
        LogJobProcessed::class,
        UpdateJobMetrics::class,
    ],
    JobFailed::class => [
        LogJobFailure::class,
        NotifyAdminOfFailure::class,
    ],
];

// app/Listeners/LogJobProcessing.php
class LogJobProcessing
{
    public function handle(JobProcessing $event): void
    {
        Log::info('Job processing started', [
            'job' => $event->job->resolveName(),
            'connection' => $event->connectionName,
            'queue' => $event->job->getQueue(),
        ]);
    }
}

// app/Listeners/NotifyAdminOfFailure.php
class NotifyAdminOfFailure
{
    public function handle(JobFailed $event): void
    {
        if ($this->isCriticalJob($event->job)) {
            Notification::send(
                User::admins(),
                new CriticalJobFailed($event)
            );
        }
    }

    protected function isCriticalJob($job): bool
    {
        return in_array($job->resolveName(), [
            'App\\Jobs\\ProcessPayment',
            'App\\Jobs\\SendInvoice',
        ]);
    }
}
Custom Queue Connectors
Create custom queue drivers for specialized needs:
// app/Queue/MongoConnector.php
namespace App\Queue;

use Illuminate\Queue\Connectors\ConnectorInterface;

class MongoConnector implements ConnectorInterface
{
    public function connect(array $config)
    {
        // MongoQueue is your queue implementation (extends Illuminate\Queue\Queue)
        return new MongoQueue(
            $config['connection'],
            $config['collection'],
            $config['queue'] ?? 'default',
            $config['retry_after'] ?? 60
        );
    }
}

// Register in a service provider
use Illuminate\Support\Facades\Queue;

public function boot()
{
    Queue::extend('mongo', function () {
        return new MongoConnector;
    });
}

// config/queue.php
'connections' => [
    'mongo' => [
        'driver' => 'mongo',
        'connection' => 'mongodb',
        'collection' => 'jobs',
        'queue' => 'default',
        'retry_after' => 90,
    ],
],
Exercise 1: Build a batch CSV import system that:
1. Accepts uploads up to 10,000 rows
2. Processes rows in batches of 100
3. Validates each row and tracks failures
4. Generates a report when batch completes
5. Sends email notification with import statistics
Use job batching with progress tracking and comprehensive error handling.
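A possible starting point for the batch dispatch, assuming hypothetical ImportCsvChunk and GenerateImportReport jobs:

```php
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

// Chunk the parsed CSV rows into groups of 100 and dispatch one job per chunk
$jobs = collect($rows)->chunk(100)
    ->map(fn ($chunk) => new ImportCsvChunk($chunk->values()->all()))
    ->all();

Bus::batch($jobs)
    ->then(fn (Batch $batch) => GenerateImportReport::dispatch($batch->id))
    ->name('CSV Import')
    ->onQueue('imports')
    ->dispatch();
```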
Exercise 2: Implement a video processing pipeline using job chaining:
1. Download video from URL
2. Extract audio track
3. Generate 3 thumbnail images
4. Transcode to multiple resolutions (480p, 720p, 1080p)
5. Upload all assets to S3
6. Update database and notify user
Handle failures at each stage with appropriate retry logic and notifications.
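One way to lay out the pipeline with a chain; every job class here is a placeholder you would implement:

```php
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new DownloadVideo($url, $videoId),
    new ExtractAudio($videoId),
    new GenerateThumbnails($videoId, count: 3),
    new TranscodeVideo($videoId, ['480p', '720p', '1080p']),
    new UploadAssetsToS3($videoId),
    new FinalizeVideo($videoId, $userId),
])->catch(function (Throwable $e) use ($videoId) {
    // The chain stops here; record the failure for the user
    Log::error("Video {$videoId} pipeline failed: {$e->getMessage()}");
})->dispatch();
```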
Exercise 3: Create a rate-limited API synchronization job that:
1. Fetches data from external API (rate limit: 100 requests/minute)
2. Processes and stores data in database
3. Ensures only one sync runs at a time per resource
4. Retries failed requests with exponential backoff
5. Logs all sync activities to Horizon
Implement using unique jobs, rate limiting middleware, and custom failure handling.
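A skeleton combining the pieces from this chapter; SyncResource and the 'external-api' limiter name are placeholders, and the limiter itself would be defined with RateLimiter::for() in a service provider:

```php
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\RateLimited;

class SyncResource implements ShouldQueue, ShouldBeUnique
{
    public $tries = 5;
    public $backoff = [30, 60, 120, 240, 480]; // exponential backoff (seconds)

    public function __construct(public int $resourceId) {}

    // Ensure only one sync runs at a time per resource
    public function uniqueId(): string
    {
        return "sync:{$this->resourceId}";
    }

    public function middleware(): array
    {
        // 'external-api' would be defined as a 100-requests-per-minute limiter
        return [new RateLimited('external-api')];
    }

    public function handle(): void
    {
        // Fetch, process, and store the resource data here
    }
}
```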