ميزات Dart المتقدمة

متحكمات التيارات والتحويلات

50 دقيقة الدرس 4 من 16

ما هو StreamController؟

في الدرس السابق، استهلكت تيارات أنشأتها منشئات المصنع مثل Stream.fromIterable و Stream.periodic. لكن ماذا عن إنشاء تيارات خاصة بك يمكنك دفع البيانات إليها عند الطلب؟ هذا بالضبط ما يفعله StreamController.

StreamController<T> يمنحك تحكمًا كاملاً في التيار. يوفر:

  • جانب المصرف (sink) -- حيث تضيف البيانات والأخطاء وتغلق التيار
  • جانب التيار (stream) -- حيث يستقبل المستمعون الأحداث

فكر فيه مثل أنبوب: تدفع البيانات من طرف (المصرف)، ويستقبلها المستمعون من الطرف الآخر (التيار).

StreamController أساسي

import 'dart:async';

void main() {
  // إنشاء متحكم لأحداث String
  final controller = StreamController<String>();

  // الاستماع لجانب التيار
  controller.stream.listen(
    (message) => print('Received: $message'),
    onError: (error) => print('Error: $error'),
    onDone: () => print('Stream closed!'),
  );

  // دفع البيانات عبر جانب المصرف
  controller.add('Hello');
  controller.add('World');
  controller.add('Dart Streams');

  // إغلاق التيار (لا يمكن إضافة المزيد من البيانات)
  controller.close();
}

// الإخراج:
// Received: Hello
// Received: World
// Received: Dart Streams
// Stream closed!
ملاحظة: StreamController الافتراضي ينشئ تيار اشتراك فردي. يمكن لمستمع واحد فقط الاشتراك في controller.stream. محاولة إضافة مستمع ثانٍ ستطرح StateError.

StreamController.broadcast

StreamController.broadcast() ينشئ متحكمًا يدعم تياره عدة مستمعين في وقت واحد. كل مستمع يستقبل الأحداث بشكل مستقل من نقطة اشتراكه.

Broadcast StreamController

import 'dart:async';

void main() {
  final controller = StreamController<String>.broadcast();

  // المستمع الأول
  controller.stream.listen(
    (msg) => print('Logger: $msg'),
  );

  // المستمع الثاني
  controller.stream.listen(
    (msg) => print('Analytics: $msg'),
  );

  controller.add('User signed in');
  controller.add('Page viewed: Home');
  controller.add('Button clicked: Buy Now');

  controller.close();
}

// الإخراج:
// Logger: User signed in
// Analytics: User signed in
// Logger: Page viewed: Home
// Analytics: Page viewed: Home
// Logger: Button clicked: Buy Now
// Analytics: Button clicked: Buy Now
تحذير: متحكمات البث لا تخزن الأحداث مؤقتًا. إذا أضفت بيانات قبل اشتراك أي مستمع، تُفقد تلك البيانات. أيضًا، إذا اشترك مستمع بعد إصدار بعض الأحداث، لن يستقبل تلك الأحداث السابقة. للسيناريوهات التي تتطلب إعادة تشغيل الأحداث، تحتاج لتنفيذ التخزين المؤقت بنفسك.

إضافة البيانات والأخطاء والإغلاق

يوفر StreamController ثلاث طرق رئيسية لدفع الأحداث:

add() و addError() و close()

import 'dart:async';

void main() {
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => print('Data: $data'),
    onError: (error) => print('Error: $error'),
    onDone: () => print('Done!'),
  );

  // إضافة أحداث بيانات
  controller.add(10);
  controller.add(20);

  // إضافة حدث خطأ (لا يغلق التيار)
  controller.addError(Exception('Something broke'));

  // الاستمرار في إضافة البيانات بعد الخطأ
  controller.add(30);

  // إغلاق التيار (يفعّل onDone)
  controller.close();

  // controller.add(40);  // خطأ! لا يمكن الإضافة لمتحكم مغلق
}

// الإخراج:
// Data: 10
// Data: 20
// Error: Exception: Something broke
// Data: 30
// Done!

استخدام خاصية Sink

import 'dart:async';

// دالة تحتاج فقط للكتابة في التيار
void populateData(StreamSink<String> sink) {
  sink.add('First item');
  sink.add('Second item');
  sink.add('Third item');
  sink.close();
}

void main() {
  final controller = StreamController<String>();

  controller.stream.listen(
    (item) => print('Got: $item'),
    onDone: () => print('All items received'),
  );

  // تمرير المصرف لدالة المنتج
  populateData(controller.sink);
}
نصيحة: استخدام StreamSink كنوع معامل بدلاً من StreamController ممارسة جيدة. يحد الوصول لجانب الكتابة فقط، مما يمنع المستهلكين من الاستماع أو إغلاق المتحكم عن طريق الخطأ. هذا يتبع مبدأ الحد الأدنى من الصلاحيات.

StreamSubscription: التحكم في المستمع

عند استدعاء listen() على تيار، يعيد كائن StreamSubscription. هذا يمنحك التحكم في دورة حياة الاستماع: يمكنك إيقاف الاشتراك مؤقتًا واستئنافه وإلغائه.

إدارة StreamSubscription

import 'dart:async';

void main() async {
  final controller = StreamController<int>();

  // listen() يعيد StreamSubscription
  final subscription = controller.stream.listen(
    (data) => print('Received: $data'),
    onDone: () => print('Done'),
  );

  controller.add(1);
  controller.add(2);

  // إيقاف الاشتراك مؤقتًا -- الأحداث تُخزّن مؤقتًا
  subscription.pause();
  print('--- Paused ---');

  controller.add(3);  // مخزّن مؤقتًا، لم يُوصّل بعد
  controller.add(4);  // مخزّن مؤقتًا

  await Future.delayed(Duration(seconds: 1));

  // الاستئناف -- الأحداث المخزّنة تُوصّل
  subscription.resume();
  print('--- Resumed ---');

  await Future.delayed(Duration(milliseconds: 100));

  controller.add(5);

  // الإلغاء -- لن يتم استقبال المزيد من الأحداث
  await subscription.cancel();
  print('--- Cancelled ---');

  controller.add(6);  // هذا لن يُستقبل

  controller.close();
}

// الإخراج:
// Received: 1
// Received: 2
// --- Paused ---
// --- Resumed ---
// Received: 3
// Received: 4
// Received: 5
// --- Cancelled ---
ملاحظة: عند إيقاف الاشتراك مؤقتًا، تُخزّن الأحداث مؤقتًا بواسطة المتحكم (لتيارات الاشتراك الفردي). عند الاستئناف، تُوصّل جميع الأحداث المخزّنة. لتيارات البث، الأحداث المصدرة أثناء الإيقاف المؤقت تُفقد لأن تيارات البث لا تخزّن مؤقتًا.

استدعاءات دورة حياة StreamController

يدعم StreamController استدعاءات دورة الحياة التي تتيح لك التفاعل عندما يشترك المستمعون أو يوقفون مؤقتًا أو يستأنفون أو يلغون. هذا ضروري لإدارة الموارد بكفاءة.

استدعاءات دورة الحياة

import 'dart:async';

void main() {
  late StreamSubscription<int> subscription;

  final controller = StreamController<int>(
    onListen: () {
      print('Someone started listening!');
    },
    onPause: () {
      print('Listener paused -- stopping data production');
    },
    onResume: () {
      print('Listener resumed -- restarting data production');
    },
    onCancel: () {
      print('Listener cancelled -- cleaning up resources');
    },
  );

  subscription = controller.stream.listen(
    (data) => print('Data: $data'),
  );

  controller.add(1);
  controller.add(2);

  subscription.pause();
  subscription.resume();

  controller.add(3);

  subscription.cancel();
}

// الإخراج:
// Someone started listening!
// Data: 1
// Data: 2
// Listener paused -- stopping data production
// Listener resumed -- restarting data production
// Data: 3
// Listener cancelled -- cleaning up resources

StreamTransformer

StreamTransformer هو كائن قابل لإعادة الاستخدام يأخذ تيارًا كمدخل وينتج تيارًا محوّلاً كمخرج. على عكس map() أو where() التي هي عمليات بسيطة، يمكن لـ StreamTransformer الحفاظ على حالة عبر الأحداث، أو دمج أحداث متعددة، أو تغيير توقيت الأحداث.

إنشاء StreamTransformer مخصص

import 'dart:async';

// محول يجمع الأحداث في دفعات
class BatchTransformer<T> extends StreamTransformerBase<T, List<T>> {
  final int batchSize;

  BatchTransformer(this.batchSize);

  @override
  Stream<List<T>> bind(Stream<T> stream) {
    final batch = <T>[];

    return stream.transform(
      StreamTransformer<T, List<T>>.fromHandlers(
        handleData: (data, sink) {
          batch.add(data);
          if (batch.length >= batchSize) {
            sink.add(List<T>.from(batch));
            batch.clear();
          }
        },
        handleDone: (sink) {
          if (batch.isNotEmpty) {
            sink.add(List<T>.from(batch));
            batch.clear();
          }
          sink.close();
        },
      ),
    );
  }
}

void main() {
  final numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

  numbers.transform(BatchTransformer(3)).listen(
    (batch) => print('Batch: $batch'),
    onDone: () => print('All batches processed'),
  );
}

// الإخراج:
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7, 8, 9]
// Batch: [10]
// All batches processed

StreamTransformer.fromHandlers

import 'dart:async';

void main() {
  // محول بسيط يضاعف الأرقام الزوجية ويتخلص من الفردية
  final doubleEvens = StreamTransformer<int, int>.fromHandlers(
    handleData: (data, sink) {
      if (data.isEven) {
        sink.add(data * 2);
      }
      // الأرقام الفردية تُحذف بصمت
    },
    handleError: (error, stackTrace, sink) {
      print('Transformer caught error: $error');
      // اختياريًا التمرير: sink.addError(error, stackTrace);
    },
    handleDone: (sink) {
      print('Transformer done');
      sink.close();
    },
  );

  Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8])
      .transform(doubleEvens)
      .listen(
        (data) => print('Result: $data'),
        onDone: () => print('Stream complete'),
      );
}

// الإخراج:
// Result: 4
// Result: 8
// Result: 12
// Result: 16
// Transformer done
// Stream complete

مثال عملي: نظام المحادثة

دعنا نبني نظام محادثة بسيط باستخدام StreamControllers لتوضيح كيف يمكن لمكونات متعددة التواصل من خلال التيارات.

غرفة محادثة مع التيارات

import 'dart:async';

class ChatMessage {
  final String sender;
  final String text;
  final DateTime timestamp;

  ChatMessage(this.sender, this.text) : timestamp = DateTime.now();

  @override
  String toString() =>
      '[${timestamp.hour}:${timestamp.minute.toString().padLeft(2, "0")}] $sender: $text';
}

class ChatRoom {
  final String name;
  final _controller = StreamController<ChatMessage>.broadcast();

  ChatRoom(this.name) {
    print('Chat room "$name" created.\n');
  }

  // تيار عام للمستمعين
  Stream<ChatMessage> get messages => _controller.stream;

  // إرسال رسالة
  void send(String sender, String text) {
    final message = ChatMessage(sender, text);
    _controller.add(message);
  }

  // إغلاق غرفة المحادثة
  Future<void> close() async {
    await _controller.close();
    print('\nChat room "$name" closed.');
  }
}

class ChatUser {
  final String name;
  StreamSubscription<ChatMessage>? _subscription;

  ChatUser(this.name);

  void joinRoom(ChatRoom room) {
    _subscription = room.messages
        .where((msg) => msg.sender != name)  // لا تعرض رسائلك الخاصة
        .listen(
          (msg) => print('[$name sees] $msg'),
        );
    print('$name joined the chat room.');
  }

  void leaveRoom() {
    _subscription?.cancel();
    _subscription = null;
    print('$name left the chat room.');
  }
}

void main() async {
  final room = ChatRoom('Dart Developers');

  final ahmed = ChatUser('Ahmed');
  final sara = ChatUser('Sara');
  final omar = ChatUser('Omar');

  ahmed.joinRoom(room);
  sara.joinRoom(room);
  omar.joinRoom(room);

  print('');

  // محاكاة رسائل المحادثة
  room.send('Ahmed', 'Hello everyone!');
  await Future.delayed(Duration(milliseconds: 100));

  room.send('Sara', 'Hi Ahmed! How are you?');
  await Future.delayed(Duration(milliseconds: 100));

  room.send('Omar', 'Hey team! Working on streams today.');
  await Future.delayed(Duration(milliseconds: 100));

  // عمر يغادر
  omar.leaveRoom();

  room.send('Ahmed', 'Omar left? See you later!');
  await Future.delayed(Duration(milliseconds: 100));

  await room.close();
}

مثال عملي: تغذية بيانات في الوقت الحقيقي مع إلغاء الارتداد

إلغاء الارتداد (Debouncing) هو نمط شائع حيث تنتظر توقفًا في الأحداث قبل المعالجة. هذا ضروري لميزات البحث أثناء الكتابة لتجنب إجراء استدعاءات API عند كل ضربة مفتاح.

تنفيذ إلغاء الارتداد

import 'dart:async';

// محول إلغاء ارتداد ينتظر توقفًا في الأحداث
class DebounceTransformer<T> extends StreamTransformerBase<T, T> {
  final Duration duration;

  DebounceTransformer(this.duration);

  @override
  Stream<T> bind(Stream<T> stream) {
    Timer? timer;
    final controller = StreamController<T>();

    stream.listen(
      (data) {
        timer?.cancel();
        timer = Timer(duration, () {
          controller.add(data);
        });
      },
      onError: (error) => controller.addError(error),
      onDone: () {
        timer?.cancel();
        controller.close();
      },
    );

    return controller.stream;
  }
}

// محاكاة مربع بحث
void main() async {
  final searchInput = StreamController<String>();

  // تطبيق إلغاء الارتداد: انتظار 500ms بعد توقف المستخدم عن الكتابة
  searchInput.stream
      .transform(DebounceTransformer(Duration(milliseconds: 500)))
      .listen((query) {
        print('Searching for: "$query"');
      });

  // محاكاة كتابة "dart streams" حرفًا بحرف
  final characters = 'dart streams'.split('');
  String typed = '';

  for (final char in characters) {
    typed += char;
    searchInput.add(typed);
    await Future.delayed(Duration(milliseconds: 100));  // كتابة سريعة
  }

  // الانتظار لتفعيل إلغاء الارتداد
  await Future.delayed(Duration(seconds: 1));
  searchInput.close();
}

// الإخراج:
// Searching for: "dart streams"
// (بحث واحد فقط، وليس 12 بحثًا!)
نصيحة: إلغاء الارتداد (Debounce) وتقييد المعدل (Throttle) من أهم أنماط التيارات في تطوير واجهات المستخدم. إلغاء الارتداد ينتظر توقفًا (مثالي لإدخال البحث). تقييد المعدل يحد من معدل الأحداث (مثالي لأحداث التمرير ومعالجات تغيير الحجم أو نقرات الأزرار). في Flutter، توفر حزم مثل rxdart هذه والعديد من العمليات الأخرى.

مثال عملي: مؤشر أسعار الأسهم في الوقت الحقيقي

دعنا نجمع كل شيء في تغذية بيانات واقعية في الوقت الحقيقي توضح StreamControllers والمحولات والمستمعين المتعددين وإدارة الاشتراكات.

تغذية أسعار الأسهم

import 'dart:async';
import 'dart:math';

class StockPrice {
  final String symbol;
  final double price;
  final double change;
  final DateTime timestamp;

  StockPrice(this.symbol, this.price, this.change)
      : timestamp = DateTime.now();

  String get direction => change >= 0 ? '+' : '';

  @override
  String toString() =>
      '$symbol: \$${price.toStringAsFixed(2)} ($direction${change.toStringAsFixed(2)}%)';
}

class StockFeed {
  final _controller = StreamController<StockPrice>.broadcast();
  Timer? _timer;
  final _random = Random();
  final Map<String, double> _prices = {};

  Stream<StockPrice> get prices => _controller.stream;

  void start(List<String> symbols, {Duration interval = const Duration(seconds: 1)}) {
    // تهيئة الأسعار
    for (final symbol in symbols) {
      _prices[symbol] = 100.0 + _random.nextDouble() * 200;
    }

    _timer = Timer.periodic(interval, (_) {
      // اختيار سهم عشوائي للتحديث
      final symbol = symbols[_random.nextInt(symbols.length)];
      final currentPrice = _prices[symbol]!;

      // تغيير عشوائي في السعر بين -3% و +3%
      final changePercent = (_random.nextDouble() - 0.5) * 6;
      final newPrice = currentPrice * (1 + changePercent / 100);
      _prices[symbol] = newPrice;

      _controller.add(StockPrice(symbol, newPrice, changePercent));
    });

    print('Stock feed started for: ${symbols.join(", ")}\n');
  }

  Future<void> stop() async {
    _timer?.cancel();
    await _controller.close();
    print('\nStock feed stopped.');
  }
}

void main() async {
  final feed = StockFeed();
  feed.start(['AAPL', 'GOOGL', 'MSFT', 'AMZN'], interval: Duration(milliseconds: 300));

  // المستمع 1: جميع تحديثات الأسعار
  final allUpdates = feed.prices.take(12).listen(
    (price) => print('[TICKER] $price'),
  );

  // المستمع 2: التغييرات الكبيرة فقط (أكثر من 1%)
  final alerts = feed.prices
      .where((price) => price.change.abs() > 1.0)
      .take(3)
      .listen(
        (price) => print('[ALERT!] Big move: $price'),
      );

  // الانتظار حتى ينتهي كلا المستمعين
  await allUpdates.asFuture();

  await feed.stop();
}
تحذير: أغلق دائمًا StreamController عند الانتهاء منه. عدم إغلاق المتحكمات مصدر شائع لتسريبات الذاكرة. في Flutter، أغلق المتحكمات في طريقة dispose() لفئة State. بالمثل، ألغِ دائمًا كائنات StreamSubscription عند التخلص من الودجت.
ملاحظة: الأنماط التي تعلمتها في هذا الدرس -- StreamControllers وتيارات البث والمحولات والاشتراكات -- هي نفس الأنماط المستخدمة في حلول إدارة حالة Flutter مثل BLoC (مكون منطق الأعمال). فئة BLoC هي في الأساس فئة بها StreamControllers تستقبل الأحداث على مصرف وتخرج الحالات على تيار.

الملخص

في هذا الدرس، أتقنت مفاهيم التيارات المتقدمة في Dart:

  • StreamController -- إنشاء تيارات مخصصة مع تحكم كامل في تدفق البيانات
  • StreamController.broadcast -- دعم عدة مستمعين في وقت واحد
  • add / addError / close -- دفع الأحداث والأخطاء وإنهاء التيارات
  • StreamSink -- جانب الكتابة فقط من المتحكم
  • StreamSubscription -- إيقاف مؤقت واستئناف وإلغاء المستمعين
  • StreamTransformer -- بناء منطق معالجة تيارات قابل لإعادة الاستخدام وذو حالة
  • إلغاء الارتداد وتقييد المعدل -- أنماط واجهة مستخدم أساسية لتحديد معدل الأحداث
  • استدعاءات دورة الحياة -- التفاعل مع تغييرات المستمعين في المتحكمات

الآن لديك أساس متين في نموذج البرمجة غير المتزامنة لـ Dart. هذه المهارات ضرورية لبناء تطبيقات Flutter تفاعلية، والتعامل مع البيانات في الوقت الحقيقي، وتنفيذ أنماط مثل BLoC لإدارة الحالة.