WebSockets والتطبيقات الفورية

Redis Pub/Sub للاتصال في الوقت الفعلي

16 دقيقة الدرس 22 من 35

مقدمة في Redis Pub/Sub

Redis Pub/Sub (النشر/الاشتراك) هو نمط مراسلة يتيح الاتصال بين أجزاء مختلفة من تطبيقك أو بين خدمات متعددة. إنه ضروري لبناء أنظمة في الوقت الفعلي قابلة للتوسع.

ما هو Pub/Sub؟

يحتوي نمط Pub/Sub على ثلاثة مكونات رئيسية:

  • الناشرون: يرسلون الرسائل إلى القنوات
  • المشتركون: يستمعون إلى القنوات ويتلقون الرسائل
  • القنوات: مسارات اتصال مسماة
مفهوم أساسي: الناشرون والمشتركون منفصلون. لا يعرف الناشرون من يشترك، ولا يعرف المشتركون من ينشر.

لماذا Redis Pub/Sub؟

Redis Pub/Sub مثالي لتطبيقات الوقت الفعلي لأنه:

  • سريع للغاية (عمليات في الذاكرة)
  • واجهة برمجية بسيطة وسهلة التنفيذ
  • يدعم الاشتراكات القائمة على الأنماط
  • مثالي للاتصال عبر الخدمات
  • يعمل بسلاسة مع Socket.io ومكتبات WebSocket الأخرى

مثال أساسي على Redis Pub/Sub

const redis = require('redis'); // إنشاء عميل الناشر const publisher = redis.createClient({ url: 'redis://localhost:6379' }); // إنشاء عميل المشترك const subscriber = redis.createClient({ url: 'redis://localhost:6379' }); // الاتصال بكلا العميلين await publisher.connect(); await subscriber.connect(); // الاشتراك في قناة await subscriber.subscribe('news', (message) => { console.log('تم استلام الرسالة:', message); }); // نشر رسالة await publisher.publish('news', 'أخبار عاجلة: Redis رائع!'); // الإخراج: تم استلام الرسالة: أخبار عاجلة: Redis رائع!
مهم: تحتاج إلى مثيلات عميل Redis منفصلة للنشر والاشتراك. لا يمكن لعميل في وضع الاشتراك تنفيذ أوامر Redis أخرى.

نشر الرسائل

النشر واضح ومباشر مع Redis:

// رسالة نصية بسيطة await publisher.publish('channel-name', 'مرحبا بالعالم'); // بيانات JSON const data = { userId: 123, action: 'login', timestamp: Date.now() }; await publisher.publish('user-events', JSON.stringify(data)); // يُرجع النشر عدد المشتركين الذين تلقوا الرسالة const subscriberCount = await publisher.publish('notifications', 'رسالة جديدة'); console.log(`تم توصيل الرسالة إلى ${subscriberCount} مشترك`);

الاشتراك في القنوات

يستمع المشتركون إلى قناة واحدة أو أكثر:

// الاشتراك في قناة واحدة await subscriber.subscribe('chat', (message) => { console.log('رسالة الدردشة:', message); }); // الاشتراك في قنوات متعددة await subscriber.subscribe('chat', (message, channel) => { console.log(`رسالة من ${channel}:`, message); }); await subscriber.subscribe('notifications', (message, channel) => { console.log(`إشعار من ${channel}:`, message); }); // إلغاء الاشتراك من قناة await subscriber.unsubscribe('chat'); // إلغاء الاشتراك من جميع القنوات await subscriber.unsubscribe();

الاشتراكات القائمة على الأنماط

الاشتراك في قنوات متعددة تطابق نمطاً:

// الاشتراك في جميع القنوات التي تبدأ بـ "user:" await subscriber.pSubscribe('user:*', (message, channel) => { console.log(`رسالة من ${channel}:`, message); }); // يطابق: user:login, user:logout, user:register, إلخ. // الاشتراك في جميع قنوات غرف الدردشة await subscriber.pSubscribe('chat:room:*', (message, channel) => { const roomId = channel.split(':')[2]; console.log(`رسالة في الغرفة ${roomId}:`, message); }); // يطابق: chat:room:1, chat:room:2, chat:room:lobby, إلخ. // إلغاء الاشتراك من الأنماط await subscriber.pUnsubscribe('user:*');
أفضل ممارسة: استخدم اصطلاحات تسمية قنوات وصفية مثل "resource:action" أو "service:event:id" لجعل اشتراكات الأنماط قوية.

دمج Redis Pub/Sub مع Socket.io

دمج Redis Pub/Sub مع Socket.io لتطبيقات وقت فعلي قابلة للتوسع:

const express = require('express'); const { Server } = require('socket.io'); const redis = require('redis'); const app = express(); const server = app.listen(3000); const io = new Server(server); // عملاء Redis const publisher = redis.createClient({ url: 'redis://localhost:6379' }); const subscriber = redis.createClient({ url: 'redis://localhost:6379' }); await publisher.connect(); await subscriber.connect(); // الاشتراك في قنوات Redis await subscriber.subscribe('broadcast', (message) => { const data = JSON.parse(message); // البث إلى جميع عملاء Socket.io io.emit('message', data); }); await subscriber.subscribe('notifications', (message) => { const notification = JSON.parse(message); // إرسال إلى مستخدم محدد io.to(`user-${notification.userId}`).emit('notification', notification); }); // اتصال Socket.io io.on('connection', (socket) => { socket.on('send_message', (data) => { // النشر إلى Redis بدلاً من الإصدار مباشرة publisher.publish('broadcast', JSON.stringify({ userId: socket.userId, message: data.message, timestamp: new Date() })); }); socket.on('join_room', (roomId) => { socket.join(`room-${roomId}`); // الاشتراك في قناة Redis خاصة بالغرفة subscriber.subscribe(`room:${roomId}`, (message) => { io.to(`room-${roomId}`).emit('room_message', JSON.parse(message)); }); }); });

الاتصال عبر الخدمات

استخدم Redis Pub/Sub للاتصال بين خدمات مختلفة:

// الخدمة 1: خدمة المستخدم (users.js) const redis = require('redis'); const publisher = redis.createClient({ url: 'redis://localhost:6379' }); await publisher.connect(); // عند تسجيل المستخدم async function registerUser(userData) { // الحفظ في قاعدة البيانات const user = await db.users.create(userData); // نشر الحدث await publisher.publish('user:registered', JSON.stringify({ userId: user.id, email: user.email, name: user.name, timestamp: new Date() })); return user; } // الخدمة 2: خدمة البريد الإلكتروني (email.js) const subscriber = redis.createClient({ url: 'redis://localhost:6379' }); await subscriber.connect(); // الاستماع لأحداث تسجيل المستخدم await subscriber.subscribe('user:registered', async (message) => { const user = JSON.parse(message); // إرسال بريد إلكتروني ترحيبي await sendWelcomeEmail(user.email, user.name); console.log(`تم إرسال بريد إلكتروني ترحيبي إلى ${user.email}`); }); // الخدمة 3: خدمة التحليلات (analytics.js) const analyticsSubscriber = redis.createClient({ url: 'redis://localhost:6379' }); await analyticsSubscriber.connect(); // الاستماع لجميع أحداث المستخدم await analyticsSubscriber.pSubscribe('user:*', async (message, channel) => { const event = channel.split(':')[1]; // registered, login, logout, إلخ. const data = JSON.parse(message); // تتبع الحدث في التحليلات await trackEvent(event, data); });

البنية الموجهة للأحداث مع Pub/Sub

// أداة ناشر الأحداث class EventBus { constructor(redisUrl) { this.publisher = redis.createClient({ url: redisUrl }); } async connect() { await this.publisher.connect(); } async emit(event, data) { const message = JSON.stringify({ event, data, timestamp: new Date(), id: generateUniqueId() }); await this.publisher.publish(event, message); } } // أداة مستمع الأحداث class EventListener { constructor(redisUrl) { this.subscriber = redis.createClient({ url: redisUrl }); this.handlers = new Map(); } async connect() { await this.subscriber.connect(); } async on(event, handler) { this.handlers.set(event, handler); await this.subscriber.subscribe(event, async (message) => { const { data } = JSON.parse(message); await handler(data); }); } async onPattern(pattern, handler) { await this.subscriber.pSubscribe(pattern, async (message, channel) => { const { data } = JSON.parse(message); await handler(data, channel); }); } } // الاستخدام const eventBus = new EventBus('redis://localhost:6379'); await eventBus.connect(); const listener = new EventListener('redis://localhost:6379'); await listener.connect(); // إصدار الأحداث await eventBus.emit('order:created', { orderId: 123, userId: 456 }); await eventBus.emit('order:shipped', { orderId: 123, trackingNumber: 'ABC123' }); // الاستماع للأحداث await listener.on('order:created', async (data) => { console.log('طلب جديد:', data); await processOrder(data.orderId); }); await listener.onPattern('order:*', async (data, channel) => { const action = channel.split(':')[1]; console.log(`الطلب ${action}:`, data); });
قابلية التوسع: يتيح لك هذا النمط إضافة خدمات جديدة تتفاعل مع الأحداث دون تعديل الخدمات الحالية. فقط اشترك في الأحداث التي تحتاجها.

معالجة تسليم الرسائل

Redis Pub/Sub هو "أطلق وانس" - لا يتم الاحتفاظ بالرسائل:

// تُفقد الرسائل إذا: // 1. لا يوجد مشتركون يستمعون عند النشر // 2. ينفصل مشترك مؤقتاً // 3. يعيد خادم Redis التشغيل // التحقق مما إذا تم تسليم الرسالة const count = await publisher.publish('channel', 'message'); if (count === 0) { console.log('لا يوجد مشتركون - لم يتم تسليم الرسالة'); // فكر في استخدام Redis Streams أو قائمة انتظار الرسائل بدلاً من ذلك }
قيد: لا يضمن Redis Pub/Sub تسليم الرسائل. للرسائل الحرجة التي تتطلب تسليماً مضموناً، استخدم Redis Streams أو قوائم انتظار الرسائل مثل RabbitMQ.

مراقبة Pub/Sub

// الحصول على القنوات النشطة const channels = await publisher.pubSubChannels(); console.log('القنوات النشطة:', channels); // الحصول على عدد المشتركين لقناة const numSubs = await publisher.pubSubNumSub('chat'); console.log('المشتركون في الدردشة:', numSubs); // الحصول على عدد اشتراكات الأنماط const numPat = await publisher.pubSubNumPat(); console.log('اشتراكات الأنماط النشطة:', numPat);
تمرين:
  1. أنشئ تطبيق متعدد الخدمات بثلاث خدمات:
    • خدمة API: تتعامل مع طلبات HTTP وتنشر الأحداث
    • خدمة الإشعارات: تشترك في الأحداث وترسل الإشعارات
    • خدمة السجل: تشترك في جميع الأحداث وتسجلها
  2. نفذ ما لا يقل عن 3 أنواع مختلفة من الأحداث (user.created، order.placed، payment.completed)
  3. استخدم اشتراكات الأنماط في خدمة السجل لالتقاط جميع الأحداث
  4. اختبر أن الأحداث يتم استلامها من قبل عدة مشتركين في وقت واحد
  5. راقب القنوات النشطة وعدد المشتركين

ملخص

Redis Pub/Sub هو نمط قوي للاتصال في الوقت الفعلي:

  • يفصل الناشرين والمشتركين
  • يدعم الاشتراكات القائمة على الأنماط
  • مثالي للاتصال عبر الخدمات
  • يتكامل بسلاسة مع Socket.io
  • يمكّن البنية الموجهة للأحداث
  • سريع وبسيط، لكن لا يضمن التسليم