92 lines
2.1 KiB
Markdown
92 lines
2.1 KiB
Markdown
# Replay from Offset
|
|
|
|
Replay events starting from a specific offset position.
|
|
|
|
## Usage
|
|
|
|
```csharp
|
|
var replayService = serviceProvider.GetRequiredService<IEventReplayService>();
|
|
|
|
await foreach (var @event in replayService.ReplayFromOffsetAsync(
|
|
streamName: "orders",
|
|
startOffset: 1000,
|
|
options: new ReplayOptions
|
|
{
|
|
BatchSize = 100,
|
|
MaxEvents = 50000,
|
|
MaxEventsPerSecond = 1000
|
|
}))
|
|
{
|
|
await ProcessEventAsync(@event);
|
|
}
|
|
```
|
|
|
|
## Options
|
|
|
|
```csharp
|
|
public class ReplayOptions
|
|
{
|
|
public int BatchSize { get; set; } = 100;
|
|
public long? MaxEvents { get; set; } // null = unlimited
|
|
public int? MaxEventsPerSecond { get; set; } // null = unlimited
|
|
public string[]? EventTypeFilter { get; set; }
|
|
public Action<ReplayProgress>? ProgressCallback { get; set; }
|
|
public TimeSpan ProgressInterval { get; set; } = TimeSpan.FromSeconds(1);
|
|
}
|
|
```
|
|
|
|
## Use Cases
|
|
|
|
### Rebuild Projection from Scratch
|
|
|
|
```csharp
|
|
// Reset checkpoint
|
|
await _checkpointStore.DeleteCheckpointAsync("order-summary");
|
|
|
|
// Replay all events
|
|
await foreach (var @event in replayService.ReplayFromOffsetAsync("orders", startOffset: 0))
|
|
{
|
|
await _projection.HandleAsync(@event);
|
|
await _checkpointStore.SaveCheckpointAsync("order-summary", @event.Offset);
|
|
}
|
|
```
|
|
|
|
### Replay Specific Range
|
|
|
|
```csharp
|
|
// Replay events at offsets 1000 through 2000 (inclusive)
|
|
long currentOffset = 1000;
|
|
const long endOffset = 2000;
|
|
|
|
await foreach (var @event in replayService.ReplayFromOffsetAsync("orders", startOffset: currentOffset))
|
|
{
|
|
if (@event.Offset > endOffset)
|
|
break;
|
|
|
|
await ProcessEventAsync(@event);
|
|
}
|
|
```
|
|
|
|
### Resume from Checkpoint
|
|
|
|
```csharp
|
|
// Get last processed offset
|
|
var checkpoint = await _checkpointStore.GetCheckpointAsync("analytics");
|
|
|
|
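// Resume from the offset after the last processed one (hence + 1).
// NOTE: assumes a checkpoint already exists; if GetCheckpointAsync can return
// no value (e.g. on first run), fall back to an explicit start offset instead.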
|
|
await foreach (var @event in replayService.ReplayFromOffsetAsync(
|
|
"orders",
|
|
startOffset: checkpoint + 1))
|
|
{
|
|
await ProcessAnalyticsAsync(@event);
|
|
await _checkpointStore.SaveCheckpointAsync("analytics", @event.Offset);
|
|
}
|
|
```
|
|
|
|
## See Also
|
|
|
|
- [Event Replay Overview](README.md)
|
|
- [Replay from Time](replay-from-time.md)
|
|
- [Rate Limiting](rate-limiting.md)
|
|
- [Progress Tracking](progress-tracking.md)
|