Laravel المتقدم

نظام الطوابير المتقدم

20 دقيقة الدرس 9 من 40

نظام الطوابير المتقدم

أتقن ميزات الطوابير المتقدمة في Laravel بما في ذلك وسيطة المهام وتحديد المعدل والمهام الفريدة ودفعات المهام والتسلسل ومعالجة الفشل والمراقبة باستخدام Laravel Horizon.

وسيطة المهام

تطبيق الوسيطة على المهام للمخاوف المتقاطعة مثل تحديد المعدل والمصادقة:

// app/Jobs/ProcessPodcast.php namespace App\Jobs; use App\Jobs\Middleware\RateLimited; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class ProcessPodcast implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; public function __construct( public Podcast $podcast ) {} // تعريف وسيطة المهمة public function middleware(): array { return [ new RateLimited('process-podcast'), new WithoutOverlapping($this->podcast->id), ]; } public function handle(): void { // معالجة البودكاست } } // app/Jobs/Middleware/RateLimited.php namespace App\Jobs\Middleware; use Illuminate\Support\Facades\Redis; class RateLimited { public function __construct( protected string $key, protected int $maxAttempts = 60, protected int $decayMinutes = 1 ) {} public function handle($job, $next) { $key = "rate-limit:{$this->key}:{$job->podcast->id}"; if (Redis::throttle($key)->allow($this->maxAttempts)->every($this->decayMinutes * 60)->then( function () use ($job, $next) { $next($job); }, function () use ($job) { $job->release(60); // الإفراج لمدة 60 ثانية } )) { return; } $job->delete(); } }
نصيحة احترافية: تعمل وسيطة المهمة قبل معالجة المهمة، مما يسمح لك بالتحكم في تدفق التنفيذ أو تنفيذ تحديد المعدل أو تخطي التنفيذ بناءً على الشروط.

تحديد معدل المهام

التحكم في معدل تنفيذ المهام باستخدام محددات المعدل المضمنة:

use Illuminate\Queue\Middleware\RateLimited as RateLimitMiddleware; use Illuminate\Queue\Middleware\ThrottlesExceptions; class ProcessApiCall implements ShouldQueue { public function middleware(): array { return [ // الحد من 10 مهام في الدقيقة (new RateLimitMiddleware('api-calls')) ->allow(10) ->everyMinute(), // خنق الاستثناءات - إطلاق المهمة عند الاستثناء (new ThrottlesExceptions(3, 5)) // 3 محاولات على مدى 5 دقائق ->backoff(5), // انتظر 5 دقائق قبل إعادة المحاولة ]; } public function handle(): void { // إجراء استدعاء API $response = Http::get('https://api.example.com/data'); if ($response->failed()) { throw new ApiCallException('فشل استدعاء API'); } } } // تحديد المعدل الديناميكي class SendEmail implements ShouldQueue { public function middleware(): array { return [ (new RateLimitMiddleware('emails')) ->dontRelease() // لا تطلق إلى الطابور ->allow($this->getAllowedRate()) ->everyMinute(), ]; } protected function getAllowedRate(): int { return $this->user->isPremium() ? 100 : 10; } }

المهام الفريدة

منع المهام المكررة من الانضمام إلى الطابور:

use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; // فريدة لعمر المهمة class ProcessReport implements ShouldQueue, ShouldBeUnique { public function __construct( public Report $report ) {} // تعريف قيد التفرد public function uniqueId(): string { return $this->report->id; } // كم من الوقت للحفاظ على التفرد (بالثواني) public $uniqueFor = 3600; // ساعة واحدة public function handle(): void { // معالجة التقرير } } // فريدة فقط حتى بدء المعالجة class SendNotification implements ShouldQueue, ShouldBeUniqueUntilProcessing { public function __construct( public User $user, public string $message ) {} public function uniqueId(): string { return "notification:{$this->user->id}"; } public function handle(): void { $this->user->notify(new GenericNotification($this->message)); } } // فريدة عبر السمة class ExportData implements ShouldQueue, ShouldBeUnique { public function __construct( public int $userId ) {} // استخدام عبر السمة بدلاً من uniqueId() public function uniqueVia(): string { return Cache::driver('redis'); } }
ملاحظة: ShouldBeUnique يمنع المهام المكررة خلال فترة uniqueFor بأكملها، بينما ShouldBeUniqueUntilProcessing يسمح بالمهام المكررة بمجرد بدء المعالجة.

دفعات المهام

تنفيذ مهام متعددة كدفعة منسقة:

use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus; // إرسال دفعة من المهام $batch = Bus::batch([ new ProcessCsvRow($row1), new ProcessCsvRow($row2), new ProcessCsvRow($row3), ])->then(function (Batch $batch) { // اكتملت جميع المهام بنجاح Log::info('اكتملت الدفعة', ['batch_id' => $batch->id]); })->catch(function (Batch $batch, Throwable $e) { // تم اكتشاف فشل أول مهمة دفعة Log::error('فشلت الدفعة', ['batch_id' => $batch->id, 'error' => $e->getMessage()]); })->finally(function (Batch $batch) { // انتهت الدفعة من التنفيذ Notification::send($batch->user, new BatchCompleted($batch)); })->name('استيراد CSV') ->onQueue('imports') ->dispatch(); // إضافة مهام إلى دفعة موجودة $batch->add([ new ProcessCsvRow($row4), new ProcessCsvRow($row5), ]); // التحقق من حالة الدفعة if ($batch->finished()) { // اكتملت جميع المهام } if ($batch->hasFailures()) { // فشلت بعض المهام foreach ($batch->failures() as $failure) { Log::error($failure->exception); } } // إلغاء الدفعة $batch->cancel(); // الوصول إلى الدفعة من داخل المهمة class ProcessCsvRow implements ShouldQueue { use Batchable; public function handle(): void { if ($this->batch()->cancelled()) { return; } // معالجة الصف } }

تسلسل المهام

تنفيذ المهام بشكل تسلسلي مع التبعيات:

use Illuminate\Support\Facades\Bus; // سلسلة بسيطة Bus::chain([ new DownloadVideo($video), new ProcessVideo($video), new UploadToS3($video), new SendNotification($user, 'تمت معالجة الفيديو'), ])->dispatch(); // سلسلة مع معالجة الأخطاء Bus::chain([ new ImportUsers($file), new SendWelcomeEmails(), new GenerateReport(), ])->catch(function (Throwable $e) { Log::error('فشلت السلسلة: ' . $e->getMessage()); })->dispatch(); // التسلسل الشرطي داخل المهمة class ProcessOrder implements ShouldQueue { public function handle(): void { // معالجة الطلب $order->process(); // ربط المهمة التالية if ($order->requiresShipping()) { $this->chain([ new AllocateInventory($order), new GenerateShippingLabel($order), new NotifyWarehouse($order), ]); } else { $this->chain([ new SendDigitalProduct($order), ]); } } } // إضافة مهام إلى بداية السلسلة $chain = [new Job2(), new Job3()]; array_unshift($chain, new Job1()); Bus::chain($chain)->dispatch();
تحذير: إذا فشلت أي مهمة في السلسلة، فلن يتم تنفيذ المهام اللاحقة إلا إذا استخدمت catch() للتعامل مع الإخفاقات بشكل رشيق.

معالجة المهام الفاشلة

تنفيذ معالجة شاملة للفشل:

class ProcessPayment implements ShouldQueue { // عدد المرات التي يمكن محاولة المهمة فيها public $tries = 5; // الحد الأقصى للثواني التي يمكن أن تعمل فيها المهمة public $timeout = 120; // ثوانٍ للانتظار قبل إعادة المحاولة بعد الفشل public $backoff = [10, 30, 60, 120, 300]; // الحد الأقصى للاستثناءات المسموح بها قبل الفشل public $maxExceptions = 3; // حذف المهمة إذا لم تكن النماذج موجودة public $deleteWhenMissingModels = true; public function __construct( public Payment $payment ) {} public function handle(): void { // محاولة معالجة الدفع $result = PaymentGateway::charge($this->payment); if (!$result->successful()) { // الفشل يدوياً برسالة مخصصة $this->fail(new PaymentException('فشل الدفع: ' . $result->error)); } } // يتم استدعاؤه عندما تفشل المهمة بعد كل إعادات المحاولة public function failed(Throwable $exception): void { Log::error('فشلت مهمة الدفع', [ 'payment_id' => $this->payment->id, 'exception' => $exception->getMessage(), ]); // تحديث حالة الدفع $this->payment->update([ 'status' => 'فشل', 'error' => $exception->getMessage(), ]); // إخطار المستخدم $this->payment->user->notify( new PaymentFailedNotification($this->payment) ); } // تحديد ما إذا كان يجب إعادة محاولة المهمة public function shouldRetry(Throwable $exception): bool { // لا تعيد المحاولة على أخطاء التحقق if ($exception instanceof ValidationException) { return false; } // أعد المحاولة على أخطاء الشبكة return $exception instanceof NetworkException; } }

المراقبة مع Laravel Horizon

راقب وأدر الطوابير بلوحة معلومات Horizon:

// composer require laravel/horizon // config/horizon.php return [ 'environments' => [ 'production' => [ 'supervisor-1' => [ 'connection' => 'redis', 'queue' => ['default', 'emails', 'reports'], 'balance' => 'auto', 'processes' => 10, 'tries' => 3, 'timeout' => 60, ], 'supervisor-2' => [ 'connection' => 'redis', 'queue' => ['imports', 'exports'], 'balance' => 'simple', 'processes' => 5, 'tries' => 1, 'timeout' => 300, ], ], ], // تقليم المهام 'trim' => [ 'recent' => 60, // دقائق 'pending' => 60, 'completed' => 60, 'recent_failed' => 10080, // أسبوع واحد 'failed' => 10080, 'monitored' => 10080, ], // مقاييس المهام 'metrics' => [ 'default' => 60, // دقائق ], ]; // بدء Horizon php artisan horizon // نشر أصول Horizon php artisan horizon:install // التفويض // app/Providers/HorizonServiceProvider.php protected function gate() { Gate::define('viewHorizon', function ($user) { return in_array($user->email, [ 'admin@example.com', ]); }); }

أحداث المهام والمستمعون

الاستماع إلى أحداث الطابور للمراقبة وتصحيح الأخطاء:

use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; // app/Providers/EventServiceProvider.php protected $listen = [ JobProcessing::class => [ LogJobProcessing::class, ], JobProcessed::class => [ LogJobProcessed::class, UpdateJobMetrics::class, ], JobFailed::class => [ LogJobFailure::class, NotifyAdminOfFailure::class, ], ]; // app/Listeners/LogJobProcessing.php class LogJobProcessing { public function handle(JobProcessing $event): void { Log::info('بدأت معالجة المهمة', [ 'job' => $event->job->resolveName(), 'connection' => $event->connectionName, 'queue' => $event->job->getQueue(), ]); } } // app/Listeners/NotifyAdminOfFailure.php class NotifyAdminOfFailure { public function handle(JobFailed $event): void { if ($this->isCriticalJob($event->job)) { Notification::send( User::admins(), new CriticalJobFailed($event) ); } } protected function isCriticalJob($job): bool { return in_array($job->resolveName(), [ 'App\\Jobs\\ProcessPayment', 'App\\Jobs\\SendInvoice', ]); } }

موصلات الطوابير المخصصة

إنشاء برامج تشغيل طوابير مخصصة لاحتياجات متخصصة:

// app/Queue/MongoConnector.php namespace App\Queue; use Illuminate\Queue\Connectors\ConnectorInterface; class MongoConnector implements ConnectorInterface { public function connect(array $config) { return new MongoQueue( $config['connection'], $config['collection'], $config['queue'] ?? 'default', $config['retry_after'] ?? 60 ); } } // التسجيل في مزود الخدمة public function boot() { Queue::extend('mongo', function () { return new MongoConnector; }); } // config/queue.php 'connections' => [ 'mongo' => [ 'driver' => 'mongo', 'connection' => 'mongodb', 'collection' => 'jobs', 'queue' => 'default', 'retry_after' => 90, ], ],
تمرين 1: ابن نظام استيراد CSV دفعي يقوم بـ:
1. قبول التحميلات حتى 10,000 صف
2. معالجة الصفوف في دفعات من 100
3. التحقق من كل صف وتتبع الإخفاقات
4. إنشاء تقرير عند اكتمال الدفعة
5. إرسال إشعار بالبريد الإلكتروني بإحصائيات الاستيراد
استخدم دفعات المهام مع تتبع التقدم ومعالجة شاملة للأخطاء.
تمرين 2: نفذ خط معالجة فيديو باستخدام تسلسل المهام:
1. تنزيل الفيديو من عنوان URL
2. استخراج مسار الصوت
3. إنشاء 3 صور مصغرة
4. التحويل إلى دقات متعددة (480p، 720p، 1080p)
5. تحميل جميع الأصول إلى S3
6. تحديث قاعدة البيانات وإخطار المستخدم
تعامل مع الإخفاقات في كل مرحلة بمنطق إعادة محاولة مناسب وإشعارات.
تمرين 3: أنشئ مهمة مزامنة API محددة المعدل تقوم بـ:
1. جلب البيانات من API خارجي (حد المعدل: 100 طلب/دقيقة)
2. معالجة وتخزين البيانات في قاعدة البيانات
3. التأكد من تشغيل مزامنة واحدة فقط في كل مرة لكل مورد
4. إعادة محاولة الطلبات الفاشلة بانسحاب أسي
5. تسجيل جميع أنشطة المزامنة في Horizon
نفذ باستخدام المهام الفريدة ووسيطة تحديد المعدل ومعالجة الفشل المخصصة.