Node.js و Express

قوائم انتظار المهام والمهام الخلفية

45 دقيقة الدرس 23 من 40

قوائم انتظار المهام والمهام الخلفية

المهام الخلفية ضرورية للتعامل مع المهام التي تستغرق وقتاً طويلاً دون حظر تطبيقك الرئيسي. بدلاً من جعل المستخدمين ينتظرون عمليات مثل إرسال رسائل البريد الإلكتروني ومعالجة الصور وإنشاء التقارير أو استدعاء واجهات API البطيئة، يمكنك وضع هذه المهام في قائمة انتظار ومعالجتها بشكل غير متزامن. هذا يحسن تجربة المستخدم واستجابة التطبيق وقابلية التوسع.

لماذا استخدام قوائم انتظار المهام؟

المشكلة: تخيل أن مستخدماً يرفع فيديو يحتاج إلى تحويل الترميز. إذا قمت بمعالجته بشكل متزامن، ينتظر المستخدم أكثر من 10 دقائق للحصول على استجابة. مع قائمة الانتظار، تعود على الفور بـ "بدأت المعالجة" وتتعامل مع تحويل الترميز في الخلفية.

حالات استخدام المهام الخلفية:

  • إرسال رسائل البريد الإلكتروني (رسائل الترحيب، النشرات الإخبارية، الإشعارات)
  • معالجة الصور/الفيديو (تغيير الحجم، الضغط، تحويل الترميز)
  • إنشاء التقارير (PDF، جداول البيانات، التحليلات)
  • مزامنة البيانات بين الأنظمة
  • تكاملات API مع خدمات الطرف الثالث
  • صيانة قاعدة البيانات (التنظيف، الأرشفة، الفهرسة)
  • المهام المجدولة (الملخصات اليومية، النسخ الاحتياطية الدورية)

مقدمة إلى Bull و BullMQ

Bull و BullMQ هما مكتبتان قويتان لقوائم انتظار المهام لـ Node.js مدعومة بـ Redis. BullMQ هو خلف Bull مع دعم أفضل لـ TypeScript وميزات أكثر وأداء محسّن.

# تثبيت Bull (لقوائم انتظار مدعومة بـ Redis) npm install bull # أو تثبيت BullMQ (موصى به للمشاريع الجديدة) npm install bullmq # تحتاج أيضاً إلى Redis npm install ioredis
Bull مقابل BullMQ: Bull ناضج ومستقر (جيد للمشاريع الحالية). BullMQ هي إعادة الكتابة الحديثة مع دعم أفضل لـ TypeScript والتدفقات والأداء. للمشاريع الجديدة، استخدم BullMQ.

إنشاء قائمة انتظارك الأولى (Bull)

لننشئ قائمة انتظار بريد إلكتروني بسيطة:

// queues/emailQueue.js const Queue = require('bull'); // إنشاء قائمة انتظار const emailQueue = new Queue('email', { redis: { host: process.env.REDIS_HOST || 'localhost', port: process.env.REDIS_PORT || 6379, password: process.env.REDIS_PASSWORD } }); // تعريف معالج المهمة emailQueue.process(async (job) => { const { to, subject, body } = job.data; console.log(`معالجة مهمة البريد الإلكتروني ${job.id} إلى ${to}`); // محاكاة إرسال بريد إلكتروني await sendEmail(to, subject, body); console.log(`اكتملت مهمة البريد الإلكتروني ${job.id}`); return { sent: true, to }; }); // مستمعي الأحداث emailQueue.on('completed', (job, result) => { console.log(`اكتملت المهمة ${job.id} بالنتيجة:`, result); }); emailQueue.on('failed', (job, err) => { console.error(`فشلت المهمة ${job.id}:`, err.message); }); emailQueue.on('stalled', (job) => { console.warn(`توقفت المهمة ${job.id}`); }); module.exports = emailQueue;

إضافة مهام إلى قائمة الانتظار:

// في مسار Express أو وحدة التحكم الخاصة بك const emailQueue = require('./queues/emailQueue'); app.post('/register', async (req, res) => { const { email, name } = req.body; // إنشاء مستخدم في قاعدة البيانات const user = await User.create({ email, name }); // إضافة مهمة إلى قائمة الانتظار (غير محظور) await emailQueue.add({ to: email, subject: 'مرحباً بك في تطبيقنا!', body: `مرحباً ${name}، شكراً للتسجيل!` }); // العودة فوراً res.json({ success: true, message: 'تم التسجيل بنجاح! تحقق من بريدك الإلكتروني.' }); });

خيارات المهام والأولويات

قم بتكوين سلوك المهمة بالخيارات:

// مهمة مع خيارات await emailQueue.add( { to: 'user@example.com', subject: 'اختبار', body: 'مرحباً' }, { // أولوية المهمة (رقم أقل = أولوية أعلى) priority: 1, // تأخير تنفيذ المهمة (بالميلي ثانية) delay: 5000, // ابدأ بعد 5 ثوان // محاولات المهمة (إعادة المحاولة عند الفشل) attempts: 3, // استراتيجية التراجع لإعادة المحاولات backoff: { type: 'exponential', delay: 2000 // ابدأ بـ 2 ثانية، ثم 4 ثوان، 8 ثوان، إلخ. }, // إزالة المهمة عند الانتهاء removeOnComplete: true, // إزالة المهمة عند الفشل removeOnFail: false, // مهلة المهمة timeout: 30000, // 30 ثانية // معرف المهمة (لإلغاء التكرار) jobId: `email-${userId}-welcome` } );

مثال على أولويات المهام:

// أولوية عالية - إعادة تعيين كلمة المرور (إرسال فوراً) await emailQueue.add( { to: user.email, subject: 'إعادة تعيين كلمة المرور', body: resetLink }, { priority: 1 } ); // أولوية متوسطة - تأكيد الطلب await emailQueue.add( { to: user.email, subject: 'تأكيد الطلب', body: orderDetails }, { priority: 5 } ); // أولوية منخفضة - نشرة إخبارية (يمكن أن تنتظر) await emailQueue.add( { to: user.email, subject: 'النشرة الأسبوعية', body: newsletter }, { priority: 10 } );

BullMQ - بديل حديث

يوفر BullMQ دعماً أفضل لـ TypeScript وميزات أكثر:

// queues/emailQueue.js (إصدار BullMQ) const { Queue, Worker } = require('bullmq'); const connection = { host: process.env.REDIS_HOST || 'localhost', port: process.env.REDIS_PORT || 6379, password: process.env.REDIS_PASSWORD }; // إنشاء قائمة انتظار const emailQueue = new Queue('email', { connection }); // إنشاء عامل (يعالج المهام) const emailWorker = new Worker( 'email', async (job) => { const { to, subject, body } = job.data; console.log(`معالجة مهمة البريد الإلكتروني ${job.id} إلى ${to}`); await sendEmail(to, subject, body); return { sent: true, to, sentAt: Date.now() }; }, { connection } ); // مستمعي الأحداث emailWorker.on('completed', (job) => { console.log(`اكتملت المهمة ${job.id}`); }); emailWorker.on('failed', (job, err) => { console.error(`فشلت المهمة ${job.id}:`, err.message); }); module.exports = { emailQueue, emailWorker };

إضافة مهام مع BullMQ:

const { emailQueue } = require('./queues/emailQueue'); // إضافة مهمة await emailQueue.add('welcome-email', { to: 'user@example.com', subject: 'مرحباً!', body: 'شكراً للانضمام' }); // إضافة مع خيارات await emailQueue.add( 'password-reset', { to: 'user@example.com', token: resetToken }, { priority: 1, attempts: 3, backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: 100, // احتفظ بآخر 100 مهمة مكتملة removeOnFail: 1000 // احتفظ بآخر 1000 مهمة فاشلة } );

المهام المجدولة والمتكررة

جدول المهام للتشغيل في أوقات أو فترات محددة:

// تشغيل مهمة مرة واحدة في وقت محدد const tomorrow = new Date(); tomorrow.setDate(tomorrow.getDate() + 1); tomorrow.setHours(9, 0, 0, 0); // 9 صباحاً غداً await emailQueue.add( { to: 'user@example.com', subject: 'تذكير', body: 'موعدك غداً' }, { delay: tomorrow.getTime() - Date.now() } ); // مهام قابلة للتكرار (نمط cron) await emailQueue.add( { subject: 'تقرير يومي', body: 'إليك تقريرك اليومي' }, { repeat: { cron: '0 9 * * *', // كل يوم في الساعة 9 صباحاً tz: 'Asia/Riyadh' } } ); // تقرير أسبوعي await emailQueue.add( { subject: 'ملخص أسبوعي' }, { repeat: { cron: '0 9 * * 1', // كل يوم اثنين في الساعة 9 صباحاً limit: 52 // توقف بعد سنة واحدة } } ); // كل 5 دقائق await emailQueue.add( { task: 'health-check' }, { repeat: { every: 5 * 60 * 1000 // 5 دقائق بالميلي ثانية } } );
أنماط Cron:
  • 0 9 * * * - كل يوم في الساعة 9 صباحاً
  • */5 * * * * - كل 5 دقائق
  • 0 0 * * 0 - كل يوم أحد في منتصف الليل
  • 0 0 1 * * - اليوم الأول من كل شهر
  • 0 12 * * 1-5 - أيام الأسبوع في الظهيرة

إعادة محاولات المهام ومعالجة الأخطاء

تعامل مع الفشل بأمان مع استراتيجيات إعادة المحاولة:

// تعريف معالج مع معالجة الأخطاء emailQueue.process(async (job) => { const { to, subject, body } = job.data; try { // محاولة إرسال بريد إلكتروني await sendEmail(to, subject, body); return { success: true }; } catch (error) { // تسجيل تفاصيل الخطأ console.error(`فشل إرسال البريد الإلكتروني إلى ${to}:`, error.message); // التحقق مما إذا كان يجب إعادة المحاولة if (job.attemptsMade < job.opts.attempts) { throw error; // سيعيد Bull المحاولة تلقائياً } // تم الوصول إلى الحد الأقصى للمحاولات - معالجة المهمة الفاشلة نهائياً await notifyAdmin({ subject: 'فشل تسليم البريد الإلكتروني', message: `فشل إرسال البريد الإلكتروني إلى ${to} بعد ${job.attemptsMade} محاولات`, error: error.message, jobData: job.data }); throw error; // لا تزال تطرح لوضع علامة على المهمة كفاشلة } }); // تكوين استراتيجية إعادة المحاولة await emailQueue.add( emailData, { attempts: 5, backoff: { type: 'exponential', delay: 2000 } } ); // دالة تراجع مخصصة await emailQueue.add( emailData, { attempts: 3, backoff: (attemptsMade) => { // منطق مخصص: 1 ثانية، 5 ثوان، 30 ثانية return [1000, 5000, 30000][attemptsMade - 1] || 60000; } } );

عمال متعددون للتزامن

عالج المهام بشكل أسرع عن طريق تشغيل عمال متعددين:

// التزامن أحادي الخيط emailQueue.process(5, async (job) => { // معالجة ما يصل إلى 5 مهام في وقت واحد return await sendEmail(job.data); }); // عمليات عامل متعددة (worker.js) const { Worker } = require('bullmq'); const worker = new Worker( 'email', async (job) => { return await sendEmail(job.data); }, { connection, concurrency: 10 // معالجة 10 مهام في وقت واحد لكل عامل } ); // تشغيل عمليات عامل متعددة // الطرفية 1: node worker.js // الطرفية 2: node worker.js // الطرفية 3: node worker.js // الآن لديك 3 عمال يعالجون 10 مهام لكل منهم = 30 مهمة متزامنة

تتبع تقدم المهمة

الإبلاغ عن تقدم المهمة للمهام طويلة المدى:

// معالج مع تحديثات التقدم videoQueue.process(async (job) => { const { videoPath } = job.data; // بدء المعالجة await job.progress(0); // تنزيل الفيديو await downloadVideo(videoPath); await job.progress(25); // تحويل الترميز إلى 720p await transcodeVideo(videoPath, '720p'); await job.progress(50); // تحويل الترميز إلى 1080p await transcodeVideo(videoPath, '1080p'); await job.progress(75); // تحميل النتائج await uploadResults(); await job.progress(100); return { success: true, formats: ['720p', '1080p'] }; }); // مراقبة التقدم من العميل const job = await videoQueue.add({ videoPath: '/uploads/video.mp4' }); // استطلاع التقدم const interval = setInterval(async () => { const jobState = await job.getState(); const progress = await job.progress(); console.log(`المهمة ${job.id}: ${jobState} - ${progress}%`); if (jobState === 'completed' || jobState === 'failed') { clearInterval(interval); } }, 1000);

Bull Board - لوحة معلومات المهام

تصور وإدارة قوائم الانتظار الخاصة بك باستخدام Bull Board:

// تثبيت لوحة المعلومات npm install @bull-board/express @bull-board/api @bull-board/ui // إعداد لوحة المعلومات (app.js) const { createBullBoard } = require('@bull-board/api'); const { BullAdapter } = require('@bull-board/api/bullAdapter'); const { ExpressAdapter } = require('@bull-board/express'); const emailQueue = require('./queues/emailQueue'); const videoQueue = require('./queues/videoQueue'); const reportQueue = require('./queues/reportQueue'); const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath('/admin/queues'); createBullBoard({ queues: [ new BullAdapter(emailQueue), new BullAdapter(videoQueue), new BullAdapter(reportQueue) ], serverAdapter }); // تركيب لوحة المعلومات app.use('/admin/queues', serverAdapter.getRouter()); // الآن قم بزيارة: http://localhost:3000/admin/queues
ميزات Bull Board:
  • عرض جميع قوائم الانتظار وأعداد المهام الخاصة بها
  • مراقبة حالة المهمة (في الانتظار، نشطة، مكتملة، فاشلة)
  • فحص بيانات المهمة والسجلات
  • إعادة محاولة المهام الفاشلة يدوياً
  • تنظيف المهام المكتملة/الفاشلة
  • إيقاف/استئناف قوائم الانتظار

أفضل ممارسات الإنتاج

// 1. إيقاف التشغيل الرشيق const gracefulShutdown = async () => { console.log('إيقاف العمال بشكل رشيق...'); await emailWorker.close(); await videoWorker.close(); process.exit(0); }; process.on('SIGTERM', gracefulShutdown); process.on('SIGINT', gracefulShutdown); // 2. تحديد معدل المهمة await emailQueue.add( emailData, { limiter: { max: 100, // بحد أقصى 100 مهمة duration: 60000 // في الدقيقة } } ); // 3. إلغاء تكرار المهمة await emailQueue.add( emailData, { jobId: `welcome-email-${userId}`, // المعرف الفريد يمنع التكرار removeOnComplete: true } ); // 4. قائمة انتظار الرسائل الميتة للمهام الفاشلة emailQueue.on('failed', async (job, err) => { if (job.attemptsMade >= job.opts.attempts) { // الانتقال إلى قائمة انتظار الرسائل الميتة await deadLetterQueue.add({ originalQueue: 'email', jobId: job.id, data: job.data, error: err.message, failedAt: Date.now() }); } }); // 5. مراقبة صحة قائمة الانتظار setInterval(async () => { const waiting = await emailQueue.getWaitingCount(); const active = await emailQueue.getActiveCount(); const failed = await emailQueue.getFailedCount(); if (waiting > 1000) { console.warn(`تراكم عالي في قائمة الانتظار: ${waiting} مهمة في الانتظار`); // تنبيه فريق العمليات } if (failed > 100) { console.error(`معدل فشل مرتفع: ${failed} مهمة فاشلة`); // تنبيه فريق التطوير } }, 60000); // التحقق كل دقيقة

تمرين: بناء خط أنابيب متعدد المراحل لمعالجة الصور

  1. أنشئ نقطة نهاية تحميل تقبل الصور
  2. ضع في قائمة انتظار مهمة معالجة الصور مع هذه المراحل:
    • التحقق من صحة تنسيق الصورة والحجم
    • إنشاء صور مصغرة (صغيرة، متوسطة، كبيرة)
    • تحسين الصور (ضغط)
    • التحميل إلى التخزين السحابي (محاكاة)
    • تحديث قاعدة البيانات بعناوين URL
  3. الإبلاغ عن التقدم في كل مرحلة
  4. تنفيذ منطق إعادة المحاولة لفشل الشبكة
  5. إضافة لوحة معلومات Bull Board لمراقبة المهام
  6. إنشاء نقطة نهاية webhook لإخطار العملاء عند اكتمال المعالجة
المزالق الشائعة:
  • لا تعالج المهام في عملية خادم الويب الخاص بك - استخدم عمال مخصصين
  • قم بتعيين مهلات مهمة مناسبة لمنع المهام المعلقة
  • قم بتنظيف المهام المكتملة بانتظام لمنع مشاكل الذاكرة
  • راقب استخدام ذاكرة Redis - يمكن أن تتراكم المهام الفاشلة
  • تعامل دائماً مع الأخطاء - يمكن أن تتسبب الأخطاء غير المعترضة في تعطل العمال

قوائم انتظار المهام ضرورية لبناء تطبيقات قابلة للتوسع ومستجيبة. أتقن Bull أو BullMQ وستكون قادراً على التعامل مع أي متطلبات معالجة خلفية بكفاءة.