ميزات Dart المتقدمة

أساسيات التيارات (Streams)

55 دقيقة الدرس 3 من 16

ما هي التيارات (Streams)؟

في الدروس السابقة، تعلمت عن Future -- فئة تمثل قيمة غير متزامنة واحدة. لكن ماذا عن الحالات التي تحتاج فيها للتعامل مع تسلسل من القيم غير المتزامنة عبر الزمن؟ هنا تأتي التيارات (Streams).

Stream<T> يوصل قيمًا متعددة من نوع T بشكل غير متزامن، واحدة تلو الأخرى. فكر فيه مثل حزام ناقل: تصل العناصر واحدة تلو الأخرى، وتعالج كل عنصر عند وصوله. التيارات أساسية للعديد من السيناريوهات الواقعية:

  • أحداث إدخال المستخدم (النقرات، ضربات المفاتيح، الإيماءات)
  • البيانات من اتصالات WebSocket
  • قراءة ملف سطرًا بسطر
  • تحديثات موقع GPS
  • الإشعارات من قاعدة بيانات
  • نبضات المؤقت
ملاحظة: Future مثل طلب شراء عبر الإنترنت -- تحصل على توصيل واحد. Stream مثل اشتراك في نشرة إخبارية -- تتلقى توصيلات متعددة عبر الزمن حتى تلغي اشتراكك.

تيارات الاشتراك الفردي مقابل تيارات البث

لدى Dart نوعان من التيارات، وفهم الفرق بينهما أمر حاسم:

تيارات الاشتراك الفردي

تيار الاشتراك الفردي يسمح بمستمع واحد فقط في كل مرة. إذا حاولت الاستماع مرة ثانية، يطرح Dart خطأ StateError. معظم التيارات في Dart هي اشتراك فردي بشكل افتراضي. إنها مثالية لتسلسلات البيانات المرتبة مثل إدخال/إخراج الملفات أو استجابات HTTP.

تيارات البث

تيار البث يسمح بعدة مستمعين في وقت واحد. كل مستمع يتلقى الأحداث بشكل مستقل. إنها مثالية للأحداث التي تحتاج أجزاء متعددة من تطبيقك للتفاعل معها، مثل إدخال المستخدم أو تغييرات الحالة.

الاشتراك الفردي مقابل البث

import 'dart:async';

void main() {
  // تيار اشتراك فردي
  final singleStream = Stream.fromIterable([1, 2, 3]);

  singleStream.listen((data) => print('Listener A: $data'));
  // singleStream.listen((data) => print('Listener B: $data'));
  // ^ خطأ! Bad state: Stream has already been listened to.

  // تيار بث
  final controller = StreamController<int>.broadcast();

  controller.stream.listen((data) => print('Listener 1: $data'));
  controller.stream.listen((data) => print('Listener 2: $data'));

  controller.add(10);
  controller.add(20);
  controller.close();
}

// الإخراج:
// Listener A: 1
// Listener A: 2
// Listener A: 3
// Listener 1: 10
// Listener 2: 10
// Listener 1: 20
// Listener 2: 20
تحذير: يمكنك تحويل تيار اشتراك فردي إلى بث باستخدام .asBroadcastStream()، لكن كن حذرًا: تيارات البث لا تخزن الأحداث مؤقتًا. إذا لم يكن أحد يستمع عند إصدار حدث، يُفقد هذا الحدث. تيارات الاشتراك الفردي، من ناحية أخرى، تخزن الأحداث مؤقتًا حتى يشترك مستمع.

إنشاء التيارات: Stream.fromIterable

Stream.fromIterable ينشئ تيارًا من قائمة أو iterable موجود. كل عنصر يُوصَل بشكل غير متزامن كحدث منفصل.

Stream.fromIterable

void main() {
  final cities = ['Dubai', 'Riyadh', 'Cairo', 'Istanbul', 'Kuala Lumpur'];
  final cityStream = Stream.fromIterable(cities);

  cityStream.listen(
    (city) => print('Visiting: $city'),
    onDone: () => print('Tour complete!'),
  );
}

// الإخراج:
// Visiting: Dubai
// Visiting: Riyadh
// Visiting: Cairo
// Visiting: Istanbul
// Visiting: Kuala Lumpur
// Tour complete!

إنشاء التيارات: Stream.periodic

Stream.periodic ينشئ تيارًا يصدر قيمًا على فترات منتظمة، مثل نبضة الساعة. هذا مفيد للغاية للاستطلاع والرسوم المتحركة ومؤقتات العد التنازلي.

Stream.periodic

void main() {
  // إصدار قيم كل ثانية، مع تحويل العداد
  final timerStream = Stream.periodic(
    Duration(seconds: 1),
    (count) => count + 1,  // تحويل: 0,1,2,3... يصبح 1,2,3,4...
  ).take(5);  // أخذ أول 5 قيم فقط

  timerStream.listen(
    (tick) => print('Tick $tick at ${DateTime.now().second}s'),
    onDone: () => print('Timer finished!'),
  );
}

// الإخراج (واحد في الثانية):
// Tick 1 at 10s
// Tick 2 at 11s
// Tick 3 at 12s
// Tick 4 at 13s
// Tick 5 at 14s
// Timer finished!

مؤقت عد تنازلي مع Stream.periodic

Stream<int> countdown(int seconds) {
  return Stream.periodic(
    Duration(seconds: 1),
    (tick) => seconds - tick - 1,
  ).take(seconds);
}

void main() {
  print('Starting countdown...');

  countdown(5).listen(
    (remaining) {
      if (remaining > 0) {
        print('$remaining...');
      } else {
        print('Go!');
      }
    },
    onDone: () => print('Countdown complete!'),
  );
}

إنشاء التيارات: Stream.fromFuture و Stream.fromFutures

Stream.fromFuture يغلف Future واحد كتيار يصدر قيمة واحدة (أو خطأ) ثم يُغلق. Stream.fromFutures يأخذ قائمة من Futures ويصدر كل نتيجة عند اكتمالها.

Stream.fromFuture و Stream.fromFutures

Future<String> fetchUser() =>
    Future.delayed(Duration(seconds: 2), () => 'Ahmed');

Future<String> fetchCity() =>
    Future.delayed(Duration(seconds: 1), () => 'Dubai');

Future<String> fetchRole() =>
    Future.delayed(Duration(seconds: 3), () => 'Developer');

void main() {
  // Future واحد كتيار
  Stream.fromFuture(fetchUser()).listen(
    (name) => print('User: $name'),
    onDone: () => print('Stream from single future done\n'),
  );

  // عدة Futures كتيار (يصدر بترتيب الاكتمال)
  Stream.fromFutures([
    fetchUser(),   // يكتمل في 2 ثانية
    fetchCity(),   // يكتمل في 1 ثانية
    fetchRole(),   // يكتمل في 3 ثوانٍ
  ]).listen(
    (data) => print('Got: $data'),
    onDone: () => print('All futures completed!'),
  );
}

// الإخراج:
// Got: Dubai (1 ثانية -- الأسرع أولاً)
// User: Ahmed
// Got: Ahmed (2 ثانية)
// Stream from single future done
// Got: Developer (3 ثوانٍ)
// All futures completed!
نصيحة: Stream.fromFutures يصدر النتائج عند اكتمال كل Future، وليس بالترتيب الذي تم تقديمها به. هذا مفيد عندما تريد عرض البيانات تدريجيًا عند توفرها، بدلاً من الانتظار حتى تنتهي جميع الطلبات.

الاستماع للتيارات

الطريقة الأساسية لاستهلاك تيار هي باستخدام طريقة listen(). تقبل استدعاءات راجعة لأحداث البيانات والأخطاء واكتمال التيار.

واجهة listen() الكاملة

import 'dart:async';

void main() {
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) {
      print('Data: $data');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Stream closed');
    },
    cancelOnError: false,  // الاستمرار في الاستماع بعد الأخطاء
  );

  controller.add(1);
  controller.add(2);
  controller.addError(Exception('Something went wrong'));
  controller.add(3);
  controller.close();
}

// الإخراج:
// Data: 1
// Data: 2
// Error: Exception: Something went wrong
// Data: 3
// Stream closed
ملاحظة: تعيين cancelOnError: true (وهو الافتراضي لبعض عمليات التيار) سيلغي الاشتراك عند حدوث أول خطأ. عيّنه إلى false إذا أردت الاستمرار في تلقي الأحداث بعد الخطأ.

طرق تحويل التيارات

توفر التيارات مجموعة غنية من طرق التحويل التي تتيح لك بناء خطوط أنابيب معالجة بيانات قوية. هذه الطرق تعيد تيارات جديدة، لذا يمكن تسلسلها معًا.

map() -- تحويل كل عنصر

Stream.map()

void main() {
  Stream.fromIterable([1, 2, 3, 4, 5])
      .map((number) => number * number)
      .listen((squared) => print(squared));
}

// الإخراج: 1, 4, 9, 16, 25

where() -- تصفية العناصر

Stream.where()

void main() {
  Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
      .where((number) => number.isEven)
      .listen((even) => print(even));
}

// الإخراج: 2, 4, 6, 8, 10

expand() -- تحويل واحد إلى كثير

Stream.expand()

void main() {
  Stream.fromIterable(['hello world', 'dart streams'])
      .expand((sentence) => sentence.split(' '))
      .listen((word) => print(word));
}

// الإخراج: hello, world, dart, streams

take() و skip() -- التحديد والتخطي

Stream.take() و Stream.skip()

void main() {
  final numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

  // أخذ أول 3 عناصر
  Stream.fromIterable([1, 2, 3, 4, 5])
      .take(3)
      .listen((n) => print('Take: $n'));
  // Take: 1, Take: 2, Take: 3

  // تخطي أول 3 عناصر
  Stream.fromIterable([1, 2, 3, 4, 5])
      .skip(3)
      .listen((n) => print('Skip: $n'));
  // Skip: 4, Skip: 5
}

تسلسل التحويلات

بناء خط أنابيب بيانات

void main() {
  final scores = Stream.fromIterable([85, 92, 45, 78, 95, 60, 88, 30, 72, 98]);

  scores
      .where((score) => score >= 70)           // تصفية: الدرجات الناجحة فقط
      .map((score) => score / 100)               // تحويل: إلى نسبة مئوية
      .map((pct) => '${(pct * 100).toStringAsFixed(0)}%') // تنسيق
      .listen(
        (formatted) => print('Passing: $formatted'),
        onDone: () => print('Processing complete'),
      );
}

// الإخراج:
// Passing: 85%
// Passing: 92%
// Passing: 78%
// Passing: 95%
// Passing: 88%
// Passing: 72%
// Passing: 98%
// Processing complete

استهلاك التيارات مع await for

كما يعمل await مع Futures، يعمل await for مع التيارات داخل دالة async. يعالج كل عنصر عند وصوله ويكتمل تلقائيًا عند إغلاق التيار.

استخدام await for

Stream<int> generateNumbers() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(milliseconds: 500));
    yield i;
  }
}

Future<void> main() async {
  int sum = 0;

  await for (final number in generateNumbers()) {
    sum += number;
    print('Received $number, running sum: $sum');
  }

  print('Final sum: $sum');
}

// الإخراج (كل 500 ملي ثانية):
// Received 1, running sum: 1
// Received 2, running sum: 3
// Received 3, running sum: 6
// Received 4, running sum: 10
// Received 5, running sum: 15
// Final sum: 15
تحذير: await for يحجب الدالة غير المتزامنة حتى يُغلق التيار. إذا لم يُغلق التيار أبدًا (مثل تيار بث من إدخال المستخدم)، لن يتم تنفيذ الكود بعد await for أبدًا. استخدم listen() مع اشتراك للتيارات اللانهائية، واستخدم await for للتيارات المحدودة التي تعرف أنها ستُغلق.

خصائص وطرق التيار المفيدة

توفر التيارات عدة طرق مساعدة تعيد Future واحد بنتيجة مجمّعة:

طرق تجميع التيارات

Future<void> main() async {
  final numbers = Stream.fromIterable([3, 1, 4, 1, 5, 9, 2, 6]);

  // الحصول على العنصر الأول
  final first = await Stream.fromIterable([10, 20, 30]).first;
  print('First: $first');  // 10

  // الحصول على العنصر الأخير
  final last = await Stream.fromIterable([10, 20, 30]).last;
  print('Last: $last');  // 30

  // الحصول على الطول (عدد العناصر)
  final length = await Stream.fromIterable([10, 20, 30]).length;
  print('Length: $length');  // 3

  // التحقق من الفراغ
  final isEmpty = await Stream.fromIterable([]).isEmpty;
  print('Empty: $isEmpty');  // true

  // التحقق مما إذا كان أي عنصر يطابق
  final hasLarge = await Stream.fromIterable([3, 1, 4, 15, 9])
      .any((n) => n > 10);
  print('Has large: $hasLarge');  // true

  // التحقق مما إذا كانت جميع العناصر تطابق
  final allPositive = await Stream.fromIterable([3, 1, 4, 1, 5])
      .every((n) => n > 0);
  print('All positive: $allPositive');  // true

  // جمع جميع العناصر في قائمة
  final list = await Stream.fromIterable([3, 1, 4]).toList();
  print('List: $list');  // [3, 1, 4]

  // التخفيض إلى قيمة واحدة
  final sum = await Stream.fromIterable([1, 2, 3, 4, 5])
      .reduce((previous, element) => previous + element);
  print('Sum: $sum');  // 15

  // الطي مع قيمة ابتدائية
  final product = await Stream.fromIterable([1, 2, 3, 4])
      .fold<int>(1, (prev, element) => prev * element);
  print('Product: $product');  // 24
}

مثال عملي: نظام الأحداث

دعنا نبني نظام معالجة أحداث يحاكي تطبيقًا في الوقت الحقيقي يستقبل ويعالج الأحداث من خلال التيارات.

خط أنابيب معالجة الأحداث

import 'dart:async';

// أنواع الأحداث
class AppEvent {
  final String type;
  final String data;
  final DateTime timestamp;

  AppEvent(this.type, this.data) : timestamp = DateTime.now();

  @override
  String toString() => '[$type] $data';
}

// محاكاة وصول الأحداث عبر الزمن
Stream<AppEvent> generateEvents() async* {
  final events = [
    AppEvent('user', 'User logged in'),
    AppEvent('navigation', 'Opened dashboard'),
    AppEvent('api', 'Fetched user profile'),
    AppEvent('error', 'Failed to load notifications'),
    AppEvent('user', 'Changed settings'),
    AppEvent('navigation', 'Opened profile page'),
    AppEvent('api', 'Updated profile photo'),
    AppEvent('user', 'User logged out'),
  ];

  for (final event in events) {
    await Future.delayed(Duration(milliseconds: 300));
    yield event;
  }
}

Future<void> main() async {
  print('=== Event Processing System ===\n');

  // معالجة جميع الأحداث
  final allEvents = <AppEvent>[];

  await for (final event in generateEvents()) {
    allEvents.add(event);
    print('Received: $event');
  }

  print('\n=== Event Summary ===');
  print('Total events: ${allEvents.length}');

  // العد حسب النوع باستخدام عمليات التيار
  final userEvents = allEvents.where((e) => e.type == 'user').length;
  final apiEvents = allEvents.where((e) => e.type == 'api').length;
  final errorEvents = allEvents.where((e) => e.type == 'error').length;
  final navEvents = allEvents.where((e) => e.type == 'navigation').length;

  print('User events: $userEvents');
  print('API events: $apiEvents');
  print('Navigation events: $navEvents');
  print('Error events: $errorEvents');
}

خط أنابيب بيانات في الوقت الحقيقي

import 'dart:async';
import 'dart:math';

// محاكاة قراءات المستشعر
Stream<double> temperatureSensor() async* {
  final random = Random();
  double temp = 22.0;

  while (true) {
    await Future.delayed(Duration(seconds: 1));
    // تقلب عشوائي بين -0.5 و +0.5
    temp += (random.nextDouble() - 0.5);
    yield double.parse(temp.toStringAsFixed(1));
  }
}

Future<void> main() async {
  print('=== Temperature Monitor ===\n');

  // بناء خط أنابيب المعالجة
  final subscription = temperatureSensor()
      .take(10)  // قراءة 10 قيم فقط
      .where((temp) => temp > 22.0)  // تنبيه عند تجاوز 22
      .map((temp) => 'ALERT: Temperature is ${temp}C (above threshold)')
      .listen(
        (alert) => print(alert),
        onDone: () => print('\nMonitoring session ended.'),
      );

  // listen يعيد StreamSubscription -- سنستكشف هذا أكثر
  // في الدرس التالي عن StreamControllers
}
نصيحة: طرق تحويل التيارات مثل map() و where() و take() هي كسولة -- لا تعالج البيانات حتى يستمع أحد للتيار الناتج. هذا يجعل بناء خطوط الأنابيب المعقدة فعالاً لأنه لا يُنجز أي عمل حتى يُطلب.
ملاحظة: التيارات هي أساس البرمجة التفاعلية في Flutter. أداة StreamBuilder ونمط BLoC والعديد من حلول إدارة الحالة مبنية على تيارات Dart. إتقان التيارات هنا سيؤتي ثماره عند بناء تطبيقات Flutter.

الملخص

في هذا الدرس، تعلمت أساسيات التيارات في Dart:

  • التيارات -- توصل قيمًا غير متزامنة متعددة عبر الزمن
  • الاشتراك الفردي مقابل البث -- مستمع واحد مقابل عدة مستمعين
  • Stream.fromIterable -- إنشاء تيار من مجموعة
  • Stream.periodic -- إصدار قيم على فترات منتظمة
  • Stream.fromFuture / fromFutures -- تحويل Futures إلى تيارات
  • listen() -- الاشتراك في أحداث التيار مع استدعاءات راجعة للبيانات والأخطاء والاكتمال
  • طرق التحويل -- map, where, expand, take, skip لبناء خطوط الأنابيب
  • await for -- استهلاك التيارات في الدوال غير المتزامنة
  • طرق التجميع -- first, last, length, reduce, fold, toList

في الدرس التالي، ستتعلم عن StreamControllers -- كيفية إنشاء تيارات مخصصة خاصة بك، وإدارة الاشتراكات، وبناء تحويلات تيار متقدمة.