using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Replay;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Models;
namespace Svrnty.CQRS.Events.PostgreSQL.Replay;
///
/// PostgreSQL-based implementation of IEventReplayService.
/// Provides efficient event replay with batching, rate limiting, and progress tracking.
///
public class PostgresEventReplayService : IEventReplayService
{
private readonly PostgresEventStreamStoreOptions _options;
private readonly ILogger _logger;
public PostgresEventReplayService(
IOptions options,
ILogger logger)
{
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
private string SchemaQualifiedTable(string tableName) => $"{_options.SchemaName}.{tableName}";
///
public async IAsyncEnumerable ReplayFromOffsetAsync(
string streamName,
long startOffset,
ReplayOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
options?.Validate();
var batchSize = options?.BatchSize ?? 100;
var maxEvents = options?.MaxEvents;
var eventTypeFilter = options?.EventTypeFilter;
var progressCallback = options?.ProgressCallback;
var progressInterval = options?.ProgressInterval ?? 1000;
_logger.LogInformation(
"Starting replay from offset {StartOffset} for stream {StreamName}. BatchSize={BatchSize}, MaxEvents={MaxEvents}",
startOffset, streamName, batchSize, maxEvents);
var stopwatch = Stopwatch.StartNew();
long eventsProcessed = 0;
long? estimatedTotal = null;
// Get estimated total if progress callback is provided
if (progressCallback != null)
{
estimatedTotal = await GetReplayCountAsync(
streamName, startOffset, null, null, options, cancellationToken);
_logger.LogInformation(
"Estimated {EstimatedTotal} events to replay for stream {StreamName}",
estimatedTotal, streamName);
}
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var currentOffset = startOffset;
var rateLimiter = options?.MaxEventsPerSecond.HasValue == true
? new RateLimiter(options.MaxEventsPerSecond.Value)
: null;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
// Build query with optional event type filter
var sql = BuildReplayQuery(eventTypeFilter);
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("startOffset", currentOffset);
command.Parameters.AddWithValue("batchSize", batchSize);
if (eventTypeFilter != null && eventTypeFilter.Count > 0)
{
command.Parameters.AddWithValue("eventTypes", eventTypeFilter.ToArray());
}
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
var batchCount = 0;
while (await reader.ReadAsync(cancellationToken))
{
// Rate limiting
if (rateLimiter != null)
{
await rateLimiter.WaitAsync(cancellationToken);
}
var @event = MapStoredEvent(reader);
currentOffset = @event.Sequence + 1;
eventsProcessed++;
batchCount++;
// Progress callback
if (progressCallback != null && eventsProcessed % progressInterval == 0)
{
progressCallback(new ReplayProgress
{
CurrentOffset = @event.Sequence,
EventsProcessed = eventsProcessed,
EstimatedTotal = estimatedTotal,
CurrentTimestamp = @event.StoredAt,
Elapsed = stopwatch.Elapsed
});
}
yield return @event;
// Check max events limit
if (maxEvents.HasValue && eventsProcessed >= maxEvents.Value)
{
_logger.LogInformation(
"Reached max events limit ({MaxEvents}) for stream {StreamName}. Stopping replay.",
maxEvents.Value, streamName);
yield break;
}
}
// No more events in this batch
if (batchCount == 0)
{
break;
}
_logger.LogDebug(
"Replayed batch of {BatchCount} events from stream {StreamName}. Current offset: {CurrentOffset}",
batchCount, streamName, currentOffset);
}
// Final progress callback
if (progressCallback != null)
{
progressCallback(new ReplayProgress
{
CurrentOffset = currentOffset - 1,
EventsProcessed = eventsProcessed,
EstimatedTotal = estimatedTotal,
Elapsed = stopwatch.Elapsed
});
}
_logger.LogInformation(
"Completed replay of {EventsProcessed} events from stream {StreamName} in {Elapsed}ms",
eventsProcessed, streamName, stopwatch.ElapsedMilliseconds);
}
///
public async IAsyncEnumerable ReplayFromTimeAsync(
string streamName,
DateTimeOffset startTime,
ReplayOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
_logger.LogInformation(
"Starting replay from time {StartTime:yyyy-MM-dd HH:mm:ss} for stream {StreamName}",
startTime, streamName);
// Get the offset at the start time
var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);
_logger.LogInformation(
"Found starting offset {StartOffset} for time {StartTime:yyyy-MM-dd HH:mm:ss} in stream {StreamName}",
startOffset, startTime, streamName);
await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
{
yield return @event;
}
}
///
public async IAsyncEnumerable ReplayTimeRangeAsync(
string streamName,
DateTimeOffset startTime,
DateTimeOffset endTime,
ReplayOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
if (endTime <= startTime)
throw new ArgumentException("End time must be after start time", nameof(endTime));
_logger.LogInformation(
"Starting time range replay from {StartTime:yyyy-MM-dd HH:mm:ss} to {EndTime:yyyy-MM-dd HH:mm:ss} for stream {StreamName}",
startTime, endTime, streamName);
var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);
await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
{
if (@event.StoredAt >= endTime)
{
_logger.LogInformation(
"Reached end time {EndTime:yyyy-MM-dd HH:mm:ss}. Stopping time range replay for stream {StreamName}",
endTime, streamName);
yield break;
}
yield return @event;
}
}
///
public IAsyncEnumerable ReplayAllAsync(
string streamName,
ReplayOptions? options = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
_logger.LogInformation("Starting full replay of stream {StreamName}", streamName);
return ReplayFromOffsetAsync(streamName, 0, options, cancellationToken);
}
///
public async Task GetReplayCountAsync(
string streamName,
long? startOffset = null,
DateTimeOffset? startTime = null,
DateTimeOffset? endTime = null,
ReplayOptions? options = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = BuildCountQuery(startOffset, startTime, endTime, options?.EventTypeFilter);
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
if (startOffset.HasValue)
command.Parameters.AddWithValue("startOffset", startOffset.Value);
if (startTime.HasValue)
command.Parameters.AddWithValue("startTime", startTime.Value.UtcDateTime);
if (endTime.HasValue)
command.Parameters.AddWithValue("endTime", endTime.Value.UtcDateTime);
if (options?.EventTypeFilter != null && options.EventTypeFilter.Count > 0)
command.Parameters.AddWithValue("eventTypes", options.EventTypeFilter.ToArray());
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null ? Convert.ToInt64(result) : 0;
}
private async Task GetOffsetAtTimeAsync(
string streamName,
DateTimeOffset timestamp,
CancellationToken cancellationToken)
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT COALESCE(MIN(sequence), 0)
FROM {SchemaQualifiedTable("event_store")}
WHERE stream_name = @streamName
AND stored_at >= @timestamp";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("timestamp", timestamp.UtcDateTime);
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null && result != DBNull.Value ? Convert.ToInt64(result) : 0;
}
private string BuildReplayQuery(IReadOnlyList? eventTypeFilter)
{
var baseQuery = $@"
SELECT event_id, correlation_id, event_type, sequence, data, metadata, occurred_at, stored_at, stream_name
FROM {SchemaQualifiedTable("event_store")}
WHERE stream_name = @streamName
AND sequence >= @startOffset";
if (eventTypeFilter != null && eventTypeFilter.Count > 0)
{
baseQuery += " AND event_type = ANY(@eventTypes)";
}
baseQuery += " ORDER BY sequence ASC LIMIT @batchSize";
return baseQuery;
}
private string BuildCountQuery(
long? startOffset,
DateTimeOffset? startTime,
DateTimeOffset? endTime,
IReadOnlyList? eventTypeFilter)
{
var sql = $@"
SELECT COUNT(*)
FROM {SchemaQualifiedTable("event_store")}
WHERE stream_name = @streamName";
if (startOffset.HasValue)
sql += " AND sequence >= @startOffset";
if (startTime.HasValue)
sql += " AND stored_at >= @startTime";
if (endTime.HasValue)
sql += " AND stored_at < @endTime";
if (eventTypeFilter != null && eventTypeFilter.Count > 0)
sql += " AND event_type = ANY(@eventTypes)";
return sql;
}
private StoredEvent MapStoredEvent(NpgsqlDataReader reader)
{
// Note: We can't fully reconstruct the ICorrelatedEvent without deserialization
// For replay purposes, we return a simplified StoredEvent with JSON data
// Consumers will need to deserialize the data field themselves
return new StoredEvent
{
EventId = reader.GetString(0),
CorrelationId = reader.GetString(1),
EventType = reader.GetString(2),
Sequence = reader.GetInt64(3),
Event = null!, // Will be populated by consumer via deserialization
OccurredAt = reader.GetDateTime(6),
StoredAt = reader.GetDateTime(7)
};
}
}
///
/// Rate limiter for controlling replay speed.
/// Uses token bucket algorithm for smooth rate limiting.
///
internal class RateLimiter
{
private readonly int _eventsPerSecond;
private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
private long _eventsProcessed;
public RateLimiter(int eventsPerSecond)
{
if (eventsPerSecond <= 0)
throw new ArgumentException("Events per second must be positive", nameof(eventsPerSecond));
_eventsPerSecond = eventsPerSecond;
}
public async Task WaitAsync(CancellationToken cancellationToken)
{
_eventsProcessed++;
// Calculate how long we should have taken to process this many events
var expectedElapsedMs = (_eventsProcessed * 1000.0) / _eventsPerSecond;
var actualElapsedMs = _stopwatch.ElapsedMilliseconds;
var delayMs = (int)(expectedElapsedMs - actualElapsedMs);
// If we're ahead of schedule, delay to maintain the rate
if (delayMs > 0)
{
await Task.Delay(delayMs, cancellationToken);
}
}
}