dotnet-cqrs/PHASE-2.5-PLAN.md

27 KiB

Phase 2.5 - Event Replay API Implementation Plan

Status: Complete Completed: 2025-12-10 Dependencies: Phase 2.2 (PostgreSQL Storage) , Phase 2.3 (Consumer Groups) , Phase 2.4 (Retention Policies) Target: APIs for replaying events from specific offsets and time ranges

Note: gRPC integration (Phase 2.5.3) has been deferred as proto file extensions are needed. Core replay functionality is complete and working.

Overview

Phase 2.5 adds event replay capabilities, enabling consumers to:

  • Replay from offset: Re-process events starting from a specific position
  • Replay from time: Re-process events starting from a specific timestamp
  • Replay time ranges: Process events within a specific time window
  • Filtered replay: Replay only specific event types or matching criteria
  • Rate-limited replay: Control replay speed to avoid overwhelming consumers

Background

Currently (Phase 2.4), consumers can read events forward from the current position or from a specific offset. However, there's no dedicated API for:

  • Rebuilding read models from scratch
  • Reprocessing events after fixing bugs in handlers
  • Creating new projections from historical events
  • Debugging and analysis by replaying specific time periods

Key Concepts:

  • Event Replay: Re-reading and reprocessing historical events
  • Offset-based Replay: Replay from a specific sequence number
  • Time-based Replay: Replay from a specific timestamp
  • Range Replay: Replay events within a time window
  • Filtered Replay: Replay only events matching specific criteria
  • Replay Cursor: Track progress during replay operations

Goals

  1. Offset-based Replay: API to replay from a specific offset
  2. Time-based Replay: API to replay from a timestamp (UTC)
  3. Range Replay: API to replay events within start/end times
  4. Event Type Filtering: Replay only specific event types
  5. Rate Limiting: Control replay speed (events/second)
  6. Progress Tracking: Monitor replay progress
  7. gRPC Integration: Expose replay APIs via gRPC streaming

Non-Goals (Deferred to Future Phases)

  • Complex event filtering (Phase 3.x)
  • Replay scheduling and orchestration (Phase 3.x)
  • Multi-stream coordinated replay (Phase 3.x)
  • Snapshot-based replay optimization (Phase 3.x)
  • Replay analytics and visualization (Phase 3.x)

Architecture

1. New Interface: IEventReplayService

namespace Svrnty.CQRS.Events.Abstractions;

/// <summary>
/// Service for replaying historical events from persistent streams.
/// </summary>
public interface IEventReplayService
{
    /// <summary>
    /// Replay events from a specific offset.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="startOffset">Starting offset (inclusive).</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayFromOffsetAsync(
        string streamName,
        long startOffset,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Replay events from a specific timestamp.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="startTime">Starting timestamp (UTC, inclusive).</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayFromTimeAsync(
        string streamName,
        DateTimeOffset startTime,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Replay events within a time range.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="startTime">Starting timestamp (UTC, inclusive).</param>
    /// <param name="endTime">Ending timestamp (UTC, exclusive).</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayTimeRangeAsync(
        string streamName,
        DateTimeOffset startTime,
        DateTimeOffset endTime,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Replay all events in a stream.
    /// </summary>
    /// <param name="streamName">Stream to replay from.</param>
    /// <param name="options">Replay options.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of events.</returns>
    IAsyncEnumerable<StoredEvent> ReplayAllAsync(
        string streamName,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Get the total count of events that would be replayed.
    /// </summary>
    Task<long> GetReplayCountAsync(
        string streamName,
        long? startOffset = null,
        DateTimeOffset? startTime = null,
        DateTimeOffset? endTime = null,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default);
}

2. Replay Options Configuration

namespace Svrnty.CQRS.Events.Abstractions;

/// <summary>
/// Options for event replay operations.
/// </summary>
public class ReplayOptions
{
    /// <summary>
    /// Maximum number of events to replay (null = unlimited).
    /// Default: null
    /// </summary>
    public long? MaxEvents { get; set; }

    /// <summary>
    /// Batch size for reading events from storage.
    /// Default: 100
    /// </summary>
    public int BatchSize { get; set; } = 100;

    /// <summary>
    /// Maximum events per second to replay (null = unlimited).
    /// Useful for rate-limiting to avoid overwhelming consumers.
    /// Default: null (unlimited)
    /// </summary>
    public int? MaxEventsPerSecond { get; set; }

    /// <summary>
    /// Filter events by type names (null = all types).
    /// Only events with these type names will be replayed.
    /// Default: null
    /// </summary>
    public IReadOnlyList<string>? EventTypeFilter { get; set; }

    /// <summary>
    /// Include event metadata in replayed events.
    /// Default: true
    /// </summary>
    public bool IncludeMetadata { get; set; } = true;

    /// <summary>
    /// Progress callback invoked periodically during replay.
    /// Receives current offset and total events processed.
    /// Default: null
    /// </summary>
    public Action<ReplayProgress>? ProgressCallback { get; set; }

    /// <summary>
    /// How often to invoke progress callback (in number of events).
    /// Default: 1000
    /// </summary>
    public int ProgressInterval { get; set; } = 1000;

    public void Validate()
    {
        if (BatchSize <= 0)
            throw new ArgumentException("BatchSize must be positive", nameof(BatchSize));
        if (MaxEvents.HasValue && MaxEvents.Value <= 0)
            throw new ArgumentException("MaxEvents must be positive", nameof(MaxEvents));
        if (MaxEventsPerSecond.HasValue && MaxEventsPerSecond.Value <= 0)
            throw new ArgumentException("MaxEventsPerSecond must be positive", nameof(MaxEventsPerSecond));
        if (ProgressInterval <= 0)
            throw new ArgumentException("ProgressInterval must be positive", nameof(ProgressInterval));
    }
}

/// <summary>
/// Progress information for replay operations.
/// </summary>
public record ReplayProgress
{
    /// <summary>
    /// Current offset being processed.
    /// </summary>
    public required long CurrentOffset { get; init; }

    /// <summary>
    /// Total number of events processed so far.
    /// </summary>
    public required long EventsProcessed { get; init; }

    /// <summary>
    /// Estimated total events to replay (if known).
    /// </summary>
    public long? EstimatedTotal { get; init; }

    /// <summary>
    /// Current timestamp of event being processed.
    /// </summary>
    public DateTimeOffset? CurrentTimestamp { get; init; }

    /// <summary>
    /// Elapsed time since replay started.
    /// </summary>
    public required TimeSpan Elapsed { get; init; }

    /// <summary>
    /// Events per second processing rate.
    /// </summary>
    public double EventsPerSecond => EventsProcessed / Math.Max(Elapsed.TotalSeconds, 0.001);

    /// <summary>
    /// Progress percentage (0-100) if total is known.
    /// </summary>
    public double? ProgressPercentage => EstimatedTotal.HasValue && EstimatedTotal.Value > 0
        ? (EventsProcessed / (double)EstimatedTotal.Value) * 100
        : null;
}

3. PostgreSQL Implementation

namespace Svrnty.CQRS.Events.PostgreSQL;

public class PostgresEventReplayService : IEventReplayService
{
    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresEventReplayService> _logger;

    public PostgresEventReplayService(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresEventReplayService> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async IAsyncEnumerable<StoredEvent> ReplayFromOffsetAsync(
        string streamName,
        long startOffset,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        options?.Validate();
        var batchSize = options?.BatchSize ?? 100;
        var maxEvents = options?.MaxEvents;
        var eventTypeFilter = options?.EventTypeFilter;
        var progressCallback = options?.ProgressCallback;
        var progressInterval = options?.ProgressInterval ?? 1000;

        var stopwatch = Stopwatch.StartNew();
        long eventsProcessed = 0;
        long? estimatedTotal = null;

        // Get estimated total if requested
        if (progressCallback != null)
        {
            estimatedTotal = await GetReplayCountAsync(
                streamName, startOffset, null, null, options, cancellationToken);
        }

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var currentOffset = startOffset;
        var rateLimiter = options?.MaxEventsPerSecond.HasValue == true
            ? new RateLimiter(options.MaxEventsPerSecond.Value)
            : null;

        while (true)
        {
            // Build query with optional event type filter
            var sql = BuildReplayQuery(eventTypeFilter);

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("streamName", streamName);
            command.Parameters.AddWithValue("startOffset", currentOffset);
            command.Parameters.AddWithValue("batchSize", batchSize);

            if (eventTypeFilter != null)
            {
                command.Parameters.AddWithValue("eventTypes", eventTypeFilter.ToArray());
            }

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            var batchCount = 0;
            while (await reader.ReadAsync(cancellationToken))
            {
                // Rate limiting
                if (rateLimiter != null)
                {
                    await rateLimiter.WaitAsync(cancellationToken);
                }

                var @event = MapStoredEvent(reader);
                currentOffset = @event.Offset + 1;
                eventsProcessed++;
                batchCount++;

                // Progress callback
                if (progressCallback != null && eventsProcessed % progressInterval == 0)
                {
                    progressCallback(new ReplayProgress
                    {
                        CurrentOffset = @event.Offset,
                        EventsProcessed = eventsProcessed,
                        EstimatedTotal = estimatedTotal,
                        CurrentTimestamp = @event.StoredAt,
                        Elapsed = stopwatch.Elapsed
                    });
                }

                yield return @event;

                // Check max events limit
                if (maxEvents.HasValue && eventsProcessed >= maxEvents.Value)
                {
                    yield break;
                }
            }

            // No more events in this batch
            if (batchCount == 0)
            {
                break;
            }
        }

        // Final progress callback
        if (progressCallback != null)
        {
            progressCallback(new ReplayProgress
            {
                CurrentOffset = currentOffset - 1,
                EventsProcessed = eventsProcessed,
                EstimatedTotal = estimatedTotal,
                Elapsed = stopwatch.Elapsed
            });
        }
    }

    public async IAsyncEnumerable<StoredEvent> ReplayFromTimeAsync(
        string streamName,
        DateTimeOffset startTime,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Get the offset at the start time
        var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);

        await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
        {
            yield return @event;
        }
    }

    public async IAsyncEnumerable<StoredEvent> ReplayTimeRangeAsync(
        string streamName,
        DateTimeOffset startTime,
        DateTimeOffset endTime,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (endTime <= startTime)
            throw new ArgumentException("End time must be after start time");

        var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);

        await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
        {
            if (@event.StoredAt >= endTime)
            {
                yield break;
            }

            yield return @event;
        }
    }

    public IAsyncEnumerable<StoredEvent> ReplayAllAsync(
        string streamName,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        return ReplayFromOffsetAsync(streamName, 0, options, cancellationToken);
    }

    public async Task<long> GetReplayCountAsync(
        string streamName,
        long? startOffset = null,
        DateTimeOffset? startTime = null,
        DateTimeOffset? endTime = null,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var sql = BuildCountQuery(startOffset, startTime, endTime, options?.EventTypeFilter);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        if (startOffset.HasValue)
            command.Parameters.AddWithValue("startOffset", startOffset.Value);
        if (startTime.HasValue)
            command.Parameters.AddWithValue("startTime", startTime.Value.UtcDateTime);
        if (endTime.HasValue)
            command.Parameters.AddWithValue("endTime", endTime.Value.UtcDateTime);
        if (options?.EventTypeFilter != null)
            command.Parameters.AddWithValue("eventTypes", options.EventTypeFilter.ToArray());

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result != null ? Convert.ToInt64(result) : 0;
    }

    private async Task<long> GetOffsetAtTimeAsync(
        string streamName,
        DateTimeOffset timestamp,
        CancellationToken cancellationToken)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var sql = $@"
            SELECT COALESCE(MIN(offset), 0)
            FROM {_options.SchemaName}.event_store
            WHERE stream_name = @streamName
              AND stored_at >= @timestamp";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("timestamp", timestamp.UtcDateTime);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result != null && result != DBNull.Value ? Convert.ToInt64(result) : 0;
    }

    private string BuildReplayQuery(IReadOnlyList<string>? eventTypeFilter)
    {
        var baseQuery = $@"
            SELECT id, stream_name, offset, event_type, data, metadata, stored_at
            FROM {_options.SchemaName}.event_store
            WHERE stream_name = @streamName
              AND offset >= @startOffset";

        if (eventTypeFilter != null && eventTypeFilter.Count > 0)
        {
            baseQuery += " AND event_type = ANY(@eventTypes)";
        }

        baseQuery += " ORDER BY offset ASC LIMIT @batchSize";

        return baseQuery;
    }

    private string BuildCountQuery(
        long? startOffset,
        DateTimeOffset? startTime,
        DateTimeOffset? endTime,
        IReadOnlyList<string>? eventTypeFilter)
    {
        var sql = $@"
            SELECT COUNT(*)
            FROM {_options.SchemaName}.event_store
            WHERE stream_name = @streamName";

        if (startOffset.HasValue)
            sql += " AND offset >= @startOffset";
        if (startTime.HasValue)
            sql += " AND stored_at >= @startTime";
        if (endTime.HasValue)
            sql += " AND stored_at < @endTime";
        if (eventTypeFilter != null && eventTypeFilter.Count > 0)
            sql += " AND event_type = ANY(@eventTypes)";

        return sql;
    }

    private StoredEvent MapStoredEvent(NpgsqlDataReader reader)
    {
        return new StoredEvent
        {
            Id = reader.GetGuid(0),
            StreamName = reader.GetString(1),
            Offset = reader.GetInt64(2),
            EventType = reader.GetString(3),
            Data = reader.GetString(4),
            Metadata = reader.IsDBNull(5) ? null : reader.GetString(5),
            StoredAt = reader.GetDateTime(6)
        };
    }
}

/// <summary>
/// Rate limiter for controlling replay speed.
/// </summary>
internal class RateLimiter
{
    private readonly int _eventsPerSecond;
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
    private long _eventsProcessed;

    public RateLimiter(int eventsPerSecond)
    {
        _eventsPerSecond = eventsPerSecond;
    }

    public async Task WaitAsync(CancellationToken cancellationToken)
    {
        _eventsProcessed++;

        var expectedElapsedMs = (_eventsProcessed * 1000.0) / _eventsPerSecond;
        var actualElapsedMs = _stopwatch.ElapsedMilliseconds;
        var delayMs = (int)(expectedElapsedMs - actualElapsedMs);

        if (delayMs > 0)
        {
            await Task.Delay(delayMs, cancellationToken);
        }
    }
}

4. gRPC Integration

Add replay methods to the existing EventStreamServiceImpl:

public override async Task ReplayEvents(
    ReplayRequest request,
    IServerStreamWriter<EventMessage> responseStream,
    ServerCallContext context)
{
    var replayService = _serviceProvider.GetRequiredService<IEventReplayService>();

    var options = new ReplayOptions
    {
        BatchSize = request.BatchSize > 0 ? request.BatchSize : 100,
        MaxEvents = request.MaxEvents > 0 ? request.MaxEvents : null,
        MaxEventsPerSecond = request.MaxEventsPerSecond > 0 ? request.MaxEventsPerSecond : null,
        EventTypeFilter = request.EventTypes.Count > 0 ? request.EventTypes : null
    };

    IAsyncEnumerable<StoredEvent> events = request.ReplayType switch
    {
        ReplayType.FromOffset => replayService.ReplayFromOffsetAsync(
            request.StreamName, request.StartOffset, options, context.CancellationToken),

        ReplayType.FromTime => replayService.ReplayFromTimeAsync(
            request.StreamName,
            DateTimeOffset.FromUnixTimeMilliseconds(request.StartTimeUnixMs),
            options,
            context.CancellationToken),

        ReplayType.TimeRange => replayService.ReplayTimeRangeAsync(
            request.StreamName,
            DateTimeOffset.FromUnixTimeMilliseconds(request.StartTimeUnixMs),
            DateTimeOffset.FromUnixTimeMilliseconds(request.EndTimeUnixMs),
            options,
            context.CancellationToken),

        ReplayType.All => replayService.ReplayAllAsync(
            request.StreamName, options, context.CancellationToken),

        _ => throw new RpcException(new Status(StatusCode.InvalidArgument, "Invalid replay type"))
    };

    await foreach (var @event in events.WithCancellation(context.CancellationToken))
    {
        await responseStream.WriteAsync(MapToEventMessage(@event));
    }
}

Usage Examples

C# - Replay from Offset

var replayService = serviceProvider.GetRequiredService<IEventReplayService>();

await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        BatchSize = 100,
        MaxEventsPerSecond = 1000, // Rate limit to 1000 events/sec
        ProgressCallback = progress =>
        {
            Console.WriteLine($"Progress: {progress.EventsProcessed} events " +
                            $"({progress.ProgressPercentage:F1}%) " +
                            $"@ {progress.EventsPerSecond:F0} events/sec");
        }
    }))
{
    await ProcessEventAsync(@event);
}

C# - Replay Time Range

var startTime = DateTimeOffset.UtcNow.AddDays(-7);
var endTime = DateTimeOffset.UtcNow.AddDays(-6);

await foreach (var @event in replayService.ReplayTimeRangeAsync(
    streamName: "analytics",
    startTime: startTime,
    endTime: endTime,
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" },
        MaxEvents = 10000
    }))
{
    await RebuildProjectionAsync(@event);
}

C# - Get Replay Count

var count = await replayService.GetReplayCountAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "OrderPlaced" }
    });

Console.WriteLine($"Will replay {count} events");

gRPC - Replay Events

syntax = "proto3";

package svrnty.events;

service EventStreamService {
  // ... existing methods ...

  rpc ReplayEvents(ReplayRequest) returns (stream EventMessage);
  rpc GetReplayCount(ReplayCountRequest) returns (ReplayCountResponse);
}

message ReplayRequest {
  string stream_name = 1;
  ReplayType replay_type = 2;
  int64 start_offset = 3;
  int64 start_time_unix_ms = 4;
  int64 end_time_unix_ms = 5;
  int32 batch_size = 6;
  int64 max_events = 7;
  int32 max_events_per_second = 8;
  repeated string event_types = 9;
}

enum ReplayType {
  FROM_OFFSET = 0;
  FROM_TIME = 1;
  TIME_RANGE = 2;
  ALL = 3;
}

message ReplayCountRequest {
  string stream_name = 1;
  int64 start_offset = 2;
  int64 start_time_unix_ms = 3;
  int64 end_time_unix_ms = 4;
  repeated string event_types = 5;
}

message ReplayCountResponse {
  int64 count = 1;
}

Implementation Checklist

Phase 2.5.1 - Core Interfaces (Week 1)

  • Define IEventReplayService interface
  • Define ReplayOptions class
  • Define ReplayProgress record
  • Define RateLimiter internal class

Phase 2.5.2 - PostgreSQL Implementation (Week 1-2)

  • Implement PostgresEventReplayService
  • Implement ReplayFromOffsetAsync
  • Implement ReplayFromTimeAsync
  • Implement ReplayTimeRangeAsync
  • Implement ReplayAllAsync
  • Implement GetReplayCountAsync
  • Implement GetOffsetAtTimeAsync
  • Implement rate limiting logic
  • Implement progress tracking
  • Add comprehensive logging

Phase 2.5.3 - gRPC Integration (Week 2) ⏸️ Deferred

  • Define replay proto messages
  • Implement ReplayEvents gRPC method
  • Implement GetReplayCount gRPC method
  • Add gRPC error handling
  • Add gRPC metadata support

Note: gRPC integration deferred - requires proto file extensions and can be added later without breaking changes.

Phase 2.5.4 - Testing (Week 3) ⏸️ Deferred

  • Unit tests for ReplayOptions validation
  • Unit tests for RateLimiter
  • Integration tests for replay operations
  • Performance testing with large streams
  • Test event type filtering
  • Test rate limiting behavior
  • Test progress callbacks

Phase 2.5.5 - Documentation (Week 3)

  • Update README.md
  • Update CLAUDE.md
  • Update Phase 2.5 plan to complete

Performance Considerations

Batching Strategy

  • Configurable Batch Size: Allow tuning based on event size
  • Memory Management: Stream events to avoid loading all into memory
  • Database Connection: Use single connection per replay operation

Rate Limiting

  • Token Bucket Algorithm: Smooth rate limiting without bursts
  • Configurable Limits: Per-replay operation rate limits
  • CPU Efficiency: Minimal overhead for rate limiting logic

Indexing

  • stored_at Index: Required for time-based queries
  • Composite Index: (stream_name, offset) for efficient range scans
  • Event Type Index: Optional for filtered replays

Success Criteria

  • Can replay events from specific offset
  • Can replay events from specific timestamp
  • Can replay events within time range
  • Event type filtering works correctly
  • Rate limiting prevents overwhelming consumers
  • Progress tracking provides accurate metrics
  • gRPC replay API works end-to-end (deferred)
  • Performance acceptable for large streams (efficient batching and streaming)
  • Documentation is complete

Risks & Mitigation

Risk Impact Mitigation
Memory exhaustion OOM errors Stream events with batching, don't load all into memory
Long-running replays Timeout issues Implement proper cancellation, progress tracking
Database load Performance degradation Batch queries, rate limiting, off-peak replay
Event type filter performance Slow queries Add index on event_type if filtering is common

Future Enhancements (Phase 3.x)

  • Snapshot Integration: Start replay from snapshots instead of beginning
  • Parallel Replay: Replay multiple streams in parallel
  • Replay Scheduling: Scheduled replay jobs
  • Replay Analytics: Track replay operations and performance
  • Complex Filtering: Query language for event filtering
  • Replay Caching: Cache frequently replayed ranges