12 KiB
12 KiB
Resettable Projections
Rebuild read models from scratch with IResettableProjection.
Overview
Resettable projections let you discard a read model and rebuild it from the beginning of the event stream. Typical reasons to reset:
- Schema Changes - Rebuild after modifying read model structure
- Bug Fixes - Rebuild after fixing projection logic
- Data Corruption - Rebuild from events after corruption
- New Projections - Build new read models from historical events
Quick Start
using Svrnty.CQRS.Events.Abstractions;
/// <summary>
/// Quick-start projection over the "orders" stream that can be reset and rebuilt.
/// Dependencies (<c>_checkpointStore</c>, <c>_eventStore</c>, <c>_dbContext</c>,
/// <c>_logger</c>) and <c>HandleEventAsync</c> are omitted from this snippet.
/// </summary>
public class OrderSummaryProjection : IResettableProjection
{
    /// <summary>Name used as the key for this projection's checkpoint.</summary>
    public string ProjectionName => "order-summary";

    /// <summary>
    /// Processes events from the "orders" stream, starting right after the stored
    /// checkpoint, and saves the checkpoint after each handled event.
    /// </summary>
    /// <param name="ct">Token passed to the stream read and to the event handler.</param>
    public async Task RunAsync(CancellationToken ct)
    {
        var lastProcessed = await _checkpointStore.GetCheckpointAsync(ProjectionName);

        // Resume with the event that follows the last one recorded as processed.
        var resumeFrom = lastProcessed + 1;
        var pending = _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: resumeFrom,
            cancellationToken: ct);

        await foreach (var envelope in pending)
        {
            await HandleEventAsync(envelope, ct);
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, envelope.Offset);
        }
    }

    /// <summary>
    /// Empties the read-model table and sets the checkpoint back to 0.
    /// </summary>
    /// <param name="ct">Token passed to the TRUNCATE command.</param>
    public async Task ResetAsync(CancellationToken ct)
    {
        const string clearReadModelSql = "TRUNCATE TABLE order_summaries";

        _logger.LogWarning("Resetting projection {Projection}", ProjectionName);

        // Step 1: wipe the read model.
        await _dbContext.Database.ExecuteSqlRawAsync(clearReadModelSql, ct);

        // Step 2: move the checkpoint back to the start.
        // NOTE(review): RunAsync resumes at checkpoint + 1, so after this reset it reads
        // from offset 1 - confirm the first event in the stream is not at offset 0.
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);

        _logger.LogInformation("Projection {Projection} reset complete", ProjectionName);
    }
}
IResettableProjection Interface
/// <summary>
/// A dynamic projection whose read model can be rebuilt from scratch.
/// </summary>
public interface IResettableProjection : IDynamicProjection
{
    /// <summary>
    /// Resets the projection so its read model can be rebuilt.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the reset.</param>
    /// <returns>A task that completes when the reset has finished.</returns>
    Task ResetAsync(CancellationToken cancellationToken);
}
Reset Implementation
Basic Reset
/// <summary>
/// Minimal reset sequence: stop, clear, rewind the checkpoint, restart.
/// The steps run strictly in this order; a failure in any step aborts the rest.
/// </summary>
/// <param name="ct">Token passed to the stop, clear and start steps.</param>
public async Task ResetAsync(CancellationToken ct)
{
    // Stop first so nothing writes to the read model while it is being cleared.
    await StopProjectionAsync(ct);

    // Remove the existing read-model data.
    await ClearReadModelAsync(ct);

    // Rewind the checkpoint to 0.
    await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);

    // Start again; the projection rebuilds from the rewound checkpoint.
    await StartProjectionAsync(ct);
}
Safe Reset with Backup
/// <summary>
/// Resets the projection after copying the current read model into a timestamped
/// backup table. If any step after the backup fails, the read model is restored
/// from the backup and the exception is rethrown. The projection is restarted
/// whether the reset succeeded or not.
/// </summary>
/// <param name="ct">Token used to cancel the stop, backup and truncate steps.</param>
public async Task ResetAsync(CancellationToken ct)
{
    _logger.LogWarning("Starting reset for projection {Projection}", ProjectionName);

    // 1. Stop projection
    await StopProjectionAsync(ct);

    // Tracks whether there is a backup to restore from if a later step fails.
    var backupCreated = false;

    try
    {
        // 2. Backup existing data
        var backupTable = $"order_summaries_backup_{DateTime.UtcNow:yyyyMMddHHmmss}";
        await _dbContext.Database.ExecuteSqlRawAsync(
            $"CREATE TABLE {backupTable} AS SELECT * FROM order_summaries",
            ct);
        backupCreated = true;
        _logger.LogInformation("Created backup table {Table}", backupTable);

        // 3. Clear data
        await _dbContext.Database.ExecuteSqlRawAsync(
            "TRUNCATE TABLE order_summaries",
            ct);

        // 4. Reset checkpoint
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);

        _logger.LogInformation("Projection {Projection} reset complete", ProjectionName);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error resetting projection {Projection}", ProjectionName);

        // Restore only when a backup exists: if the backup step itself failed,
        // the read model was never touched and there is nothing to restore from.
        if (backupCreated)
        {
            // The failure may be a cancellation of ct, so the restore must not
            // observe that token or it would be cancelled immediately as well.
            await RestoreFromBackupAsync(CancellationToken.None);
        }

        throw;
    }
    finally
    {
        // 5. Restart projection. Not tied to ct: a cancelled reset must not
        // leave the projection stopped.
        await StartProjectionAsync(CancellationToken.None);
    }
}
Incremental Reset
/// <summary>
/// Partial reset: removes read-model rows placed on or after <paramref name="fromDate"/>
/// and moves the checkpoint to the offset found for that date. The projection is
/// stopped for the duration and restarted even if the reset fails.
/// </summary>
/// <param name="fromDate">Rows with <c>PlacedAt</c> on or after this instant are deleted.</param>
/// <param name="ct">Token passed to the stop, delete, offset lookup and start steps.</param>
public async Task ResetAsync(DateTimeOffset fromDate, CancellationToken ct)
{
    _logger.LogWarning(
        "Resetting projection {Projection} from {Date}",
        ProjectionName,
        fromDate);

    // Step 1: pause the projection while rows are removed.
    await StopProjectionAsync(ct);

    try
    {
        // Step 2: drop every summary placed on or after the cut-off.
        var affectedRows = _dbContext.OrderSummaries.Where(o => o.PlacedAt >= fromDate);
        await affectedRows.ExecuteDeleteAsync(ct);

        // Step 3: look up the stream offset that corresponds to the cut-off.
        var resumeOffset = await FindOffsetForDateAsync(fromDate, ct);

        // Step 4: point the checkpoint at that offset.
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, resumeOffset);

        _logger.LogInformation(
            "Incremental reset complete, resuming from offset {Offset}",
            resumeOffset);
    }
    finally
    {
        // Step 5: always bring the projection back up.
        await StartProjectionAsync(ct);
    }
}
Triggering Resets
Manual Reset via API
// API endpoint for reset
// POST /api/projections/{name}/reset
// Looks up the projection by name and resets it when it implements
// IResettableProjection; otherwise answers 400 Bad Request.
// The response is produced only after ResetAsync has been awaited.
app.MapPost("/api/projections/{name}/reset", async (
        string name,
        IProjectionService projectionService,
        CancellationToken ct) =>
    {
        if (projectionService.GetProjection(name) is IResettableProjection target)
        {
            await target.ResetAsync(ct);
            return Results.Ok($"Projection {name} reset initiated");
        }

        return Results.BadRequest($"Projection {name} is not resettable");
    })
    .RequireAuthorization("Admin"); // Only callers satisfying the "Admin" policy may reset
Reset from CLI
// CLI command
/// <summary>
/// Command-line entry point that resets a single projection by name.
/// </summary>
public class ResetProjectionCommand
{
    /// <summary>
    /// Resolves the projection service, finds the named projection and resets it.
    /// Prints a message and returns without resetting when the projection does not
    /// implement <c>IResettableProjection</c>.
    /// </summary>
    /// <param name="projectionName">Name of the projection to reset.</param>
    /// <param name="services">Service provider used to resolve <c>IProjectionService</c>.</param>
    public static async Task ExecuteAsync(
        string projectionName,
        IServiceProvider services)
    {
        var registry = services.GetRequiredService<IProjectionService>();

        if (registry.GetProjection(projectionName) is IResettableProjection target)
        {
            Console.WriteLine($"Resetting projection {projectionName}...");

            // The command runs the reset to completion; no cancellation is wired up.
            await target.ResetAsync(CancellationToken.None);

            Console.WriteLine("Reset complete");
        }
        else
        {
            Console.WriteLine($"Projection {projectionName} is not resettable");
        }
    }
}
// Usage:
// dotnet run -- reset-projection order-summary
Automated Reset on Schema Change
/// <summary>
/// Projection that resets itself on start-up when the stored schema version is
/// older than the version this code expects. Other members are omitted from
/// this snippet.
/// </summary>
public class OrderSummaryProjection : IResettableProjection
{
    // Increase this value whenever the read-model structure changes.
    private const int CurrentSchemaVersion = 2;

    /// <summary>
    /// Compares the stored schema version with <c>CurrentSchemaVersion</c>, resets
    /// the projection and records the new version when the stored one is lower,
    /// then continues with normal event processing.
    /// </summary>
    /// <param name="ct">Token passed to every asynchronous step.</param>
    public async Task RunAsync(CancellationToken ct)
    {
        var persistedVersion = await GetSchemaVersionAsync(ct);
        var schemaIsOutdated = persistedVersion < CurrentSchemaVersion;

        if (schemaIsOutdated)
        {
            _logger.LogWarning(
                "Schema version mismatch (stored: {Stored}, current: {Current}), resetting projection",
                persistedVersion,
                CurrentSchemaVersion);

            // The version is written only after the reset has completed.
            await ResetAsync(ct);
            await SetSchemaVersionAsync(CurrentSchemaVersion, ct);
        }

        // Regular processing, whether or not a reset happened first.
        await ProcessEventsAsync(ct);
    }
}
Reset Strategies
Full Rebuild
// Delete everything and rebuild from beginning
/// <summary>
/// Full rebuild: deletes every order summary and rewinds the checkpoint to 0.
/// </summary>
/// <param name="ct">Token passed to the bulk delete.</param>
public async Task ResetAsync(CancellationToken ct)
{
    const int streamStart = 0;

    // Remove all rows from the read model.
    var allSummaries = _dbContext.OrderSummaries;
    await allSummaries.ExecuteDeleteAsync(ct);

    // Rewind the checkpoint to the start value.
    await _checkpointStore.SaveCheckpointAsync(ProjectionName, streamStart);
}
Partial Rebuild
// Delete specific subset and rebuild
/// <summary>
/// Partial rebuild for one customer: deletes that customer's order summaries,
/// finds the first "orders" event whose <c>CustomerId</c> metadata matches, and
/// rebuilds from that event's offset. Does nothing further when the customer has
/// no events.
/// </summary>
/// <param name="customerId">Customer whose read-model rows are rebuilt.</param>
/// <param name="ct">Token passed to the delete, the stream read and the rebuild.</param>
public async Task ResetForCustomerAsync(int customerId, CancellationToken ct)
{
    // Delete customer's data
    await _dbContext.OrderSummaries
        .Where(o => o.CustomerId == customerId)
        .ExecuteDeleteAsync(ct);

    // Format the id once, culture-independently, instead of once per event.
    var customerKey = customerId.ToString(System.Globalization.CultureInfo.InvariantCulture);

    // Find first order for customer. The token is passed to the read itself so
    // scanning a long stream can be cancelled.
    // NOTE(review): the indexer assumes every event carries a "CustomerId" metadata
    // entry - confirm, or switch to TryGetValue if some events do not.
    var firstEvent = await _eventStore.ReadStreamAsync("orders", cancellationToken: ct)
        .Where(e => e.Metadata["CustomerId"] == customerKey)
        .FirstOrDefaultAsync(ct);

    // Rebuild from that point
    // NOTE(review): confirm RebuildFromOffsetAsync only re-applies this customer's
    // events; other customers' rows were not deleted above.
    if (firstEvent != null)
    {
        await RebuildFromOffsetAsync(firstEvent.Offset, ct);
    }
}
Rolling Rebuild
// Rebuild in chunks without stopping
/// <summary>
/// Rolling rebuild: walks the "orders" stream in fixed-size chunks, deleting and
/// then re-handling one chunk at a time, and logs progress after each chunk.
/// The stream length is read once, before the first chunk.
/// </summary>
/// <param name="ct">Token passed to the chunk delete, the stream read and the handler.</param>
public async Task RebuildAsync(CancellationToken ct)
{
    const int chunkSize = 10000;
    var streamLength = await _eventStore.GetStreamLengthAsync("orders");

    long chunkStart = 0;
    while (chunkStart < streamLength)
    {
        var chunkEnd = chunkStart + chunkSize;

        // Clear what this chunk previously produced.
        await DeleteChunkAsync(chunkStart, chunkEnd, ct);

        // Re-apply the events belonging to this chunk.
        var chunkEvents = _eventStore.ReadStreamAsync(
            "orders",
            fromOffset: chunkStart,
            count: chunkSize,
            cancellationToken: ct);

        await foreach (var envelope in chunkEvents)
        {
            await HandleEventAsync(envelope, ct);
        }

        // The last chunk may be short, so cap the reported position at the length.
        _logger.LogInformation(
            "Rebuilt {Offset}/{Total} events",
            Math.Min(chunkEnd, streamLength),
            streamLength);

        chunkStart = chunkEnd;
    }
}
Reset Progress Tracking
/// <summary>
/// Full reset that replays the "orders" stream inline and reports progress every
/// 1000 events. The checkpoint is advanced together with the progress report and
/// once more at the end, so the stored position reflects what was replayed.
/// </summary>
/// <param name="ct">Token passed to the delete, the stream read and the handler.</param>
public async Task ResetAsync(CancellationToken ct)
{
    const int progressInterval = 1000;

    var totalEvents = await _eventStore.GetStreamLengthAsync("orders");
    var processedEvents = 0L;
    long lastOffset = 0;

    _logger.LogInformation(
        "Resetting projection {Projection}, {Total} events to process",
        ProjectionName,
        totalEvents);

    await _dbContext.OrderSummaries.ExecuteDeleteAsync(ct);
    await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);

    // The token is passed by name: the second positional parameter of
    // ReadStreamAsync is the starting offset, not the token.
    await foreach (var @event in _eventStore.ReadStreamAsync("orders", cancellationToken: ct))
    {
        await HandleEventAsync(@event, ct);
        lastOffset = @event.Offset;
        processedEvents++;

        if (processedEvents % progressInterval == 0)
        {
            // Persist the position so an interrupted reset resumes from here
            // instead of replaying everything onto already rebuilt rows.
            await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);

            // The total was read before the replay began; clamp so a zero or
            // outgrown total never reports more than 100%.
            var progress = Math.Min(100.0, (double)processedEvents / totalEvents * 100);
            _logger.LogInformation(
                "Reset progress: {Processed}/{Total} ({Progress:F1}%)",
                processedEvents,
                totalEvents,
                progress);
        }
    }

    // Record the final position; without this the checkpoint would stay at 0 and
    // the next run would replay the whole stream a second time.
    if (processedEvents > 0)
    {
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
    }

    _logger.LogInformation("Reset complete: {Total} events processed", processedEvents);
}
Zero-Downtime Reset
/// <summary>
/// Zero-downtime reset: rebuilds the read model into a temporary table, swaps it
/// in for <c>order_summaries</c> inside one transaction, and then sets the
/// checkpoint to the offset of the last event that was replayed into the new table.
/// If any step fails, the temporary table is dropped and the exception is rethrown.
/// </summary>
/// <param name="ct">Token passed to the SQL commands, the stream read and the handler.</param>
public async Task ResetAsync(CancellationToken ct)
{
    var tempTableName = $"order_summaries_temp_{Guid.NewGuid():N}";

    // 1. Create temp table
    await _dbContext.Database.ExecuteSqlRawAsync(
        $"CREATE TABLE {tempTableName} (LIKE order_summaries INCLUDING ALL)",
        ct);

    try
    {
        // 2. Rebuild into temp table, remembering the last offset written to it.
        var tempContext = CreateTempContext(tempTableName);
        long lastOffset = 0;

        // The token is passed by name: the second positional parameter of
        // ReadStreamAsync is the starting offset, not the token.
        await foreach (var @event in _eventStore.ReadStreamAsync("orders", cancellationToken: ct))
        {
            await HandleEventIntoTempTableAsync(@event, tempContext, ct);
            lastOffset = @event.Offset;
        }

        // 3. Swap tables atomically
        await _dbContext.Database.ExecuteSqlRawAsync(
            $@"BEGIN;
               DROP TABLE order_summaries;
               ALTER TABLE {tempTableName} RENAME TO order_summaries;
               COMMIT;",
            ct);

        // 4. Reset checkpoint to what the new table actually contains. Using the
        // stream head read at this point would skip any event appended while the
        // rebuild was running.
        await _checkpointStore.SaveCheckpointAsync(ProjectionName, lastOffset);
    }
    catch
    {
        // Do not leave the temp table behind. IF EXISTS makes this a no-op when
        // the swap already renamed it. The caller's token may be cancelled, so
        // the cleanup does not observe it.
        try
        {
            await _dbContext.Database.ExecuteSqlRawAsync(
                $"DROP TABLE IF EXISTS {tempTableName}",
                CancellationToken.None);
        }
        catch (Exception cleanupError)
        {
            // Keep the original failure as the one that propagates.
            _logger.LogWarning(cleanupError, "Failed to drop temp table {Table}", tempTableName);
        }

        throw;
    }

    _logger.LogInformation("Zero-downtime reset complete");
}
Reset Validation
/// <summary>
/// Checks that a rebuild finished correctly: the checkpoint must not be behind the
/// head of the "orders" stream, and the number of order summaries must equal the
/// number of order-placed events.
/// </summary>
/// <param name="ct">Token passed to the two count queries.</param>
/// <returns><c>true</c> when both checks pass; otherwise <c>false</c>.</returns>
public async Task<bool> ValidateResetAsync(CancellationToken ct)
{
    var storedCheckpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
    var head = await _eventStore.GetStreamHeadAsync("orders");

    // Check 1: the projection must have caught up with the stream.
    if (storedCheckpoint < head)
    {
        _logger.LogWarning(
            "Projection {Projection} not fully rebuilt: checkpoint {Checkpoint}, stream head {Head}",
            ProjectionName,
            storedCheckpoint,
            head);
        return false;
    }

    // Check 2: one summary row is expected per order-placed event.
    var summaries = await _dbContext.OrderSummaries.CountAsync(ct);
    var placedOrders = await CountOrderPlacedEventsAsync(ct);
    var countsMatch = summaries == placedOrders;

    if (!countsMatch)
    {
        _logger.LogWarning(
            "Data integrity issue: {Summaries} summaries, {Orders} order events",
            summaries,
            placedOrders);
        return false;
    }

    _logger.LogInformation("Reset validation passed");
    return true;
}
Best Practices
✅ DO
- Implement IResettableProjection for all projections
- Backup data before resetting
- Log reset operations
- Track reset progress
- Validate after reset
- Use transactions where possible
- Consider zero-downtime resets for production
- Test reset procedures regularly
- Document reset procedures
❌ DON'T
- Don't reset without backup in production
- Don't forget to stop projection before reset
- Don't skip validation after reset
- Don't reset during peak hours
- Don't forget to update schema version
- Don't leave the checkpoint stale after a reset — save the new position once the rebuild finishes
- Don't forget to restart projection
- Don't ignore reset errors