WebSockets والتطبيقات الفورية
Redis Pub/Sub للاتصال في الوقت الفعلي
مقدمة في Redis Pub/Sub
Redis Pub/Sub (النشر/الاشتراك) هو نمط مراسلة يتيح الاتصال بين أجزاء مختلفة من تطبيقك أو بين خدمات متعددة. إنه ضروري لبناء أنظمة في الوقت الفعلي قابلة للتوسع.
ما هو Pub/Sub؟
يحتوي نمط Pub/Sub على ثلاثة مكونات رئيسية:
- الناشرون: يرسلون الرسائل إلى القنوات
- المشتركون: يستمعون إلى القنوات ويتلقون الرسائل
- القنوات: مسارات اتصال مسماة
مفهوم أساسي: الناشرون والمشتركون منفصلون. لا يعرف الناشرون من يشترك، ولا يعرف المشتركون من ينشر.
لماذا Redis Pub/Sub؟
Redis Pub/Sub مثالي لتطبيقات الوقت الفعلي لأنه:
- سريع للغاية (عمليات في الذاكرة)
- واجهة برمجية بسيطة وسهلة التنفيذ
- يدعم الاشتراكات القائمة على الأنماط
- مثالي للاتصال عبر الخدمات
- يعمل بسلاسة مع Socket.io ومكتبات WebSocket الأخرى
مثال أساسي على Redis Pub/Sub
const redis = require('redis');
// إنشاء عميل الناشر
const publisher = redis.createClient({
url: 'redis://localhost:6379'
});
// إنشاء عميل المشترك
const subscriber = redis.createClient({
url: 'redis://localhost:6379'
});
// الاتصال بكلا العميلين
await publisher.connect();
await subscriber.connect();
// الاشتراك في قناة
await subscriber.subscribe('news', (message) => {
console.log('تم استلام الرسالة:', message);
});
// نشر رسالة
await publisher.publish('news', 'أخبار عاجلة: Redis رائع!');
// الإخراج: تم استلام الرسالة: أخبار عاجلة: Redis رائع!
مهم: تحتاج إلى مثيلات عميل Redis منفصلة للنشر والاشتراك. لا يمكن لعميل في وضع الاشتراك تنفيذ أوامر Redis أخرى.
نشر الرسائل
النشر واضح ومباشر مع Redis:
// رسالة نصية بسيطة
await publisher.publish('channel-name', 'مرحبا بالعالم');
// بيانات JSON
const data = { userId: 123, action: 'login', timestamp: Date.now() };
await publisher.publish('user-events', JSON.stringify(data));
// يُرجع النشر عدد المشتركين الذين تلقوا الرسالة
const subscriberCount = await publisher.publish('notifications', 'رسالة جديدة');
console.log(`تم توصيل الرسالة إلى ${subscriberCount} مشترك`);
الاشتراك في القنوات
يستمع المشتركون إلى قناة واحدة أو أكثر:
// الاشتراك في قناة واحدة
await subscriber.subscribe('chat', (message) => {
console.log('رسالة الدردشة:', message);
});
// الاشتراك في قنوات متعددة
await subscriber.subscribe('chat', (message, channel) => {
console.log(`رسالة من ${channel}:`, message);
});
await subscriber.subscribe('notifications', (message, channel) => {
console.log(`إشعار من ${channel}:`, message);
});
// إلغاء الاشتراك من قناة
await subscriber.unsubscribe('chat');
// إلغاء الاشتراك من جميع القنوات
await subscriber.unsubscribe();
الاشتراكات القائمة على الأنماط
الاشتراك في قنوات متعددة تطابق نمطاً:
// الاشتراك في جميع القنوات التي تبدأ بـ "user:"
await subscriber.pSubscribe('user:*', (message, channel) => {
console.log(`رسالة من ${channel}:`, message);
});
// يطابق: user:login, user:logout, user:register, إلخ.
// الاشتراك في جميع قنوات غرف الدردشة
await subscriber.pSubscribe('chat:room:*', (message, channel) => {
const roomId = channel.split(':')[2];
console.log(`رسالة في الغرفة ${roomId}:`, message);
});
// يطابق: chat:room:1, chat:room:2, chat:room:lobby, إلخ.
// إلغاء الاشتراك من الأنماط
await subscriber.pUnsubscribe('user:*');
أفضل ممارسة: استخدم اصطلاحات تسمية قنوات وصفية مثل "resource:action" أو "service:event:id" لجعل اشتراكات الأنماط قوية.
دمج Redis Pub/Sub مع Socket.io
دمج Redis Pub/Sub مع Socket.io لتطبيقات وقت فعلي قابلة للتوسع:
const express = require('express');
const { Server } = require('socket.io');
const redis = require('redis');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// عملاء Redis
const publisher = redis.createClient({ url: 'redis://localhost:6379' });
const subscriber = redis.createClient({ url: 'redis://localhost:6379' });
await publisher.connect();
await subscriber.connect();
// الاشتراك في قنوات Redis
await subscriber.subscribe('broadcast', (message) => {
const data = JSON.parse(message);
// البث إلى جميع عملاء Socket.io
io.emit('message', data);
});
await subscriber.subscribe('notifications', (message) => {
const notification = JSON.parse(message);
// إرسال إلى مستخدم محدد
io.to(`user-${notification.userId}`).emit('notification', notification);
});
// اتصال Socket.io
io.on('connection', (socket) => {
socket.on('send_message', (data) => {
// النشر إلى Redis بدلاً من الإصدار مباشرة
publisher.publish('broadcast', JSON.stringify({
userId: socket.userId,
message: data.message,
timestamp: new Date()
}));
});
socket.on('join_room', (roomId) => {
socket.join(`room-${roomId}`);
// الاشتراك في قناة Redis خاصة بالغرفة
subscriber.subscribe(`room:${roomId}`, (message) => {
io.to(`room-${roomId}`).emit('room_message', JSON.parse(message));
});
});
});
الاتصال عبر الخدمات
استخدم Redis Pub/Sub للاتصال بين خدمات مختلفة:
// الخدمة 1: خدمة المستخدم (users.js)
const redis = require('redis');
const publisher = redis.createClient({ url: 'redis://localhost:6379' });
await publisher.connect();
// عند تسجيل المستخدم
async function registerUser(userData) {
// الحفظ في قاعدة البيانات
const user = await db.users.create(userData);
// نشر الحدث
await publisher.publish('user:registered', JSON.stringify({
userId: user.id,
email: user.email,
name: user.name,
timestamp: new Date()
}));
return user;
}
// الخدمة 2: خدمة البريد الإلكتروني (email.js)
const subscriber = redis.createClient({ url: 'redis://localhost:6379' });
await subscriber.connect();
// الاستماع لأحداث تسجيل المستخدم
await subscriber.subscribe('user:registered', async (message) => {
const user = JSON.parse(message);
// إرسال بريد إلكتروني ترحيبي
await sendWelcomeEmail(user.email, user.name);
console.log(`تم إرسال بريد إلكتروني ترحيبي إلى ${user.email}`);
});
// الخدمة 3: خدمة التحليلات (analytics.js)
const analyticsSubscriber = redis.createClient({ url: 'redis://localhost:6379' });
await analyticsSubscriber.connect();
// الاستماع لجميع أحداث المستخدم
await analyticsSubscriber.pSubscribe('user:*', async (message, channel) => {
const event = channel.split(':')[1]; // registered, login, logout, إلخ.
const data = JSON.parse(message);
// تتبع الحدث في التحليلات
await trackEvent(event, data);
});
البنية الموجهة للأحداث مع Pub/Sub
// أداة ناشر الأحداث
class EventBus {
constructor(redisUrl) {
this.publisher = redis.createClient({ url: redisUrl });
}
async connect() {
await this.publisher.connect();
}
async emit(event, data) {
const message = JSON.stringify({
event,
data,
timestamp: new Date(),
id: generateUniqueId()
});
await this.publisher.publish(event, message);
}
}
// أداة مستمع الأحداث
class EventListener {
constructor(redisUrl) {
this.subscriber = redis.createClient({ url: redisUrl });
this.handlers = new Map();
}
async connect() {
await this.subscriber.connect();
}
async on(event, handler) {
this.handlers.set(event, handler);
await this.subscriber.subscribe(event, async (message) => {
const { data } = JSON.parse(message);
await handler(data);
});
}
async onPattern(pattern, handler) {
await this.subscriber.pSubscribe(pattern, async (message, channel) => {
const { data } = JSON.parse(message);
await handler(data, channel);
});
}
}
// الاستخدام
const eventBus = new EventBus('redis://localhost:6379');
await eventBus.connect();
const listener = new EventListener('redis://localhost:6379');
await listener.connect();
// إصدار الأحداث
await eventBus.emit('order:created', { orderId: 123, userId: 456 });
await eventBus.emit('order:shipped', { orderId: 123, trackingNumber: 'ABC123' });
// الاستماع للأحداث
await listener.on('order:created', async (data) => {
console.log('طلب جديد:', data);
await processOrder(data.orderId);
});
await listener.onPattern('order:*', async (data, channel) => {
const action = channel.split(':')[1];
console.log(`الطلب ${action}:`, data);
});
قابلية التوسع: يتيح لك هذا النمط إضافة خدمات جديدة تتفاعل مع الأحداث دون تعديل الخدمات الحالية. فقط اشترك في الأحداث التي تحتاجها.
معالجة تسليم الرسائل
Redis Pub/Sub هو "أطلق وانس" - لا يتم الاحتفاظ بالرسائل:
// تُفقد الرسائل إذا:
// 1. لا يوجد مشتركون يستمعون عند النشر
// 2. ينفصل مشترك مؤقتاً
// 3. يعيد خادم Redis التشغيل
// التحقق مما إذا تم تسليم الرسالة
const count = await publisher.publish('channel', 'message');
if (count === 0) {
console.log('لا يوجد مشتركون - لم يتم تسليم الرسالة');
// فكر في استخدام Redis Streams أو قائمة انتظار الرسائل بدلاً من ذلك
}
قيد: لا يضمن Redis Pub/Sub تسليم الرسائل. للرسائل الحرجة التي تتطلب تسليماً مضموناً، استخدم Redis Streams أو قوائم انتظار الرسائل مثل RabbitMQ.
مراقبة Pub/Sub
// الحصول على القنوات النشطة
const channels = await publisher.pubSubChannels();
console.log('القنوات النشطة:', channels);
// الحصول على عدد المشتركين لقناة
const numSubs = await publisher.pubSubNumSub('chat');
console.log('المشتركون في الدردشة:', numSubs);
// الحصول على عدد اشتراكات الأنماط
const numPat = await publisher.pubSubNumPat();
console.log('اشتراكات الأنماط النشطة:', numPat);
تمرين:
- أنشئ تطبيق متعدد الخدمات بثلاث خدمات:
- خدمة API: تتعامل مع طلبات HTTP وتنشر الأحداث
- خدمة الإشعارات: تشترك في الأحداث وترسل الإشعارات
- خدمة السجل: تشترك في جميع الأحداث وتسجلها
- نفذ ما لا يقل عن 3 أنواع مختلفة من الأحداث (user.created، order.placed، payment.completed)
- استخدم اشتراكات الأنماط في خدمة السجل لالتقاط جميع الأحداث
- اختبر أن الأحداث يتم استلامها من قبل عدة مشتركين في وقت واحد
- راقب القنوات النشطة وعدد المشتركين
ملخص
Redis Pub/Sub هو نمط قوي للاتصال في الوقت الفعلي:
- يفصل الناشرين والمشتركين
- يدعم الاشتراكات القائمة على الأنماط
- مثالي للاتصال عبر الخدمات
- يتكامل بسلاسة مع Socket.io
- يمكّن البنية الموجهة للأحداث
- سريع وبسيط، لكن لا يضمن التسليم