12 KiB
12 KiB
Checkpoint Stores
Persist projection progress with PostgreSQL or in-memory checkpoint stores.
Overview
Checkpoint stores track the last processed event offset for each projection:
- PostgreSQL Store - Durable checkpoint storage for production
- In-Memory Store - Fast checkpoints for development/testing
- Atomic Updates - Save the checkpoint in the same transaction as the read-model update so each event is applied effectively once
- Query Support - Monitor projection progress
Quick Start
PostgreSQL Checkpoint Store
using Svrnty.CQRS.Events.PostgreSQL;
var builder = WebApplication.CreateBuilder(args);

// Look up the "EventStore" connection string, then register the
// PostgreSQL checkpoint store with it.
var eventStoreConnection = builder.Configuration.GetConnectionString("EventStore");
builder.Services.AddPostgresCheckpointStore(eventStoreConnection);

var app = builder.Build();
app.Run();
In-Memory Checkpoint Store
using Svrnty.CQRS.Events;
var builder = WebApplication.CreateBuilder(args);

// Checkpoints are kept in process memory only (for testing).
builder.Services.AddInMemoryCheckpointStore();

// Build the host and start it in one expression.
builder.Build().Run();
ICheckpointStore Interface
/// <summary>
/// Stores, per projection name, the offset of the last event that projection has processed.
/// </summary>
public interface ICheckpointStore
{
/// <summary>Gets the saved offset for <paramref name="projectionName"/>.</summary>
/// <param name="projectionName">Name that identifies the projection.</param>
/// <param name="ct">Token used to cancel the operation.</param>
/// <returns>The saved offset; per the usage examples in this document, 0 when no checkpoint exists.</returns>
Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default);
/// <summary>Saves <paramref name="offset"/> as the checkpoint for <paramref name="projectionName"/>.</summary>
/// <param name="projectionName">Name that identifies the projection.</param>
/// <param name="offset">Offset of the last processed event.</param>
/// <param name="ct">Token used to cancel the operation.</param>
Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default);
/// <summary>Deletes the checkpoint for <paramref name="projectionName"/>; used to reset a projection.</summary>
/// <param name="projectionName">Name that identifies the projection.</param>
/// <param name="ct">Token used to cancel the operation.</param>
Task DeleteCheckpointAsync(string projectionName, CancellationToken ct = default);
/// <summary>Gets every stored checkpoint.</summary>
/// <param name="ct">Token used to cancel the operation.</param>
/// <returns>A dictionary keyed by projection name whose values are the saved offsets.</returns>
Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct = default);
}
Basic Usage
Get Checkpoint
// Resolve the checkpoint store from the DI container.
var checkpointStore = serviceProvider.GetRequiredService<ICheckpointStore>();

// Read the last saved offset; the result is 0 if no checkpoint was found.
long checkpoint = await checkpointStore.GetCheckpointAsync(projectionName: "order-summary");
Console.WriteLine($"Projection checkpoint: {checkpoint}");
Save Checkpoint
// Once an event has been processed, record its offset as the new checkpoint.
await checkpointStore.SaveCheckpointAsync(
    projectionName: "order-summary",
    offset: eventOffset);
Delete Checkpoint
// Deleting the checkpoint resets the projection.
await checkpointStore.DeleteCheckpointAsync(projectionName: "order-summary");
Get All Checkpoints
// Fetch every stored checkpoint and print one "name: offset" line per projection.
var checkpoints = await checkpointStore.GetAllCheckpointsAsync();
foreach (var entry in checkpoints)
{
    var projectionName = entry.Key;
    var offset = entry.Value;
    Console.WriteLine($"{projectionName}: {offset}");
}
PostgreSQL Checkpoint Store
Database Schema
-- One row per projection: the offset of the last event it processed and
-- the time the row was last written.
CREATE TABLE IF NOT EXISTS projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    -- OFFSET is a reserved key word in PostgreSQL, so the column name must
    -- be double-quoted here and in every query that references it.
    "offset" BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index on the last-update timestamp.
CREATE INDEX IF NOT EXISTS idx_checkpoints_updated
    ON projection_checkpoints(updated_at);
Configuration
// Connection string configuration
const string checkpointConnectionString =
    "Host=localhost;Database=eventstore;Username=postgres;Password=postgres";

builder.Services.AddPostgresCheckpointStore(storeOptions =>
{
    storeOptions.ConnectionString = checkpointConnectionString;
    storeOptions.TableName = "projection_checkpoints"; // Custom table name
    storeOptions.SchemaName = "public";                // Custom schema
});
Transaction Support
/// <summary>
/// Applies <paramref name="event"/> to the read model and records its offset as the
/// projection checkpoint inside a database transaction opened on <c>_dbContext</c>.
/// </summary>
/// <param name="event">The stream event to project.</param>
/// <param name="ct">Token used to cancel the database and checkpoint operations.</param>
public async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);
    try
    {
        // Update read model (the returned summary is not needed here)
        await CreateOrUpdateSummaryAsync(@event, ct);
        await _dbContext.SaveChangesAsync(ct);

        // Save checkpoint before committing.
        // NOTE(review): this is only atomic with the read-model write if the
        // checkpoint store uses the same connection/transaction as _dbContext - confirm.
        await _checkpointStore.SaveCheckpointAsync(
            ProjectionName,
            @event.Offset,
            ct);

        await transaction.CommitAsync(ct);
    }
    catch
    {
        // Roll back without the caller's token: if ct is already cancelled,
        // passing it would make the rollback itself throw and hide the
        // exception that actually caused the failure.
        await transaction.RollbackAsync(CancellationToken.None);
        throw;
    }
}
In-Memory Checkpoint Store
Usage
// For testing only - data lost on restart
var services = builder.Services;
services.AddInMemoryCheckpointStore();
// Projection will restart from beginning after app restart
Test Scenarios
// A saved checkpoint must be returned unchanged by a later read.
[Fact]
public async Task Projection_Should_Resume_From_Checkpoint()
{
    // Arrange
    const string projection = "test-projection";
    var store = new InMemoryCheckpointStore();
    await store.SaveCheckpointAsync(projection, 100);

    // Act
    var resumedFrom = await store.GetCheckpointAsync(projection);

    // Assert
    Assert.Equal(100, resumedFrom);
}
// Reading a projection that has never been checkpointed must yield 0.
[Fact]
public async Task Projection_Should_Start_From_Zero_When_No_Checkpoint()
{
    // Arrange
    const string projection = "new-projection";
    var store = new InMemoryCheckpointStore();

    // Act
    var startFrom = await store.GetCheckpointAsync(projection);

    // Assert
    Assert.Equal(0, startFrom);
}
Checkpoint Patterns
Frequent Checkpointing
// Save after every event (safest but slowest)
/// <summary>
/// Reads the "orders" stream from the event after the saved checkpoint and
/// saves a new checkpoint after each handled event.
/// </summary>
/// <param name="ct">Token that cancels reading, handling and checkpoint I/O.</param>
public async Task RunAsync(CancellationToken ct)
{
    // NOTE(review): a missing checkpoint reads as 0, so the first run starts at
    // offset 1 - this assumes stream offsets start at 1; confirm.
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        // Save checkpoint after each event; pass ct so the store call is
        // cancelled together with the rest of the loop.
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset, ct);
    }
}
Batch Checkpointing
// Save after every N events (faster)
/// <summary>
/// Reads the "orders" stream from the event after the saved checkpoint and
/// saves a checkpoint every 100 handled events, plus once more at the end of
/// the stream if there is unsaved progress.
/// </summary>
/// <param name="ct">Token that cancels reading, handling and checkpoint I/O.</param>
public async Task RunAsync(CancellationToken ct)
{
    const int batchSize = 100;

    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
    var processedCount = 0;
    var lastOffset = checkpoint;
    // Tracks what has actually been persisted so the final flush can be
    // skipped when the last event already landed on a batch boundary.
    var lastSavedOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);
        lastOffset = @event.Offset;
        processedCount++;

        // Save checkpoint every 100 events
        if (processedCount % batchSize == 0)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
            lastSavedOffset = lastOffset;
        }
    }

    // Save final checkpoint only if something is still unsaved
    if (lastOffset > lastSavedOffset)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
    }
}
Time-Based Checkpointing
// Save every N seconds
/// <summary>
/// Reads the "orders" stream from the event after the saved checkpoint and
/// saves a checkpoint when more than 5 seconds have passed since the previous
/// save, plus once more at the end of the stream if there is unsaved progress.
/// </summary>
/// <param name="ct">Token that cancels reading, handling and checkpoint I/O.</param>
public async Task RunAsync(CancellationToken ct)
{
    var checkpointInterval = TimeSpan.FromSeconds(5);

    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
    var lastCheckpointTime = DateTimeOffset.UtcNow;
    var lastOffset = checkpoint;
    // Tracks what has actually been persisted so the final flush does not
    // write a checkpoint when nothing new was processed.
    var lastSavedOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);
        lastOffset = @event.Offset;

        // Save checkpoint every 5 seconds
        if (DateTimeOffset.UtcNow - lastCheckpointTime > checkpointInterval)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
            lastSavedOffset = lastOffset;
            lastCheckpointTime = DateTimeOffset.UtcNow;
        }
    }

    // Save final checkpoint only if something is still unsaved
    // (same guard as the batch variant).
    if (lastOffset > lastSavedOffset)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
    }
}
Monitoring Checkpoints
Projection Lag
/// <summary>
/// Computes how far a projection's checkpoint is behind the head of the
/// "orders" stream and logs a warning when the gap exceeds 1000.
/// </summary>
/// <param name="projectionName">Projection whose checkpoint is read.</param>
/// <param name="ct">Token passed to the checkpoint and stream-head lookups.</param>
/// <returns>Stream head minus the projection's checkpoint.</returns>
public async Task<long> GetProjectionLagAsync(string projectionName, CancellationToken ct)
{
    var processed = await _checkpointStore.GetCheckpointAsync(projectionName, ct);
    var head = await _eventStore.GetStreamHeadAsync("orders", ct);
    var behind = head - processed;

    // Within tolerance: nothing to report.
    if (!(behind > 1000))
    {
        return behind;
    }

    _logger.LogWarning(
        "Projection {Projection} lagging: {Lag} events behind",
        projectionName,
        behind);
    return behind;
}
Checkpoint Dashboard
// API endpoint for checkpoint status: one entry per stored checkpoint,
// each measured against the current head of the "orders" stream.
app.MapGet("/api/projections/checkpoints", async (
    ICheckpointStore checkpointStore,
    IEventStreamStore eventStore) =>
{
    var allCheckpoints = await checkpointStore.GetAllCheckpointsAsync();
    var head = await eventStore.GetStreamHeadAsync("orders");

    var rows = new List<object>();
    foreach (var entry in allCheckpoints)
    {
        var name = entry.Key;
        var processed = entry.Value;

        // When the head is not positive, report the projection as 100% complete.
        double percent;
        if (head > 0)
        {
            percent = (double)processed / head * 100;
        }
        else
        {
            percent = 100;
        }

        var updatedAt = await GetCheckpointUpdatedAtAsync(name);

        rows.Add(new
        {
            ProjectionName = name,
            Checkpoint = processed,
            StreamHead = head,
            Lag = head - processed,
            PercentComplete = percent,
            UpdatedAt = updatedAt
        });
    }

    return Results.Ok(rows);
});
Health Check
/// <summary>
/// Health check that compares the "order-summary" projection checkpoint with
/// the head of the "orders" stream and maps the gap to a health status.
/// </summary>
public class ProjectionHealthCheck : IHealthCheck
{
    // Lag at or above this value is reported as Unhealthy.
    private const long UnhealthyLagThreshold = 1000;

    private readonly ICheckpointStore _checkpointStore;
    private readonly IEventStreamStore _eventStore;

    /// <summary>
    /// Creates the health check; both dependencies are expected to be supplied by DI.
    /// </summary>
    /// <param name="checkpointStore">Store the projection checkpoint is read from.</param>
    /// <param name="eventStore">Store the stream head is read from.</param>
    public ProjectionHealthCheck(ICheckpointStore checkpointStore, IEventStreamStore eventStore)
    {
        // Without this constructor the readonly fields were never assigned and
        // CheckHealthAsync failed with a NullReferenceException.
        _checkpointStore = checkpointStore;
        _eventStore = eventStore;
    }

    /// <summary>
    /// Reports Healthy when the projection is not behind the stream head,
    /// Degraded when it is behind by fewer than 1000 events, and Unhealthy otherwise.
    /// </summary>
    /// <param name="context">Health check context (not used by this check).</param>
    /// <param name="ct">Token passed to the checkpoint and stream-head lookups.</param>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync("order-summary", ct);
        var streamHead = await _eventStore.GetStreamHeadAsync("orders", ct);
        var lag = streamHead - checkpoint;

        return lag switch
        {
            // <= 0 rather than == 0: a checkpoint at or past the head is not
            // "lagging", and must not be reported as a negative event count.
            <= 0 => HealthCheckResult.Healthy("Projection up-to-date"),
            < UnhealthyLagThreshold => HealthCheckResult.Degraded($"Projection lagging: {lag} events"),
            _ => HealthCheckResult.Unhealthy($"Projection critically lagging: {lag} events")
        };
    }
}
// Register health check under the name "projection-health"
var healthChecks = builder.Services.AddHealthChecks();
healthChecks.AddCheck<ProjectionHealthCheck>("projection-health");
Custom Checkpoint Store
/// <summary>
/// <see cref="ICheckpointStore"/> implementation that keeps each checkpoint in a
/// Redis string stored under the key "checkpoint:{projectionName}".
/// </summary>
public class RedisCheckpointStore : ICheckpointStore
{
    // Single definition of the key prefix so reads, writes and scans cannot drift apart.
    private const string KeyPrefix = "checkpoint:";

    private readonly IConnectionMultiplexer _redis;

    /// <summary>Creates the store on top of an existing Redis connection.</summary>
    /// <param name="redis">Connection multiplexer, expected to be supplied by DI.</param>
    public RedisCheckpointStore(IConnectionMultiplexer redis)
    {
        // Without this constructor the readonly field was never assigned and
        // every method failed with a NullReferenceException.
        _redis = redis;
    }

    /// <summary>Gets the saved offset for a projection, or 0 when the key does not exist.</summary>
    /// <param name="projectionName">Name that identifies the projection.</param>
    /// <param name="ct">Accepted for interface compatibility; not passed to the Redis call.</param>
    public async Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        var value = await db.StringGetAsync(KeyFor(projectionName));
        return value.HasValue ? (long)value : 0;
    }

    /// <summary>Stores <paramref name="offset"/> as the checkpoint for a projection.</summary>
    /// <param name="projectionName">Name that identifies the projection.</param>
    /// <param name="offset">Offset of the last processed event.</param>
    /// <param name="ct">Accepted for interface compatibility; not passed to the Redis call.</param>
    public async Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        await db.StringSetAsync(KeyFor(projectionName), offset);
    }

    /// <summary>Deletes the checkpoint key for a projection.</summary>
    /// <param name="projectionName">Name that identifies the projection.</param>
    /// <param name="ct">Accepted for interface compatibility; not passed to the Redis call.</param>
    public async Task DeleteCheckpointAsync(string projectionName, CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        await db.KeyDeleteAsync(KeyFor(projectionName));
    }

    /// <summary>Gets every checkpoint whose key matches "checkpoint:*".</summary>
    /// <param name="ct">Accepted for interface compatibility; not passed to the Redis calls.</param>
    /// <returns>A dictionary keyed by projection name whose values are the saved offsets.</returns>
    public async Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        // NOTE(review): only the first endpoint is scanned - presumably fine for a
        // single-node setup; confirm before using against a cluster.
        var server = _redis.GetServer(_redis.GetEndPoints().First());
        var keys = server.Keys(pattern: KeyPrefix + "*");

        var checkpoints = new Dictionary<string, long>();
        foreach (var key in keys)
        {
            // Strip only the leading prefix. Replace() would also remove the text
            // "checkpoint:" from inside a projection name and corrupt it.
            var projectionName = key.ToString().Substring(KeyPrefix.Length);
            var value = await db.StringGetAsync(key);
            // The key may have been deleted between the scan and this read.
            if (value.HasValue)
            {
                checkpoints[projectionName] = (long)value;
            }
        }

        return checkpoints;
    }

    // Builds the Redis key for a projection's checkpoint.
    private static string KeyFor(string projectionName) => $"{KeyPrefix}{projectionName}";
}
// Register custom checkpoint store as the singleton ICheckpointStore implementation
builder.Services.AddSingleton(typeof(ICheckpointStore), typeof(RedisCheckpointStore));
Best Practices
✅ DO
- Use PostgreSQL checkpoint store for production
- Save checkpoints in transactions with read model updates
- Use batch or time-based checkpointing for performance
- Monitor projection lag regularly
- Set up health checks
- Test checkpoint recovery
- Backup checkpoint data
- Use appropriate checkpoint intervals
❌ DON'T
- Don't use in-memory store for production
- Don't skip checkpoint saves
- Don't checkpoint more often than you need to (saving after every event is the safest option but also the slowest)
- Don't ignore checkpoint failures
- Don't forget to handle a missing checkpoint (GetCheckpointAsync returns 0 when none has been saved)
- Don't share checkpoint stores across environments
- Don't manually modify checkpoints without reason