Advanced Laravel

Event Sourcing Concepts

Understand event sourcing architecture, implement event stores, create projections, replay events, and apply CQRS (Command Query Responsibility Segregation) patterns using the Spatie Event Sourcing package (spatie/laravel-event-sourcing).

What is Event Sourcing?

Event sourcing is an architectural pattern where application state changes are stored as a sequence of events, rather than just the current state.

    // Traditional approach: store only the current state
    User::where('id', 1)->update(['balance' => 1000]);

    // Event sourcing: store the events that led to the state
    MoneyAdded::dispatch($user, 500);      // Event 1: +500
    MoneySubtracted::dispatch($user, 200); // Event 2: -200
    MoneyAdded::dispatch($user, 700);      // Event 3: +700

    // Current balance = 500 - 200 + 700 = 1000 (derived from the events)

Pro Tip: Event sourcing provides a complete audit trail, enables time travel (viewing past states), and allows rebuilding state from scratch by replaying events.
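
To make "derived from events" and "time travel" concrete, here is a minimal framework-free sketch. The in-memory `$events` array and the `applyEvent` and `balanceAfter` helpers are hypothetical names used only for illustration; they are not part of Laravel or the Spatie package.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory event log; a real system reads this from the event store.
$events = [
    ['type' => 'MoneyAdded', 'amount' => 500],
    ['type' => 'MoneySubtracted', 'amount' => 200],
    ['type' => 'MoneyAdded', 'amount' => 700],
];

// Fold one event into the running balance.
function applyEvent(int $balance, array $event): int
{
    return match ($event['type']) {
        'MoneyAdded' => $balance + $event['amount'],
        'MoneySubtracted' => $balance - $event['amount'],
        default => $balance, // unknown events leave the balance untouched
    };
}

// Replay the first $count events (all of them when $count is null).
function balanceAfter(array $events, ?int $count = null): int
{
    $slice = $count === null ? $events : array_slice($events, 0, $count);

    return array_reduce($slice, 'applyEvent', 0);
}

echo balanceAfter($events), PHP_EOL;    // 1000: current state
echo balanceAfter($events, 2), PHP_EOL; // 300: state after the second event
```

Viewing a past state is the same replay stopped early, which is why no separate history table is needed.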

Benefits and Use Cases

Event sourcing excels in specific scenarios:

Benefits:
✓ Complete audit trail - every state change is recorded
✓ Time travel - reconstruct state at any point in time
✓ Event replay - rebuild projections from events
✓ Debugging - see exact sequence of events that caused issues
✓ Analytics - analyze historical patterns
✓ Scalability - separate read and write models (CQRS)

Use Cases:
• Financial systems (banking, accounting)
• E-commerce (order processing, inventory)
• Collaborative systems (document editing)
• Gaming (game state, leaderboards)
• Healthcare (patient records)
• IoT (sensor data, device states)

When NOT to use:
✗ Simple CRUD applications
✗ Systems without audit requirements
✗ High-write, low-read scenarios without analytics needs
✗ Teams unfamiliar with the pattern

Installing Spatie Event Sourcing

Set up event sourcing with Spatie's package:

    # Install the package
    composer require spatie/laravel-event-sourcing

    # Publish configuration and migrations
    php artisan vendor:publish --provider="Spatie\EventSourcing\EventSourcingServiceProvider"

    # Run migrations
    php artisan migrate

    # Key tables created:
    # - stored_events: stores all events
    # - snapshots: stores aggregate snapshots

Creating Events

Define domain events that represent state changes:

    // app/Domain/Account/Events/AccountCreated.php
    namespace App\Domain\Account\Events;

    use Spatie\EventSourcing\StoredEvents\ShouldBeStored;

    class AccountCreated extends ShouldBeStored
    {
        public function __construct(
            public string $accountUuid,
            public string $name,
            public string $userId
        ) {}
    }

    // The remaining events live in their own files, with the same
    // namespace and ShouldBeStored import as above.

    // app/Domain/Account/Events/MoneyAdded.php
    class MoneyAdded extends ShouldBeStored
    {
        public function __construct(
            public string $accountUuid,
            public int $amount
        ) {}
    }

    // app/Domain/Account/Events/MoneySubtracted.php
    class MoneySubtracted extends ShouldBeStored
    {
        public function __construct(
            public string $accountUuid,
            public int $amount
        ) {}
    }

    // app/Domain/Account/Events/AccountLimitReached.php
    class AccountLimitReached extends ShouldBeStored
    {
        public function __construct(
            public string $accountUuid,
            public int $currentBalance
        ) {}
    }

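Note: Events should be immutable, named in the past tense, and contain all the data needed to reconstruct state. Never modify an event's structure once it has been stored in production, because previously stored events must still deserialize and replay correctly.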
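
One way to enforce immutability at the language level is PHP 8.1 `readonly` properties. The sketch below is plain PHP with a hypothetical `MoneyAddedEvent` class that does not extend the package's base class; whether the package's event serializer handles readonly properties in your installed version is an assumption you should verify before adopting this.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-alone event; in the lesson it would extend ShouldBeStored.
final class MoneyAddedEvent
{
    public function __construct(
        public readonly string $accountUuid,
        public readonly int $amount,
    ) {}
}

$event = new MoneyAddedEvent('account-1', 500);

try {
    $event->amount = 9999; // attempt to rewrite history
    $mutated = true;
} catch (Error $e) {
    $mutated = false; // PHP rejects writes to readonly properties
}

echo $mutated ? 'mutated' : 'unchanged', PHP_EOL; // unchanged
echo $event->amount, PHP_EOL;                     // 500
```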

Creating Aggregates

Aggregates are the entities that apply events and maintain business rules:

    // app/Domain/Account/AccountAggregateRoot.php
    namespace App\Domain\Account;

    use App\Domain\Account\Events\AccountCreated;
    use App\Domain\Account\Events\AccountLimitReached;
    use App\Domain\Account\Events\MoneyAdded;
    use App\Domain\Account\Events\MoneySubtracted;
    // The exception classes are assumed to live in this namespace
    use App\Domain\Account\Exceptions\AccountLimitException;
    use App\Domain\Account\Exceptions\InsufficientFundsException;
    use App\Domain\Account\Exceptions\InvalidAmountException;
    use Spatie\EventSourcing\AggregateRoots\AggregateRoot;

    class AccountAggregateRoot extends AggregateRoot
    {
        protected int $balance = 0;
        protected int $balanceLimit = 10000;

        // Apply events to update internal state
        protected function applyAccountCreated(AccountCreated $event): void
        {
            $this->balance = 0;
        }

        protected function applyMoneyAdded(MoneyAdded $event): void
        {
            $this->balance += $event->amount;
        }

        protected function applyMoneySubtracted(MoneySubtracted $event): void
        {
            $this->balance -= $event->amount;
        }

        // Business logic methods
        public function createAccount(string $name, string $userId): self
        {
            $this->recordThat(new AccountCreated(
                accountUuid: $this->uuid(),
                name: $name,
                userId: $userId
            ));

            return $this;
        }

        public function addMoney(int $amount): self
        {
            if ($amount <= 0) {
                throw new InvalidAmountException('Amount must be positive');
            }

            if ($this->balance + $amount > $this->balanceLimit) {
                $this->recordThat(new AccountLimitReached(
                    accountUuid: $this->uuid(),
                    currentBalance: $this->balance
                ));

                // Persist before throwing; otherwise the recorded event is
                // lost, because the caller never reaches persist()
                $this->persist();

                throw new AccountLimitException('Balance limit exceeded');
            }

            $this->recordThat(new MoneyAdded(
                accountUuid: $this->uuid(),
                amount: $amount
            ));

            return $this;
        }

        public function subtractMoney(int $amount): self
        {
            if ($amount <= 0) {
                throw new InvalidAmountException('Amount must be positive');
            }

            if ($this->balance - $amount < 0) {
                throw new InsufficientFundsException('Insufficient funds');
            }

            $this->recordThat(new MoneySubtracted(
                accountUuid: $this->uuid(),
                amount: $amount
            ));

            return $this;
        }
    }
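
The record-then-apply cycle is easier to see without the framework. The following is a simplified sketch, not the package's implementation: `SketchAccount`, `fromHistory`, and `recordedEvents` are hypothetical names, and events are plain arrays instead of classes.

```php
<?php

declare(strict_types=1);

// Simplified stand-in for an aggregate root; not the package's implementation.
final class SketchAccount
{
    private int $balance = 0;

    /** @var list<array{type: string, amount: int}> events recorded but not yet stored */
    private array $recorded = [];

    // Rebuild an aggregate by applying its stored history in order.
    public static function fromHistory(array $history): self
    {
        $account = new self();
        foreach ($history as $event) {
            $account->apply($event);
        }

        return $account;
    }

    public function addMoney(int $amount): self
    {
        return $this->recordThat(['type' => 'MoneyAdded', 'amount' => $amount]);
    }

    public function subtractMoney(int $amount): self
    {
        // The rule is checked against state derived from earlier events.
        if ($this->balance - $amount < 0) {
            throw new DomainException('Insufficient funds');
        }

        return $this->recordThat(['type' => 'MoneySubtracted', 'amount' => $amount]);
    }

    // Record a new event and apply it immediately so later checks see it.
    private function recordThat(array $event): self
    {
        $this->recorded[] = $event;
        $this->apply($event);

        return $this;
    }

    private function apply(array $event): void
    {
        $this->balance += $event['type'] === 'MoneyAdded' ? $event['amount'] : -$event['amount'];
    }

    public function balance(): int
    {
        return $this->balance;
    }

    public function recordedEvents(): array
    {
        return $this->recorded;
    }
}

$account = SketchAccount::fromHistory([['type' => 'MoneyAdded', 'amount' => 300]]);
$account->subtractMoney(100)->addMoney(50);

echo $account->balance(), PHP_EOL;               // 250
echo count($account->recordedEvents()), PHP_EOL; // 2 (replayed history is not re-recorded)
```

Business methods never change the balance directly; they only record events, and the apply step is the single place where state changes. That is what makes the state reproducible from the event history.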

Using Aggregates

Interact with aggregates in controllers and services:

    // app/Http/Controllers/AccountController.php
    namespace App\Http\Controllers;

    use App\Domain\Account\AccountAggregateRoot;
    // The exception namespace below is an assumption; adjust to your project
    use App\Domain\Account\Exceptions\AccountLimitException;
    use App\Domain\Account\Exceptions\InsufficientFundsException;
    use Illuminate\Http\Request;
    use Illuminate\Support\Str;

    class AccountController extends Controller
    {
        public function create(Request $request)
        {
            $accountUuid = Str::uuid()->toString();

            AccountAggregateRoot::retrieve($accountUuid)
                ->createAccount(
                    name: $request->input('name'),
                    userId: (string) auth()->id() // the event expects a string
                )
                ->persist();

            return response()->json([
                'account_uuid' => $accountUuid,
                'message' => 'Account created successfully',
            ]);
        }

        public function deposit(Request $request, string $accountUuid)
        {
            // Reject missing or non-positive amounts before touching the aggregate
            $data = $request->validate(['amount' => ['required', 'integer', 'min:1']]);

            try {
                AccountAggregateRoot::retrieve($accountUuid)
                    ->addMoney((int) $data['amount'])
                    ->persist();

                return response()->json(['message' => 'Deposit successful']);
            } catch (AccountLimitException $e) {
                return response()->json(['error' => $e->getMessage()], 422);
            }
        }

        public function withdraw(Request $request, string $accountUuid)
        {
            $data = $request->validate(['amount' => ['required', 'integer', 'min:1']]);

            try {
                AccountAggregateRoot::retrieve($accountUuid)
                    ->subtractMoney((int) $data['amount'])
                    ->persist();

                return response()->json(['message' => 'Withdrawal successful']);
            } catch (InsufficientFundsException $e) {
                return response()->json(['error' => $e->getMessage()], 422);
            }
        }
    }

Creating Projections

Projections are read models built from events:

    // app/Domain/Account/Projections/Account.php
    namespace App\Domain\Account\Projections;

    use Illuminate\Database\Eloquent\Model;

    class Account extends Model
    {
        protected $guarded = [];

        protected $casts = [
            'balance' => 'integer',
        ];
    }

    // app/Domain/Account/Projectors/AccountProjector.php
    namespace App\Domain\Account\Projectors;

    use App\Domain\Account\Events\AccountCreated;
    use App\Domain\Account\Events\MoneyAdded;
    use App\Domain\Account\Events\MoneySubtracted;
    use App\Domain\Account\Projections\Account;
    use Spatie\EventSourcing\EventHandlers\Projectors\Projector;

    class AccountProjector extends Projector
    {
        public function onAccountCreated(AccountCreated $event): void
        {
            Account::create([
                'uuid' => $event->accountUuid,
                'name' => $event->name,
                'user_id' => $event->userId,
                'balance' => 0,
            ]);
        }

        public function onMoneyAdded(MoneyAdded $event): void
        {
            // increment() issues a single UPDATE and avoids a null model
            // when the row is missing
            Account::where('uuid', $event->accountUuid)
                ->increment('balance', $event->amount);
        }

        public function onMoneySubtracted(MoneySubtracted $event): void
        {
            Account::where('uuid', $event->accountUuid)
                ->decrement('balance', $event->amount);
        }
    }

    // Register the projector in one of your application's service providers
    // (for example AppServiceProvider)
    use Spatie\EventSourcing\Facades\Projectionist;

    public function boot()
    {
        Projectionist::addProjector(AccountProjector::class);
    }

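Warning: Projections should be idempotent (applying the same event more than once produces the same result) and are only eventually consistent with the event store. The balance handlers above add and subtract amounts, so they are not idempotent on their own: clear the projection before replaying events into it.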
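
One common way to make an additive projection idempotent is to remember which event IDs have already been applied. The sketch below is framework-free; `BalanceProjection` and its `handle` method are hypothetical names, and a real projector would keep the processed IDs in the database rather than in memory.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory read model; a real projector would write to a table.
final class BalanceProjection
{
    /** @var array<string, int> balance per account UUID */
    public array $balances = [];

    /** @var array<int, true> IDs of events that were already applied */
    private array $seen = [];

    public function handle(int $eventId, string $accountUuid, int $delta): void
    {
        if (isset($this->seen[$eventId])) {
            return; // duplicate delivery: applying it again would double-count
        }

        $this->balances[$accountUuid] = ($this->balances[$accountUuid] ?? 0) + $delta;
        $this->seen[$eventId] = true;
    }
}

$projection = new BalanceProjection();
$projection->handle(1, 'account-1', 500);
$projection->handle(2, 'account-1', -200);
$projection->handle(2, 'account-1', -200); // same event delivered twice

echo $projection->balances['account-1'], PHP_EOL; // 300
```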

Creating Reactors

Reactors handle side effects when events occur:

    // app/Domain/Account/Reactors/SendAccountLimitNotification.php
    namespace App\Domain\Account\Reactors;

    use App\Domain\Account\Events\AccountLimitReached;
    use App\Domain\Account\Projections\Account;
    use Illuminate\Support\Facades\Log;
    use Illuminate\Support\Facades\Mail;
    use Spatie\EventSourcing\EventHandlers\Reactors\Reactor;

    class SendAccountLimitNotification extends Reactor
    {
        public function onAccountLimitReached(AccountLimitReached $event): void
        {
            // Assumes the Account projection defines a user() relation
            $account = Account::where('uuid', $event->accountUuid)->firstOrFail();

            Mail::to($account->user)
                ->send(new AccountLimitReachedMail($account));

            Log::warning('Account limit reached', [
                'account_uuid' => $event->accountUuid,
                'balance' => $event->currentBalance,
            ]);
        }
    }

    // app/Domain/Account/Reactors/TransactionLogger.php
    class TransactionLogger extends Reactor
    {
        public function onMoneyAdded(MoneyAdded $event): void
        {
            TransactionLog::create([
                'account_uuid' => $event->accountUuid,
                'type' => 'deposit',
                'amount' => $event->amount,
                'timestamp' => now(),
            ]);
        }

        public function onMoneySubtracted(MoneySubtracted $event): void
        {
            TransactionLog::create([
                'account_uuid' => $event->accountUuid,
                'type' => 'withdrawal',
                'amount' => $event->amount,
                'timestamp' => now(),
            ]);
        }
    }

    // Register reactors in one of your application's service providers
    public function boot()
    {
        Projectionist::addReactor(SendAccountLimitNotification::class);
        Projectionist::addReactor(TransactionLogger::class);
    }

Replaying Events

Rebuild projections by replaying all events:

    # Replay all events for all projectors
    php artisan event-sourcing:replay

    # Replay for a specific projector: pass its class name as an argument
    php artisan event-sourcing:replay "App\Domain\Account\Projectors\AccountProjector"

    # Replay starting from a specific stored event ID
    php artisan event-sourcing:replay --from=1000

    // Clear the projection before a full replay (for example in tinker),
    // then run the replay command for that projector
    Account::truncate();

    // Programmatic replay: pass a collection of projector instances
    use Spatie\EventSourcing\Facades\Projectionist;

    Projectionist::replay(
        collect([app(AccountProjector::class)]),
        0 // ID of the stored event to start from
    );

Snapshots for Performance

Optimize aggregate reconstruction with snapshots:

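    class AccountAggregateRoot extends AggregateRoot
    {
        protected int $balance = 0;
        protected int $balanceLimit = 10000;

        // State written to the snapshots table when snapshot() is called.
        // getState()/useState() are the hooks in recent package versions;
        // check the AggregateRoot class of the version you have installed.
        protected function getState(): array
        {
            return [
                'balance' => $this->balance,
                'balance_limit' => $this->balanceLimit,
            ];
        }

        // State restored from the latest snapshot when the aggregate is retrieved
        protected function useState(array $state): void
        {
            $this->balance = $state['balance'];
            $this->balanceLimit = $state['balance_limit'];
        }
    }

    // Snapshots are taken explicitly, for example from a scheduled job or
    // after every 100 persisted events (the interval is your own policy)
    $aggregate = AccountAggregateRoot::retrieve($uuid);
    $aggregate->snapshot();

    // Without snapshots: replay all 10,000 events on every retrieve
    // With a snapshot every 100 events: replay at most about 100 events
    // recorded after the last snapshot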
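
The saving from a snapshot can be shown with a small framework-free sketch. The `replay` function, the array-based event log, and the `$snapshot` shape are hypothetical and chosen only for illustration; a real store would query events whose ID is greater than the snapshot's instead of skipping them in a loop.

```php
<?php

declare(strict_types=1);

// Build a hypothetical history of 250 deposits of 10 each (event IDs 1..250).
$events = [];
for ($id = 1; $id <= 250; $id++) {
    $events[$id] = 10;
}

// Replay events with an ID greater than $afterId on top of a starting balance.
// Returns [balance, number of events replayed].
function replay(array $events, int $startBalance = 0, int $afterId = 0): array
{
    $balance = $startBalance;
    $replayed = 0;
    foreach ($events as $id => $amount) {
        if ($id <= $afterId) {
            continue; // already covered by the snapshot
        }
        $balance += $amount;
        $replayed++;
    }

    return [$balance, $replayed];
}

// Full replay from the first event.
[$fullBalance, $fullCount] = replay($events);

// A snapshot taken at event 200 stores the balance at that point.
$snapshot = [
    'last_event_id' => 200,
    'balance' => replay(array_slice($events, 0, 200, true))[0],
];

// Restore from the snapshot, then replay only the newer events.
[$fastBalance, $fastCount] = replay($events, $snapshot['balance'], $snapshot['last_event_id']);

echo "$fullBalance after $fullCount events", PHP_EOL; // 2500 after 250 events
echo "$fastBalance after $fastCount events", PHP_EOL; // 2500 after 50 events
```

Both paths arrive at the same balance; the snapshot only changes how many events have to be applied to get there.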

CQRS Pattern

Separate command and query responsibilities:

    // Commands - modify state (write)
    class DepositMoneyCommand
    {
        public function __construct(
            public string $accountUuid,
            public int $amount
        ) {}
    }

    class DepositMoneyHandler
    {
        public function handle(DepositMoneyCommand $command): void
        {
            AccountAggregateRoot::retrieve($command->accountUuid)
                ->addMoney($command->amount)
                ->persist();
        }
    }

    // Queries - read state (read)
    class GetAccountBalanceQuery
    {
        public function __construct(
            public string $accountUuid
        ) {}
    }

    class GetAccountBalanceHandler
    {
        public function handle(GetAccountBalanceQuery $query): int
        {
            // Read from the projection, not from the events
            return Account::where('uuid', $query->accountUuid)
                ->firstOrFail()
                ->balance;
        }
    }

    // Map commands and queries to their handlers, for example in a
    // service provider's boot() method. Without a mapping, the bus
    // looks for a handle() method on the command object itself.
    Bus::map([
        DepositMoneyCommand::class => DepositMoneyHandler::class,
        GetAccountBalanceQuery::class => GetAccountBalanceHandler::class,
    ]);

    // Usage
    Bus::dispatch(new DepositMoneyCommand($uuid, 500));
    $balance = Bus::dispatch(new GetAccountBalanceQuery($uuid));

    // Separate databases for reads and writes
    // config/database.php
    'connections' => [
        'write' => [
            'driver' => 'mysql',
            'host' => env('DB_WRITE_HOST'),
            // Event store connection
        ],
        'read' => [
            'driver' => 'mysql',
            'host' => env('DB_READ_HOST'),
            // Projection connection (can be replicated)
        ],
    ],
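
The command/query split does not depend on Laravel's bus. Here is a minimal framework-free sketch of the same idea; `DepositMoney`, `GetBalance`, the `$handlers` map, and the `$dispatch` closure are hypothetical names, and two in-memory arrays stand in for the write and read databases.

```php
<?php

declare(strict_types=1);

final class DepositMoney
{
    public function __construct(public string $accountUuid, public int $amount) {}
}

final class GetBalance
{
    public function __construct(public string $accountUuid) {}
}

// Hypothetical in-memory stores standing in for the write and read databases.
$eventStore = []; // write side: append-only list of events
$readModel = [];  // read side: balance per account UUID

// Handlers keyed by message class; commands write, queries only read.
$handlers = [
    DepositMoney::class => function (DepositMoney $command) use (&$eventStore, &$readModel): void {
        $eventStore[] = [
            'type' => 'MoneyAdded',
            'uuid' => $command->accountUuid,
            'amount' => $command->amount,
        ];
        // Project the new event into the read model (synchronously, for brevity).
        $readModel[$command->accountUuid] = ($readModel[$command->accountUuid] ?? 0) + $command->amount;
    },
    GetBalance::class => function (GetBalance $query) use (&$readModel): int {
        return $readModel[$query->accountUuid] ?? 0;
    },
];

// Minimal dispatcher: look up the handler by the message's class name.
$dispatch = fn (object $message) => $handlers[$message::class]($message);

$dispatch(new DepositMoney('account-1', 500));
$dispatch(new DepositMoney('account-1', 250));

echo $dispatch(new GetBalance('account-1')), PHP_EOL; // 750
echo count($eventStore), PHP_EOL;                     // 2
```

The query handler never touches `$eventStore`, which is the property that lets the read side move to a separate, replicated database.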

Testing Event Sourced Systems

Write tests for event sourcing logic:

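    // tests/Feature/AccountTest.php
    use Illuminate\Foundation\Testing\RefreshDatabase;
    use Illuminate\Support\Str;
    use Spatie\EventSourcing\Facades\Projectionist;
    use Tests\TestCase;
    // Also import the Account aggregate, events, projector, projection
    // and exception classes from your domain namespace

    class AccountTest extends TestCase
    {
        use RefreshDatabase;

        // Helper: creates an empty account and returns its UUID
        private function createAccount(): string
        {
            $uuid = Str::uuid()->toString();

            AccountAggregateRoot::retrieve($uuid)
                ->createAccount('Test Account', '1')
                ->persist();

            return $uuid;
        }

        // Helper: +500, -200, +900 leaves a balance of 1200
        private function createAccountWithTransactions(): string
        {
            $uuid = $this->createAccount();

            AccountAggregateRoot::retrieve($uuid)
                ->addMoney(500)
                ->subtractMoney(200)
                ->addMoney(900)
                ->persist();

            return $uuid;
        }

        public function test_account_creation_records_event()
        {
            $uuid = $this->createAccount();

            $this->assertDatabaseHas('stored_events', [
                'aggregate_uuid' => $uuid,
                'event_class' => AccountCreated::class,
            ]);
        }

        public function test_adding_money_increases_balance()
        {
            $uuid = $this->createAccount();

            AccountAggregateRoot::retrieve($uuid)
                ->addMoney(500)
                ->persist();

            $account = Account::where('uuid', $uuid)->first();
            $this->assertEquals(500, $account->balance);
        }

        public function test_exceeding_limit_throws_exception()
        {
            $uuid = $this->createAccount();

            $this->expectException(AccountLimitException::class);

            AccountAggregateRoot::retrieve($uuid)
                ->addMoney(15000)
                ->persist();
        }

        public function test_projection_can_be_rebuilt_from_events()
        {
            $uuid = $this->createAccountWithTransactions();

            // Delete the projection
            Account::where('uuid', $uuid)->delete();

            // Replay events into the projector (a collection of instances)
            Projectionist::replay(collect([app(AccountProjector::class)]));

            // Verify the projection was rebuilt correctly
            $account = Account::where('uuid', $uuid)->first();
            $this->assertEquals(1200, $account->balance);
        }
    }
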
Exercise 1: Build an order management system using event sourcing:
1. Create OrderAggregateRoot with events: OrderPlaced, OrderPaid, OrderShipped, OrderDelivered, OrderCancelled
2. Implement business rules: can't ship unpaid orders, can't cancel shipped orders
3. Create projections for: current order state, order history timeline
4. Add reactors for: sending emails, updating inventory
5. Implement snapshots every 50 events
Test all state transitions and replay scenarios.

Exercise 2: Create a collaborative document editor with event sourcing:
1. Events: DocumentCreated, ContentAdded, ContentDeleted, ContentModified, UserJoined, UserLeft
2. Track all changes with user attribution and timestamps
3. Build projection showing current document state
4. Implement "time travel" feature to view document at any point in history
5. Create analytics projection for: edit frequency, active users, popular sections
Support concurrent editing and conflict resolution.

Exercise 3: Implement a game leaderboard system with event sourcing and CQRS:
1. Events: PlayerRegistered, ScoreAdded, AchievementUnlocked, LevelCompleted
2. Commands: RegisterPlayer, AddScore, UnlockAchievement
3. Queries: GetPlayerRank, GetTopPlayers, GetPlayerStats
4. Separate write (event store) and read (projections) databases
5. Create multiple projections: all-time leaderboard, monthly leaderboard, player statistics
Handle high concurrency and implement efficient rank calculation.