Event Sourcing Concepts
Understand event sourcing architecture, implement event stores, create projections, replay events, and apply CQRS (Command Query Responsibility Segregation) patterns using Spatie Event Sourcing package.
What is Event Sourcing?
Event sourcing is an architectural pattern where application state changes are stored as a sequence of events, rather than just the current state.
// Traditional approach - stores current state
User::where('id', 1)->update(['balance' => 1000]);
// Event sourcing - stores events that led to state
MoneyAdded::dispatch($user, 500); // Event 1: +500
MoneySubtracted::dispatch($user, 200); // Event 2: -200
MoneyAdded::dispatch($user, 700); // Event 3: +700
// Current balance = 1000 (derived from events)
Pro Tip: Event sourcing provides a complete audit trail, enables time travel (viewing past states), and allows rebuilding state from scratch by replaying events.
Benefits and Use Cases
Event sourcing excels in specific scenarios:
Benefits:
✓ Complete audit trail - every state change is recorded
✓ Time travel - reconstruct state at any point in time
✓ Event replay - rebuild projections from events
✓ Debugging - see exact sequence of events that caused issues
✓ Analytics - analyze historical patterns
✓ Scalability - separate read and write models (CQRS)
Use Cases:
• Financial systems (banking, accounting)
• E-commerce (order processing, inventory)
• Collaborative systems (document editing)
• Gaming (game state, leaderboards)
• Healthcare (patient records)
• IoT (sensor data, device states)
When NOT to use:
✗ Simple CRUD applications
✗ Systems without audit requirements
✗ High-write, low-read scenarios without analytics needs
✗ Teams unfamiliar with the pattern
Installing Spatie Event Sourcing
Set up event sourcing with Spatie's package:
// Install package
composer require spatie/laravel-event-sourcing
// Publish configuration and migrations
php artisan vendor:publish --provider="Spatie\EventSourcing\EventSourcingServiceProvider"
// Run migrations
php artisan migrate
// Key tables created:
// - stored_events: stores all events
// - snapshots: stores aggregate snapshots
// - event_sourcing_jobs: tracks replay jobs
Creating Events
Define domain events that represent state changes:
// app/Domain/Account/Events/AccountCreated.php
namespace App\Domain\Account\Events;
use Spatie\EventSourcing\StoredEvents\ShouldBeStored;
class AccountCreated extends ShouldBeStored
{
public function __construct(
public string $accountUuid,
public string $name,
public string $userId
) {}
}
// app/Domain/Account/Events/MoneyAdded.php
class MoneyAdded extends ShouldBeStored
{
public function __construct(
public string $accountUuid,
public int $amount
) {}
}
// app/Domain/Account/Events/MoneySubtracted.php
class MoneySubtracted extends ShouldBeStored
{
public function __construct(
public string $accountUuid,
public int $amount
) {}
}
// app/Domain/Account/Events/AccountLimitReached.php
class AccountLimitReached extends ShouldBeStored
{
public function __construct(
public string $accountUuid,
public int $currentBalance
) {}
}
Note: Events should be immutable, past-tense named, and contain all data needed to reconstruct state. Never modify event structure after production use.
Creating Aggregates
Aggregates are the entities that apply events and maintain business rules:
// app/Domain/Account/AccountAggregateRoot.php
namespace App\Domain\Account;
use Spatie\EventSourcing\AggregateRoots\AggregateRoot;
class AccountAggregateRoot extends AggregateRoot
{
protected int $balance = 0;
protected int $balanceLimit = 10000;
// Apply events to update internal state
protected function applyAccountCreated(AccountCreated $event): void
{
$this->balance = 0;
}
protected function applyMoneyAdded(MoneyAdded $event): void
{
$this->balance += $event->amount;
}
protected function applyMoneySubtracted(MoneySubtracted $event): void
{
$this->balance -= $event->amount;
}
// Business logic methods
public function createAccount(string $name, string $userId): self
{
$this->recordThat(new AccountCreated(
accountUuid: $this->uuid(),
name: $name,
userId: $userId
));
return $this;
}
public function addMoney(int $amount): self
{
if ($amount <= 0) {
throw new InvalidAmountException('Amount must be positive');
}
$newBalance = $this->balance + $amount;
if ($newBalance > $this->balanceLimit) {
$this->recordThat(new AccountLimitReached(
accountUuid: $this->uuid(),
currentBalance: $this->balance
));
throw new AccountLimitException('Balance limit exceeded');
}
$this->recordThat(new MoneyAdded(
accountUuid: $this->uuid(),
amount: $amount
));
return $this;
}
public function subtractMoney(int $amount): self
{
if ($amount <= 0) {
throw new InvalidAmountException('Amount must be positive');
}
if ($this->balance - $amount < 0) {
throw new InsufficientFundsException('Insufficient funds');
}
$this->recordThat(new MoneySubtracted(
accountUuid: $this->uuid(),
amount: $amount
));
return $this;
}
}
Using Aggregates
Interact with aggregates in controllers and services:
// app/Http/Controllers/AccountController.php
namespace App\Http\Controllers;
use App\Domain\Account\AccountAggregateRoot;
use Illuminate\Support\Str;
class AccountController extends Controller
{
public function create(Request $request)
{
$accountUuid = Str::uuid()->toString();
AccountAggregateRoot::retrieve($accountUuid)
->createAccount(
name: $request->name,
userId: auth()->id()
)
->persist();
return response()->json([
'account_uuid' => $accountUuid,
'message' => 'Account created successfully'
]);
}
public function deposit(Request $request, string $accountUuid)
{
try {
AccountAggregateRoot::retrieve($accountUuid)
->addMoney($request->amount)
->persist();
return response()->json(['message' => 'Deposit successful']);
} catch (AccountLimitException $e) {
return response()->json(['error' => $e->getMessage()], 422);
}
}
public function withdraw(Request $request, string $accountUuid)
{
try {
AccountAggregateRoot::retrieve($accountUuid)
->subtractMoney($request->amount)
->persist();
return response()->json(['message' => 'Withdrawal successful']);
} catch (InsufficientFundsException $e) {
return response()->json(['error' => $e->getMessage()], 422);
}
}
}
Creating Projections
Projections are read models built from events:
// app/Domain/Account/Projections/Account.php
namespace App\Domain\Account\Projections;
use Illuminate\Database\Eloquent\Model;
class Account extends Model
{
protected $guarded = [];
protected $casts = [
'balance' => 'integer',
];
}
// app/Domain/Account/Projectors/AccountProjector.php
namespace App\Domain\Account\Projectors;
use Spatie\EventSourcing\EventHandlers\Projectors\Projector;
class AccountProjector extends Projector
{
public function onAccountCreated(AccountCreated $event): void
{
Account::create([
'uuid' => $event->accountUuid,
'name' => $event->name,
'user_id' => $event->userId,
'balance' => 0,
]);
}
public function onMoneyAdded(MoneyAdded $event): void
{
$account = Account::where('uuid', $event->accountUuid)->first();
$account->balance += $event->amount;
$account->save();
}
public function onMoneySubtracted(MoneySubtracted $event): void
{
$account = Account::where('uuid', $event->accountUuid)->first();
$account->balance -= $event->amount;
$account->save();
}
}
// Register projector in EventSourcingServiceProvider
public function boot()
{
Projectionist::addProjector(AccountProjector::class);
}
Warning: Projections should be idempotent (applying same event multiple times produces same result) and eventually consistent with the event store.
Creating Reactors
Reactors handle side effects when events occur:
// app/Domain/Account/Reactors/SendAccountLimitNotification.php
namespace App\Domain\Account\Reactors;
use Spatie\EventSourcing\EventHandlers\Reactors\Reactor;
class SendAccountLimitNotification extends Reactor
{
public function onAccountLimitReached(AccountLimitReached $event): void
{
$account = Account::where('uuid', $event->accountUuid)->first();
Mail::to($account->user)
->send(new AccountLimitReachedMail($account));
Log::warning('Account limit reached', [
'account_uuid' => $event->accountUuid,
'balance' => $event->currentBalance,
]);
}
}
// app/Domain/Account/Reactors/TransactionLogger.php
class TransactionLogger extends Reactor
{
public function onMoneyAdded(MoneyAdded $event): void
{
TransactionLog::create([
'account_uuid' => $event->accountUuid,
'type' => 'deposit',
'amount' => $event->amount,
'timestamp' => now(),
]);
}
public function onMoneySubtracted(MoneySubtracted $event): void
{
TransactionLog::create([
'account_uuid' => $event->accountUuid,
'type' => 'withdrawal',
'amount' => $event->amount,
'timestamp' => now(),
]);
}
}
// Register reactors
public function boot()
{
Projectionist::addReactor(SendAccountLimitNotification::class);
Projectionist::addReactor(TransactionLogger::class);
}
Replaying Events
Rebuild projections by replaying all events:
// Replay all events for all projectors
php artisan event-sourcing:replay
// Replay for specific projector
php artisan event-sourcing:replay --projector=AccountProjector
// Replay from specific event ID
php artisan event-sourcing:replay --from=1000
// Clear projection before replay
Account::truncate();
php artisan event-sourcing:replay --projector=AccountProjector
// Programmatic replay
use Spatie\EventSourcing\Facades\Projectionist;
Projectionist::replay(
AccountProjector::class,
startingFromEventId: 0
);
Snapshots for Performance
Optimize aggregate reconstruction with snapshots:
class AccountAggregateRoot extends AggregateRoot
{
// Take snapshot every 100 events
protected int $snapshotEvery = 100;
// Create snapshot of current state
protected function snapshot(): array
{
return [
'balance' => $this->balance,
'balance_limit' => $this->balanceLimit,
];
}
// Restore from snapshot
protected function restoreFromSnapshot(array $state): void
{
$this->balance = $state['balance'];
$this->balanceLimit = $state['balance_limit'];
}
}
// Manual snapshot creation
$aggregate = AccountAggregateRoot::retrieve($uuid);
$aggregate->snapshot();
// Without snapshots: replay 10,000 events
// With snapshots (every 100): replay 100 events from last snapshot
CQRS Pattern
Separate command and query responsibilities:
// Commands - modify state (write)
class DepositMoneyCommand
{
public function __construct(
public string $accountUuid,
public int $amount
) {}
}
class DepositMoneyHandler
{
public function handle(DepositMoneyCommand $command): void
{
AccountAggregateRoot::retrieve($command->accountUuid)
->addMoney($command->amount)
->persist();
}
}
// Queries - read state (read)
class GetAccountBalanceQuery
{
public function __construct(
public string $accountUuid
) {}
}
class GetAccountBalanceHandler
{
public function handle(GetAccountBalanceQuery $query): int
{
// Read from projection, not events
$account = Account::where('uuid', $query->accountUuid)->first();
return $account->balance;
}
}
// Usage
Bus::dispatch(new DepositMoneyCommand($uuid, 500));
$balance = Bus::dispatch(new GetAccountBalanceQuery($uuid));
// Separate databases for reads and writes
// config/database.php
'connections' => [
'write' => [
'driver' => 'mysql',
'host' => env('DB_WRITE_HOST'),
// Event store connection
],
'read' => [
'driver' => 'mysql',
'host' => env('DB_READ_HOST'),
// Projection connection (can be replicated)
],
],
Testing Event Sourced Systems
Write tests for event sourcing logic:
// tests/Feature/AccountTest.php
use Spatie\EventSourcing\StoredEvents\Models\EloquentStoredEvent;
class AccountTest extends TestCase
{
public function test_account_creation_records_event()
{
$uuid = Str::uuid()->toString();
AccountAggregateRoot::retrieve($uuid)
->createAccount('Test Account', 1)
->persist();
$this->assertDatabaseHas('stored_events', [
'aggregate_uuid' => $uuid,
'event_class' => AccountCreated::class,
]);
}
public function test_adding_money_increases_balance()
{
$uuid = $this->createAccount();
AccountAggregateRoot::retrieve($uuid)
->addMoney(500)
->persist();
$account = Account::where('uuid', $uuid)->first();
$this->assertEquals(500, $account->balance);
}
public function test_exceeding_limit_throws_exception()
{
$uuid = $this->createAccount();
$this->expectException(AccountLimitException::class);
AccountAggregateRoot::retrieve($uuid)
->addMoney(15000)
->persist();
}
public function test_projection_can_be_rebuilt_from_events()
{
$uuid = $this->createAccountWithTransactions();
// Delete projection
Account::where('uuid', $uuid)->delete();
// Replay events
Projectionist::replay(AccountProjector::class);
// Verify projection rebuilt correctly
$account = Account::where('uuid', $uuid)->first();
$this->assertEquals(1200, $account->balance);
}
}
Exercise 1: Build an order management system using event sourcing:
1. Create OrderAggregateRoot with events: OrderPlaced, OrderPaid, OrderShipped, OrderDelivered, OrderCancelled
2. Implement business rules: can't ship unpaid orders, can't cancel shipped orders
3. Create projections for: current order state, order history timeline
4. Add reactors for: sending emails, updating inventory
5. Implement snapshots every 50 events
Test all state transitions and replay scenarios.
Exercise 2: Create a collaborative document editor with event sourcing:
1. Events: DocumentCreated, ContentAdded, ContentDeleted, ContentModified, UserJoined, UserLeft
2. Track all changes with user attribution and timestamps
3. Build projection showing current document state
4. Implement "time travel" feature to view document at any point in history
5. Create analytics projection for: edit frequency, active users, popular sections
Support concurrent editing and conflict resolution.
Exercise 3: Implement a game leaderboard system with event sourcing and CQRS:
1. Events: PlayerRegistered, ScoreAdded, AchievementUnlocked, LevelCompleted
2. Commands: RegisterPlayer, AddScore, UnlockAchievement
3. Queries: GetPlayerRank, GetTopPlayers, GetPlayerStats
4. Separate write (event store) and read (projections) databases
5. Create multiple projections: all-time leaderboard, monthly leaderboard, player statistics
Handle high concurrency and implement efficient rank calculation.