# Snapshot Optimization

Learn how to optimize aggregate loading with snapshots in event-sourced systems.

## What is a Snapshot?

A **snapshot** is a saved state of an aggregate at a specific point in time. Instead of replaying thousands of events, you load the snapshot and replay only recent events.

**Without Snapshot:**

```
Load aggregate -> Replay 10,000 events -> Current state
Time: ~5 seconds
```

**With Snapshot:**

```
Load snapshot (at event 9,500) -> Replay 500 events -> Current state
Time: ~0.5 seconds
```

## When to Use Snapshots

✅ **Use snapshots when:**

- Aggregates have many events (> 100)
- Loading aggregates is slow
- Events are frequently replayed
- Read performance matters

❌ **Don't use snapshots when:**

- Aggregates have few events (< 100)
- Write performance is critical
- Storage space is limited
- Snapshots add unnecessary complexity

## Snapshot Strategy

### Periodic Snapshots

Take a snapshot every N events:

```csharp
public interface ISnapshotStore
{
    Task<Snapshot<T>?> GetSnapshotAsync<T>(string aggregateId, CancellationToken ct = default)
        where T : AggregateRoot;

    Task SaveSnapshotAsync<T>(string aggregateId, T aggregate, long version, CancellationToken ct = default)
        where T : AggregateRoot;
}

public record Snapshot<T> where T : AggregateRoot
{
    public string AggregateId { get; init; } = string.Empty;
    public T State { get; init; } = default!;
    public long Version { get; init; }
    public DateTimeOffset CreatedAt { get; init; }
}

public class SnapshotRepository<T> : IAggregateRepository<T> where T : AggregateRoot, new()
{
    private readonly IEventStreamStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;
    private readonly int _snapshotInterval;

    public SnapshotRepository(
        IEventStreamStore eventStore,
        ISnapshotStore snapshotStore,
        int snapshotInterval = 100)
    {
        // The interval is used as a divisor in SaveAsync, so it must be positive.
        if (snapshotInterval <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(snapshotInterval), "Snapshot interval must be greater than zero.");
        }

        _eventStore = eventStore;
        _snapshotStore = snapshotStore;
        _snapshotInterval = snapshotInterval;
    }

    public async Task<T> LoadAsync(string aggregateId, CancellationToken ct = default)
    {
        var aggregate = new T();
        var version = 0L;

        // Try to load snapshot
        var snapshot = await _snapshotStore.GetSnapshotAsync<T>(aggregateId, ct);
        if (snapshot != null)
        {
            aggregate = snapshot.State;
            version = snapshot.Version;
        }

        // Replay events after snapshot.
        // NOTE: "version + 1" assumes stream offsets are 1-based (the event at
        // offset N produces version N). Verify against your event store.
        var events = new List<object>();
        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            aggregateId, fromOffset: version + 1, cancellationToken: ct))
        {
            events.Add(storedEvent.Data);
        }

        // Neither a snapshot nor any events: the aggregate does not exist.
        if (events.Count == 0 && snapshot == null)
        {
            throw new AggregateNotFoundException(aggregateId);
        }

        aggregate.LoadFromHistory(events);
        return aggregate;
    }

    public async Task SaveAsync(T aggregate, CancellationToken ct = default)
    {
        var startVersion = aggregate.Version;
        var currentVersion = startVersion;

        // Save events
        foreach (var @event in aggregate.GetUncommittedEvents())
        {
            await _eventStore.AppendAsync(aggregate.Id, @event, ct);
            currentVersion++;
        }

        aggregate.ClearUncommittedEvents();

        // Take a snapshot whenever this save crossed an interval boundary.
        // Comparing interval "buckets" (instead of currentVersion % N == 0) still
        // fires when several events are appended at once and jump over the exact
        // multiple, and never fires when nothing was appended.
        if (currentVersion / _snapshotInterval > startVersion / _snapshotInterval)
        {
            await _snapshotStore.SaveSnapshotAsync(aggregate.Id, aggregate, currentVersion, ct);
        }
    }
}
```

## PostgreSQL Snapshot Store

Implement snapshot storage with PostgreSQL:

```csharp
public class PostgresSnapshotStore : ISnapshotStore
{
    private readonly string _connectionString;

    public PostgresSnapshotStore(string connectionString)
    {
        _connectionString = connectionString;
    }

    public async Task<Snapshot<T>?> GetSnapshotAsync<T>(string aggregateId, CancellationToken ct)
        where T : AggregateRoot
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new NpgsqlCommand(@"
            SELECT aggregate_type, state, version, created_at
            FROM snapshots
            WHERE aggregate_id = @aggregateId
            ORDER BY version DESC
            LIMIT 1", connection);
        command.Parameters.AddWithValue("aggregateId", aggregateId);

        await using var reader = await command.ExecuteReaderAsync(ct);
        if (await reader.ReadAsync(ct))
        {
            var stateJson = reader.GetString(1);
            var state = JsonSerializer.Deserialize<T>(stateJson);
            if (state != null)
            {
                return new Snapshot<T>
                {
                    AggregateId = aggregateId,
                    State = state,
                    Version = reader.GetInt64(2),
                    CreatedAt = reader.GetFieldValue<DateTimeOffset>(3)
                };
            }
        }

        return null;
    }

    public async Task SaveSnapshotAsync<T>(string aggregateId, T aggregate, long version, CancellationToken ct)
        where T : AggregateRoot
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        var stateJson = JsonSerializer.Serialize(aggregate);

        // The state parameter is sent as text, so it is cast explicitly:
        // PostgreSQL does not implicitly convert a text parameter to JSONB.
        await using var command = new NpgsqlCommand(@"
            INSERT INTO snapshots (aggregate_id, aggregate_type, state, version, created_at)
            VALUES (@aggregateId, @aggregateType, @state::jsonb, @version, @createdAt)", connection);
        command.Parameters.AddWithValue("aggregateId", aggregateId);
        command.Parameters.AddWithValue("aggregateType", typeof(T).FullName ?? typeof(T).Name);
        command.Parameters.AddWithValue("state", stateJson);
        command.Parameters.AddWithValue("version", version);
        command.Parameters.AddWithValue("createdAt", DateTimeOffset.UtcNow);

        await command.ExecuteNonQueryAsync(ct);
    }
}

// Database schema
/*
CREATE TABLE snapshots (
    id BIGSERIAL PRIMARY KEY,
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    state JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    UNIQUE (aggregate_id, version)
);

CREATE INDEX idx_snapshots_aggregate_id ON snapshots(aggregate_id, version DESC);
*/
```

## Snapshot Cleanup

Remove old snapshots to save storage:

```csharp
public class SnapshotCleanupService : BackgroundService
{
    private readonly string _connectionString;
    private readonly int _keepLastN;

    // Cleanup runs raw SQL against the snapshots table, so the service needs the
    // connection string itself; ISnapshotStore exposes no delete operation.
    public SnapshotCleanupService(string connectionString, int keepLastN = 3)
    {
        _connectionString = connectionString;
        _keepLastN = keepLastN;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromHours(1), stoppingToken);
            await CleanupOldSnapshotsAsync(stoppingToken);
        }
    }

    private async Task CleanupOldSnapshotsAsync(CancellationToken ct)
    {
        // Keep only the last N snapshots per aggregate
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new NpgsqlCommand(@"
            DELETE FROM snapshots
            WHERE id NOT IN (
                SELECT id FROM (
                    SELECT id,
                           ROW_NUMBER() OVER (PARTITION BY aggregate_id ORDER BY version DESC) as rn
                    FROM snapshots
                ) ranked
                WHERE rn <= @keepLastN
            )", connection);
        command.Parameters.AddWithValue("keepLastN", _keepLastN);

        var deleted = await command.ExecuteNonQueryAsync(ct);
        Console.WriteLine($"Deleted {deleted} old snapshots");
    }
}

// Register cleanup service (a factory is required because the container
// cannot resolve the connection string parameter by itself)
builder.Services.AddHostedService(sp => new SnapshotCleanupService(connectionString));
```

## Snapshot-Aware Aggregate

Make aggregates aware of snapshots:

```csharp
public abstract class SnapshotAggregateRoot : AggregateRoot
{
    public virtual bool ShouldTakeSnapshot(int snapshotInterval)
    {
        return Version % snapshotInterval == 0;
    }

    public virtual Snapshot<T> CreateSnapshot<T>() where T : SnapshotAggregateRoot
    {
        return new Snapshot<T>
        {
            AggregateId = Id,
            State = (T)this,
            Version = Version,
            CreatedAt = DateTimeOffset.UtcNow
        };
    }
}

public class BankAccount : SnapshotAggregateRoot
{
    public decimal Balance { get; private set; }
    private readonly List<Transaction> _transactions = new();

    // Override to take snapshots more frequently for high-volume accounts
    public override bool ShouldTakeSnapshot(int snapshotInterval)
    {
        if (_transactions.Count > 1000)
        {
            return Version % 50 == 0; // Every 50 events for high-volume
        }

        return base.ShouldTakeSnapshot(snapshotInterval); // Default interval
    }
}
```

## Memory Snapshots

Use in-memory caching for frequently accessed aggregates:

```csharp
public class CachedSnapshotStore : ISnapshotStore
{
    private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(15);

    private readonly ISnapshotStore _innerStore;
    private readonly IMemoryCache _cache;

    public CachedSnapshotStore(ISnapshotStore innerStore, IMemoryCache cache)
    {
        _innerStore = innerStore;
        _cache = cache;
    }

    // The cache holds the serialized state, not the live aggregate. The repository
    // replays events onto the instance it receives, so handing out a shared cached
    // instance would let one load mutate the snapshot seen by the next load.
    private sealed record CachedSnapshot(string StateJson, long Version, DateTimeOffset CreatedAt);

    public async Task<Snapshot<T>?> GetSnapshotAsync<T>(string aggregateId, CancellationToken ct)
        where T : AggregateRoot
    {
        if (_cache.TryGetValue<CachedSnapshot>(CacheKey<T>(aggregateId), out var cached) && cached != null)
        {
            var state = JsonSerializer.Deserialize<T>(cached.StateJson);
            if (state != null)
            {
                return new Snapshot<T>
                {
                    AggregateId = aggregateId,
                    State = state,
                    Version = cached.Version,
                    CreatedAt = cached.CreatedAt
                };
            }
        }

        var snapshot = await _innerStore.GetSnapshotAsync<T>(aggregateId, ct);
        if (snapshot != null)
        {
            Store(aggregateId, snapshot.State, snapshot.Version, snapshot.CreatedAt);
        }

        return snapshot;
    }

    public async Task SaveSnapshotAsync<T>(string aggregateId, T aggregate, long version, CancellationToken ct)
        where T : AggregateRoot
    {
        await _innerStore.SaveSnapshotAsync(aggregateId, aggregate, version, ct);

        // Update cache
        Store(aggregateId, aggregate, version, DateTimeOffset.UtcNow);
    }

    private void Store<T>(string aggregateId, T state, long version, DateTimeOffset createdAt)
        where T : AggregateRoot
    {
        var entry = new CachedSnapshot(JsonSerializer.Serialize(state), version, createdAt);
        _cache.Set(CacheKey<T>(aggregateId), entry, CacheDuration);
    }

    // The aggregate type is part of the key so two aggregate types that share an
    // id cannot overwrite each other's entry.
    private static string CacheKey<T>(string aggregateId) =>
        $"snapshot:{typeof(T).FullName}:{aggregateId}";
}

// Registration
builder.Services.AddMemoryCache();
builder.Services.AddSingleton<ISnapshotStore>(sp =>
{
    var postgresStore = new PostgresSnapshotStore(connectionString);
    var cache = sp.GetRequiredService<IMemoryCache>();
    return new CachedSnapshotStore(postgresStore, cache);
});
```

## Complete Example

Here's a complete example with snapshots:

```csharp
// Program.cs
var builder = WebApplication.CreateBuilder(args);

var connectionString = builder.Configuration.GetConnectionString("EventStore")
    ?? throw new InvalidOperationException("Connection string 'EventStore' is not configured.");

// Register event store
builder.Services.AddEventStreaming()
    .AddPostgresEventStore(connectionString);

// Register snapshot store
builder.Services.AddMemoryCache();
builder.Services.AddSingleton<ISnapshotStore>(sp =>
{
    var postgresStore = new PostgresSnapshotStore(connectionString);
    var cache = sp.GetRequiredService<IMemoryCache>();
    return new CachedSnapshotStore(postgresStore, cache);
});

// Register repository with snapshots (every 100 events)
builder.Services.AddScoped<IAggregateRepository<BankAccount>>(sp =>
{
    var eventStore = sp.GetRequiredService<IEventStreamStore>();
    var snapshotStore = sp.GetRequiredService<ISnapshotStore>();
    return new SnapshotRepository<BankAccount>(eventStore, snapshotStore, snapshotInterval: 100);
});

// Register cleanup service
builder.Services.AddHostedService(sp => new SnapshotCleanupService(connectionString));

var app = builder.Build();
app.Run();

// Command handler using snapshot repository
public class WithdrawMoneyCommandHandler : ICommandHandler<WithdrawMoneyCommand>
{
    private readonly IAggregateRepository<BankAccount> _repository;

    public WithdrawMoneyCommandHandler(IAggregateRepository<BankAccount> repository)
    {
        _repository = repository;
    }

    public async Task HandleAsync(WithdrawMoneyCommand command, CancellationToken ct)
    {
        // Load from snapshot (fast!)
        var account = await _repository.LoadAsync(command.AccountId, ct);

        // Execute command
        account.Withdraw(command.Amount);

        // Save events and snapshot if needed
        await _repository.SaveAsync(account, ct);
    }
}
```

## Best Practices

✅ **DO:**

- Take snapshots periodically (every 50-100 events)
- Use snapshots for aggregates with many events
- Clean up old snapshots to save storage
- Cache snapshots in memory for hot aggregates
- Test snapshot serialization/deserialization

❌ **DON'T:**

- Take snapshots for every event (overhead)
- Use snapshots for small aggregates (< 50 events)
- Keep all snapshots forever (storage waste)
- Forget to test snapshot restore
- Include non-serializable fields in snapshots

## Next Steps

- [06-replay-and-rebuild.md](06-replay-and-rebuild.md) - Replay and rebuild projections from events

## See Also

- [Projection Checkpoints](../../event-streaming/projections/checkpoint-stores.md)
- [Event Replay](../../event-streaming/event-replay/README.md)
- [Performance Best Practices](../../best-practices/performance.md)