فهم الاتصال في الوقت الفعلي
تتبع واجهات برمجة التطبيقات REST التقليدية نمط طلب-استجابة حيث يجب على العملاء استطلاع الخادم للحصول على التحديثات. تتيح واجهات برمجة التطبيقات في الوقت الفعلي الاتصال ثنائي الاتجاه، مما يسمح للخوادم بدفع التحديثات إلى العملاء على الفور. هذا ضروري للتطبيقات مثل لوحات المعلومات المباشرة وتطبيقات الدردشة والأدوات التعاونية والإشعارات والتحليلات في الوقت الفعلي.
تقنيات الاتصال في الوقت الفعلي
ثلاثة نهج رئيسية:
- الاستطلاع: يطلب العميل البيانات بشكل متكرر على فترات (بسيط لكن غير فعال)
- الأحداث المرسلة من الخادم (SSE): يدفع الخادم البيانات إلى العميل عبر HTTP (أحادي الاتجاه)
- WebSockets: اتصال ثنائي الاتجاه كامل الازدواج (الأكثر مرونة لكن معقد)
الأحداث المرسلة من الخادم (SSE)
توفر SSE طريقة بسيطة وموحدة للخوادم لدفع البيانات إلى العملاء عبر HTTP. على عكس WebSockets، SSE أحادية الاتجاه (من الخادم إلى العميل فقط) وتستخدم اتصالات HTTP العادية.
مزايا SSE
- مبنية على HTTP/HTTPS القياسي (تعمل عبر جدران الحماية والوكلاء)
- إعادة الاتصال التلقائية مع فترات إعادة محاولة قابلة للتكوين
- معرفات الأحداث لتتبع واستئناف التدفقات
- تنفيذ بسيط مقارنة بـ WebSockets
- دعم المتصفح الأصلي عبر EventSource API
تنفيذ SSE - جانب الخادم (Node.js)
// نقطة نهاية SSE في Express.js
const express = require('express');
const app = express();
// نقطة نهاية SSE للإشعارات المباشرة
app.get('/api/notifications/stream', (req, res) => {
// تعيين رؤوس SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// إرسال تعليق أولي لإنشاء الاتصال
res.write(': تم إنشاء اتصال SSE\n\n');
// الحصول على معرف المستخدم من معلمات الاستعلام
const userId = req.query.userId;
// إرسال رسالة ترحيب
const welcomeData = {
id: Date.now(),
type: 'connected',
message: 'متصل بتدفق الإشعارات',
timestamp: new Date().toISOString()
};
res.write(`id: ${welcomeData.id}\n`);
res.write(`event: connected\n`);
res.write(`data: ${JSON.stringify(welcomeData)}\n\n`);
// إرسال نبض كل 30 ثانية للحفاظ على الاتصال نشطًا
const heartbeatInterval = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// محاكاة إرسال الإشعارات
const notificationInterval = setInterval(() => {
const notification = {
id: Date.now(),
userId: userId,
type: 'info',
title: 'تحديث جديد',
message: `تم استلام التحديث في ${new Date().toLocaleTimeString()}`,
timestamp: new Date().toISOString()
};
res.write(`id: ${notification.id}\n`);
res.write(`event: notification\n`);
res.write(`data: ${JSON.stringify(notification)}\n\n`);
}, 10000); // إرسال إشعار كل 10 ثوانٍ
// التنظيف عند قطع اتصال العميل
req.on('close', () => {
console.log(`العميل ${userId} قطع الاتصال من SSE`);
clearInterval(heartbeatInterval);
clearInterval(notificationInterval);
res.end();
});
});
تنسيق رسالة SSE: تتكون كل رسالة SSE من حقول مفصولة بأسطر جديدة. تشمل الحقول الرئيسية id: (معرف الرسالة الفريد)، event: (نوع الحدث)، data: (حمولة الرسالة)، و retry: (وقت إعادة الاتصال بالميلي ثانية).
تنفيذ SSE - جانب العميل (JavaScript)
// SSE من جانب العميل مع EventSource API
class NotificationStream {
constructor(userId) {
this.userId = userId;
this.eventSource = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
const url = `/api/notifications/stream?userId=${this.userId}`;
this.eventSource = new EventSource(url);
// الاستماع لفتح الاتصال
this.eventSource.onopen = (event) => {
console.log('تم فتح اتصال SSE');
this.reconnectAttempts = 0; // إعادة تعيين عند نجاح الاتصال
};
// الاستماع لأحداث 'notification' المخصصة
this.eventSource.addEventListener('notification', (event) => {
const notification = JSON.parse(event.data);
console.log('تم استلام الإشعار:', notification);
this.handleNotification(notification);
});
// الاستماع لأحداث 'connected'
this.eventSource.addEventListener('connected', (event) => {
const data = JSON.parse(event.data);
console.log('تم إنشاء الاتصال:', data.message);
});
// معالجة الأخطاء وإعادة الاتصال
this.eventSource.onerror = (error) => {
console.error('خطأ SSE:', error);
if (this.eventSource.readyState === EventSource.CLOSED) {
console.log('اتصال SSE مغلق');
this.handleReconnect();
}
};
// الاستماع للرسائل العامة (الرسائل بدون نوع حدث)
this.eventSource.onmessage = (event) => {
console.log('رسالة عامة:', event.data);
};
}
handleNotification(notification) {
// عرض الإشعار للمستخدم
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.message,
icon: '/icons/notification.png',
timestamp: new Date(notification.timestamp).getTime()
});
}
// تحديث واجهة المستخدم بالإشعار
this.updateNotificationUI(notification);
}
updateNotificationUI(notification) {
const notificationList = document.getElementById('notification-list');
const notificationElement = document.createElement('div');
notificationElement.className = 'notification-item';
notificationElement.innerHTML = `
<div class="notification-header">
<strong>${notification.title}</strong>
<span class="notification-time">${new Date(notification.timestamp).toLocaleTimeString()}</span>
</div>
<div class="notification-body">${notification.message}</div>
`;
notificationList.prepend(notificationElement);
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`إعادة الاتصال في ${delay}ms (المحاولة ${this.reconnectAttempts})`);
setTimeout(() => {
console.log('محاولة إعادة الاتصال...');
this.connect();
}, delay);
} else {
console.error('تم الوصول إلى الحد الأقصى لمحاولات إعادة الاتصال');
this.showReconnectButton();
}
}
showReconnectButton() {
const button = document.getElementById('reconnect-button');
button.style.display = 'block';
button.onclick = () => {
this.reconnectAttempts = 0;
this.connect();
button.style.display = 'none';
};
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
console.log('تم إغلاق اتصال SSE بواسطة العميل');
}
}
}
// الاستخدام
const notificationStream = new NotificationStream(123);
notificationStream.connect();
// قطع الاتصال عندما يغادر المستخدم الصفحة
window.addEventListener('beforeunload', () => {
notificationStream.disconnect();
});
SSE المتقدم: البث إلى عملاء متعددين
// SSE مع Redis pub/sub للبث متعدد الخوادم
const express = require('express');
const Redis = require('ioredis');
const app = express();
const redis = new Redis();
const subscriber = new Redis();
// تخزين اتصالات SSE النشطة
const connections = new Map();
// الاشتراك في قناة Redis
subscriber.subscribe('notifications', (err, count) => {
if (err) {
console.error('فشل الاشتراك:', err);
} else {
console.log(`تم الاشتراك في ${count} قناة`);
}
});
// معالجة الرسائل الواردة من Redis
subscriber.on('message', (channel, message) => {
if (channel === 'notifications') {
const notification = JSON.parse(message);
broadcastToUser(notification.userId, notification);
}
});
// نقطة نهاية SSE
app.get('/api/notifications/stream', (req, res) => {
const userId = req.query.userId;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// تخزين الاتصال
if (!connections.has(userId)) {
connections.set(userId, []);
}
connections.get(userId).push(res);
console.log(`المستخدم ${userId} متصل. إجمالي الاتصالات: ${connections.get(userId).length}`);
// التنظيف عند قطع الاتصال
req.on('close', () => {
const userConnections = connections.get(userId);
const index = userConnections.indexOf(res);
if (index !== -1) {
userConnections.splice(index, 1);
}
if (userConnections.length === 0) {
connections.delete(userId);
}
console.log(`المستخدم ${userId} قطع الاتصال`);
});
});
// دالة للبث إلى مستخدم محدد
function broadcastToUser(userId, notification) {
const userConnections = connections.get(userId.toString());
if (!userConnections) return;
const message = `id: ${notification.id}\nevent: notification\ndata: ${JSON.stringify(notification)}\n\n`;
userConnections.forEach((res) => {
try {
res.write(message);
} catch (error) {
console.error('خطأ في الكتابة إلى الاتصال:', error);
}
});
}
// نقطة نهاية API لنشر الإشعار
app.post('/api/notifications', express.json(), async (req, res) => {
const notification = {
id: Date.now(),
userId: req.body.userId,
title: req.body.title,
message: req.body.message,
timestamp: new Date().toISOString()
};
// النشر إلى Redis (سيتم استلامه من قبل جميع نسخ الخادم)
await redis.publish('notifications', JSON.stringify(notification));
res.json({ success: true, notification });
});
قيود SSE: لدى SSE حد أقصى للاتصال (عادةً 6 لكل متصفح لـ HTTP/1.1). استخدم HTTP/2 لحدود أعلى. SSE أحادية الاتجاه؛ إذا كنت بحاجة إلى اتصال من العميل إلى الخادم، فكر في WebSockets أو ادمج SSE مع طلبات HTTP العادية.
WebSockets
توفر WebSockets قنوات اتصال كاملة الازدواج عبر اتصال TCP واحد. على عكس SSE، تدعم WebSockets تدفق البيانات ثنائي الاتجاه ويمكنها إرسال بيانات نصية وثنائية.
متى تستخدم WebSockets
- تطبيقات الدردشة التي تتطلب رسائل ثنائية الاتجاه في الوقت الفعلي
- التحرير التعاوني (تطبيقات على نمط Google Docs)
- الألعاب متعددة اللاعبين
- منصات التداول في الوقت الفعلي
- عناصر تحكم بث الفيديو/الصوت المباشر
تنفيذ WebSocket - جانب الخادم (Node.js مع ws)
// خادم WebSocket مع المصادقة
const WebSocket = require('ws');
const http = require('http');
const express = require('express');
const jwt = require('jsonwebtoken');
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ noServer: true });
// تخزين الاتصالات النشطة
const clients = new Map();
// معالجة ترقية WebSocket
server.on('upgrade', (request, socket, head) => {
// استخراج الرمز من سلسلة الاستعلام
const url = new URL(request.url, `http://${request.headers.host}`);
const token = url.searchParams.get('token');
// التحقق من المصادقة
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
request.userId = decoded.userId;
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
} catch (error) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
}
});
// معالجة اتصالات WebSocket الجديدة
wss.on('connection', (ws, request) => {
const userId = request.userId;
console.log(`المستخدم ${userId} متصل عبر WebSocket`);
// تخزين الاتصال
clients.set(userId, ws);
// إرسال رسالة ترحيب
ws.send(JSON.stringify({
type: 'connected',
message: 'تم إنشاء اتصال WebSocket',
userId: userId,
timestamp: new Date().toISOString()
}));
// معالجة الرسائل الواردة
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleMessage(userId, message, ws);
} catch (error) {
console.error('خطأ في تحليل الرسالة:', error);
ws.send(JSON.stringify({
type: 'error',
message: 'تنسيق رسالة غير صالح'
}));
}
});
// معالجة ping/pong لصحة الاتصال
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
// معالجة أخطاء الاتصال
ws.on('error', (error) => {
console.error(`خطأ WebSocket للمستخدم ${userId}:`, error);
});
// معالجة قطع الاتصال
ws.on('close', () => {
console.log(`المستخدم ${userId} قطع الاتصال`);
clients.delete(userId);
});
});
// نبض لاكتشاف الاتصالات المكسورة
const heartbeatInterval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
// التنظيف عند إيقاف الخادم
wss.on('close', () => {
clearInterval(heartbeatInterval);
});
// معالج الرسائل
function handleMessage(userId, message, ws) {
console.log(`رسالة من المستخدم ${userId}:`, message);
switch (message.type) {
case 'chat':
handleChatMessage(userId, message);
break;
case 'typing':
handleTypingIndicator(userId, message);
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
break;
default:
ws.send(JSON.stringify({
type: 'error',
message: `نوع رسالة غير معروف: ${message.type}`
}));
}
}
// بث رسالة الدردشة إلى الغرفة
function handleChatMessage(senderId, message) {
const chatMessage = {
type: 'chat',
senderId: senderId,
roomId: message.roomId,
content: message.content,
timestamp: new Date().toISOString()
};
// الحصول على جميع المستخدمين في الغرفة
const roomUsers = getRoomUsers(message.roomId);
roomUsers.forEach(userId => {
const client = clients.get(userId);
if (client && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(chatMessage));
}
});
}
// بث مؤشر الكتابة
function handleTypingIndicator(userId, message) {
const typingMessage = {
type: 'typing',
userId: userId,
roomId: message.roomId,
isTyping: message.isTyping
};
const roomUsers = getRoomUsers(message.roomId);
roomUsers.forEach(recipientId => {
if (recipientId !== userId) {
const client = clients.get(recipientId);
if (client && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(typingMessage));
}
}
});
}
// مساعد للحصول على مستخدمي الغرفة (مبسط)
function getRoomUsers(roomId) {
// في الإنتاج، الجلب من قاعدة البيانات
return Array.from(clients.keys());
}
server.listen(3000, () => {
console.log('خادم WebSocket يعمل على المنفذ 3000');
});
تنفيذ WebSocket - جانب العميل
// عميل WebSocket من جانب العميل مع منطق إعادة الاتصال
class WebSocketClient {
constructor(url, token) {
this.url = `${url}?token=${token}`;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectInterval = 1000;
this.messageQueue = [];
this.handlers = new Map();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket متصل');
this.reconnectAttempts = 0;
this.reconnectInterval = 1000;
// إرسال الرسائل في قائمة الانتظار
this.flushMessageQueue();
// إخطار المستمعين
this.trigger('connected');
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
this.handleMessage(message);
} catch (error) {
console.error('خطأ في تحليل الرسالة:', error);
}
};
this.ws.onerror = (error) => {
console.error('خطأ WebSocket:', error);
this.trigger('error', error);
};
this.ws.onclose = () => {
console.log('WebSocket قطع الاتصال');
this.trigger('disconnected');
this.handleReconnect();
};
}
send(type, data) {
const message = {
type,
...data,
timestamp: new Date().toISOString()
};
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
// إضافة الرسالة إلى قائمة الانتظار إذا لم يكن متصلاً
this.messageQueue.push(message);
}
}
flushMessageQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
this.ws.send(JSON.stringify(message));
}
}
handleMessage(message) {
console.log('تم استلام الرسالة:', message);
// تشغيل معالجات خاصة بالنوع
this.trigger(message.type, message);
}
on(event, handler) {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event).push(handler);
}
trigger(event, data) {
const handlers = this.handlers.get(event);
if (handlers) {
handlers.forEach(handler => handler(data));
}
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`إعادة الاتصال في ${this.reconnectInterval}ms (المحاولة ${this.reconnectAttempts})`);
setTimeout(() => {
this.connect();
}, this.reconnectInterval);
// تراجع أسي
this.reconnectInterval = Math.min(this.reconnectInterval * 2, 30000);
} else {
console.error('تم الوصول إلى الحد الأقصى لمحاولات إعادة الاتصال');
this.trigger('reconnect_failed');
}
}
disconnect() {
if (this.ws) {
this.ws.close();
}
}
}
// مثال على الاستخدام
const token = localStorage.getItem('authToken');
const wsClient = new WebSocketClient('ws://localhost:3000', token);
// تسجيل معالجات الأحداث
wsClient.on('connected', () => {
console.log('تم الاتصال بنجاح بالدردشة');
document.getElementById('connection-status').textContent = 'متصل';
});
wsClient.on('chat', (message) => {
displayChatMessage(message);
});
wsClient.on('typing', (data) => {
showTypingIndicator(data.userId, data.isTyping);
});
wsClient.on('disconnected', () => {
document.getElementById('connection-status').textContent = 'غير متصل';
});
// الاتصال
wsClient.connect();
// إرسال رسالة دردشة
function sendMessage(roomId, content) {
wsClient.send('chat', { roomId, content });
}
// إرسال مؤشر الكتابة
let typingTimeout;
function handleTyping(roomId) {
wsClient.send('typing', { roomId, isTyping: true });
clearTimeout(typingTimeout);
typingTimeout = setTimeout(() => {
wsClient.send('typing', { roomId, isTyping: false });
}, 2000);
}
أفضل ممارسات WebSocket: نفذ دائمًا آليات نبض/ping-pong للكشف عن الاتصالات الميتة. استخدم قائمة انتظار الرسائل للتعامل مع فترات عدم الاتصال. نفذ التراجع الأسي لمحاولات إعادة الاتصال. قم دائمًا بمصادقة اتصالات WebSocket.
SSE مقابل WebSockets: اختيار التكنولوجيا المناسبة
| الميزة |
الأحداث المرسلة من الخادم (SSE) |
WebSockets |
| الاتصال |
أحادي الاتجاه (من الخادم إلى العميل) |
ثنائي الاتجاه (كامل الازدواج) |
| البروتوكول |
HTTP/HTTPS |
WS/WSS (بروتوكول WebSocket) |
| تنسيق البيانات |
نصي فقط |
نصي وثنائي |
| إعادة الاتصال |
تلقائي مع EventSource |
يتطلب تنفيذ يدوي |
| دعم المتصفح |
جميع المتصفحات الحديثة (باستثناء IE) |
جميع المتصفحات الحديثة |
| جدار الحماية/الوكيل |
يعمل عبر معظم جدران الحماية |
قد يتم حظره من قبل بعض جدران الحماية |
| معرفات الرسالة |
دعم مدمج |
تنفيذ يدوي |
| التعقيد |
بسيط للتنفيذ |
أكثر تعقيدًا |
| الأفضل لـ |
الإشعارات، التدفقات المباشرة، لوحات المعلومات |
الدردشة، الألعاب، التعاون |
النهج الهجين: الجمع بين SSE و HTTP
للعديد من حالات الاستخدام، فإن الجمع بين SSE لتحديثات الخادم إلى العميل مع طلبات HTTP POST/PUT العادية للاتصال من العميل إلى الخادم أبسط وأكثر موثوقية من WebSockets.
// نظام الإشعارات مع SSE + HTTP
class NotificationSystem {
constructor(userId, token) {
this.userId = userId;
this.token = token;
this.eventSource = null;
}
// بدء استقبال الإشعارات عبر SSE
startListening() {
this.eventSource = new EventSource(
`/api/notifications/stream?userId=${this.userId}&token=${this.token}`
);
this.eventSource.addEventListener('notification', (event) => {
const notification = JSON.parse(event.data);
this.displayNotification(notification);
});
}
// وضع علامة على الإشعار كمقروء عبر HTTP
async markAsRead(notificationId) {
try {
const response = await fetch(`/api/notifications/${notificationId}/read`, {
method: 'PUT',
headers: {
'Authorization': `Bearer ${this.token}`,
'Content-Type': 'application/json'
}
});
if (response.ok) {
console.log('تم وضع علامة على الإشعار كمقروء');
}
} catch (error) {
console.error('خطأ في وضع علامة على الإشعار كمقروء:', error);
}
}
// حذف الإشعار عبر HTTP
async deleteNotification(notificationId) {
try {
const response = await fetch(`/api/notifications/${notificationId}`, {
method: 'DELETE',
headers: {
'Authorization': `Bearer ${this.token}`
}
});
if (response.ok) {
console.log('تم حذف الإشعار');
}
} catch (error) {
console.error('خطأ في حذف الإشعار:', error);
}
}
displayNotification(notification) {
// تحديث واجهة المستخدم
console.log('إشعار جديد:', notification);
}
stopListening() {
if (this.eventSource) {
this.eventSource.close();
}
}
}
تمرين: قم ببناء نظام لوحة معلومات في الوقت الفعلي مع المتطلبات التالية:
- جانب الخادم (Node.js):
- أنشئ نقطة نهاية SSE
/api/dashboard/stream تدفع التحديثات كل 5 ثوانٍ
- قم بتضمين المقاييس: المستخدمون النشطون، الطلبات في الدقيقة، الإيرادات، وحالة النظام
- نفذ المصادقة باستخدام رموز JWT
- دعم اتصالات متزامنة متعددة
- إرسال نبض كل 30 ثانية
- جانب العميل (JavaScript):
- الاتصال بتدفق SSE وعرض المقاييس في الوقت الفعلي
- تنفيذ إعادة الاتصال التلقائية مع التراجع الأسي (حد أقصى 5 محاولات)
- إظهار مؤشر حالة الاتصال (متصل/غير متصل/إعادة الاتصال)
- عرض الطابع الزمني لآخر تحديث
- إضافة معالجة الأخطاء ورسائل خطأ سهلة الاستخدام
- إضافي:
- أضف زرًا لتحديث البيانات يدويًا عبر HTTP POST
- نفذ قائمة انتظار الرسائل للتعامل مع فترات عدم الاتصال
- أضف مخططات مرئية تتحدث في الوقت الفعلي
اختبر تنفيذك من خلال محاكاة انقطاع الشبكة والتحقق من سلوك إعادة الاتصال.