Stream Controllers & Transformations
What Is a StreamController?
In the previous lesson, you consumed streams created by factory constructors like Stream.fromIterable and Stream.periodic. But what about creating your own streams that you can push data into on demand? That is exactly what StreamController does.
A StreamController<T> gives you full control over a stream. It provides:
- A sink side -- where you add data, errors, and close the stream
- A stream side -- where listeners receive events
Think of it like a pipe: you push data in one end (the sink), and listeners receive it from the other end (the stream).
Basic StreamController
import 'dart:async';
void main() {
// Create a controller for String events
final controller = StreamController<String>();
// Listen to the stream side
controller.stream.listen(
(message) => print('Received: $message'),
onError: (error) => print('Error: $error'),
onDone: () => print('Stream closed!'),
);
// Push data through the sink side
controller.add('Hello');
controller.add('World');
controller.add('Dart Streams');
// Close the stream (no more data can be added)
controller.close();
}
// Output:
// Received: Hello
// Received: World
// Received: Dart Streams
// Stream closed!
StreamController creates a single-subscription stream. Only one listener can subscribe to controller.stream. Attempting to add a second listener will throw a StateError.StreamController.broadcast
StreamController.broadcast() creates a controller whose stream supports multiple simultaneous listeners. Each listener receives events independently from the point they subscribe.
Broadcast StreamController
import 'dart:async';
void main() {
final controller = StreamController<String>.broadcast();
// First listener
controller.stream.listen(
(msg) => print('Logger: $msg'),
);
// Second listener
controller.stream.listen(
(msg) => print('Analytics: $msg'),
);
controller.add('User signed in');
controller.add('Page viewed: Home');
controller.add('Button clicked: Buy Now');
controller.close();
}
// Output:
// Logger: User signed in
// Analytics: User signed in
// Logger: Page viewed: Home
// Analytics: Page viewed: Home
// Logger: Button clicked: Buy Now
// Analytics: Button clicked: Buy Now
Adding Data, Errors, and Closing
A StreamController provides three key methods for pushing events:
add(), addError(), and close()
import 'dart:async';
void main() {
final controller = StreamController<int>();
controller.stream.listen(
(data) => print('Data: $data'),
onError: (error) => print('Error: $error'),
onDone: () => print('Done!'),
);
// Add data events
controller.add(10);
controller.add(20);
// Add an error event (does not close the stream)
controller.addError(Exception('Something broke'));
// Continue adding data after an error
controller.add(30);
// Close the stream (triggers onDone)
controller.close();
// controller.add(40); // ERROR! Cannot add to closed controller
}
// Output:
// Data: 10
// Data: 20
// Error: Exception: Something broke
// Data: 30
// Done!
Using the Sink Property
import 'dart:async';
// A function that only needs to write to the stream
void populateData(StreamSink<String> sink) {
sink.add('First item');
sink.add('Second item');
sink.add('Third item');
sink.close();
}
void main() {
final controller = StreamController<String>();
controller.stream.listen(
(item) => print('Got: $item'),
onDone: () => print('All items received'),
);
// Pass the sink to a producer function
populateData(controller.sink);
}
StreamSink as a parameter type instead of StreamController is a good practice. It limits access to only the writing side, preventing consumers from accidentally listening or closing the controller. This follows the principle of least privilege.StreamSubscription: Controlling the Listener
When you call listen() on a stream, it returns a StreamSubscription object. This gives you control over the listening lifecycle: you can pause, resume, and cancel the subscription.
Managing StreamSubscription
import 'dart:async';
void main() async {
final controller = StreamController<int>();
// listen() returns a StreamSubscription
final subscription = controller.stream.listen(
(data) => print('Received: $data'),
onDone: () => print('Done'),
);
controller.add(1);
controller.add(2);
// Pause the subscription -- events are buffered
subscription.pause();
print('--- Paused ---');
controller.add(3); // Buffered, not delivered yet
controller.add(4); // Buffered
await Future.delayed(Duration(seconds: 1));
// Resume -- buffered events are delivered
subscription.resume();
print('--- Resumed ---');
await Future.delayed(Duration(milliseconds: 100));
controller.add(5);
// Cancel -- no more events will be received
await subscription.cancel();
print('--- Cancelled ---');
controller.add(6); // This will not be received
controller.close();
}
// Output:
// Received: 1
// Received: 2
// --- Paused ---
// --- Resumed ---
// Received: 3
// Received: 4
// Received: 5
// --- Cancelled ---
StreamController Lifecycle Callbacks
StreamController supports lifecycle callbacks that let you react when listeners subscribe, pause, resume, or cancel. This is essential for managing resources efficiently.
Lifecycle Callbacks
import 'dart:async';
void main() {
late StreamSubscription<int> subscription;
final controller = StreamController<int>(
onListen: () {
print('Someone started listening!');
},
onPause: () {
print('Listener paused -- stopping data production');
},
onResume: () {
print('Listener resumed -- restarting data production');
},
onCancel: () {
print('Listener cancelled -- cleaning up resources');
},
);
subscription = controller.stream.listen(
(data) => print('Data: $data'),
);
controller.add(1);
controller.add(2);
subscription.pause();
subscription.resume();
controller.add(3);
subscription.cancel();
}
// Output:
// Someone started listening!
// Data: 1
// Data: 2
// Listener paused -- stopping data production
// Listener resumed -- restarting data production
// Data: 3
// Listener cancelled -- cleaning up resources
StreamTransformer
A StreamTransformer is a reusable object that takes a stream as input and produces a transformed stream as output. Unlike map() or where() which are simple operations, a StreamTransformer can maintain state across events, combine multiple events, or change the timing of events.
Creating a Custom StreamTransformer
import 'dart:async';
// A transformer that batches events into groups
StreamTransformer<T, List<T>> batchTransformer<T>(int batchSize) {
return StreamTransformer<T, List<T>>.fromHandlers(
handleData: (data, sink) {
// We need static storage -- use a closure variable
},
);
}
// Better approach: full StreamTransformer with state
class BatchTransformer<T> extends StreamTransformerBase<T, List<T>> {
final int batchSize;
BatchTransformer(this.batchSize);
@override
Stream<List<T>> bind(Stream<T> stream) {
final batch = <T>[];
return stream.transform(
StreamTransformer<T, List<T>>.fromHandlers(
handleData: (data, sink) {
batch.add(data);
if (batch.length >= batchSize) {
sink.add(List<T>.from(batch));
batch.clear();
}
},
handleDone: (sink) {
if (batch.isNotEmpty) {
sink.add(List<T>.from(batch));
batch.clear();
}
sink.close();
},
),
);
}
}
void main() {
final numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
numbers.transform(BatchTransformer(3)).listen(
(batch) => print('Batch: $batch'),
onDone: () => print('All batches processed'),
);
}
// Output:
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7, 8, 9]
// Batch: [10]
// All batches processed
StreamTransformer.fromHandlers
import 'dart:async';
void main() {
// A simple transformer that doubles numbers and filters out odds
final doubleEvens = StreamTransformer<int, int>.fromHandlers(
handleData: (data, sink) {
if (data.isEven) {
sink.add(data * 2);
}
// Odd numbers are silently dropped
},
handleError: (error, stackTrace, sink) {
print('Transformer caught error: $error');
// Optionally forward: sink.addError(error, stackTrace);
},
handleDone: (sink) {
print('Transformer done');
sink.close();
},
);
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8])
.transform(doubleEvens)
.listen(
(data) => print('Result: $data'),
onDone: () => print('Stream complete'),
);
}
// Output:
// Result: 4
// Result: 8
// Result: 12
// Result: 16
// Transformer done
// Stream complete
Practical Example: Chat System
Let’s build a simple chat system using StreamControllers to demonstrate how multiple components can communicate through streams.
Chat Room with Streams
import 'dart:async';
class ChatMessage {
final String sender;
final String text;
final DateTime timestamp;
ChatMessage(this.sender, this.text) : timestamp = DateTime.now();
@override
String toString() =>
'[${timestamp.hour}:${timestamp.minute.toString().padLeft(2, "0")}] $sender: $text';
}
class ChatRoom {
final String name;
final _controller = StreamController<ChatMessage>.broadcast();
ChatRoom(this.name) {
print('Chat room "$name" created.\n');
}
// Public stream for listeners
Stream<ChatMessage> get messages => _controller.stream;
// Send a message
void send(String sender, String text) {
final message = ChatMessage(sender, text);
_controller.add(message);
}
// Close the chat room
Future<void> close() async {
await _controller.close();
print('\nChat room "$name" closed.');
}
}
class ChatUser {
final String name;
StreamSubscription<ChatMessage>? _subscription;
ChatUser(this.name);
void joinRoom(ChatRoom room) {
_subscription = room.messages
.where((msg) => msg.sender != name) // Do not echo own messages
.listen(
(msg) => print('[$name sees] $msg'),
);
print('$name joined the chat room.');
}
void leaveRoom() {
_subscription?.cancel();
_subscription = null;
print('$name left the chat room.');
}
}
void main() async {
final room = ChatRoom('Dart Developers');
final ahmed = ChatUser('Ahmed');
final sara = ChatUser('Sara');
final omar = ChatUser('Omar');
ahmed.joinRoom(room);
sara.joinRoom(room);
omar.joinRoom(room);
print('');
// Simulate chat messages
room.send('Ahmed', 'Hello everyone!');
await Future.delayed(Duration(milliseconds: 100));
room.send('Sara', 'Hi Ahmed! How are you?');
await Future.delayed(Duration(milliseconds: 100));
room.send('Omar', 'Hey team! Working on streams today.');
await Future.delayed(Duration(milliseconds: 100));
// Omar leaves
omar.leaveRoom();
room.send('Ahmed', 'Omar left? See you later!');
await Future.delayed(Duration(milliseconds: 100));
await room.close();
}
Practical Example: Real-Time Data Feed with Debounce
Debouncing is a common pattern where you wait for a pause in events before processing. This is essential for search-as-you-type features to avoid making API calls on every keystroke.
Debounce Implementation
import 'dart:async';
// A debounce transformer that waits for a pause in events
class DebounceTransformer<T> extends StreamTransformerBase<T, T> {
final Duration duration;
DebounceTransformer(this.duration);
@override
Stream<T> bind(Stream<T> stream) {
Timer? timer;
final controller = StreamController<T>();
stream.listen(
(data) {
timer?.cancel();
timer = Timer(duration, () {
controller.add(data);
});
},
onError: (error) => controller.addError(error),
onDone: () {
timer?.cancel();
controller.close();
},
);
return controller.stream;
}
}
// A throttle transformer that only allows one event per duration
class ThrottleTransformer<T> extends StreamTransformerBase<T, T> {
final Duration duration;
ThrottleTransformer(this.duration);
@override
Stream<T> bind(Stream<T> stream) {
Timer? cooldown;
final controller = StreamController<T>();
stream.listen(
(data) {
if (cooldown == null || !cooldown!.isActive) {
controller.add(data);
cooldown = Timer(duration, () {});
}
},
onError: (error) => controller.addError(error),
onDone: () => controller.close(),
);
return controller.stream;
}
}
// Simulate a search box
void main() async {
final searchInput = StreamController<String>();
// Apply debounce: wait 500ms after user stops typing
searchInput.stream
.transform(DebounceTransformer(Duration(milliseconds: 500)))
.listen((query) {
print('Searching for: "$query"');
});
// Simulate typing "dart streams" character by character
final characters = 'dart streams'.split('');
String typed = '';
for (final char in characters) {
typed += char;
searchInput.add(typed);
await Future.delayed(Duration(milliseconds: 100)); // Fast typing
}
// Wait for debounce to fire
await Future.delayed(Duration(seconds: 1));
searchInput.close();
}
// Output:
// Searching for: "dart streams"
// (Only one search, not 12 searches!)
rxdart provide these and many more operators.Practical Example: Real-Time Stock Ticker
Let’s combine everything into a realistic real-time data feed that demonstrates StreamControllers, transformers, multiple listeners, and subscription management.
Stock Price Feed
import 'dart:async';
import 'dart:math';
class StockPrice {
final String symbol;
final double price;
final double change;
final DateTime timestamp;
StockPrice(this.symbol, this.price, this.change)
: timestamp = DateTime.now();
String get direction => change >= 0 ? '+' : '';
@override
String toString() =>
'$symbol: \$${price.toStringAsFixed(2)} ($direction${change.toStringAsFixed(2)}%)';
}
class StockFeed {
final _controller = StreamController<StockPrice>.broadcast();
Timer? _timer;
final _random = Random();
final Map<String, double> _prices = {};
Stream<StockPrice> get prices => _controller.stream;
void start(List<String> symbols, {Duration interval = const Duration(seconds: 1)}) {
// Initialize prices
for (final symbol in symbols) {
_prices[symbol] = 100.0 + _random.nextDouble() * 200;
}
_timer = Timer.periodic(interval, (_) {
// Pick a random stock to update
final symbol = symbols[_random.nextInt(symbols.length)];
final currentPrice = _prices[symbol]!;
// Random price change between -3% and +3%
final changePercent = (_random.nextDouble() - 0.5) * 6;
final newPrice = currentPrice * (1 + changePercent / 100);
_prices[symbol] = newPrice;
_controller.add(StockPrice(symbol, newPrice, changePercent));
});
print('Stock feed started for: ${symbols.join(", ")}\n');
}
Future<void> stop() async {
_timer?.cancel();
await _controller.close();
print('\nStock feed stopped.');
}
}
void main() async {
final feed = StockFeed();
feed.start(['AAPL', 'GOOGL', 'MSFT', 'AMZN'], interval: Duration(milliseconds: 300));
// Listener 1: All price updates
final allUpdates = feed.prices.take(12).listen(
(price) => print('[TICKER] $price'),
);
// Listener 2: Only significant changes (more than 1%)
final alerts = feed.prices
.where((price) => price.change.abs() > 1.0)
.take(3)
.listen(
(price) => print('[ALERT!] Big move: $price'),
);
// Wait for both listeners to finish
await allUpdates.asFuture();
await feed.stop();
}
StreamController when you are done with it. Failing to close controllers is a common source of memory leaks. In Flutter, close controllers in the dispose() method of your State class. Similarly, always cancel StreamSubscription objects when the widget is disposed.Summary
In this lesson, you mastered advanced stream concepts in Dart:
- StreamController -- create custom streams with full control over the data flow
- StreamController.broadcast -- support multiple simultaneous listeners
- add / addError / close -- push events, errors, and terminate streams
- StreamSink -- the write-only side of a controller
- StreamSubscription -- pause, resume, and cancel listeners
- StreamTransformer -- build reusable, stateful stream processing logic
- Debounce and Throttle -- essential UI patterns for rate-limiting events
- Lifecycle callbacks -- react to listener changes in controllers
You now have a solid foundation in Dart’s asynchronous programming model. These skills are essential for building reactive Flutter applications, handling real-time data, and implementing patterns like BLoC for state management.