dotnet-cqrs/docs/event-streaming/projections/checkpoint-stores.md

12 KiB

Checkpoint Stores

Persist projection progress with PostgreSQL or in-memory checkpoint stores.

Overview

Checkpoint stores track the last processed event offset for each projection:

  • PostgreSQL Store - Durable checkpoint storage for production
  • In-Memory Store - Fast checkpoints for development/testing
  • Atomic Updates - Ensure exactly-once processing
  • Query Support - Monitor projection progress

Quick Start

PostgreSQL Checkpoint Store

using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

// Register PostgreSQL checkpoint store
builder.Services.AddPostgresCheckpointStore(
    builder.Configuration.GetConnectionString("EventStore"));

var app = builder.Build();
app.Run();

In-Memory Checkpoint Store

using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);

// Register in-memory checkpoint store (for testing)
builder.Services.AddInMemoryCheckpointStore();

var app = builder.Build();
app.Run();

ICheckpointStore Interface

public interface ICheckpointStore
{
    Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default);
    Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default);
    Task DeleteCheckpointAsync(string projectionName, CancellationToken ct = default);
    Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct = default);
}

Basic Usage

Get Checkpoint

var checkpointStore = serviceProvider.GetRequiredService<ICheckpointStore>();

// Get checkpoint (returns 0 if not found)
var checkpoint = await checkpointStore.GetCheckpointAsync("order-summary");

Console.WriteLine($"Projection checkpoint: {checkpoint}");

Save Checkpoint

// Save checkpoint after processing event
await checkpointStore.SaveCheckpointAsync("order-summary", eventOffset);

Delete Checkpoint

// Reset projection by deleting checkpoint
await checkpointStore.DeleteCheckpointAsync("order-summary");

Get All Checkpoints

// Query all projection checkpoints
var checkpoints = await checkpointStore.GetAllCheckpointsAsync();

foreach (var (projectionName, offset) in checkpoints)
{
    Console.WriteLine($"{projectionName}: {offset}");
}

PostgreSQL Checkpoint Store

Database Schema

CREATE TABLE IF NOT EXISTS projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    offset BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_checkpoints_updated
ON projection_checkpoints(updated_at);

Configuration

// Connection string configuration
builder.Services.AddPostgresCheckpointStore(options =>
{
    options.ConnectionString = "Host=localhost;Database=eventstore;Username=postgres;Password=postgres";
    options.TableName = "projection_checkpoints";  // Custom table name
    options.SchemaName = "public";                 // Custom schema
});

Transaction Support

public async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);

    try
    {
        // Update read model
        var summary = await CreateOrUpdateSummaryAsync(@event, ct);
        await _dbContext.SaveChangesAsync(ct);

        // Save checkpoint in same transaction
        await _checkpointStore.SaveCheckpointAsync(
            ProjectionName,
            @event.Offset,
            ct);

        await transaction.CommitAsync(ct);
    }
    catch
    {
        await transaction.RollbackAsync(ct);
        throw;
    }
}

In-Memory Checkpoint Store

Usage

// For testing only - data lost on restart
builder.Services.AddInMemoryCheckpointStore();

// Projection will restart from beginning after app restart

Test Scenarios

[Fact]
public async Task Projection_Should_Resume_From_Checkpoint()
{
    // Arrange
    var checkpointStore = new InMemoryCheckpointStore();
    await checkpointStore.SaveCheckpointAsync("test-projection", 100);

    // Act
    var checkpoint = await checkpointStore.GetCheckpointAsync("test-projection");

    // Assert
    Assert.Equal(100, checkpoint);
}

[Fact]
public async Task Projection_Should_Start_From_Zero_When_No_Checkpoint()
{
    // Arrange
    var checkpointStore = new InMemoryCheckpointStore();

    // Act
    var checkpoint = await checkpointStore.GetCheckpointAsync("new-projection");

    // Assert
    Assert.Equal(0, checkpoint);
}

Checkpoint Patterns

Frequent Checkpointing

// Save after every event (safest but slowest)
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        // Save checkpoint after each event
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
    }
}

Batch Checkpointing

// Save after every N events (faster)
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
    var batchSize = 100;
    var processedCount = 0;
    var lastOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);
        lastOffset = @event.Offset;
        processedCount++;

        // Save checkpoint every 100 events
        if (processedCount % batchSize == 0)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
        }
    }

    // Save final checkpoint
    if (lastOffset > checkpoint)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
    }
}

Time-Based Checkpointing

// Save every N seconds
public async Task RunAsync(CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
    var lastCheckpointTime = DateTimeOffset.UtcNow;
    var checkpointInterval = TimeSpan.FromSeconds(5);
    var lastOffset = checkpoint;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);
        lastOffset = @event.Offset;

        // Save checkpoint every 5 seconds
        if (DateTimeOffset.UtcNow - lastCheckpointTime > checkpointInterval)
        {
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
            lastCheckpointTime = DateTimeOffset.UtcNow;
        }
    }

    // Save final checkpoint
    await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
}

Monitoring Checkpoints

Projection Lag

public async Task<long> GetProjectionLagAsync(string projectionName, CancellationToken ct)
{
    var checkpoint = await _checkpointStore.GetCheckpointAsync(projectionName, ct);
    var streamHead = await _eventStore.GetStreamHeadAsync("orders", ct);

    var lag = streamHead - checkpoint;

    if (lag > 1000)
    {
        _logger.LogWarning(
            "Projection {Projection} lagging: {Lag} events behind",
            projectionName,
            lag);
    }

    return lag;
}

Checkpoint Dashboard

// API endpoint for checkpoint status
app.MapGet("/api/projections/checkpoints", async (
    ICheckpointStore checkpointStore,
    IEventStreamStore eventStore) =>
{
    var checkpoints = await checkpointStore.GetAllCheckpointsAsync();
    var streamHead = await eventStore.GetStreamHeadAsync("orders");

    var status = new List<object>();

    foreach (var (projectionName, checkpoint) in checkpoints)
    {
        var lag = streamHead - checkpoint;

        status.Add(new
        {
            ProjectionName = projectionName,
            Checkpoint = checkpoint,
            StreamHead = streamHead,
            Lag = lag,
            PercentComplete = streamHead > 0 ? (double)checkpoint / streamHead * 100 : 100,
            UpdatedAt = await GetCheckpointUpdatedAtAsync(projectionName)
        });
    }

    return Results.Ok(status);
});

Health Check

public class ProjectionHealthCheck : IHealthCheck
{
    private readonly ICheckpointStore _checkpointStore;
    private readonly IEventStreamStore _eventStore;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync("order-summary", ct);
        var streamHead = await _eventStore.GetStreamHeadAsync("orders", ct);
        var lag = streamHead - checkpoint;

        return lag switch
        {
            0 => HealthCheckResult.Healthy("Projection up-to-date"),
            < 1000 => HealthCheckResult.Degraded($"Projection lagging: {lag} events"),
            _ => HealthCheckResult.Unhealthy($"Projection critically lagging: {lag} events")
        };
    }
}

// Register health check
builder.Services.AddHealthChecks()
    .AddCheck<ProjectionHealthCheck>("projection-health");

Custom Checkpoint Store

public class RedisCheckpointStore : ICheckpointStore
{
    private readonly IConnectionMultiplexer _redis;

    public async Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{projectionName}";
        var value = await db.StringGetAsync(key);

        return value.HasValue ? (long)value : 0;
    }

    public async Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{projectionName}";

        await db.StringSetAsync(key, offset);
    }

    public async Task DeleteCheckpointAsync(string projectionName, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{projectionName}";

        await db.KeyDeleteAsync(key);
    }

    public async Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var server = _redis.GetServer(_redis.GetEndPoints().First());
        var keys = server.Keys(pattern: "checkpoint:*");

        var checkpoints = new Dictionary<string, long>();

        foreach (var key in keys)
        {
            var projectionName = key.ToString().Replace("checkpoint:", "");
            var value = await db.StringGetAsync(key);

            if (value.HasValue)
            {
                checkpoints[projectionName] = (long)value;
            }
        }

        return checkpoints;
    }
}

// Register custom checkpoint store
builder.Services.AddSingleton<ICheckpointStore, RedisCheckpointStore>();

Best Practices

DO

  • Use PostgreSQL checkpoint store for production
  • Save checkpoints in transactions with read model updates
  • Use batch or time-based checkpointing for performance
  • Monitor projection lag regularly
  • Set up health checks
  • Test checkpoint recovery
  • Backup checkpoint data
  • Use appropriate checkpoint intervals

DON'T

  • Don't use in-memory store for production
  • Don't skip checkpoint saves
  • Don't checkpoint too frequently (every event)
  • Don't ignore checkpoint failures
  • Don't forget to handle checkpoint not found (return 0)
  • Don't share checkpoint stores across environments
  • Don't manually modify checkpoints without reason

See Also