Redis & Advanced Caching
Building a Caching Layer Project (Part 2)
Building a Caching Layer Project (Part 2)
In Part 2, we'll extend our caching layer with session management, rate limiting, real-time features using Pub/Sub, job queue integration, and cache invalidation events.
Session Management with Redis
Implement distributed session storage using Redis:
// Install session packages
// npm install express-session connect-redis

// src/config/session.js
const session = require('express-session');
const RedisStore = require('connect-redis').default;
const redisClient = require('./redis');

// SECURITY FIX: never fall back to a hard-coded secret in production —
// a publicly known secret lets anyone forge session cookies. Fail fast
// at startup instead of running insecurely.
if (process.env.NODE_ENV === 'production' && !process.env.SESSION_SECRET) {
  throw new Error('SESSION_SECRET environment variable must be set in production');
}

/**
 * Redis-backed express-session configuration: sessions are shared across
 * application instances and survive server restarts.
 */
const sessionConfig = {
  store: new RedisStore({
    client: redisClient,
    prefix: 'sess:' // Prefix for session keys
  }),
  secret: process.env.SESSION_SECRET || 'your-secret-key', // dev-only fallback (guarded above)
  resave: false,            // don't rewrite unchanged sessions on every request
  saveUninitialized: false, // don't persist empty sessions
  cookie: {
    secure: process.env.NODE_ENV === 'production', // HTTPS only in production
    httpOnly: true, // Prevent XSS attacks: cookie not readable from client JS
    maxAge: 1000 * 60 * 60 * 24 // 24 hours
  },
  rolling: true // Reset cookie maxAge on every response
};

module.exports = session(sessionConfig);
// npm install express-session connect-redis

// src/config/session.js
const session = require('express-session');
const RedisStore = require('connect-redis').default;
const redisClient = require('./redis');

// SECURITY FIX: never fall back to a hard-coded secret in production —
// a publicly known secret lets anyone forge session cookies. Fail fast
// at startup instead of running insecurely.
if (process.env.NODE_ENV === 'production' && !process.env.SESSION_SECRET) {
  throw new Error('SESSION_SECRET environment variable must be set in production');
}

/**
 * Redis-backed express-session configuration: sessions are shared across
 * application instances and survive server restarts.
 */
const sessionConfig = {
  store: new RedisStore({
    client: redisClient,
    prefix: 'sess:' // Prefix for session keys
  }),
  secret: process.env.SESSION_SECRET || 'your-secret-key', // dev-only fallback (guarded above)
  resave: false,            // don't rewrite unchanged sessions on every request
  saveUninitialized: false, // don't persist empty sessions
  cookie: {
    secure: process.env.NODE_ENV === 'production', // HTTPS only in production
    httpOnly: true, // Prevent XSS attacks: cookie not readable from client JS
    maxAge: 1000 * 60 * 60 * 24 // 24 hours
  },
  rolling: true // Reset cookie maxAge on every response
};

module.exports = session(sessionConfig);
// Add to src/app.js
const sessionMiddleware = require('./config/session');

// Use session middleware
app.use(sessionMiddleware);

/**
 * Session-based cart example.
 * POST /api/cart/add — body: { productId, quantity }.
 *
 * BUG FIX: req.body values can arrive as strings; the original did
 * `existingItem.quantity += quantity` directly, so "1" + "2" became "12".
 * Coerce and validate quantity before doing arithmetic.
 */
app.post('/api/cart/add', (req, res) => {
  const { productId } = req.body;
  const quantity = Number.parseInt(req.body.quantity, 10);

  if (!productId || !Number.isInteger(quantity) || quantity <= 0) {
    return res.status(400).json({ error: 'productId and a positive integer quantity are required' });
  }

  // Initialize cart if it doesn't exist
  if (!req.session.cart) {
    req.session.cart = [];
  }

  // Merge with an existing line item or add a new one
  const existingItem = req.session.cart.find(item => item.productId === productId);
  if (existingItem) {
    existingItem.quantity += quantity;
  } else {
    req.session.cart.push({ productId, quantity });
  }

  res.json({ cart: req.session.cart });
});

app.get('/api/cart', (req, res) => {
  res.json({ cart: req.session.cart || [] });
});
const sessionMiddleware = require('./config/session');

// Use session middleware
app.use(sessionMiddleware);

/**
 * Session-based cart example.
 * POST /api/cart/add — body: { productId, quantity }.
 *
 * BUG FIX: req.body values can arrive as strings; the original did
 * `existingItem.quantity += quantity` directly, so "1" + "2" became "12".
 * Coerce and validate quantity before doing arithmetic.
 */
app.post('/api/cart/add', (req, res) => {
  const { productId } = req.body;
  const quantity = Number.parseInt(req.body.quantity, 10);

  if (!productId || !Number.isInteger(quantity) || quantity <= 0) {
    return res.status(400).json({ error: 'productId and a positive integer quantity are required' });
  }

  // Initialize cart if it doesn't exist
  if (!req.session.cart) {
    req.session.cart = [];
  }

  // Merge with an existing line item or add a new one
  const existingItem = req.session.cart.find(item => item.productId === productId);
  if (existingItem) {
    existingItem.quantity += quantity;
  } else {
    req.session.cart.push({ productId, quantity });
  }

  res.json({ cart: req.session.cart });
});

app.get('/api/cart', (req, res) => {
  res.json({ cart: req.session.cart || [] });
});
Session Benefits: Using Redis for sessions provides distributed session storage, automatic expiration, persistence across server restarts, and scalability for multiple application instances behind a load balancer.
Rate Limiting with Redis
Implement sliding window rate limiting to protect your API:
// src/middleware/rateLimiter.js
const redisClient = require('../config/redis');

/**
 * Sliding window rate limiter backed by a Redis sorted set.
 *
 * Each request is stored as a sorted-set member scored by its timestamp;
 * entries older than the window are pruned on every call, so the remaining
 * cardinality is the number of requests inside the window.
 *
 * @param {number} maxRequests - Maximum requests allowed
 * @param {number} windowSeconds - Time window in seconds
 * @returns {Function} Express middleware
 */
const rateLimiter = (maxRequests = 100, windowSeconds = 60) => {
  return async (req, res, next) => {
    // Use IP address or user ID as identifier
    const identifier = req.session?.userId || req.ip;
    const key = `rate:${identifier}`;
    const now = Date.now();
    const windowStart = now - (windowSeconds * 1000);

    try {
      // All four commands execute atomically in one MULTI/EXEC round trip.
      const multi = redisClient.multi();
      multi.zRemRangeByScore(key, 0, windowStart);      // drop entries outside the window
      multi.zCard(key);                                 // count requests still in the window
      multi.zAdd(key, { score: now, value: `${now}` }); // record this request
      multi.expire(key, windowSeconds);                 // let idle keys expire on their own
      const results = await multi.exec();

      // BUG FIX: the original read `results[1] as number`, a TypeScript-only
      // cast that is a SyntaxError in plain JavaScript.
      const currentRequests = Number(results[1]);

      // Standard rate-limit headers so clients can self-throttle
      res.set({
        'X-RateLimit-Limit': maxRequests,
        'X-RateLimit-Remaining': Math.max(0, maxRequests - currentRequests - 1),
        'X-RateLimit-Reset': new Date(now + windowSeconds * 1000).toISOString()
      });

      if (currentRequests >= maxRequests) {
        return res.status(429).json({
          error: 'Too many requests',
          retryAfter: windowSeconds
        });
      }

      next();
    } catch (err) {
      console.error('Rate limiter error:', err);
      // Fail open - allow request if Redis is down
      next();
    }
  };
};

module.exports = rateLimiter;
const redisClient = require('../config/redis');

/**
 * Sliding window rate limiter backed by a Redis sorted set.
 *
 * Each request is stored as a sorted-set member scored by its timestamp;
 * entries older than the window are pruned on every call, so the remaining
 * cardinality is the number of requests inside the window.
 *
 * @param {number} maxRequests - Maximum requests allowed
 * @param {number} windowSeconds - Time window in seconds
 * @returns {Function} Express middleware
 */
const rateLimiter = (maxRequests = 100, windowSeconds = 60) => {
  return async (req, res, next) => {
    // Use IP address or user ID as identifier
    const identifier = req.session?.userId || req.ip;
    const key = `rate:${identifier}`;
    const now = Date.now();
    const windowStart = now - (windowSeconds * 1000);

    try {
      // All four commands execute atomically in one MULTI/EXEC round trip.
      const multi = redisClient.multi();
      multi.zRemRangeByScore(key, 0, windowStart);      // drop entries outside the window
      multi.zCard(key);                                 // count requests still in the window
      multi.zAdd(key, { score: now, value: `${now}` }); // record this request
      multi.expire(key, windowSeconds);                 // let idle keys expire on their own
      const results = await multi.exec();

      // BUG FIX: the original read `results[1] as number`, a TypeScript-only
      // cast that is a SyntaxError in plain JavaScript.
      const currentRequests = Number(results[1]);

      // Standard rate-limit headers so clients can self-throttle
      res.set({
        'X-RateLimit-Limit': maxRequests,
        'X-RateLimit-Remaining': Math.max(0, maxRequests - currentRequests - 1),
        'X-RateLimit-Reset': new Date(now + windowSeconds * 1000).toISOString()
      });

      if (currentRequests >= maxRequests) {
        return res.status(429).json({
          error: 'Too many requests',
          retryAfter: windowSeconds
        });
      }

      next();
    } catch (err) {
      console.error('Rate limiter error:', err);
      // Fail open - allow request if Redis is down
      next();
    }
  };
};

module.exports = rateLimiter;
// Apply rate limiting to routes
const rateLimiter = require('./middleware/rateLimiter');

// Global rate limit: 1000 requests per 15 minutes
app.use('/api/', rateLimiter(1000, 900));

// Stricter limit for expensive operations
app.post('/api/products', rateLimiter(10, 60), productController.create);

// Different limits for authenticated users.
// BUG FIX: use optional chaining, matching the limiter itself —
// req.session is undefined when the session middleware is not mounted
// for a route, and the original plain access would throw.
app.get('/api/products', (req, res, next) => {
  const limit = req.session?.userId ? 1000 : 100;
  return rateLimiter(limit, 3600)(req, res, next);
}, productController.list);
const rateLimiter = require('./middleware/rateLimiter');

// Global rate limit: 1000 requests per 15 minutes
app.use('/api/', rateLimiter(1000, 900));

// Stricter limit for expensive operations
app.post('/api/products', rateLimiter(10, 60), productController.create);

// Different limits for authenticated users.
// BUG FIX: use optional chaining, matching the limiter itself —
// req.session is undefined when the session middleware is not mounted
// for a route, and the original plain access would throw.
app.get('/api/products', (req, res, next) => {
  const limit = req.session?.userId ? 1000 : 100;
  return rateLimiter(limit, 3600)(req, res, next);
}, productController.list);
Real-Time Features with Pub/Sub
Use Redis Pub/Sub for real-time notifications and cache synchronization:
// src/services/notificationService.js
const redisClient = require('../config/redis');

/**
 * Thin Pub/Sub wrapper around Redis.
 *
 * Redis requires a dedicated connection while in subscriber mode, so the
 * shared client is duplicated into separate publisher and subscriber
 * connections.
 */
class NotificationService {
  constructor() {
    // Create separate clients for pub and sub
    this.publisher = redisClient.duplicate();
    this.subscriber = redisClient.duplicate();
    this.listeners = new Map();
    // Keep the connect promise so subscribe() can await it — the original
    // fired setupSubscriber() without awaiting, racing early subscriptions
    // against the connection.
    this.ready = this.setupSubscriber();
  }

  // Connect the dedicated subscriber client.
  async setupSubscriber() {
    await this.subscriber.connect();
  }

  /**
   * Subscribe to a channel.
   *
   * BUG FIX: node-redis v4 delivers messages through the callback passed to
   * subscribe(), not through a 'message' event, so the original
   * `subscriber.on('message', ...)` handler never fired.
   */
  async subscribe(channel, listener) {
    if (!this.listeners.has(channel)) {
      this.listeners.set(channel, []);
      await this.ready;
      await this.subscriber.subscribe(channel, (message) => {
        this.dispatch(channel, message);
      });
    }
    this.listeners.get(channel).push(listener);
  }

  // Fan a raw message out to every listener on the channel; one throwing
  // listener must not prevent the others from running.
  dispatch(channel, message) {
    const listeners = this.listeners.get(channel) || [];
    const data = JSON.parse(message);
    listeners.forEach((listener) => {
      try {
        listener(data);
      } catch (err) {
        console.error('Listener error:', err);
      }
    });
  }

  /**
   * Publish message to channel
   */
  async publish(channel, data) {
    if (!this.publisher.isReady) {
      await this.publisher.connect();
    }
    await this.publisher.publish(channel, JSON.stringify(data));
  }

  /**
   * Unsubscribe from channel
   */
  async unsubscribe(channel) {
    await this.subscriber.unsubscribe(channel);
    this.listeners.delete(channel);
  }
}

module.exports = new NotificationService();
const redisClient = require('../config/redis');

/**
 * Thin Pub/Sub wrapper around Redis.
 *
 * Redis requires a dedicated connection while in subscriber mode, so the
 * shared client is duplicated into separate publisher and subscriber
 * connections.
 */
class NotificationService {
  constructor() {
    // Create separate clients for pub and sub
    this.publisher = redisClient.duplicate();
    this.subscriber = redisClient.duplicate();
    this.listeners = new Map();
    // Keep the connect promise so subscribe() can await it — the original
    // fired setupSubscriber() without awaiting, racing early subscriptions
    // against the connection.
    this.ready = this.setupSubscriber();
  }

  // Connect the dedicated subscriber client.
  async setupSubscriber() {
    await this.subscriber.connect();
  }

  /**
   * Subscribe to a channel.
   *
   * BUG FIX: node-redis v4 delivers messages through the callback passed to
   * subscribe(), not through a 'message' event, so the original
   * `subscriber.on('message', ...)` handler never fired.
   */
  async subscribe(channel, listener) {
    if (!this.listeners.has(channel)) {
      this.listeners.set(channel, []);
      await this.ready;
      await this.subscriber.subscribe(channel, (message) => {
        this.dispatch(channel, message);
      });
    }
    this.listeners.get(channel).push(listener);
  }

  // Fan a raw message out to every listener on the channel; one throwing
  // listener must not prevent the others from running.
  dispatch(channel, message) {
    const listeners = this.listeners.get(channel) || [];
    const data = JSON.parse(message);
    listeners.forEach((listener) => {
      try {
        listener(data);
      } catch (err) {
        console.error('Listener error:', err);
      }
    });
  }

  /**
   * Publish message to channel
   */
  async publish(channel, data) {
    if (!this.publisher.isReady) {
      await this.publisher.connect();
    }
    await this.publisher.publish(channel, JSON.stringify(data));
  }

  /**
   * Unsubscribe from channel
   */
  async unsubscribe(channel) {
    await this.subscriber.unsubscribe(channel);
    this.listeners.delete(channel);
  }
}

module.exports = new NotificationService();
// Real-time product updates example
const notificationService = require('./services/notificationService');

// Subscribe to product updates.
// BUG FIX: subscribe() is async — attach a .catch so a failed subscription
// surfaces as a logged error instead of an unhandled promise rejection.
notificationService.subscribe('product:updates', (data) => {
  console.log('Product updated:', data);
  // Send WebSocket notification to connected clients
  // Invalidate local cache
  // Update search index
}).catch((err) => {
  console.error('Failed to subscribe to product:updates:', err);
});

// Publish when product is updated
class ProductService {
  /**
   * Update a product, drop its cache entries, and broadcast the change.
   * @param {string} id - Product id
   * @param {Object} updates - Fields to change
   * @returns {Promise<Object>} the updated product document
   */
  async update(id, updates) {
    const product = await Product.findByIdAndUpdate(id, updates, { new: true });

    // Invalidate cache
    await cacheService.del(`products:${id}`);
    await cacheService.delPattern('products:list:*');

    // Publish update event
    await notificationService.publish('product:updates', {
      type: 'update',
      productId: id,
      product,
      timestamp: Date.now()
    });

    return product;
  }
}
const notificationService = require('./services/notificationService');

// Subscribe to product updates.
// BUG FIX: subscribe() is async — attach a .catch so a failed subscription
// surfaces as a logged error instead of an unhandled promise rejection.
notificationService.subscribe('product:updates', (data) => {
  console.log('Product updated:', data);
  // Send WebSocket notification to connected clients
  // Invalidate local cache
  // Update search index
}).catch((err) => {
  console.error('Failed to subscribe to product:updates:', err);
});

// Publish when product is updated
class ProductService {
  /**
   * Update a product, drop its cache entries, and broadcast the change.
   * @param {string} id - Product id
   * @param {Object} updates - Fields to change
   * @returns {Promise<Object>} the updated product document
   */
  async update(id, updates) {
    const product = await Product.findByIdAndUpdate(id, updates, { new: true });

    // Invalidate cache
    await cacheService.del(`products:${id}`);
    await cacheService.delPattern('products:list:*');

    // Publish update event
    await notificationService.publish('product:updates', {
      type: 'update',
      productId: id,
      product,
      timestamp: Date.now()
    });

    return product;
  }
}
Job Queue Integration
Use Redis as a message queue for background jobs:
// Install Bull queue
// npm install bull

// src/queues/cacheWarmingQueue.js
const Queue = require('bull');

const redisConfig = {
  host: process.env.REDIS_HOST,
  // Environment variables are strings; Bull's Redis client expects a number.
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD
};

const cacheWarmingQueue = new Queue('cache-warming', {
  redis: redisConfig
});

// Process cache warming jobs, dispatched by job type.
cacheWarmingQueue.process(async (job) => {
  const { type } = job.data; // (unused `data` field removed from destructuring)
  console.log(`Processing cache warming job: ${type}`);

  switch (type) {
    case 'products':
      await warmProductCache();
      break;
    case 'categories':
      await warmCategoryCache();
      break;
    default:
      console.warn(`Unknown job type: ${type}`);
  }

  return { success: true };
});

/**
 * Preload up to 100 active products into the cache with a 1-hour TTL.
 */
async function warmProductCache() {
  const products = await Product.find({ isActive: true }).limit(100).lean();

  for (const product of products) {
    await cacheService.set(`products:${product._id}`, product, 3600);
  }

  console.log(`Warmed cache for ${products.length} products`);
}

// Schedule recurring cache warming. Bull keys repeatable jobs by their cron
// spec, so re-registering this on every startup does not duplicate the job.
cacheWarmingQueue.add(
  { type: 'products' },
  {
    repeat: {
      cron: '0 */6 * * *' // Every 6 hours
    }
  }
);

module.exports = cacheWarmingQueue;
// npm install bull

// src/queues/cacheWarmingQueue.js
const Queue = require('bull');

const redisConfig = {
  host: process.env.REDIS_HOST,
  // Environment variables are strings; Bull's Redis client expects a number.
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD
};

const cacheWarmingQueue = new Queue('cache-warming', {
  redis: redisConfig
});

// Process cache warming jobs, dispatched by job type.
cacheWarmingQueue.process(async (job) => {
  const { type } = job.data; // (unused `data` field removed from destructuring)
  console.log(`Processing cache warming job: ${type}`);

  switch (type) {
    case 'products':
      await warmProductCache();
      break;
    case 'categories':
      await warmCategoryCache();
      break;
    default:
      console.warn(`Unknown job type: ${type}`);
  }

  return { success: true };
});

/**
 * Preload up to 100 active products into the cache with a 1-hour TTL.
 */
async function warmProductCache() {
  const products = await Product.find({ isActive: true }).limit(100).lean();

  for (const product of products) {
    await cacheService.set(`products:${product._id}`, product, 3600);
  }

  console.log(`Warmed cache for ${products.length} products`);
}

// Schedule recurring cache warming. Bull keys repeatable jobs by their cron
// spec, so re-registering this on every startup does not duplicate the job.
cacheWarmingQueue.add(
  { type: 'products' },
  {
    repeat: {
      cron: '0 */6 * * *' // Every 6 hours
    }
  }
);

module.exports = cacheWarmingQueue;
// src/queues/emailQueue.js
const Queue = require('bull');

// BUG FIX: this module previously referenced `redisConfig` without defining
// or importing it, which threw a ReferenceError on load. Each queue module
// defines its own connection config.
const redisConfig = {
  host: process.env.REDIS_HOST,
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD
};

const emailQueue = new Queue('email', {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 3,           // retry failed sends up to 3 times
    backoff: {
      type: 'exponential', // 2s, 4s, 8s between attempts
      delay: 2000
    },
    removeOnComplete: true // keep Redis memory bounded
  }
});

emailQueue.process(async (job) => {
  const { to, subject, body } = job.data;
  // Send email using your email service
  console.log(`Sending email to ${to}`);
  // await emailService.send({ to, subject, body });
  return { sent: true };
});

// Monitor queue events
emailQueue.on('completed', (job) => {
  console.log(`Email job ${job.id} completed`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Email job ${job.id} failed:`, err);
});

module.exports = emailQueue;
const Queue = require('bull');

// BUG FIX: this module previously referenced `redisConfig` without defining
// or importing it, which threw a ReferenceError on load. Each queue module
// defines its own connection config.
const redisConfig = {
  host: process.env.REDIS_HOST,
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD
};

const emailQueue = new Queue('email', {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 3,           // retry failed sends up to 3 times
    backoff: {
      type: 'exponential', // 2s, 4s, 8s between attempts
      delay: 2000
    },
    removeOnComplete: true // keep Redis memory bounded
  }
});

emailQueue.process(async (job) => {
  const { to, subject, body } = job.data;
  // Send email using your email service
  console.log(`Sending email to ${to}`);
  // await emailService.send({ to, subject, body });
  return { sent: true };
});

// Monitor queue events
emailQueue.on('completed', (job) => {
  console.log(`Email job ${job.id} completed`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Email job ${job.id} failed:`, err);
});

module.exports = emailQueue;
Queue Best Practices: Use separate Redis instances for cache and queues in production, implement job retry logic with exponential backoff, monitor queue health and job failures, and clean up completed jobs regularly.
Cache Invalidation Events
Implement an event-driven cache invalidation system:
// src/services/cacheInvalidationService.js
const notificationService = require('./notificationService');
const cacheService = require('./cacheService');
// BUG FIX: invalidateAll() used `redisClient` without requiring it.
const redisClient = require('../config/redis');

/**
 * Event-driven, cluster-wide cache invalidation.
 *
 * The calling instance deletes its keys locally, then publishes a
 * 'cache:invalidate' event so every other instance does the same.
 *
 * BUG FIX: event handlers must perform ONLY the local deletion and never
 * re-publish — the original 'all' handler called invalidateAll(), which
 * publishes another 'all' event, so one broadcast looped between instances
 * forever.
 */
class CacheInvalidationService {
  constructor() {
    this.setupListeners();
  }

  setupListeners() {
    // Listen for cache invalidation events from other instances.
    notificationService.subscribe('cache:invalidate', async (event) => {
      console.log('Cache invalidation event:', event);
      switch (event.type) {
        case 'key':
          await cacheService.del(event.key);
          break;
        case 'pattern':
          await cacheService.delPattern(event.pattern);
          break;
        case 'all':
          await this.clearAllLocal(); // local only — do NOT re-publish
          break;
      }
    });
  }

  /**
   * Delete every cache:* key without publishing an event.
   * NOTE(review): KEYS blocks Redis on large keyspaces — consider SCAN.
   */
  async clearAllLocal() {
    const keys = await redisClient.keys('cache:*');
    if (keys.length > 0) {
      await redisClient.del(keys);
    }
  }

  /**
   * Invalidate specific cache key across all instances
   */
  async invalidateKey(key) {
    await cacheService.del(key);
    await notificationService.publish('cache:invalidate', {
      type: 'key',
      key,
      timestamp: Date.now()
    });
  }

  /**
   * Invalidate cache keys matching pattern
   */
  async invalidatePattern(pattern) {
    await cacheService.delPattern(pattern);
    await notificationService.publish('cache:invalidate', {
      type: 'pattern',
      pattern,
      timestamp: Date.now()
    });
  }

  /**
   * Invalidate all cache
   */
  async invalidateAll() {
    await this.clearAllLocal();
    await notificationService.publish('cache:invalidate', {
      type: 'all',
      timestamp: Date.now()
    });
  }
}

module.exports = new CacheInvalidationService();
const notificationService = require('./notificationService');
const cacheService = require('./cacheService');
// BUG FIX: invalidateAll() used `redisClient` without requiring it.
const redisClient = require('../config/redis');

/**
 * Event-driven, cluster-wide cache invalidation.
 *
 * The calling instance deletes its keys locally, then publishes a
 * 'cache:invalidate' event so every other instance does the same.
 *
 * BUG FIX: event handlers must perform ONLY the local deletion and never
 * re-publish — the original 'all' handler called invalidateAll(), which
 * publishes another 'all' event, so one broadcast looped between instances
 * forever.
 */
class CacheInvalidationService {
  constructor() {
    this.setupListeners();
  }

  setupListeners() {
    // Listen for cache invalidation events from other instances.
    notificationService.subscribe('cache:invalidate', async (event) => {
      console.log('Cache invalidation event:', event);
      switch (event.type) {
        case 'key':
          await cacheService.del(event.key);
          break;
        case 'pattern':
          await cacheService.delPattern(event.pattern);
          break;
        case 'all':
          await this.clearAllLocal(); // local only — do NOT re-publish
          break;
      }
    });
  }

  /**
   * Delete every cache:* key without publishing an event.
   * NOTE(review): KEYS blocks Redis on large keyspaces — consider SCAN.
   */
  async clearAllLocal() {
    const keys = await redisClient.keys('cache:*');
    if (keys.length > 0) {
      await redisClient.del(keys);
    }
  }

  /**
   * Invalidate specific cache key across all instances
   */
  async invalidateKey(key) {
    await cacheService.del(key);
    await notificationService.publish('cache:invalidate', {
      type: 'key',
      key,
      timestamp: Date.now()
    });
  }

  /**
   * Invalidate cache keys matching pattern
   */
  async invalidatePattern(pattern) {
    await cacheService.delPattern(pattern);
    await notificationService.publish('cache:invalidate', {
      type: 'pattern',
      pattern,
      timestamp: Date.now()
    });
  }

  /**
   * Invalidate all cache
   */
  async invalidateAll() {
    await this.clearAllLocal();
    await notificationService.publish('cache:invalidate', {
      type: 'all',
      timestamp: Date.now()
    });
  }
}

module.exports = new CacheInvalidationService();
Complete Product Controller
Putting it all together with a complete controller:
// src/controllers/productController.js
const productService = require('../services/productService');
const cacheInvalidationService = require('../services/cacheInvalidationService');
const emailQueue = require('../queues/emailQueue');

/**
 * HTTP controller for product CRUD. Known domain errors ("Product not
 * found") map to 404; everything else is delegated to the Express error
 * middleware via next(err).
 */
class ProductController {
  /**
   * GET /products — list products with optional category/price filters.
   */
  async list(req, res, next) {
    try {
      // BUG FIX: parseFloat(undefined) is NaN, so requests without price
      // params used to send { minPrice: NaN, maxPrice: NaN } filters.
      // Only forward filters the client actually supplied.
      const filters = { category: req.query.category };
      const minPrice = Number.parseFloat(req.query.minPrice);
      if (!Number.isNaN(minPrice)) {
        filters.minPrice = minPrice;
      }
      const maxPrice = Number.parseFloat(req.query.maxPrice);
      if (!Number.isNaN(maxPrice)) {
        filters.maxPrice = maxPrice;
      }

      const products = await productService.getAll(filters);
      res.json({
        success: true,
        count: products.length,
        data: products
      });
    } catch (err) {
      next(err);
    }
  }

  /**
   * GET /products/:id — fetch a single product, 404 if missing.
   */
  async get(req, res, next) {
    try {
      const product = await productService.getById(req.params.id);
      res.json({
        success: true,
        data: product
      });
    } catch (err) {
      if (err.message === 'Product not found') {
        return res.status(404).json({ error: err.message });
      }
      next(err);
    }
  }

  /**
   * POST /products — create a product and queue an admin notification email.
   */
  async create(req, res, next) {
    try {
      const product = await productService.create(req.body);

      // Queue the notification; actual delivery happens in a background worker.
      await emailQueue.add({
        to: 'admin@example.com',
        subject: 'New Product Created',
        body: `Product "${product.name}" has been created.`
      });

      res.status(201).json({
        success: true,
        data: product
      });
    } catch (err) {
      next(err);
    }
  }

  /**
   * PUT /products/:id — update a product, 404 if missing.
   */
  async update(req, res, next) {
    try {
      const product = await productService.update(req.params.id, req.body);
      res.json({
        success: true,
        data: product
      });
    } catch (err) {
      if (err.message === 'Product not found') {
        return res.status(404).json({ error: err.message });
      }
      next(err);
    }
  }

  /**
   * DELETE /products/:id — delete a product, 404 if missing.
   */
  async delete(req, res, next) {
    try {
      await productService.delete(req.params.id);
      res.json({
        success: true,
        message: 'Product deleted successfully'
      });
    } catch (err) {
      if (err.message === 'Product not found') {
        return res.status(404).json({ error: err.message });
      }
      next(err);
    }
  }
}

module.exports = new ProductController();
const productService = require('../services/productService');
const cacheInvalidationService = require('../services/cacheInvalidationService');
const emailQueue = require('../queues/emailQueue');

/**
 * HTTP controller for product CRUD. Known domain errors ("Product not
 * found") map to 404; everything else is delegated to the Express error
 * middleware via next(err).
 */
class ProductController {
  /**
   * GET /products — list products with optional category/price filters.
   */
  async list(req, res, next) {
    try {
      // BUG FIX: parseFloat(undefined) is NaN, so requests without price
      // params used to send { minPrice: NaN, maxPrice: NaN } filters.
      // Only forward filters the client actually supplied.
      const filters = { category: req.query.category };
      const minPrice = Number.parseFloat(req.query.minPrice);
      if (!Number.isNaN(minPrice)) {
        filters.minPrice = minPrice;
      }
      const maxPrice = Number.parseFloat(req.query.maxPrice);
      if (!Number.isNaN(maxPrice)) {
        filters.maxPrice = maxPrice;
      }

      const products = await productService.getAll(filters);
      res.json({
        success: true,
        count: products.length,
        data: products
      });
    } catch (err) {
      next(err);
    }
  }

  /**
   * GET /products/:id — fetch a single product, 404 if missing.
   */
  async get(req, res, next) {
    try {
      const product = await productService.getById(req.params.id);
      res.json({
        success: true,
        data: product
      });
    } catch (err) {
      if (err.message === 'Product not found') {
        return res.status(404).json({ error: err.message });
      }
      next(err);
    }
  }

  /**
   * POST /products — create a product and queue an admin notification email.
   */
  async create(req, res, next) {
    try {
      const product = await productService.create(req.body);

      // Queue the notification; actual delivery happens in a background worker.
      await emailQueue.add({
        to: 'admin@example.com',
        subject: 'New Product Created',
        body: `Product "${product.name}" has been created.`
      });

      res.status(201).json({
        success: true,
        data: product
      });
    } catch (err) {
      next(err);
    }
  }

  /**
   * PUT /products/:id — update a product, 404 if missing.
   */
  async update(req, res, next) {
    try {
      const product = await productService.update(req.params.id, req.body);
      res.json({
        success: true,
        data: product
      });
    } catch (err) {
      if (err.message === 'Product not found') {
        return res.status(404).json({ error: err.message });
      }
      next(err);
    }
  }

  /**
   * DELETE /products/:id — delete a product, 404 if missing.
   */
  async delete(req, res, next) {
    try {
      await productService.delete(req.params.id);
      res.json({
        success: true,
        message: 'Product deleted successfully'
      });
    } catch (err) {
      if (err.message === 'Product not found') {
        return res.status(404).json({ error: err.message });
      }
      next(err);
    }
  }
}

module.exports = new ProductController();
// src/routes/products.js
const express = require('express');
const router = express.Router();
const productController = require('../controllers/productController');
const { cacheMiddleware } = require('../middleware/cache');
const rateLimiter = require('../middleware/rateLimiter');

// Collection: listing is cached for 5 minutes; creation is rate limited
// to 10 requests per hour.
router
  .route('/')
  .get(cacheMiddleware(300), productController.list)
  .post(rateLimiter(10, 3600), productController.create);

// Single item: reads cached for 1 hour; updates limited to 20/hour,
// deletes to 5/hour.
router
  .route('/:id')
  .get(cacheMiddleware(3600), productController.get)
  .put(rateLimiter(20, 3600), productController.update)
  .delete(rateLimiter(5, 3600), productController.delete);

module.exports = router;
const express = require('express');
const router = express.Router();
const productController = require('../controllers/productController');
const { cacheMiddleware } = require('../middleware/cache');
const rateLimiter = require('../middleware/rateLimiter');

// Collection: listing is cached for 5 minutes; creation is rate limited
// to 10 requests per hour.
router
  .route('/')
  .get(cacheMiddleware(300), productController.list)
  .post(rateLimiter(10, 3600), productController.create);

// Single item: reads cached for 1 hour; updates limited to 20/hour,
// deletes to 5/hour.
router
  .route('/:id')
  .get(cacheMiddleware(3600), productController.get)
  .put(rateLimiter(20, 3600), productController.update)
  .delete(rateLimiter(5, 3600), productController.delete);

module.exports = router;
Production Considerations: Use environment variables for all configuration, implement proper error logging, add authentication/authorization middleware, use HTTPS in production, monitor Redis memory usage, and implement circuit breakers for Redis failures.
Monitoring Endpoint
Add an admin endpoint to monitor cache performance:
// src/routes/admin.js
const express = require('express');
const router = express.Router();
const cacheService = require('../services/cacheService');
const redisClient = require('../config/redis');

// NOTE(review): no auth middleware is visible here — confirm this router is
// mounted behind authentication before exposing it.

// Cache metrics.
// BUG FIX: Express 4 does not forward rejected promises from async handlers
// to the error middleware — an error from redisClient.info() would hang the
// request. Wrap in try/catch and call next(err).
router.get('/metrics/cache', async (req, res, next) => {
  try {
    const info = await redisClient.info('memory');
    res.json({
      cache: cacheService.getMetrics(),
      redis: {
        connected: redisClient.isReady,
        memory: info
      }
    });
  } catch (err) {
    next(err);
  }
});

// Invalidate cache (admin only)
router.post('/cache/invalidate', async (req, res, next) => {
  try {
    const { pattern } = req.body;
    if (!pattern) {
      return res.status(400).json({ error: 'Pattern required' });
    }
    const count = await cacheService.delPattern(pattern);
    res.json({
      success: true,
      message: `Invalidated ${count} keys`
    });
  } catch (err) {
    next(err);
  }
});

module.exports = router;
const express = require('express');
const router = express.Router();
const cacheService = require('../services/cacheService');
const redisClient = require('../config/redis');

// NOTE(review): no auth middleware is visible here — confirm this router is
// mounted behind authentication before exposing it.

// Cache metrics.
// BUG FIX: Express 4 does not forward rejected promises from async handlers
// to the error middleware — an error from redisClient.info() would hang the
// request. Wrap in try/catch and call next(err).
router.get('/metrics/cache', async (req, res, next) => {
  try {
    const info = await redisClient.info('memory');
    res.json({
      cache: cacheService.getMetrics(),
      redis: {
        connected: redisClient.isReady,
        memory: info
      }
    });
  } catch (err) {
    next(err);
  }
});

// Invalidate cache (admin only)
router.post('/cache/invalidate', async (req, res, next) => {
  try {
    const { pattern } = req.body;
    if (!pattern) {
      return res.status(400).json({ error: 'Pattern required' });
    }
    const count = await cacheService.delPattern(pattern);
    res.json({
      success: true,
      message: `Invalidated ${count} keys`
    });
  } catch (err) {
    next(err);
  }
});

module.exports = router;
Exercise: Extend the project with these features:
- Add WebSocket support to push real-time product updates to clients
- Implement a cache warming strategy that preloads popular products on startup
- Create a dashboard that displays cache hit rates, queue statistics, and Redis memory usage
- Add support for cache tags to invalidate related cache entries together
- Implement a circuit breaker that falls back to database when Redis is unavailable
Project Complete: You've built a production-ready caching layer with Redis session management, sliding window rate limiting, real-time Pub/Sub notifications, background job processing with Bull queues, distributed cache invalidation, and comprehensive monitoring. This architecture can handle thousands of requests per second with sub-millisecond response times.