462 lines
12 KiB
Markdown
462 lines
12 KiB
Markdown
# Resettable Projections
|
|
|
|
Rebuild read models from scratch with IResettableProjection.
|
|
|
|
## Overview
|
|
|
|
Resettable projections allow you to rebuild read models from the beginning:
|
|
- **Schema Changes** - Rebuild after modifying read model structure
|
|
- **Bug Fixes** - Rebuild after fixing projection logic
|
|
- **Data Corruption** - Rebuild from events after corruption
|
|
- **New Projections** - Build new read models from historical events
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
public class OrderSummaryProjection : IResettableProjection
|
|
{
|
|
public string ProjectionName => "order-summary";
|
|
|
|
public async Task RunAsync(CancellationToken ct)
|
|
{
|
|
var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
|
|
|
|
await foreach (var @event in _eventStore.ReadStreamAsync(
|
|
"orders",
|
|
fromOffset: checkpoint + 1,
|
|
cancellationToken: ct))
|
|
{
|
|
await HandleEventAsync(@event, ct);
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, @event.Offset);
|
|
}
|
|
}
|
|
|
|
public async Task ResetAsync(CancellationToken ct)
|
|
{
|
|
_logger.LogWarning("Resetting projection {Projection}", ProjectionName);
|
|
|
|
// 1. Clear read model
|
|
await _dbContext.Database.ExecuteSqlRawAsync(
|
|
"TRUNCATE TABLE order_summaries",
|
|
ct);
|
|
|
|
// 2. Reset checkpoint to beginning
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);
|
|
|
|
_logger.LogInformation("Projection {Projection} reset complete", ProjectionName);
|
|
}
|
|
}
|
|
```
|
|
|
|
## IResettableProjection Interface
|
|
|
|
```csharp
|
|
public interface IResettableProjection : IDynamicProjection
|
|
{
|
|
Task ResetAsync(CancellationToken cancellationToken);
|
|
}
|
|
```
|
|
|
|
## Reset Implementation
|
|
|
|
### Basic Reset
|
|
|
|
```csharp
|
|
public async Task ResetAsync(CancellationToken ct)
|
|
{
|
|
// 1. Stop projection
|
|
await StopProjectionAsync(ct);
|
|
|
|
// 2. Clear data
|
|
await ClearReadModelAsync(ct);
|
|
|
|
// 3. Reset checkpoint
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);
|
|
|
|
// 4. Restart projection
|
|
await StartProjectionAsync(ct);
|
|
}
|
|
```
|
|
|
|
### Safe Reset with Backup
|
|
|
|
```csharp
|
|
public async Task ResetAsync(CancellationToken ct)
|
|
{
|
|
_logger.LogWarning("Starting reset for projection {Projection}", ProjectionName);
|
|
|
|
// 1. Stop projection
|
|
await StopProjectionAsync(ct);
|
|
|
|
try
|
|
{
|
|
// 2. Backup existing data
|
|
var backupTable = $"order_summaries_backup_{DateTime.UtcNow:yyyyMMddHHmmss}";
|
|
await _dbContext.Database.ExecuteSqlRawAsync(
|
|
$"CREATE TABLE {backupTable} AS SELECT * FROM order_summaries",
|
|
ct);
|
|
|
|
_logger.LogInformation("Created backup table {Table}", backupTable);
|
|
|
|
// 3. Clear data
|
|
await _dbContext.Database.ExecuteSqlRawAsync(
|
|
"TRUNCATE TABLE order_summaries",
|
|
ct);
|
|
|
|
// 4. Reset checkpoint
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);
|
|
|
|
_logger.LogInformation("Projection {Projection} reset complete", ProjectionName);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error resetting projection {Projection}", ProjectionName);
|
|
|
|
// Restore from backup
|
|
await RestoreFromBackupAsync(ct);
|
|
|
|
throw;
|
|
}
|
|
finally
|
|
{
|
|
// 5. Restart projection
|
|
await StartProjectionAsync(ct);
|
|
}
|
|
}
|
|
```
|
|
|
|
### Incremental Reset
|
|
|
|
```csharp
|
|
public async Task ResetAsync(DateTimeOffset fromDate, CancellationToken ct)
|
|
{
|
|
_logger.LogWarning(
|
|
"Resetting projection {Projection} from {Date}",
|
|
ProjectionName,
|
|
fromDate);
|
|
|
|
// 1. Stop projection
|
|
await StopProjectionAsync(ct);
|
|
|
|
try
|
|
{
|
|
// 2. Delete data after fromDate
|
|
await _dbContext.OrderSummaries
|
|
.Where(o => o.PlacedAt >= fromDate)
|
|
.ExecuteDeleteAsync(ct);
|
|
|
|
// 3. Find offset for fromDate
|
|
var startOffset = await FindOffsetForDateAsync(fromDate, ct);
|
|
|
|
// 4. Reset checkpoint to that offset
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, startOffset);
|
|
|
|
_logger.LogInformation(
|
|
"Incremental reset complete, resuming from offset {Offset}",
|
|
startOffset);
|
|
}
|
|
finally
|
|
{
|
|
// 5. Restart projection
|
|
await StartProjectionAsync(ct);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Triggering Resets
|
|
|
|
### Manual Reset via API
|
|
|
|
```csharp
|
|
// API endpoint for reset
|
|
app.MapPost("/api/projections/{name}/reset", async (
|
|
string name,
|
|
IProjectionService projectionService,
|
|
CancellationToken ct) =>
|
|
{
|
|
var projection = projectionService.GetProjection(name);
|
|
|
|
if (projection is not IResettableProjection resettable)
|
|
{
|
|
return Results.BadRequest($"Projection {name} is not resettable");
|
|
}
|
|
|
|
await resettable.ResetAsync(ct);
|
|
|
|
return Results.Ok($"Projection {name} reset initiated");
|
|
})
|
|
.RequireAuthorization("Admin"); // Require admin role
|
|
```
|
|
|
|
### Reset from CLI
|
|
|
|
```csharp
|
|
// CLI command
|
|
public class ResetProjectionCommand
|
|
{
|
|
public static async Task ExecuteAsync(
|
|
string projectionName,
|
|
IServiceProvider services)
|
|
{
|
|
var projectionService = services.GetRequiredService<IProjectionService>();
|
|
var projection = projectionService.GetProjection(projectionName);
|
|
|
|
if (projection is not IResettableProjection resettable)
|
|
{
|
|
Console.WriteLine($"Projection {projectionName} is not resettable");
|
|
return;
|
|
}
|
|
|
|
Console.WriteLine($"Resetting projection {projectionName}...");
|
|
|
|
await resettable.ResetAsync(CancellationToken.None);
|
|
|
|
Console.WriteLine("Reset complete");
|
|
}
|
|
}
|
|
|
|
// Usage:
|
|
// dotnet run -- reset-projection order-summary
|
|
```
|
|
|
|
### Automated Reset on Schema Change
|
|
|
|
```csharp
|
|
public class OrderSummaryProjection : IResettableProjection
|
|
{
|
|
private const int CurrentSchemaVersion = 2;
|
|
|
|
public async Task RunAsync(CancellationToken ct)
|
|
{
|
|
// Check schema version
|
|
var storedVersion = await GetSchemaVersionAsync(ct);
|
|
|
|
if (storedVersion < CurrentSchemaVersion)
|
|
{
|
|
_logger.LogWarning(
|
|
"Schema version mismatch (stored: {Stored}, current: {Current}), resetting projection",
|
|
storedVersion,
|
|
CurrentSchemaVersion);
|
|
|
|
await ResetAsync(ct);
|
|
await SetSchemaVersionAsync(CurrentSchemaVersion, ct);
|
|
}
|
|
|
|
// Normal projection processing
|
|
await ProcessEventsAsync(ct);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Reset Strategies
|
|
|
|
### Full Rebuild
|
|
|
|
```csharp
|
|
// Delete everything and rebuild from beginning
|
|
public async Task ResetAsync(CancellationToken ct)
|
|
{
|
|
await _dbContext.OrderSummaries.ExecuteDeleteAsync(ct);
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);
|
|
}
|
|
```
|
|
|
|
### Partial Rebuild
|
|
|
|
```csharp
|
|
// Delete specific subset and rebuild
|
|
public async Task ResetForCustomerAsync(int customerId, CancellationToken ct)
|
|
{
|
|
// Delete customer's data
|
|
await _dbContext.OrderSummaries
|
|
.Where(o => o.CustomerId == customerId)
|
|
.ExecuteDeleteAsync(ct);
|
|
|
|
// Find first order for customer
|
|
var firstEvent = await _eventStore.ReadStreamAsync("orders")
|
|
.Where(e => e.Metadata["CustomerId"] == customerId.ToString())
|
|
.FirstOrDefaultAsync(ct);
|
|
|
|
// Rebuild from that point
|
|
if (firstEvent != null)
|
|
{
|
|
await RebuildFromOffsetAsync(firstEvent.Offset, ct);
|
|
}
|
|
}
|
|
```
|
|
|
|
### Rolling Rebuild
|
|
|
|
```csharp
|
|
// Rebuild in chunks without stopping
|
|
public async Task RebuildAsync(CancellationToken ct)
|
|
{
|
|
var batchSize = 10000;
|
|
var totalEvents = await _eventStore.GetStreamLengthAsync("orders");
|
|
|
|
for (long offset = 0; offset < totalEvents; offset += batchSize)
|
|
{
|
|
// Delete chunk
|
|
await DeleteChunkAsync(offset, offset + batchSize, ct);
|
|
|
|
// Rebuild chunk
|
|
await foreach (var @event in _eventStore.ReadStreamAsync(
|
|
"orders",
|
|
fromOffset: offset,
|
|
count: batchSize,
|
|
cancellationToken: ct))
|
|
{
|
|
await HandleEventAsync(@event, ct);
|
|
}
|
|
|
|
_logger.LogInformation(
|
|
"Rebuilt {Offset}/{Total} events",
|
|
Math.Min(offset + batchSize, totalEvents),
|
|
totalEvents);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Reset Progress Tracking
|
|
|
|
```csharp
|
|
public async Task ResetAsync(CancellationToken ct)
|
|
{
|
|
var totalEvents = await _eventStore.GetStreamLengthAsync("orders");
|
|
var processedEvents = 0L;
|
|
|
|
_logger.LogInformation(
|
|
"Resetting projection {Projection}, {Total} events to process",
|
|
ProjectionName,
|
|
totalEvents);
|
|
|
|
await _dbContext.OrderSummaries.ExecuteDeleteAsync(ct);
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, 0);
|
|
|
|
await foreach (var @event in _eventStore.ReadStreamAsync("orders", ct))
|
|
{
|
|
await HandleEventAsync(@event, ct);
|
|
processedEvents++;
|
|
|
|
if (processedEvents % 1000 == 0)
|
|
{
|
|
var progress = (double)processedEvents / totalEvents * 100;
|
|
_logger.LogInformation(
|
|
"Reset progress: {Processed}/{Total} ({Progress:F1}%)",
|
|
processedEvents,
|
|
totalEvents,
|
|
progress);
|
|
}
|
|
}
|
|
|
|
_logger.LogInformation("Reset complete: {Total} events processed", processedEvents);
|
|
}
|
|
```
|
|
|
|
## Zero-Downtime Reset
|
|
|
|
```csharp
|
|
public async Task ResetAsync(CancellationToken ct)
|
|
{
|
|
var tempTableName = $"order_summaries_temp_{Guid.NewGuid():N}";
|
|
|
|
// 1. Create temp table
|
|
await _dbContext.Database.ExecuteSqlRawAsync(
|
|
$"CREATE TABLE {tempTableName} (LIKE order_summaries INCLUDING ALL)",
|
|
ct);
|
|
|
|
// 2. Rebuild into temp table
|
|
var tempContext = CreateTempContext(tempTableName);
|
|
|
|
await foreach (var @event in _eventStore.ReadStreamAsync("orders", ct))
|
|
{
|
|
await HandleEventIntoTempTableAsync(@event, tempContext, ct);
|
|
}
|
|
|
|
// 3. Swap tables atomically
|
|
await _dbContext.Database.ExecuteSqlRawAsync(
|
|
$@"BEGIN;
|
|
DROP TABLE order_summaries;
|
|
ALTER TABLE {tempTableName} RENAME TO order_summaries;
|
|
COMMIT;",
|
|
ct);
|
|
|
|
// 4. Reset checkpoint
|
|
await _checkpointStore.SaveCheckpointAsync(ProjectionName, await GetStreamHeadAsync());
|
|
|
|
_logger.LogInformation("Zero-downtime reset complete");
|
|
}
|
|
```
|
|
|
|
## Reset Validation
|
|
|
|
```csharp
|
|
public async Task<bool> ValidateResetAsync(CancellationToken ct)
|
|
{
|
|
var checkpoint = await _checkpointStore.GetCheckpointAsync(ProjectionName);
|
|
var streamHead = await _eventStore.GetStreamHeadAsync("orders");
|
|
|
|
// Verify checkpoint is at stream head after full rebuild
|
|
if (checkpoint < streamHead)
|
|
{
|
|
_logger.LogWarning(
|
|
"Projection {Projection} not fully rebuilt: checkpoint {Checkpoint}, stream head {Head}",
|
|
ProjectionName,
|
|
checkpoint,
|
|
streamHead);
|
|
return false;
|
|
}
|
|
|
|
// Verify data integrity
|
|
var summaryCount = await _dbContext.OrderSummaries.CountAsync(ct);
|
|
var orderCount = await CountOrderPlacedEventsAsync(ct);
|
|
|
|
if (summaryCount != orderCount)
|
|
{
|
|
_logger.LogWarning(
|
|
"Data integrity issue: {Summaries} summaries, {Orders} order events",
|
|
summaryCount,
|
|
orderCount);
|
|
return false;
|
|
}
|
|
|
|
_logger.LogInformation("Reset validation passed");
|
|
return true;
|
|
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Implement IResettableProjection for all projections
|
|
- Backup data before resetting
|
|
- Log reset operations
|
|
- Track reset progress
|
|
- Validate after reset
|
|
- Use transactions where possible
|
|
- Consider zero-downtime resets for production
|
|
- Test reset procedures regularly
|
|
- Document reset procedures
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't reset without backup in production
|
|
- Don't forget to stop projection before reset
|
|
- Don't skip validation after reset
|
|
- Don't reset during peak hours
|
|
- Don't forget to update schema version
|
|
- Don't lose checkpoint after reset
|
|
- Don't forget to restart projection
|
|
- Don't ignore reset errors
|
|
|
|
## See Also
|
|
|
|
- [Projections Overview](README.md)
|
|
- [Creating Projections](creating-projections.md)
|
|
- [Projection Options](projection-options.md)
|
|
- [Checkpoint Stores](checkpoint-stores.md)
|
|
- [Event Replay](../event-replay/README.md)
|