WebSockets والتطبيقات الفورية
تكامل قاعدة البيانات في الوقت الفعلي
مقدمة في تكامل قاعدة البيانات في الوقت الفعلي
توفر قواعد البيانات الحديثة قدرات وقت فعلي مدمجة تمكن التطبيقات من الاستجابة فوراً لتغييرات البيانات. يستكشف هذا الدرس ميزات قاعدة البيانات المختلفة لبناء أنظمة الوقت الفعلي بدون بنية تحتية مخصصة معقدة.
نظرة عامة على ميزات قاعدة البيانات في الوقت الفعلي
توفر قواعد البيانات المختلفة قدرات وقت فعلي مختلفة:
- MongoDB Change Streams: مراقبة المجموعات للتغييرات
- PostgreSQL LISTEN/NOTIFY: Pub/Sub مدمج في قاعدة البيانات
- Firebase Realtime Database: المزامنة في الوقت الفعلي كميزة أساسية
- محفزات قاعدة البيانات: تنفيذ التعليمات البرمجية عند تغيير البيانات
- Event Sourcing: تخزين جميع التغييرات كأحداث
الفائدة الرئيسية: تقلل ميزات الوقت الفعلي الأصلية لقاعدة البيانات من التعقيد عن طريق إزالة الحاجة إلى وسطاء رسائل منفصلة أو أنظمة أحداث.
MongoDB Change Streams
تسمح تدفقات التغيير للتطبيقات بمراقبة التغييرات في مجموعات MongoDB:
// تثبيت برنامج تشغيل MongoDB
// npm install mongodb
const { MongoClient } = require('mongodb');
// الاتصال بـ MongoDB
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');
const collection = db.collection('messages');
// إنشاء تدفق تغيير
const changeStream = collection.watch();
// الاستماع للتغييرات
changeStream.on('change', (change) => {
console.log('تم الكشف عن تغيير:', change);
// أنواع التغيير: insert، update، replace، delete
switch (change.operationType) {
case 'insert':
console.log('وثيقة جديدة:', change.fullDocument);
// البث إلى عملاء WebSocket
io.emit('message_added', change.fullDocument);
break;
case 'update':
console.log('وثيقة محدثة:', change.documentKey);
io.emit('message_updated', {
id: change.documentKey._id,
changes: change.updateDescription.updatedFields
});
break;
case 'delete':
console.log('وثيقة محذوفة:', change.documentKey);
io.emit('message_deleted', { id: change.documentKey._id });
break;
}
});
// إدراج وثيقة (يشغل تدفق التغيير)
await collection.insertOne({
text: 'مرحبا بالعالم',
author: 'جون',
timestamp: new Date()
});
تصفية تدفقات التغيير
تصفية تدفقات التغيير لمراقبة تغييرات محددة:
// مراقبة عمليات الإدراج فقط
const insertStream = collection.watch([
{ $match: { operationType: 'insert' } }
]);
// مراقبة حقول محددة
const fieldStream = collection.watch([
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
]);
// مراقبة الوثائق المطابقة للمعايير
const userStream = collection.watch([
{
$match: {
$or: [
{ 'fullDocument.userId': 'user123' },
{ 'fullDocument.recipientId': 'user123' }
]
}
}
]);
userStream.on('change', (change) => {
// فقط التغييرات لـ user123
console.log('تغيير خاص بالمستخدم:', change);
});
أفضل ممارسة: استخدم مرشحات خط أنابيب التجميع في تدفقات التغيير لتقليل حركة مرور الشبكة وحمل المعالجة.
دردشة في الوقت الفعلي مع MongoDB Change Streams
const express = require('express');
const { Server } = require('socket.io');
const { MongoClient } = require('mongodb');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// اتصال MongoDB
const mongoClient = new MongoClient('mongodb://localhost:27017');
await mongoClient.connect();
const db = mongoClient.db('chat');
const messages = db.collection('messages');
// مراقبة الرسائل الجديدة
const changeStream = messages.watch([
{ $match: { operationType: 'insert' } }
]);
changeStream.on('change', (change) => {
const message = change.fullDocument;
// البث إلى جميع العملاء المتصلين
io.to(`room-${message.roomId}`).emit('new_message', {
id: message._id,
text: message.text,
author: message.author,
timestamp: message.timestamp
});
});
// اتصال Socket.io
io.on('connection', (socket) => {
socket.on('join_room', (roomId) => {
socket.join(`room-${roomId}`);
});
socket.on('send_message', async (data) => {
// الإدراج في MongoDB (سيشغل تدفق التغيير)
await messages.insertOne({
roomId: data.roomId,
text: data.text,
author: socket.userId,
timestamp: new Date()
});
});
});
PostgreSQL LISTEN/NOTIFY
يحتوي PostgreSQL على pub/sub مدمج مع LISTEN و NOTIFY:
// تثبيت برنامج تشغيل pg
// npm install pg
const { Client } = require('pg');
// اتصال الناشر
const publisher = new Client({
host: 'localhost',
port: 5432,
database: 'myapp',
user: 'postgres',
password: 'password'
});
await publisher.connect();
// اتصال المشترك
const subscriber = new Client({
host: 'localhost',
port: 5432,
database: 'myapp',
user: 'postgres',
password: 'password'
});
await subscriber.connect();
// الاستماع إلى قناة
await subscriber.query('LISTEN new_orders');
// معالجة الإشعارات
subscriber.on('notification', (msg) => {
console.log('إشعار على القناة:', msg.channel);
console.log('الحمولة:', msg.payload);
const data = JSON.parse(msg.payload);
io.emit('order_created', data);
});
// نشر إشعار
await publisher.query(
"NOTIFY new_orders, '{\"orderId\": 123, \"total\": 99.99}'"
);
محفزات قاعدة البيانات مع NOTIFY
إشعار تلقائي عند تغيير البيانات باستخدام المحفزات:
-- إنشاء دالة ترسل الإشعارات
CREATE OR REPLACE FUNCTION notify_order_change()
RETURNS trigger AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify(
'order_changes',
json_build_object(
'operation', 'insert',
'order_id', NEW.id,
'user_id', NEW.user_id,
'total', NEW.total
)::text
);
ELSIF TG_OP = 'UPDATE' THEN
PERFORM pg_notify(
'order_changes',
json_build_object(
'operation', 'update',
'order_id', NEW.id,
'status', NEW.status
)::text
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- إنشاء محفز على جدول الطلبات
CREATE TRIGGER order_change_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION notify_order_change();
Node.js مع محفزات PostgreSQL
const { Client } = require('pg');
const express = require('express');
const { Server } = require('socket.io');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// مستمع PostgreSQL
const listener = new Client({
host: 'localhost',
database: 'myapp',
user: 'postgres',
password: 'password'
});
await listener.connect();
await listener.query('LISTEN order_changes');
listener.on('notification', (msg) => {
const data = JSON.parse(msg.payload);
// الإصدار إلى مستخدم محدد
if (data.user_id) {
io.to(`user-${data.user_id}`).emit('order_update', data);
}
// الإصدار إلى لوحة المسؤول
io.to('admin').emit('order_change', data);
});
// نقطة نهاية API لإنشاء طلب
app.post('/orders', async (req, res) => {
const client = new Client({ /* config */ });
await client.connect();
// إدراج طلب (سيتم تشغيل المحفز تلقائياً)
const result = await client.query(
'INSERT INTO orders (user_id, total, status) VALUES ($1, $2, $3) RETURNING *',
[req.body.userId, req.body.total, 'pending']
);
await client.end();
res.json(result.rows[0]);
});
مهم: PostgreSQL LISTEN/NOTIFY لا يضمن التسليم إذا فقد الاتصال المستمع. للرسائل الحرجة، قم بالدمج مع الاستطلاع أو قوائم انتظار الرسائل.
مفهوم Firebase Realtime Database
يوفر Firebase المزامنة في الوقت الفعلي كميزة أساسية:
// تثبيت Firebase
// npm install firebase
import { initializeApp } from 'firebase/app';
import { getDatabase, ref, onValue, push, set } from 'firebase/database';
// تهيئة Firebase
const firebaseConfig = {
apiKey: 'your-api-key',
databaseURL: 'https://your-project.firebaseio.com'
};
const app = initializeApp(firebaseConfig);
const db = getDatabase(app);
// الاستماع للتحديثات في الوقت الفعلي
const messagesRef = ref(db, 'messages');
onValue(messagesRef, (snapshot) => {
const data = snapshot.val();
console.log('تم تحديث الرسائل:', data);
// تحديث واجهة المستخدم بالبيانات الجديدة
updateMessageList(data);
});
// إضافة رسالة جديدة (تتم المزامنة تلقائياً لجميع المستمعين)
const newMessageRef = push(messagesRef);
await set(newMessageRef, {
text: 'مرحبا بالعالم',
author: 'جون',
timestamp: Date.now()
});
أساسيات Event Sourcing
يخزن Event Sourcing جميع التغييرات كتسلسل من الأحداث:
// مخزن الأحداث (يمكن أن يكون MongoDB أو PostgreSQL، إلخ.)
class EventStore {
constructor(db) {
this.events = db.collection('events');
}
// إلحاق حدث
async append(event) {
const eventData = {
aggregateId: event.aggregateId,
eventType: event.type,
data: event.data,
timestamp: new Date(),
version: await this.getNextVersion(event.aggregateId)
};
const result = await this.events.insertOne(eventData);
// نشر الحدث للمستمعين في الوقت الفعلي
this.emit('event_appended', eventData);
return result;
}
// الحصول على جميع الأحداث لتجميع
async getEvents(aggregateId) {
return await this.events
.find({ aggregateId })
.sort({ version: 1 })
.toArray();
}
// إعادة بناء الحالة من الأحداث
async replayEvents(aggregateId) {
const events = await this.getEvents(aggregateId);
let state = {};
for (const event of events) {
state = this.applyEvent(state, event);
}
return state;
}
applyEvent(state, event) {
switch (event.eventType) {
case 'ORDER_CREATED':
return { ...state, ...event.data, status: 'pending' };
case 'ORDER_PAID':
return { ...state, status: 'paid', paidAt: event.timestamp };
case 'ORDER_SHIPPED':
return { ...state, status: 'shipped', shippedAt: event.timestamp };
default:
return state;
}
}
async getNextVersion(aggregateId) {
const result = await this.events
.find({ aggregateId })
.sort({ version: -1 })
.limit(1)
.toArray();
return result.length > 0 ? result[0].version + 1 : 1;
}
}
// الاستخدام
const eventStore = new EventStore(db);
// إنشاء طلب
await eventStore.append({
aggregateId: 'order-123',
type: 'ORDER_CREATED',
data: { userId: 'user-456', total: 99.99, items: [...] }
});
// تم استلام الدفع
await eventStore.append({
aggregateId: 'order-123',
type: 'ORDER_PAID',
data: { transactionId: 'txn-789' }
});
// الحصول على حالة الطلب الحالية
const orderState = await eventStore.replayEvents('order-123');
console.log('الطلب الحالي:', orderState);
فوائد Event Sourcing:
- مسار تدقيق كامل لجميع التغييرات
- القدرة على إعادة تشغيل الأحداث وإعادة بناء الحالة
- ملاءمة طبيعية للتحديثات في الوقت الفعلي (أحداث جديدة = إشعارات جديدة)
- استعلامات زمنية (ما كانت الحالة في الوقت X؟)
إسقاط الأحداث في الوقت الفعلي
// الاستماع إلى مخزن الأحداث والإسقاط إلى نماذج القراءة
eventStore.on('event_appended', async (event) => {
// تحديث نموذج القراءة
switch (event.eventType) {
case 'ORDER_CREATED':
await db.orders.insertOne({
_id: event.aggregateId,
...event.data,
status: 'pending',
createdAt: event.timestamp
});
// إشعار عملاء WebSocket
io.emit('order_created', {
orderId: event.aggregateId,
...event.data
});
break;
case 'ORDER_PAID':
await db.orders.updateOne(
{ _id: event.aggregateId },
{ $set: { status: 'paid', paidAt: event.timestamp } }
);
io.emit('order_paid', {
orderId: event.aggregateId
});
break;
}
});
دمج ميزات قاعدة البيانات مع WebSockets
const express = require('express');
const { Server } = require('socket.io');
const { MongoClient } = require('mongodb');
const { Client: PgClient } = require('pg');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// تدفقات تغيير MongoDB للرسائل
const mongoClient = new MongoClient('mongodb://localhost:27017');
await mongoClient.connect();
const messages = mongoClient.db('app').collection('messages');
messages.watch().on('change', (change) => {
if (change.operationType === 'insert') {
io.emit('new_message', change.fullDocument);
}
});
// PostgreSQL NOTIFY للطلبات
const pgClient = new PgClient({
host: 'localhost',
database: 'app',
user: 'postgres',
password: 'password'
});
await pgClient.connect();
await pgClient.query('LISTEN order_updates');
pgClient.on('notification', (msg) => {
const order = JSON.parse(msg.payload);
io.to(`user-${order.userId}`).emit('order_update', order);
});
console.log('تكامل قاعدة البيانات في الوقت الفعلي قيد التشغيل...');
تمرين:
- قم بإعداد MongoDB وإنشاء مراقب تدفق التغيير
- أنشئ نظام إشعارات في الوقت الفعلي:
- إدراج الإشعارات في MongoDB
- مراقبة التغييرات بتدفقات التغيير
- البث إلى عملاء Socket.io
- أضف تصفية لإرسال الإشعارات فقط للمستخدمين ذوي الصلة
- (اختياري) قم بإعداد PostgreSQL وتنفيذ LISTEN/NOTIFY
- أنشئ نظام Event Sourcing بسيط مع إعادة تشغيل الأحداث
ملخص
تبسط ميزات الوقت الفعلي الأصلية لقاعدة البيانات أنظمة الوقت الفعلي:
- MongoDB Change Streams تراقب المجموعات للتغييرات
- PostgreSQL LISTEN/NOTIFY يوفر pub/sub في قاعدة البيانات
- يمكن لمحفزات قاعدة البيانات إشعار التطبيقات تلقائياً
- يوفر Firebase المزامنة في الوقت الفعلي كميزة أساسية
- ينشئ Event Sourcing تدفقات أحداث طبيعية في الوقت الفعلي
- دمج ميزات قاعدة البيانات مع WebSockets ينشئ تطبيقات وقت فعلي قوية