462 lines
13 KiB
Markdown
462 lines
13 KiB
Markdown
# Creating Projections
|
|
|
|
Build read models from event streams by implementing `IDynamicProjection`.
|
|
|
|
## Overview
|
|
|
|
Projections transform event streams into materialized read models for efficient querying:
|
|
- **Event Sourcing Pattern** - Build state from events
|
|
- **Automatic Processing** - Background service processes events
|
|
- **Checkpoint Tracking** - Resume from last processed event
|
|
- **Fault Tolerance** - Handle errors and retries
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
/// <summary>
/// Builds the <c>OrderSummary</c> read model from the events in the "orders" stream.
/// </summary>
/// <remarks>
/// The checkpoint is written only after an event has been handled, so the same event can be
/// delivered again if the process stops between those two steps. The handlers below are
/// written so that handling an event twice leaves the same rows behind.
/// </remarks>
public class OrderSummaryProjection : IDynamicProjection
{
    private readonly IEventStreamStore _eventStore;
    private readonly ICheckpointStore _checkpointStore;
    private readonly OrderDbContext _dbContext;

    /// <summary>Receives the event store, checkpoint store and read-model context used by this projection.</summary>
    /// <param name="eventStore">Store the "orders" stream is read from.</param>
    /// <param name="checkpointStore">Store holding the last processed offset for this projection.</param>
    /// <param name="dbContext">Context that owns the <c>OrderSummaries</c> set.</param>
    public OrderSummaryProjection(
        IEventStreamStore eventStore,
        ICheckpointStore checkpointStore,
        OrderDbContext dbContext)
    {
        _eventStore = eventStore;
        _checkpointStore = checkpointStore;
        _dbContext = dbContext;
    }

    /// <summary>Name under which this projection's checkpoint is stored.</summary>
    public string ProjectionName => "order-summary";

    /// <summary>
    /// Reads the "orders" stream starting just after the stored checkpoint, handles each event
    /// and then stores that event's offset as the new checkpoint.
    /// </summary>
    /// <param name="ct">Token passed to the stream read and to the handlers.</param>
    public async Task RunAsync(CancellationToken ct)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

        await foreach (var @event in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            await HandleEventAsync(@event, ct);
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
        }
    }

    /// <summary>Routes an event to the matching read-model update based on its event type.</summary>
    private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
    {
        switch (@event.EventType)
        {
            case "OrderPlaced":
                var placedEvent = @event.DeserializeAs<OrderPlacedEvent>();
                await CreateOrderSummaryAsync(placedEvent, ct);
                break;

            case "OrderShipped":
                var shippedEvent = @event.DeserializeAs<OrderShippedEvent>();
                await UpdateOrderStatusAsync(shippedEvent.OrderId, "Shipped", ct);
                break;

            case "OrderCancelled":
                var cancelledEvent = @event.DeserializeAs<OrderCancelledEvent>();
                await UpdateOrderStatusAsync(cancelledEvent.OrderId, "Cancelled", ct);
                break;

            default:
                // Ignore unknown events
                break;
        }
    }

    /// <summary>Inserts the summary row for a newly placed order, unless it already exists.</summary>
    private async Task CreateOrderSummaryAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // A replayed OrderPlaced event must not insert a second row with the same key.
        var existing = await _dbContext.OrderSummaries.FindAsync(new object[] { @event.OrderId }, ct);
        if (existing != null)
        {
            return;
        }

        var summary = new OrderSummary
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            TotalAmount = @event.TotalAmount,
            Status = "Placed",
            PlacedAt = @event.PlacedAt,
            ItemCount = @event.Items.Count
        };

        _dbContext.OrderSummaries.Add(summary);
        await _dbContext.SaveChangesAsync(ct);
    }

    /// <summary>Sets the status of an existing summary row; does nothing when the row is not found.</summary>
    private async Task UpdateOrderStatusAsync(int orderId, string status, CancellationToken ct)
    {
        // The object[] overload is the one that accepts a cancellation token.
        var summary = await _dbContext.OrderSummaries.FindAsync(new object[] { orderId }, ct);
        if (summary != null)
        {
            summary.Status = status;
            await _dbContext.SaveChangesAsync(ct);
        }
    }
}
|
|
```
|
|
|
|
## IDynamicProjection Interface
|
|
|
|
```csharp
|
|
/// <summary>
/// Contract implemented by the projections in this guide: a name plus a run method.
/// </summary>
public interface IDynamicProjection
|
|
{
|
|
/// <summary>
/// Name of the projection. The examples in this guide pass it to the checkpoint store
/// when loading and saving the projection's checkpoint.
/// </summary>
string ProjectionName { get; }
|
|
/// <summary>
/// Runs the projection. The examples read an event stream, handle each event and save a checkpoint.
/// </summary>
/// <param name="cancellationToken">Token the examples forward to the stream read and to the handlers.</param>
Task RunAsync(CancellationToken cancellationToken);
|
|
}
|
|
```
|
|
|
|
## Registration
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events;
|
|
|
|
// Application startup: registers one projection and the projection service, then runs the host.
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Register projection
// NOTE(review): the Quick Start projection holds an OrderDbContext field; if that context is
// registered as scoped, a singleton projection would capture it. Confirm the intended lifetime.
|
|
builder.Services.AddSingleton<IDynamicProjection, OrderSummaryProjection>();
|
|
|
|
// Register projection service (runs projections in background)
|
|
builder.Services.AddDynamicProjections(options =>
|
|
{
|
|
options.AutoStart = true;
|
|
// Five-second checkpoint interval; see the Projection Options page for what the setting controls.
options.CheckpointInterval = TimeSpan.FromSeconds(5);
|
|
});
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
## Event Handling Patterns
|
|
|
|
### Switch-Based Handler
|
|
|
|
```csharp
|
|
/// <summary>
/// Dispatches an event to the handler for its event type; events of any other type are ignored.
/// </summary>
private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    var eventType = @event.EventType;

    if (eventType == "OrderPlaced")
    {
        var placed = @event.DeserializeAs<OrderPlacedEvent>();
        await HandleOrderPlacedAsync(placed, ct);
    }
    else if (eventType == "OrderShipped")
    {
        var shipped = @event.DeserializeAs<OrderShippedEvent>();
        await HandleOrderShippedAsync(shipped, ct);
    }
    else if (eventType == "OrderCancelled")
    {
        var cancelled = @event.DeserializeAs<OrderCancelledEvent>();
        await HandleOrderCancelledAsync(cancelled, ct);
    }

    // Ignore unknown events
}
|
|
```
|
|
|
|
### Dictionary-Based Handler
|
|
|
|
```csharp
|
|
// Maps an event type name to the delegate that deserializes and handles that event.
private readonly Dictionary<string, Func<StreamEvent, CancellationToken, Task>> _handlers;

/// <summary>Builds the event-type lookup table once, when the projection is constructed.</summary>
public OrderSummaryProjection()
{
    _handlers = new()
    {
        {
            "OrderPlaced",
            async (streamEvent, token) =>
                await HandleOrderPlacedAsync(streamEvent.DeserializeAs<OrderPlacedEvent>(), token)
        },
        {
            "OrderShipped",
            async (streamEvent, token) =>
                await HandleOrderShippedAsync(streamEvent.DeserializeAs<OrderShippedEvent>(), token)
        },
        {
            "OrderCancelled",
            async (streamEvent, token) =>
                await HandleOrderCancelledAsync(streamEvent.DeserializeAs<OrderCancelledEvent>(), token)
        },
    };
}

/// <summary>Runs the registered handler for the event's type; unregistered types are skipped.</summary>
private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    if (!_handlers.TryGetValue(@event.EventType, out var handler))
    {
        return;
    }

    await handler(@event, ct);
}
|
|
```
|
|
|
|
### Reflection-Based Handler
|
|
|
|
```csharp
|
|
/// <summary>
/// Dispatches an event to the private instance method named <c>Handle{EventType}Async</c>.
/// Events with no matching method are ignored.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The matching method does not return a <see cref="Task"/>.
/// </exception>
private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    var methodName = $"Handle{@event.EventType}Async";
    var method = GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);

    if (method == null)
    {
        // No handler declared for this event type.
        return;
    }

    var eventData = @event.Deserialize();

    // Spell out object[] so the argument array does not depend on type inference
    // across the deserialized payload and the CancellationToken struct.
    var result = method.Invoke(this, new object[] { eventData, ct });

    // Invoke returns object; check it is a Task instead of letting a bad cast
    // or a null await surface as an opaque failure.
    if (result is not Task pending)
    {
        throw new InvalidOperationException(
            $"Handler '{methodName}' must return a Task.");
    }

    await pending;
}

/// <summary>Example handler found by the reflection lookup for "OrderPlaced" events.</summary>
private async Task HandleOrderPlacedAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    // Handle event
}
|
|
```
|
|
|
|
## Checkpoint Management
|
|
|
|
### Basic Checkpointing
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes the "orders" stream from the position after the stored checkpoint,
/// writing a new checkpoint each time an event has been handled.
/// </summary>
public async Task RunAsync(CancellationToken ct)
{
    var lastProcessed = await _checkpointStore.GetCheckpointAsync(ProjectionName);

    var pendingEvents = _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: lastProcessed + 1,
        cancellationToken: ct);

    await foreach (var streamEvent in pendingEvents)
    {
        await HandleEventAsync(streamEvent, ct);

        // Save checkpoint after each event
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, streamEvent.Offset);
    }
}
|
|
```
|
|
|
|
### Batch Checkpointing
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes the "orders" stream and saves the checkpoint once every 100 handled events,
/// plus once more when the loop ends for any reason.
/// </summary>
/// <param name="ct">Token passed to the stream read and to the handlers.</param>
public async Task RunAsync(CancellationToken ct)
{
    const int batchSize = 100;

    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
    var processedCount = 0;
    var lastOffset = checkpoint;       // offset of the last event handled successfully
    var lastSavedOffset = checkpoint;  // offset most recently written to the checkpoint store

    try
    {
        await foreach (var @event in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            await HandleEventAsync(@event, ct);

            // Advance only after the handler returned, so a failed event is never checkpointed.
            lastOffset = @event.Offset;
            processedCount++;

            // Save checkpoint every 100 events
            if (processedCount % batchSize == 0)
            {
                await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
                lastSavedOffset = lastOffset;
            }
        }
    }
    finally
    {
        // Save final checkpoint. Running this in finally keeps the progress of a partial
        // batch when the loop ends through cancellation or a handler exception.
        if (lastOffset > lastSavedOffset)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
        }
    }
}
|
|
```
|
|
|
|
### Transaction-Based Checkpointing
|
|
|
|
```csharp
|
|
/// <summary>
/// Applies an event to the read model and saves the checkpoint inside one database transaction,
/// committing only when both steps succeed.
/// </summary>
/// <param name="event">Event to apply.</param>
/// <param name="ct">Token passed to the database calls.</param>
private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    // await using: the transaction is disposed asynchronously, matching the async calls around it.
    await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);

    try
    {
        // Update read model
        switch (@event.EventType)
        {
            case "OrderPlaced":
                await CreateOrderSummaryAsync(@event.DeserializeAs<OrderPlacedEvent>(), ct);
                break;
        }

        // Save checkpoint in same transaction
        // NOTE(review): this is only atomic with the read-model update if the checkpoint store
        // writes through the same connection/transaction as _dbContext — confirm.
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);

        await transaction.CommitAsync(ct);
    }
    catch
    {
        // Do not pass ct here: when the failure was caused by cancellation, a cancelled token
        // would abort the rollback and replace the original exception.
        await transaction.RollbackAsync(CancellationToken.None);
        throw;
    }
}
|
|
```
|
|
|
|
## Read Model Design
|
|
|
|
### Simple Read Model
|
|
|
|
```csharp
|
|
/// <summary>Flat read model holding one row of summary data per order.</summary>
public class OrderSummary
|
|
{
|
|
/// <summary>Order identifier; the Quick Start looks rows up by this value.</summary>
public int OrderId { get; set; }
|
|
/// <summary>Identifier of the customer who placed the order.</summary>
public int CustomerId { get; set; }
|
|
/// <summary>Total amount copied from the order-placed event.</summary>
public decimal TotalAmount { get; set; }
|
|
/// <summary>Current status text; the Quick Start writes "Placed", "Shipped" or "Cancelled".</summary>
public string Status { get; set; } = string.Empty;
|
|
/// <summary>Time the order was placed, copied from the order-placed event.</summary>
public DateTimeOffset PlacedAt { get; set; }
|
|
/// <summary>Number of items on the order; the Quick Start sets it from the event's item count.</summary>
public int ItemCount { get; set; }
|
|
}
|
|
```
|
|
|
|
### Denormalized Read Model
|
|
|
|
```csharp
|
|
/// <summary>
/// Read model for an order that also carries copies of customer and product data,
/// so one row holds everything listed below.
/// </summary>
public class OrderSummary
|
|
{
|
|
/// <summary>Order identifier.</summary>
public int OrderId { get; set; }
|
|
|
|
// Customer details (denormalized)
|
|
/// <summary>Identifier of the customer who placed the order.</summary>
public int CustomerId { get; set; }
|
|
/// <summary>Customer name stored on the order row.</summary>
public string CustomerName { get; set; } = string.Empty;
|
|
/// <summary>Customer e-mail address stored on the order row.</summary>
public string CustomerEmail { get; set; } = string.Empty;
|
|
|
|
// Order details
|
|
/// <summary>Total amount of the order.</summary>
public decimal TotalAmount { get; set; }
|
|
/// <summary>Current status text of the order.</summary>
public string Status { get; set; } = string.Empty;
|
|
/// <summary>Time the order was placed.</summary>
public DateTimeOffset PlacedAt { get; set; }
|
|
/// <summary>Time the order was shipped; nullable, presumably unset until a shipped event is applied.</summary>
public DateTimeOffset? ShippedAt { get; set; }
|
|
/// <summary>Time the order was cancelled; nullable, presumably unset unless a cancelled event is applied.</summary>
public DateTimeOffset? CancelledAt { get; set; }
|
|
|
|
// Aggregated data
|
|
/// <summary>Number of items on the order.</summary>
public int ItemCount { get; set; }
|
|
/// <summary>Names of the products on the order; starts as an empty list.</summary>
public List<string> ProductNames { get; set; } = new();
|
|
}
|
|
```
|
|
|
|
### Aggregate Read Model
|
|
|
|
```csharp
|
|
/// <summary>Read model holding running order totals for one customer.</summary>
public class CustomerOrderStats
|
|
{
|
|
/// <summary>Customer identifier; the handler below looks rows up by this value.</summary>
public int CustomerId { get; set; }
|
|
/// <summary>Number of order-placed events applied for this customer.</summary>
public int TotalOrders { get; set; }
|
|
/// <summary>Sum of the total amounts of the applied orders.</summary>
public decimal TotalSpent { get; set; }
|
|
/// <summary>TotalSpent divided by TotalOrders, recalculated on every applied order.</summary>
public decimal AverageOrderValue { get; set; }
|
|
/// <summary>Placed-at time of the most recently applied order; null before the first order.</summary>
public DateTimeOffset? LastOrderDate { get; set; }
|
|
/// <summary>Count of applied orders whose placed-at time matched the current month when applied.</summary>
public int OrdersThisMonth { get; set; }
|
|
/// <summary>Count of applied orders whose placed-at time matched the current year when applied.</summary>
public int OrdersThisYear { get; set; }
|
|
}
|
|
|
|
/// <summary>
/// Folds a placed order into the customer's running statistics, creating the statistics row
/// on the customer's first order.
/// </summary>
/// <param name="event">The order-placed event to apply.</param>
/// <param name="ct">Token passed to the database calls.</param>
private async Task HandleOrderPlacedAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var stats = await _dbContext.CustomerOrderStats.FindAsync(new object[] { @event.CustomerId }, ct);
    if (stats == null)
    {
        // First order for this customer: the row must be inserted. Calling Update() on a new
        // entity whose key is already set would issue an UPDATE that matches no row.
        stats = new CustomerOrderStats { CustomerId = @event.CustomerId };
        _dbContext.CustomerOrderStats.Add(stats);
    }

    stats.TotalOrders++;
    stats.TotalSpent += @event.TotalAmount;
    stats.AverageOrderValue = stats.TotalSpent / stats.TotalOrders;
    stats.LastOrderDate = @event.PlacedAt;

    // Compare both instants in UTC so the calendar fields refer to the same clock.
    var placedAtUtc = @event.PlacedAt.ToUniversalTime();
    var nowUtc = DateTimeOffset.UtcNow;

    if (placedAtUtc.Year == nowUtc.Year)
    {
        stats.OrdersThisYear++;

        // The month only counts when the year matches too; otherwise the same month
        // of an earlier year would be counted as "this month".
        if (placedAtUtc.Month == nowUtc.Month)
        {
            stats.OrdersThisMonth++;
        }
    }

    // NOTE(review): nothing in this handler resets OrdersThisMonth / OrdersThisYear when the
    // period rolls over, and the result depends on when the event is processed.

    // An entity returned by FindAsync is already tracked, so no explicit Update() call is needed.
    await _dbContext.SaveChangesAsync(ct);
}
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes the "orders" stream, logging handler failures and applying an error strategy
/// before the checkpoint moves past the event.
/// </summary>
/// <param name="ct">Token passed to the stream read and to the handlers.</param>
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        try
        {
            await HandleEventAsync(@event, ct);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation is excluded by the filter above: it is a shutdown request,
            // not a processing error, and must not be logged or retried.
            _logger.LogError(ex,
                "Error processing event {EventId} in projection {Projection}",
                @event.EventId,
                ProjectionName);

            // Strategy 1: Retry with exponential backoff
            // NOTE(review): assumes RetryWithBackoffAsync throws once maxAttempts is exhausted,
            // which stops the projection before the checkpoint below is saved — confirm.
            await RetryWithBackoffAsync(() => HandleEventAsync(@event, ct), maxAttempts: 3);

            // Strategy 2: Skip event and continue
            // (nothing to do here: the checkpoint save after this block moves past the event)

            // Strategy 3: Dead letter queue
            // await _dlqStore.SendAsync(ProjectionName, @event);

            // Strategy 4: Fail projection
            // throw;
        }

        // Saved outside the try block so that a successful retry also advances the checkpoint,
        // and a failure to save the checkpoint does not cause the handler to run again.
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
    }
}
|
|
```
|
|
|
|
## Multiple Stream Projection
|
|
|
|
```csharp
|
|
/// <summary>
/// Projection that reads the "orders", "payments" and "shipping" streams and handles their
/// events through one merged sequence.
/// </summary>
public class OrderFulfillmentProjection : IDynamicProjection
|
|
{
|
|
/// <summary>Name under which this projection's checkpoint is stored.</summary>
public string ProjectionName => "order-fulfillment";
|
|
|
|
/// <summary>
/// Opens the three streams after the stored checkpoint, merges them, handles each event
/// and saves that event's offset as the checkpoint.
/// </summary>
public async Task RunAsync(CancellationToken ct)
|
|
{
|
|
var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
|
|
|
|
// Merge events from multiple streams
// NOTE(review): one checkpoint value is used as the start offset for all three streams.
// If offsets are assigned per stream, each stream needs its own checkpoint — confirm.
|
|
var orderEvents = _eventStore.ReadStreamAsync("orders", checkpoint + 1, ct);
|
|
var paymentEvents = _eventStore.ReadStreamAsync("payments", checkpoint + 1, ct);
|
|
var shippingEvents = _eventStore.ReadStreamAsync("shipping", checkpoint + 1, ct);
|
|
|
|
// Process events in timestamp order
// (ordering is presumably provided by MergeStreamsAsync, which is not shown here)
|
|
await foreach (var @event in MergeStreamsAsync(
|
|
orderEvents,
|
|
paymentEvents,
|
|
shippingEvents,
|
|
ct))
|
|
{
|
|
await HandleEventAsync(@event, ct);
|
|
// The saved offset comes from whichever stream produced this event.
await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use idempotent event handlers
|
|
- Save checkpoints frequently
|
|
- Use transactions for consistency
|
|
- Handle unknown event types gracefully
|
|
- Log projection errors
|
|
- Monitor projection lag
|
|
- Design denormalized read models
|
|
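- Use batch checkpointing for performance when handlers are idempotent (events handled since the last saved checkpoint are replayed after a restart)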
|
|
- Handle schema evolution
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't skip checkpoint saves
|
|
- Don't modify events in projections
|
|
- Don't use projection state to make write-side decisions (read models are eventually consistent)
|
|
- Don't ignore errors
|
|
- Don't forget idempotency
|
|
- Don't query write model from projections
|
|
- Don't use blocking operations
|
|
- Don't forget cancellation tokens
|
|
|
|
## See Also
|
|
|
|
- [Projections Overview](README.md)
|
|
- [Projection Options](projection-options.md)
|
|
- [Resettable Projections](resettable-projections.md)
|
|
- [Checkpoint Stores](checkpoint-stores.md)
|
|
- [Event Sourcing Tutorial](../../tutorials/event-sourcing/README.md)
|