التدفقات والمخازن المؤقتة في Node.js
التدفقات والمخازن المؤقتة هي مفاهيم أساسية في Node.js للتعامل مع البيانات بكفاءة، خاصة عند العمل مع ملفات كبيرة أو طلبات الشبكة أو البيانات في الوقت الفعلي. فهمها أمر حاسم لبناء تطبيقات عالية الأداء.
ما هو المخزن المؤقت (Buffer)؟
المخزن المؤقت هو منطقة تخزين مؤقتة للبيانات الثنائية. فكر فيه كجزء ثابت الحجم من الذاكرة المخصصة خارج محرك JavaScript V8. تُستخدم المخازن المؤقتة للتعامل مع البيانات الثنائية الأولية مباشرة.
// إنشاء مخازن مؤقتة
const buf1 = Buffer.from('Hello World');
const buf2 = Buffer.alloc(10); // 10 بايتات مهيأة إلى 0
const buf3 = Buffer.allocUnsafe(10); // 10 بايتات، أسرع ولكن قد تحتوي على بيانات قديمة
console.log(buf1); // <Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>
console.log(buf1.toString()); // 'Hello World'
console.log(buf1.length); // 11 بايت
ملاحظة: Buffer.allocUnsafe() أسرع لكنه قد يحتوي على بيانات قديمة. قم دائمًا بتهيئته قبل الاستخدام إذا كان الأمان مصدر قلق.
العمل مع المخازن المؤقتة
const buf = Buffer.from('Hello');
// القراءة من المخزن المؤقت
console.log(buf[0]); // 72 (كود ASCII لـ 'H')
console.log(buf.toString()); // 'Hello'
console.log(buf.toString('hex')); // '48656c6c6f'
console.log(buf.toString('base64')); // 'SGVsbG8='
// الكتابة إلى المخزن المؤقت
buf[0] = 74; // تغيير 'H' إلى 'J' (ASCII 74)
console.log(buf.toString()); // 'Jello'
// عمليات المخزن المؤقت
const buf1 = Buffer.from('Hello ');
const buf2 = Buffer.from('World');
const buf3 = Buffer.concat([buf1, buf2]);
console.log(buf3.toString()); // 'Hello World'
// مقارنة المخازن المؤقتة
const bufA = Buffer.from('ABC');
const bufB = Buffer.from('ABC');
const bufC = Buffer.from('DEF');
console.log(bufA.equals(bufB)); // true
console.log(bufA.equals(bufC)); // false
console.log(Buffer.compare(bufA, bufC)); // -1 (bufA يأتي قبل bufC)
ما هي التدفقات (Streams)؟
التدفقات هي مجموعات من البيانات قد لا تكون متاحة دفعة واحدة. تسمح لك بمعالجة البيانات قطعة تلو الأخرى (أجزاء) دون تحميل كل شيء في الذاكرة. هذا مفيد بشكل خاص للملفات الكبيرة أو البيانات في الوقت الفعلي.
أنواع التدفقات:
- قابلة للقراءة (Readable): تدفقات يمكنك القراءة منها (مثل fs.createReadStream)
- قابلة للكتابة (Writable): تدفقات يمكنك الكتابة إليها (مثل fs.createWriteStream)
- مزدوجة (Duplex): تدفقات قابلة للقراءة والكتابة معًا (مثل TCP socket)
- تحويلية (Transform): تدفقات مزدوجة تعدل البيانات أثناء قراءتها أو كتابتها (مثل الضغط)
التدفقات القابلة للقراءة
const fs = require('fs');
// إنشاء تدفق قابل للقراءة
const readStream = fs.createReadStream('./large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // أجزاء 64KB (الافتراضي 64KB)
});
// حدث: 'data' - يتم تشغيله عند توفر جزء
readStream.on('data', (chunk) => {
console.log(`تم استقبال ${chunk.length} بايت`);
console.log(chunk);
});
// حدث: 'end' - يتم تشغيله عند عدم وجود المزيد من البيانات
readStream.on('end', () => {
console.log('انتهت القراءة');
});
// حدث: 'error' - يتم تشغيله عند حدوث خطأ
readStream.on('error', (error) => {
console.error('خطأ:', error.message);
});
التدفقات القابلة للكتابة
const fs = require('fs');
// إنشاء تدفق قابل للكتابة
const writeStream = fs.createWriteStream('./output.txt');
// كتابة البيانات
writeStream.write('السطر الأول\n');
writeStream.write('السطر الثاني\n');
writeStream.write('السطر الثالث\n');
// إغلاق التدفق
writeStream.end('السطر النهائي\n');
writeStream.on('finish', () => {
console.log('تمت كتابة جميع البيانات');
});
writeStream.on('error', (error) => {
console.error('خطأ:', error.message);
});
توصيل التدفقات (Piping)
التوصيل هو آلية لربط تدفق قابل للقراءة بتدفق قابل للكتابة تلقائيًا. يتعامل مع الضغط العكسي والأخطاء نيابة عنك.
const fs = require('fs');
const readStream = fs.createReadStream('./input.txt');
const writeStream = fs.createWriteStream('./output.txt');
// توصيل التدفق القابل للقراءة بالتدفق القابل للكتابة
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('تم نسخ الملف بنجاح');
});
نصيحة: التوصيل أكثر كفاءة بكثير من قراءة الملف بأكمله في الذاكرة ثم كتابته. إنه يعالج البيانات في أجزاء.
تسلسل التوصيلات
const fs = require('fs');
const zlib = require('zlib');
// قراءة الملف -> ضغط -> كتابة الملف المضغوط
fs.createReadStream('./input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('./input.txt.gz'))
.on('finish', () => {
console.log('تم ضغط الملف بنجاح');
});
// فك الضغط -> قراءة -> كتابة
fs.createReadStream('./input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('./decompressed.txt'))
.on('finish', () => {
console.log('تم فك ضغط الملف بنجاح');
});
التدفقات المزدوجة
التدفقات المزدوجة تنفذ واجهات القراءة والكتابة معًا. مقابس الشبكة مثال شائع:
const { Duplex } = require('stream');
const duplexStream = new Duplex({
read(size) {
this.push('بيانات من الجانب القابل للقراءة\n');
this.push(null); // لا مزيد من البيانات
},
write(chunk, encoding, callback) {
console.log(`كتابة: ${chunk.toString()}`);
callback();
}
});
// الكتابة إلى التدفق
duplexStream.write('مرحبا');
duplexStream.end();
// القراءة من التدفق
duplexStream.on('data', (chunk) => {
console.log(`قراءة: ${chunk.toString()}`);
});
التدفقات التحويلية
التدفقات التحويلية هي تدفقات مزدوجة تعدل البيانات أثناء مرورها:
const { Transform } = require('stream');
// التحويل إلى أحرف كبيرة
const uppercaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// استخدام تدفق التحويل
process.stdin
.pipe(uppercaseTransform)
.pipe(process.stdout);
// اكتب شيئًا وسيتم طباعته بأحرف كبيرة
// مثال أكثر عملية: محلل CSV
const { Transform } = require('stream');
class CSVParser extends Transform {
constructor() {
super({ objectMode: true });
this.headers = null;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line, index) => {
if (!line.trim()) return;
if (!this.headers) {
this.headers = line.split(',');
} else {
const values = line.split(',');
const row = {};
this.headers.forEach((header, i) => {
row[header.trim()] = values[i]?.trim();
});
this.push(row);
}
});
callback();
}
}
// الاستخدام
const fs = require('fs');
const parser = new CSVParser();
fs.createReadStream('./data.csv')
.pipe(parser)
.on('data', (row) => {
console.log('صف محلل:', row);
});
الضغط العكسي للتدفق
يحدث الضغط العكسي عندما تُكتب البيانات إلى تدفق أسرع مما يمكن استهلاكه. يتعامل Node.js مع هذا تلقائيًا، ولكن يجب أن تكون على دراية به:
const fs = require('fs');
const writeStream = fs.createWriteStream('./output.txt');
function writeMillionLines() {
let i = 0;
function write() {
let canContinue = true;
while (i < 1000000 && canContinue) {
canContinue = writeStream.write(`Line ${i}\n`);
i++;
}
if (i < 1000000) {
// انتظر حدث 'drain' إذا كان المخزن المؤقت ممتلئًا
writeStream.once('drain', write);
} else {
writeStream.end();
}
}
write();
}
writeMillionLines();
writeStream.on('finish', () => {
console.log('انتهت كتابة مليون سطر');
});
تحذير: تجاهل الضغط العكسي يمكن أن يؤدي إلى استخدام عالٍ للذاكرة. تحقق دائمًا من قيمة إرجاع write() وانتظر حدث drain إذا أعاد false.
أمثلة عملية للتدفقات
1. معالجة ملفات السجل الكبيرة
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('./access.log'),
crlfDelay: Infinity
});
let errorCount = 0;
rl.on('line', (line) => {
if (line.includes('ERROR')) {
errorCount++;
}
});
rl.on('close', () => {
console.log(`تم العثور على ${errorCount} أخطاء`);
});
2. بث استجابة HTTP
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
if (req.url === '/video') {
res.writeHead(200, { 'Content-Type': 'video/mp4' });
// بث ملف الفيديو بدلاً من تحميله في الذاكرة
const videoStream = fs.createReadStream('./movie.mp4');
videoStream.pipe(res);
videoStream.on('error', (error) => {
res.statusCode = 500;
res.end('خطأ في بث الفيديو');
});
} else {
res.statusCode = 404;
res.end('غير موجود');
}
});
server.listen(3000);
3. تتبع تقدم التحميل
const express = require('express');
const fs = require('fs');
const app = express();
app.post('/upload', (req, res) => {
const writeStream = fs.createWriteStream('./uploaded-file.dat');
let totalBytes = 0;
req.on('data', (chunk) => {
totalBytes += chunk.length;
console.log(`تم الاستقبال: ${totalBytes} بايت`);
});
req.pipe(writeStream);
writeStream.on('finish', () => {
res.json({
message: 'اكتمل التحميل',
size: totalBytes
});
});
writeStream.on('error', (error) => {
res.status(500).json({ error: error.message });
});
});
app.listen(3000);
فوائد أداء التدفقات
// سيئ: تحميل الملف بأكمله في الذاكرة
const fs = require('fs');
fs.readFile('./huge-file.txt', (err, data) => {
if (err) throw err;
// إذا كان الملف 1GB، فهذا يستخدم 1GB من RAM
console.log(data.toString());
});
// جيد: بث الملف
const readStream = fs.createReadStream('./huge-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // معالجة 64KB في المرة الواحدة
});
readStream.on('data', (chunk) => {
// فقط 64KB في الذاكرة في المرة الواحدة
console.log(chunk);
});
تمرين تطبيقي:
أنشئ تطبيق Node.js مع الميزات التالية:
- قراءة ملف CSV كبير باستخدام التدفقات (لا تحمله في الذاكرة)
- تحويل كل صف: تحويل الأسماء إلى أحرف كبيرة، حساب العمر من تاريخ الميلاد
- تصفية الصفوف: تضمين الأشخاص الذين تزيد أعمارهم عن 18 عامًا فقط
- كتابة النتائج إلى ملف CSV جديد باستخدام التدفقات
- تتبع وعرض التقدم (الأسطر المعالجة، استخدام الذاكرة)
- التعامل مع الأخطاء بلطف
الخلاصة
في هذا الدرس، تعلمت:
- ما هي المخازن المؤقتة وكيفية العمل مع البيانات الثنائية
- الأنواع الأربعة للتدفقات: قابلة للقراءة، قابلة للكتابة، مزدوجة، تحويلية
- قراءة وكتابة التدفقات مع الأحداث
- توصيل التدفقات لمعالجة البيانات بكفاءة
- فهم ومعالجة الضغط العكسي
- إنشاء تدفقات تحويل مخصصة
- التطبيقات العملية للتدفقات للملفات الكبيرة والبيانات في الوقت الفعلي
- فوائد الأداء لاستخدام التدفقات على تحميل الملفات بأكملها