Event Sourcing
Capture every state change as an immutable event. The Core Event Sourcing module provides aggregates, command handlers, stream repositories, projections, and an outbox pattern — all backed by Marten and PostgreSQL.
Package Overview
Event sourcing spans three Foundation packages. Use them selectively, for aggregates that genuinely benefit from a full event history, an audit trail, or replay.
| Package | Purpose | Key Contents |
|---|---|---|
| Sillium.Core.EventSourcing | Event sourcing abstractions | EventSourcedAggregateBase, EventSourcedCommandHandler, EventSourcedCreateCommandHandler, IStreamRepository, StreamData, IProjectionUpdater, IReadModel, ProjectionUpdaterBase |
| Sillium.Core.EventSourcing.Api | API integration | StreamConcurrencyExceptionHandler (sets ETag header on version conflicts) |
| Sillium.Core.Infrastructure | Marten event store & outbox | IEventStore, MartenEventStore, DomainEventEnvelope, OutboxMessage, OutboxProcessor, DbContextBase |
Architecture
The system separates write-side (events in Marten) from read-side (projections in EF Core). The outbox pattern ensures transactional consistency between both stores.
Write Path (Update)
When updating an existing aggregate, the handler loads its events, rehydrates the aggregate, executes the business logic, appends the new events to the stream, and updates the read-side projections.
```csharp
// Write Path: Command → Handler → Event Store
//
// 1. Command arrives at EventSourcedCommandHandler
// 2. Load event stream via StreamRepository.LoadAsync()
// 3. Rehydrate aggregate from events (Replay)
// 4. Execute business logic (raises new events via ApplyAndRaise)
// 5. Append new events to stream via StreamRepository.AppendAsync()
// 6. Update read-side projections via ProjectionUpdater.ApplyAsync()
//
// Command
//   → EventSourcedCommandHandler.HandleAsync()
//       ├─ StreamRepository.LoadAsync()     → StreamData (events + version)
//       ├─ Rehydrate()                      → Aggregate with current state
//       ├─ Execute()                        → Business logic (ApplyAndRaise)
//       ├─ StreamRepository.AppendAsync()   → Marten event store
//       └─ ProjectionUpdater.ApplyAsync()   → EF Core read model
```
Write Path (Create)
When creating a new aggregate, a fresh stream is started with the initial events and the result (e.g., the new ID) is returned.
```csharp
// Create Path: CreateCommand → Handler → New Stream
//
// 1. CreateCommand arrives at EventSourcedCreateCommandHandler
// 2. Create() builds new aggregate + result (e.g., new ID)
// 3. Append events as new stream (version 0 → Marten.StartStream)
// 4. Update projections and return result
//
// CreateCommand
//   → EventSourcedCreateCommandHandler.HandleAsync()
//       ├─ Create()                             → (Aggregate, Result)
//       ├─ StreamRepository.AppendAsync(v:0)    → Marten StartStream
//       ├─ ProjectionUpdater.ApplyAsync()       → EF Core read model
//       └─ return Result
```
Outbox Flow
For aggregates persisted via EF Core, domain events are written to an outbox table in the same transaction. A background processor forwards them to the event store.
```csharp
// Outbox Pattern: Transactional consistency between EF Core and Event Store
//
// DbContextBase.SaveChangesAsync()
//   ├─ Collect domain events from tracked aggregates
//   ├─ Create OutboxMessage rows (same transaction)
//   └─ SaveChangesAsync(): single atomic transaction
//
// OutboxProcessor (background hosted service)
//   ├─ Poll OutboxMessages (FOR UPDATE SKIP LOCKED)
//   ├─ Deserialize events via type cache
//   ├─ IEventStore.AppendEventsAsync()
//   ├─ Mark ProcessedAt on success
//   └─ Dead-letter after MaxRetryCount failures
```
Aggregate Pattern
Event-sourced aggregates extend EventSourcedAggregateBase and implement ApplyEvent to rebuild state from events. Business methods use ApplyAndRaise to change state and record the event.
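The apply-and-raise idea can be tried out without any Foundation dependency. The following is a minimal sketch only: `IEvent`, `Deposited`, `Withdrawn`, and `Account` are hypothetical stand-ins invented for illustration, not types from Sillium.Core.EventSourcing. It shows that replaying the recorded events into a fresh instance reproduces the same state without recording anything new.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration; these are not the Foundation types.
public interface IEvent { }
public sealed record Deposited(decimal Amount) : IEvent;
public sealed record Withdrawn(decimal Amount) : IEvent;

public sealed class Account
{
    private readonly List<IEvent> _uncommitted = new();

    public decimal Balance { get; private set; }
    public IReadOnlyList<IEvent> UncommittedEvents => _uncommitted;

    public void Deposit(decimal amount) => ApplyAndRaise(new Deposited(amount));

    public void Withdraw(decimal amount)
    {
        // Invariants are checked before the event is created.
        if (amount > Balance)
            throw new InvalidOperationException("Insufficient funds.");
        ApplyAndRaise(new Withdrawn(amount));
    }

    // Rebuild state from history without recording anything new.
    public static Account FromHistory(IEnumerable<IEvent> history)
    {
        var account = new Account();
        foreach (var e in history)
            account.Apply(e);
        return account;
    }

    // Mutate state AND remember the event for persistence.
    private void ApplyAndRaise(IEvent e)
    {
        Apply(e);
        _uncommitted.Add(e);
    }

    // The only place where state changes.
    private void Apply(IEvent e)
    {
        switch (e)
        {
            case Deposited d: Balance += d.Amount; break;
            case Withdrawn w: Balance -= w.Amount; break;
        }
    }
}
```

Keeping every state change inside `Apply` is what makes replay trustworthy: the business methods validate and emit, while the same code path runs for both new and historical events.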
EventSourcedAggregateBase
The abstract base class provides ApplyAndRaise (mutate + record), Replay (rehydrate from history), and the abstract ApplyEvent method.
```csharp
// EventSourcedAggregateBase<TId> extends AggregateRootBase<TId>
// and adds event replay + apply-and-raise capabilities.
public abstract class EventSourcedAggregateBase<TId> : AggregateRootBase<TId>
    where TId : notnull, IGuidId
{
    // Apply event to state AND raise it for persistence
    protected void ApplyAndRaise(IDomainEvent @event)
    {
        ApplyEvent(@event);
        RaiseDomainEvent(@event);
    }

    // Rehydrate aggregate from event history
    protected void Replay(IEnumerable<IDomainEvent> events)
    {
        foreach (var @event in events)
            ApplyEvent(@event);

        ClearDomainEvents();
    }

    // Implement in subclass: apply event to aggregate state
    protected abstract void ApplyEvent(IDomainEvent @event);
}
```
Example: Order Aggregate
A complete example showing an event-sourced aggregate with domain events, a factory method, a business method, and the ApplyEvent switch.
```csharp
// Example: Order aggregate with event sourcing
public sealed record OrderId(Guid Value) : GuidIdBase(Value)
{
    public static OrderId New() => new(Guid.NewGuid());
}

// Domain events
public sealed record OrderCreated(OrderId Id, string Customer) : DomainEventBase;
public sealed record OrderItemAdded(OrderId Id, string Product, int Qty) : DomainEventBase;

// Event-sourced aggregate
public sealed class Order : EventSourcedAggregateBase<OrderId>
{
    public string Customer { get; private set; } = string.Empty;
    private readonly List<(string Product, int Qty)> _items = [];

    private Order() { } // For rehydration

    public static Order Create(string customer)
    {
        var order = new Order { Id = OrderId.New() };
        order.ApplyAndRaise(new OrderCreated(order.Id, customer));
        return order;
    }

    public void AddItem(string product, int qty)
        => ApplyAndRaise(new OrderItemAdded(Id, product, qty));

    protected override void ApplyEvent(IDomainEvent @event)
    {
        switch (@event)
        {
            case OrderCreated e:
                Id = e.Id;
                Customer = e.Customer;
                break;
            case OrderItemAdded e:
                _items.Add((e.Product, e.Qty));
                break;
        }
    }
}
```
Command Handlers
Two base classes cover the create and update paths. Both handle event persistence and projection updates automatically.
EventSourcedCreateCommandHandler
For creating new aggregates. Implement Create() to build the aggregate and return a result. The handler starts a new event stream and updates projections.
```csharp
// EventSourcedCreateCommandHandler: for creating new aggregates
// Returns a result (e.g., the new aggregate ID).
public sealed record CreateOrder(string Customer) : ICommand<OrderId>;

public sealed class CreateOrderHandler
    : EventSourcedCreateCommandHandler<Order, OrderId, CreateOrder, OrderId>
{
    public CreateOrderHandler(
        IStreamRepository<OrderId> streamRepo,
        IProjectionUpdater projectionUpdater)
        : base(streamRepo, projectionUpdater) { }

    protected override (Order Aggregate, OrderId Result) Create(CreateOrder command)
    {
        var order = Order.Create(command.Customer);
        return (order, order.Id);
    }
}
```
EventSourcedCommandHandler
For updating existing aggregates. Implement GetAggregateId(), Rehydrate(), and Execute(). The handler loads the stream, checks concurrency, rehydrates, and appends new events.
```csharp
// EventSourcedCommandHandler: for updating existing aggregates
// Loads, rehydrates, executes, and persists new events.
public sealed record AddOrderItem(OrderId Id, string Product, int Qty)
    : ICommand, IHasExpectedVersion
{
    public long? ExpectedVersion { get; init; }
}

public sealed class AddOrderItemHandler
    : EventSourcedCommandHandler<Order, OrderId, AddOrderItem>
{
    protected override string AggregateType => "Order";

    public AddOrderItemHandler(
        IStreamRepository<OrderId> streamRepo,
        IProjectionUpdater projectionUpdater)
        : base(streamRepo, projectionUpdater) { }

    protected override OrderId GetAggregateId(AddOrderItem cmd) => cmd.Id;

    protected override Order Rehydrate(IReadOnlyList<IDomainEvent> events)
    {
        // Order has a private constructor, so create it via reflection (nonPublic: true).
        var order = (Order)Activator.CreateInstance(typeof(Order), true)!;

        // Note: this call compiles only if Replay is accessible from the handler.
        // The base-class excerpt declares it protected; if that holds in your version,
        // expose rehydration through a static factory on Order instead.
        order.Replay(events);
        return order;
    }

    protected override void Execute(Order aggregate, AddOrderItem cmd)
        => aggregate.AddItem(cmd.Product, cmd.Qty);
}
```
Optimistic Concurrency
Commands can implement IHasExpectedVersion to opt in to optimistic concurrency. If the client's expected version does not match the actual stream version, StreamData.ValidateExpectedVersion() throws a StreamConcurrencyException. In the API layer, StreamConcurrencyExceptionHandler sets an ETag response header carrying the actual version so the client can reload and retry.
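To make the check concrete, here is a minimal, dependency-free sketch of an expected-version comparison. `ConcurrencyConflictException` and `VersionCheck` are hypothetical names used for illustration; they are not the Foundation's StreamConcurrencyException or StreamData. Treating a null expected version as "skip the check" is an assumption based on `ExpectedVersion` being a nullable `long`.

```csharp
using System;

// Hypothetical stand-ins for illustration; the Foundation types may differ.
public sealed class ConcurrencyConflictException : Exception
{
    public long ExpectedVersion { get; }
    public long ActualVersion { get; }

    public ConcurrencyConflictException(long expected, long actual)
        : base($"Expected stream version {expected}, but the stream is at version {actual}.")
    {
        ExpectedVersion = expected;
        ActualVersion = actual;
    }
}

public static class VersionCheck
{
    // Assumption: a null expected version means the caller opted out of the check.
    public static void Validate(long? expectedVersion, long actualVersion)
    {
        if (expectedVersion is long expected && expected != actualVersion)
            throw new ConcurrencyConflictException(expected, actualVersion);
    }
}
```

Carrying the actual version on the exception is what lets an API-layer handler report it back to the client, for example in an ETag header.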
```csharp
// Optimistic concurrency via IHasExpectedVersion
//
// Commands can implement IHasExpectedVersion to provide a version check.
// StreamData.ValidateExpectedVersion() throws StreamConcurrencyException
// if the client's expected version doesn't match the actual stream version.
//
// In the API layer, StreamConcurrencyExceptionHandler sets an ETag
// response header with the actual version for client retry guidance.
public interface IHasExpectedVersion
{
    long? ExpectedVersion { get; }
}
```
Streams & Projections
Stream repositories abstract event persistence. Projection updaters synchronize read models in EF Core from domain events.
Stream Repository
Extend StreamRepositoryBase for each aggregate type. It handles loading events from Marten and appending new ones.
```csharp
// Implement a concrete StreamRepository for your aggregate.
// StreamRepositoryBase handles loading/appending via IEventStore.
public sealed class OrderStreamRepository : StreamRepositoryBase<OrderId>
{
    protected override string AggregateType => "Order";

    public OrderStreamRepository(IEventStore eventStore)
        : base(eventStore) { }
}

// Register in DI:
// services.AddScoped<IStreamRepository<OrderId>, OrderStreamRepository>();
```
Projection Updater
Extend ProjectionUpdaterBase to dispatch events to your EF Core read models. Each event type updates the corresponding read model entity.
```csharp
// ProjectionUpdaterBase dispatches events to your read model via EF Core.
// Implement DispatchAsync to handle each event type.
public sealed class OrderProjectionUpdater : ProjectionUpdaterBase<AppDbContext>
{
    public OrderProjectionUpdater(AppDbContext db) : base(db) { }

    protected override async Task DispatchAsync(
        IDomainEvent @event, CancellationToken ct)
    {
        switch (@event)
        {
            case OrderCreated e:
                DbContext.OrderReadModels.Add(new OrderReadModel
                {
                    Id = e.Id.Value,
                    Customer = e.Customer,
                    Version = 1,
                    CreatedAt = DateTime.UtcNow,
                    UpdatedAt = DateTime.UtcNow,
                });
                break;

            case OrderItemAdded e:
                var model = await DbContext.OrderReadModels
                    .FindAsync([e.Id.Value], ct);
                if (model is not null)
                {
                    model.ItemCount++;
                    model.Version++;
                    model.UpdatedAt = DateTime.UtcNow;
                }
                break;
        }
    }
}
```
IReadModel Interface
| Property | Type | Description |
|---|---|---|
| Id | Guid | Unique identifier (matches aggregate ID) |
| Version | int | Current projection version (incremented on each event) |
| CreatedAt | DateTime | Timestamp when the read model was created |
| UpdatedAt | DateTime | Timestamp of the last update |
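The projection example uses an `OrderReadModel` that is not defined on this page. A plausible shape, inferred only from the property table and from the properties the updater touches, might look like the sketch below. The interface is declared locally so the sketch compiles on its own; the accessors on the real `IReadModel` may differ, and `Customer` and `ItemCount` are simply the extra fields the example updater assigns.

```csharp
using System;

// Local stand-in with the shape described in the table above.
// Assumption: get-only accessors; the Foundation interface may declare setters.
public interface IReadModel
{
    Guid Id { get; }
    int Version { get; }
    DateTime CreatedAt { get; }
    DateTime UpdatedAt { get; }
}

// Read model entity as used by the projection updater example.
public sealed class OrderReadModel : IReadModel
{
    public Guid Id { get; set; }                          // matches the aggregate ID
    public string Customer { get; set; } = string.Empty;
    public int ItemCount { get; set; }
    public int Version { get; set; }                      // incremented on each applied event
    public DateTime CreatedAt { get; set; }
    public DateTime UpdatedAt { get; set; }
}
```

Public setters keep the entity easy to map and mutate from the projection updater; the read side carries no business rules, so there is little to protect.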
DI Setup
Register event sourcing services in your application's dependency injection container. The setup involves Marten, core infrastructure, and your custom repositories, projections, and handlers.
Service Registration
Complete Program.cs setup with all required registrations for event sourcing.
```csharp
// Program.cs: complete DI registration for event sourcing
var builder = WebApplication.CreateBuilder(args);

// 1. Marten event store (PostgreSQL-backed)
builder.Services.AddMartenEventStore(
    builder.Configuration.GetConnectionString("marten-db")!);

// 2. Core infrastructure (outbox processor, etc.)
builder.Services.AddCoreInfrastructure(builder.Configuration);

// 3. Event sourcing abstractions
builder.Services.AddCoreEventSourcing();

// 4. Your stream repositories
builder.Services.AddScoped<IStreamRepository<OrderId>, OrderStreamRepository>();

// 5. Your projection updaters
builder.Services.AddScoped<IProjectionUpdater, OrderProjectionUpdater>();

// 6. Your command handlers
builder.Services.AddScoped<ICommandHandler<CreateOrder, OrderId>, CreateOrderHandler>();
builder.Services.AddScoped<ICommandHandler<AddOrderItem>, AddOrderItemHandler>();
```
Aspire Service Discovery
Use AddMartenEventStoreFromConfiguration() instead of AddMartenEventStore() when connection strings come from Aspire service discovery or environment configuration.
Outbox Pattern
The outbox pattern ensures domain events are reliably forwarded from EF Core transactions to the Marten event store. Events are written to an outbox table in the same transaction as aggregate changes, then asynchronously processed by a background service.
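The retry and dead-letter behaviour can be illustrated with an in-memory sketch. This is not the OutboxProcessor implementation: `OutboxRow` and `OutboxSketch` are hypothetical, the real processor polls PostgreSQL with `FOR UPDATE SKIP LOCKED`, and whether it dead-letters on exactly the `MaxRetryCount`-th failure is an assumption made here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for an outbox table row.
public sealed class OutboxRow
{
    public Guid Id { get; init; } = Guid.NewGuid();
    public string Payload { get; init; } = string.Empty;
    public int RetryCount { get; set; }
    public DateTime? ProcessedAt { get; set; }
    public bool DeadLettered { get; set; }
}

public static class OutboxSketch
{
    // Processes one batch: forwards each pending row, marks successes,
    // counts failures, and dead-letters a row after maxRetryCount failures.
    public static void ProcessBatch(
        IEnumerable<OutboxRow> rows,
        Action<OutboxRow> forward,
        int batchSize,
        int maxRetryCount)
    {
        var pending = rows
            .Where(r => r.ProcessedAt is null && !r.DeadLettered)
            .Take(batchSize)
            .ToList();

        foreach (var row in pending)
        {
            try
            {
                forward(row);                      // e.g. append to the event store
                row.ProcessedAt = DateTime.UtcNow; // success: never picked up again
            }
            catch (Exception)
            {
                row.RetryCount++;
                if (row.RetryCount >= maxRetryCount)
                    row.DeadLettered = true;       // stop retrying, keep for inspection
            }
        }
    }
}
```

Because a failing row is only counted and skipped, one poisoned message does not block the rest of the batch.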
OutboxProcessorOptions
| Property | Type | Default | Description |
|---|---|---|---|
| PollIntervalSeconds | int | 2 | How often the processor polls for unprocessed messages |
| BatchSize | int | 50 | Maximum number of messages processed per batch |
| MaxRetryCount | int | 5 | Maximum retry attempts before dead-lettering a message |
Configuration
Configure the outbox processor via the Outbox section in appsettings.json. These options are bound via AddCoreInfrastructure().
```json
// appsettings.json: configure outbox processor options
{
  "Outbox": {
    "PollIntervalSeconds": 2,
    "BatchSize": 50,
    "MaxRetryCount": 5
  }
}
```
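To show how omitted keys fall back to the documented defaults, the sketch below mirrors the options table in a plain class and reads the `Outbox` section with System.Text.Json. It is illustrative only: `OutboxOptionsSketch` and `OutboxOptionsReader` are hypothetical, and the real binding happens inside AddCoreInfrastructure() through the configuration system rather than through this code. The input must be comment-free JSON, because `JsonDocument.Parse` rejects comments by default.

```csharp
using System;
using System.Text.Json;

// Hypothetical mirror of the options table; the real OutboxProcessorOptions may differ.
public sealed class OutboxOptionsSketch
{
    public int PollIntervalSeconds { get; set; } = 2;
    public int BatchSize { get; set; } = 50;
    public int MaxRetryCount { get; set; } = 5;
}

public static class OutboxOptionsReader
{
    // Reads the "Outbox" section; keys that are missing keep their default values.
    public static OutboxOptionsSketch Read(string json)
    {
        using var doc = JsonDocument.Parse(json);

        if (!doc.RootElement.TryGetProperty("Outbox", out var section))
            return new OutboxOptionsSketch();

        return section.Deserialize<OutboxOptionsSketch>() ?? new OutboxOptionsSketch();
    }
}
```

For example, a file that sets only `BatchSize` would still poll every 2 seconds and dead-letter after 5 failed attempts.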