WebSockets والتطبيقات الفورية

قوائم انتظار الرسائل لأنظمة الوقت الفعلي

20 دقيقة الدرس 23 من 35

مقدمة في قوائم انتظار الرسائل

قوائم انتظار الرسائل ضرورية لبناء أنظمة وقت فعلي قوية وقابلة للتوسع. على عكس Redis Pub/Sub وهو "أطلق وانس"، تضمن قوائم انتظار الرسائل تسليم الرسائل وتوفر ميزات متقدمة مثل منطق إعادة المحاولة، وضع الأولويات في قائمة الانتظار، وقوائم الرسائل الميتة.

ما هي قائمة انتظار الرسائل؟

قائمة انتظار الرسائل هي شكل من أشكال الاتصال غير المتزامن حيث:

  • المنتجون: ينشئون ويرسلون الرسائل إلى قائمة الانتظار
  • قائمة الانتظار: تخزن الرسائل حتى يتم استهلاكها
  • المستهلكون: يعالجون الرسائل من قائمة الانتظار
  • الوسيط: يدير البنية التحتية لقائمة الانتظار
الفرق الأساسي: Pub/Sub قائم على الدفع (يتم دفع الرسائل إلى جميع المشتركين). قوائم الانتظار قائمة على السحب (يسحب المستهلكون الرسائل عندما يكونون جاهزين).

لماذا نستخدم قوائم انتظار الرسائل؟

توفر قوائم انتظار الرسائل فوائد حاسمة:

  • التسليم المضمون: تستمر الرسائل حتى يتم الإقرار بها
  • موازنة الحمل: يتشارك المستهلكون المتعددون عبء العمل
  • الفصل: يعمل المنتجون والمستهلكون بشكل مستقل
  • منطق إعادة المحاولة: تتم إعادة محاولة الرسائل الفاشلة تلقائياً
  • تحديد المعدل: التحكم في سرعة معالجة الرسائل
  • معالجة الأولويات: معالجة الرسائل المهمة أولاً

أساسيات RabbitMQ

RabbitMQ هو وسيط رسائل شائع ينفذ بروتوكول AMQP:

// تثبيت التبعيات // npm install amqplib const amqp = require('amqplib'); // الاتصال بـ RabbitMQ const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); // الإعلان عن قائمة انتظار const queueName = 'tasks'; await channel.assertQueue(queueName, { durable: true // تبقى قائمة الانتظار بعد إعادة تشغيل الوسيط }); // إرسال رسالة إلى قائمة الانتظار const message = { task: 'send_email', userId: 123 }; channel.sendToQueue( queueName, Buffer.from(JSON.stringify(message)), { persistent: true } // تبقى الرسالة بعد إعادة تشغيل الوسيط ); console.log('تم إرسال الرسالة إلى قائمة الانتظار'); // استهلاك الرسائل من قائمة الانتظار channel.consume(queueName, async (msg) => { const data = JSON.parse(msg.content.toString()); console.log('المعالجة:', data); try { // معالجة الرسالة await processTask(data); // الإقرار بالمعالجة الناجحة channel.ack(msg); } catch (error) { console.error('خطأ في معالجة الرسالة:', error); // رفض وإعادة الرسالة إلى قائمة الانتظار channel.nack(msg, false, true); } }, { noAck: false }); // الإقرار اليدوي
مهم: استخدم دائماً الإقرار اليدوي (noAck: false) لضمان عدم فقدان الرسائل إذا فشلت المعالجة.

قائمة انتظار Bull مع Redis

Bull هي مكتبة قائمة انتظار Node.js شائعة مبنية على Redis:

// تثبيت التبعيات // npm install bull const Queue = require('bull'); // إنشاء قائمة انتظار const emailQueue = new Queue('email', { redis: { host: 'localhost', port: 6379 } }); // إضافة مهمة إلى قائمة الانتظار await emailQueue.add('welcome-email', { to: 'user@example.com', name: 'جون دو', template: 'welcome' }, { attempts: 3, // إعادة المحاولة حتى 3 مرات backoff: { type: 'exponential', delay: 2000 // البدء بتأخير 2 ثانية }, priority: 1 // رقم أعلى = أولوية أعلى }); // معالجة المهام من قائمة الانتظار emailQueue.process('welcome-email', 5, async (job) => { console.log(`معالجة المهمة ${job.id}:`, job.data); // محاكاة إرسال البريد الإلكتروني await sendEmail(job.data.to, job.data.template, { name: job.data.name }); // تحديث التقدم job.progress(100); return { sent: true, timestamp: new Date() }; }); // معالجة إكمال المهمة emailQueue.on('completed', (job, result) => { console.log(`المهمة ${job.id} مكتملة:`, result); }); // معالجة فشل المهمة emailQueue.on('failed', (job, error) => { console.error(`المهمة ${job.id} فشلت:`, error.message); });

البنية الموجهة للأحداث

استخدم قوائم الانتظار لبناء نظام موجه للأحداث:

const Queue = require('bull'); // إنشاء قوائم انتظار متخصصة const userQueue = new Queue('user-events'); const orderQueue = new Queue('order-events'); const notificationQueue = new Queue('notifications'); // سير عمل تسجيل المستخدم async function registerUser(userData) { // حفظ المستخدم في قاعدة البيانات const user = await db.users.create(userData); // تشغيل المهام اللاحقة عبر قوائم الانتظار await userQueue.add('user-registered', { userId: user.id, email: user.email, name: user.name }); return user; } // معالجة أحداث تسجيل المستخدم userQueue.process('user-registered', async (job) => { const { userId, email, name } = job.data; // إرسال بريد إلكتروني ترحيبي (قائمة انتظار منفصلة) await notificationQueue.add('send-email', { type: 'welcome', to: email, data: { name } }); // إنشاء إعدادات افتراضية await db.userSettings.create({ userId, theme: 'light', notifications: true }); // تتبع التحليلات await analytics.track('user_registered', { userId }); return { processed: true }; }); // معالجة الإشعارات notificationQueue.process('send-email', 10, async (job) => { const { type, to, data } = job.data; const template = await getEmailTemplate(type); await emailService.send(to, template, data); return { sent: true, emailType: type }; });
أفضل ممارسة: استخدم قوائم انتظار منفصلة لمخاوف مختلفة (المستخدمون، الطلبات، الإشعارات). هذا يسمح بتوسيع مستقل وعزل الفشل.

فصل الخدمات بقوائم الانتظار

تمكن قوائم الانتظار من استقلال الخدمة الحقيقي:

// الخدمة 1: خدمة الطلب (orders.js) const Queue = require('bull'); const orderQueue = new Queue('orders'); async function createOrder(orderData) { // حفظ الطلب في قاعدة البيانات const order = await db.orders.create(orderData); // إصدار حدث - لا يهتم من يعالجه await orderQueue.add('order-created', { orderId: order.id, userId: order.userId, total: order.total, items: order.items }); return order; } // الخدمة 2: خدمة المخزون (inventory.js) const orderQueue = new Queue('orders'); // الاشتراك في أحداث الطلب orderQueue.process('order-created', async (job) => { const { orderId, items } = job.data; // حجز المخزون for (const item of items) { await db.inventory.decrement(item.productId, item.quantity); } console.log(`تم حجز المخزون للطلب ${orderId}`); }); // الخدمة 3: خدمة البريد الإلكتروني (email.js) const orderQueue = new Queue('orders'); // أيضاً الاشتراك في نفس الأحداث orderQueue.process('order-created', async (job) => { const { userId } = job.data; const user = await db.users.findById(userId); // إرسال تأكيد الطلب await sendEmail(user.email, 'order-confirmation', job.data); console.log('تم إرسال تأكيد الطلب'); });
قابلية التوسع: يمكن لكل خدمة التوسع بشكل مستقل. إضافة المزيد من العمال إلى أي خدمة دون التأثير على الآخرين.

تسليم الرسائل المضمون

تأكد من عدم فقدان أي رسائل:

const Queue = require('bull'); const criticalQueue = new Queue('critical-tasks', { redis: { host: 'localhost', port: 6379 }, defaultJobOptions: { attempts: 5, // إعادة المحاولة 5 مرات backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: false, // الاحتفاظ بالمهام المكتملة removeOnFail: false // الاحتفاظ بالمهام الفاشلة للتفتيش } }); // إضافة مهمة حرجة await criticalQueue.add('process-payment', { orderId: 123, amount: 99.99, paymentMethod: 'credit_card' }, { jobId: `payment-${orderId}`, // معرف فريد يمنع التكرار timeout: 30000 // يجب إكمال المهمة في 30 ثانية }); // المعالجة مع معالجة أخطاء قوية criticalQueue.process('process-payment', async (job) => { const { orderId, amount, paymentMethod } = job.data; try { // معالجة الدفع const result = await paymentGateway.charge({ amount, method: paymentMethod }); // تحديث الطلب await db.orders.update(orderId, { status: 'paid', transactionId: result.transactionId }); return result; } catch (error) { // تسجيل الخطأ للمراقبة console.error(`فشل الدفع للطلب ${orderId}:`, error); // إذا كانت المحاولة الأخيرة، إرسال تنبيه if (job.attemptsMade >= job.opts.attempts) { await alertService.sendCritical({ message: `فشل الدفع بعد ${job.attemptsMade} محاولات`, orderId }); } throw error; // تشغيل إعادة المحاولة } }); // مراقبة المهام الفاشلة criticalQueue.on('failed', async (job, error) => { console.error(`المهمة ${job.id} فشلت بشكل دائم:`, error); // النقل إلى قائمة انتظار الرسائل الميتة للفحص اليدوي const deadLetterQueue = new Queue('dead-letter'); await deadLetterQueue.add('failed-job', { originalJob: job.data, error: error.message, attempts: job.attemptsMade }); });

قوائم انتظار الأولويات

معالجة الرسائل ذات الأولوية العالية أولاً:

const taskQueue = new Queue('tasks'); // إضافة مهام بأولويات مختلفة await taskQueue.add('task', { type: 'normal' }, { priority: 5 }); await taskQueue.add('task', { type: 'important' }, { priority: 10 }); await taskQueue.add('task', { type: 'urgent' }, { priority: 20 }); // تتم معالجة المهام حسب الأولوية: عاجل → مهم → عادي taskQueue.process('task', async (job) => { console.log(`معالجة ${job.data.type} مهمة بأولوية ${job.opts.priority}`); await performTask(job.data); });

المهام المؤجلة

جدولة المهام للتشغيل في المستقبل:

// إرسال تذكير في 24 ساعة await reminderQueue.add('send-reminder', { userId: 123, message: 'لا تنس إكمال ملفك الشخصي!' }, { delay: 24 * 60 * 60 * 1000 // 24 ساعة بالميلي ثانية }); // جدولة مهمة متكررة await reportQueue.add('daily-report', { reportType: 'sales' }, { repeat: { cron: '0 9 * * *' // كل يوم في الساعة 9 صباحاً } });

مراقبة صحة قائمة الانتظار

// الحصول على إحصائيات قائمة الانتظار const waiting = await queue.getWaitingCount(); const active = await queue.getActiveCount(); const completed = await queue.getCompletedCount(); const failed = await queue.getFailedCount(); console.log('إحصائيات قائمة الانتظار:', { waiting, active, completed, failed }); // الحصول على جميع المهام الفاشلة const failedJobs = await queue.getFailed(); failedJobs.forEach(job => { console.log(`المهمة الفاشلة ${job.id}:`, job.failedReason); }); // تنظيف المهام القديمة await queue.clean(5000, 'completed'); // إزالة المهام المكتملة الأقدم من 5 ثوان await queue.clean(10000, 'failed'); // إزالة المهام الفاشلة الأقدم من 10 ثوان
تمرين:
  1. قم بإعداد قائمة انتظار Bull مع Redis محلياً
  2. أنشئ نظام معالجة طلبات بثلاث قوائم انتظار:
    • قائمة انتظار الطلبات: تنشئ الطلبات
    • قائمة انتظار الدفع: تعالج المدفوعات (مع منطق إعادة المحاولة)
    • قائمة انتظار الإيفاء: تتعامل مع الشحن
  3. نفذ معالجة الأخطاء الصحيحة ومنطق إعادة المحاولة
  4. أضف معالجة الأولويات للطلبات السريعة
  5. أنشئ نقطة نهاية مراقبة تعرض إحصائيات قائمة الانتظار
  6. قم بمحاكاة الفشل والتحقق من إعادة محاولة الرسائل

ملخص

قوائم انتظار الرسائل حاسمة لأنظمة الوقت الفعلي الموثوقة:

  • ضمان تسليم الرسائل بالإقرارات
  • تمكين فصل الخدمة والاستقلال
  • توفير منطق إعادة المحاولة التلقائي
  • دعم المعالجة ذات الأولوية والمؤجلة
  • السماح بتوسيع مستقل للمنتجين والمستهلكين
  • ضروري للبنى الموجهة للأحداث