Dart Advanced Features

Stream Controllers & Transformations

50 min Lesson 4 of 16

What Is a StreamController?

In the previous lesson, you consumed streams created by factory constructors like Stream.fromIterable and Stream.periodic. But what about creating your own streams that you can push data into on demand? That is exactly what StreamController does.

A StreamController<T> gives you full control over a stream. It provides:

  • A sink side -- where you add data, errors, and close the stream
  • A stream side -- where listeners receive events

Think of it like a pipe: you push data in one end (the sink), and listeners receive it from the other end (the stream).

Basic StreamController

import 'dart:async';

void main() {
  // Create a controller for String events
  final controller = StreamController<String>();

  // Listen to the stream side
  controller.stream.listen(
    (message) => print('Received: $message'),
    onError: (error) => print('Error: $error'),
    onDone: () => print('Stream closed!'),
  );

  // Push data through the sink side
  controller.add('Hello');
  controller.add('World');
  controller.add('Dart Streams');

  // Close the stream (no more data can be added)
  controller.close();
}

// Output:
// Received: Hello
// Received: World
// Received: Dart Streams
// Stream closed!
Note: A default StreamController creates a single-subscription stream. Only one listener can subscribe to controller.stream. Attempting to add a second listener will throw a StateError.
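
The single-subscription rule can be checked with a short sketch that calls listen() twice on the same stream. The helper name secondListenThrows is made up for this illustration.

```dart
import 'dart:async';

// Hypothetical helper: returns true if a second listen() call on a
// default (single-subscription) controller's stream throws a StateError.
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    controller.close();
  }
}

void main() {
  print('Second listen throws: ${secondListenThrows()}');
}

// Output:
// Second listen throws: true
```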

StreamController.broadcast

StreamController.broadcast() creates a controller whose stream supports multiple simultaneous listeners. Each listener receives events independently from the point they subscribe.

Broadcast StreamController

import 'dart:async';

void main() {
  final controller = StreamController<String>.broadcast();

  // First listener
  controller.stream.listen(
    (msg) => print('Logger: $msg'),
  );

  // Second listener
  controller.stream.listen(
    (msg) => print('Analytics: $msg'),
  );

  controller.add('User signed in');
  controller.add('Page viewed: Home');
  controller.add('Button clicked: Buy Now');

  controller.close();
}

// Output:
// Logger: User signed in
// Analytics: User signed in
// Logger: Page viewed: Home
// Analytics: Page viewed: Home
// Logger: Button clicked: Buy Now
// Analytics: Button clicked: Buy Now
Warning: Broadcast controllers do not buffer events. If you add data before any listener subscribes, that data is lost. Also, if a listener subscribes after some events have been emitted, it will not receive those past events. For scenarios requiring event replay, you need to implement buffering yourself.
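
The difference in buffering can be seen by adding one event before anyone listens and one after. This is a minimal sketch; the helper name lateListenerSees is an assumption made for the example.

```dart
import 'dart:async';

// Hypothetical helper: adds 'early' before listen() and 'late' after it,
// then returns everything the listener actually received.
Future<List<String>> lateListenerSees(
    StreamController<String> controller) async {
  final received = <String>[];
  controller.add('early'); // No listener yet
  controller.stream.listen(received.add);
  controller.add('late');
  await controller.close(); // Completes after the listener has seen "done"
  return received;
}

void main() async {
  // Single-subscription: events are held until the listener arrives
  print(await lateListenerSees(StreamController<String>()));

  // Broadcast: events added with no listener are dropped
  print(await lateListenerSees(StreamController<String>.broadcast()));
}

// Output:
// [early, late]
// [late]
```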

Adding Data, Errors, and Closing

A StreamController provides three key methods for pushing events:

add(), addError(), and close()

import 'dart:async';

void main() {
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => print('Data: $data'),
    onError: (error) => print('Error: $error'),
    onDone: () => print('Done!'),
  );

  // Add data events
  controller.add(10);
  controller.add(20);

  // Add an error event (does not close the stream)
  controller.addError(Exception('Something broke'));

  // Continue adding data after an error
  controller.add(30);

  // Close the stream (triggers onDone)
  controller.close();

  // controller.add(40);  // Throws StateError: the controller is already closed
}

// Output:
// Data: 10
// Data: 20
// Error: Exception: Something broke
// Data: 30
// Done!

Using the Sink Property

import 'dart:async';

// A function that only needs to write to the stream
void populateData(StreamSink<String> sink) {
  sink.add('First item');
  sink.add('Second item');
  sink.add('Third item');
  sink.close();
}

void main() {
  final controller = StreamController<String>();

  controller.stream.listen(
    (item) => print('Got: $item'),
    onDone: () => print('All items received'),
  );

  // Pass the sink to a producer function
  populateData(controller.sink);
}
Tip: Using StreamSink as a parameter type instead of StreamController is a good practice. It exposes only the writing side (add, addError, and close), so the producer cannot listen to the stream or reach the controller's other members. This follows the principle of least privilege. Note that a StreamSink can still close the stream, as populateData does above.

StreamSubscription: Controlling the Listener

When you call listen() on a stream, it returns a StreamSubscription object. This gives you control over the listening lifecycle: you can pause, resume, and cancel the subscription.

Managing StreamSubscription

import 'dart:async';

void main() async {
  final controller = StreamController<int>();

  // listen() returns a StreamSubscription
  final subscription = controller.stream.listen(
    (data) => print('Received: $data'),
    onDone: () => print('Done'),
  );

  controller.add(1);
  controller.add(2);

  // Events are delivered asynchronously, so give the listener a turn
  await Future.delayed(Duration.zero);

  // Pause the subscription -- events are buffered
  subscription.pause();
  print('--- Paused ---');

  controller.add(3);  // Buffered, not delivered yet
  controller.add(4);  // Buffered

  await Future.delayed(Duration(seconds: 1));

  // Resume -- buffered events are delivered
  subscription.resume();
  print('--- Resumed ---');

  await Future.delayed(Duration(milliseconds: 100));

  controller.add(5);
  await Future.delayed(Duration.zero);  // Let 5 arrive before cancelling

  // Cancel -- no more events will be received
  await subscription.cancel();
  print('--- Cancelled ---');

  controller.add(6);  // This will not be received

  controller.close();
}

// Output:
// Received: 1
// Received: 2
// --- Paused ---
// --- Resumed ---
// Received: 3
// Received: 4
// Received: 5
// --- Cancelled ---
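Note: While a subscription is paused, incoming events are buffered and then delivered in order once it resumes. This also applies to broadcast streams: each paused broadcast listener buffers its own events, and the other listeners are unaffected. The buffer is unbounded, so a long pause on a busy stream can use a lot of memory. If you no longer need the events, cancel the subscription instead of pausing it.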

StreamController Lifecycle Callbacks

StreamController supports lifecycle callbacks that let you react when listeners subscribe, pause, resume, or cancel. This is essential for managing resources efficiently.

Lifecycle Callbacks

import 'dart:async';

void main() async {
  final controller = StreamController<int>(
    onListen: () {
      print('Someone started listening!');
    },
    onPause: () {
      print('Listener paused -- stopping data production');
    },
    onResume: () {
      print('Listener resumed -- restarting data production');
    },
    onCancel: () {
      print('Listener cancelled -- cleaning up resources');
    },
  );

  final subscription = controller.stream.listen(
    (data) => print('Data: $data'),
  );

  controller.add(1);
  controller.add(2);

  // Events are delivered asynchronously; let 1 and 2 arrive first
  await Future.delayed(Duration.zero);

  subscription.pause();
  subscription.resume();

  controller.add(3);
  await Future.delayed(Duration.zero);  // Let 3 arrive before cancelling

  await subscription.cancel();
}

// Output:
// Someone started listening!
// Data: 1
// Data: 2
// Listener paused -- stopping data production
// Listener resumed -- restarting data production
// Data: 3
// Listener cancelled -- cleaning up resources
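
In practice these callbacks usually start and stop the work that produces events. The following is a minimal sketch of that idea, assuming a timer-driven source. The function name ticker and its parameter are made up for this example.

```dart
import 'dart:async';

// Hypothetical helper: emits 0, 1, 2, ... every [interval], but only
// while a listener is subscribed and not paused.
Stream<int> ticker(Duration interval) {
  late final StreamController<int> controller;
  Timer? timer;
  var count = 0;

  void start() {
    timer = Timer.periodic(interval, (_) => controller.add(count++));
  }

  void stop() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: start, // Listener arrived: begin producing
    onPause: stop, // Listener paused: stop producing instead of buffering
    onResume: start, // Listener resumed: continue counting
    onCancel: stop, // Listener gone: release the timer
  );
  return controller.stream;
}

void main() async {
  // take(3) cancels its subscription after three events, which runs onCancel
  final values =
      await ticker(const Duration(milliseconds: 10)).take(3).toList();
  print(values);
}

// Output:
// [0, 1, 2]
```

Because the timer is created in onListen rather than when the controller is constructed, no work is done for a stream that nobody listens to.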

StreamTransformer

A StreamTransformer is a reusable object that takes a stream as input and produces a transformed stream as output. Unlike map() or where(), which handle each event in isolation, a StreamTransformer can maintain state across events, combine multiple events, or change the timing of events.

Creating a Custom StreamTransformer

import 'dart:async';

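// A transformer that batches events into groups.
// First attempt: keep the current batch in a closure variable.
// Caveat: every stream bound to this transformer instance shares that list.
StreamTransformer<T, List<T>> batchTransformer<T>(int batchSize) {
  final batch = <T>[];
  return StreamTransformer<T, List<T>>.fromHandlers(
    handleData: (data, sink) {
      batch.add(data);
      if (batch.length >= batchSize) {
        sink.add(List<T>.of(batch));
        batch.clear();
      }
    },
    handleDone: (sink) {
      // Flush a final, partially filled batch
      if (batch.isNotEmpty) sink.add(List<T>.of(batch));
      batch.clear();
      sink.close();
    },
  );
}

// Better approach: a StreamTransformerBase subclass that creates fresh state in each bind() call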
class BatchTransformer<T> extends StreamTransformerBase<T, List<T>> {
  final int batchSize;

  BatchTransformer(this.batchSize);

  @override
  Stream<List<T>> bind(Stream<T> stream) {
    final batch = <T>[];

    return stream.transform(
      StreamTransformer<T, List<T>>.fromHandlers(
        handleData: (data, sink) {
          batch.add(data);
          if (batch.length >= batchSize) {
            sink.add(List<T>.from(batch));
            batch.clear();
          }
        },
        handleDone: (sink) {
          if (batch.isNotEmpty) {
            sink.add(List<T>.from(batch));
            batch.clear();
          }
          sink.close();
        },
      ),
    );
  }
}

void main() {
  final numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

  numbers.transform(BatchTransformer(3)).listen(
    (batch) => print('Batch: $batch'),
    onDone: () => print('All batches processed'),
  );
}

// Output:
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7, 8, 9]
// Batch: [10]
// All batches processed

StreamTransformer.fromHandlers

import 'dart:async';

void main() {
  // A simple transformer that doubles even numbers and drops odd ones
  final doubleEvens = StreamTransformer<int, int>.fromHandlers(
    handleData: (data, sink) {
      if (data.isEven) {
        sink.add(data * 2);
      }
      // Odd numbers are silently dropped
    },
    handleError: (error, stackTrace, sink) {
      print('Transformer caught error: $error');
      // Optionally forward: sink.addError(error, stackTrace);
    },
    handleDone: (sink) {
      print('Transformer done');
      sink.close();
    },
  );

  Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8])
      .transform(doubleEvens)
      .listen(
        (data) => print('Result: $data'),
        onDone: () => print('Stream complete'),
      );
}

// Output:
// Result: 4
// Result: 8
// Result: 12
// Result: 16
// Transformer done
// Stream complete

Practical Example: Chat System

Let’s build a simple chat system using StreamControllers to demonstrate how multiple components can communicate through streams.

Chat Room with Streams

import 'dart:async';

class ChatMessage {
  final String sender;
  final String text;
  final DateTime timestamp;

  ChatMessage(this.sender, this.text) : timestamp = DateTime.now();

  @override
  String toString() =>
      '[${timestamp.hour}:${timestamp.minute.toString().padLeft(2, "0")}] $sender: $text';
}

class ChatRoom {
  final String name;
  final _controller = StreamController<ChatMessage>.broadcast();

  ChatRoom(this.name) {
    print('Chat room "$name" created.\n');
  }

  // Public stream for listeners
  Stream<ChatMessage> get messages => _controller.stream;

  // Send a message
  void send(String sender, String text) {
    final message = ChatMessage(sender, text);
    _controller.add(message);
  }

  // Close the chat room
  Future<void> close() async {
    await _controller.close();
    print('\nChat room "$name" closed.');
  }
}

class ChatUser {
  final String name;
  StreamSubscription<ChatMessage>? _subscription;

  ChatUser(this.name);

  void joinRoom(ChatRoom room) {
    _subscription = room.messages
        .where((msg) => msg.sender != name)  // Do not echo own messages
        .listen(
          (msg) => print('[$name sees] $msg'),
        );
    print('$name joined the chat room.');
  }

  void leaveRoom() {
    _subscription?.cancel();
    _subscription = null;
    print('$name left the chat room.');
  }
}

void main() async {
  final room = ChatRoom('Dart Developers');

  final ahmed = ChatUser('Ahmed');
  final sara = ChatUser('Sara');
  final omar = ChatUser('Omar');

  ahmed.joinRoom(room);
  sara.joinRoom(room);
  omar.joinRoom(room);

  print('');

  // Simulate chat messages
  room.send('Ahmed', 'Hello everyone!');
  await Future.delayed(Duration(milliseconds: 100));

  room.send('Sara', 'Hi Ahmed! How are you?');
  await Future.delayed(Duration(milliseconds: 100));

  room.send('Omar', 'Hey team! Working on streams today.');
  await Future.delayed(Duration(milliseconds: 100));

  // Omar leaves
  omar.leaveRoom();

  room.send('Ahmed', 'Omar left? See you later!');
  await Future.delayed(Duration(milliseconds: 100));

  await room.close();
}

Practical Example: Real-Time Data Feed with Debounce

Debouncing is a common pattern where you wait for a pause in events before processing. This is essential for search-as-you-type features to avoid making API calls on every keystroke.

Debounce Implementation

import 'dart:async';

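// A debounce transformer that waits for a pause in events
class DebounceTransformer<T> extends StreamTransformerBase<T, T> {
  final Duration duration;

  DebounceTransformer(this.duration);

  @override
  Stream<T> bind(Stream<T> stream) {
    Timer? timer;
    StreamSubscription<T>? subscription;
    late final StreamController<T> controller;

    controller = StreamController<T>(
      // Subscribe to the source only once the output has a listener
      onListen: () {
        subscription = stream.listen(
          (data) {
            // Restart the countdown on every new event
            timer?.cancel();
            timer = Timer(duration, () => controller.add(data));
          },
          // Forward errors together with their stack traces
          onError: controller.addError,
          onDone: () {
            // A value still waiting on the timer is dropped here
            timer?.cancel();
            controller.close();
          },
        );
      },
      // Release the timer and the source subscription on cancel
      onCancel: () {
        timer?.cancel();
        return subscription?.cancel();
      },
    );

    return controller.stream;
  }
}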

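// A throttle transformer that only allows one event per duration
class ThrottleTransformer<T> extends StreamTransformerBase<T, T> {
  final Duration duration;

  ThrottleTransformer(this.duration);

  @override
  Stream<T> bind(Stream<T> stream) {
    Timer? cooldown;
    StreamSubscription<T>? subscription;
    late final StreamController<T> controller;

    controller = StreamController<T>(
      // Subscribe to the source only once the output has a listener
      onListen: () {
        subscription = stream.listen(
          (data) {
            // Drop events that arrive while the cooldown timer is running
            if (cooldown?.isActive ?? false) return;
            controller.add(data);
            cooldown = Timer(duration, () {});
          },
          onError: controller.addError,
          onDone: controller.close,
        );
      },
      // Release the timer and the source subscription on cancel
      onCancel: () {
        cooldown?.cancel();
        return subscription?.cancel();
      },
    );

    return controller.stream;
  }
}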

// Simulate a search box
void main() async {
  final searchInput = StreamController<String>();

  // Apply debounce: wait 500ms after user stops typing
  searchInput.stream
      .transform(DebounceTransformer(Duration(milliseconds: 500)))
      .listen((query) {
        print('Searching for: "$query"');
      });

  // Simulate typing "dart streams" character by character
  final characters = 'dart streams'.split('');
  String typed = '';

  for (final char in characters) {
    typed += char;
    searchInput.add(typed);
    await Future.delayed(Duration(milliseconds: 100));  // Fast typing
  }

  // Wait for debounce to fire
  await Future.delayed(Duration(seconds: 1));
  searchInput.close();
}

// Output:
// Searching for: "dart streams"
// (Only one search, not 12 searches!)
Tip: Debounce and throttle are two of the most important stream patterns in UI development. Debounce waits for a pause (ideal for search input). Throttle limits the rate of events (ideal for scroll events, resize handlers, or button clicks). In Flutter, packages like rxdart provide these and many more operators.
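
Throttling can be illustrated with simulated button clicks. The sketch below is self-contained and uses a small function-based throttle rather than a class. The names throttle and simulateClicks are assumptions made for the example, and the 200 ms window is arbitrary.

```dart
import 'dart:async';

// Hypothetical helper: passes one event, then drops everything that
// arrives before [window] has elapsed.
StreamTransformer<T, T> throttle<T>(Duration window) {
  Timer? cooldown;
  return StreamTransformer<T, T>.fromHandlers(
    handleData: (data, sink) {
      if (cooldown?.isActive ?? false) return; // Still cooling down
      sink.add(data);
      cooldown = Timer(window, () {});
    },
  );
}

// Simulates three rapid clicks, a pause, and one more click.
Future<List<String>> simulateClicks() async {
  final clicks = StreamController<String>();
  final accepted = <String>[];
  final finished = clicks.stream
      .transform(throttle<String>(const Duration(milliseconds: 200)))
      .forEach(accepted.add);

  // Three clicks in quick succession: only the first one passes
  clicks
    ..add('click 1')
    ..add('click 2')
    ..add('click 3');
  await Future.delayed(const Duration(milliseconds: 300));

  // The cooldown has expired, so this click passes again
  clicks.add('click 4');
  await clicks.close();
  await finished;
  return accepted;
}

void main() async {
  print(await simulateClicks());
}

// Output:
// [click 1, click 4]
```

Compare this with debounce: throttle reacts to the first event immediately and ignores the burst that follows, whereas debounce waits for the burst to end and reacts to the last event.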

Practical Example: Real-Time Stock Ticker

Let’s combine everything into a realistic real-time data feed that demonstrates StreamControllers, transformers, multiple listeners, and subscription management.

Stock Price Feed

import 'dart:async';
import 'dart:math';

class StockPrice {
  final String symbol;
  final double price;
  final double change;
  final DateTime timestamp;

  StockPrice(this.symbol, this.price, this.change)
      : timestamp = DateTime.now();

  String get direction => change >= 0 ? '+' : '';

  @override
  String toString() =>
      '$symbol: \$${price.toStringAsFixed(2)} ($direction${change.toStringAsFixed(2)}%)';
}

class StockFeed {
  final _controller = StreamController<StockPrice>.broadcast();
  Timer? _timer;
  final _random = Random();
  final Map<String, double> _prices = {};

  Stream<StockPrice> get prices => _controller.stream;

  void start(List<String> symbols, {Duration interval = const Duration(seconds: 1)}) {
    // Initialize prices
    for (final symbol in symbols) {
      _prices[symbol] = 100.0 + _random.nextDouble() * 200;
    }

    _timer = Timer.periodic(interval, (_) {
      // Pick a random stock to update
      final symbol = symbols[_random.nextInt(symbols.length)];
      final currentPrice = _prices[symbol]!;

      // Random price change between -3% and +3%
      final changePercent = (_random.nextDouble() - 0.5) * 6;
      final newPrice = currentPrice * (1 + changePercent / 100);
      _prices[symbol] = newPrice;

      _controller.add(StockPrice(symbol, newPrice, changePercent));
    });

    print('Stock feed started for: ${symbols.join(", ")}\n');
  }

  Future<void> stop() async {
    _timer?.cancel();
    await _controller.close();
    print('\nStock feed stopped.');
  }
}

void main() async {
  final feed = StockFeed();
  feed.start(['AAPL', 'GOOGL', 'MSFT', 'AMZN'], interval: Duration(milliseconds: 300));

  // Listener 1: All price updates
  final allUpdates = feed.prices.take(12).listen(
    (price) => print('[TICKER] $price'),
  );

  // Listener 2: Only significant changes (more than 1%)
  final alerts = feed.prices
      .where((price) => price.change.abs() > 1.0)
      .take(3)
      .listen(
        (price) => print('[ALERT!] Big move: $price'),
      );

  // Wait for the ticker listener to receive its 12 updates;
  // stopping the feed afterwards also ends the alerts listener
  await allUpdates.asFuture();

  await feed.stop();
}
Warning: Always close your StreamController when you are done with it. Failing to close controllers is a common source of memory leaks. In Flutter, close controllers in the dispose() method of your State class. Similarly, always cancel StreamSubscription objects when the widget is disposed.
Note: The patterns you learned in this lesson -- StreamControllers, broadcast streams, transformers, subscriptions -- are the exact same patterns used in Flutter state management solutions like BLoC (Business Logic Component). A BLoC class is essentially a class with StreamControllers that receives events on a sink and outputs states on a stream.
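
To make the BLoC comparison concrete, here is a hand-rolled sketch of a counter in that style. It is an illustration only: CounterBloc, CounterEvent, and runCounterDemo are hypothetical names, and real BLoC packages offer a richer API than this.

```dart
import 'dart:async';

enum CounterEvent { increment, decrement }

// A hypothetical BLoC-style class: events go in through a sink,
// states come out through a stream.
class CounterBloc {
  final _events = StreamController<CounterEvent>();
  final _states = StreamController<int>.broadcast();
  int _count = 0;

  CounterBloc() {
    // Business logic: turn each incoming event into a new state
    _events.stream.listen((event) {
      _count += event == CounterEvent.increment ? 1 : -1;
      _states.add(_count);
    });
  }

  Sink<CounterEvent> get events => _events.sink; // Input side
  Stream<int> get states => _states.stream; // Output side

  // Close both controllers when the bloc is no longer needed
  Future<void> dispose() async {
    await _events.close();
    await _states.close();
  }
}

Future<List<int>> runCounterDemo() async {
  final bloc = CounterBloc();
  final seen = bloc.states.toList(); // Completes once the bloc is disposed

  bloc.events
    ..add(CounterEvent.increment)
    ..add(CounterEvent.increment)
    ..add(CounterEvent.decrement);

  await bloc.dispose();
  return seen;
}

void main() async {
  print(await runCounterDemo());
}

// Output:
// [1, 2, 1]
```

The dispose() method plays the same role as closing controllers in a Flutter State's dispose(): it closes the input first so that pending events are processed, then closes the output.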

Summary

In this lesson, you mastered advanced stream concepts in Dart:

  • StreamController -- create custom streams with full control over the data flow
  • StreamController.broadcast -- support multiple simultaneous listeners
  • add / addError / close -- push events, errors, and terminate streams
  • StreamSink -- the write-only side of a controller
  • StreamSubscription -- pause, resume, and cancel listeners
  • StreamTransformer -- build reusable, stateful stream processing logic
  • Debounce and Throttle -- essential UI patterns for rate-limiting events
  • Lifecycle callbacks -- react to listener changes in controllers

You now have a solid foundation in Dart’s asynchronous programming model. These skills are essential for building reactive Flutter applications, handling real-time data, and implementing patterns like BLoC for state management.