dotnet-cqrs/docs/event-streaming/projections/creating-projections.md

13 KiB

Creating Projections

Build read models from event streams with IDynamicProjection.

Overview

Projections transform event streams into materialized read models for efficient querying:

  • Event Sourcing Pattern - Build state from events
  • Automatic Processing - Background service processes events
  • Checkpoint Tracking - Resume from last processed event
  • Fault Tolerance - Handle errors and retries

Quick Start

using Svrnty.CQRS.Events.Abstractions;

public class OrderSummaryProjection : IDynamicProjection
{
    private readonly IEventStreamStore _eventStore;
    private readonly ICheckpointStore _checkpointStore;
    private readonly OrderDbContext _dbContext;

    public string ProjectionName => "order-summary";

    public async Task RunAsync(CancellationToken ct)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

        await foreach (var @event in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            await HandleEventAsync(@event, ct);
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
        }
    }

    private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
    {
        switch (@event.EventType)
        {
            case "OrderPlaced":
                var placedEvent = @event.DeserializeAs<OrderPlacedEvent>();
                await CreateOrderSummaryAsync(placedEvent, ct);
                break;

            case "OrderShipped":
                var shippedEvent = @event.DeserializeAs<OrderShippedEvent>();
                await UpdateOrderStatusAsync(shippedEvent.OrderId, "Shipped", ct);
                break;

            case "OrderCancelled":
                var cancelledEvent = @event.DeserializeAs<OrderCancelledEvent>();
                await UpdateOrderStatusAsync(cancelledEvent.OrderId, "Cancelled", ct);
                break;
        }
    }

    private async Task CreateOrderSummaryAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        var summary = new OrderSummary
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            TotalAmount = @event.TotalAmount,
            Status = "Placed",
            PlacedAt = @event.PlacedAt,
            ItemCount = @event.Items.Count
        };

        _dbContext.OrderSummaries.Add(summary);
        await _dbContext.SaveChangesAsync(ct);
    }

    private async Task UpdateOrderStatusAsync(int orderId, string status, CancellationToken ct)
    {
        var summary = await _dbContext.OrderSummaries.FindAsync(orderId);
        if (summary != null)
        {
            summary.Status = status;
            await _dbContext.SaveChangesAsync(ct);
        }
    }
}

IDynamicProjection Interface

public interface IDynamicProjection
{
    string ProjectionName { get; }
    Task RunAsync(CancellationToken cancellationToken);
}

Registration

using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);

// Register projection
builder.Services.AddSingleton<IDynamicProjection, OrderSummaryProjection>();

// Register projection service (runs projections in background)
builder.Services.AddDynamicProjections(options =>
{
    options.AutoStart = true;
    options.CheckpointInterval = TimeSpan.FromSeconds(5);
});

var app = builder.Build();
app.Run();

Event Handling Patterns

Switch-Based Handler

private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    switch (@event.EventType)
    {
        case "OrderPlaced":
            await HandleOrderPlacedAsync(@event.DeserializeAs<OrderPlacedEvent>(), ct);
            break;

        case "OrderShipped":
            await HandleOrderShippedAsync(@event.DeserializeAs<OrderShippedEvent>(), ct);
            break;

        case "OrderCancelled":
            await HandleOrderCancelledAsync(@event.DeserializeAs<OrderCancelledEvent>(), ct);
            break;

        default:
            // Ignore unknown events
            break;
    }
}

Dictionary-Based Handler

private readonly Dictionary<string, Func<StreamEvent, CancellationToken, Task>> _handlers;

public OrderSummaryProjection()
{
    _handlers = new Dictionary<string, Func<StreamEvent, CancellationToken, Task>>
    {
        ["OrderPlaced"] = async (@event, ct) =>
            await HandleOrderPlacedAsync(@event.DeserializeAs<OrderPlacedEvent>(), ct),

        ["OrderShipped"] = async (@event, ct) =>
            await HandleOrderShippedAsync(@event.DeserializeAs<OrderShippedEvent>(), ct),

        ["OrderCancelled"] = async (@event, ct) =>
            await HandleOrderCancelledAsync(@event.DeserializeAs<OrderCancelledEvent>(), ct)
    };
}

private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    if (_handlers.TryGetValue(@event.EventType, out var handler))
    {
        await handler(@event, ct);
    }
}

Reflection-Based Handler

private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    var methodName = $"Handle{@event.EventType}Async";
    var method = GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);

    if (method != null)
    {
        var eventData = @event.Deserialize();
        await (Task)method.Invoke(this, new[] { eventData, ct });
    }
}

private async Task HandleOrderPlacedAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    // Handle event
}

Checkpoint Management

Basic Checkpointing

public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        // Save checkpoint after each event
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
    }
}

Batch Checkpointing

public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
    var batchSize = 100;
    var processedCount = 0;
    var lastOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);
        lastOffset = @event.Offset;
        processedCount++;

        // Save checkpoint every 100 events
        if (processedCount % batchSize == 0)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
        }
    }

    // Save final checkpoint
    if (lastOffset > checkpoint)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
    }
}

Transaction-Based Checkpointing

private async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);

    try
    {
        // Update read model
        switch (@event.EventType)
        {
            case "OrderPlaced":
                await CreateOrderSummaryAsync(@event.DeserializeAs<OrderPlacedEvent>(), ct);
                break;
        }

        // Save checkpoint in same transaction
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);

        await transaction.CommitAsync(ct);
    }
    catch
    {
        await transaction.RollbackAsync(ct);
        throw;
    }
}

Read Model Design

Simple Read Model

public class OrderSummary
{
    public int OrderId { get; set; }
    public int CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public string Status { get; set; } = string.Empty;
    public DateTimeOffset PlacedAt { get; set; }
    public int ItemCount { get; set; }
}

Denormalized Read Model

public class OrderSummary
{
    public int OrderId { get; set; }

    // Customer details (denormalized)
    public int CustomerId { get; set; }
    public string CustomerName { get; set; } = string.Empty;
    public string CustomerEmail { get; set; } = string.Empty;

    // Order details
    public decimal TotalAmount { get; set; }
    public string Status { get; set; } = string.Empty;
    public DateTimeOffset PlacedAt { get; set; }
    public DateTimeOffset? ShippedAt { get; set; }
    public DateTimeOffset? CancelledAt { get; set; }

    // Aggregated data
    public int ItemCount { get; set; }
    public List<string> ProductNames { get; set; } = new();
}

Aggregate Read Model

public class CustomerOrderStats
{
    public int CustomerId { get; set; }
    public int TotalOrders { get; set; }
    public decimal TotalSpent { get; set; }
    public decimal AverageOrderValue { get; set; }
    public DateTimeOffset? LastOrderDate { get; set; }
    public int OrdersThisMonth { get; set; }
    public int OrdersThisYear { get; set; }
}

private async Task HandleOrderPlacedAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var stats = await _dbContext.CustomerOrderStats.FindAsync(@event.CustomerId)
        ?? new CustomerOrderStats { CustomerId = @event.CustomerId };

    stats.TotalOrders++;
    stats.TotalSpent += @event.TotalAmount;
    stats.AverageOrderValue = stats.TotalSpent / stats.TotalOrders;
    stats.LastOrderDate = @event.PlacedAt;

    if (@event.PlacedAt.Month == DateTimeOffset.UtcNow.Month)
        stats.OrdersThisMonth++;

    if (@event.PlacedAt.Year == DateTimeOffset.UtcNow.Year)
        stats.OrdersThisYear++;

    _dbContext.CustomerOrderStats.Update(stats);
    await _dbContext.SaveChangesAsync(ct);
}

Error Handling

public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        try
        {
            await HandleEventAsync(@event, ct);
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Error processing event {EventId} in projection {Projection}",
                @event.EventId,
                ProjectionName);

            // Strategy 1: Retry with exponential backoff
            await RetryWithBackoffAsync(() => HandleEventAsync(@event, ct), maxAttempts: 3);

            // Strategy 2: Skip event and continue
            // await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);

            // Strategy 3: Dead letter queue
            // await _dlqStore.SendAsync(ProjectionName, @event);

            // Strategy 4: Fail projection
            // throw;
        }
    }
}

Multiple Stream Projection

public class OrderFulfillmentProjection : IDynamicProjection
{
    public string ProjectionName => "order-fulfillment";

    public async Task RunAsync(CancellationToken ct)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

        // Merge events from multiple streams
        var orderEvents = _eventStore.ReadStreamAsync("orders", checkpoint + 1, ct);
        var paymentEvents = _eventStore.ReadStreamAsync("payments", checkpoint + 1, ct);
        var shippingEvents = _eventStore.ReadStreamAsync("shipping", checkpoint + 1, ct);

        // Process events in timestamp order
        await foreach (var @event in MergeStreamsAsync(
            orderEvents,
            paymentEvents,
            shippingEvents,
            ct))
        {
            await HandleEventAsync(@event, ct);
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
        }
    }
}

Best Practices

DO

  • Use idempotent event handlers
  • Save checkpoints frequently
  • Use transactions for consistency
  • Handle unknown event types gracefully
  • Log projection errors
  • Monitor projection lag
  • Design denormalized read models
  • Use batch checkpointing for performance
  • Handle schema evolution

DON'T

  • Don't skip checkpoint saves
  • Don't modify events in projections
  • Don't use projection state for writes
  • Don't ignore errors
  • Don't forget idempotency
  • Don't query write model from projections
  • Don't use blocking operations
  • Don't forget cancellation tokens

See Also