Redis & Advanced Caching

Redis Pub/Sub

18 min Lesson 12 of 30

Redis Pub/Sub

Redis Publish/Subscribe (Pub/Sub) is a messaging pattern that enables real-time communication between different parts of your application. Publishers send messages to channels, and subscribers receive messages from channels they've subscribed to.

Basic Pub/Sub Concepts

Redis Pub/Sub has three main components:

Components:
1. Publishers - Send messages to channels
2. Channels - Named message streams
3. Subscribers - Receive messages from channels

Key Characteristics:
✓ Fire-and-forget: Messages not persisted
✓ Real-time: Instant delivery to active subscribers
✓ Decoupled: Publishers don't know about subscribers
✗ No message history: New subscribers miss old messages

PUBLISH and SUBSCRIBE Commands

Basic publish and subscribe operations:

Redis CLI:
# Subscribe to a channel (blocks until message received)
SUBSCRIBE notifications

# Subscribe to multiple channels
SUBSCRIBE notifications alerts updates

# Publish a message to a channel
PUBLISH notifications "New user registered"
# Returns: (integer) 2  # Number of subscribers who received message

# Unsubscribe from channels
UNSUBSCRIBE notifications
UNSUBSCRIBE  # Unsubscribe from all

Laravel Redis Pub/Sub

Laravel provides elegant methods for working with Redis Pub/Sub:

Publishing Messages:
use Illuminate\Support\Facades\Redis;

// Publish a simple message
Redis::publish('notifications', 'User registered');

// Publish JSON data
$data = [
    'event' => 'user.registered',
    'user_id' => 123,
    'email' => 'user@example.com',
    'timestamp' => time()
];
Redis::publish('notifications', json_encode($data));

// Publish to multiple channels
$channels = ['notifications', 'admin-alerts', 'analytics'];
foreach ($channels as $channel) {
    Redis::publish($channel, json_encode($data));
}
Subscribing to Messages:
// Subscribe to a single channel
Redis::subscribe(['notifications'], function ($message) {
    echo "Received: {$message}\n";

    $data = json_decode($message, true);
    // Process the message
    logger()->info('Notification received', $data);
});

// Subscribe to multiple channels
Redis::subscribe(['notifications', 'alerts'], function ($message, $channel) {
    echo "Channel: {$channel}, Message: {$message}\n";
});
Important: The subscribe() method blocks execution. Run it in a separate process or use Laravel's queue workers for production applications.

Pattern-Based Subscriptions (PSUBSCRIBE)

Subscribe to multiple channels using wildcard patterns:

Pattern Matching:
// Redis CLI
PSUBSCRIBE user.*        # Matches user.created, user.updated, user.deleted
PSUBSCRIBE order.*.paid  # Matches order.123.paid, order.456.paid
PSUBSCRIBE *             # Subscribe to ALL channels (use carefully!)

// Laravel implementation
Redis::psubscribe(['user.*', 'order.*'], function ($message, $channel) {
    echo "Pattern match - Channel: {$channel}, Message: {$message}\n";

    // Extract event type from channel name
    $parts = explode('.', $channel);
    $entity = $parts[0];  // user, order
    $action = $parts[1] ?? null;  // created, updated, deleted

    match($channel) {
        'user.created' => $this->handleUserCreated(json_decode($message, true)),
        'user.updated' => $this->handleUserUpdated(json_decode($message, true)),
        'order.completed' => $this->handleOrderCompleted(json_decode($message, true)),
        default => logger()->debug("Unhandled channel: {$channel}")
    };
});
Pattern Benefits: Subscribe to entire event categories with one command, automatically handle new event types without updating subscriber code.

Real-Time Notifications System

Build a complete notification system using Pub/Sub:

Publisher Service:
namespace App\Services;

use Illuminate\Support\Facades\Redis;

class NotificationService
{
    public function sendNotification(int $userId, string $type, array $data)
    {
        $notification = [
            'user_id' => $userId,
            'type' => $type,
            'data' => $data,
            'timestamp' => now()->toIso8601String(),
            'id' => uniqid('notif_')
        ];

        // Publish to user-specific channel
        Redis::publish("notifications.user.{$userId}", json_encode($notification));

        // Also publish to type-specific channel for analytics
        Redis::publish("notifications.type.{$type}", json_encode($notification));

        return $notification['id'];
    }

    public function broadcastToAll(string $type, array $data)
    {
        $notification = [
            'type' => $type,
            'data' => $data,
            'timestamp' => now()->toIso8601String()
        ];

        Redis::publish('notifications.broadcast', json_encode($notification));
    }
}
Subscriber Worker (Artisan Command):
namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class NotificationSubscriber extends Command
{
    protected $signature = 'notifications:subscribe';
    protected $description = 'Subscribe to notification channels';

    public function handle()
    {
        $this->info('Subscribing to notification channels...');

        Redis::psubscribe(['notifications.*'], function ($message, $channel) {
            $this->processNotification($message, $channel);
        });
    }

    private function processNotification(string $message, string $channel)
    {
        $data = json_decode($message, true);

        // Log notification
        logger()->info("Notification received", [
            'channel' => $channel,
            'data' => $data
        ]);

        // Send via different channels based on type
        if (str_contains($channel, 'user')) {
            $this->sendWebSocketNotification($data);
            $this->sendEmailIfImportant($data);
        }

        if (str_contains($channel, 'broadcast')) {
            $this->sendToAllWebSockets($data);
        }
    }
}

Real-Time Chat Implementation

Use Pub/Sub for a simple chat system:

Chat Publisher:
class ChatController extends Controller
{
    public function sendMessage(Request $request)
    {
        $validated = $request->validate([
            'room_id' => 'required|string',
            'message' => 'required|string|max:1000'
        ]);

        $messageData = [
            'user_id' => auth()->id(),
            'username' => auth()->user()->name,
            'message' => $validated['message'],
            'timestamp' => now()->toIso8601String()
        ];

        // Publish to room channel
        Redis::publish(
            "chat.room.{$validated['room_id']}",
            json_encode($messageData)
        );

        // Store message in Redis sorted set for history (optional)
        Redis::zadd(
            "chat:history:{$validated['room_id']}",
            time(),
            json_encode($messageData)
        );

        return response()->json(['status' => 'sent']);
    }
}
WebSocket Bridge (subscriber to WebSocket):
// Run as separate process: php artisan chat:subscribe
class ChatSubscriber extends Command
{
    protected $signature = 'chat:subscribe';

    public function handle()
    {
        Redis::psubscribe(['chat.room.*'], function ($message, $channel) {
            // Extract room ID from channel name
            preg_match('/chat\.room\.(\w+)/', $channel, $matches);
            $roomId = $matches[1] ?? null;

            if (!$roomId) return;

            // Broadcast to WebSocket clients in this room
            $this->broadcastToWebSocket($roomId, json_decode($message, true));
        });
    }

    private function broadcastToWebSocket(string $roomId, array $message)
    {
        // Send to WebSocket server (e.g., Laravel Echo Server, Socket.io)
        event(new ChatMessageReceived($roomId, $message));
    }
}

Pub/Sub Monitoring

Monitor active channels and subscribers:

Monitoring Commands:
// List active channels with at least one subscriber
PUBSUB CHANNELS
PUBSUB CHANNELS user.*  # Pattern matching

// Count subscribers on a channel
PUBSUB NUMSUB notifications alerts
# Returns: 1) "notifications" 2) (integer) 5 3) "alerts" 4) (integer) 3

// Count unique patterns being subscribed to
PUBSUB NUMPAT
# Returns: (integer) 7
Production Tip: Use Laravel Echo Server or Soketi for production WebSocket handling. They integrate seamlessly with Redis Pub/Sub and handle client connections efficiently.

Pub/Sub vs Laravel Broadcasting

Comparison:
Redis Pub/Sub:
✓ Lightweight, direct communication
✓ No persistence overhead
✓ Great for server-to-server messaging
✗ No built-in client authentication
✗ No message history
✗ Manual WebSocket integration

Laravel Broadcasting (uses Pub/Sub internally):
✓ Built-in WebSocket support (Pusher, Echo)
✓ Automatic client authentication
✓ Channel authorization
✓ Presence channels
✗ More complex setup
✗ Additional dependencies
Practice Exercise:
  1. Create a notification publisher service that publishes to user-specific channels
  2. Build a subscriber artisan command that listens to pattern "notifications.user.*"
  3. Implement a simple chat system with room-based channels
  4. Add message history using Redis sorted sets (last 100 messages per room)
  5. Create a monitoring endpoint that shows active channels and subscriber counts
  6. Implement graceful shutdown for subscriber processes (handle SIGTERM)