Redis Streams
Redis Streams is a powerful data structure designed for high-performance message streaming and event sourcing. It combines features of append-only logs, message queues, and consumer groups to create a robust distributed messaging system.
What are Redis Streams?
Streams are append-only log data structures where each entry has a unique ID and can contain multiple field-value pairs:
✓ Append-only: new entries are always added at the end
✓ Persistent: messages are stored on disk (unlike Pub/Sub)
✓ Ordered: entry IDs are monotonically increasing, based on timestamps
✓ Queryable: read by ID range or time range
✓ Consumer Groups: multiple consumers process a stream in parallel
✓ Acknowledgement: track which messages were processed
XADD - Adding Entries to Stream
Add new entries to a stream with automatic ID generation:
# Add entry with auto-generated ID (timestamp-sequence)
XADD orders * user_id 123 product laptop price 1299.99
# Returns: "1705856400000-0" (millisecond timestamp, then sequence number)

# Add entry with an explicit ID
XADD orders 1705856400000-1 user_id 124 product mouse price 25.99

# Add with maximum length (keep only the last 1000 entries)
XADD orders MAXLEN 1000 * user_id 125 product keyboard price 79.99

# Approximate trimming (faster, keeps ~1000 entries)
XADD orders MAXLEN ~ 1000 * user_id 126 product monitor price 399.99
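To build intuition for how `*` auto-ID generation behaves, here is a toy model of the rule Redis follows. This is a sketch in Python for illustration only (`next_id` is a hypothetical helper, not part of Redis or any client library):

```python
# Toy model of XADD's '*' auto-ID generation: a millisecond timestamp
# plus a sequence number that increments within the same millisecond.

def next_id(now_ms, last_id):
    """Return the next stream ID given the clock and the last ID used."""
    if last_id is None:
        return f"{now_ms}-0"
    last_ms, last_seq = (int(part) for part in last_id.split("-"))
    if now_ms > last_ms:
        return f"{now_ms}-0"
    # Same millisecond (or clock went backwards): keep the old
    # timestamp and bump the sequence so IDs never decrease
    return f"{last_ms}-{last_seq + 1}"

ids = []
for t in [1000, 1000, 1000, 1001]:
    ids.append(next_id(t, ids[-1] if ids else None))

print(ids)  # ['1000-0', '1000-1', '1000-2', '1001-0']
```

Because IDs only ever increase, a stream stays strictly ordered even under bursts of writes within one millisecond.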
Stream IDs have the form timestamp-sequence: Redis auto-generates them from the current time in milliseconds plus a sequence number that distinguishes entries created within the same millisecond.

Laravel Redis Streams
Laravel doesn't ship a dedicated Streams API, but the Redis facade forwards stream commands to the underlying client (the examples below assume the phpredis driver):
use Illuminate\Support\Facades\Redis;

// Add an order event to the stream
$orderId = Redis::xadd('orders', '*', [
    'user_id' => 123,
    'product' => 'laptop',
    'price' => 1299.99,
    'status' => 'pending',
    'timestamp' => now()->toIso8601String()
]);
// Returns: "1705856400000-0"

// Add with a max length
// (phpredis signature: xadd(key, id, values, maxlen, approximate))
Redis::xadd('orders', '*', [
    'user_id' => 124,
    'product' => 'mouse',
    'price' => 25.99
], 10000, true);

XREAD - Reading Stream Entries
Read entries from one or multiple streams:
# Read from the beginning (ID 0-0 means start)
XREAD COUNT 10 STREAMS orders 0-0

# Read only new entries ($ means latest)
XREAD STREAMS orders $

# Blocking read - wait for new entries (2000 ms timeout)
XREAD BLOCK 2000 STREAMS orders $

# Read from multiple streams
XREAD COUNT 5 STREAMS orders notifications 0-0 0-0
class OrderStreamProcessor
{
    public function processOrders()
    {
        $lastId = $this->getLastProcessedId() ?? '0-0';

        while (true) {
            // Blocking read - wait up to 5 seconds for new entries
            // (phpredis signature: xread([stream => lastId], count, blockMs))
            $entries = Redis::xread(['orders' => $lastId], 10, 5000);

            if (empty($entries)) {
                continue; // No new entries, retry
            }

            foreach ($entries['orders'] ?? [] as $id => $data) {
                try {
                    $this->processOrder($id, $data);
                    $lastId = $id; // Update last processed ID
                    $this->saveLastProcessedId($lastId);
                } catch (\Exception $e) {
                    logger()->error("Failed to process order {$id}", [
                        'error' => $e->getMessage()
                    ]);
                }
            }
        }
    }

    private function processOrder(string $id, array $data)
    {
        logger()->info("Processing order {$id}", $data);

        // Process the order...
        $order = Order::create([
            'user_id' => $data['user_id'],
            'product' => $data['product'],
            'price' => $data['price']
        ]);

        // Send confirmation email, update inventory, etc.
    }
}

XRANGE - Query by ID Range
Read entries within a specific ID or time range:
# Read all entries
XRANGE orders - +

# Read entries from a specific ID to the latest
XRANGE orders 1705856400000-0 +

# Read the last 10 entries (reverse range)
XREVRANGE orders + - COUNT 10

# Read entries in a time range (1 hour window)
XRANGE orders 1705856400000 1705860000000
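The `-`/`+` range semantics can be modeled without a server. This is a Python sketch for intuition only, not Redis code; `id_key` and `xrange` are hypothetical helpers:

```python
# Toy model of XRANGE/XREVRANGE over an in-memory list of entries.
# '-' and '+' stand for the smallest and largest possible IDs.

def id_key(entry_id):
    """Convert 'ms-seq' into a numeric tuple so IDs compare correctly."""
    ms, seq = entry_id.split("-")
    return (int(ms), int(seq))

def xrange(entries, start="-", end="+", count=None, rev=False):
    lo = (0, 0) if start == "-" else id_key(start)
    hi = (float("inf"), float("inf")) if end == "+" else id_key(end)
    hits = [(i, f) for i, f in entries if lo <= id_key(i) <= hi]
    if rev:
        hits.reverse()  # XREVRANGE returns newest first
    return hits[:count] if count else hits

entries = [("1000-0", {"p": "laptop"}), ("1000-1", {"p": "mouse"}),
           ("1001-0", {"p": "keyboard"})]

print(xrange(entries))                     # all three entries
print(xrange(entries, start="1000-1"))     # the last two entries
print(xrange(entries, rev=True, count=1))  # newest entry only
```

Note that the range is inclusive on both ends, which is why `XRANGE orders <id> <id>` (used later for the dead letter queue) fetches exactly one entry.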
public function getRecentOrders(int $count = 100): array
{
    // Get the last $count entries (newest first)
    $entries = Redis::xrevrange('orders', '+', '-', $count);

    $orders = [];
    foreach ($entries as $id => $data) {
        $orders[] = [
            'id' => $id,
            'timestamp' => $this->extractTimestamp($id),
            'data' => $data
        ];
    }

    return $orders;
}

private function extractTimestamp(string $id): int
{
    // The part before the dash is the entry's millisecond timestamp
    return (int) explode('-', $id)[0];
}

Consumer Groups
Consumer groups enable parallel processing where multiple consumers share the workload:
# Create a consumer group starting from the beginning
XGROUP CREATE orders order-processors 0-0

# Create a group starting from the latest entries only
# (MKSTREAM creates the stream if it doesn't exist yet)
XGROUP CREATE orders order-processors $ MKSTREAM

# Create multiple groups for different purposes
XGROUP CREATE orders email-notifiers 0-0
XGROUP CREATE orders analytics 0-0
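The key property of a group is that it tracks a single last-delivered ID, so each entry is handed to exactly one consumer in that group. A toy model in Python (for intuition only; `Group` is a hypothetical class, not a Redis API):

```python
# Toy model of consumer-group delivery: the group tracks one
# last-delivered ID, so every entry goes to exactly one consumer.

def id_key(entry_id):
    ms, seq = entry_id.split("-")
    return (int(ms), int(seq))

class Group:
    def __init__(self):
        self.last_delivered = (0, 0)

    def read(self, stream, consumer, count):
        """Mimic XREADGROUP ... STREAMS key > : hand out entries
        that no consumer in this group has seen yet."""
        new = [(i, f) for i, f in stream if id_key(i) > self.last_delivered]
        batch = new[:count]
        if batch:
            self.last_delivered = id_key(batch[-1][0])
        return {consumer: batch}

stream = [("1000-0", {}), ("1000-1", {}), ("1001-0", {})]
group = Group()
print(group.read(stream, "worker-1", 2))  # worker-1 gets 1000-0 and 1000-1
print(group.read(stream, "worker-2", 2))  # worker-2 gets only 1001-0
```

Separate groups (like `email-notifiers` and `analytics` above) each keep their own cursor, so every group still sees the full stream.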
XREADGROUP - Reading as Consumer Group
Read entries as part of a consumer group:
# Read as consumer "worker-1" in group "order-processors"
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders >

# Blocking read with timeout
XREADGROUP GROUP order-processors worker-1 BLOCK 2000 COUNT 5 STREAMS orders >

# Re-read this consumer's pending (unacknowledged) messages
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders 0-0
class OrderConsumer extends Command
{
    protected $signature = 'orders:consume {worker}';

    public function handle()
    {
        $workerId = $this->argument('worker');
        $group = 'order-processors';

        $this->info("Worker {$workerId} started");

        while (true) {
            // Read up to 5 undelivered messages ('>') for this consumer,
            // blocking for up to 2 seconds
            // (phpredis signature: xreadgroup(group, consumer, [stream => id], count, blockMs))
            $messages = Redis::xreadgroup(
                $group,
                $workerId,
                ['orders' => '>'],
                5,
                2000
            );

            if (empty($messages['orders'] ?? [])) {
                continue;
            }

            foreach ($messages['orders'] as $id => $data) {
                try {
                    $this->processOrder($data);

                    // Acknowledge successful processing
                    Redis::xack('orders', $group, [$id]);
                    $this->info("Processed and ACKed: {$id}");
                } catch (\Exception $e) {
                    logger()->error("Failed to process {$id}", [
                        'error' => $e->getMessage()
                    ]);
                    // Message remains in the pending list for retry
                }
            }
        }
    }
}

Message Acknowledgement (XACK)
Acknowledge that a message has been successfully processed:
# Acknowledge a single message
XACK orders order-processors 1705856400000-0

# Acknowledge multiple messages
XACK orders order-processors 1705856400000-0 1705856400000-1 1705856400000-2
# Returns: (integer) 3 (number of messages acknowledged)
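Under the hood, each group keeps a pending entries list (PEL): delivery adds a message to it, XACK removes it, and anything unacknowledged stays there for retry or claiming. A toy model (Python sketch for intuition; `PendingList` is a hypothetical class, not a Redis API):

```python
# Toy pending-entries list (PEL): delivery adds to pending,
# ack removes; unacked messages stay pending for retry.

class PendingList:
    def __init__(self):
        self.pending = {}  # id -> (consumer, delivery_count)

    def deliver(self, entry_id, consumer):
        """Record a delivery, bumping the delivery count on redelivery."""
        _, count = self.pending.get(entry_id, (consumer, 0))
        self.pending[entry_id] = (consumer, count + 1)

    def ack(self, *ids):
        """Mimic XACK: returns how many ids were actually pending."""
        return sum(1 for i in ids if self.pending.pop(i, None) is not None)

pel = PendingList()
pel.deliver("1000-0", "worker-1")
pel.deliver("1000-1", "worker-1")
print(pel.ack("1000-0", "1000-0"))  # 1 (the second ack is a no-op)
print(sorted(pel.pending))          # ['1000-1'] still unacknowledged
```

The delivery count is what makes the dead letter queue pattern below possible: a message redelivered many times is probably poisoned.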
Handling Pending Messages
Monitor and handle messages that haven't been acknowledged:
# View pending messages summary
XPENDING orders order-processors

# View detailed pending messages for one consumer
XPENDING orders order-processors - + 10 worker-1

# Claim abandoned messages (idle for 60000 ms = 1 minute)
XAUTOCLAIM orders order-processors worker-2 60000 0-0 COUNT 10
class DeadLetterHandler extends Command
{
    protected $signature = 'orders:dlq';

    public function handle()
    {
        $group = 'order-processors';
        $idleTimeout = 300000; // 5 minutes, in milliseconds

        while (true) {
            // Get detailed pending entries: [id, consumer, idleTime, deliveryCount]
            $pending = Redis::xpending('orders', $group, '-', '+', 100);

            foreach ($pending as $entry) {
                [$id, $consumer, $idleTime, $deliveryCount] = $entry;

                if ($idleTime > $idleTimeout && $deliveryCount > 3) {
                    // Message failed repeatedly, move it to a dead letter queue
                    $data = Redis::xrange('orders', $id, $id);
                    Redis::xadd('orders:dlq', '*', $data[$id]);
                    Redis::xack('orders', $group, [$id]);
                    $this->warn("Moved to DLQ: {$id}");
                }
            }

            sleep(60); // Check every minute
        }
    }
}

Stream Trimming
Limit stream size to prevent unlimited growth:
# Exact trimming to 1000 entries (slower)
XTRIM orders MAXLEN 1000
# Approximate trimming (faster, keeps ~1000 entries)
XTRIM orders MAXLEN ~ 1000
# Trim by minimum ID (remove entries older than this)
XTRIM orders MINID 1705856400000-0
// Laravel - Scheduled trimming
// In App\Console\Kernel
$schedule->call(function () {
    // phpredis signature: xtrim(key, maxlen, approximate)
    Redis::xtrim('orders', 10000, true);
})->daily();

Event Sourcing with Streams
Use streams for event sourcing patterns:
class UserAccountEventStore
{
    private const STREAM = 'user:events:';

    public function appendEvent(int $userId, string $eventType, array $data)
    {
        $streamKey = self::STREAM . $userId;

        return Redis::xadd($streamKey, '*', [
            'event_type' => $eventType,
            'data' => json_encode($data),
            'timestamp' => now()->toIso8601String()
        ]);
    }

    public function replayEvents(int $userId): array
    {
        $streamKey = self::STREAM . $userId;
        $events = Redis::xrange($streamKey, '-', '+');

        // Rebuild current state by replaying every event in order
        $state = [
            'balance' => 0,
            'status' => 'inactive'
        ];

        foreach ($events as $id => $event) {
            $data = json_decode($event['data'], true);

            match ($event['event_type']) {
                'account_created' => $state['status'] = 'active',
                'balance_credited' => $state['balance'] += $data['amount'],
                'balance_debited' => $state['balance'] -= $data['amount'],
                'account_suspended' => $state['status'] = 'suspended',
                default => null
            };
        }

        return $state;
    }
}

Exercises

- Create an order processing system using Redis Streams with consumer groups
- Implement 3 worker processes that consume from the same stream in parallel
- Add proper acknowledgement and error handling
- Build a dead letter queue handler for failed messages (retry 3 times, then move to DLQ)
- Implement an event sourcing pattern for user account state (created, credited, debited, suspended)
- Add monitoring to track pending messages, consumer lag, and processing rate
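As a starting point for the monitoring exercise: because stream IDs embed a millisecond timestamp, consumer lag can be estimated by comparing the group's last-delivered ID with the newest entry's ID (both are reported by XINFO). A Python sketch of the arithmetic (`lag_ms` is a hypothetical helper, not a Redis command):

```python
# Estimate consumer lag from two stream IDs: the group's
# last-delivered ID and the newest entry ID in the stream.

def lag_ms(last_delivered_id, newest_id):
    delivered_ms = int(last_delivered_id.split("-")[0])
    newest_ms = int(newest_id.split("-")[0])
    return max(0, newest_ms - delivered_ms)

print(lag_ms("1705856400000-0", "1705856405000-2"))  # 5000 (ms behind)
```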