# Checkpoint Stores

Persist projection progress with PostgreSQL or in-memory checkpoint stores.

## Overview

Checkpoint stores track the last processed event offset for each projection:

- **PostgreSQL Store** - Durable checkpoint storage for production
- **In-Memory Store** - Fast checkpoints for development/testing
- **Atomic Updates** - Effectively exactly-once processing when the checkpoint is saved in the same transaction as the read model
- **Query Support** - Monitor projection progress

## Quick Start

### PostgreSQL Checkpoint Store

```csharp
using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

// Register PostgreSQL checkpoint store
builder.Services.AddPostgresCheckpointStore(
    builder.Configuration.GetConnectionString("EventStore"));

var app = builder.Build();
app.Run();
```

### In-Memory Checkpoint Store

```csharp
using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);

// Register in-memory checkpoint store (for testing)
builder.Services.AddInMemoryCheckpointStore();

var app = builder.Build();
app.Run();
```

## ICheckpointStore Interface

```csharp
public interface ICheckpointStore
{
    // Returns the last saved offset, or 0 if the projection has no checkpoint
    Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default);

    Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default);

    Task DeleteCheckpointAsync(string projectionName, CancellationToken ct = default);

    // Maps projection name to its last saved offset
    Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct = default);
}
```

## Basic Usage

### Get Checkpoint

```csharp
var checkpointStore = serviceProvider.GetRequiredService<ICheckpointStore>();

// Get checkpoint (returns 0 if not found)
var checkpoint = await checkpointStore.GetCheckpointAsync("order-summary");

Console.WriteLine($"Projection checkpoint: {checkpoint}");
```

### Save Checkpoint

```csharp
// Save checkpoint after processing event
await checkpointStore.SaveCheckpointAsync("order-summary", eventOffset);
```

### Delete Checkpoint

```csharp
// Reset projection by deleting checkpoint
await checkpointStore.DeleteCheckpointAsync("order-summary");
```

### Get All Checkpoints

```csharp
// Query all projection checkpoints
var checkpoints = await checkpointStore.GetAllCheckpointsAsync();

foreach (var (projectionName, offset) in checkpoints)
{
    Console.WriteLine($"{projectionName}: {offset}");
}
```

## PostgreSQL Checkpoint Store

### Database Schema

```sql
-- "offset" is a reserved word in PostgreSQL, so the column name must be quoted
CREATE TABLE IF NOT EXISTS projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    "offset" BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_checkpoints_updated
    ON projection_checkpoints(updated_at);
```

### Configuration

```csharp
// Connection string configuration
builder.Services.AddPostgresCheckpointStore(options =>
{
    options.ConnectionString = "Host=localhost;Database=eventstore;Username=postgres;Password=postgres";
    options.TableName = "projection_checkpoints";  // Custom table name
    options.SchemaName = "public";                 // Custom schema
});
```

### Transaction Support

```csharp
public async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);

    try
    {
        // Update read model
        var summary = await CreateOrUpdateSummaryAsync(@event, ct);
        await _dbContext.SaveChangesAsync(ct);

        // Save checkpoint in same transaction
        // (assumes the checkpoint store shares _dbContext's connection and transaction)
        await _checkpointStore.SaveCheckpointAsync(
            ProjectionName,
            @event.Offset,
            ct);

        await transaction.CommitAsync(ct);
    }
    catch
    {
        // Roll back both the read model update and the checkpoint
        await transaction.RollbackAsync(ct);
        throw;
    }
}
```

## In-Memory Checkpoint Store

### Usage

```csharp
// For testing only - data lost on restart
builder.Services.AddInMemoryCheckpointStore();

// Projection will restart from beginning after app restart
```

### Test Scenarios

```csharp
[Fact]
public async Task Projection_Should_Resume_From_Checkpoint()
{
    // Arrange
    var checkpointStore = new InMemoryCheckpointStore();
    await checkpointStore.SaveCheckpointAsync("test-projection", 100);

    // Act
    var checkpoint = await checkpointStore.GetCheckpointAsync("test-projection");

    // Assert
    Assert.Equal(100, checkpoint);
}

[Fact]
public async Task Projection_Should_Start_From_Zero_When_No_Checkpoint()
{
    // Arrange
    var checkpointStore = new InMemoryCheckpointStore();

    // Act
    var checkpoint = await checkpointStore.GetCheckpointAsync("new-projection");

    // Assert
    Assert.Equal(0, checkpoint);
}
```

## Checkpoint Patterns

### Frequent Checkpointing

```csharp
// Save after every event (safest but slowest)
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        // Save checkpoint after each event
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset, ct);
    }
}
```

### Batch Checkpointing

```csharp
// Save after every N events (faster)
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
    var batchSize = 100;
    var processedCount = 0;
    var lastOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        lastOffset = @event.Offset;
        processedCount++;

        // Save checkpoint every batchSize events
        if (processedCount % batchSize == 0)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
        }
    }

    // Save final checkpoint
    if (lastOffset > checkpoint)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
    }
}
```

### Time-Based Checkpointing

```csharp
// Save every N seconds
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
    var lastCheckpointTime = DateTimeOffset.UtcNow;
    var checkpointInterval = TimeSpan.FromSeconds(5);
    var lastOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        lastOffset = @event.Offset;

        // Save checkpoint once checkpointInterval has elapsed
        if (DateTimeOffset.UtcNow - lastCheckpointTime > checkpointInterval)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
            lastCheckpointTime = DateTimeOffset.UtcNow;
        }
    }

    // Save final checkpoint
    if (lastOffset > checkpoint)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
    }
}
```

## Monitoring Checkpoints

### Projection Lag

```csharp
public async Task<long> GetProjectionLagAsync(string projectionName, CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(projectionName, ct);
    var streamHead = await _eventStore.GetStreamHeadAsync("orders", ct);

    // Lag is the number of events between the stream head and the checkpoint
    var lag = streamHead - checkpoint;

    if (lag > 1000)
    {
        _logger.LogWarning(
            "Projection {Projection} lagging: {Lag} events behind",
            projectionName,
            lag);
    }

    return lag;
}
```

### Checkpoint Dashboard

```csharp
// API endpoint for checkpoint status
app.MapGet("/api/projections/checkpoints", async (
    ICheckpointStore checkpointStore,
    IEventStreamStore eventStore) =>
{
    var checkpoints = await checkpointStore.GetAllCheckpointsAsync();
    var streamHead = await eventStore.GetStreamHeadAsync("orders");

    var status = new List<object>();

    foreach (var (projectionName, checkpoint) in checkpoints)
    {
        var lag = streamHead - checkpoint;

        status.Add(new
        {
            ProjectionName = projectionName,
            Checkpoint = checkpoint,
            StreamHead = streamHead,
            Lag = lag,
            PercentComplete = streamHead > 0
                ? (double)checkpoint / streamHead * 100
                : 100,
            // GetCheckpointUpdatedAtAsync is a helper that is not shown here
            UpdatedAt = await GetCheckpointUpdatedAtAsync(projectionName)
        });
    }

    return Results.Ok(status);
});
```

### Health Check

```csharp
using Microsoft.Extensions.Diagnostics.HealthChecks;

public class ProjectionHealthCheck : IHealthCheck
{
    private readonly ICheckpointStore _checkpointStore;
    private readonly IEventStreamStore _eventStore;

    public ProjectionHealthCheck(ICheckpointStore checkpointStore, IEventStreamStore eventStore)
    {
        _checkpointStore = checkpointStore;
        _eventStore = eventStore;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync("order-summary", ct);
        var streamHead = await _eventStore.GetStreamHeadAsync("orders", ct);

        var lag = streamHead - checkpoint;

        return lag switch
        {
            0 => HealthCheckResult.Healthy("Projection up-to-date"),
            < 1000 => HealthCheckResult.Degraded($"Projection lagging: {lag} events"),
            _ => HealthCheckResult.Unhealthy($"Projection critically lagging: {lag} events")
        };
    }
}

// Register health check
builder.Services.AddHealthChecks()
    .AddCheck<ProjectionHealthCheck>("projection-health");
```

## Custom Checkpoint Store

```csharp
using StackExchange.Redis;

public class RedisCheckpointStore : ICheckpointStore
{
    private readonly IConnectionMultiplexer _redis;

    public RedisCheckpointStore(IConnectionMultiplexer redis)
    {
        _redis = redis;
    }

    public async Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{projectionName}";

        var value = await db.StringGetAsync(key);

        // A missing key means the projection has no checkpoint yet
        return value.HasValue ? (long)value : 0;
    }

    public async Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{projectionName}";

        await db.StringSetAsync(key, offset);
    }

    public async Task DeleteCheckpointAsync(string projectionName, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{projectionName}";

        await db.KeyDeleteAsync(key);
    }

    public async Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var server = _redis.GetServer(_redis.GetEndPoints().First());

        // Enumerate every key that carries the checkpoint prefix
        var keys = server.Keys(pattern: "checkpoint:*");
        var checkpoints = new Dictionary<string, long>();

        foreach (var key in keys)
        {
            var projectionName = key.ToString().Replace("checkpoint:", "");
            var value = await db.StringGetAsync(key);

            if (value.HasValue)
            {
                checkpoints[projectionName] = (long)value;
            }
        }

        return checkpoints;
    }
}

// Register custom checkpoint store
builder.Services.AddSingleton<ICheckpointStore, RedisCheckpointStore>();
```

## Best Practices

### ✅ DO

- Use PostgreSQL checkpoint store for production
- Save checkpoints in transactions with read model updates
- Use batch or time-based checkpointing for performance
- Monitor projection lag regularly
- Set up health checks
- Test checkpoint recovery
- Back up checkpoint data
- Use appropriate checkpoint intervals

### ❌ DON'T

- Don't use in-memory store for production
- Don't skip checkpoint saves
- Don't checkpoint more often than needed (for example, after every event on a high-volume stream)
- Don't ignore checkpoint failures
- Don't forget to handle a missing checkpoint (the store returns 0)
- Don't share checkpoint stores across environments
- Don't manually modify checkpoints without reason

## See Also

- [Projections Overview](README.md)
- [Creating Projections](creating-projections.md)
- [Projection Options](projection-options.md)
- [Resettable Projections](resettable-projections.md)
- [PostgreSQL Storage](../storage/postgresql-storage.md)
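
## Worked Example: Replay After a Crash

The batch and time-based patterns save less often, so a crash between saves means the events handled since the last checkpoint are handled again on restart. The following is a minimal, self-contained sketch of that behavior, not part of the library: `savedCheckpoint`, `handled`, and `Run` are hypothetical stand-ins for a checkpoint store, a read model, and a projection loop, and offsets are assumed to start at 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a checkpoint store: one saved offset (0 = none).
long savedCheckpoint = 0;

// Every offset the handler saw, in order. Duplicates mean an event was replayed.
var handled = new List<long>();

// A stream of 250 events with offsets 1..250 (assumed numbering).
var events = Enumerable.Range(1, 250).Select(i => (long)i).ToList();

// Processes events after the saved checkpoint, saving every batchSize events.
// Returns without a final save when crashAfter is reached, simulating a crash.
void Run(int batchSize, long? crashAfter)
{
    var start = savedCheckpoint;
    var processed = 0;
    var lastOffset = start;

    foreach (var offset in events.Where(o => o > start))
    {
        handled.Add(offset);      // "handle" the event
        lastOffset = offset;
        processed++;

        if (processed % batchSize == 0)
        {
            savedCheckpoint = lastOffset;   // batch checkpoint
        }

        if (offset == crashAfter)
        {
            return;               // crash: progress since the last save is lost
        }
    }

    savedCheckpoint = lastOffset; // final checkpoint on a clean finish
}

// First run crashes after offset 150; only the checkpoint at 100 was saved.
Run(batchSize: 100, crashAfter: 150);
Console.WriteLine($"Checkpoint after crash: {savedCheckpoint}");

// Second run resumes from 101, so offsets 101..150 are handled a second time.
Run(batchSize: 100, crashAfter: null);
var replayed = handled.GroupBy(o => o).Count(g => g.Count() > 1);
Console.WriteLine($"Final checkpoint: {savedCheckpoint}, replayed events: {replayed}");
```

In this sketch the second run replays 50 events. With batch or time-based checkpointing, handlers therefore need to tolerate seeing the same event twice, unless the checkpoint is committed in the same transaction as the read model update.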