Redis & Advanced Caching

Redis Streams

Redis Streams is a powerful data structure designed for high-performance message streaming and event sourcing. It combines features of append-only logs, message queues, and consumer groups to create a robust distributed messaging system.

What are Redis Streams?

Streams are append-only log data structures where each entry has a unique ID and can contain multiple field-value pairs:

Key Characteristics:
✓ Append-only: New entries always added at the end
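✓ Persistent: Entries stay in the stream after they are read (unlike Pub/Sub) and are saved to disk through RDB/AOF persistence
✓ Ordered: Entry IDs increase monotonically (millisecond timestamp plus sequence number)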
✓ Queryable: Read by ID range or time range
✓ Consumer Groups: Multiple consumers process stream in parallel
✓ Acknowledgement: Track which messages were processed

XADD - Adding Entries to Stream

Add new entries to a stream with automatic ID generation:

Redis CLI:
# Add entry with auto-generated ID (timestamp-sequence)
XADD orders * user_id 123 product laptop price 1299.99
# Returns: "1705856400000-0"  (timestamp in ms - sequence number)

# Add entry with specific ID (must be greater than the stream's last ID)
XADD orders 1705856400000-1 user_id 124 product mouse price 25.99

# Add with maximum length (keep only last 1000 entries)
XADD orders MAXLEN 1000 * user_id 125 product keyboard price 79.99

# Approximate trimming (faster, keeps ~1000 entries)
XADD orders MAXLEN ~ 1000 * user_id 126 product monitor price 399.99
Entry ID Format: IDs are in format timestamp-sequence. Redis auto-generates IDs using current time in milliseconds plus a sequence number for entries in the same millisecond.
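
The ID format can be handled in application code with two small helpers. This is a minimal sketch: parseStreamId and compareStreamIds are hypothetical names, not part of Redis or Laravel, and the code assumes a 64-bit PHP build so that millisecond timestamps fit in an int.

```php
<?php
declare(strict_types=1);

/**
 * Split a complete entry ID ("<milliseconds>-<sequence>") into its two parts.
 * Hypothetical helper for illustration only.
 */
function parseStreamId(string $id): array
{
    if (!preg_match('/^(\d+)-(\d+)$/', $id, $m)) {
        throw new InvalidArgumentException("Not a complete stream ID: {$id}");
    }

    return ['ms' => (int) $m[1], 'seq' => (int) $m[2]];
}

/**
 * Order two IDs by timestamp first, then by sequence number.
 * Returns -1, 0 or 1.
 */
function compareStreamIds(string $a, string $b): int
{
    $x = parseStreamId($a);
    $y = parseStreamId($b);

    return [$x['ms'], $x['seq']] <=> [$y['ms'], $y['seq']];
}

$id = parseStreamId('1705856400000-1');
echo $id['ms'], ' / ', $id['seq'], PHP_EOL;                            // 1705856400000 / 1
echo compareStreamIds('1705856400000-9', '1705856400001-0'), PHP_EOL;  // -1
```

A later millisecond always sorts after an earlier one, whatever the sequence numbers are.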

Laravel Redis Streams

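Laravel has no dedicated Streams API, but the Redis facade forwards stream commands to the underlying client. The examples below assume the phpredis extension; Predis may expect some arguments in a different order: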

Adding Entries:
use Illuminate\Support\Facades\Redis;

// Add order event to stream
$orderId = Redis::xadd('orders', '*', [
    'user_id' => 123,
    'product' => 'laptop',
    'price' => 1299.99,
    'status' => 'pending',
    'timestamp' => now()->toIso8601String()
]);
// Returns: "1705856400000-0"

// Add with approximate max length
// (phpredis: the 4th argument is MAXLEN, the 5th enables "~" trimming)
Redis::xadd('orders', '*', [
    'user_id' => 124,
    'product' => 'mouse',
    'price' => 25.99
], 10000, true);
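
Stream entries hold flat field-value pairs that Redis stores as strings, so nested or typed payloads need to be flattened before XADD. The following is a minimal sketch of one way to do that; toStreamFields is a hypothetical helper, and its conventions (JSON for nested arrays, "1"/"0" for booleans, nulls dropped) are assumptions rather than Redis rules.

```php
<?php
declare(strict_types=1);

/**
 * Convert an application payload into flat string fields for XADD.
 * Hypothetical helper; the encoding conventions are assumptions.
 */
function toStreamFields(array $payload): array
{
    $fields = [];

    foreach ($payload as $name => $value) {
        if ($value === null) {
            continue;                                   // skip missing values
        }
        if (is_array($value)) {
            $fields[$name] = json_encode($value, JSON_THROW_ON_ERROR);
        } elseif (is_bool($value)) {
            $fields[$name] = $value ? '1' : '0';
        } else {
            $fields[$name] = (string) $value;           // ints, floats, strings
        }
    }

    return $fields;
}

print_r(toStreamFields([
    'user_id' => 123,
    'price'   => 1299.99,
    'gift'    => false,
    'items'   => ['laptop', 'sleeve'],
    'coupon'  => null,
]));
```

The result can be passed as the third argument of Redis::xadd. Consumers then decode the JSON fields themselves.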

XREAD - Reading Stream Entries

Read entries from one or multiple streams:

Basic Reading:
# Read from beginning (ID 0-0 means start)
XREAD COUNT 10 STREAMS orders 0-0

# Read only new entries ($ means entries added after this call)
# Without BLOCK this always returns nil, because nothing newer exists yet
XREAD STREAMS orders $

# Blocking read - wait for new entries (2000ms timeout)
XREAD BLOCK 2000 STREAMS orders $

# Read from multiple streams
XREAD COUNT 5 STREAMS orders notifications 0-0 0-0
Laravel Example - Event Processor:
class OrderStreamProcessor
{
    public function processOrders()
    {
        $lastId = $this->getLastProcessedId() ?? '0-0';

        while (true) {
            // Blocking read - wait up to 5 seconds for new entries
            // (phpredis signature: xRead(array $streams, int $count, int $block))
            $entries = Redis::xread(['orders' => $lastId], 10, 5000);

            if (empty($entries)) {
                continue;  // Timed out with no new entries, retry
            }

            foreach ($entries['orders'] ?? [] as $id => $data) {
                try {
                    $this->processOrder($id, $data);
                } catch (\Exception $e) {
                    logger()->error("Failed to process order {$id}", [
                        'error' => $e->getMessage()
                    ]);
                    // Plain XREAD has no retry tracking: the failed entry is skipped
                }

                $lastId = $id;  // Advance past this entry either way
                $this->saveLastProcessedId($lastId);
            }
        }
    }

    private function processOrder(string $id, array $data)
    {
        logger()->info("Processing order {$id}", $data);

        // Process the order...
        $order = Order::create([
            'user_id' => $data['user_id'],
            'product' => $data['product'],
            'price' => $data['price']
        ]);

        // Send confirmation email, update inventory, etc.
    }
}

XRANGE - Query by ID Range

Read entries within a specific ID or time range:

Range Queries:
# Read all entries
XRANGE orders - +

# Read entries from specific ID to latest
XRANGE orders 1705856400000-0 +

# Read last 10 entries (reverse range)
XREVRANGE orders + - COUNT 10

# Read entries in time range (1 hour window)
XRANGE orders 1705856400000 1705860000000
Laravel - Get Recent Events:
public function getRecentOrders(int $count = 100): array
{
    // Get last $count entries
    $entries = Redis::xrevrange('orders', '+', '-', $count);

    $orders = [];
    foreach ($entries as $id => $data) {
        $orders[] = [
            'id' => $id,
            'timestamp' => $this->extractTimestamp($id),
            'data' => $data
        ];
    }

    return $orders;
}

private function extractTimestamp(string $id): int
{
    return (int) explode('-', $id)[0];
}
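
Time-range queries work because XRANGE accepts IDs without a sequence part, so the bounds are plain Unix timestamps in milliseconds. A minimal sketch of building those bounds from PHP date objects follows; streamRangeFor is a hypothetical helper name.

```php
<?php
declare(strict_types=1);

/**
 * Build [start, end] bounds for an XRANGE time window.
 * Hypothetical helper for illustration only.
 */
function streamRangeFor(DateTimeInterface $from, DateTimeInterface $to): array
{
    if ($from > $to) {
        throw new InvalidArgumentException('Window start must not be after its end');
    }

    // 'U' is Unix seconds and 'v' is the millisecond part, so 'Uv' is Unix milliseconds
    return [$from->format('Uv'), $to->format('Uv')];
}

$from = new DateTimeImmutable('2024-01-21 17:00:00', new DateTimeZone('UTC'));
$to   = $from->modify('+1 hour');

[$start, $end] = streamRangeFor($from, $to);
echo "XRANGE orders {$start} {$end}", PHP_EOL;  // XRANGE orders 1705856400000 1705860000000
```

With phpredis the same bounds could be passed to Redis::xrange('orders', $start, $end).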

Consumer Groups

Consumer groups enable parallel processing where multiple consumers share the workload:

Creating Consumer Groups:
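# Create consumer group starting from beginning (the stream must already exist)
XGROUP CREATE orders order-processors 0-0

# Alternative: start from new entries only; MKSTREAM creates the stream if missing
# (creating a group name that already exists fails with BUSYGROUP)
XGROUP CREATE orders order-processors $ MKSTREAM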

# Create multiple groups for different purposes
XGROUP CREATE orders email-notifiers 0-0
XGROUP CREATE orders analytics 0-0
Use Cases for Consumer Groups: Process high-volume streams with multiple workers, deliver each message to only one consumer within a group (delivery is at-least-once, so handlers should be idempotent), and let different groups process the same stream independently.
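
How work is shared can be illustrated with a toy in-memory model. This is only a sketch of the bookkeeping, not of Redis itself: ToyStream and its methods are hypothetical, and entries are identified by array index instead of real stream IDs.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of consumer-group delivery: every group has its own cursor,
 * so groups see all entries, while consumers in one group split them.
 */
final class ToyStream
{
    private array $entries = [];   // entries in append order
    private array $cursor  = [];   // group => next index to deliver
    private array $pending = [];   // group => [entry index => consumer]

    public function add(string $entry): void
    {
        $this->entries[] = $entry;
    }

    /** Rough analogue of XREADGROUP ... STREAMS key > */
    public function readGroup(string $group, string $consumer, int $count): array
    {
        $start = $this->cursor[$group] ?? 0;
        $batch = array_slice($this->entries, $start, $count, true);

        foreach (array_keys($batch) as $index) {
            $this->pending[$group][$index] = $consumer;  // delivered, not yet acknowledged
        }
        $this->cursor[$group] = $start + count($batch);

        return $batch;
    }

    /** Rough analogue of XACK */
    public function ack(string $group, int $index): void
    {
        unset($this->pending[$group][$index]);
    }

    public function pending(string $group): array
    {
        return $this->pending[$group] ?? [];
    }
}

$s = new ToyStream();
foreach (['o1', 'o2', 'o3', 'o4'] as $entry) {
    $s->add($entry);
}

$a = $s->readGroup('order-processors', 'worker-1', 2);  // o1, o2
$b = $s->readGroup('order-processors', 'worker-2', 2);  // o3, o4
$c = $s->readGroup('analytics', 'worker-1', 4);         // all four again
$s->ack('order-processors', 0);

print_r($s->pending('order-processors'));  // indexes 1, 2, 3 are still pending
```

The two workers in order-processors never receive the same entry, while the analytics group reads the full stream on its own schedule.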

XREADGROUP - Reading as Consumer Group

Read entries as part of a consumer group:

Consumer Group Reading:
# Read as consumer "worker-1" in group "order-processors"
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders >

# Blocking read with timeout
XREADGROUP GROUP order-processors worker-1 BLOCK 2000 COUNT 5 STREAMS orders >

# Read unacknowledged messages (pending for this consumer)
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders 0-0
Laravel - Consumer Worker:
class OrderConsumer extends Command
{
    protected $signature = 'orders:consume {worker}';

    public function handle()
    {
        $workerId = $this->argument('worker');
        $group = 'order-processors';

        $this->info("Worker {$workerId} started");

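        while (true) {
            // Read new messages for this consumer
            // (phpredis signature: xReadGroup($group, $consumer, array $streams, $count, $block))
            $messages = Redis::xreadgroup(
                $group,
                $workerId,
                ['orders' => '>'],  // > means messages never delivered to this group
                5,        // Read up to 5 messages
                2000      // Block for up to 2 seconds
            );

            if (empty($messages['orders'] ?? [])) {
                continue;
            }

            foreach ($messages['orders'] as $id => $data) {
                try {
                    // processOrder() is assumed to match OrderStreamProcessor::processOrder()
                    $this->processOrder($id, $data);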

                    // Acknowledge successful processing
                    Redis::xack('orders', $group, [$id]);

                    $this->info("Processed and ACKed: {$id}");
                } catch (\Exception $e) {
                    logger()->error("Failed to process {$id}", [
                        'error' => $e->getMessage()
                    ]);
                    // Message stays in the pending entries list until it is
                    // re-read with ID 0 or claimed by another consumer
                }
            }
        }
    }
}

Message Acknowledgement (XACK)

Acknowledge that a message has been successfully processed:

Acknowledgement:
# Acknowledge single message
XACK orders order-processors 1705856400000-0

# Acknowledge multiple messages
XACK orders order-processors 1705856400000-0 1705856400000-1 1705856400000-2

# Returns: (integer) 3  (number of messages acknowledged)
Important: Acknowledge messages only after successful processing. Unacknowledged messages remain in the "pending entries list" (PEL) of the consumer that read them. They are not redelivered automatically, but another consumer can take them over with XCLAIM or XAUTOCLAIM.

Handling Pending Messages

Monitor and handle messages that haven't been acknowledged:

Check Pending Messages:
# View pending messages summary
XPENDING orders order-processors

# View detailed pending messages
XPENDING orders order-processors - + 10 worker-1

# Claim abandoned messages (idle for 60000ms = 1 minute; XAUTOCLAIM requires Redis 6.2+)
XAUTOCLAIM orders order-processors worker-2 60000 0-0 COUNT 10
Laravel - Dead Letter Queue Handler:
class DeadLetterHandler extends Command
{
    protected $signature = 'orders:dlq';

    public function handle()
    {
        $group = 'order-processors';
        $idleTimeout = 300000;  // 5 minutes

        while (true) {
            // Get up to 100 pending entries as [id, consumer, idle ms, delivery count]
            $pending = Redis::xpending('orders', $group, '-', '+', 100);

            foreach ($pending ?: [] as $entry) {
                [$id, $consumer, $idleTime, $deliveryCount] = $entry;

                // Delivery count only grows when a message is re-read or claimed
                if ($idleTime > $idleTimeout && $deliveryCount >= 3) {
                    // Delivered 3+ times without an ACK, move to dead letter queue
                    $data = Redis::xrange('orders', $id, $id);

                    if (isset($data[$id])) {  // Entry may already have been trimmed
                        Redis::xadd('orders:dlq', '*', $data[$id]);
                    }
                    Redis::xack('orders', $group, [$id]);

                    $this->warn("Moved to DLQ: {$id}");
                }
            }
            }

            sleep(60);  // Check every minute
        }
    }
}
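
The decision made for each pending entry can be isolated in a pure function, which makes the retry policy testable without a Redis server. A minimal sketch follows; pendingAction, its action names and its default thresholds are assumptions chosen to mirror the 5-minute and 3-delivery limits used above.

```php
<?php
declare(strict_types=1);

/**
 * Decide what to do with one XPENDING row.
 * Hypothetical policy: thresholds and action names are assumptions.
 */
function pendingAction(
    int $idleMs,
    int $deliveries,
    int $idleLimitMs = 300000,
    int $maxDeliveries = 3
): string {
    if ($idleMs <= $idleLimitMs) {
        return 'wait';         // the owning consumer may still be working on it
    }
    if ($deliveries >= $maxDeliveries) {
        return 'dead-letter';  // give up and copy it to the DLQ stream
    }

    return 'reclaim';          // hand it to another consumer (XCLAIM / XAUTOCLAIM)
}

echo pendingAction(1000, 1), PHP_EOL;    // wait
echo pendingAction(400000, 1), PHP_EOL;  // reclaim
echo pendingAction(400000, 3), PHP_EOL;  // dead-letter
```

The 'reclaim' branch matters because claiming a message is what raises its delivery count; without it, an idle message would never reach the dead-letter threshold.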

Stream Trimming

Limit stream size to prevent unlimited growth:

Trimming Strategies:
# Exact trimming to 1000 entries (slower)
XTRIM orders MAXLEN 1000

# Approximate trimming (faster, keeps ~1000 entries)
XTRIM orders MAXLEN ~ 1000

# Trim by minimum ID (remove entries older than this; MINID requires Redis 6.2+)
XTRIM orders MINID 1705856400000-0

Laravel - Scheduled Trimming:
// In App\Console\Kernel::schedule()
// (phpredis signature: xTrim($key, $threshold, $approximate))
$schedule->call(function () {
    Redis::xtrim('orders', 10000, true);  // MAXLEN ~ 10000
})->daily();
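
Because entry IDs start with a millisecond timestamp, a time-based retention policy reduces to computing a MINID threshold. A minimal sketch, assuming a hypothetical helper named minIdForRetention and a clock value passed in by the caller:

```php
<?php
declare(strict_types=1);

/**
 * Compute the MINID threshold that keeps only entries newer than the retention window.
 * Hypothetical helper for illustration only.
 */
function minIdForRetention(int $nowMs, int $retentionMs): string
{
    return max(0, $nowMs - $retentionMs) . '-0';
}

// Fixed clock for a reproducible example; in real code: (int) (microtime(true) * 1000)
$nowMs = 1705860000000;

echo 'XTRIM orders MINID ', minIdForRetention($nowMs, 3600000), PHP_EOL;
// XTRIM orders MINID 1705856400000-0
```

This keeps one hour of entries regardless of how many were written in that hour, whereas MAXLEN caps the count regardless of age.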

Event Sourcing with Streams

Use streams for event sourcing patterns:

Event Sourcing Example:
class UserAccountEventStore
{
    private const STREAM = 'user:events:';

    public function appendEvent(int $userId, string $eventType, array $data)
    {
        $streamKey = self::STREAM . $userId;

        return Redis::xadd($streamKey, '*', [
            'event_type' => $eventType,
            'data' => json_encode($data),
            'timestamp' => now()->toIso8601String()
        ]);
    }

    public function replayEvents(int $userId): array
    {
        $streamKey = self::STREAM . $userId;
        $events = Redis::xrange($streamKey, '-', '+');

        $state = [
            'balance' => 0,
            'status' => 'inactive'
        ];

        foreach ($events as $id => $event) {
            $data = json_decode($event['data'], true);

            match($event['event_type']) {
                'account_created' => $state['status'] = 'active',
                'balance_credited' => $state['balance'] += $data['amount'],
                'balance_debited' => $state['balance'] -= $data['amount'],
                'account_suspended' => $state['status'] = 'suspended',
                default => null
            };
        }

        return $state;
    }
}
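
The replay logic can also be written as a pure reducer, which makes it possible to check state transitions without a Redis connection. This is a sketch under stated assumptions: applyAccountEvent is a hypothetical function, and the sample events are made up for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Apply one stored event to the current account state and return the new state.
 * Hypothetical reducer mirroring the event types used in replayEvents().
 */
function applyAccountEvent(array $state, array $event): array
{
    $data = json_decode($event['data'] ?? '{}', true) ?? [];

    return match ($event['event_type'] ?? '') {
        'account_created'   => array_merge($state, ['status' => 'active']),
        'balance_credited'  => array_merge($state, ['balance' => $state['balance'] + $data['amount']]),
        'balance_debited'   => array_merge($state, ['balance' => $state['balance'] - $data['amount']]),
        'account_suspended' => array_merge($state, ['status' => 'suspended']),
        default             => $state,  // unknown event types are ignored
    };
}

// Sample events in the shape XRANGE returns them (field => string value)
$events = [
    ['event_type' => 'account_created',  'data' => '{}'],
    ['event_type' => 'balance_credited', 'data' => '{"amount":100}'],
    ['event_type' => 'balance_debited',  'data' => '{"amount":30}'],
];

$state = array_reduce($events, 'applyAccountEvent', ['balance' => 0, 'status' => 'inactive']);
print_r($state);  // balance 70, status active
```

Keeping the reducer free of I/O means the same function can replay a full stream or apply a single new event on top of a cached state.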
Practice Exercise:
  1. Create an order processing system using Redis Streams with consumer groups
  2. Implement 3 worker processes that consume from the same stream in parallel
  3. Add proper acknowledgement and error handling
  4. Build a dead letter queue handler for failed messages (retry 3 times, then move to DLQ)
  5. Implement an event sourcing pattern for user account state (created, credited, debited, suspended)
  6. Add monitoring to track pending messages, consumer lag, and processing rate