Dart Advanced Features

Streams Fundamentals

55 min Lesson 3 of 16

What Are Streams?

In previous lessons, you learned about Future -- a class that represents a single asynchronous value. But what about situations where you need to handle a sequence of asynchronous values over time? That is where Streams come in.

A Stream<T> delivers multiple values of type T asynchronously, one at a time. Think of it like a conveyor belt: items arrive one by one, and you process each item as it comes. Streams are fundamental to many real-world scenarios:

  • User input events (taps, keystrokes, gestures)
  • Data from WebSocket connections
  • Reading a file line by line
  • GPS location updates
  • Notifications from a database
  • Timer ticks

Note: Future is like ordering a package online -- you get one delivery. Stream is like a newsletter subscription -- you receive multiple deliveries over time until you unsubscribe.

Single-Subscription vs Broadcast Streams

Dart has two types of streams, and understanding the difference is critical:

Single-Subscription Streams

A single-subscription stream allows only one listener during its entire lifetime. If you call listen() a second time, even after the first subscription has been cancelled, Dart throws a StateError. Most streams in Dart are single-subscription by default. They are ideal for ordered data sequences like file I/O or HTTP responses.
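
As a rough illustration of that rule, the sketch below listens twice to the stream of a non-broadcast StreamController and catches the resulting error. The helper name secondListenThrows is made up for this example; a StreamController is used because its default stream is single-subscription.

```dart
import 'dart:async';

// Hypothetical helper: returns true if a second listen() throws StateError.
bool secondListenThrows() {
  final controller = StreamController<int>(); // single-subscription by default
  controller.stream.listen((_) {}); // first listener: allowed

  try {
    controller.stream.listen((_) {}); // second listener: not allowed
    return false;
  } on StateError catch (e) {
    print('Second listen failed: ${e.message}');
    return true;
  } finally {
    controller.close();
  }
}

void main() {
  print(secondListenThrows()); // true
}
```

Catching the error like this is only useful for demonstration; in real code the usual fix is to listen once or to use a broadcast stream.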

Broadcast Streams

A broadcast stream allows multiple listeners simultaneously. Each listener receives events independently. They are ideal for events that multiple parts of your app need to react to, like user input or state changes.

Single-Subscription vs Broadcast

import 'dart:async';

void main() {
  // Single-subscription stream
  final singleStream = Stream.fromIterable([1, 2, 3]);

  singleStream.listen((data) => print('Listener A: $data'));
  // singleStream.listen((data) => print('Listener B: $data'));
  // ^ ERROR! Bad state: Stream has already been listened to.

  // Broadcast stream
  final controller = StreamController<int>.broadcast();

  controller.stream.listen((data) => print('Listener 1: $data'));
  controller.stream.listen((data) => print('Listener 2: $data'));

  controller.add(10);
  controller.add(20);
  controller.close();
}

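// Output (Listener A and the broadcast listeners belong to independent
// streams, so the exact interleaving of their lines may differ):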
// Listener A: 1
// Listener A: 2
// Listener A: 3
// Listener 1: 10
// Listener 2: 10
// Listener 1: 20
// Listener 2: 20

Warning: You can convert a single-subscription stream to broadcast using .asBroadcastStream(), but be careful: broadcast streams do not buffer events. If no one is listening when an event is emitted, that event is lost. Single-subscription streams, on the other hand, either do not start producing events until a listener subscribes or, in the case of a StreamController, buffer them until then.
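
To make the buffering difference concrete, here is a minimal sketch that adds one event before anyone listens and one after, using both kinds of StreamController. The helper name eventsSeenByLateListener is an assumption made for this example, not part of the Dart API.

```dart
import 'dart:async';

// Hypothetical helper: adds one event before anyone listens and one after,
// then reports which events the listener actually received.
Future<List<int>> eventsSeenByLateListener(
    StreamController<int> controller) async {
  final received = <int>[];
  final done = Completer<void>();

  controller.add(1); // emitted before any listener exists
  controller.stream.listen(received.add, onDone: done.complete);
  controller.add(2); // emitted after the listener subscribed
  controller.close();

  await done.future; // wait until the done event has been delivered
  return received;
}

Future<void> main() async {
  final single = await eventsSeenByLateListener(StreamController<int>());
  final broadcast =
      await eventsSeenByLateListener(StreamController<int>.broadcast());

  print('Single-subscription: $single'); // Single-subscription: [1, 2]
  print('Broadcast: $broadcast'); // Broadcast: [2]
}
```

The single-subscription controller holds on to the early event, while the broadcast controller drops it because nobody was subscribed at the time.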

Creating Streams: Stream.fromIterable

Stream.fromIterable creates a stream from an existing list or iterable. Each element is delivered asynchronously as a separate event.

Stream.fromIterable

void main() {
  final cities = ['Dubai', 'Riyadh', 'Cairo', 'Istanbul', 'Kuala Lumpur'];
  final cityStream = Stream.fromIterable(cities);

  cityStream.listen(
    (city) => print('Visiting: $city'),
    onDone: () => print('Tour complete!'),
  );
}

// Output:
// Visiting: Dubai
// Visiting: Riyadh
// Visiting: Cairo
// Visiting: Istanbul
// Visiting: Kuala Lumpur
// Tour complete!

Creating Streams: Stream.periodic

Stream.periodic creates a stream that emits values at regular intervals, like a clock tick. This is extremely useful for polling, animations, and countdown timers.

Stream.periodic

void main() {
  // Emit values every second, transforming the count
  final timerStream = Stream.periodic(
    Duration(seconds: 1),
    (count) => count + 1,  // Transform: 0,1,2,3... becomes 1,2,3,4...
  ).take(5);  // Only take first 5 values

  timerStream.listen(
    (tick) => print('Tick $tick at ${DateTime.now().second}s'),
    onDone: () => print('Timer finished!'),
  );
}

// Output (one per second):
// Tick 1 at 10s
// Tick 2 at 11s
// Tick 3 at 12s
// Tick 4 at 13s
// Tick 5 at 14s
// Timer finished!

Countdown Timer with Stream.periodic

Stream<int> countdown(int seconds) {
  return Stream.periodic(
    Duration(seconds: 1),
    (tick) => seconds - tick - 1,
  ).take(seconds);
}

void main() {
  print('Starting countdown...');

  countdown(5).listen(
    (remaining) {
      if (remaining > 0) {
        print('$remaining...');
      } else {
        print('Go!');
      }
    },
    onDone: () => print('Countdown complete!'),
  );
}

Creating Streams: Stream.fromFuture and Stream.fromFutures

Stream.fromFuture wraps a single Future as a Stream that emits one value (or error) and then closes. Stream.fromFutures takes a list of Futures and emits each result as it completes.

Stream.fromFuture and Stream.fromFutures

Future<String> fetchUser() =>
    Future.delayed(Duration(seconds: 2), () => 'Ahmed');

Future<String> fetchCity() =>
    Future.delayed(Duration(seconds: 1), () => 'Dubai');

Future<String> fetchRole() =>
    Future.delayed(Duration(seconds: 3), () => 'Developer');

void main() {
  // Single Future as Stream
  Stream.fromFuture(fetchUser()).listen(
    (name) => print('User: $name'),
    onDone: () => print('Stream from single future done\n'),
  );

  // Multiple Futures as Stream (emits in completion order)
  Stream.fromFutures([
    fetchUser(),   // completes at 2s
    fetchCity(),   // completes at 1s
    fetchRole(),   // completes at 3s
  ]).listen(
    (data) => print('Got: $data'),
    onDone: () => print('All futures completed!'),
  );
}

// Output (times in parentheses are annotations, not printed text; the two
// streams are independent, so do not rely on how their 2s lines interleave):
// Got: Dubai      (1s -- fastest first)
// User: Ahmed     (2s)
// Stream from single future done
// Got: Ahmed      (2s)
// Got: Developer  (3s)
// All futures completed!

Tip: Stream.fromFutures emits results as each Future completes, not in the order they were provided. This is useful when you want to display data progressively as it becomes available, rather than waiting for all requests to finish.
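
The "(or error)" case of Stream.fromFuture is easy to overlook, so here is a small sketch of it. The describeEvents helper and the 'network down' error value are made up for illustration; the point is only that a failed Future arrives through onError and the stream still closes afterwards.

```dart
import 'dart:async';

// Hypothetical helper: records everything a stream delivers until it closes.
Future<List<String>> describeEvents(Stream<String> stream) async {
  final log = <String>[];
  final done = Completer<void>();
  stream.listen(
    (value) => log.add('value: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  // A Future that succeeds becomes a single data event.
  print(await describeEvents(Stream.fromFuture(Future.value('Ahmed'))));
  // [value: Ahmed]

  // A Future that fails becomes a single error event.
  print(await describeEvents(
      Stream.fromFuture(Future<String>.error('network down'))));
  // [error: network down]
}
```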

Listening to Streams

The primary way to consume a stream is with the listen() method. It accepts callbacks for data events, errors, and stream completion.

Full listen() API

import 'dart:async';

void main() {
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) {
      print('Data: $data');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Stream closed');
    },
    cancelOnError: false,  // Continue listening after errors
  );

  controller.add(1);
  controller.add(2);
  controller.addError(Exception('Something went wrong'));
  controller.add(3);
  controller.close();
}

// Output:
// Data: 1
// Data: 2
// Error: Exception: Something went wrong
// Data: 3
// Stream closed
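
Note: For listen(), cancelOnError defaults to false, so the subscription keeps receiving events after an error; the example sets it explicitly only to make that behavior visible. Passing cancelOnError: true cancels the subscription as soon as the first error is delivered, so no later data or done events reach the listener.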
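
A side-by-side run may make the effect of cancelOnError easier to see. The sketch below sends the same events through two subscriptions that differ only in that flag; the eventsSeen helper is hypothetical and exists only for this comparison.

```dart
import 'dart:async';

// Hypothetical helper: sends data, an error, more data, and done through a
// controller, and records what a listener with the given flag receives.
Future<List<String>> eventsSeen({required bool cancelOnError}) async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => log.add('data $data'),
    onError: (Object error) => log.add('error'),
    onDone: () => log.add('done'),
    cancelOnError: cancelOnError,
  );

  controller.add(1);
  controller.addError(Exception('Something went wrong'));
  controller.add(2);
  controller.close();

  // Let every pending event be delivered before reading the log.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await eventsSeen(cancelOnError: false));
  // [data 1, error, data 2, done]
  print(await eventsSeen(cancelOnError: true));
  // [data 1, error]
}
```

With cancelOnError: true the listener never sees the second data event or the done callback, so any cleanup that relies on onDone has to happen in onError instead.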

Stream Transformation Methods

Streams provide a rich set of transformation methods that let you build powerful data processing pipelines. These methods return new streams, so they can be chained together.

map() -- Transform Each Element

Stream.map()

void main() {
  Stream.fromIterable([1, 2, 3, 4, 5])
      .map((number) => number * number)
      .listen((squared) => print(squared));
}

// Output: 1, 4, 9, 16, 25

where() -- Filter Elements

Stream.where()

void main() {
  Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
      .where((number) => number.isEven)
      .listen((even) => print(even));
}

// Output: 2, 4, 6, 8, 10

expand() -- One-to-Many Transformation

Stream.expand()

void main() {
  Stream.fromIterable(['hello world', 'dart streams'])
      .expand((sentence) => sentence.split(' '))
      .listen((word) => print(word));
}

// Output: hello, world, dart, streams

take() and skip() -- Limit and Offset

Stream.take() and Stream.skip()

void main() {
  // Take first 3 elements
  Stream.fromIterable([1, 2, 3, 4, 5])
      .take(3)
      .listen((n) => print('Take: $n'));
  // Take: 1, Take: 2, Take: 3

  // Skip first 3 elements
  Stream.fromIterable([1, 2, 3, 4, 5])
      .skip(3)
      .listen((n) => print('Skip: $n'));
  // Skip: 4, Skip: 5
}

Chaining Transformations

Building a Data Pipeline

void main() {
  final scores = Stream.fromIterable([85, 92, 45, 78, 95, 60, 88, 30, 72, 98]);

  scores
      .where((score) => score >= 70)  // Filter: passing scores only
      .map((score) => score / 100)    // Transform: to a fraction (0.85, 0.92, ...)
      .map((pct) => '${(pct * 100).toStringAsFixed(0)}%')  // Format as a percentage string
      .listen(
        (formatted) => print('Passing: $formatted'),
        onDone: () => print('Processing complete'),
      );
}

// Output:
// Passing: 85%
// Passing: 92%
// Passing: 78%
// Passing: 95%
// Passing: 88%
// Passing: 72%
// Passing: 98%
// Processing complete

Consuming Streams with await for

Just as await works with Futures, await for works with Streams inside an async function. It processes each element as it arrives and automatically completes when the stream closes.

Using await for

Stream<int> generateNumbers() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(milliseconds: 500));
    yield i;
  }
}

Future<void> main() async {
  int sum = 0;

  await for (final number in generateNumbers()) {
    sum += number;
    print('Received $number, running sum: $sum');
  }

  print('Final sum: $sum');
}

// Output (every 500ms):
// Received 1, running sum: 1
// Received 2, running sum: 3
// Received 3, running sum: 6
// Received 4, running sum: 10
// Received 5, running sum: 15
// Final sum: 15

Warning: await for suspends the enclosing async function until the stream closes (other code can still run in the meantime). If the stream never closes (like a broadcast stream of user input), the code after await for never executes unless you leave the loop with break or return, which also cancels the subscription. Use listen() with a subscription for infinite streams, and use await for for finite streams that you know will close.
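
One way to use await for with a stream that never closes on its own is to leave the loop early. The sketch below is a minimal example under that assumption: the helper name firstTickWithSquareAbove and the 10 ms interval are arbitrary choices for illustration.

```dart
import 'dart:async';

// Hypothetical helper: reads an endless ticker and stops at the first tick
// whose square exceeds [limit].
Future<int> firstTickWithSquareAbove(int limit) async {
  // Stream.periodic without take() never closes on its own.
  final ticks = Stream.periodic(const Duration(milliseconds: 10), (i) => i);

  await for (final tick in ticks) {
    if (tick * tick > limit) {
      return tick; // leaving the loop cancels the subscription
    }
  }
  // Only reached if the stream closes, which this ticker never does.
  throw StateError('The ticker closed unexpectedly');
}

Future<void> main() async {
  final tick = await firstTickWithSquareAbove(20);
  print('Stopped at tick $tick'); // Stopped at tick 5
}
```

Because returning from the loop cancels the subscription, the periodic timer stops and the program can exit normally.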

Useful Stream Properties and Methods

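Streams provide several convenience getters and methods that each return a Future with an aggregated result. Each one listens to the stream, so on a single-subscription stream you can use only one of them; that is why the example below creates a fresh stream for every call: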

Stream Aggregate Methods

Future<void> main() async {
  // Get first element
  final first = await Stream.fromIterable([10, 20, 30]).first;
  print('First: $first');  // 10

  // Get last element
  final last = await Stream.fromIterable([10, 20, 30]).last;
  print('Last: $last');  // 30

  // Get length (number of elements)
  final length = await Stream.fromIterable([10, 20, 30]).length;
  print('Length: $length');  // 3

  // Check if empty
  final isEmpty = await Stream.fromIterable([]).isEmpty;
  print('Empty: $isEmpty');  // true

  // Check if any element matches
  final hasLarge = await Stream.fromIterable([3, 1, 4, 15, 9])
      .any((n) => n > 10);
  print('Has large: $hasLarge');  // true

  // Check if all elements match
  final allPositive = await Stream.fromIterable([3, 1, 4, 1, 5])
      .every((n) => n > 0);
  print('All positive: $allPositive');  // true

  // Collect all elements into a list
  final list = await Stream.fromIterable([3, 1, 4]).toList();
  print('List: $list');  // [3, 1, 4]

  // Reduce to single value
  final sum = await Stream.fromIterable([1, 2, 3, 4, 5])
      .reduce((previous, element) => previous + element);
  print('Sum: $sum');  // 15

  // Fold with initial value
  final product = await Stream.fromIterable([1, 2, 3, 4])
      .fold<int>(1, (prev, element) => prev * element);
  print('Product: $product');  // 24
}

Practical Example: Event System

Let’s build an event processing system that simulates a real-time application receiving and processing events through streams.

Event Processing Pipeline

import 'dart:async';

// Event types
class AppEvent {
  final String type;
  final String data;
  final DateTime timestamp;

  AppEvent(this.type, this.data) : timestamp = DateTime.now();

  @override
  String toString() => '[$type] $data';
}

// Simulate events arriving over time
Stream<AppEvent> generateEvents() async* {
  final events = [
    AppEvent('user', 'User logged in'),
    AppEvent('navigation', 'Opened dashboard'),
    AppEvent('api', 'Fetched user profile'),
    AppEvent('error', 'Failed to load notifications'),
    AppEvent('user', 'Changed settings'),
    AppEvent('navigation', 'Opened profile page'),
    AppEvent('api', 'Updated profile photo'),
    AppEvent('user', 'User logged out'),
  ];

  for (final event in events) {
    await Future.delayed(Duration(milliseconds: 300));
    yield event;
  }
}

Future<void> main() async {
  print('=== Event Processing System ===\n');

  // Process all events
  final allEvents = <AppEvent>[];

  await for (final event in generateEvents()) {
    allEvents.add(event);
    print('Received: $event');
  }

  print('\n=== Event Summary ===');
  print('Total events: ${allEvents.length}');

  // Count by type (allEvents is a List, so these are Iterable operations)
  final userEvents = allEvents.where((e) => e.type == 'user').length;
  final apiEvents = allEvents.where((e) => e.type == 'api').length;
  final errorEvents = allEvents.where((e) => e.type == 'error').length;
  final navEvents = allEvents.where((e) => e.type == 'navigation').length;

  print('User events: $userEvents');
  print('API events: $apiEvents');
  print('Navigation events: $navEvents');
  print('Error events: $errorEvents');
}
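
The summary above counts event types from the collected list. If you would rather tally directly on the stream, fold is one option. The following is a sketch only: countByType is a hypothetical helper, and it takes plain type strings so that it stays self-contained (with the AppEvent class you could pass generateEvents().map((e) => e.type)).

```dart
// Hypothetical helper: tallies event types directly from a stream.
Future<Map<String, int>> countByType(Stream<String> types) {
  return types.fold<Map<String, int>>(<String, int>{}, (counts, type) {
    // Increment the existing count, or start at 1 for a new type.
    counts.update(type, (n) => n + 1, ifAbsent: () => 1);
    return counts;
  });
}

Future<void> main() async {
  final types = Stream.fromIterable([
    'user', 'navigation', 'api', 'error',
    'user', 'navigation', 'api', 'user',
  ]);

  print(await countByType(types));
  // {user: 3, navigation: 2, api: 2, error: 1}
}
```

This version needs a single pass and does not keep every event in memory, which matters more as the number of events grows.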

Real-Time Data Pipeline

import 'dart:async';
import 'dart:math';

// Simulate sensor readings
Stream<double> temperatureSensor() async* {
  final random = Random();
  double temp = 22.0;

  while (true) {
    await Future.delayed(Duration(seconds: 1));
    // Random fluctuation between -0.5 and +0.5
    temp += (random.nextDouble() - 0.5);
    yield double.parse(temp.toStringAsFixed(1));
  }
}

Future<void> main() async {
  print('=== Temperature Monitor ===\n');

  // Build a processing pipeline
  final subscription = temperatureSensor()
      .take(10)  // Only read 10 values
      .where((temp) => temp > 22.0)  // Alert when above 22
      .map((temp) => 'ALERT: Temperature is ${temp}C (above threshold)')
      .listen(
        (alert) => print(alert),
        onDone: () => print('\nMonitoring session ended.'),
      );

  // The listen returns a StreamSubscription -- we will explore this more
  // in the next lesson on StreamControllers
}

Tip: Stream transformation methods like map(), where(), and take() are lazy -- they do not process data until someone listens to the resulting stream. This makes building complex pipelines efficient because no work is done until it is needed.

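
The laziness of transformation methods can be observed with a small log. In this sketch (the observeLaziness helper is made up for the example), the map() callback records each call, and nothing is recorded until toList() starts listening.

```dart
// Hypothetical helper: logs when the map() callback actually runs.
Future<List<String>> observeLaziness() async {
  final log = <String>[];

  // Building the pipeline does not run the callback yet.
  final doubled = Stream.fromIterable([1, 2, 3]).map((n) {
    log.add('map($n)');
    return n * 2;
  });
  log.add('pipeline built');

  // toList() listens to the stream, which starts the processing.
  final values = await doubled.toList();
  log.add('values: $values');
  return log;
}

Future<void> main() async {
  print(await observeLaziness());
  // [pipeline built, map(1), map(2), map(3), values: [2, 4, 6]]
}
```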
Note: Streams are the foundation of reactive programming in Flutter. The StreamBuilder widget, BLoC pattern, and many state management solutions are built on top of Dart streams. Mastering streams here will pay dividends when you build Flutter apps.

Summary

In this lesson, you learned the fundamentals of Streams in Dart:

  • Streams -- deliver multiple asynchronous values over time
  • Single-subscription vs Broadcast -- one listener vs many listeners
  • Stream.fromIterable -- create a stream from a collection
  • Stream.periodic -- emit values at regular intervals
  • Stream.fromFuture / fromFutures -- convert Futures to Streams
  • listen() -- subscribe to stream events with data, error, and done callbacks
  • Transformation methods -- map, where, expand, take, skip for building pipelines
  • await for -- consume streams in async functions
  • Aggregate methods -- first, last, length, reduce, fold, toList

In the next lesson, you’ll learn about StreamControllers -- how to create your own custom streams, manage subscriptions, and build advanced stream transformations.