# Event Sourcing Fundamentals

This tutorial introduces event sourcing concepts and how to implement them with Svrnty.CQRS.

## What is Event Sourcing?

Event sourcing is a pattern where you store the state of your application as a sequence of events rather than storing just the current state. Instead of updating a record in the database, you append an event that describes what happened.

**Traditional State Storage:**
```csharp
// Store current state
var user = new User { Id = 1, Name = "Alice", Status = "Active" };
await db.Users.AddAsync(user);
await db.SaveChangesAsync();

// Update state (old state is lost)
user.Status = "Suspended";
await db.SaveChangesAsync();
```

**Event Sourcing:**
```csharp
// Store events
await eventStore.AppendAsync("user-1", new UserRegisteredEvent { Name = "Alice" });
await eventStore.AppendAsync("user-1", new UserSuspendedEvent { Reason = "Policy violation" });

// Rebuild state by replaying events
var user = new User();
await foreach (var storedEvent in eventStore.ReadStreamAsync("user-1"))
{
    user.Apply(storedEvent.Data); // UserRegisteredEvent -> Name = "Alice"
                                  // UserSuspendedEvent  -> Status = "Suspended"
}
```

## Key Benefits

✅ **Complete Audit Trail**
- Every state change is recorded as an event
- You know exactly what happened, when, and why
- Compliance and regulatory audits become easier to satisfy

✅ **Time Travel**
- Reconstruct state at any point in time
- Debug production issues by replaying events
- Analyze historical data for insights

✅ **Event-Driven Architecture**
- Events are first-class citizens
- Other systems can subscribe to events
- Enables reactive, loosely-coupled systems

✅ **Flexibility**
- Add new projections from existing events
- Change read models without losing data
- Rebuild state after bugs are fixed
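
The time-travel benefit above can be sketched without any library: keep only the events recorded at or before a cutoff and fold them into state. The `AccountEvent`, `Deposited`, `Withdrawn`, and `TimeTravel` names below are hypothetical and exist only for this illustration; they are not part of Svrnty.CQRS.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event types, for illustration only.
public abstract record AccountEvent(DateTimeOffset OccurredAt);
public record Deposited(decimal Amount, DateTimeOffset OccurredAt) : AccountEvent(OccurredAt);
public record Withdrawn(decimal Amount, DateTimeOffset OccurredAt) : AccountEvent(OccurredAt);

public static class TimeTravel
{
    // Folds every event recorded at or before `asOf` into a balance.
    public static decimal BalanceAsOf(IEnumerable<AccountEvent> history, DateTimeOffset asOf)
    {
        var balance = 0m;
        foreach (var recorded in history.Where(x => x.OccurredAt <= asOf).OrderBy(x => x.OccurredAt))
        {
            balance = recorded switch
            {
                Deposited d => balance + d.Amount,
                Withdrawn w => balance - w.Amount,
                _ => balance // Unknown event types leave the state unchanged.
            };
        }
        return balance;
    }
}
```

Given a deposit of 100, a withdrawal of 30 one day later, and a deposit of 5 the day after that, `BalanceAsOf` returns 70 for the second day and 75 for the third. The same events answer both questions; nothing had to be stored ahead of time for the earlier date.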

## Event Sourcing with Svrnty.CQRS

Svrnty.CQRS provides persistent event streams for event sourcing:

```csharp
// 1. Register event streaming services
builder.Services.AddEventStreaming()
    .AddPostgresEventStore(builder.Configuration.GetConnectionString("EventStore"));

var app = builder.Build();
var eventStore = app.Services.GetRequiredService<IEventStreamStore>();

// 2. Define domain events
// (In a real project these records live in their own files; C# requires
// type declarations to come after any top-level statements.)
public record UserRegisteredEvent
{
    public string UserId { get; init; } = string.Empty;
    public string Name { get; init; } = string.Empty;
    public string Email { get; init; } = string.Empty;
    public DateTimeOffset RegisteredAt { get; init; }
}

public record UserEmailChangedEvent
{
    public string UserId { get; init; } = string.Empty;
    public string OldEmail { get; init; } = string.Empty;
    public string NewEmail { get; init; } = string.Empty;
    public DateTimeOffset ChangedAt { get; init; }
}

public record UserSuspendedEvent
{
    public string UserId { get; init; } = string.Empty;
    public string Reason { get; init; } = string.Empty;
    public DateTimeOffset SuspendedAt { get; init; }
}

// 3. Append events to stream
await eventStore.AppendAsync("user-123", new UserRegisteredEvent
{
    UserId = "user-123",
    Name = "Alice Smith",
    Email = "alice@example.com",
    RegisteredAt = DateTimeOffset.UtcNow
});

await eventStore.AppendAsync("user-123", new UserEmailChangedEvent
{
    UserId = "user-123",
    OldEmail = "alice@example.com",
    NewEmail = "alice.smith@example.com",
    ChangedAt = DateTimeOffset.UtcNow
});

// 4. Replay events to rebuild state
// (This step assumes a simple User type with publicly settable properties.)
var user = new User();
await foreach (var storedEvent in eventStore.ReadStreamAsync("user-123"))
{
    var @event = storedEvent.Data;

    switch (@event)
    {
        case UserRegisteredEvent e:
            user.Id = e.UserId;
            user.Name = e.Name;
            user.Email = e.Email;
            user.Status = UserStatus.Active;
            break;

        case UserEmailChangedEvent e:
            user.Email = e.NewEmail;
            break;

        case UserSuspendedEvent e:
            user.Status = UserStatus.Suspended;
            user.SuspensionReason = e.Reason;
            break;
    }
}

Console.WriteLine($"User: {user.Name}, Email: {user.Email}, Status: {user.Status}");
// Output: User: Alice Smith, Email: alice.smith@example.com, Status: Active
```

## Stream Naming

Choose stream names that represent aggregate instances:

✅ **Good Stream Names:**
- `user-123` - User aggregate with ID 123
- `order-456` - Order aggregate with ID 456
- `account-789` - Account aggregate with ID 789

❌ **Bad Stream Names:**
- `users` - All users (too broad)
- `user-events` - Unclear which user
- `123` - Missing the aggregate type
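
One way to keep stream names consistent is to build them in a single place rather than with ad hoc string interpolation. The `StreamName` helper below is a hypothetical sketch, not a Svrnty.CQRS API; it assumes the `<aggregate>-<id>` convention shown in the list above.

```csharp
using System;

// Hypothetical helper; not part of Svrnty.CQRS.
public static class StreamName
{
    // Builds "<aggregate>-<id>", e.g. For("User", "123") gives "user-123".
    public static string For(string aggregateType, string aggregateId)
    {
        if (string.IsNullOrWhiteSpace(aggregateType))
            throw new ArgumentException("Aggregate type is required.", nameof(aggregateType));
        if (string.IsNullOrWhiteSpace(aggregateId))
            throw new ArgumentException("Aggregate ID is required.", nameof(aggregateId));

        return $"{aggregateType.ToLowerInvariant()}-{aggregateId}";
    }

    // Splits at the first hyphen so IDs that contain hyphens (such as GUIDs) survive intact.
    public static (string AggregateType, string AggregateId) Parse(string streamName)
    {
        var separator = streamName.IndexOf('-');
        if (separator <= 0 || separator == streamName.Length - 1)
            throw new FormatException($"'{streamName}' is not of the form '<aggregate>-<id>'.");

        return (streamName[..separator], streamName[(separator + 1)..]);
    }
}
```

`Parse` rejects names such as `users` and `123` because they contain no separator. It cannot tell that `user-events` is a poor choice, so a helper like this complements the naming convention rather than enforcing it.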

## Event Naming

Design events as past-tense facts:

✅ **Good Event Names:**
- `UserRegisteredEvent` - User was registered
- `OrderPlacedEvent` - Order was placed
- `PaymentProcessedEvent` - Payment was processed

❌ **Bad Event Names:**
- `RegisterUserEvent` - Sounds like a command
- `UserEvent` - Not specific
- `UpdateUser` - Imperative, and vague about what changed
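
The naming rule is easier to see with a command next to the event it produces. In this sketch the command is an imperative request that may be rejected, and the event is the fact recorded once it succeeds. The positional record shapes and the `Registration.Decide` function are simplified assumptions for illustration; the email check is a placeholder, not real validation.

```csharp
using System;

// Command: an imperative request that may still be rejected.
public record RegisterUserCommand(string Name, string Email);

// Event: a past-tense fact, created only after the command was accepted.
public record UserRegisteredEvent(string UserId, string Name, string Email, DateTimeOffset RegisteredAt);

public static class Registration
{
    // Validates the command and, on success, returns the fact to append to the stream.
    public static UserRegisteredEvent Decide(RegisterUserCommand command, string newUserId, DateTimeOffset now)
    {
        // Placeholder rule for the sketch; real validation would be stricter.
        if (!command.Email.Contains('@'))
            throw new ArgumentException("Email must contain '@'.", nameof(command));

        return new UserRegisteredEvent(newUserId, command.Name, command.Email, now);
    }
}
```

A rejected command leaves no trace in the stream, which is why a name like `RegisterUserEvent` is misleading: whatever is stored has already happened.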

## Event Data

Events should be immutable POCOs with `init` properties:

```csharp
// ✅ Good: Immutable record with descriptive properties
public record OrderPlacedEvent
{
    public string OrderId { get; init; } = string.Empty;
    public string CustomerId { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
    public List<OrderLineItem> Items { get; init; } = new();
    public DateTimeOffset PlacedAt { get; init; }
}

public record OrderLineItem
{
    public string ProductId { get; init; } = string.Empty;
    public int Quantity { get; init; }
    public decimal UnitPrice { get; init; }
}

// ❌ Bad: Mutable class with setters
public class OrderEvent
{
    public string Id { get; set; }
    public string Data { get; set; } // Vague property name
}
```
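
Two details are worth a small sketch. An `init` property can be set only during object initialization, so a changed value requires a `with` expression, which produces a copy. Separately, a `List<T>` property can still be modified through `Add` even when the property itself is `init`-only, so exposing `IReadOnlyList<T>` is one option for tightening this. The `OrderPlaced`, `LineItem`, and `OrderEvents` names are hypothetical variants used only here.

```csharp
using System;
using System.Collections.Generic;

public record LineItem(string ProductId, int Quantity, decimal UnitPrice);

// Hypothetical variant of the order event that exposes a read-only collection.
public record OrderPlaced
{
    public string OrderId { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }

    // IReadOnlyList<T> does not expose Add or Remove to consumers.
    public IReadOnlyList<LineItem> Items { get; init; } = Array.Empty<LineItem>();
}

public static class OrderEvents
{
    // Non-destructive mutation: returns a modified copy and leaves the original unchanged.
    public static OrderPlaced WithTotal(OrderPlaced original, decimal total) =>
        original with { TotalAmount = total };
}
```

Assigning `placed.TotalAmount = 10m;` after construction is a compile-time error, whereas `OrderEvents.WithTotal(placed, 10m)` returns a new instance. This is mainly useful in tests and when transforming events in memory; events already in the store are not rewritten this way.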

## Reading Events

Read events from a stream using `ReadStreamAsync`:

```csharp
// Read all events from the beginning
await foreach (var storedEvent in eventStore.ReadStreamAsync("order-123"))
{
    Console.WriteLine($"Offset: {storedEvent.Offset}, Type: {storedEvent.EventType}");
    var @event = storedEvent.Data;
    // Process event...
}

// Read from specific offset
await foreach (var storedEvent in eventStore.ReadStreamAsync("order-123", fromOffset: 10))
{
    // Only events with offset > 10
}
```
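
Offsets make it possible for a read model to remember how far it has processed and resume from there. The sketch below runs entirely in memory: `StoredEvent` is a hypothetical stand-in that mirrors only the `Offset`, `EventType`, and `Data` members used above, and `InvoicePaid` and `RevenueProjection` are invented for the example. A real projection would presumably feed `CatchUp` from `ReadStreamAsync` and persist the checkpoint.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in mirroring the Offset, EventType, and Data members used above.
public record StoredEvent(long Offset, string EventType, object Data);

public record InvoicePaid(string InvoiceId, decimal Amount);

public class RevenueProjection
{
    public decimal TotalRevenue { get; private set; }

    // Offset of the last event applied; -1 means nothing has been processed yet.
    public long Checkpoint { get; private set; } = -1;

    public void CatchUp(IEnumerable<StoredEvent> stream)
    {
        foreach (var stored in stream)
        {
            // Skip events that were already applied so a re-read does not double count.
            if (stored.Offset <= Checkpoint) continue;

            if (stored.Data is InvoicePaid paid)
                TotalRevenue += paid.Amount;

            Checkpoint = stored.Offset;
        }
    }
}
```

Calling `CatchUp` twice with the same events leaves `TotalRevenue` unchanged the second time, because the checkpoint filters out everything already seen.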

## Complete Example

Here's a complete user aggregate with event sourcing:

```csharp
public class User
{
    public string Id { get; private set; } = string.Empty;
    public string Name { get; private set; } = string.Empty;
    public string Email { get; private set; } = string.Empty;
    public UserStatus Status { get; private set; }
    public string? SuspensionReason { get; private set; }

    private readonly List<object> _uncommittedEvents = new();

    public IReadOnlyList<object> GetUncommittedEvents() => _uncommittedEvents;
    public void ClearUncommittedEvents() => _uncommittedEvents.Clear();

    // Commands that produce events
    public void Register(string id, string name, string email)
    {
        if (string.IsNullOrEmpty(Id))
        {
            ApplyEvent(new UserRegisteredEvent
            {
                UserId = id,
                Name = name,
                Email = email,
                RegisteredAt = DateTimeOffset.UtcNow
            });
        }
    }

    public void ChangeEmail(string newEmail)
    {
        if (Email != newEmail)
        {
            ApplyEvent(new UserEmailChangedEvent
            {
                UserId = Id,
                OldEmail = Email,
                NewEmail = newEmail,
                ChangedAt = DateTimeOffset.UtcNow
            });
        }
    }

    public void Suspend(string reason)
    {
        if (Status != UserStatus.Suspended)
        {
            ApplyEvent(new UserSuspendedEvent
            {
                UserId = Id,
                Reason = reason,
                SuspendedAt = DateTimeOffset.UtcNow
            });
        }
    }

    // Apply event (used for both new events and replay)
    public void Apply(object @event)
    {
        switch (@event)
        {
            case UserRegisteredEvent e:
                Id = e.UserId;
                Name = e.Name;
                Email = e.Email;
                Status = UserStatus.Active;
                break;

            case UserEmailChangedEvent e:
                Email = e.NewEmail;
                break;

            case UserSuspendedEvent e:
                Status = UserStatus.Suspended;
                SuspensionReason = e.Reason;
                break;
        }
    }

    private void ApplyEvent(object @event)
    {
        Apply(@event);
        _uncommittedEvents.Add(@event);
    }
}

public enum UserStatus
{
    Active,
    Suspended
}

// Command handler that uses the aggregate
public class RegisterUserCommandHandler : ICommandHandler<RegisterUserCommand, string>
{
    private readonly IEventStreamStore _eventStore;

    public RegisterUserCommandHandler(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task<string> HandleAsync(RegisterUserCommand command, CancellationToken ct)
    {
        var userId = Guid.NewGuid().ToString();
        var user = new User();

        user.Register(userId, command.Name, command.Email);

        // Save uncommitted events
        foreach (var @event in user.GetUncommittedEvents())
        {
            await _eventStore.AppendAsync($"user-{userId}", @event, ct);
        }

        user.ClearUncommittedEvents();

        return userId;
    }
}
```
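
The handler above covers only creation. For any later command, the aggregate is typically rebuilt from its history first, and only the events raised afterwards are appended. The sketch below shows that load-then-decide cycle in memory with a deliberately reduced aggregate; `Account`, `Registered`, `EmailChanged`, and `FromHistory` are hypothetical names. In a real handler the history would presumably come from `ReadStreamAsync`, passing each `storedEvent.Data` to the aggregate.

```csharp
using System.Collections.Generic;

public record Registered(string UserId, string Email);
public record EmailChanged(string NewEmail);

public class Account
{
    private readonly List<object> _uncommitted = new();

    public string Id { get; private set; } = string.Empty;
    public string Email { get; private set; } = string.Empty;
    public IReadOnlyList<object> UncommittedEvents => _uncommitted;

    // Rebuilds an aggregate from stored events without recording them as new.
    public static Account FromHistory(IEnumerable<object> history)
    {
        var account = new Account();
        foreach (var past in history) account.Apply(past);
        return account;
    }

    public void ChangeEmail(string newEmail)
    {
        if (Email == newEmail) return; // Nothing changed, so no event is raised.

        var changed = new EmailChanged(newEmail);
        Apply(changed);
        _uncommitted.Add(changed);
    }

    private void Apply(object @event)
    {
        switch (@event)
        {
            case Registered r:
                Id = r.UserId;
                Email = r.Email;
                break;
            case EmailChanged c:
                Email = c.NewEmail;
                break;
        }
    }
}
```

After `FromHistory`, `UncommittedEvents` is empty because replayed events are already stored. Only the `EmailChanged` event raised by `ChangeEmail` would need to be appended to the stream.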

## Next Steps

- [02-aggregate-design.md](02-aggregate-design.md) - Learn how to design aggregates
- [03-events-and-workflows.md](03-events-and-workflows.md) - Event design and workflow patterns
- [04-projections.md](04-projections.md) - Build read models from events

## See Also

- [Event Streaming Fundamentals](../../event-streaming/fundamentals/getting-started.md)
- [Persistent Streams](../../event-streaming/fundamentals/persistent-streams.md)
- [Projections](../../event-streaming/projections/creating-projections.md)