dotnet-cqrs/docs/tutorials/event-sourcing/04-projections.md

14 KiB

Building Read Models with Projections

Learn how to create projections to build read models from event streams with Svrnty.CQRS.

What is a Projection?

A projection is a read model built by processing events from one or more event streams. Projections transform write-optimized event streams into read-optimized views.

Benefits:

  • Separate read and write models (CQRS)
  • Optimize queries without affecting write performance
  • Create multiple views from the same events
  • Rebuild views by replaying events

Simple Projection

Here's a basic projection that builds a user summary:

public class UserSummaryProjection : IDynamicProjection
{
    private readonly IEventStreamStore _eventStore;
    private readonly ICheckpointStore _checkpointStore;
    private readonly IUserSummaryRepository _repository;
    private readonly ILogger<UserSummaryProjection> _logger;

    public string ProjectionName => "user-summary";

    public UserSummaryProjection(
        IEventStreamStore eventStore,
        ICheckpointStore checkpointStore,
        IUserSummaryRepository repository,
        ILogger<UserSummaryProjection> logger)
    {
        _eventStore = eventStore;
        _checkpointStore = checkpointStore;
        _repository = repository;
        _logger = logger;
    }

    public async Task RunAsync(CancellationToken ct)
    {
        // Get last processed offset
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);

        _logger.LogInformation("Starting projection from offset {Offset}", checkpoint);

        // Read events from checkpoint
        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            "users",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            // Handle event
            await HandleEventAsync(storedEvent.Data, ct);

            // Save checkpoint
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, storedEvent.Offset, ct);
        }
    }

    private async Task HandleEventAsync(object @event, CancellationToken ct)
    {
        switch (@event)
        {
            case UserRegisteredEvent e:
                await _repository.CreateAsync(new UserSummary
                {
                    UserId = e.UserId,
                    Name = e.Name,
                    Email = e.Email,
                    Status = "Active",
                    RegisteredAt = e.RegisteredAt
                }, ct);
                break;

            case UserEmailChangedEvent e:
                var user = await _repository.GetByIdAsync(e.UserId, ct);
                if (user != null)
                {
                    user.Email = e.NewEmail;
                    await _repository.UpdateAsync(user, ct);
                }
                break;

            case UserSuspendedEvent e:
                var suspendedUser = await _repository.GetByIdAsync(e.UserId, ct);
                if (suspendedUser != null)
                {
                    suspendedUser.Status = "Suspended";
                    suspendedUser.SuspensionReason = e.Reason;
                    await _repository.UpdateAsync(suspendedUser, ct);
                }
                break;
        }
    }
}

// Read model
public class UserSummary
{
    public string UserId { get; set; } = string.Empty;
    public string Name { get; set; } = string.Empty;
    public string Email { get; set; } = string.Empty;
    public string Status { get; set; } = string.Empty;
    public string? SuspensionReason { get; set; }
    public DateTimeOffset RegisteredAt { get; set; }
}

Checkpoint Management

Checkpoints track the last processed event offset:

public interface ICheckpointStore
{
    Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default);
    Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default);
}

// PostgreSQL implementation
public class PostgresCheckpointStore : ICheckpointStore
{
    private readonly string _connectionString;

    public async Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        var command = new NpgsqlCommand(
            "SELECT checkpoint FROM projection_checkpoints WHERE projection_name = @name",
            connection);
        command.Parameters.AddWithValue("name", projectionName);

        var result = await command.ExecuteScalarAsync(ct);
        return result != null ? (long)result : 0;
    }

    public async Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        var command = new NpgsqlCommand(@"
            INSERT INTO projection_checkpoints (projection_name, checkpoint, updated_at)
            VALUES (@name, @checkpoint, @updatedAt)
            ON CONFLICT (projection_name)
            DO UPDATE SET checkpoint = @checkpoint, updated_at = @updatedAt",
            connection);

        command.Parameters.AddWithValue("name", projectionName);
        command.Parameters.AddWithValue("checkpoint", offset);
        command.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow);

        await command.ExecuteNonQueryAsync(ct);
    }
}

// Database schema
/*
CREATE TABLE projection_checkpoints (
    projection_name VARCHAR(255) PRIMARY KEY,
    checkpoint BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);
*/

Multi-Stream Projection

Project from multiple streams:

public class OrderAnalyticsProjection : IDynamicProjection
{
    public string ProjectionName => "order-analytics";

    public async Task RunAsync(CancellationToken ct)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);

        // Read from multiple streams
        var streams = new[] { "orders", "payments", "shipments" };

        foreach (var streamName in streams)
        {
            await foreach (var storedEvent in _eventStore.ReadStreamAsync(
                streamName,
                fromOffset: checkpoint + 1,
                cancellationToken: ct))
            {
                await HandleEventAsync(storedEvent.Data, ct);
                await _checkpointStore.SaveCheckpointAsync(ProjectionName, storedEvent.Offset, ct);
            }
        }
    }

    private async Task HandleEventAsync(object @event, CancellationToken ct)
    {
        switch (@event)
        {
            case OrderPlacedEvent e:
                await _repository.IncrementOrderCountAsync(e.CustomerId, ct);
                await _repository.AddRevenueAsync(e.TotalAmount, ct);
                break;

            case PaymentProcessedEvent e:
                await _repository.RecordPaymentAsync(e.PaymentId, e.Amount, ct);
                break;

            case ShipmentCreatedEvent e:
                await _repository.RecordShipmentAsync(e.OrderId, e.ShipmentId, ct);
                break;
        }
    }
}

Batched Projection

Process events in batches for better performance:

public class OrderSummaryProjection : IDynamicProjection
{
    private const int BatchSize = 100;
    public string ProjectionName => "order-summary";

    public async Task RunAsync(CancellationToken ct)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
        var batch = new List<(long Offset, object Event)>();

        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            batch.Add((storedEvent.Offset, storedEvent.Data));

            if (batch.Count >= BatchSize)
            {
                await ProcessBatchAsync(batch, ct);
                await _checkpointStore.SaveCheckpointAsync(ProjectionName, batch.Last().Offset, ct);
                batch.Clear();
            }
        }

        // Process remaining events
        if (batch.Count > 0)
        {
            await ProcessBatchAsync(batch, ct);
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, batch.Last().Offset, ct);
        }
    }

    private async Task ProcessBatchAsync(List<(long Offset, object Event)> batch, CancellationToken ct)
    {
        // Use a transaction for the batch
        await using var transaction = await _repository.BeginTransactionAsync(ct);

        try
        {
            foreach (var (_, @event) in batch)
            {
                await HandleEventAsync(@event, ct);
            }

            await transaction.CommitAsync(ct);
        }
        catch
        {
            await transaction.RollbackAsync(ct);
            throw;
        }
    }
}

Projection Registration

Register projections as background services:

// In Program.cs
builder.Services.AddSingleton<ICheckpointStore, PostgresCheckpointStore>();
builder.Services.AddSingleton<IDynamicProjection, UserSummaryProjection>();
builder.Services.AddSingleton<IDynamicProjection, OrderAnalyticsProjection>();

// Auto-start projections
builder.Services.AddProjectionRunner(options =>
{
    options.AutoStart = true;
    options.BatchSize = 100;
    options.CheckpointInterval = TimeSpan.FromSeconds(5);
});

var app = builder.Build();
app.Run();

Resettable Projections

Allow projections to be rebuilt from scratch:

public interface IResettableProjection : IDynamicProjection
{
    Task ResetAsync(CancellationToken ct = default);
}

public class UserSummaryProjection : IResettableProjection
{
    public string ProjectionName => "user-summary";

    public async Task ResetAsync(CancellationToken ct)
    {
        // Clear read model
        await _repository.DeleteAllAsync(ct);

        // Reset checkpoint
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0, ct);
    }

    public async Task RunAsync(CancellationToken ct)
    {
        // ... projection logic
    }
}

// Reset and rebuild projection
var projection = app.Services.GetRequiredService<IResettableProjection>();
await projection.ResetAsync();
await projection.RunAsync(CancellationToken.None);

Denormalized Projections

Create highly denormalized views for fast queries:

public class CustomerOrderHistoryProjection : IDynamicProjection
{
    public string ProjectionName => "customer-order-history";

    private async Task HandleEventAsync(object @event, CancellationToken ct)
    {
        switch (@event)
        {
            case OrderPlacedEvent e:
                // Denormalize order details into customer record
                var customer = await _repository.GetCustomerAsync(e.CustomerId, ct);

                customer.Orders.Add(new OrderSummary
                {
                    OrderId = e.OrderId,
                    PlacedAt = e.PlacedAt,
                    TotalAmount = e.TotalAmount,
                    Items = e.Items.Select(i => new ItemSummary
                    {
                        ProductId = i.ProductId,
                        ProductName = i.ProductName,
                        Quantity = i.Quantity,
                        Price = i.Price
                    }).ToList()
                });

                customer.TotalSpent += e.TotalAmount;
                customer.OrderCount++;

                await _repository.UpdateCustomerAsync(customer, ct);
                break;

            case OrderShippedEvent e:
                var customerWithOrder = await _repository.GetCustomerAsync(e.CustomerId, ct);
                var order = customerWithOrder.Orders.FirstOrDefault(o => o.OrderId == e.OrderId);

                if (order != null)
                {
                    order.Status = "Shipped";
                    order.TrackingNumber = e.TrackingNumber;
                    order.ShippedAt = e.ShippedAt;
                    await _repository.UpdateCustomerAsync(customerWithOrder, ct);
                }
                break;
        }
    }
}

// Read model
public class CustomerOrderHistory
{
    public string CustomerId { get; set; } = string.Empty;
    public string Name { get; set; } = string.Empty;
    public List<OrderSummary> Orders { get; set; } = new();
    public decimal TotalSpent { get; set; }
    public int OrderCount { get; set; }
}

public class OrderSummary
{
    public string OrderId { get; set; } = string.Empty;
    public DateTimeOffset PlacedAt { get; set; }
    public string Status { get; set; } = string.Empty;
    public decimal TotalAmount { get; set; }
    public List<ItemSummary> Items { get; set; } = new();
    public string? TrackingNumber { get; set; }
    public DateTimeOffset? ShippedAt { get; set; }
}

Best Practices

DO:

  • Save checkpoints regularly to track progress
  • Use batching for better performance
  • Handle events idempotently (safe to replay)
  • Create multiple projections for different views
  • Denormalize data for query performance
  • Make projections resettable for rebuilds

DON'T:

  • Process events without checkpointing
  • Create projections that modify write models
  • Assume events arrive in order across streams
  • Put business logic in projections (only data transformation)
  • Create too many projections (balance complexity)

Next Steps

See Also