9.7 KiB
Event Sourcing Fundamentals
This tutorial introduces event sourcing concepts and how to implement them with Svrnty.CQRS.
What is Event Sourcing?
Event sourcing is a pattern where you store the state of your application as a sequence of events rather than storing just the current state. Instead of updating a record in the database, you append an event that describes what happened.
Traditional State Storage:
// Store current state
var user = new User { Id = 1, Name = "Alice", Status = "Active" };
await db.Users.AddAsync(user);
// Update state (old state is lost)
user.Status = "Suspended";
await db.SaveChangesAsync();
Event Sourcing:
// Store events
await eventStore.AppendAsync("user-1", new UserRegisteredEvent { Name = "Alice" });
await eventStore.AppendAsync("user-1", new UserSuspendedEvent { Reason = "Policy violation" });
// Rebuild state by replaying events
var user = new User();
await foreach (var @event in eventStore.ReadStreamAsync("user-1"))
{
user.Apply(@event); // UserRegisteredEvent -> Name = "Alice"
// UserSuspendedEvent -> Status = "Suspended"
}
Key Benefits
✅ Complete Audit Trail
- Every state change is recorded as an event
- You know exactly what happened, when, and why
- Compliance and regulatory requirements are easily met
✅ Time Travel
- Reconstruct state at any point in time
- Debug production issues by replaying events
- Analyze historical data for insights
✅ Event-Driven Architecture
- Events are first-class citizens
- Other systems can subscribe to events
- Enables reactive, loosely-coupled systems
✅ Flexibility
- Add new projections from existing events
- Change read models without losing data
- Rebuild state after bugs are fixed
Event Sourcing with Svrnty.CQRS
Svrnty.CQRS provides persistent event streams for event sourcing:
// 1. Register event streaming services
builder.Services.AddEventStreaming()
.AddPostgresEventStore(builder.Configuration.GetConnectionString("EventStore"));
var app = builder.Build();
var eventStore = app.Services.GetRequiredService<IEventStreamStore>();
// 2. Define domain events
public record UserRegisteredEvent
{
public string UserId { get; init; } = string.Empty;
public string Name { get; init; } = string.Empty;
public string Email { get; init; } = string.Empty;
public DateTimeOffset RegisteredAt { get; init; }
}
public record UserEmailChangedEvent
{
public string UserId { get; init; } = string.Empty;
public string OldEmail { get; init; } = string.Empty;
public string NewEmail { get; init; } = string.Empty;
public DateTimeOffset ChangedAt { get; init; }
}
public record UserSuspendedEvent
{
public string UserId { get; init; } = string.Empty;
public string Reason { get; init; } = string.Empty;
public DateTimeOffset SuspendedAt { get; init; }
}
// 3. Append events to stream
await eventStore.AppendAsync("user-123", new UserRegisteredEvent
{
UserId = "user-123",
Name = "Alice Smith",
Email = "alice@example.com",
RegisteredAt = DateTimeOffset.UtcNow
});
await eventStore.AppendAsync("user-123", new UserEmailChangedEvent
{
UserId = "user-123",
OldEmail = "alice@example.com",
NewEmail = "alice.smith@example.com",
ChangedAt = DateTimeOffset.UtcNow
});
// 4. Replay events to rebuild state
var user = new User();
await foreach (var storedEvent in eventStore.ReadStreamAsync("user-123"))
{
var @event = storedEvent.Data;
switch (@event)
{
case UserRegisteredEvent e:
user.Id = e.UserId;
user.Name = e.Name;
user.Email = e.Email;
user.Status = UserStatus.Active;
break;
case UserEmailChangedEvent e:
user.Email = e.NewEmail;
break;
case UserSuspendedEvent e:
user.Status = UserStatus.Suspended;
user.SuspensionReason = e.Reason;
break;
}
}
Console.WriteLine($"User: {user.Name}, Email: {user.Email}, Status: {user.Status}");
// Output: User: Alice Smith, Email: alice.smith@example.com, Status: Active
Stream Naming
Choose stream names that represent aggregate instances:
✅ Good Stream Names:
user-123- User aggregate with ID 123order-456- Order aggregate with ID 456account-789- Account aggregate with ID 789
❌ Bad Stream Names:
users- All users (too broad)user-events- Unclear which user123- Not descriptive
Event Naming
Design events as past-tense facts:
✅ Good Event Names:
UserRegisteredEvent- User was registeredOrderPlacedEvent- Order was placedPaymentProcessedEvent- Payment was processed
❌ Bad Event Names:
RegisterUserEvent- Sounds like a commandUserEvent- Not specificUpdateUser- Not descriptive
Event Data
Events should be immutable POCOs with init properties:
// ✅ Good: Immutable record with descriptive properties
public record OrderPlacedEvent
{
public string OrderId { get; init; } = string.Empty;
public string CustomerId { get; init; } = string.Empty;
public decimal TotalAmount { get; init; }
public List<OrderLineItem> Items { get; init; } = new();
public DateTimeOffset PlacedAt { get; init; }
}
public record OrderLineItem
{
public string ProductId { get; init; } = string.Empty;
public int Quantity { get; init; }
public decimal UnitPrice { get; init; }
}
// ❌ Bad: Mutable class with setters
public class OrderEvent
{
public string Id { get; set; }
public string Data { get; set; } // Vague property name
}
Reading Events
Read events from a stream using ReadStreamAsync:
// Read all events from the beginning
await foreach (var storedEvent in eventStore.ReadStreamAsync("order-123"))
{
Console.WriteLine($"Offset: {storedEvent.Offset}, Type: {storedEvent.EventType}");
var @event = storedEvent.Data;
// Process event...
}
// Read from specific offset
await foreach (var storedEvent in eventStore.ReadStreamAsync("order-123", fromOffset: 10))
{
// Only events with offset > 10
}
Complete Example
Here's a complete user aggregate with event sourcing:
public class User
{
public string Id { get; private set; } = string.Empty;
public string Name { get; private set; } = string.Empty;
public string Email { get; private set; } = string.Empty;
public UserStatus Status { get; private set; }
public string? SuspensionReason { get; private set; }
private readonly List<object> _uncommittedEvents = new();
public IReadOnlyList<object> GetUncommittedEvents() => _uncommittedEvents;
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
// Commands that produce events
public void Register(string id, string name, string email)
{
if (string.IsNullOrEmpty(Id))
{
ApplyEvent(new UserRegisteredEvent
{
UserId = id,
Name = name,
Email = email,
RegisteredAt = DateTimeOffset.UtcNow
});
}
}
public void ChangeEmail(string newEmail)
{
if (Email != newEmail)
{
ApplyEvent(new UserEmailChangedEvent
{
UserId = Id,
OldEmail = Email,
NewEmail = newEmail,
ChangedAt = DateTimeOffset.UtcNow
});
}
}
public void Suspend(string reason)
{
if (Status != UserStatus.Suspended)
{
ApplyEvent(new UserSuspendedEvent
{
UserId = Id,
Reason = reason,
SuspendedAt = DateTimeOffset.UtcNow
});
}
}
// Apply event (used for both new events and replay)
public void Apply(object @event)
{
switch (@event)
{
case UserRegisteredEvent e:
Id = e.UserId;
Name = e.Name;
Email = e.Email;
Status = UserStatus.Active;
break;
case UserEmailChangedEvent e:
Email = e.NewEmail;
break;
case UserSuspendedEvent e:
Status = UserStatus.Suspended;
SuspensionReason = e.Reason;
break;
}
}
private void ApplyEvent(object @event)
{
Apply(@event);
_uncommittedEvents.Add(@event);
}
}
public enum UserStatus
{
Active,
Suspended
}
// Command handler that uses the aggregate
public class RegisterUserCommandHandler : ICommandHandler<RegisterUserCommand, string>
{
private readonly IEventStreamStore _eventStore;
public RegisterUserCommandHandler(IEventStreamStore eventStore)
{
_eventStore = eventStore;
}
public async Task<string> HandleAsync(RegisterUserCommand command, CancellationToken ct)
{
var userId = Guid.NewGuid().ToString();
var user = new User();
user.Register(userId, command.Name, command.Email);
// Save uncommitted events
foreach (var @event in user.GetUncommittedEvents())
{
await _eventStore.AppendAsync($"user-{userId}", @event, ct);
}
user.ClearUncommittedEvents();
return userId;
}
}
Next Steps
- 02-aggregate-design.md - Learn how to design aggregates
- 03-events-and-workflows.md - Event design and workflow patterns
- 04-projections.md - Build read models from events