# Phase 2.5 - Event Replay API Implementation Plan
**Status**: ✅ Complete
**Completed**: 2025-12-10
**Dependencies**: Phase 2.2 (PostgreSQL Storage) ✅, Phase 2.3 (Consumer Groups) ✅, Phase 2.4 (Retention Policies) ✅
**Target**: APIs for replaying events from specific offsets and time ranges
**Note**: gRPC integration (Phase 2.5.3) has been deferred because it requires extending the proto files. Core replay functionality is complete and working.
## Overview
Phase 2.5 adds event replay capabilities, enabling consumers to:
- **Replay from offset**: Re-process events starting from a specific position
- **Replay from time**: Re-process events starting from a specific timestamp
- **Replay time ranges**: Process events within a specific time window
- **Filtered replay**: Replay only specific event types or matching criteria
- **Rate-limited replay**: Control replay speed to avoid overwhelming consumers
## Background
Currently (Phase 2.4), consumers can read events forward from the current position or from a specific offset. However, there's no dedicated API for:
- Rebuilding read models from scratch
- Reprocessing events after fixing bugs in handlers
- Creating new projections from historical events
- Debugging and analysis by replaying specific time periods
**Key Concepts:**
- **Event Replay**: Re-reading and reprocessing historical events
- **Offset-based Replay**: Replay from a specific sequence number
- **Time-based Replay**: Replay from a specific timestamp
- **Range Replay**: Replay events within a time window
- **Filtered Replay**: Replay only events matching specific criteria
- **Replay Cursor**: Track progress during replay operations
## Goals
1. **Offset-based Replay**: API to replay from a specific offset
2. **Time-based Replay**: API to replay from a timestamp (UTC)
3. **Range Replay**: API to replay events within start/end times
4. **Event Type Filtering**: Replay only specific event types
5. **Rate Limiting**: Control replay speed (events/second)
6. **Progress Tracking**: Monitor replay progress
7. **gRPC Integration**: Expose replay APIs via gRPC streaming
## Non-Goals (Deferred to Future Phases)
- Complex event filtering (Phase 3.x)
- Replay scheduling and orchestration (Phase 3.x)
- Multi-stream coordinated replay (Phase 3.x)
- Snapshot-based replay optimization (Phase 3.x)
- Replay analytics and visualization (Phase 3.x)
## Architecture
### 1. New Interface: `IEventReplayService`
```csharp
namespace Svrnty.CQRS.Events.Abstractions;

/// <summary>
/// Service for replaying historical events from persistent streams.
/// </summary>
public interface IEventReplayService
{
    /// <summary>
    /// Replay events from a specific offset.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="startOffset">Starting offset (inclusive).</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayFromOffsetAsync(
        string streamName,
        long startOffset,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Replay events from a specific timestamp.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="startTime">Starting timestamp (UTC, inclusive).</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayFromTimeAsync(
        string streamName,
        DateTimeOffset startTime,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Replay events within a time range.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="startTime">Starting timestamp (UTC, inclusive).</param>
    /// <param name="endTime">Ending timestamp (UTC, exclusive).</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayTimeRangeAsync(
        string streamName,
        DateTimeOffset startTime,
        DateTimeOffset endTime,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Replay all events in a stream.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayAllAsync(
        string streamName,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Get the total count of events that would be replayed.
    /// </summary>
    Task<long> GetReplayCountAsync(
        string streamName,
        long? startOffset = null,
        DateTimeOffset? startTime = null,
        DateTimeOffset? endTime = null,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);
}
```
### 2. Replay Options Configuration
```csharp
namespace Svrnty.CQRS.Events.Abstractions;

/// <summary>
/// Options for event replay operations.
/// </summary>
public class ReplayOptions
{
    /// <summary>
    /// Maximum number of events to replay (null = unlimited).
    /// Default: null
    /// </summary>
    public long? MaxEvents { get; set; }

    /// <summary>
    /// Batch size for reading events from storage.
    /// Default: 100
    /// </summary>
    public int BatchSize { get; set; } = 100;

    /// <summary>
    /// Maximum events per second to replay (null = unlimited).
    /// Useful for rate-limiting to avoid overwhelming consumers.
    /// Default: null (unlimited)
    /// </summary>
    public int? MaxEventsPerSecond { get; set; }

    /// <summary>
    /// Filter events by type names (null = all types).
    /// Only events with these type names will be replayed.
    /// Default: null
    /// </summary>
    public IReadOnlyList<string>? EventTypeFilter { get; set; }

    /// <summary>
    /// Include event metadata in replayed events.
    /// Default: true
    /// </summary>
    public bool IncludeMetadata { get; set; } = true;

    /// <summary>
    /// Progress callback invoked periodically during replay.
    /// Receives current offset and total events processed.
    /// Default: null
    /// </summary>
    public Action<ReplayProgress>? ProgressCallback { get; set; }

    /// <summary>
    /// How often to invoke progress callback (in number of events).
    /// Default: 1000
    /// </summary>
    public int ProgressInterval { get; set; } = 1000;

    public void Validate()
    {
        if (BatchSize <= 0)
            throw new ArgumentException("BatchSize must be positive", nameof(BatchSize));
        if (MaxEvents.HasValue && MaxEvents.Value <= 0)
            throw new ArgumentException("MaxEvents must be positive", nameof(MaxEvents));
        if (MaxEventsPerSecond.HasValue && MaxEventsPerSecond.Value <= 0)
            throw new ArgumentException("MaxEventsPerSecond must be positive", nameof(MaxEventsPerSecond));
        if (ProgressInterval <= 0)
            throw new ArgumentException("ProgressInterval must be positive", nameof(ProgressInterval));
    }
}

/// <summary>
/// Progress information for replay operations.
/// </summary>
public record ReplayProgress
{
    /// <summary>
    /// Current offset being processed.
    /// </summary>
    public required long CurrentOffset { get; init; }

    /// <summary>
    /// Total number of events processed so far.
    /// </summary>
    public required long EventsProcessed { get; init; }

    /// <summary>
    /// Estimated total events to replay (if known).
    /// </summary>
    public long? EstimatedTotal { get; init; }

    /// <summary>
    /// Current timestamp of event being processed.
    /// </summary>
    public DateTimeOffset? CurrentTimestamp { get; init; }

    /// <summary>
    /// Elapsed time since replay started.
    /// </summary>
    public required TimeSpan Elapsed { get; init; }

    /// <summary>
    /// Events per second processing rate.
    /// </summary>
    public double EventsPerSecond => EventsProcessed / Math.Max(Elapsed.TotalSeconds, 0.001);

    /// <summary>
    /// Progress percentage (0-100) if total is known.
    /// </summary>
    public double? ProgressPercentage => EstimatedTotal.HasValue && EstimatedTotal.Value > 0
        ? (EventsProcessed / (double)EstimatedTotal.Value) * 100
        : null;
}
```
### 3. PostgreSQL Implementation
```csharp
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;

namespace Svrnty.CQRS.Events.PostgreSQL;

public class PostgresEventReplayService : IEventReplayService
{
    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresEventReplayService> _logger;

    public PostgresEventReplayService(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresEventReplayService> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async IAsyncEnumerable<StoredEvent> ReplayFromOffsetAsync(
        string streamName,
        long startOffset,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        options?.Validate();

        var batchSize = options?.BatchSize ?? 100;
        var maxEvents = options?.MaxEvents;
        var eventTypeFilter = options?.EventTypeFilter;
        var progressCallback = options?.ProgressCallback;
        var progressInterval = options?.ProgressInterval ?? 1000;

        var stopwatch = Stopwatch.StartNew();
        long eventsProcessed = 0;
        long? estimatedTotal = null;

        // Get estimated total if progress reporting is requested
        if (progressCallback != null)
        {
            estimatedTotal = await GetReplayCountAsync(
                streamName, startOffset, null, null, options, cancellationToken);
        }

        _logger.LogInformation(
            "Starting replay of stream {StreamName} from offset {StartOffset}",
            streamName, startOffset);

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var currentOffset = startOffset;
        var rateLimiter = options?.MaxEventsPerSecond.HasValue == true
            ? new RateLimiter(options.MaxEventsPerSecond.Value)
            : null;

        while (true)
        {
            // Build query with optional event type filter
            var sql = BuildReplayQuery(eventTypeFilter);

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("streamName", streamName);
            command.Parameters.AddWithValue("startOffset", currentOffset);
            command.Parameters.AddWithValue("batchSize", batchSize);
            if (eventTypeFilter != null)
            {
                command.Parameters.AddWithValue("eventTypes", eventTypeFilter.ToArray());
            }

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            var batchCount = 0;
            while (await reader.ReadAsync(cancellationToken))
            {
                // Rate limiting
                if (rateLimiter != null)
                {
                    await rateLimiter.WaitAsync(cancellationToken);
                }

                var @event = MapStoredEvent(reader);
                currentOffset = @event.Offset + 1;
                eventsProcessed++;
                batchCount++;

                // Progress callback
                if (progressCallback != null && eventsProcessed % progressInterval == 0)
                {
                    progressCallback(new ReplayProgress
                    {
                        CurrentOffset = @event.Offset,
                        EventsProcessed = eventsProcessed,
                        EstimatedTotal = estimatedTotal,
                        CurrentTimestamp = @event.StoredAt,
                        Elapsed = stopwatch.Elapsed
                    });
                }

                yield return @event;

                // Check max events limit
                if (maxEvents.HasValue && eventsProcessed >= maxEvents.Value)
                {
                    yield break;
                }
            }

            // No more events in this batch
            if (batchCount == 0)
            {
                break;
            }
        }

        _logger.LogInformation(
            "Completed replay of stream {StreamName}: {EventsProcessed} events in {Elapsed}",
            streamName, eventsProcessed, stopwatch.Elapsed);

        // Final progress callback
        if (progressCallback != null)
        {
            progressCallback(new ReplayProgress
            {
                CurrentOffset = currentOffset - 1,
                EventsProcessed = eventsProcessed,
                EstimatedTotal = estimatedTotal,
                Elapsed = stopwatch.Elapsed
            });
        }
    }

    public async IAsyncEnumerable<StoredEvent> ReplayFromTimeAsync(
        string streamName,
        DateTimeOffset startTime,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Get the offset at the start time
        var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);

        await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
        {
            yield return @event;
        }
    }

    public async IAsyncEnumerable<StoredEvent> ReplayTimeRangeAsync(
        string streamName,
        DateTimeOffset startTime,
        DateTimeOffset endTime,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (endTime <= startTime)
            throw new ArgumentException("End time must be after start time");

        var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);

        await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
        {
            if (@event.StoredAt >= endTime)
            {
                yield break;
            }

            yield return @event;
        }
    }

    public IAsyncEnumerable<StoredEvent> ReplayAllAsync(
        string streamName,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        return ReplayFromOffsetAsync(streamName, 0, options, cancellationToken);
    }

    public async Task<long> GetReplayCountAsync(
        string streamName,
        long? startOffset = null,
        DateTimeOffset? startTime = null,
        DateTimeOffset? endTime = null,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var sql = BuildCountQuery(startOffset, startTime, endTime, options?.EventTypeFilter);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        if (startOffset.HasValue)
            command.Parameters.AddWithValue("startOffset", startOffset.Value);
        if (startTime.HasValue)
            command.Parameters.AddWithValue("startTime", startTime.Value.UtcDateTime);
        if (endTime.HasValue)
            command.Parameters.AddWithValue("endTime", endTime.Value.UtcDateTime);
        if (options?.EventTypeFilter != null)
            command.Parameters.AddWithValue("eventTypes", options.EventTypeFilter.ToArray());

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result != null ? Convert.ToInt64(result) : 0;
    }

    private async Task<long> GetOffsetAtTimeAsync(
        string streamName,
        DateTimeOffset timestamp,
        CancellationToken cancellationToken)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        // "offset" is a reserved word in PostgreSQL, so the column must be quoted
        var sql = $@"
            SELECT COALESCE(MIN(""offset""), 0)
            FROM {_options.SchemaName}.event_store
            WHERE stream_name = @streamName
              AND stored_at >= @timestamp";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("timestamp", timestamp.UtcDateTime);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result != null && result != DBNull.Value ? Convert.ToInt64(result) : 0;
    }

    private string BuildReplayQuery(IReadOnlyList<string>? eventTypeFilter)
    {
        var baseQuery = $@"
            SELECT id, stream_name, ""offset"", event_type, data, metadata, stored_at
            FROM {_options.SchemaName}.event_store
            WHERE stream_name = @streamName
              AND ""offset"" >= @startOffset";

        if (eventTypeFilter != null && eventTypeFilter.Count > 0)
        {
            baseQuery += " AND event_type = ANY(@eventTypes)";
        }

        baseQuery += @" ORDER BY ""offset"" ASC LIMIT @batchSize";

        return baseQuery;
    }

    private string BuildCountQuery(
        long? startOffset,
        DateTimeOffset? startTime,
        DateTimeOffset? endTime,
        IReadOnlyList<string>? eventTypeFilter)
    {
        var sql = $@"
            SELECT COUNT(*)
            FROM {_options.SchemaName}.event_store
            WHERE stream_name = @streamName";

        if (startOffset.HasValue)
            sql += @" AND ""offset"" >= @startOffset";
        if (startTime.HasValue)
            sql += " AND stored_at >= @startTime";
        if (endTime.HasValue)
            sql += " AND stored_at < @endTime";
        if (eventTypeFilter != null && eventTypeFilter.Count > 0)
            sql += " AND event_type = ANY(@eventTypes)";

        return sql;
    }

    private StoredEvent MapStoredEvent(NpgsqlDataReader reader)
    {
        return new StoredEvent
        {
            Id = reader.GetGuid(0),
            StreamName = reader.GetString(1),
            Offset = reader.GetInt64(2),
            EventType = reader.GetString(3),
            Data = reader.GetString(4),
            Metadata = reader.IsDBNull(5) ? null : reader.GetString(5),
            StoredAt = reader.GetDateTime(6)
        };
    }
}

/// <summary>
/// Rate limiter for controlling replay speed.
/// </summary>
internal class RateLimiter
{
    private readonly int _eventsPerSecond;
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
    private long _eventsProcessed;

    public RateLimiter(int eventsPerSecond)
    {
        _eventsPerSecond = eventsPerSecond;
    }

    public async Task WaitAsync(CancellationToken cancellationToken)
    {
        _eventsProcessed++;

        // Pace delivery so that, on average, no more than _eventsPerSecond
        // events are yielded per second.
        var expectedElapsedMs = (_eventsProcessed * 1000.0) / _eventsPerSecond;
        var actualElapsedMs = _stopwatch.ElapsedMilliseconds;
        var delayMs = (int)(expectedElapsedMs - actualElapsedMs);

        if (delayMs > 0)
        {
            await Task.Delay(delayMs, cancellationToken);
        }
    }
}
```
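The plan does not spell out how the replay service is registered with dependency injection. A minimal sketch, assuming it reuses the `PostgresEventStreamStoreOptions` already configured by the Phase 2.2 storage registration; the extension method name `AddPostgresEventReplay` is hypothetical:
```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.PostgreSQL;

public static class EventReplayServiceCollectionExtensions
{
    /// <summary>
    /// Registers IEventReplayService backed by PostgreSQL. Assumes
    /// PostgresEventStreamStoreOptions has already been configured by the
    /// Phase 2.2 storage registration.
    /// </summary>
    public static IServiceCollection AddPostgresEventReplay(this IServiceCollection services)
    {
        services.TryAddSingleton<IEventReplayService, PostgresEventReplayService>();
        return services;
    }
}
```
Callers would then add `services.AddPostgresEventReplay();` after the existing event stream store registration.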
### 4. gRPC Integration
Add replay methods to the existing `EventStreamServiceImpl`:
```csharp
public override async Task ReplayEvents(
    ReplayRequest request,
    IServerStreamWriter<EventMessage> responseStream,
    ServerCallContext context)
{
    var replayService = _serviceProvider.GetRequiredService<IEventReplayService>();

    var options = new ReplayOptions
    {
        BatchSize = request.BatchSize > 0 ? request.BatchSize : 100,
        MaxEvents = request.MaxEvents > 0 ? request.MaxEvents : (long?)null,
        MaxEventsPerSecond = request.MaxEventsPerSecond > 0 ? request.MaxEventsPerSecond : (int?)null,
        EventTypeFilter = request.EventTypes.Count > 0 ? request.EventTypes.ToList() : null
    };

    IAsyncEnumerable<StoredEvent> events = request.ReplayType switch
    {
        ReplayType.FromOffset => replayService.ReplayFromOffsetAsync(
            request.StreamName, request.StartOffset, options, context.CancellationToken),
        ReplayType.FromTime => replayService.ReplayFromTimeAsync(
            request.StreamName,
            DateTimeOffset.FromUnixTimeMilliseconds(request.StartTimeUnixMs),
            options,
            context.CancellationToken),
        ReplayType.TimeRange => replayService.ReplayTimeRangeAsync(
            request.StreamName,
            DateTimeOffset.FromUnixTimeMilliseconds(request.StartTimeUnixMs),
            DateTimeOffset.FromUnixTimeMilliseconds(request.EndTimeUnixMs),
            options,
            context.CancellationToken),
        ReplayType.All => replayService.ReplayAllAsync(
            request.StreamName, options, context.CancellationToken),
        _ => throw new RpcException(new Status(StatusCode.InvalidArgument, "Invalid replay type"))
    };

    await foreach (var @event in events.WithCancellation(context.CancellationToken))
    {
        await responseStream.WriteAsync(MapToEventMessage(@event));
    }
}
```
## Usage Examples
### C# - Replay from Offset
```csharp
var replayService = serviceProvider.GetRequiredService<IEventReplayService>();

await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        BatchSize = 100,
        MaxEventsPerSecond = 1000, // Rate limit to 1000 events/sec
        ProgressCallback = progress =>
        {
            Console.WriteLine($"Progress: {progress.EventsProcessed} events " +
                $"({progress.ProgressPercentage:F1}%) " +
                $"@ {progress.EventsPerSecond:F0} events/sec");
        }
    }))
{
    await ProcessEventAsync(@event);
}
```
### C# - Replay Time Range
```csharp
var startTime = DateTimeOffset.UtcNow.AddDays(-7);
var endTime = DateTimeOffset.UtcNow.AddDays(-6);
await foreach (var @event in replayService.ReplayTimeRangeAsync(
streamName: "analytics",
startTime: startTime,
endTime: endTime,
options: new ReplayOptions
{
EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" },
MaxEvents = 10000
}))
{
await RebuildProjectionAsync(@event);
}
```
### C# - Get Replay Count
```csharp
var count = await replayService.GetReplayCountAsync(
streamName: "orders",
startOffset: 1000,
options: new ReplayOptions
{
EventTypeFilter = new[] { "OrderPlaced" }
});
Console.WriteLine($"Will replay {count} events");
```
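### C# - Rebuild a Read Model (Full Replay)
Rebuilding a read model from scratch (one of the motivating use cases above) combines `ReplayAllAsync` with a projection handler. This is a sketch only: `orderProjection` and its `ResetAsync`/`ApplyAsync` methods are hypothetical consumer-side code, not part of this phase.
```csharp
// Sketch: rebuild a projection from the full history of a stream.
// orderProjection and its ResetAsync/ApplyAsync methods are hypothetical.
await orderProjection.ResetAsync(); // e.g. truncate the read-model tables

await foreach (var @event in replayService.ReplayAllAsync(
    streamName: "orders",
    options: new ReplayOptions
    {
        BatchSize = 500,
        ProgressCallback = p => Console.WriteLine($"Rebuilt {p.EventsProcessed} events")
    }))
{
    await orderProjection.ApplyAsync(@event);
}
```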
### gRPC - Replay Events
```proto
syntax = "proto3";
package svrnty.events;
service EventStreamService {
// ... existing methods ...
rpc ReplayEvents(ReplayRequest) returns (stream EventMessage);
rpc GetReplayCount(ReplayCountRequest) returns (ReplayCountResponse);
}
message ReplayRequest {
string stream_name = 1;
ReplayType replay_type = 2;
int64 start_offset = 3;
int64 start_time_unix_ms = 4;
int64 end_time_unix_ms = 5;
int32 batch_size = 6;
int64 max_events = 7;
int32 max_events_per_second = 8;
repeated string event_types = 9;
}
enum ReplayType {
FROM_OFFSET = 0;
FROM_TIME = 1;
TIME_RANGE = 2;
ALL = 3;
}
message ReplayCountRequest {
string stream_name = 1;
int64 start_offset = 2;
int64 start_time_unix_ms = 3;
int64 end_time_unix_ms = 4;
repeated string event_types = 5;
}
message ReplayCountResponse {
int64 count = 1;
}
```
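Once the deferred proto above is compiled, a client could consume the server stream roughly as follows. This is a sketch only: it assumes standard Grpc.Tools code generation (client type `EventStreamService.EventStreamServiceClient`, namespace derived from `package svrnty.events`) and the existing `EventMessage` type from earlier phases.
```csharp
using Grpc.Core;
using Grpc.Net.Client;
// Assumed generated namespace for `package svrnty.events`; adjust if the proto
// sets an explicit csharp_namespace option.
using Svrnty.Events;

using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

using var call = client.ReplayEvents(new ReplayRequest
{
    StreamName = "orders",
    ReplayType = ReplayType.FromOffset,
    StartOffset = 1000,
    BatchSize = 100,
    MaxEventsPerSecond = 500
});

// Server streaming: events arrive as the server replays them.
await foreach (var eventMessage in call.ResponseStream.ReadAllAsync())
{
    Console.WriteLine(eventMessage);
}
```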
## Implementation Checklist
### Phase 2.5.1 - Core Interfaces (Week 1) ✅
- [x] Define IEventReplayService interface
- [x] Define ReplayOptions class
- [x] Define ReplayProgress record
- [x] Define RateLimiter internal class
### Phase 2.5.2 - PostgreSQL Implementation (Week 1-2) ✅
- [x] Implement PostgresEventReplayService
- [x] Implement ReplayFromOffsetAsync
- [x] Implement ReplayFromTimeAsync
- [x] Implement ReplayTimeRangeAsync
- [x] Implement ReplayAllAsync
- [x] Implement GetReplayCountAsync
- [x] Implement GetOffsetAtTimeAsync
- [x] Implement rate limiting logic
- [x] Implement progress tracking
- [x] Add comprehensive logging
### Phase 2.5.3 - gRPC Integration (Week 2) ⏸️ Deferred
- [ ] Define replay proto messages
- [ ] Implement ReplayEvents gRPC method
- [ ] Implement GetReplayCount gRPC method
- [ ] Add gRPC error handling
- [ ] Add gRPC metadata support
**Note**: gRPC integration is deferred because it requires extending the proto files; it can be added later without breaking changes.
### Phase 2.5.4 - Testing (Week 3) ⏸️ Deferred
- [ ] Unit tests for ReplayOptions validation
- [ ] Unit tests for RateLimiter
- [ ] Integration tests for replay operations
- [ ] Performance testing with large streams
- [ ] Test event type filtering
- [ ] Test rate limiting behavior
- [ ] Test progress callbacks
### Phase 2.5.5 - Documentation (Week 3) ✅
- [x] Update README.md
- [x] Update CLAUDE.md
- [x] Update Phase 2.5 plan to complete
## Performance Considerations
### Batching Strategy
- **Configurable Batch Size**: Allow tuning based on event size
- **Memory Management**: Stream events to avoid loading all into memory
- **Database Connection**: Use single connection per replay operation
### Rate Limiting
- **Pacing vs. Token Bucket**: The `RateLimiter` sketch above paces delivery by comparing actual to expected elapsed time; a token bucket (see the sketch after this list) would smooth the rate without bursts
- **Configurable Limits**: Per-replay operation rate limits
- **CPU Efficiency**: Minimal overhead for rate limiting logic
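The pacing approach is simple and CPU-cheap but allows a short burst after a stall. A token-bucket variant, sketched below under the same events-per-second contract, would smooth that out; it is illustrative only and not part of the implemented code (on .NET 7+, `System.Threading.RateLimiting.TokenBucketRateLimiter` is a built-in alternative).
```csharp
using System.Diagnostics;

// Illustrative token-bucket limiter (not the implemented RateLimiter above).
internal sealed class ReplayTokenBucket
{
    private readonly int _eventsPerSecond;
    private readonly Stopwatch _clock = Stopwatch.StartNew();
    private double _tokens;
    private TimeSpan _lastRefill;

    public ReplayTokenBucket(int eventsPerSecond)
    {
        _eventsPerSecond = eventsPerSecond;
        _tokens = eventsPerSecond; // start with a full bucket
        _lastRefill = _clock.Elapsed;
    }

    public async Task WaitAsync(CancellationToken cancellationToken)
    {
        while (true)
        {
            // Refill proportionally to elapsed time, capped at one second's worth.
            var now = _clock.Elapsed;
            _tokens = Math.Min(_eventsPerSecond,
                _tokens + (now - _lastRefill).TotalSeconds * _eventsPerSecond);
            _lastRefill = now;

            if (_tokens >= 1)
            {
                _tokens -= 1; // consume one token per event
                return;
            }

            // Wait roughly long enough for one token to accrue.
            await Task.Delay(TimeSpan.FromSeconds(1.0 / _eventsPerSecond), cancellationToken);
        }
    }
}
```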
### Indexing
- **stored_at Index**: Required for time-based queries
- **Composite Index**: (stream_name, offset) for efficient range scans
- **Event Type Index**: Optional for filtered replays
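Concretely, and assuming the `event_store` table and column names used by the queries in Section 3 (the actual schema comes from Phase 2.2), the supporting indexes might be created as below; `schemaName` and `connectionString` are placeholders.
```csharp
using Npgsql;

// Sketch: index DDL supporting replay queries. Table/column names mirror the
// SQL in Section 3; schemaName and connectionString are placeholders.
var createReplayIndexesSql = $@"
    -- (stream_name, offset): range scans for offset-based replay
    CREATE INDEX IF NOT EXISTS ix_event_store_stream_offset
        ON {schemaName}.event_store (stream_name, ""offset"");

    -- (stream_name, stored_at): entry point for time-based replay
    CREATE INDEX IF NOT EXISTS ix_event_store_stream_stored_at
        ON {schemaName}.event_store (stream_name, stored_at);

    -- Optional: only worthwhile if event-type filtering is common
    CREATE INDEX IF NOT EXISTS ix_event_store_stream_event_type
        ON {schemaName}.event_store (stream_name, event_type);";

await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
await using var command = new NpgsqlCommand(createReplayIndexesSql, connection);
await command.ExecuteNonQueryAsync();
```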
## Success Criteria
- [x] Can replay events from specific offset
- [x] Can replay events from specific timestamp
- [x] Can replay events within time range
- [x] Event type filtering works correctly
- [x] Rate limiting prevents overwhelming consumers
- [x] Progress tracking provides accurate metrics
- [ ] gRPC replay API works end-to-end (deferred)
- [x] Performance acceptable for large streams (efficient batching and streaming)
- [x] Documentation is complete
## Risks & Mitigation
| Risk | Impact | Mitigation |
|------|--------|------------|
| **Memory exhaustion** | OOM errors | Stream events with batching, don't load all into memory |
| **Long-running replays** | Timeout issues | Implement proper cancellation, progress tracking |
| **Database load** | Performance degradation | Batch queries, rate limiting, off-peak replay |
| **Event type filter performance** | Slow queries | Add index on event_type if filtering is common |
## Future Enhancements (Phase 3.x)
- **Snapshot Integration**: Start replay from snapshots instead of beginning
- **Parallel Replay**: Replay multiple streams in parallel
- **Replay Scheduling**: Scheduled replay jobs
- **Replay Analytics**: Track replay operations and performance
- **Complex Filtering**: Query language for event filtering
- **Replay Caching**: Cache frequently replayed ranges