dotnet-cqrs/docs/tutorials/event-sourcing/05-snapshots.md

13 KiB

Snapshot Optimization

Learn how to optimize aggregate loading with snapshots in event-sourced systems.

What is a Snapshot?

A snapshot is a saved state of an aggregate at a specific point in time. Instead of replaying thousands of events, you load the snapshot and replay only recent events.

Without Snapshot:

Load aggregate -> Replay 10,000 events -> Current state
Time: ~5 seconds

With Snapshot:

Load snapshot (at event 9,500) -> Replay 500 events -> Current state
Time: ~0.5 seconds

When to Use Snapshots

Use snapshots when:

  • Aggregates have many events (> 100)
  • Loading aggregates is slow
  • Events are frequently replayed
  • Read performance matters

Don't use snapshots when:

  • Aggregates have few events (< 100)
  • Write performance is critical
  • Storage space is limited
  • The added complexity of snapshots isn't justified by the performance gain

Snapshot Strategy

Periodic Snapshots

Take a snapshot roughly every N events (N is the configurable `snapshotInterval`):

/// <summary>
/// Persists and retrieves point-in-time copies of aggregate state, so that loading an
/// aggregate only has to replay the events recorded after the copy was taken.
/// </summary>
public interface ISnapshotStore
{
    /// <summary>
    /// Returns the latest snapshot stored for <paramref name="aggregateId"/>, or <c>null</c>
    /// when there is none.
    /// </summary>
    Task<Snapshot<T>?> GetSnapshotAsync<T>(
        string aggregateId,
        CancellationToken ct = default)
        where T : AggregateRoot;

    /// <summary>
    /// Stores <paramref name="aggregate"/> as the snapshot taken at stream version
    /// <paramref name="version"/>.
    /// </summary>
    Task SaveSnapshotAsync<T>(
        string aggregateId,
        T aggregate,
        long version,
        CancellationToken ct = default)
        where T : AggregateRoot;
}

/// <summary>
/// A saved copy of an aggregate's state together with the stream version it reflects.
/// </summary>
/// <typeparam name="T">The aggregate type captured by the snapshot.</typeparam>
public record Snapshot<T> where T : AggregateRoot
{
    /// <summary>Identifier of the aggregate (event stream) this snapshot belongs to.</summary>
    public string AggregateId { get; init; } = "";

    /// <summary>The captured aggregate state.</summary>
    public T State { get; init; } = default!;

    /// <summary>Stream version that <see cref="State"/> reflects; replay continues after it.</summary>
    public long Version { get; init; }

    /// <summary>When the snapshot was created.</summary>
    public DateTimeOffset CreatedAt { get; init; }
}

/// <summary>
/// Aggregate repository that restores aggregates from their latest snapshot (when one exists),
/// replays only the events recorded after it, and writes a new snapshot periodically on save.
/// </summary>
/// <typeparam name="T">Aggregate type; needs a public parameterless constructor.</typeparam>
public class SnapshotRepository<T> : IAggregateRepository<T> where T : AggregateRoot, new()
{
    private readonly IEventStreamStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;
    private readonly int _snapshotInterval;

    /// <param name="eventStore">Event stream store used to read and append events.</param>
    /// <param name="snapshotStore">Store used to read and write snapshots.</param>
    /// <param name="snapshotInterval">
    /// A snapshot is written whenever a save moves the stream version past a multiple of this value.
    /// Must be greater than zero.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="snapshotInterval"/> is zero or negative.</exception>
    public SnapshotRepository(
        IEventStreamStore eventStore,
        ISnapshotStore snapshotStore,
        int snapshotInterval = 100)
    {
        if (snapshotInterval <= 0)
        {
            // Zero would make every save throw DivideByZeroException in the boundary check.
            throw new ArgumentOutOfRangeException(
                nameof(snapshotInterval), snapshotInterval, "Snapshot interval must be greater than zero.");
        }

        _eventStore = eventStore;
        _snapshotStore = snapshotStore;
        _snapshotInterval = snapshotInterval;
    }

    /// <summary>
    /// Loads an aggregate: restores the latest snapshot if present, then replays the events after it.
    /// </summary>
    /// <exception cref="AggregateNotFoundException">Neither a snapshot nor any events exist for <paramref name="aggregateId"/>.</exception>
    public async Task<T> LoadAsync(string aggregateId, CancellationToken ct = default)
    {
        var snapshot = await _snapshotStore.GetSnapshotAsync<T>(aggregateId, ct);

        // Start from the snapshot's state if there is one, otherwise from a blank aggregate.
        var aggregate = snapshot?.State ?? new T();
        var version = snapshot?.Version ?? 0L;

        // Only events after the snapshot need replaying.
        // NOTE(review): assumes stream offsets are 1-based (offset N holds version N) — confirm against IEventStreamStore.
        var events = new List<object>();

        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            aggregateId,
            fromOffset: version + 1,
            cancellationToken: ct))
        {
            events.Add(storedEvent.Data);
        }

        if (snapshot is null && events.Count == 0)
        {
            throw new AggregateNotFoundException(aggregateId);
        }

        aggregate.LoadFromHistory(events);
        return aggregate;
    }

    /// <summary>
    /// Appends the aggregate's uncommitted events, clears them, and writes a snapshot when the
    /// appended events moved the version past a multiple of the snapshot interval.
    /// </summary>
    public async Task SaveAsync(T aggregate, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(aggregate);

        var startVersion = aggregate.Version;
        var currentVersion = startVersion;

        foreach (var @event in aggregate.GetUncommittedEvents())
        {
            await _eventStore.AppendAsync(aggregate.Id, @event, ct);
            currentVersion++;
        }

        aggregate.ClearUncommittedEvents();

        if (CrossedSnapshotBoundary(startVersion, currentVersion))
        {
            await _snapshotStore.SaveSnapshotAsync(aggregate.Id, aggregate, currentVersion, ct);
        }
    }

    // True when at least one event was appended and the save passed (or landed on) a multiple
    // of the interval. A save of several events that jumps over the boundary (e.g. 98 -> 102)
    // still snapshots. A save with no new events never re-inserts an already-stored version.
    private bool CrossedSnapshotBoundary(long fromVersion, long toVersion) =>
        toVersion > fromVersion
        && toVersion / _snapshotInterval > fromVersion / _snapshotInterval;
}

PostgreSQL Snapshot Store

Implement snapshot storage with PostgreSQL:

/// <summary>
/// <see cref="ISnapshotStore"/> backed by the PostgreSQL <c>snapshots</c> table.
/// Aggregate state is stored as JSON produced by System.Text.Json.
/// </summary>
public class PostgresSnapshotStore : ISnapshotStore
{
    private readonly string _connectionString;

    /// <param name="connectionString">Npgsql connection string for the database holding the <c>snapshots</c> table.</param>
    /// <exception cref="ArgumentException"><paramref name="connectionString"/> is null, empty, or whitespace.</exception>
    public PostgresSnapshotStore(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("A PostgreSQL connection string is required.", nameof(connectionString));
        }

        _connectionString = connectionString;
    }

    /// <summary>
    /// Returns the highest-version snapshot stored for <paramref name="aggregateId"/> and aggregate
    /// type <typeparamref name="T"/>. Returns <c>null</c> when none exists or the stored JSON deserializes to null.
    /// </summary>
    public async Task<Snapshot<T>?> GetSnapshotAsync<T>(string aggregateId, CancellationToken ct)
        where T : AggregateRoot
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        // Filter on the type as well: two aggregate types that share an id must not read each other's state.
        await using var command = new NpgsqlCommand(@"
            SELECT state, version, created_at
            FROM snapshots
            WHERE aggregate_id = @aggregateId AND aggregate_type = @aggregateType
            ORDER BY version DESC
            LIMIT 1",
            connection);

        command.Parameters.AddWithValue("aggregateId", aggregateId);
        command.Parameters.AddWithValue("aggregateType", AggregateTypeName<T>());

        await using var reader = await command.ExecuteReaderAsync(ct);

        if (!await reader.ReadAsync(ct))
        {
            return null;
        }

        var state = JsonSerializer.Deserialize<T>(reader.GetString(0));

        if (state is null)
        {
            return null;
        }

        return new Snapshot<T>
        {
            AggregateId = aggregateId,
            State = state,
            Version = reader.GetInt64(1),
            CreatedAt = reader.GetFieldValue<DateTimeOffset>(2)
        };
    }

    /// <summary>
    /// Serializes <paramref name="aggregate"/> to JSON and stores it as the snapshot for
    /// <paramref name="version"/>. A snapshot that already exists for that version is left untouched.
    /// </summary>
    public async Task SaveSnapshotAsync<T>(string aggregateId, T aggregate, long version, CancellationToken ct)
        where T : AggregateRoot
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        var stateJson = JsonSerializer.Serialize(aggregate);

        // ON CONFLICT relies on UNIQUE (aggregate_id, version). A repeated save of the same
        // version is a no-op instead of a unique-violation exception.
        await using var command = new NpgsqlCommand(@"
            INSERT INTO snapshots (aggregate_id, aggregate_type, state, version, created_at)
            VALUES (@aggregateId, @aggregateType, @state, @version, @createdAt)
            ON CONFLICT (aggregate_id, version) DO NOTHING",
            connection);

        command.Parameters.AddWithValue("aggregateId", aggregateId);
        command.Parameters.AddWithValue("aggregateType", AggregateTypeName<T>());
        // The column is JSONB. An untyped string parameter is sent as text, which PostgreSQL
        // will not implicitly assign to a jsonb column.
        command.Parameters.AddWithValue("state", NpgsqlTypes.NpgsqlDbType.Jsonb, stateJson);
        command.Parameters.AddWithValue("version", version);
        command.Parameters.AddWithValue("createdAt", DateTimeOffset.UtcNow);

        await command.ExecuteNonQueryAsync(ct);
    }

    // Value written to and matched against the aggregate_type column.
    private static string AggregateTypeName<T>() => typeof(T).FullName ?? typeof(T).Name;
}

// Database schema for PostgresSnapshotStore
/*
CREATE TABLE snapshots (
    id BIGSERIAL PRIMARY KEY,             -- surrogate key; SnapshotCleanupService deletes by id
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL, -- CLR type name written by PostgresSnapshotStore
    state JSONB NOT NULL,                 -- System.Text.Json-serialized aggregate
    version BIGINT NOT NULL,              -- stream version the state reflects
    created_at TIMESTAMPTZ NOT NULL,
    UNIQUE (aggregate_id, version)        -- at most one snapshot per aggregate version
);

-- Serves the "latest snapshot for an aggregate" lookup (ORDER BY version DESC LIMIT 1).
-- NOTE(review): the UNIQUE (aggregate_id, version) constraint already creates an index that
-- PostgreSQL can scan backwards for that query, so this index is likely redundant.
CREATE INDEX idx_snapshots_aggregate_id ON snapshots(aggregate_id, version DESC);
*/

Snapshot Cleanup

Remove old snapshots to save storage:

/// <summary>
/// Background service that, once an hour, deletes all but the newest <c>keepLastN</c>
/// snapshots of every aggregate from the <c>snapshots</c> table.
/// </summary>
public class SnapshotCleanupService : BackgroundService
{
    private static readonly TimeSpan CleanupInterval = TimeSpan.FromHours(1);

    private readonly string _connectionString;
    private readonly int _keepLastN;

    /// <param name="snapshotStore">The application's snapshot store. The cleanup query runs directly against the table and does not use it.</param>
    /// <param name="configuration">Source of the <c>EventStore</c> connection string (the database Program.cs gives the snapshot store).</param>
    /// <param name="keepLastN">How many snapshots to keep per aggregate; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="keepLastN"/> is less than 1.</exception>
    /// <exception cref="InvalidOperationException">The <c>EventStore</c> connection string is not configured.</exception>
    public SnapshotCleanupService(ISnapshotStore snapshotStore, IConfiguration configuration, int keepLastN = 3)
    {
        ArgumentNullException.ThrowIfNull(configuration);

        if (keepLastN < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(keepLastN), keepLastN, "At least one snapshot per aggregate must be kept.");
        }

        _connectionString = configuration.GetConnectionString("EventStore")
            ?? throw new InvalidOperationException("Connection string 'EventStore' is not configured.");
        _keepLastN = keepLastN;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(CleanupInterval, stoppingToken);

            try
            {
                await CleanupOldSnapshotsAsync(stoppingToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Keep the loop alive across transient DB failures: since .NET 6 an exception
                // escaping a BackgroundService stops the whole host by default.
                Console.Error.WriteLine($"Snapshot cleanup failed: {ex}");
            }
        }
    }

    // Deletes every snapshot that is not among the newest _keepLastN (by version) for its aggregate.
    private async Task CleanupOldSnapshotsAsync(CancellationToken ct)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new NpgsqlCommand(@"
            DELETE FROM snapshots
            WHERE id NOT IN (
                SELECT id
                FROM (
                    SELECT id,
                           ROW_NUMBER() OVER (PARTITION BY aggregate_id ORDER BY version DESC) as rn
                    FROM snapshots
                ) ranked
                WHERE rn <= @keepLastN
            )",
            connection);

        command.Parameters.AddWithValue("keepLastN", _keepLastN);

        var deleted = await command.ExecuteNonQueryAsync(ct);
        Console.WriteLine($"Deleted {deleted} old snapshots");
    }
}

// Register the snapshot cleanup loop with the host as a hosted (background) service
builder.Services.AddHostedService<SnapshotCleanupService>();

Snapshot-Aware Aggregate

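Let each aggregate decide when it should be snapshotted. Note that the `SnapshotRepository` above applies its own fixed interval; to honor a per-aggregate policy like the one below, have the repository call `ShouldTakeSnapshot` instead: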

/// <summary>
/// Aggregate base class that lets each aggregate decide when it should be snapshotted
/// and package its own state as a <see cref="Snapshot{T}"/>.
/// </summary>
public abstract class SnapshotAggregateRoot : AggregateRoot
{
    /// <summary>
    /// Returns <c>true</c> when the current <c>Version</c> is a positive multiple of
    /// <paramref name="snapshotInterval"/>.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="snapshotInterval"/> is zero or negative.</exception>
    public virtual bool ShouldTakeSnapshot(int snapshotInterval)
    {
        if (snapshotInterval <= 0)
        {
            // Zero would otherwise surface as a DivideByZeroException from the modulo below.
            throw new ArgumentOutOfRangeException(
                nameof(snapshotInterval), snapshotInterval, "Snapshot interval must be greater than zero.");
        }

        // Version 0 means no events have been applied, so there is nothing worth snapshotting.
        return Version > 0 && Version % snapshotInterval == 0;
    }

    /// <summary>
    /// Wraps this aggregate's current state and version in a snapshot stamped with the current UTC time.
    /// </summary>
    /// <remarks>
    /// <c>State</c> references this live instance, not a copy. Persist or serialize the snapshot
    /// before the aggregate changes again.
    /// </remarks>
    /// <exception cref="InvalidCastException">This aggregate is not a <typeparamref name="T"/>.</exception>
    public virtual Snapshot<T> CreateSnapshot<T>() where T : SnapshotAggregateRoot
    {
        return new Snapshot<T>
        {
            AggregateId = Id,
            State = (T)this,
            Version = Version,
            CreatedAt = DateTimeOffset.UtcNow
        };
    }
}

/// <summary>
/// Example aggregate that snapshots more often once it has accumulated many transactions.
/// </summary>
public class BankAccount : SnapshotAggregateRoot
{
    // Above this many in-memory transactions the account is treated as high-volume.
    private const int HighVolumeTransactionThreshold = 1000;

    // Snapshot interval used for high-volume accounts instead of the caller-supplied one.
    private const int HighVolumeSnapshotInterval = 50;

    // Snapshots are written with System.Text.Json, which ignores non-public setters by default.
    // Without [JsonInclude], an account restored from a snapshot would lose the balance built
    // up by the events before the snapshot.
    [System.Text.Json.Serialization.JsonInclude]
    public decimal Balance { get; private set; }

    // NOTE(review): System.Text.Json does not serialize private fields, so this list is not part
    // of the snapshot. After a restore it holds at most what the replayed (post-snapshot) events
    // add, and the high-volume check below sees only those.
    private readonly List<Transaction> _transactions = new();

    /// <summary>
    /// Uses a tighter interval for high-volume accounts; otherwise defers to the base policy.
    /// </summary>
    public override bool ShouldTakeSnapshot(int snapshotInterval) =>
        _transactions.Count > HighVolumeTransactionThreshold
            ? Version % HighVolumeSnapshotInterval == 0
            : base.ShouldTakeSnapshot(snapshotInterval);
}

Memory Snapshots

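Use in-memory caching for frequently accessed aggregates. Keep in mind that command handlers mutate the aggregate they load, so a cache must never hand the same aggregate instance to more than one caller: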

/// <summary>
/// Decorates another <see cref="ISnapshotStore"/> with an in-memory cache for hot aggregates.
/// </summary>
/// <remarks>
/// Entries hold the serialized (JSON) state rather than the aggregate object. Repositories
/// mutate the aggregate they get back. If the cache held that object, one request's changes,
/// saved or not, would leak into the cached snapshot and be replayed a second time on the
/// next load. Deserializing on every hit costs some CPU but still avoids the database round trip.
/// </remarks>
public class CachedSnapshotStore : ISnapshotStore
{
    private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(15);

    private readonly ISnapshotStore _innerStore;
    private readonly IMemoryCache _cache;

    public CachedSnapshotStore(ISnapshotStore innerStore, IMemoryCache cache)
    {
        ArgumentNullException.ThrowIfNull(innerStore);
        ArgumentNullException.ThrowIfNull(cache);

        _innerStore = innerStore;
        _cache = cache;
    }

    /// <summary>
    /// Returns the cached snapshot as a freshly deserialized instance. On a miss, loads from the
    /// inner store and caches a serialized copy.
    /// </summary>
    public async Task<Snapshot<T>?> GetSnapshotAsync<T>(string aggregateId, CancellationToken ct)
        where T : AggregateRoot
    {
        var cacheKey = CacheKey<T>(aggregateId);

        if (_cache.TryGetValue<CachedSnapshot>(cacheKey, out var cached) && cached is not null)
        {
            // A new instance per hit, so no two callers share (and mutate) the same aggregate.
            var state = JsonSerializer.Deserialize<T>(cached.StateJson);

            if (state is not null)
            {
                return new Snapshot<T>
                {
                    AggregateId = aggregateId,
                    State = state,
                    Version = cached.Version,
                    CreatedAt = cached.CreatedAt
                };
            }
        }

        var snapshot = await _innerStore.GetSnapshotAsync<T>(aggregateId, ct);

        if (snapshot is not null)
        {
            CacheState(cacheKey, snapshot.State, snapshot.Version, snapshot.CreatedAt);
        }

        return snapshot;
    }

    /// <summary>Saves through to the inner store, then caches a serialized copy of the state.</summary>
    public async Task SaveSnapshotAsync<T>(string aggregateId, T aggregate, long version, CancellationToken ct)
        where T : AggregateRoot
    {
        await _innerStore.SaveSnapshotAsync(aggregateId, aggregate, version, ct);

        // Serialize now, so later changes to `aggregate` by the caller cannot alter the cached snapshot.
        CacheState(CacheKey<T>(aggregateId), aggregate, version, DateTimeOffset.UtcNow);
    }

    // Includes the aggregate type so different aggregate types with the same id don't overwrite each other.
    private static string CacheKey<T>(string aggregateId) =>
        $"snapshot:{typeof(T).FullName}:{aggregateId}";

    // Stores an immutable, serialized copy of the state under the given key.
    private void CacheState<T>(string cacheKey, T state, long version, DateTimeOffset createdAt) =>
        _cache.Set(cacheKey, new CachedSnapshot(JsonSerializer.Serialize(state), version, createdAt), CacheDuration);

    private sealed record CachedSnapshot(string StateJson, long Version, DateTimeOffset CreatedAt);
}

// Registration: wrap the PostgreSQL store in the in-memory cache and expose it as the ISnapshotStore singleton.
// NOTE: `connectionString` must already be defined in Program.cs; the complete example below shows
// one way to read it from configuration.
builder.Services.AddMemoryCache();
builder.Services.AddSingleton<ISnapshotStore>(sp =>
{
    var postgresStore = new PostgresSnapshotStore(connectionString);
    var cache = sp.GetRequiredService<IMemoryCache>();
    return new CachedSnapshotStore(postgresStore, cache);
});

Complete Example

Here's a complete example with snapshots:

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Read the connection string once and fail fast at startup if it is missing. Otherwise null
// would be passed into the stores and only surface on the first request.
var eventStoreConnectionString = builder.Configuration.GetConnectionString("EventStore")
    ?? throw new InvalidOperationException("Connection string 'EventStore' is not configured.");

// Register event store
builder.Services.AddEventStreaming()
    .AddPostgresEventStore(eventStoreConnectionString);

// Register snapshot store: PostgreSQL persistence wrapped in an in-memory cache
builder.Services.AddMemoryCache();
builder.Services.AddSingleton<ISnapshotStore>(sp =>
{
    var postgresStore = new PostgresSnapshotStore(eventStoreConnectionString);
    var cache = sp.GetRequiredService<IMemoryCache>();
    return new CachedSnapshotStore(postgresStore, cache);
});

// Register repository with snapshots (every 100 events)
builder.Services.AddScoped<IAggregateRepository<BankAccount>>(sp =>
{
    var eventStore = sp.GetRequiredService<IEventStreamStore>();
    var snapshotStore = sp.GetRequiredService<ISnapshotStore>();
    return new SnapshotRepository<BankAccount>(eventStore, snapshotStore, snapshotInterval: 100);
});

// Register cleanup service
builder.Services.AddHostedService<SnapshotCleanupService>();

var app = builder.Build();
app.Run();

// Command handler using snapshot repository
/// <summary>
/// Handles <see cref="WithdrawMoneyCommand"/>: loads the account through the snapshot-aware
/// repository, applies the withdrawal, and saves the resulting events.
/// </summary>
public class WithdrawMoneyCommandHandler : ICommandHandler<WithdrawMoneyCommand>
{
    private readonly IAggregateRepository<BankAccount> _repository;

    /// <param name="repository">Repository for bank accounts, supplied by dependency injection.</param>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> is null.</exception>
    public WithdrawMoneyCommandHandler(IAggregateRepository<BankAccount> repository)
    {
        ArgumentNullException.ThrowIfNull(repository);
        _repository = repository;
    }

    public async Task HandleAsync(WithdrawMoneyCommand command, CancellationToken ct)
    {
        // Load from the latest snapshot plus newer events (fast!)
        var account = await _repository.LoadAsync(command.AccountId, ct);

        // Execute command
        account.Withdraw(command.Amount);

        // Save events, plus a snapshot if the repository decides one is due
        await _repository.SaveAsync(account, ct);
    }
}

Best Practices

DO:

  • Take snapshots periodically (every 50-100 events)
  • Use snapshots for aggregates with many events
  • Clean up old snapshots to save storage
  • Cache snapshots in memory for hot aggregates
  • Test snapshot serialization/deserialization (e.g. System.Text.Json skips non-public setters and fields unless you opt in with [JsonInclude])

DON'T:

  • Take snapshots for every event (overhead)
  • Use snapshots for small aggregates (< 100 events)
  • Keep all snapshots forever (storage waste)
  • Forget to test snapshot restore
  • Include non-serializable fields in snapshots

Next Steps

See Also