Redis & Advanced Caching

Redis Streams

20 min Lesson 14 of 42

Redis Streams

Redis Streams is a powerful data structure designed for high-performance message streaming and event sourcing. It combines features of append-only logs, message queues, and consumer groups to create a robust distributed messaging system.

What are Redis Streams?

Streams are append-only log data structures where each entry has a unique ID and can contain multiple field-value pairs:

Key Characteristics:
✓ Append-only: New entries always added at the end
✓ Persistent: Messages stored on disk (unlike Pub/Sub)
✓ Ordered: Entries have auto-incrementing timestamps
✓ Queryable: Read by ID range or time range
✓ Consumer Groups: Multiple consumers process stream in parallel
✓ Acknowledgement: Track which messages were processed

XADD - Adding Entries to Stream

Add new entries to a stream with automatic ID generation:

Redis CLI:
# Add entry with auto-generated ID (timestamp-sequence)
XADD orders * user_id 123 product laptop price 1299.99
# Returns: "1705856400000-0"  (timestamp in ms - sequence number)

# Add entry with specific ID
XADD orders 1705856400000-1 user_id 124 product mouse price 25.99

# Add with maximum length (keep only last 1000 entries)
XADD orders MAXLEN 1000 * user_id 125 product keyboard price 79.99

# Approximate trimming (faster, keeps ~1000 entries)
XADD orders MAXLEN ~ 1000 * user_id 126 product monitor price 399.99
Entry ID Format: IDs are in format timestamp-sequence. Redis auto-generates IDs using current time in milliseconds plus a sequence number for entries in the same millisecond.

Laravel Redis Streams

Laravel doesn't have built-in Stream support, but you can use raw Redis commands:

Adding Entries:
use Illuminate\Support\Facades\Redis;

// Add order event to stream
$orderId = Redis::xadd('orders', '*', [
    'user_id' => 123,
    'product' => 'laptop',
    'price' => 1299.99,
    'status' => 'pending',
    'timestamp' => now()->toIso8601String()
]);
// Returns: "1705856400000-0"

// Add with max length
Redis::xadd('orders', 'MAXLEN', '~', 10000, '*', [
    'user_id' => 124,
    'product' => 'mouse',
    'price' => 25.99
]);

XREAD - Reading Stream Entries

Read entries from one or multiple streams:

Basic Reading:
# Read from beginning (ID 0-0 means start)
XREAD COUNT 10 STREAMS orders 0-0

# Read only new entries ($ means latest)
XREAD STREAMS orders $

# Blocking read - wait for new entries (2000ms timeout)
XREAD BLOCK 2000 STREAMS orders $

# Read from multiple streams
XREAD COUNT 5 STREAMS orders notifications 0-0 0-0
Laravel Example - Event Processor:
class OrderStreamProcessor
{
    public function processOrders()
    {
        $lastId = $this->getLastProcessedId() ?? '0-0';

        while (true) {
            // Blocking read - wait up to 5 seconds for new entries
            $entries = Redis::xread(['orders'], [$lastId], 10, 5000);

            if (empty($entries)) {
                continue;  // No new entries, retry
            }

            foreach ($entries['orders'] ?? [] as $id => $data) {
                try {
                    $this->processOrder($id, $data);
                    $lastId = $id;  // Update last processed ID
                    $this->saveLastProcessedId($lastId);
                } catch (\Exception $e) {
                    logger()->error("Failed to process order {$id}", [
                        'error' => $e->getMessage()
                    ]);
                }
            }
        }
    }

    private function processOrder(string $id, array $data)
    {
        logger()->info("Processing order {$id}", $data);

        // Process the order...
        $order = Order::create([
            'user_id' => $data['user_id'],
            'product' => $data['product'],
            'price' => $data['price']
        ]);

        // Send confirmation email, update inventory, etc.
    }
}

XRANGE - Query by ID Range

Read entries within a specific ID or time range:

Range Queries:
# Read all entries
XRANGE orders - +

# Read entries from specific ID to latest
XRANGE orders 1705856400000-0 +

# Read last 10 entries (reverse range)
XREVRANGE orders + - COUNT 10

# Read entries in time range (1 hour window)
XRANGE orders 1705856400000 1705860000000
Laravel - Get Recent Events:
public function getRecentOrders(int $count = 100): array
{
    // Get last $count entries
    $entries = Redis::xrevrange('orders', '+', '-', $count);

    $orders = [];
    foreach ($entries as $id => $data) {
        $orders[] = [
            'id' => $id,
            'timestamp' => $this->extractTimestamp($id),
            'data' => $data
        ];
    }

    return $orders;
}

private function extractTimestamp(string $id): int
{
    return (int) explode('-', $id)[0];
}

Consumer Groups

Consumer groups enable parallel processing where multiple consumers share the workload:

Creating Consumer Groups:
# Create consumer group starting from beginning
XGROUP CREATE orders order-processors 0-0

# Create group starting from latest entries only
XGROUP CREATE orders order-processors $ MKSTREAM

# Create multiple groups for different purposes
XGROUP CREATE orders email-notifiers 0-0
XGROUP CREATE orders analytics 0-0
Use Cases for Consumer Groups: Process high-volume streams with multiple workers, ensure each message is processed exactly once, enable different consumers to process the same stream independently.

XREADGROUP - Reading as Consumer Group

Read entries as part of a consumer group:

Consumer Group Reading:
# Read as consumer "worker-1" in group "order-processors"
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders >

# Blocking read with timeout
XREADGROUP GROUP order-processors worker-1 BLOCK 2000 COUNT 5 STREAMS orders >

# Read unacknowledged messages (pending for this consumer)
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders 0-0
Laravel - Consumer Worker:
class OrderConsumer extends Command
{
    protected $signature = 'orders:consume {worker}';

    public function handle()
    {
        $workerId = $this->argument('worker');
        $group = 'order-processors';

        $this->info("Worker {$workerId} started");

        while (true) {
            // Read new messages for this consumer
            $messages = Redis::xreadgroup(
                $group,
                $workerId,
                ['orders'],
                ['>'],  // > means undelivered messages
                5,        // Read 5 messages
                2000      // 2 second timeout
            );

            if (empty($messages['orders'] ?? [])) {
                continue;
            }

            foreach ($messages['orders'] as $id => $data) {
                try {
                    $this->processOrder($data);

                    // Acknowledge successful processing
                    Redis::xack('orders', $group, [$id]);

                    $this->info("Processed and ACKed: {$id}");
                } catch (\Exception $e) {
                    logger()->error("Failed to process {$id}", [
                        'error' => $e->getMessage()
                    ]);
                    // Message remains in pending list for retry
                }
            }
        }
    }
}

Message Acknowledgement (XACK)

Acknowledge that a message has been successfully processed:

Acknowledgement:
# Acknowledge single message
XACK orders order-processors 1705856400000-0

# Acknowledge multiple messages
XACK orders order-processors 1705856400000-0 1705856400000-1 1705856400000-2

# Returns: (integer) 3  (number of messages acknowledged)
Important: Always acknowledge messages after successful processing. Unacknowledged messages remain in the "pending entries list" and can be re-delivered to other consumers.

Handling Pending Messages

Monitor and handle messages that haven't been acknowledged:

Check Pending Messages:
# View pending messages summary
XPENDING orders order-processors

# View detailed pending messages
XPENDING orders order-processors - + 10 worker-1

# Claim abandoned messages (idle for 60000ms = 1 minute)
XAUTOCLAIM orders order-processors worker-2 60000 0-0 COUNT 10
Laravel - Dead Letter Queue Handler:
class DeadLetterHandler extends Command
{
    protected $signature = 'orders:dlq';

    public function handle()
    {
        $group = 'order-processors';
        $idleTimeout = 300000;  // 5 minutes

        while (true) {
            // Get pending messages idle for >5 minutes
            $pending = Redis::xpending('orders', $group, '-', '+', 100);

            foreach ($pending as $entry) {
                [$id, $consumer, $idleTime, $deliveryCount] = $entry;

                if ($idleTime > $idleTimeout && $deliveryCount > 3) {
                    // Message failed 3+ times, move to dead letter queue
                    $data = Redis::xrange('orders', $id, $id);
                    Redis::xadd('orders:dlq', '*', $data[$id]);
                    Redis::xack('orders', $group, [$id]);

                    $this->warn("Moved to DLQ: {$id}");
                }
            }

            sleep(60);  // Check every minute
        }
    }
}

Stream Trimming

Limit stream size to prevent unlimited growth:

Trimming Strategies:
# Exact trimming to 1000 entries (slower)
XTRIM orders MAXLEN 1000

# Approximate trimming (faster, keeps ~1000 entries)
XTRIM orders MAXLEN ~ 1000

# Trim by minimum ID (remove entries older than this)
XTRIM orders MINID 1705856400000-0

# Laravel - Scheduled trimming
// In App\Console\Kernel
$schedule->call(function () {
    Redis::xtrim('orders', 'MAXLEN', '~', 10000);
})->daily();

Event Sourcing with Streams

Use streams for event sourcing patterns:

Event Sourcing Example:
class UserAccountEventStore
{
    private const STREAM = 'user:events:';

    public function appendEvent(int $userId, string $eventType, array $data)
    {
        $streamKey = self::STREAM . $userId;

        return Redis::xadd($streamKey, '*', [
            'event_type' => $eventType,
            'data' => json_encode($data),
            'timestamp' => now()->toIso8601String()
        ]);
    }

    public function replayEvents(int $userId): array
    {
        $streamKey = self::STREAM . $userId;
        $events = Redis::xrange($streamKey, '-', '+');

        $state = [
            'balance' => 0,
            'status' => 'inactive'
        ];

        foreach ($events as $id => $event) {
            $data = json_decode($event['data'], true);

            match($event['event_type']) {
                'account_created' => $state['status'] = 'active',
                'balance_credited' => $state['balance'] += $data['amount'],
                'balance_debited' => $state['balance'] -= $data['amount'],
                'account_suspended' => $state['status'] = 'suspended',
                default => null
            };
        }

        return $state;
    }
}
Practice Exercise:
  1. Create an order processing system using Redis Streams with consumer groups
  2. Implement 3 worker processes that consume from the same stream in parallel
  3. Add proper acknowledgement and error handling
  4. Build a dead letter queue handler for failed messages (retry 3 times, then move to DLQ)
  5. Implement an event sourcing pattern for user account state (created, credited, debited, suspended)
  6. Add monitoring to track pending messages, consumer lag, and processing rate

ES
Edrees Salih
1 hour ago

We are still cooking the magic in the way!