Redis والتخزين المؤقت المتقدم

بناء مشروع طبقة تخزين مؤقت (الجزء الثاني)

20 دقيقة الدرس 28 من 30

بناء مشروع طبقة تخزين مؤقت (الجزء الثاني)

في الجزء الثاني، سنوسع طبقة التخزين المؤقت لدينا بإدارة الجلسات وتحديد المعدل والميزات في الوقت الفعلي باستخدام Pub/Sub وتكامل قائمة الانتظار وأحداث إبطال الذاكرة المؤقتة.

إدارة الجلسات باستخدام Redis

تنفيذ تخزين جلسات موزع باستخدام Redis:

// تثبيت حزم الجلسة
// npm install express-session connect-redis

// src/config/session.js
const session = require('express-session');
const RedisStore = require('connect-redis').default;
const redisClient = require('./redis');

const sessionConfig = {
store: new RedisStore({
client: redisClient,
prefix: 'sess:' // بادئة لمفاتيح الجلسة
}),
secret: process.env.SESSION_SECRET || 'your-secret-key',
resave: false,
saveUninitialized: false,
cookie: {
secure: process.env.NODE_ENV === 'production', // HTTPS فقط في الإنتاج
httpOnly: true, // منع هجمات XSS
maxAge: 1000 * 60 * 60 * 24 // 24 ساعة
},
rolling: true // إعادة تعيين maxAge للكوكي عند كل استجابة
};

module.exports = session(sessionConfig);
// الإضافة إلى src/app.js
const sessionMiddleware = require('./config/session');

// استخدام وسيط الجلسة
app.use(sessionMiddleware);

// مثال سلة تسوق قائمة على الجلسة
app.post('/api/cart/add', (req, res) => {
const { productId, quantity } = req.body;

// تهيئة السلة إذا لم تكن موجودة
if (!req.session.cart) {
req.session.cart = [];
}

// إضافة عنصر إلى السلة
const existingItem = req.session.cart.find(item => item.productId === productId);

if (existingItem) {
existingItem.quantity += quantity;
} else {
req.session.cart.push({ productId, quantity });
}

res.json({ cart: req.session.cart });
});

app.get('/api/cart', (req, res) => {
res.json({ cart: req.session.cart || [] });
});
فوائد الجلسة: استخدام Redis للجلسات يوفر تخزين جلسات موزع، انتهاء تلقائي للصلاحية، استمرارية عبر إعادة تشغيل الخادم، وقابلية التوسع لنسخ تطبيق متعددة خلف موازن الحمل.

تحديد المعدل باستخدام Redis

تنفيذ تحديد معدل النافذة المنزلقة لحماية واجهة برمجة التطبيقات الخاصة بك:

// src/middleware/rateLimiter.js
const redisClient = require('../config/redis');

/**
* محدد معدل النافذة المنزلقة
* @param {number} maxRequests - الحد الأقصى للطلبات المسموح بها
* @param {number} windowSeconds - نافذة الوقت بالثواني
*/
const rateLimiter = (maxRequests = 100, windowSeconds = 60) => {
return async (req, res, next) => {
// استخدام عنوان IP أو معرف المستخدم كمعرف
const identifier = req.session?.userId || req.ip;
const key = `rate:${identifier}`;
const now = Date.now();
const windowStart = now - (windowSeconds * 1000);

try {
// استخدام مجموعة مرتبة في Redis للنافذة المنزلقة
const multi = redisClient.multi();

// إزالة الإدخالات القديمة خارج النافذة
multi.zRemRangeByScore(key, 0, windowStart);

// عد الطلبات الحالية في النافذة
multi.zCard(key);

// إضافة الطلب الحالي
multi.zAdd(key, { score: now, value: `${now}` });

// تعيين انتهاء الصلاحية
multi.expire(key, windowSeconds);

const results = await multi.exec();
const currentRequests = results[1] as number;

// تعيين رؤوس تحديد المعدل
res.set({
'X-RateLimit-Limit': maxRequests,
'X-RateLimit-Remaining': Math.max(0, maxRequests - currentRequests - 1),
'X-RateLimit-Reset': new Date(now + windowSeconds * 1000).toISOString()
});

if (currentRequests >= maxRequests) {
return res.status(429).json({
error: 'طلبات كثيرة جداً',
retryAfter: windowSeconds
});
}

next();

} catch (err) {
console.error('خطأ في محدد المعدل:', err);
// الفشل المفتوح - السماح بالطلب إذا كان Redis معطلاً
next();
}
};
};

module.exports = rateLimiter;
// تطبيق تحديد المعدل على المسارات
const rateLimiter = require('./middleware/rateLimiter');

// حد معدل عام: 1000 طلب لكل 15 دقيقة
app.use('/api/', rateLimiter(1000, 900));

// حد أكثر صرامة للعمليات المكلفة
app.post('/api/products', rateLimiter(10, 60), productController.create);

// حدود مختلفة للمستخدمين المصادق عليهم
app.get('/api/products', (req, res, next) => {
const limit = req.session.userId ? 1000 : 100;
return rateLimiter(limit, 3600)(req, res, next);
}, productController.list);

ميزات الوقت الفعلي مع Pub/Sub

استخدام Redis Pub/Sub للإشعارات في الوقت الفعلي ومزامنة الذاكرة المؤقتة:

// src/services/notificationService.js
const redisClient = require('../config/redis');

class NotificationService {
constructor() {
// إنشاء عملاء منفصلين للنشر والاشتراك
this.publisher = redisClient.duplicate();
this.subscriber = redisClient.duplicate();
this.listeners = new Map();

this.setupSubscriber();
}

async setupSubscriber() {
await this.subscriber.connect();

this.subscriber.on('message', (channel, message) => {
const listeners = this.listeners.get(channel) || [];
const data = JSON.parse(message);

listeners.forEach(listener => {
try {
listener(data);
} catch (err) {
console.error('خطأ في المستمع:', err);
}
});
});
}

/**
* الاشتراك في قناة
*/
async subscribe(channel, listener) {
if (!this.listeners.has(channel)) {
this.listeners.set(channel, []);
await this.subscriber.subscribe(channel);
}

this.listeners.get(channel).push(listener);
}

/**
* نشر رسالة إلى قناة
*/
async publish(channel, data) {
if (!this.publisher.isReady) {
await this.publisher.connect();
}

await this.publisher.publish(channel, JSON.stringify(data));
}

/**
* إلغاء الاشتراك من قناة
*/
async unsubscribe(channel) {
await this.subscriber.unsubscribe(channel);
this.listeners.delete(channel);
}
}

module.exports = new NotificationService();
// مثال تحديثات المنتج في الوقت الفعلي
const notificationService = require('./services/notificationService');

// الاشتراك في تحديثات المنتج
notificationService.subscribe('product:updates', (data) => {
console.log('تم تحديث المنتج:', data);
// إرسال إشعار WebSocket للعملاء المتصلين
// إبطال الذاكرة المؤقتة المحلية
// تحديث فهرس البحث
});

// النشر عند تحديث المنتج
class ProductService {
async update(id, updates) {
const product = await Product.findByIdAndUpdate(id, updates, { new: true });

// إبطال الذاكرة المؤقتة
await cacheService.del(`products:${id}`);
await cacheService.delPattern('products:list:*');

// نشر حدث التحديث
await notificationService.publish('product:updates', {
type: 'update',
productId: id,
product,
timestamp: Date.now()
});

return product;
}
}

تكامل قائمة انتظار الوظائف

استخدام Redis كقائمة انتظار رسائل للوظائف في الخلفية:

// تثبيت قائمة انتظار Bull
// npm install bull

// src/queues/cacheWarmingQueue.js
const Queue = require('bull');
const redisConfig = {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
password: process.env.REDIS_PASSWORD
};

const cacheWarmingQueue = new Queue('cache-warming', {
redis: redisConfig
});

// معالجة وظائف تسخين الذاكرة المؤقتة
cacheWarmingQueue.process(async (job) => {
const { type, data } = job.data;

console.log(`معالجة وظيفة تسخين الذاكرة المؤقتة: ${type}`);

switch (type) {
case 'products':
await warmProductCache();
break;
case 'categories':
await warmCategoryCache();
break;
default:
console.warn(`نوع وظيفة غير معروف: ${type}`);
}

return { success: true };
});

// تسخين ذاكرة المنتج المؤقتة
async function warmProductCache() {
const products = await Product.find({ isActive: true }).limit(100).lean();

for (const product of products) {
await cacheService.set(
`products:${product._id}`,
product,
3600
);
}

console.log(`تم تسخين الذاكرة المؤقتة لـ ${products.length} منتج`);
}

// جدولة تسخين الذاكرة المؤقتة المتكرر
cacheWarmingQueue.add(
{ type: 'products' },
{
repeat: {
cron: '0 */6 * * *' // كل 6 ساعات
}
}
);

module.exports = cacheWarmingQueue;
// src/queues/emailQueue.js
const Queue = require('bull');

const emailQueue = new Queue('email', {
redis: redisConfig,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: true
}
});

emailQueue.process(async (job) => {
const { to, subject, body } = job.data;

// إرسال البريد الإلكتروني باستخدام خدمة البريد الإلكتروني
console.log(`إرسال بريد إلكتروني إلى ${to}`);
// await emailService.send({ to, subject, body });

return { sent: true };
});

// مراقبة أحداث قائمة الانتظار
emailQueue.on('completed', (job) => {
console.log(`تمت وظيفة البريد الإلكتروني ${job.id}`);
});

emailQueue.on('failed', (job, err) => {
console.error(`فشلت وظيفة البريد الإلكتروني ${job.id}:`, err);
});

module.exports = emailQueue;
أفضل ممارسات قائمة الانتظار: استخدم نسخ Redis منفصلة للذاكرة المؤقتة وقوائم الانتظار في الإنتاج، نفذ منطق إعادة محاولة الوظيفة مع التراجع الأسي، راقب صحة قائمة الانتظار وفشل الوظائف، ونظف الوظائف المكتملة بانتظام.

أحداث إبطال الذاكرة المؤقتة

تنفيذ نظام إبطال ذاكرة مؤقتة مدفوع بالأحداث:

// src/services/cacheInvalidationService.js
const notificationService = require('./notificationService');
const cacheService = require('./cacheService');

class CacheInvalidationService {
constructor() {
this.setupListeners();
}

setupListeners() {
// الاستماع لأحداث إبطال الذاكرة المؤقتة
notificationService.subscribe('cache:invalidate', async (event) => {
console.log('حدث إبطال الذاكرة المؤقتة:', event);

switch (event.type) {
case 'key':
await cacheService.del(event.key);
break;
case 'pattern':
await cacheService.delPattern(event.pattern);
break;
case 'all':
await this.invalidateAll();
break;
}
});
}

/**
* إبطال مفتاح ذاكرة مؤقتة محدد عبر جميع النسخ
*/
async invalidateKey(key) {
await cacheService.del(key);
await notificationService.publish('cache:invalidate', {
type: 'key',
key,
timestamp: Date.now()
});
}

/**
* إبطال مفاتيح الذاكرة المؤقتة المطابقة للنمط
*/
async invalidatePattern(pattern) {
await cacheService.delPattern(pattern);
await notificationService.publish('cache:invalidate', {
type: 'pattern',
pattern,
timestamp: Date.now()
});
}

/**
* إبطال جميع الذاكرة المؤقتة
*/
async invalidateAll() {
const keys = await redisClient.keys('cache:*');
if (keys.length > 0) {
await redisClient.del(keys);
}
await notificationService.publish('cache:invalidate', {
type: 'all',
timestamp: Date.now()
});
}
}

module.exports = new CacheInvalidationService();

متحكم المنتج الكامل

دمج كل شيء معاً بمتحكم كامل:

// src/controllers/productController.js
const productService = require('../services/productService');
const cacheInvalidationService = require('../services/cacheInvalidationService');
const emailQueue = require('../queues/emailQueue');

class ProductController {
/**
* قائمة جميع المنتجات
*/
async list(req, res, next) {
try {
const filters = {
category: req.query.category,
minPrice: parseFloat(req.query.minPrice),
maxPrice: parseFloat(req.query.maxPrice)
};

const products = await productService.getAll(filters);

res.json({
success: true,
count: products.length,
data: products
});
} catch (err) {
next(err);
}
}

/**
* الحصول على منتج واحد
*/
async get(req, res, next) {
try {
const product = await productService.getById(req.params.id);

res.json({
success: true,
data: product
});
} catch (err) {
if (err.message === 'Product not found') {
return res.status(404).json({ error: err.message });
}
next(err);
}
}

/**
* إنشاء منتج جديد
*/
async create(req, res, next) {
try {
const product = await productService.create(req.body);

// إرسال بريد إلكتروني إشعاري (غير متزامن)
await emailQueue.add({
to: 'admin@example.com',
subject: 'تم إنشاء منتج جديد',
body: `تم إنشاء المنتج "${product.name}".`
});

res.status(201).json({
success: true,
data: product
});
} catch (err) {
next(err);
}
}

/**
* تحديث المنتج
*/
async update(req, res, next) {
try {
const product = await productService.update(req.params.id, req.body);

res.json({
success: true,
data: product
});
} catch (err) {
if (err.message === 'Product not found') {
return res.status(404).json({ error: err.message });
}
next(err);
}
}

/**
* حذف المنتج
*/
async delete(req, res, next) {
try {
await productService.delete(req.params.id);

res.json({
success: true,
message: 'تم حذف المنتج بنجاح'
});
} catch (err) {
if (err.message === 'Product not found') {
return res.status(404).json({ error: err.message });
}
next(err);
}
}
}

module.exports = new ProductController();
// src/routes/products.js
const express = require('express');
const router = express.Router();
const productController = require('../controllers/productController');
const { cacheMiddleware } = require('../middleware/cache');
const rateLimiter = require('../middleware/rateLimiter');

// قائمة المنتجات - مخزنة مؤقتاً لمدة 5 دقائق
router.get('/', cacheMiddleware(300), productController.list);

// الحصول على منتج واحد - مخزن مؤقتاً لمدة ساعة واحدة
router.get('/:id', cacheMiddleware(3600), productController.get);

// إنشاء منتج - محدد المعدل إلى 10 لكل ساعة
router.post('/', rateLimiter(10, 3600), productController.create);

// تحديث المنتج - محدد المعدل إلى 20 لكل ساعة
router.put('/:id', rateLimiter(20, 3600), productController.update);

// حذف المنتج - محدد المعدل إلى 5 لكل ساعة
router.delete('/:id', rateLimiter(5, 3600), productController.delete);

module.exports = router;
اعتبارات الإنتاج: استخدم متغيرات البيئة لجميع الإعدادات، نفذ تسجيل أخطاء مناسب، أضف وسيط مصادقة/تفويض، استخدم HTTPS في الإنتاج، راقب استخدام ذاكرة Redis، ونفذ قواطع الدوائر لفشل Redis.

نقطة نهاية المراقبة

إضافة نقطة نهاية إدارية لمراقبة أداء الذاكرة المؤقتة:

// src/routes/admin.js
const express = require('express');
const router = express.Router();
const cacheService = require('../services/cacheService');
const redisClient = require('../config/redis');

// مقاييس الذاكرة المؤقتة
router.get('/metrics/cache', async (req, res) => {
const metrics = cacheService.getMetrics();
const info = await redisClient.info('memory');

res.json({
cache: metrics,
redis: {
connected: redisClient.isReady,
memory: info
}
});
});

// إبطال الذاكرة المؤقتة (للمسؤول فقط)
router.post('/cache/invalidate', async (req, res) => {
const { pattern } = req.body;

if (!pattern) {
return res.status(400).json({ error: 'النمط مطلوب' });
}

const count = await cacheService.delPattern(pattern);

res.json({
success: true,
message: `تم إبطال ${count} مفاتيح`
});
});

module.exports = router;
تمرين: وسع المشروع بهذه الميزات:
  • أضف دعم WebSocket لدفع تحديثات المنتج في الوقت الفعلي للعملاء
  • نفذ استراتيجية تسخين ذاكرة مؤقتة تحمل مسبقاً المنتجات الشائعة عند بدء التشغيل
  • أنشئ لوحة تحكم تعرض معدلات نجاح الذاكرة المؤقتة، إحصائيات قائمة الانتظار، واستخدام ذاكرة Redis
  • أضف دعماً لعلامات الذاكرة المؤقتة لإبطال إدخالات الذاكرة المؤقتة ذات الصلة معاً
  • نفذ قاطع دائرة يتراجع إلى قاعدة البيانات عندما يكون Redis غير متاح
المشروع مكتمل: لقد بنيت طبقة تخزين مؤقت جاهزة للإنتاج مع إدارة جلسات Redis، تحديد معدل النافذة المنزلقة، إشعارات Pub/Sub في الوقت الفعلي، معالجة وظائف الخلفية مع قوائم انتظار Bull، إبطال ذاكرة مؤقتة موزع، ومراقبة شاملة. يمكن لهذه البنية التعامل مع آلاف الطلبات في الثانية بأوقات استجابة دون ميلي ثانية.