Foundation

Event Sourcing

Capture every state change as an immutable event. The Core Event Sourcing module provides aggregates, command handlers, stream repositories, projections, and an outbox pattern — all backed by Marten and PostgreSQL.

Package Overview

Event sourcing spans three Foundation packages. Use them selectively, only for aggregates that need a full event history for audit or replay.

| Package | Purpose | Key Contents |
| --- | --- | --- |
| Sillium.Core.EventSourcing | Event sourcing abstractions | EventSourcedAggregateBase, EventSourcedCommandHandler, EventSourcedCreateCommandHandler, IStreamRepository, StreamData, IProjectionUpdater, IReadModel, ProjectionUpdaterBase |
| Sillium.Core.EventSourcing.Api | API integration | StreamConcurrencyExceptionHandler (sets ETag header on version conflicts) |
| Sillium.Core.Infrastructure | Marten event store & outbox | IEventStore, MartenEventStore, DomainEventEnvelope, OutboxMessage, OutboxProcessor, DbContextBase |

Architecture

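The system separates the write side (events in Marten) from the read side (projections in EF Core). The outbox pattern keeps the two stores consistent: domain events are committed in the same transaction as the aggregate changes and forwarded to the event store afterwards.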

Write Path (Update)

When updating an existing aggregate, events are loaded, the aggregate is rehydrated, business logic executes, and new events are appended to the stream.

// Write Path: Command → Handler → Event Store
//
// 1. Command arrives at EventSourcedCommandHandler
// 2. Load event stream via StreamRepository.LoadAsync()
// 3. Rehydrate aggregate from events (Replay)
// 4. Execute business logic (raises new events via ApplyAndRaise)
// 5. Append new events to stream via StreamRepository.AppendAsync()
// 6. Update read-side projections via ProjectionUpdater.ApplyAsync()
//
// Command
//   → EventSourcedCommandHandler.HandleAsync()
//     ├─ StreamRepository.LoadAsync()      → StreamData (events + version)
//     ├─ Rehydrate()                       → Aggregate with current state
//     ├─ Execute()                         → Business logic (ApplyAndRaise)
//     ├─ StreamRepository.AppendAsync()    → Marten event store
//     └─ ProjectionUpdater.ApplyAsync()    → EF Core read model

Write Path (Create)

When creating a new aggregate, a fresh stream is started with the initial events and the result (e.g., the new ID) is returned.

// Create Path: CreateCommand → Handler → New Stream
//
// 1. CreateCommand arrives at EventSourcedCreateCommandHandler
// 2. Create() builds new aggregate + result (e.g., new ID)
// 3. Append events as new stream (version 0 → Marten.StartStream)
// 4. Update projections and return result
//
// CreateCommand
//   → EventSourcedCreateCommandHandler.HandleAsync()
//     ├─ Create()                          → (Aggregate, Result)
//     ├─ StreamRepository.AppendAsync(v:0) → Marten StartStream
//     ├─ ProjectionUpdater.ApplyAsync()    → EF Core read model
//     └─ return Result

Outbox Flow

For aggregates persisted via EF Core, domain events are written to an outbox table in the same transaction. A background processor forwards them to the event store.

// Outbox Pattern: Transactional consistency between EF Core and Event Store
//
// DbContextBase.SaveChangesAsync()
//   ├─ Collect domain events from tracked aggregates
//   ├─ Create OutboxMessage rows (same transaction)
//   └─ SaveChangesAsync() — single atomic transaction
//
// OutboxProcessor (background hosted service)
//   ├─ Poll OutboxMessages (FOR UPDATE SKIP LOCKED)
//   ├─ Deserialize events via type cache
//   ├─ IEventStore.AppendEventsAsync()
//   ├─ Mark ProcessedAt on success
//   └─ Dead-letter after MaxRetryCount failures
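
The retry and dead-letter steps above can be sketched in memory as follows. This is only an illustration under stated assumptions: PendingMessage, Forward, and the DeadLettered flag are hypothetical stand-ins (not the Sillium OutboxMessage or OutboxProcessor), the loop replaces the timed polling, and dead-lettering once RetryCount reaches the limit is a guess at the real rule.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const int maxRetryCount = 5; // mirrors the documented MaxRetryCount default
const int batchSize = 50;    // mirrors the documented BatchSize default

var outbox = new List<PendingMessage>
{
    new() { Payload = "ok" },
    new() { Payload = "poison" },
};

// Stand-in for IEventStore.AppendEventsAsync(); always fails for one payload.
void Forward(PendingMessage message)
{
    if (message.Payload == "poison")
        throw new InvalidOperationException("cannot append");
}

// Each iteration stands in for one poll cycle of the background service.
for (var cycle = 0; cycle < maxRetryCount; cycle++)
{
    var batch = outbox
        .Where(m => m.ProcessedAt is null && !m.DeadLettered)
        .Take(batchSize)
        .ToList();

    foreach (var message in batch)
    {
        try
        {
            Forward(message);
            message.ProcessedAt = DateTime.UtcNow; // mark success
        }
        catch (Exception)
        {
            message.RetryCount++;
            // Assumption: dead-letter once the retry limit is reached.
            if (message.RetryCount >= maxRetryCount)
                message.DeadLettered = true;
        }
    }
}

foreach (var m in outbox)
    Console.WriteLine(
        $"{m.Payload}: processed={m.ProcessedAt is not null}, deadLettered={m.DeadLettered}");

// Hypothetical, simplified outbox row.
public sealed class PendingMessage
{
    public string Payload { get; init; } = string.Empty;
    public int RetryCount { get; set; }
    public DateTime? ProcessedAt { get; set; }
    public bool DeadLettered { get; set; }
}
```

In the real processor the batch would be selected with row locks (FOR UPDATE SKIP LOCKED), so that several processor instances can poll the same table without picking up the same rows.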

Aggregate Pattern

Event-sourced aggregates extend EventSourcedAggregateBase and implement ApplyEvent to rebuild state from events. Business methods use ApplyAndRaise to change state and record the event.

EventSourcedAggregateBase

The abstract base class provides ApplyAndRaise (mutate + record), Replay (rehydrate from history), and the abstract ApplyEvent method.

// EventSourcedAggregateBase<TId> extends AggregateRootBase<TId>
// and adds event replay + apply-and-raise capabilities.

public abstract class EventSourcedAggregateBase<TId> : AggregateRootBase<TId>
    where TId : notnull, IGuidId
{
    // Apply event to state AND raise it for persistence
    protected void ApplyAndRaise(IDomainEvent @event)
    {
        ApplyEvent(@event);
        RaiseDomainEvent(@event);
    }

    // Rehydrate aggregate from event history
    protected void Replay(IEnumerable<IDomainEvent> events)
    {
        foreach (var @event in events)
            ApplyEvent(@event);
        ClearDomainEvents();
    }

    // Implement in subclass: apply event to aggregate state
    protected abstract void ApplyEvent(IDomainEvent @event);
}

Example: Order Aggregate

A complete example showing an event-sourced aggregate with domain events, a factory method, a business method, and the ApplyEvent switch.

// Example: Order aggregate with event sourcing

public sealed record OrderId(Guid Value) : GuidIdBase(Value)
{
    public static OrderId New() => new(Guid.NewGuid());
}

// Domain events
public sealed record OrderCreated(OrderId Id, string Customer) : DomainEventBase;
public sealed record OrderItemAdded(OrderId Id, string Product, int Qty) : DomainEventBase;

// Event-sourced aggregate
public sealed class Order : EventSourcedAggregateBase<OrderId>
{
    public string Customer { get; private set; } = string.Empty;
    private readonly List<(string Product, int Qty)> _items = [];

    private Order() { } // For rehydration

    public static Order Create(string customer)
    {
        var order = new Order { Id = OrderId.New() };
        order.ApplyAndRaise(new OrderCreated(order.Id, customer));
        return order;
    }

    public void AddItem(string product, int qty)
        => ApplyAndRaise(new OrderItemAdded(Id, product, qty));

    protected override void ApplyEvent(IDomainEvent @event)
    {
        switch (@event)
        {
            case OrderCreated e:
                Id = e.Id;
                Customer = e.Customer;
                break;
            case OrderItemAdded e:
                _items.Add((e.Product, e.Qty));
                break;
        }
    }
}
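
To show why ApplyEvent should depend only on the event it receives, the following self-contained sketch replays a recorded history into a fresh instance and ends up with the same state. The types here (IEvent, Created, ItemAdded, MiniOrder, FromHistory) are hypothetical stand-ins written for this illustration; they do not use the Sillium base classes.

```csharp
using System;
using System.Collections.Generic;

// Build an aggregate, then rebuild it from nothing but its recorded events.
var original = MiniOrder.Create("Ada");
original.AddItem("Keyboard", 2);

var rehydrated = MiniOrder.FromHistory(original.Recorded);
Console.WriteLine(rehydrated.Id == original.Id);               // True
Console.WriteLine(rehydrated.Customer == original.Customer);   // True
Console.WriteLine(rehydrated.ItemCount == original.ItemCount); // True

// Hypothetical stand-ins; NOT the Sillium types.
public interface IEvent { }
public sealed record Created(Guid Id, string Customer) : IEvent;
public sealed record ItemAdded(Guid Id, string Product, int Qty) : IEvent;

public sealed class MiniOrder
{
    private readonly List<IEvent> _recorded = new();
    private readonly List<(string Product, int Qty)> _items = new();

    public Guid Id { get; private set; }
    public string Customer { get; private set; } = string.Empty;
    public int ItemCount => _items.Count;
    public IReadOnlyList<IEvent> Recorded => _recorded;

    private MiniOrder() { }

    public static MiniOrder Create(string customer)
    {
        var order = new MiniOrder();
        order.ApplyAndRaise(new Created(Guid.NewGuid(), customer));
        return order;
    }

    public void AddItem(string product, int qty)
        => ApplyAndRaise(new ItemAdded(Id, product, qty));

    // Rehydration: apply history without recording anything new.
    public static MiniOrder FromHistory(IEnumerable<IEvent> history)
    {
        var order = new MiniOrder();
        foreach (var e in history)
            order.Apply(e);
        return order;
    }

    // Mutate state and record the event for persistence.
    private void ApplyAndRaise(IEvent e)
    {
        Apply(e);
        _recorded.Add(e);
    }

    private void Apply(IEvent e)
    {
        switch (e)
        {
            case Created c:
                Id = c.Id;
                Customer = c.Customer;
                break;
            case ItemAdded i:
                _items.Add((i.Product, i.Qty));
                break;
        }
    }
}
```

Because all state changes go through Apply, the live path and the replay path cannot drift apart. Validation belongs in the business method before the event is raised, not in Apply.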

Command Handlers

Two base classes cover the create and update paths. Both handle event persistence and projection updates automatically.

EventSourcedCreateCommandHandler

For creating new aggregates. Implement Create() to build the aggregate and return a result. The handler starts a new event stream and updates projections.

// EventSourcedCreateCommandHandler: for creating new aggregates
// Returns a result (e.g., the new aggregate ID).

public sealed record CreateOrder(string Customer) : ICommand<OrderId>;

public sealed class CreateOrderHandler
    : EventSourcedCreateCommandHandler<Order, OrderId, CreateOrder, OrderId>
{
    public CreateOrderHandler(
        IStreamRepository<OrderId> streamRepo,
        IProjectionUpdater projectionUpdater)
        : base(streamRepo, projectionUpdater) { }

    protected override (Order Aggregate, OrderId Result) Create(CreateOrder command)
    {
        var order = Order.Create(command.Customer);
        return (order, order.Id);
    }
}

EventSourcedCommandHandler

For updating existing aggregates. Implement GetAggregateId(), Rehydrate(), and Execute(). The handler loads the stream, checks the expected version, rehydrates the aggregate, appends the new events, and updates projections.

// EventSourcedCommandHandler: for updating existing aggregates
// Loads, rehydrates, executes, and persists new events.

public sealed record AddOrderItem(OrderId Id, string Product, int Qty)
    : ICommand, IHasExpectedVersion
{
    public long? ExpectedVersion { get; init; }
}

public sealed class AddOrderItemHandler
    : EventSourcedCommandHandler<Order, OrderId, AddOrderItem>
{
    protected override string AggregateType => "Order";

    public AddOrderItemHandler(
        IStreamRepository<OrderId> streamRepo,
        IProjectionUpdater projectionUpdater)
        : base(streamRepo, projectionUpdater) { }

    protected override OrderId GetAggregateId(AddOrderItem cmd) => cmd.Id;

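    protected override Order Rehydrate(IReadOnlyList<IDomainEvent> events)
    {
        // Instantiate via the private parameterless constructor (nonPublic: true).
        var order = (Order)Activator.CreateInstance(typeof(Order), true)!;
        // Note: Replay is declared protected in the EventSourcedAggregateBase listing,
        // so this call assumes the aggregate exposes it to the handler
        // (for example through a public or internal wrapper).
        order.Replay(events);
        return order;
    }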

    protected override void Execute(Order aggregate, AddOrderItem cmd)
        => aggregate.AddItem(cmd.Product, cmd.Qty);
}

Optimistic Concurrency

Commands can implement IHasExpectedVersion to enable optimistic concurrency. If the client's expected version doesn't match the actual stream version, a StreamConcurrencyException is thrown. The API layer handles this exception and sets an ETag response header carrying the actual version, so the client can reload and retry.

// Optimistic concurrency via IHasExpectedVersion
//
// Commands can implement IHasExpectedVersion to provide a version check.
// StreamData.ValidateExpectedVersion() throws StreamConcurrencyException
// if the client's expected version doesn't match the actual stream version.
//
// In the API layer, StreamConcurrencyExceptionHandler sets an ETag
// response header with the actual version for client retry guidance.

public interface IHasExpectedVersion
{
    long? ExpectedVersion { get; }
}
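
The version check described above can be sketched as follows. VersionedStream and VersionConflictException are hypothetical stand-ins for StreamData and StreamConcurrencyException, and treating a null ExpectedVersion as "skip the check" is an assumption suggested by the nullable type, not something the source confirms.

```csharp
using System;

var stream = new VersionedStream(ActualVersion: 3);

stream.ValidateExpectedVersion(null); // no expectation supplied: check skipped (assumed)
stream.ValidateExpectedVersion(3);    // matches the stream version: passes

try
{
    stream.ValidateExpectedVersion(2); // stale client version
}
catch (VersionConflictException ex)
{
    // An API-layer handler could surface ex.ActualVersion as an ETag header.
    Console.WriteLine(
        $"conflict: expected {ex.ExpectedVersion}, actual {ex.ActualVersion}");
}

// Hypothetical stand-in for StreamData.
public sealed record VersionedStream(long ActualVersion)
{
    public void ValidateExpectedVersion(long? expected)
    {
        // Only compare when the client actually sent a version.
        if (expected is long version && version != ActualVersion)
            throw new VersionConflictException(version, ActualVersion);
    }
}

// Hypothetical stand-in for StreamConcurrencyException.
public sealed class VersionConflictException : Exception
{
    public long ExpectedVersion { get; }
    public long ActualVersion { get; }

    public VersionConflictException(long expected, long actual)
        : base($"Expected version {expected} but stream is at {actual}.")
    {
        ExpectedVersion = expected;
        ActualVersion = actual;
    }
}
```

Carrying both versions on the exception lets the caller decide whether to reload and retry or to report the conflict to the user.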

Streams & Projections

Stream repositories abstract event persistence. Projection updaters synchronize read models in EF Core from domain events.

Stream Repository

Extend StreamRepositoryBase for each aggregate type. It handles loading events from Marten and appending new ones.

// Implement a concrete StreamRepository for your aggregate.
// StreamRepositoryBase handles loading/appending via IEventStore.

public sealed class OrderStreamRepository : StreamRepositoryBase<OrderId>
{
    protected override string AggregateType => "Order";

    public OrderStreamRepository(IEventStore eventStore)
        : base(eventStore) { }
}

// Register in DI:
// services.AddScoped<IStreamRepository<OrderId>, OrderStreamRepository>();

Projection Updater

Extend ProjectionUpdaterBase to dispatch events to your EF Core read models. Each event type updates the corresponding read model entity.

// ProjectionUpdaterBase dispatches events to your read model via EF Core.
// Implement DispatchAsync to handle each event type.

public sealed class OrderProjectionUpdater : ProjectionUpdaterBase<AppDbContext>
{
    public OrderProjectionUpdater(AppDbContext db) : base(db) { }

    protected override async Task DispatchAsync(
        IDomainEvent @event, CancellationToken ct)
    {
        switch (@event)
        {
            case OrderCreated e:
                DbContext.OrderReadModels.Add(new OrderReadModel
                {
                    Id = e.Id.Value,
                    Customer = e.Customer,
                    Version = 1,
                    CreatedAt = DateTime.UtcNow,
                    UpdatedAt = DateTime.UtcNow,
                });
                break;
            case OrderItemAdded e:
                var model = await DbContext.OrderReadModels
                    .FindAsync([e.Id.Value], ct);
                if (model is not null)
                {
                    model.ItemCount++;
                    model.Version++;
                    model.UpdatedAt = DateTime.UtcNow;
                }
                break;
        }
    }
}

IReadModel Interface

| Property | Type | Description |
| --- | --- | --- |
| Id | Guid | Unique identifier (matches aggregate ID) |
| Version | int | Current projection version (incremented on each event) |
| CreatedAt | DateTime | Timestamp when the read model was created |
| UpdatedAt | DateTime | Timestamp of the last update |
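
The property table translates to roughly the following shape. This is a sketch, not the library's definition: the accessors on the interface are a guess, and OrderReadModel is a hypothetical entity whose Customer and ItemCount properties are inferred from what OrderProjectionUpdater writes.

```csharp
using System;

// Assumed shape of IReadModel, derived from the property table.
// The real interface may expose different accessors.
public interface IReadModel
{
    Guid Id { get; set; }
    int Version { get; set; }
    DateTime CreatedAt { get; set; }
    DateTime UpdatedAt { get; set; }
}

// Hypothetical read model matching the fields set by OrderProjectionUpdater.
public sealed class OrderReadModel : IReadModel
{
    public Guid Id { get; set; }
    public string Customer { get; set; } = string.Empty;
    public int ItemCount { get; set; }
    public int Version { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime UpdatedAt { get; set; }
}
```

Keeping Version on the read model lets an API return it to clients, who can send it back as ExpectedVersion on their next command.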

DI Setup

Register event sourcing services in your application's dependency injection container. The setup involves Marten, core infrastructure, and your custom repositories, projections, and handlers.

Service Registration

Complete Program.cs setup with all required registrations for event sourcing.

// Program.cs — complete DI registration for event sourcing

var builder = WebApplication.CreateBuilder(args);

// 1. Marten event store (PostgreSQL-backed)
builder.Services.AddMartenEventStore(
    builder.Configuration.GetConnectionString("marten-db")!);

// 2. Core infrastructure (outbox processor, etc.)
builder.Services.AddCoreInfrastructure(builder.Configuration);

// 3. Event sourcing abstractions
builder.Services.AddCoreEventSourcing();

// 4. Your stream repositories
builder.Services.AddScoped<IStreamRepository<OrderId>, OrderStreamRepository>();

// 5. Your projection updaters
builder.Services.AddScoped<IProjectionUpdater, OrderProjectionUpdater>();

// 6. Your command handlers
builder.Services.AddScoped<ICommandHandler<CreateOrder, OrderId>, CreateOrderHandler>();
builder.Services.AddScoped<ICommandHandler<AddOrderItem>, AddOrderItemHandler>();

Outbox Pattern

The outbox pattern ensures domain events are reliably forwarded from EF Core transactions to the Marten event store. Events are written to an outbox table in the same transaction as aggregate changes, then asynchronously processed by a background service.

OutboxProcessorOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| PollIntervalSeconds | int | 2 | How often the processor polls for unprocessed messages |
| BatchSize | int | 50 | Maximum number of messages processed per batch |
| MaxRetryCount | int | 5 | Maximum retry attempts before dead-lettering a message |

Configuration

Configure the outbox processor via the Outbox section in appsettings.json. These options are bound via AddCoreInfrastructure().

// appsettings.json — configure outbox processor options
{
  "Outbox": {
    "PollIntervalSeconds": 2,
    "BatchSize": 50,
    "MaxRetryCount": 5
  }
}