using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Models;
using Svrnty.CQRS.Events.Abstractions.Replay;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;

namespace Svrnty.CQRS.Events.PostgreSQL.Replay;

/// <summary>
/// PostgreSQL-based implementation of IEventReplayService.
/// Provides efficient event replay with batching, rate limiting, and progress tracking.
/// </summary>
public class PostgresEventReplayService : IEventReplayService
{
    private const string EventStoreTable = "event_store";
    private const int DefaultBatchSize = 100;
    private const int DefaultProgressInterval = 1000;

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresEventReplayService> _logger;

    /// <summary>
    /// Creates the replay service.
    /// </summary>
    /// <param name="options">Store options; the connection string and schema name are read from its value.</param>
    /// <param name="logger">Logger used for replay diagnostics.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresEventReplayService(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresEventReplayService> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    // The schema name comes from configuration and is interpolated verbatim (unquoted) into SQL text.
    private string SchemaQualifiedTable(string tableName) => $"{_options.SchemaName}.{tableName}";

    /// <inheritdoc />
    /// <remarks>
    /// Reads the stream in batches ordered by sequence over a single connection, advancing the
    /// offset to one past the last sequence seen. When a rate limit is configured, it is awaited
    /// before each event is yielded. The progress callback fires every <c>ProgressInterval</c>
    /// events and once more when the replay ends, whether the stream was exhausted or the
    /// <c>MaxEvents</c> limit was reached.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
    public async IAsyncEnumerable<StoredEvent> ReplayFromOffsetAsync(
        string streamName,
        long startOffset,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        options?.Validate();

        var batchSize = options?.BatchSize ?? DefaultBatchSize;
        var maxEvents = options?.MaxEvents;
        var eventTypeFilter = options?.EventTypeFilter;
        var progressCallback = options?.ProgressCallback;
        var progressInterval = options?.ProgressInterval ?? DefaultProgressInterval;

        _logger.LogInformation(
            "Starting replay from offset {StartOffset} for stream {StreamName}. BatchSize={BatchSize}, MaxEvents={MaxEvents}",
            startOffset, streamName, batchSize, maxEvents);

        var stopwatch = Stopwatch.StartNew();
        long eventsProcessed = 0;
        long? estimatedTotal = null;

        // The count query is only worth its cost when someone is listening for progress.
        if (progressCallback != null)
        {
            var matchingEvents = await GetReplayCountAsync(
                streamName, startOffset, null, null, options, cancellationToken);

            // The replay stops at MaxEvents, so the total must be capped as well;
            // otherwise progress could never reach 100% on a capped replay.
            estimatedTotal = maxEvents.HasValue
                ? Math.Min(matchingEvents, maxEvents.Value)
                : matchingEvents;

            _logger.LogInformation(
                "Estimated {EstimatedTotal} events to replay for stream {StreamName}",
                estimatedTotal, streamName);
        }

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var currentOffset = startOffset;
        var rateLimiter = options?.MaxEventsPerSecond is { } eventsPerSecond
            ? new RateLimiter(eventsPerSecond)
            : null;

        // Neither the SQL text nor the filter array changes between batches, so build them once.
        var sql = BuildReplayQuery(eventTypeFilter);
        var eventTypes = eventTypeFilter is { Count: > 0 } ? eventTypeFilter.ToArray() : null;

        var reachedMaxEvents = false;
        while (!reachedMaxEvents)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("streamName", streamName);
            command.Parameters.AddWithValue("startOffset", currentOffset);
            command.Parameters.AddWithValue("batchSize", batchSize);
            if (eventTypes != null)
            {
                command.Parameters.AddWithValue("eventTypes", eventTypes);
            }

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            var batchCount = 0;
            while (await reader.ReadAsync(cancellationToken))
            {
                if (rateLimiter != null)
                {
                    await rateLimiter.WaitAsync(cancellationToken);
                }

                var @event = MapStoredEvent(reader);
                currentOffset = @event.Sequence + 1; // the next batch resumes just past this event
                eventsProcessed++;
                batchCount++;

                if (progressCallback != null && eventsProcessed % progressInterval == 0)
                {
                    progressCallback(new ReplayProgress
                    {
                        CurrentOffset = @event.Sequence,
                        EventsProcessed = eventsProcessed,
                        EstimatedTotal = estimatedTotal,
                        CurrentTimestamp = @event.StoredAt,
                        Elapsed = stopwatch.Elapsed
                    });
                }

                yield return @event;

                if (maxEvents.HasValue && eventsProcessed >= maxEvents.Value)
                {
                    _logger.LogInformation(
                        "Reached max events limit ({MaxEvents}) for stream {StreamName}. Stopping replay.",
                        maxEvents.Value, streamName);

                    // Leave the loops instead of ending the iterator here, so the final
                    // progress callback and the completion log below still run.
                    reachedMaxEvents = true;
                    break;
                }
            }

            // An empty batch means the stream is exhausted.
            if (batchCount == 0)
            {
                break;
            }

            _logger.LogDebug(
                "Replayed batch of {BatchCount} events from stream {StreamName}. Current offset: {CurrentOffset}",
                batchCount, streamName, currentOffset);
        }

        // Final progress callback. When nothing was replayed, CurrentOffset is startOffset - 1.
        if (progressCallback != null)
        {
            progressCallback(new ReplayProgress
            {
                CurrentOffset = currentOffset - 1,
                EventsProcessed = eventsProcessed,
                EstimatedTotal = estimatedTotal,
                Elapsed = stopwatch.Elapsed
            });
        }

        _logger.LogInformation(
            "Completed replay of {EventsProcessed} events from stream {StreamName} in {Elapsed}ms",
            eventsProcessed, streamName, stopwatch.ElapsedMilliseconds);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Resolves <paramref name="startTime"/> to the lowest sequence stored at or after it and then
    /// delegates to <see cref="ReplayFromOffsetAsync"/>. Yields nothing when no such event exists.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
    public async IAsyncEnumerable<StoredEvent> ReplayFromTimeAsync(
        string streamName,
        DateTimeOffset startTime,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        _logger.LogInformation(
            "Starting replay from time {StartTime:yyyy-MM-dd HH:mm:ss} for stream {StreamName}",
            startTime, streamName);

        var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);
        if (startOffset is null)
        {
            // Falling back to offset 0 here would replay the whole stream, i.e. only events older than startTime.
            _logger.LogInformation(
                "No events stored at or after {StartTime:yyyy-MM-dd HH:mm:ss} in stream {StreamName}. Nothing to replay.",
                startTime, streamName);
            yield break;
        }

        _logger.LogInformation(
            "Found starting offset {StartOffset} for time {StartTime:yyyy-MM-dd HH:mm:ss} in stream {StreamName}",
            startOffset.Value, startTime, streamName);

        await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset.Value, options, cancellationToken))
        {
            yield return @event;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Replays from the first event stored at or after <paramref name="startTime"/> and stops at
    /// the first event whose <c>StoredAt</c> is at or after <paramref name="endTime"/>, so the end
    /// bound is exclusive. Yields nothing when no event is stored at or after
    /// <paramref name="startTime"/>.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="streamName"/> is null or whitespace, or <paramref name="endTime"/> is not
    /// after <paramref name="startTime"/>.
    /// </exception>
    public async IAsyncEnumerable<StoredEvent> ReplayTimeRangeAsync(
        string streamName,
        DateTimeOffset startTime,
        DateTimeOffset endTime,
        ReplayOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        if (endTime <= startTime)
        {
            throw new ArgumentException("End time must be after start time", nameof(endTime));
        }

        _logger.LogInformation(
            "Starting time range replay from {StartTime:yyyy-MM-dd HH:mm:ss} to {EndTime:yyyy-MM-dd HH:mm:ss} for stream {StreamName}",
            startTime, endTime, streamName);

        var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);
        if (startOffset is null)
        {
            // Falling back to offset 0 here would replay events that precede the requested range.
            _logger.LogInformation(
                "No events stored at or after {StartTime:yyyy-MM-dd HH:mm:ss} in stream {StreamName}. Nothing to replay.",
                startTime, streamName);
            yield break;
        }

        await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset.Value, options, cancellationToken))
        {
            if (@event.StoredAt >= endTime)
            {
                _logger.LogInformation(
                    "Reached end time {EndTime:yyyy-MM-dd HH:mm:ss}. Stopping time range replay for stream {StreamName}",
                    endTime, streamName);
                yield break;
            }

            yield return @event;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Not an iterator itself, so the stream-name check runs when the method is called rather than
    /// when enumeration starts. Replays from offset 0.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
    public IAsyncEnumerable<StoredEvent> ReplayAllAsync(
        string streamName,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        _logger.LogInformation("Starting full replay of stream {StreamName}", streamName);

        return ReplayFromOffsetAsync(streamName, 0, options, cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Runs a single <c>COUNT(*)</c> on its own connection. Each supplied bound adds a predicate:
    /// <paramref name="startOffset"/> and <paramref name="startTime"/> are inclusive,
    /// <paramref name="endTime"/> is exclusive. Only the event-type filter of
    /// <paramref name="options"/> is used.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
    public async Task<long> GetReplayCountAsync(
        string streamName,
        long? startOffset = null,
        DateTimeOffset? startTime = null,
        DateTimeOffset? endTime = null,
        ReplayOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var eventTypeFilter = options?.EventTypeFilter;
        var sql = BuildCountQuery(startOffset, startTime, endTime, eventTypeFilter);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        // Parameters mirror the predicates BuildCountQuery appended for the same inputs.
        if (startOffset.HasValue)
        {
            command.Parameters.AddWithValue("startOffset", startOffset.Value);
        }

        if (startTime.HasValue)
        {
            command.Parameters.AddWithValue("startTime", startTime.Value.UtcDateTime);
        }

        if (endTime.HasValue)
        {
            command.Parameters.AddWithValue("endTime", endTime.Value.UtcDateTime);
        }

        if (eventTypeFilter is { Count: > 0 })
        {
            command.Parameters.AddWithValue("eventTypes", eventTypeFilter.ToArray());
        }

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result != null ? Convert.ToInt64(result) : 0;
    }

    /// <summary>
    /// Finds the lowest sequence in the stream whose <c>stored_at</c> is at or after
    /// <paramref name="timestamp"/>.
    /// </summary>
    /// <returns>
    /// The sequence, or <c>null</c> when the stream has no event at or after the timestamp.
    /// Callers must not treat <c>null</c> as offset 0, since that would select the whole stream.
    /// </returns>
    private async Task<long?> GetOffsetAtTimeAsync(
        string streamName,
        DateTimeOffset timestamp,
        CancellationToken cancellationToken)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        // MIN over zero rows yields SQL NULL; it is surfaced as null rather than coalesced to 0.
        var sql = $@"
            SELECT MIN(sequence)
            FROM {SchemaQualifiedTable(EventStoreTable)}
            WHERE stream_name = @streamName
              AND stored_at >= @timestamp";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("timestamp", timestamp.UtcDateTime);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        if (result is null || result == DBNull.Value)
        {
            return null;
        }

        return Convert.ToInt64(result);
    }

    /// <summary>
    /// Builds the batched SELECT used by replay. It is parameterised by <c>@streamName</c>,
    /// <c>@startOffset</c>, <c>@batchSize</c> and, when a non-empty filter is given, <c>@eventTypes</c>.
    /// </summary>
    private string BuildReplayQuery(IReadOnlyList<string>? eventTypeFilter)
    {
        // MapStoredEvent reads columns by ordinal, so this column order must not change.
        var baseQuery = $@"
            SELECT event_id, correlation_id, event_type, sequence, data, metadata, occurred_at, stored_at, stream_name
            FROM {SchemaQualifiedTable(EventStoreTable)}
            WHERE stream_name = @streamName
              AND sequence >= @startOffset";

        if (eventTypeFilter is { Count: > 0 })
        {
            baseQuery += " AND event_type = ANY(@eventTypes)";
        }

        baseQuery += " ORDER BY sequence ASC LIMIT @batchSize";

        return baseQuery;
    }

    /// <summary>
    /// Builds the COUNT query for <see cref="GetReplayCountAsync"/>, adding one predicate per supplied bound.
    /// </summary>
    private string BuildCountQuery(
        long? startOffset,
        DateTimeOffset? startTime,
        DateTimeOffset? endTime,
        IReadOnlyList<string>? eventTypeFilter)
    {
        var sql = $@"
            SELECT COUNT(*)
            FROM {SchemaQualifiedTable(EventStoreTable)}
            WHERE stream_name = @streamName";

        if (startOffset.HasValue)
        {
            sql += " AND sequence >= @startOffset";
        }

        if (startTime.HasValue)
        {
            sql += " AND stored_at >= @startTime";
        }

        if (endTime.HasValue)
        {
            sql += " AND stored_at < @endTime";
        }

        if (eventTypeFilter is { Count: > 0 })
        {
            sql += " AND event_type = ANY(@eventTypes)";
        }

        return sql;
    }

    /// <summary>
    /// Maps the current row of a replay query to a <see cref="StoredEvent"/>.
    /// </summary>
    /// <remarks>
    /// Ordinals follow the column list in <see cref="BuildReplayQuery"/>. The data (4), metadata (5)
    /// and stream_name (8) columns are selected but not mapped, and <c>Event</c> is left null, so the
    /// returned object carries identifiers, type, sequence and timestamps only.
    /// </remarks>
    private static StoredEvent MapStoredEvent(NpgsqlDataReader reader)
    {
        // The ICorrelatedEvent cannot be reconstructed here without deserialization.
        // NOTE(review): GetString throws on SQL NULL — assumes event_id, correlation_id and
        // event_type are NOT NULL columns; confirm against the table definition.
        return new StoredEvent
        {
            EventId = reader.GetString(0),
            CorrelationId = reader.GetString(1),
            EventType = reader.GetString(2),
            Sequence = reader.GetInt64(3),
            Event = null!, // Will be populated by consumer via deserialization
            OccurredAt = reader.GetDateTime(6),
            StoredAt = reader.GetDateTime(7)
        };
    }
}

/// <summary>
/// Rate limiter for controlling replay speed.
/// Uses token bucket algorithm for smooth rate limiting.
/// </summary>
/// <remarks>
/// Paces callers against a stopwatch started at construction: the N-th call to
/// <see cref="WaitAsync"/> delays until N * 1000 / eventsPerSecond milliseconds have elapsed
/// since then, and returns immediately when the caller is already behind that schedule.
/// The call counter is a plain field with no synchronization.
/// </remarks>
internal class RateLimiter
{
    private readonly int _eventsPerSecond;
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
    private long _eventsProcessed;

    /// <summary>
    /// Creates a limiter targeting <paramref name="eventsPerSecond"/> events per second.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="eventsPerSecond"/> is zero or negative.</exception>
    public RateLimiter(int eventsPerSecond)
    {
        if (eventsPerSecond <= 0)
        {
            throw new ArgumentException("Events per second must be positive", nameof(eventsPerSecond));
        }

        _eventsPerSecond = eventsPerSecond;
    }

    /// <summary>
    /// Counts one event and, if the caller is ahead of the target rate, delays until it is back on schedule.
    /// </summary>
    /// <param name="cancellationToken">Observed only while a delay is pending.</param>
    public async Task WaitAsync(CancellationToken cancellationToken)
    {
        _eventsProcessed++;

        // Point in time (ms since construction) at which this many events are due.
        var dueAtMs = (_eventsProcessed * 1000.0) / _eventsPerSecond;
        var aheadByMs = (int)(dueAtMs - _stopwatch.ElapsedMilliseconds);

        // On or behind schedule: let the caller through without waiting.
        if (aheadByMs <= 0)
        {
            return;
        }

        await Task.Delay(aheadByMs, cancellationToken);
    }
}