# Building Read Models with Projections
|
|
|
|
Learn how to create projections to build read models from event streams with Svrnty.CQRS.
|
|
|
|
## What is a Projection?
|
|
|
|
A **projection** is a read model built by processing events from one or more event streams. Projections transform write-optimized event streams into read-optimized views.
|
|
|
|
**Benefits:**
- Separate read and write models (CQRS)
- Optimize queries without affecting write performance
- Create multiple views from the same events
- Rebuild views by replaying events
|
|
|
|
## Simple Projection
|
|
|
|
Here's a basic projection that builds a user summary:
|
|
|
|
```csharp
|
|
/// <summary>
/// Builds the <see cref="UserSummary"/> read model from the "users" event stream.
/// </summary>
public class UserSummaryProjection : IDynamicProjection
{
    private readonly IEventStreamStore _eventStore;
    private readonly ICheckpointStore _checkpointStore;
    private readonly IUserSummaryRepository _repository;
    private readonly ILogger<UserSummaryProjection> _logger;

    /// <summary>Name under which this projection's checkpoint is stored.</summary>
    public string ProjectionName => "user-summary";

    public UserSummaryProjection(
        IEventStreamStore eventStore,
        ICheckpointStore checkpointStore,
        IUserSummaryRepository repository,
        ILogger<UserSummaryProjection> logger)
    {
        // Fail at construction time instead of with a NullReferenceException mid-stream.
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _checkpointStore = checkpointStore ?? throw new ArgumentNullException(nameof(checkpointStore));
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Reads the "users" stream starting after the stored checkpoint, applies each
    /// event to the read model and saves the checkpoint after every event.
    /// </summary>
    /// <param name="ct">Token used to cancel reading and processing.</param>
    public async Task RunAsync(CancellationToken ct)
    {
        // Get last processed offset
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);

        _logger.LogInformation("Starting projection from offset {Offset}", checkpoint);

        // Read events from checkpoint
        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            "users",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            // Handle event
            await HandleEventAsync(storedEvent.Data, ct);

            // Save checkpoint only after the event has been applied. If the process
            // stops in between, the event is delivered again on the next run, which
            // is why the handlers below must tolerate replays.
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, storedEvent.Offset, ct);
        }
    }

    /// <summary>
    /// Applies a single event to the read model. Event types that are not listed
    /// in the switch are ignored.
    /// </summary>
    private async Task HandleEventAsync(object @event, CancellationToken ct)
    {
        switch (@event)
        {
            case UserRegisteredEvent e:
            {
                // Replay safety: skip the insert if the summary already exists.
                var existing = await _repository.GetByIdAsync(e.UserId, ct);
                if (existing != null)
                {
                    _logger.LogDebug("User summary {UserId} already exists; skipping replayed registration", e.UserId);
                    break;
                }

                await _repository.CreateAsync(new UserSummary
                {
                    UserId = e.UserId,
                    Name = e.Name,
                    Email = e.Email,
                    Status = "Active",
                    RegisteredAt = e.RegisteredAt
                }, ct);
                break;
            }

            case UserEmailChangedEvent e:
            {
                var user = await _repository.GetByIdAsync(e.UserId, ct);
                if (user == null)
                {
                    _logger.LogWarning("Ignoring email change for unknown user {UserId}", e.UserId);
                    break;
                }

                user.Email = e.NewEmail;
                await _repository.UpdateAsync(user, ct);
                break;
            }

            case UserSuspendedEvent e:
            {
                var suspendedUser = await _repository.GetByIdAsync(e.UserId, ct);
                if (suspendedUser == null)
                {
                    _logger.LogWarning("Ignoring suspension for unknown user {UserId}", e.UserId);
                    break;
                }

                suspendedUser.Status = "Suspended";
                suspendedUser.SuspensionReason = e.Reason;
                await _repository.UpdateAsync(suspendedUser, ct);
                break;
            }
        }
    }
}
|
|
|
|
// Read model
|
|
/// <summary>
/// Read-optimized view of a user, maintained by the user summary projection.
/// </summary>
public class UserSummary
{
    /// <summary>Identifier of the user.</summary>
    public string UserId { get; set; } = "";

    /// <summary>Name of the user.</summary>
    public string Name { get; set; } = "";

    /// <summary>Current e-mail address.</summary>
    public string Email { get; set; } = "";

    /// <summary>Status text such as "Active" or "Suspended".</summary>
    public string Status { get; set; } = "";

    /// <summary>Reason recorded when the user was suspended; null otherwise.</summary>
    public string? SuspensionReason { get; set; }

    /// <summary>Registration time taken from the registration event.</summary>
    public DateTimeOffset RegisteredAt { get; set; }
}
|
|
```
|
|
|
|
## Checkpoint Management
|
|
|
|
Checkpoints track the last processed event offset:
|
|
|
|
```csharp
|
|
/// <summary>
/// Stores, per projection, the offset of the last event that was processed.
/// </summary>
public interface ICheckpointStore
{
    /// <summary>Reads the checkpoint stored for <paramref name="projectionName"/>.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default);

    /// <summary>Stores <paramref name="offset"/> as the checkpoint for <paramref name="projectionName"/>.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="offset">Offset of the last processed event.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default);
}
|
|
|
|
// PostgreSQL implementation
|
|
/// <summary>
/// <see cref="ICheckpointStore"/> backed by the <c>projection_checkpoints</c> table.
/// </summary>
public class PostgresCheckpointStore : ICheckpointStore
{
    // NOTE(review): nothing in this snippet assigns this field; presumably the
    // constructor that sets it has been omitted for brevity - confirm.
    private readonly string _connectionString;

    /// <summary>
    /// Returns the stored checkpoint for the projection, or 0 when no row exists.
    /// </summary>
    /// <param name="projectionName">Value matched against <c>projection_name</c>.</param>
    /// <param name="ct">Token used to cancel the database calls.</param>
    public async Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        // Commands are disposable too; dispose them together with the connection.
        await using var command = new NpgsqlCommand(
            "SELECT checkpoint FROM projection_checkpoints WHERE projection_name = @name",
            connection);
        command.Parameters.AddWithValue("name", projectionName);

        var result = await command.ExecuteScalarAsync(ct);

        // ExecuteScalarAsync yields null when there is no row and DBNull for a SQL NULL;
        // both mean "no checkpoint yet".
        return result is null or DBNull ? 0 : Convert.ToInt64(result);
    }

    /// <summary>
    /// Inserts the checkpoint row for the projection, or updates it when one already exists.
    /// </summary>
    /// <param name="projectionName">Value stored in <c>projection_name</c>.</param>
    /// <param name="offset">Offset to store as the checkpoint.</param>
    /// <param name="ct">Token used to cancel the database calls.</param>
    public async Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new NpgsqlCommand(@"
            INSERT INTO projection_checkpoints (projection_name, checkpoint, updated_at)
            VALUES (@name, @checkpoint, @updatedAt)
            ON CONFLICT (projection_name)
            DO UPDATE SET checkpoint = @checkpoint, updated_at = @updatedAt",
            connection);

        command.Parameters.AddWithValue("name", projectionName);
        command.Parameters.AddWithValue("checkpoint", offset);
        command.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow);

        await command.ExecuteNonQueryAsync(ct);
    }
}
|
|
|
|
// Database schema
|
|
/*
|
|
CREATE TABLE projection_checkpoints (
|
|
projection_name VARCHAR(255) PRIMARY KEY,
|
|
checkpoint BIGINT NOT NULL,
|
|
updated_at TIMESTAMPTZ NOT NULL
|
|
);
|
|
*/
|
|
```
|
|
|
|
## Multi-Stream Projection
|
|
|
|
Project from multiple streams:
|
|
|
|
```csharp
|
|
/// <summary>
/// Feeds order analytics from the "orders", "payments" and "shipments" streams.
/// The fields used below (_eventStore, _checkpointStore, _repository) are not
/// declared in this snippet.
/// </summary>
public class OrderAnalyticsProjection : IDynamicProjection
{
    public string ProjectionName => "order-analytics";

    /// <summary>
    /// Processes the streams one after another, each from its own checkpoint.
    /// </summary>
    /// <param name="ct">Token used to cancel reading and processing.</param>
    public async Task RunAsync(CancellationToken ct)
    {
        // Read from multiple streams
        var streams = new[] { "orders", "payments", "shipments" };

        foreach (var streamName in streams)
        {
            // Offsets are read and saved per stream, so each stream needs its own
            // checkpoint. With a single shared checkpoint, the offset saved for one
            // stream would be used as the starting point of the others on restart.
            var checkpointKey = $"{ProjectionName}:{streamName}";
            var checkpoint = await _checkpointStore.GetCheckpointAsync(checkpointKey, ct);

            await foreach (var storedEvent in _eventStore.ReadStreamAsync(
                streamName,
                fromOffset: checkpoint + 1,
                cancellationToken: ct))
            {
                await HandleEventAsync(storedEvent.Data, ct);
                await _checkpointStore.SaveCheckpointAsync(checkpointKey, storedEvent.Offset, ct);
            }
        }
    }

    /// <summary>
    /// Applies a single event to the analytics repository. Event types that are
    /// not listed in the switch are ignored.
    /// </summary>
    private async Task HandleEventAsync(object @event, CancellationToken ct)
    {
        switch (@event)
        {
            case OrderPlacedEvent e:
                await _repository.IncrementOrderCountAsync(e.CustomerId, ct);
                await _repository.AddRevenueAsync(e.TotalAmount, ct);
                break;

            case PaymentProcessedEvent e:
                await _repository.RecordPaymentAsync(e.PaymentId, e.Amount, ct);
                break;

            case ShipmentCreatedEvent e:
                await _repository.RecordShipmentAsync(e.OrderId, e.ShipmentId, ct);
                break;
        }
    }
}
|
|
```
|
|
|
|
## Batched Projection
|
|
|
|
Process events in batches for better performance:
|
|
|
|
```csharp
|
|
/// <summary>
/// Projects the "orders" stream in batches of up to <see cref="BatchSize"/> events.
/// The fields and HandleEventAsync used below are not declared in this snippet.
/// </summary>
public class OrderSummaryProjection : IDynamicProjection
{
    private const int BatchSize = 100;

    public string ProjectionName => "order-summary";

    /// <summary>
    /// Reads events after the stored checkpoint, flushing a batch whenever it is
    /// full and once more for any events left over at the end.
    /// </summary>
    /// <param name="ct">Token used to cancel reading and processing.</param>
    public async Task RunAsync(CancellationToken ct)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
        var batch = new List<(long Offset, object Event)>(BatchSize);

        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            batch.Add((storedEvent.Offset, storedEvent.Data));

            if (batch.Count >= BatchSize)
            {
                await FlushBatchAsync(batch, ct);
            }
        }

        // Process remaining events
        if (batch.Count > 0)
        {
            await FlushBatchAsync(batch, ct);
        }
    }

    /// <summary>
    /// Processes the batch, saves the offset of its last event as the checkpoint
    /// and empties the list so it can be reused.
    /// </summary>
    private async Task FlushBatchAsync(List<(long Offset, object Event)> batch, CancellationToken ct)
    {
        await ProcessBatchAsync(batch, ct);
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, batch[^1].Offset, ct);
        batch.Clear();
    }

    /// <summary>
    /// Applies every event of the batch inside one repository transaction;
    /// the transaction is rolled back and the exception rethrown on failure.
    /// </summary>
    private async Task ProcessBatchAsync(List<(long Offset, object Event)> batch, CancellationToken ct)
    {
        // Use a transaction for the batch
        await using var transaction = await _repository.BeginTransactionAsync(ct);

        try
        {
            foreach (var (_, @event) in batch)
            {
                await HandleEventAsync(@event, ct);
            }

            await transaction.CommitAsync(ct);
        }
        catch
        {
            // Roll back without the caller's token: if the failure was caused by
            // cancellation, passing the cancelled token could abort the rollback
            // and replace the original exception.
            await transaction.RollbackAsync(CancellationToken.None);
            throw;
        }
    }
}
|
|
```
|
|
|
|
## Projection Registration
|
|
|
|
Register projections as background services:
|
|
|
|
```csharp
|
|
// In Program.cs
var services = builder.Services;

// Checkpoint store and projections share the container as singletons.
services.AddSingleton<ICheckpointStore, PostgresCheckpointStore>();
services.AddSingleton<IDynamicProjection, UserSummaryProjection>();
services.AddSingleton<IDynamicProjection, OrderAnalyticsProjection>();

// Auto-start projections
services.AddProjectionRunner(runner =>
{
    runner.AutoStart = true;
    runner.BatchSize = 100;
    runner.CheckpointInterval = TimeSpan.FromSeconds(5);
});

var app = builder.Build();
app.Run();
|
|
```
|
|
|
|
## Resettable Projections
|
|
|
|
Allow projections to be rebuilt from scratch:
|
|
|
|
```csharp
|
|
/// <summary>
/// A projection that can be reset so that it can be rebuilt from scratch.
/// </summary>
public interface IResettableProjection : IDynamicProjection
{
    /// <summary>Resets the projection.</summary>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task ResetAsync(CancellationToken ct = default);
}
|
|
|
|
/// <summary>
/// Resettable variant of the user summary projection. The fields used below
/// (_repository, _checkpointStore) are not declared in this snippet.
/// </summary>
public class UserSummaryProjection : IResettableProjection
{
    public string ProjectionName => "user-summary";

    /// <summary>
    /// Deletes everything in the read model, then stores 0 as the checkpoint.
    /// </summary>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task ResetAsync(CancellationToken ct)
    {
        // Clear read model
        await _repository.DeleteAllAsync(ct);

        // Reset checkpoint
        const long initialCheckpoint = 0;
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, initialCheckpoint, ct);
    }

    /// <summary>Runs the projection (body elided in this example).</summary>
    public async Task RunAsync(CancellationToken ct)
    {
        // ... projection logic
    }
}
|
|
|
|
// Reset and rebuild projection
// Projections are registered under IDynamicProjection, so resolve them through
// that service type and pick the resettable one by name. Asking the container
// for IResettableProjection directly fails unless it is registered separately.
var projection = app.Services
    .GetServices<IDynamicProjection>()
    .OfType<IResettableProjection>()
    .Single(p => p.ProjectionName == "user-summary");

await projection.ResetAsync();
await projection.RunAsync(CancellationToken.None);
|
|
```
|
|
|
|
## Denormalized Projections
|
|
|
|
Create highly denormalized views for fast queries:
|
|
|
|
```csharp
|
|
/// <summary>
/// Maintains a denormalized per-customer order history. The _repository field
/// and RunAsync are not shown in this snippet.
/// </summary>
public class CustomerOrderHistoryProjection : IDynamicProjection
{
    public string ProjectionName => "customer-order-history";

    /// <summary>
    /// Applies a single event to the customer's order history. Event types that
    /// are not listed in the switch are ignored.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// An order was placed for a customer that has no read-model record.
    /// </exception>
    private async Task HandleEventAsync(object @event, CancellationToken ct)
    {
        switch (@event)
        {
            case OrderPlacedEvent e:
            {
                // Denormalize order details into customer record
                var customer = await _repository.GetCustomerAsync(e.CustomerId, ct);
                if (customer is null)
                {
                    // Fail with a clear message instead of a NullReferenceException below.
                    throw new InvalidOperationException(
                        $"No customer record '{e.CustomerId}' found for order '{e.OrderId}'.");
                }

                // Replay safety: adding the same order twice would duplicate it and
                // double-count TotalSpent and OrderCount.
                if (customer.Orders.Any(o => o.OrderId == e.OrderId))
                {
                    break;
                }

                customer.Orders.Add(new OrderSummary
                {
                    OrderId = e.OrderId,
                    PlacedAt = e.PlacedAt,
                    TotalAmount = e.TotalAmount,
                    Items = e.Items.Select(i => new ItemSummary
                    {
                        ProductId = i.ProductId,
                        ProductName = i.ProductName,
                        Quantity = i.Quantity,
                        Price = i.Price
                    }).ToList()
                });

                customer.TotalSpent += e.TotalAmount;
                customer.OrderCount++;

                await _repository.UpdateCustomerAsync(customer, ct);
                break;
            }

            case OrderShippedEvent e:
            {
                var customerWithOrder = await _repository.GetCustomerAsync(e.CustomerId, ct);

                // A missing customer is treated like a missing order: nothing to update.
                var order = customerWithOrder?.Orders.FirstOrDefault(o => o.OrderId == e.OrderId);
                if (order is null)
                {
                    break;
                }

                // Plain assignments, so applying the same event again is harmless.
                order.Status = "Shipped";
                order.TrackingNumber = e.TrackingNumber;
                order.ShippedAt = e.ShippedAt;
                await _repository.UpdateCustomerAsync(customerWithOrder, ct);
                break;
            }
        }
    }
}
|
|
|
|
// Read model
|
|
/// <summary>
/// Denormalized read model: a customer together with summaries of their orders.
/// </summary>
public class CustomerOrderHistory
{
    /// <summary>Identifier of the customer.</summary>
    public string CustomerId { get; set; } = "";

    /// <summary>Name of the customer.</summary>
    public string Name { get; set; } = "";

    /// <summary>Order summaries embedded in the customer record.</summary>
    public List<OrderSummary> Orders { get; set; } = new List<OrderSummary>();

    /// <summary>Running total of order amounts.</summary>
    public decimal TotalSpent { get; set; }

    /// <summary>Running count of orders.</summary>
    public int OrderCount { get; set; }
}
|
|
|
|
/// <summary>
/// Summary of a single order as stored inside a customer's order history.
/// </summary>
public class OrderSummary
{
    /// <summary>Identifier of the order.</summary>
    public string OrderId { get; set; } = "";

    /// <summary>Time the order was placed.</summary>
    public DateTimeOffset PlacedAt { get; set; }

    /// <summary>Status text, for example "Shipped".</summary>
    public string Status { get; set; } = "";

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Summaries of the items in the order.</summary>
    public List<ItemSummary> Items { get; set; } = new List<ItemSummary>();

    /// <summary>Tracking number; null until one has been recorded.</summary>
    public string? TrackingNumber { get; set; }

    /// <summary>Shipping time; null until one has been recorded.</summary>
    public DateTimeOffset? ShippedAt { get; set; }
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
✅ **DO:**
|
|
- Save checkpoints regularly to track progress
|
|
- Use batching for better performance
|
|
- Handle events idempotently (safe to replay)
|
|
- Create multiple projections for different views
|
|
- Denormalize data for query performance
|
|
- Make projections resettable for rebuilds
|
|
|
|
❌ **DON'T:**
- Process events without checkpointing
- Create projections that modify write models
- Assume events arrive in order across streams
- Put business logic in projections (they should only transform event data into read models)
- Create more projections than you need (each one adds processing and maintenance overhead)
|
|
|
|
## Next Steps
|
|
|
|
- [05-snapshots.md](05-snapshots.md) - Optimize aggregate loading with snapshots
|
|
- [06-replay-and-rebuild.md](06-replay-and-rebuild.md) - Replay and rebuild projections
|
|
|
|
## See Also
|
|
|
|
- [Creating Projections](../../event-streaming/projections/creating-projections.md)
|
|
- [Projection Options](../../event-streaming/projections/projection-options.md)
|
|
- [Resettable Projections](../../event-streaming/projections/resettable-projections.md)
|