NestJS — Node.js للمؤسسات

جدولة المهام وقوائم الانتظار (BullMQ)

16 دقيقة الدرس 41 من 48

جدولة المهام وقوائم الانتظار (BullMQ)

تحتاج تطبيقات الإنتاج بانتظام إلى تنفيذ عمل خارج دورة الطلب/الاستجابة: إرسال بريد ترحيب بعد التسجيل، أو توليد تقرير أسبوعي في منتصف الليل، أو إعادة محاولة دفعة فاشلة. يحلّ NestJS هذه المتطلبات بأداتين متكاملتين: @nestjs/schedule للمشغّلات المبنية على الوقت، و@nestjs/bullmq للمهام الخلفية الموثوقة والمستمرة المدعومة بـ Redis.

المجدوِل المدمج — @nestjs/schedule

تُغلّف حزمة schedule مكتبة node-cron وتتيح لك الإعلان عن الأعمال المتكررة مباشرةً على توابع الخدمات باستخدام المزيّنات. ثبّتها وسجّل الوحدة مرة واحدة:

npm install @nestjs/schedule
// app.module.ts import { Module } from '@nestjs/common'; import { ScheduleModule } from '@nestjs/schedule'; import { TasksService } from './tasks/tasks.service'; @Module({ imports: [ScheduleModule.forRoot()], providers: [TasksService], }) export class AppModule {}

يمكن لأي خدمة قابلة للحقن الآن استضافة توابع مجدولة:

import { Injectable, Logger } from '@nestjs/common'; import { Cron, CronExpression, Interval, Timeout } from '@nestjs/schedule'; @Injectable() export class TasksService { private readonly logger = new Logger(TasksService.name); // تعمل كل يوم عند الساعة 03:00 @Cron(CronExpression.EVERY_DAY_AT_3AM) purgeExpiredSessions(): void { this.logger.log('Purging expired sessions...'); // ... منطق تنظيف قاعدة البيانات } // تعمل كل 10 ثوانٍ @Interval(10_000) pollExternalApi(): void { this.logger.log('Polling external API'); } // تعمل مرة واحدة، بعد 5 ثوانٍ من بدء التشغيل @Timeout(5_000) warmupCache(): void { this.logger.log('Cache warmed up'); } }
  • @Cron(expression) — صياغة cron القياسية (6 حقول). استخدم ثوابت CronExpression لتحسين القراءة.
  • @Interval(ms) — يعمل بصورة متكررة كل N ميلي ثانية، يُقاس من التنفيذ السابق.
  • @Timeout(ms) — يعمل مرة واحدة بالضبط بعد N ميلي ثانية؛ مفيد للأعمال التي تُنفَّذ مرة عند البدء.
لا يصمد @nestjs/schedule أمام الأعطال أو التوسّع الأفقي. إذا مات العملية في منتصف المهمة، فُقدت المهمة. وإذا شغّلت ثلاث نسخ، تعمل الـ cron في الثلاثة في آنٍ واحد. لأي شيء يجب ألا يُفقد أو يُكرَّر، استخدم قائمة انتظار.

قوائم الانتظار الموثوقة — @nestjs/bullmq

BullMQ مكتبة قوائم انتظار مدعومة بـ Redis. تُستمَر المهام في Redis فلا تضيع عند إعادة التشغيل. يمكن إعادة محاولة كل مهمة تلقائيًا عند الفشل، وتأجيلها، وتحديد أولويتها، ومراقبتها. تدمج @nestjs/bullmq مكتبة BullMQ كمزوّدين من الدرجة الأولى في NestJS.

npm install @nestjs/bullmq bullmq

سجّل قائمة انتظار في أي وحدة ميزة. اسم قائمة الانتظار هو المفتاح المستخدم في كل مكان:

// email.module.ts import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; import { EmailProducer } from './email.producer'; import { EmailProcessor } from './email.processor'; @Module({ imports: [ BullModule.registerQueue({ name: 'email' }), ], providers: [EmailProducer, EmailProcessor], }) export class EmailModule {}
سجّل BullModule.forRoot() مرة واحدة في AppModule لتوفير اتصال Redis، ثم استدعِ BullModule.registerQueue() لكل ميزة. يقبل الإعداد الجذري أي خيارات اتصال ioredis: { connection: { host: 'localhost', port: 6379 } }.

المنتجون — إضافة مهام إلى قائمة الانتظار

حقن قائمة الانتظار عبر @InjectQueue('email') واستدعِ queue.add(). الوسيط الأول هو اسم المهمة (محدِّد للمعالج)، والثاني هو الحمولة، والثالث هو الخيارات:

import { Injectable } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bullmq'; import { Queue } from 'bullmq'; @Injectable() export class EmailProducer { constructor(@InjectQueue('email') private readonly emailQueue: Queue) {} async sendWelcomeEmail(userId: string, email: string): Promise<void> { await this.emailQueue.add( 'welcome', { userId, email }, { attempts: 3, // أعد المحاولة حتى 3 مرات عند الفشل backoff: { type: 'exponential', delay: 2_000 }, removeOnComplete: true, }, ); } async sendDailyDigest(emails: string[]): Promise<void> { // أخّر التسليم ساعة كاملة await this.emailQueue.add( 'digest', { emails }, { delay: 60 * 60 * 1_000 }, ); } }

المعالجون — استهلاك المهام

المعالج هو صنف مزيَّن بـ @Processor('email'). كل تابع @Process('jobName') يعالج نوعًا واحدًا من المهام. يستدعي BullMQ التابع بشكل متزامن حتى حدّ التزامن المُعدّ، ويعلّم المهمة مكتملةً أو فاشلةً بناءً على ما إذا كان التابع ينتهي بنجاح أو يُلقي خطأً:

import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Process } from '@nestjs/bullmq'; import { Job } from 'bullmq'; import { Logger } from '@nestjs/common'; @Processor('email') export class EmailProcessor extends WorkerHost { private readonly logger = new Logger(EmailProcessor.name); async process(job: Job): Promise<void> { switch (job.name) { case 'welcome': await this.handleWelcome(job.data); break; case 'digest': await this.handleDigest(job.data); break; default: this.logger.warn(`Unknown job: ${job.name}`); } } private async handleWelcome(data: { userId: string; email: string }) { this.logger.log(`Sending welcome email to ${data.email}`); // استدعاء خدمة البريد... } private async handleDigest(data: { emails: string[] }) { this.logger.log(`Sending digest to ${data.emails.length} subscribers`); // استدعاء خدمة البريد... } }
وسّع WorkerHost ونفّذ تابع process(job) واحد (واجهة برمجية v2). تُظهر الشروحات الأقدم @Process() على توابع فردية — ذلك لا يزال يعمل، لكن الخلط بين الأسلوبين يتسبب في ارتباك. اختر نمطًا واحدًا لكل معالج.

الخلاصة

استخدم @nestjs/schedule للمهام الدورية البسيطة المبنية على الوقت التي يمكن القبول بفقدانها (الاستطلاع، التنظيف). استخدم @nestjs/bullmq لكل ما يجب ألا يُفقد: رسائل البريد الإلكتروني التعاملية، معالجة المدفوعات، توليد التقارير. ينظّف المنتجون المهام بـ queue.add()؛ ويستهلكها المعالجون في WorkerHost.process(). تجعل مزايا BullMQ من استمرارية Redis وإعادة المحاولة/التأخير والتأجيل المهامَ الخلفية موثوقةً بصورة افتراضية.