إطار Laravel

الطوابير المتقدمة وتجميع الوظائف

18 دقيقة الدرس 38 من 45

الطوابير المتقدمة وتجميع الوظائف

يوفر نظام الطوابير في Laravel ميزات قوية لإدارة المهام المعقدة والطويلة. في هذا الدرس، سنستكشف مفاهيم الطوابير المتقدمة بما في ذلك تجميع الوظائف، والتسلسل، وتحديد المعدل، والوظائف الفريدة، والبرامج الوسيطة، ومعالجة الإخفاقات، واستراتيجيات إعادة المحاولة.

تجميع الوظائف

يسمح لك تجميع الوظائف بإرسال مجموعة من الوظائف واتخاذ إجراء عند اكتمال الدفعة بأكملها، بما في ذلك تتبع التقدم ومعالجة الإخفاقات:

// إنشاء جدول الدفعات php artisan queue:batches-table php artisan migrate // إرسال دفعة من الوظائف use App\Jobs\ProcessVideo; use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus; $videos = Video::where('status', 'pending')->get(); $batch = Bus::batch([ new ProcessVideo($videos[0]), new ProcessVideo($videos[1]), new ProcessVideo($videos[2]), ])->then(function (Batch $batch) { // اكتملت جميع الوظائف بنجاح logger()->info("اكتملت الدفعة {$batch->id} بنجاح"); // إرسال إشعار الاكتمال Notification::send( User::admins(), new BatchCompleted($batch) ); })->catch(function (Batch $batch, Throwable $e) { // تم اكتشاف فشل أول وظيفة في الدفعة logger()->error("فشلت الدفعة {$batch->id}", [ 'exception' => $e->getMessage() ]); })->finally(function (Batch $batch) { // انتهى تنفيذ الدفعة logger()->info("انتهت الدفعة {$batch->id}", [ 'total_jobs' => $batch->totalJobs, 'processed' => $batch->processedJobs(), 'failed' => $batch->failedJobs, ]); })->allowFailures()->dispatch(); // الحصول على معرف الدفعة $batchId = $batch->id; // إضافة وظائف ديناميكياً إلى الدفعة $batch->add([ new ProcessVideo($videos[3]), new ProcessVideo($videos[4]), ]);
طرق الدفعة:
  • allowFailures() - متابعة المعالجة حتى لو فشلت الوظائف
  • onConnection() - تحديد اتصال الطابور
  • onQueue() - تحديد اسم الطابور
  • name() - إعطاء الدفعة اسماً
  • withOption() - إضافة خيارات مخصصة

جعل الوظائف قابلة للتجميع

يجب أن تستخدم الوظائف Batchable trait للمشاركة في الدفعات:

// app/Jobs/ProcessVideo.php namespace App\Jobs; use App\Models\Video; use Illuminate\Bus\Batchable; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class ProcessVideo implements ShouldQueue { use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels; public function __construct( public Video $video ) {} public function handle(): void { // تحقق مما إذا تم إلغاء الدفعة if ($this->batch()?->cancelled()) { return; } // معالجة الفيديو $this->video->process(); // تحديث تقدم الدفعة (سمة مخصصة) $this->batch()?->increment('videos_processed'); // إضافة المزيد من الوظائف ديناميكياً إذا لزم الأمر if ($this->video->needsThumbnails()) { $this->batch()?->add([ new GenerateThumbnails($this->video) ]); } } }

فحص الدفعات

استرجع وافحص حالة الدفعة من أي مكان في تطبيقك:

use Illuminate\Support\Facades\Bus; // العثور على دفعة بالمعرف $batch = Bus::findBatch($batchId); if ($batch) { // خصائص الدفعة $batch->id; // معرف الدفعة $batch->name; // اسم الدفعة $batch->totalJobs; // إجمالي الوظائف $batch->pendingJobs; // الوظائف المعلقة $batch->processedJobs(); // الوظائف المعالجة $batch->failedJobs; // عدد الوظائف الفاشلة $batch->progress(); // نسبة التقدم (0-100) $batch->finished(); // انتهت جميع الوظائف؟ $batch->cancelled(); // تم إلغاء الدفعة؟ $batch->createdAt; // طابع زمني للإنشاء $batch->finishedAt; // طابع زمني للاكتمال // إلغاء دفعة $batch->cancel(); // حذف دفعة $batch->delete(); // التحقق من حالات محددة if ($batch->finished() && !$batch->cancelled()) { // اكتملت الدفعة بنجاح } } // في المتحكمات - عرض تقدم الدفعة public function show(string $batchId) { $batch = Bus::findBatch($batchId); return response()->json([ 'id' => $batch->id, 'name' => $batch->name, 'progress' => $batch->progress(), 'total' => $batch->totalJobs, 'processed' => $batch->processedJobs(), 'failed' => $batch->failedJobs, 'finished' => $batch->finished(), ]); }

تسلسل الوظائف

سلسل الوظائف لتشغيلها بالتسلسل، حيث تعمل كل وظيفة فقط إذا نجحت السابقة:

use App\Jobs\ProcessVideo; use App\Jobs\GenerateThumbnails; use App\Jobs\OptimizeVideo; use App\Jobs\PublishVideo; use Illuminate\Support\Facades\Bus; // سلسلة بسيطة Bus::chain([ new ProcessVideo($video), new GenerateThumbnails($video), new OptimizeVideo($video), new PublishVideo($video), ])->dispatch(); // سلسلة مع callbacks Bus::chain([ new ProcessVideo($video), new GenerateThumbnails($video), new OptimizeVideo($video), function () use ($video) { // يعمل هذا الإغلاق بعد نجاح جميع الوظائف $video->update(['status' => 'published']); // إخطار المستخدم $video->user->notify(new VideoPublished($video)); }, ])->onConnection('redis') ->onQueue('video-processing') ->catch(function (Throwable $e) use ($video) { // معالجة فشل السلسلة $video->update(['status' => 'failed']); logger()->error("فشلت سلسلة معالجة الفيديو", [ 'video_id' => $video->id, 'error' => $e->getMessage() ]); })->dispatch(); // إضافة إلى سلسلة موجودة (في الوظيفة) class ProcessVideo implements ShouldQueue { public function handle(): void { // معالجة الفيديو... // إضافة المزيد من الوظائف إلى السلسلة $this->chain([ new GenerateThumbnails($this->video), new OptimizeVideo($this->video), ]); } }
فشل السلسلة: إذا فشلت أي وظيفة في السلسلة، لن يتم تنفيذ الوظائف اللاحقة. استخدم try-catch داخل الوظائف للتعامل مع الأخطاء بسلاسة، أو استخدم callback catch() للتعامل مع إخفاقات مستوى السلسلة.

تحديد معدل الوظائف

التحكم في عدد الوظائف من نوع معين التي يمكن تشغيلها بشكل متزامن:

use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Queue\Middleware\RateLimited; use Illuminate\Queue\Middleware\WithoutOverlapping; class ProcessApiRequest implements ShouldQueue { /** * الحصول على البرامج الوسيطة التي يجب أن تمر بها الوظيفة. */ public function middleware(): array { return [ // السماح بـ 10 وظائف في الدقيقة new RateLimited('api-requests'), // منع تداخل الوظائف بالمفتاح (new WithoutOverlapping($this->userId)) ->releaseAfter(60) // إطلاق القفل بعد 60 ثانية ->expireAfter(180) // انتهاء القفل بعد 3 دقائق ->dontRelease(), // لا تطلق الوظيفة مرة أخرى إلى الطابور ]; } } // تعريف محدد المعدل في RouteServiceProvider use Illuminate\Cache\RateLimiting\Limit; use Illuminate\Support\Facades\RateLimiter; RateLimiter::for('api-requests', function ($job) { return Limit::perMinute(10)->by($job->userId); }); // حدود مختلفة لمستخدمين مختلفين RateLimiter::for('api-requests', function ($job) { return $job->user->isPremium() ? Limit::perMinute(100) : Limit::perMinute(10); }); // حدود معدل متعددة RateLimiter::for('api-requests', function ($job) { return [ Limit::perMinute(10), Limit::perHour(100), ]; });

الوظائف الفريدة

تأكد من وجود مثيل واحد فقط من الوظيفة في الطابور في وقت واحد:

use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPayment implements ShouldQueue, ShouldBeUnique { public $orderId; /** * المعرف الفريد للوظيفة. */ public function uniqueId(): string { return $this->orderId; } /** * عدد الثواني التي ينتهي بعدها قفل الفريد. */ public $uniqueFor = 3600; // ساعة واحدة public function handle(): void { // معالجة الدفع // ستعمل وظيفة معالجة دفع واحدة فقط لكل طلب } } // بديل: فريد حتى يبدأ المعالجة class ProcessPayment implements ShouldQueue, ShouldBeUniqueUntilProcessing { public function uniqueId(): string { return $this->orderId; } } // فريد عبر جميع الخصائص class SendEmail implements ShouldQueue, ShouldBeUnique { public function __construct( public string $email, public string $subject ) {} public function uniqueId(): string { return $this->email . $this->subject; } }
ShouldBeUnique مقابل ShouldBeUniqueUntilProcessing:
  • ShouldBeUnique - تبقى الوظيفة فريدة حتى أثناء المعالجة
  • ShouldBeUniqueUntilProcessing - يتم إطلاق القفل عند بدء الوظيفة، مما يسمح بوضع مثيلات جديدة في الطابور

البرامج الوسيطة للوظائف

أنشئ برامج وسيطة قابلة لإعادة الاستخدام للوظائف:

// app/Jobs/Middleware/RateLimitApi.php namespace App\Jobs\Middleware; use Illuminate\Support\Facades\Redis; class RateLimitApi { public function handle($job, $next): void { // التحقق من حد المعدل $key = "api:rate_limit:{$job->apiKey}"; $requests = Redis::incr($key); if ($requests === 1) { Redis::expire($key, 60); // نافذة دقيقة واحدة } if ($requests > 100) { // تم تجاوز حد المعدل - إعادة الوظيفة إلى الطابور $job->release(60); // حاول مرة أخرى في 60 ثانية return; } // متابعة البرنامج الوسيط التالي أو الوظيفة $next($job); } } // app/Jobs/Middleware/LogJobExecution.php namespace App\Jobs\Middleware; class LogJobExecution { public function handle($job, $next): void { $startTime = microtime(true); logger()->info("بدأت الوظيفة", [ 'job' => get_class($job), 'queue' => $job->queue, ]); $next($job); $duration = microtime(true) - $startTime; logger()->info("اكتملت الوظيفة", [ 'job' => get_class($job), 'duration' => round($duration, 2) . 's', ]); } } // استخدام البرامج الوسيطة في الوظيفة class ProcessApiRequest implements ShouldQueue { public function middleware(): array { return [ new RateLimitApi, new LogJobExecution, ]; } }

معالجة إخفاقات الوظائف

نفذ معالجة إخفاقات قوية واستراتيجيات إعادة المحاولة:

use Illuminate\Queue\InteractsWithQueue; use Illuminate\Support\Facades\Log; class ProcessVideo implements ShouldQueue { use InteractsWithQueue; /** * عدد المرات التي يمكن محاولة الوظيفة فيها. */ public $tries = 5; /** * الحد الأقصى لعدد الثواني للانتظار قبل إعادة المحاولة. */ public $backoff = [10, 30, 60, 120, 300]; // التراجع الأسي /** * بديل: حساب التراجع ديناميكياً */ public function backoff(): array { return [ 10 * $this->attempts(), // 10s, 20s, 30s, 40s, 50s ]; } /** * عدد الثواني التي يمكن للوظيفة تشغيلها قبل انتهاء المهلة. */ public $timeout = 120; /** * الإشارة إلى ما إذا كان يجب وضع علامة على الوظيفة كفاشلة عند انتهاء المهلة. */ public $failOnTimeout = true; /** * تحديد الوقت الذي يجب أن تنتهي فيه مهلة الوظيفة. */ public function retryUntil(): DateTime { return now()->addHours(2); } public function handle(): void { try { // معالجة الفيديو $this->video->process(); } catch (\Exception $e) { // تسجيل الخطأ Log::error("فشلت معالجة الفيديو", [ 'video_id' => $this->video->id, 'attempt' => $this->attempts(), 'error' => $e->getMessage(), ]); // إعادة الوظيفة إلى الطابور لإعادة المحاولة $this->release(30); // إعادة المحاولة في 30 ثانية // أو فشل الوظيفة فوراً // $this->fail($e); } } /** * معالجة فشل الوظيفة. */ public function failed(Throwable $exception): void { // تحديث حالة الفيديو $this->video->update(['status' => 'failed']); // إرسال إشعار إلى المسؤول $admin = User::admin()->first(); $admin->notify(new VideoProcessingFailed($this->video, $exception)); // التسجيل في خدمة خارجية report($exception); } } // المعالجة العالمية للوظائف الفاشلة // إنشاء جدول failed_jobs php artisan queue:failed-table php artisan migrate // سرد الوظائف الفاشلة php artisan queue:failed // إعادة محاولة وظيفة فاشلة محددة php artisan queue:retry {job-id} // إعادة محاولة جميع الوظائف الفاشلة php artisan queue:retry all // حذف وظيفة فاشلة php artisan queue:forget {job-id} // مسح جميع الوظائف الفاشلة php artisan queue:flush
استراتيجيات إعادة المحاولة:
  • tries - الحد الأقصى للمحاولات قبل الاستسلام
  • backoff - التأخير بين المحاولات (ثواني أو مصفوفة)
  • retryUntil - الموعد النهائي المطلق لإعادة المحاولة
  • timeout - الحد الأقصى لوقت التنفيذ لكل محاولة
  • release() - إعادة المحاولة يدوياً مع تأخير مخصص

أحداث الوظائف والمراقبة

// الاستماع إلى أحداث الطابور في AppServiceProvider use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; use Illuminate\Support\Facades\Queue; public function boot(): void { Queue::before(function (JobProcessing $event) { // الوظيفة على وشك المعالجة logger()->debug("معالجة الوظيفة", [ 'connection' => $event->connectionName, 'queue' => $event->job->getQueue(), ]); }); Queue::after(function (JobProcessed $event) { // تمت معالجة الوظيفة Cache::increment('jobs_processed_today'); }); Queue::failing(function (JobFailed $event) { // فشلت الوظيفة logger()->error("فشلت الوظيفة", [ 'connection' => $event->connectionName, 'job' => $event->job->resolveName(), 'exception' => $event->exception->getMessage(), ]); // إرسال تنبيه إلى Slack Notification::route('slack', config('services.slack.webhook')) ->notify(new JobFailedAlert($event)); }); Queue::looping(function () { // يتم الاستدعاء في كل تكرار لحلقة عامل الطابور // مفيد لمسح الذاكرة المؤقتة، والتحقق من تغييرات التكوين، إلخ. if (Cache::has('maintenance_mode')) { // إيقاف معالجة الوظائف exit(0); } }); }

التمرين 1: تنفيذ خط معالجة الفيديو

أنشئ نظام معالجة فيديو مع التجميع:

  1. أنشئ الوظائف: UploadVideo, ProcessVideo, GenerateThumbnails, OptimizeVideo
  2. نفذ تسلسل الوظائف لمعالجة فيديو واحد
  3. نفذ التجميع لمعالجة فيديو مجمعة (10+ فيديوهات)
  4. تتبع تقدم الدفعة في قاعدة البيانات
  5. أرسل إشعار بريد إلكتروني عند اكتمال الدفعة
  6. أضف تحديد المعدل (5 فيديوهات في الدقيقة)
  7. نفذ منطق إعادة المحاولة مع التراجع الأسي

التمرين 2: بناء نظام إنشاء التقارير

أنشئ نظام إنشاء تقارير مع وظائف فريدة:

  1. إنشاء التقارير: UserReport, SalesReport, InventoryReport
  2. اجعل التقارير فريدة حسب معرف المستخدم + التاريخ
  3. ينتهي الطابور بعد ساعة واحدة
  4. السلسلة: FetchData -> ProcessData -> GeneratePDF -> SendEmail
  5. دفعة طلبات تقارير متعددة من المسؤول
  6. معالجة الإخفاقات بلطف (إخطار المستخدم، تسجيل الخطأ)

التمرين 3: البرامج الوسيطة المتقدمة للوظائف

أنشئ برامج وسيطة مخصصة للوظائف:

  1. ApiThrottle - تحديد معدل مكالمات API (100/دقيقة لكل مستخدم)
  2. RequireMembership - معالجة فقط للمستخدمين المميزين
  3. LogPerformance - تتبع وقت التنفيذ واستخدام الذاكرة
  4. CheckMaintenanceMode - تخطي الوظائف أثناء الصيانة
  5. تطبيق البرامج الوسيطة على ImportDataJob
  6. اختبار سلوك البرامج الوسيطة مع queue:work

الخلاصة

في هذا الدرس، استكشفت ميزات الطوابير المتقدمة في Laravel. تعلمت كيفية تجميع الوظائف معاً، وتسلسل الوظائف بشكل متسلسل، وتحديد معدل تنفيذ الوظائف، وضمان تفرد الوظائف، وإنشاء برامج وسيطة مخصصة للوظائف، ومعالجة الإخفاقات بلطف، وتنفيذ استراتيجيات إعادة محاولة متطورة. تمكنك هذه الأدوات القوية من بناء أنظمة معالجة في الخلفية قوية وقابلة للتطوير يمكنها التعامل مع سير العمل المعقد بثقة.

في الدرس التالي، سنستكشف Laravel Pennant للأعلام المميزة واختبار A/B.