dotnet-cqrs/docs/event-streaming/projections/checkpoint-stores.md

458 lines
12 KiB
Markdown

# Checkpoint Stores
Persist projection progress with PostgreSQL or in-memory checkpoint stores.
## Overview
Checkpoint stores track the last processed event offset for each projection:
- **PostgreSQL Store** - Durable checkpoint storage for production
- **In-Memory Store** - Fast checkpoints for development/testing
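- **Atomic Updates** - Support effectively-once processing when the checkpoint is committed atomically with the read model update (otherwise processing is at-least-once)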
- **Query Support** - Monitor projection progress
## Quick Start
### PostgreSQL Checkpoint Store
```csharp
using Svrnty.CQRS.Events.PostgreSQL;
var builder = WebApplication.CreateBuilder(args);

// GetConnectionString returns null when the name is not configured.
// Fail fast at startup with a clear message instead of handing a null
// connection string to the checkpoint store.
var eventStoreConnectionString =
    builder.Configuration.GetConnectionString("EventStore")
    ?? throw new InvalidOperationException(
        "Connection string 'EventStore' is not configured.");

// Register PostgreSQL checkpoint store
builder.Services.AddPostgresCheckpointStore(eventStoreConnectionString);

var app = builder.Build();
app.Run();
```
### In-Memory Checkpoint Store
```csharp
using Svrnty.CQRS.Events;
// Host setup with the in-memory checkpoint store (for testing)
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddInMemoryCheckpointStore();

// Build the application and start it in one step
builder.Build().Run();
```
## ICheckpointStore Interface
```csharp
/// <summary>
/// Tracks the last processed event offset for each projection.
/// </summary>
public interface ICheckpointStore
{
    /// <summary>Reads the checkpoint stored for a projection.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>The stored offset; per this guide, 0 when no checkpoint is found.</returns>
    Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default);

    /// <summary>Stores the checkpoint for a projection.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="offset">Offset of the last processed event.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default);

    /// <summary>Removes the checkpoint of a projection, which resets the projection.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task DeleteCheckpointAsync(string projectionName, CancellationToken ct = default);

    /// <summary>Reads every stored checkpoint.</summary>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>Checkpoint offsets keyed by projection name.</returns>
    Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct = default);
}
```
## Basic Usage
### Get Checkpoint
```csharp
// Resolve the registered checkpoint store from the service provider
var checkpointStore = serviceProvider.GetRequiredService<ICheckpointStore>();

// Look up the stored offset (returns 0 if not found)
long lastProcessedOffset = await checkpointStore.GetCheckpointAsync(projectionName: "order-summary");
Console.WriteLine($"Projection checkpoint: {lastProcessedOffset}");
```
### Save Checkpoint
```csharp
// Record the offset of the event that was just processed
await checkpointStore.SaveCheckpointAsync(
    projectionName: "order-summary",
    offset: eventOffset);
```
### Delete Checkpoint
```csharp
// Deleting the checkpoint resets the projection
await checkpointStore.DeleteCheckpointAsync(projectionName: "order-summary");
```
### Get All Checkpoints
```csharp
// Fetch every stored checkpoint, keyed by projection name
var checkpoints = await checkpointStore.GetAllCheckpointsAsync();

// Print one "name: offset" line per projection
foreach (var entry in checkpoints)
{
    Console.WriteLine($"{entry.Key}: {entry.Value}");
}
```
## PostgreSQL Checkpoint Store
### Database Schema
```sql
-- One row per projection: the last processed offset and when it was written.
CREATE TABLE IF NOT EXISTS projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    -- OFFSET is a reserved word in PostgreSQL; unquoted, this column
    -- definition is rejected with a syntax error, so it must be double-quoted.
    "offset" BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index on the last-update time of each checkpoint.
CREATE INDEX IF NOT EXISTS idx_checkpoints_updated
    ON projection_checkpoints(updated_at);
```
### Configuration
```csharp
// Options-based configuration
builder.Services.AddPostgresCheckpointStore(options =>
{
    // Read the connection string from configuration instead of hard-coding
    // credentials in source; fail fast when it is missing.
    options.ConnectionString =
        builder.Configuration.GetConnectionString("EventStore")
        ?? throw new InvalidOperationException(
            "Connection string 'EventStore' is not configured.");

    options.TableName = "projection_checkpoints"; // Table that stores the checkpoints
    options.SchemaName = "public";                // Schema that contains the table
});
```
### Transaction Support
```csharp
/// <summary>
/// Applies one event to the read model and records its offset as the
/// projection checkpoint, committing both together.
/// </summary>
/// <param name="event">The stream event to project.</param>
/// <param name="ct">Token used to cancel the operation.</param>
public async Task HandleEventAsync(StreamEvent @event, CancellationToken ct)
{
    // Disposing an uncommitted transaction rolls it back, so any exception
    // below undoes the read model update without an explicit catch/rollback
    // (which could itself throw on a cancelled token and hide the real error).
    await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);

    // Update read model
    await CreateOrUpdateSummaryAsync(@event, ct);
    await _dbContext.SaveChangesAsync(ct);

    // Save checkpoint before committing.
    // NOTE(review): this is only atomic with the read model update if
    // _checkpointStore writes through the same connection/transaction as
    // _dbContext - confirm how the store is wired up.
    await _checkpointStore.SaveCheckpointAsync(
        ProjectionName,
        @event.Offset,
        ct);

    await transaction.CommitAsync(ct);
}
```
## In-Memory Checkpoint Store
### Usage
```csharp
// For testing only - checkpoints are kept in memory, so data is lost on restart
builder.Services.AddInMemoryCheckpointStore();
// With no stored checkpoint left after an app restart, the projection will
// restart from the beginning
```
### Test Scenarios
```csharp
/// <summary>
/// A checkpoint that was saved is returned unchanged when read back.
/// </summary>
[Fact]
public async Task Projection_Should_Resume_From_Checkpoint()
{
    // Arrange
    const string projection = "test-projection";
    const long savedOffset = 100;
    var store = new InMemoryCheckpointStore();
    await store.SaveCheckpointAsync(projection, savedOffset);

    // Act
    long resumedFrom = await store.GetCheckpointAsync(projection);

    // Assert
    Assert.Equal(savedOffset, resumedFrom);
}
/// <summary>
/// Reading a projection that never saved a checkpoint yields 0.
/// </summary>
[Fact]
public async Task Projection_Should_Start_From_Zero_When_No_Checkpoint()
{
    // Arrange
    const long expectedOffset = 0;
    var store = new InMemoryCheckpointStore();

    // Act
    long startingOffset = await store.GetCheckpointAsync("new-projection");

    // Assert
    Assert.Equal(expectedOffset, startingOffset);
}
```
## Checkpoint Patterns
### Frequent Checkpointing
```csharp
// Save after every event (safest but slowest)
/// <summary>
/// Processes the "orders" stream from the stored checkpoint onwards and
/// saves the checkpoint after each handled event.
/// </summary>
/// <param name="ct">Token used to cancel reading, handling and checkpoint I/O.</param>
public async Task RunAsync(CancellationToken ct)
{
    // Flow the token so that startup can be cancelled as well.
    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);

    // NOTE(review): resuming at checkpoint + 1 with a "not found" value of 0
    // assumes the first event offset is 1 - confirm against the event store.
    await foreach (var @event in _eventStore.ReadStreamAsync(
        "orders",
        fromOffset: checkpoint + 1,
        cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);

        // Save checkpoint after each event
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset, ct);
    }
}
```
### Batch Checkpointing
```csharp
// Save after every N events (faster)
/// <summary>
/// Processes the "orders" stream from the stored checkpoint onwards and
/// saves the checkpoint once per batch of handled events, plus once when
/// the loop ends for any reason.
/// </summary>
/// <param name="ct">Token used to cancel reading, handling and checkpoint I/O.</param>
public async Task RunAsync(CancellationToken ct)
{
    const int batchSize = 100;

    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
    var processedCount = 0;
    var lastOffset = checkpoint;
    var lastSavedOffset = checkpoint;

    try
    {
        await foreach (var @event in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            await HandleEventAsync(@event, ct);
            // Only advanced after the handler succeeded, so lastOffset always
            // points at a fully processed event.
            lastOffset = @event.Offset;
            processedCount++;

            // Save checkpoint every batchSize events
            if (processedCount % batchSize == 0)
            {
                await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
                lastSavedOffset = lastOffset;
            }
        }
    }
    finally
    {
        // Save final checkpoint. This runs on cancellation or failure too, so
        // a partially processed batch is not replayed after a restart.
        // CancellationToken.None is used because ct may already be cancelled.
        if (lastOffset > lastSavedOffset)
        {
            await _checkpointStore.SaveCheckpointAsync(
                ProjectionName,
                lastOffset,
                CancellationToken.None);
        }
    }
}
```
### Time-Based Checkpointing
```csharp
// Save every N seconds
/// <summary>
/// Processes the "orders" stream from the stored checkpoint onwards and
/// saves the checkpoint when at least the checkpoint interval has elapsed
/// since the previous save, plus once when the loop ends for any reason.
/// </summary>
/// <param name="ct">Token used to cancel reading, handling and checkpoint I/O.</param>
public async Task RunAsync(CancellationToken ct)
{
    var checkpointInterval = TimeSpan.FromSeconds(5);

    var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName, ct);
    var lastOffset = checkpoint;
    var lastSavedOffset = checkpoint;

    // Stopwatch is monotonic, so system clock adjustments cannot stretch or
    // shrink the interval the way wall-clock timestamps can.
    var sinceLastSave = System.Diagnostics.Stopwatch.StartNew();

    try
    {
        await foreach (var @event in _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: checkpoint + 1,
            cancellationToken: ct))
        {
            await HandleEventAsync(@event, ct);
            lastOffset = @event.Offset;

            // Save checkpoint every 5 seconds. The interval is only checked
            // when an event arrives, so an idle stream does not trigger a save.
            if (sinceLastSave.Elapsed > checkpointInterval)
            {
                await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset, ct);
                lastSavedOffset = lastOffset;
                sinceLastSave.Restart();
            }
        }
    }
    finally
    {
        // Save final checkpoint, but only when there is unsaved progress.
        // This runs on cancellation or failure too; CancellationToken.None is
        // used because ct may already be cancelled.
        if (lastOffset > lastSavedOffset)
        {
            await _checkpointStore.SaveCheckpointAsync(
                ProjectionName,
                lastOffset,
                CancellationToken.None);
        }
    }
}
```
## Monitoring Checkpoints
### Projection Lag
```csharp
/// <summary>
/// Computes how many events a projection is behind the head of the
/// "orders" stream and logs a warning when the lag exceeds 1000 events.
/// </summary>
/// <param name="projectionName">Name of the projection to inspect.</param>
/// <param name="ct">Token used to cancel the lookups.</param>
/// <returns>The stream head minus the projection checkpoint.</returns>
public async Task<long> GetProjectionLagAsync(string projectionName, CancellationToken ct)
{
    const int lagWarningThreshold = 1000;

    var projectionOffset = await _checkpointStore.GetCheckpointAsync(projectionName, ct);
    var headOffset = await _eventStore.GetStreamHeadAsync("orders", ct);
    var eventsBehind = headOffset - projectionOffset;

    // Within the threshold: nothing to report.
    if (!(eventsBehind > lagWarningThreshold))
    {
        return eventsBehind;
    }

    _logger.LogWarning(
        "Projection {Projection} lagging: {Lag} events behind",
        projectionName,
        eventsBehind);
    return eventsBehind;
}
```
### Checkpoint Dashboard
```csharp
// API endpoint for checkpoint status.
// The CancellationToken parameter is bound by minimal APIs to the request's
// abort token, so the store calls stop when the client disconnects.
app.MapGet("/api/projections/checkpoints", async (
    ICheckpointStore checkpointStore,
    IEventStreamStore eventStore,
    CancellationToken ct) =>
{
    var checkpoints = await checkpointStore.GetAllCheckpointsAsync(ct);
    var streamHead = await eventStore.GetStreamHeadAsync("orders", ct);

    // One status entry per projection; size is known up front.
    var status = new List<object>(checkpoints.Count);
    foreach (var (projectionName, checkpoint) in checkpoints)
    {
        var lag = streamHead - checkpoint;
        status.Add(new
        {
            ProjectionName = projectionName,
            Checkpoint = checkpoint,
            StreamHead = streamHead,
            Lag = lag,
            // An empty stream counts as fully caught up.
            PercentComplete = streamHead > 0 ? (double)checkpoint / streamHead * 100 : 100,
            // NOTE(review): GetCheckpointUpdatedAtAsync is not defined in this
            // guide - it must be supplied by the application.
            UpdatedAt = await GetCheckpointUpdatedAtAsync(projectionName)
        });
    }

    return Results.Ok(status);
});
```
### Health Check
```csharp
/// <summary>
/// Health check that compares the "order-summary" projection checkpoint
/// with the head of the "orders" stream.
/// </summary>
public class ProjectionHealthCheck : IHealthCheck
{
    private readonly ICheckpointStore _checkpointStore;
    private readonly IEventStreamStore _eventStore;

    /// <summary>
    /// Receives the stores through constructor injection. Without this
    /// constructor the readonly fields stay null and the check throws.
    /// </summary>
    /// <param name="checkpointStore">Store holding the projection checkpoints.</param>
    /// <param name="eventStore">Store used to read the stream head.</param>
    public ProjectionHealthCheck(ICheckpointStore checkpointStore, IEventStreamStore eventStore)
    {
        _checkpointStore = checkpointStore ?? throw new ArgumentNullException(nameof(checkpointStore));
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
    }

    /// <summary>
    /// Reports Healthy when the projection has no lag, Degraded below 1000
    /// events of lag, and Unhealthy otherwise.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework (unused).</param>
    /// <param name="ct">Token used to cancel the lookups.</param>
    /// <returns>The health status derived from the projection lag.</returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync("order-summary", ct);
        var streamHead = await _eventStore.GetStreamHeadAsync("orders", ct);
        var lag = streamHead - checkpoint;

        return lag switch
        {
            // A checkpoint at or ahead of the head read just before is not
            // lagging; without "<=" a negative lag would be reported as Degraded.
            <= 0 => HealthCheckResult.Healthy("Projection up-to-date"),
            < 1000 => HealthCheckResult.Degraded($"Projection lagging: {lag} events"),
            _ => HealthCheckResult.Unhealthy($"Projection critically lagging: {lag} events")
        };
    }
}
// Add the health check services, then attach the projection check by name
var healthChecks = builder.Services.AddHealthChecks();
healthChecks.AddCheck<ProjectionHealthCheck>("projection-health");
```
## Custom Checkpoint Store
```csharp
/// <summary>
/// Example custom checkpoint store that keeps each projection's offset in a
/// Redis string under the key "checkpoint:{projectionName}".
/// </summary>
public class RedisCheckpointStore : ICheckpointStore
{
    // Single source of truth for the key prefix used by every method.
    private const string KeyPrefix = "checkpoint:";

    private readonly IConnectionMultiplexer _redis;

    /// <summary>
    /// Receives the Redis connection through constructor injection. Without
    /// this constructor the readonly field stays null and every call throws.
    /// </summary>
    /// <param name="redis">Redis connection multiplexer.</param>
    public RedisCheckpointStore(IConnectionMultiplexer redis)
    {
        _redis = redis ?? throw new ArgumentNullException(nameof(redis));
    }

    /// <summary>Reads the checkpoint of a projection.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="ct">Cancellation token (not used by the Redis calls here).</param>
    /// <returns>The stored offset, or 0 when the key has no value.</returns>
    public async Task<long> GetCheckpointAsync(string projectionName, CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        var value = await db.StringGetAsync(KeyFor(projectionName));
        return value.HasValue ? (long)value : 0;
    }

    /// <summary>Stores the checkpoint of a projection.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="offset">Offset of the last processed event.</param>
    /// <param name="ct">Cancellation token (not used by the Redis calls here).</param>
    public async Task SaveCheckpointAsync(string projectionName, long offset, CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        await db.StringSetAsync(KeyFor(projectionName), offset);
    }

    /// <summary>Deletes the checkpoint of a projection.</summary>
    /// <param name="projectionName">Name identifying the projection.</param>
    /// <param name="ct">Cancellation token (not used by the Redis calls here).</param>
    public async Task DeleteCheckpointAsync(string projectionName, CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();
        await db.KeyDeleteAsync(KeyFor(projectionName));
    }

    /// <summary>Reads every checkpoint whose key matches "checkpoint:*".</summary>
    /// <param name="ct">Cancellation token (not used by the Redis calls here).</param>
    /// <returns>Checkpoint offsets keyed by projection name.</returns>
    public async Task<Dictionary<string, long>> GetAllCheckpointsAsync(CancellationToken ct = default)
    {
        var db = _redis.GetDatabase();

        // NOTE(review): only the first endpoint is scanned - presumably not
        // sufficient for multi-node deployments; confirm for your topology.
        var server = _redis.GetServer(_redis.GetEndPoints().First());
        var keys = server.Keys(pattern: KeyPrefix + "*");

        var checkpoints = new Dictionary<string, long>();
        foreach (var key in keys)
        {
            var value = await db.StringGetAsync(key);
            if (value.HasValue)
            {
                checkpoints[ProjectionNameFrom(key.ToString())] = (long)value;
            }
        }

        return checkpoints;
    }

    // Builds the Redis key for a projection.
    private static string KeyFor(string projectionName) => KeyPrefix + projectionName;

    // Strips only the leading prefix. A blanket Replace would also remove
    // "checkpoint:" from inside a projection name and corrupt it.
    private static string ProjectionNameFrom(string key) =>
        key.StartsWith(KeyPrefix, StringComparison.Ordinal)
            ? key.Substring(KeyPrefix.Length)
            : key;
}
// Register custom checkpoint store as the ICheckpointStore implementation (singleton).
// NOTE(review): RedisCheckpointStore holds an IConnectionMultiplexer, which
// presumably must also be registered in the container - confirm.
builder.Services.AddSingleton<ICheckpointStore, RedisCheckpointStore>();
```
## Best Practices
### ✅ DO
- Use PostgreSQL checkpoint store for production
- Save checkpoints in transactions with read model updates
- Use batch or time-based checkpointing for performance
- Monitor projection lag regularly
- Set up health checks
- Test checkpoint recovery
- Backup checkpoint data
- Use appropriate checkpoint intervals
### ❌ DON'T
- Don't use in-memory store for production
- Don't skip checkpoint saves
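- Don't checkpoint after every event on high-throughput streams unless the checkpoint is committed in the same transaction as the read model update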
- Don't ignore checkpoint failures
- Don't forget to handle a missing checkpoint (`GetCheckpointAsync` returns 0 when no checkpoint is found)
- Don't share checkpoint stores across environments
- Don't manually modify checkpoints without reason
## See Also
- [Projections Overview](README.md)
- [Creating Projections](creating-projections.md)
- [Projection Options](projection-options.md)
- [Resettable Projections](resettable-projections.md)
- [PostgreSQL Storage](../storage/postgresql-storage.md)