393 lines
15 KiB
C#
393 lines
15 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Replay;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Runtime.CompilerServices;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Npgsql;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
using Svrnty.CQRS.Events.Abstractions.Models;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL.Replay;
|
|
|
|
/// <summary>
|
|
/// PostgreSQL-based implementation of IEventReplayService.
|
|
/// Provides efficient event replay with batching, rate limiting, and progress tracking.
|
|
/// </summary>
|
|
public class PostgresEventReplayService : IEventReplayService
|
|
{
|
|
private readonly PostgresEventStreamStoreOptions _options;
|
|
private readonly ILogger<PostgresEventReplayService> _logger;
|
|
|
|
public PostgresEventReplayService(
|
|
IOptions<PostgresEventStreamStoreOptions> options,
|
|
ILogger<PostgresEventReplayService> logger)
|
|
{
|
|
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
private string SchemaQualifiedTable(string tableName) => $"{_options.SchemaName}.{tableName}";
|
|
|
|
/// <inheritdoc />
|
|
public async IAsyncEnumerable<StoredEvent> ReplayFromOffsetAsync(
|
|
string streamName,
|
|
long startOffset,
|
|
ReplayOptions? options = null,
|
|
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
options?.Validate();
|
|
|
|
var batchSize = options?.BatchSize ?? 100;
|
|
var maxEvents = options?.MaxEvents;
|
|
var eventTypeFilter = options?.EventTypeFilter;
|
|
var progressCallback = options?.ProgressCallback;
|
|
var progressInterval = options?.ProgressInterval ?? 1000;
|
|
|
|
_logger.LogInformation(
|
|
"Starting replay from offset {StartOffset} for stream {StreamName}. BatchSize={BatchSize}, MaxEvents={MaxEvents}",
|
|
startOffset, streamName, batchSize, maxEvents);
|
|
|
|
var stopwatch = Stopwatch.StartNew();
|
|
long eventsProcessed = 0;
|
|
long? estimatedTotal = null;
|
|
|
|
// Get estimated total if progress callback is provided
|
|
if (progressCallback != null)
|
|
{
|
|
estimatedTotal = await GetReplayCountAsync(
|
|
streamName, startOffset, null, null, options, cancellationToken);
|
|
|
|
_logger.LogInformation(
|
|
"Estimated {EstimatedTotal} events to replay for stream {StreamName}",
|
|
estimatedTotal, streamName);
|
|
}
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var currentOffset = startOffset;
|
|
var rateLimiter = options?.MaxEventsPerSecond.HasValue == true
|
|
? new RateLimiter(options.MaxEventsPerSecond.Value)
|
|
: null;
|
|
|
|
while (true)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
// Build query with optional event type filter
|
|
var sql = BuildReplayQuery(eventTypeFilter);
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("streamName", streamName);
|
|
command.Parameters.AddWithValue("startOffset", currentOffset);
|
|
command.Parameters.AddWithValue("batchSize", batchSize);
|
|
|
|
if (eventTypeFilter != null && eventTypeFilter.Count > 0)
|
|
{
|
|
command.Parameters.AddWithValue("eventTypes", eventTypeFilter.ToArray());
|
|
}
|
|
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
var batchCount = 0;
|
|
while (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
// Rate limiting
|
|
if (rateLimiter != null)
|
|
{
|
|
await rateLimiter.WaitAsync(cancellationToken);
|
|
}
|
|
|
|
var @event = MapStoredEvent(reader);
|
|
currentOffset = @event.Sequence + 1;
|
|
eventsProcessed++;
|
|
batchCount++;
|
|
|
|
// Progress callback
|
|
if (progressCallback != null && eventsProcessed % progressInterval == 0)
|
|
{
|
|
progressCallback(new ReplayProgress
|
|
{
|
|
CurrentOffset = @event.Sequence,
|
|
EventsProcessed = eventsProcessed,
|
|
EstimatedTotal = estimatedTotal,
|
|
CurrentTimestamp = @event.StoredAt,
|
|
Elapsed = stopwatch.Elapsed
|
|
});
|
|
}
|
|
|
|
yield return @event;
|
|
|
|
// Check max events limit
|
|
if (maxEvents.HasValue && eventsProcessed >= maxEvents.Value)
|
|
{
|
|
_logger.LogInformation(
|
|
"Reached max events limit ({MaxEvents}) for stream {StreamName}. Stopping replay.",
|
|
maxEvents.Value, streamName);
|
|
yield break;
|
|
}
|
|
}
|
|
|
|
// No more events in this batch
|
|
if (batchCount == 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
_logger.LogDebug(
|
|
"Replayed batch of {BatchCount} events from stream {StreamName}. Current offset: {CurrentOffset}",
|
|
batchCount, streamName, currentOffset);
|
|
}
|
|
|
|
// Final progress callback
|
|
if (progressCallback != null)
|
|
{
|
|
progressCallback(new ReplayProgress
|
|
{
|
|
CurrentOffset = currentOffset - 1,
|
|
EventsProcessed = eventsProcessed,
|
|
EstimatedTotal = estimatedTotal,
|
|
Elapsed = stopwatch.Elapsed
|
|
});
|
|
}
|
|
|
|
_logger.LogInformation(
|
|
"Completed replay of {EventsProcessed} events from stream {StreamName} in {Elapsed}ms",
|
|
eventsProcessed, streamName, stopwatch.ElapsedMilliseconds);
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async IAsyncEnumerable<StoredEvent> ReplayFromTimeAsync(
|
|
string streamName,
|
|
DateTimeOffset startTime,
|
|
ReplayOptions? options = null,
|
|
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
_logger.LogInformation(
|
|
"Starting replay from time {StartTime:yyyy-MM-dd HH:mm:ss} for stream {StreamName}",
|
|
startTime, streamName);
|
|
|
|
// Get the offset at the start time
|
|
var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);
|
|
|
|
_logger.LogInformation(
|
|
"Found starting offset {StartOffset} for time {StartTime:yyyy-MM-dd HH:mm:ss} in stream {StreamName}",
|
|
startOffset, startTime, streamName);
|
|
|
|
await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
|
|
{
|
|
yield return @event;
|
|
}
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async IAsyncEnumerable<StoredEvent> ReplayTimeRangeAsync(
|
|
string streamName,
|
|
DateTimeOffset startTime,
|
|
DateTimeOffset endTime,
|
|
ReplayOptions? options = null,
|
|
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
if (endTime <= startTime)
|
|
throw new ArgumentException("End time must be after start time", nameof(endTime));
|
|
|
|
_logger.LogInformation(
|
|
"Starting time range replay from {StartTime:yyyy-MM-dd HH:mm:ss} to {EndTime:yyyy-MM-dd HH:mm:ss} for stream {StreamName}",
|
|
startTime, endTime, streamName);
|
|
|
|
var startOffset = await GetOffsetAtTimeAsync(streamName, startTime, cancellationToken);
|
|
|
|
await foreach (var @event in ReplayFromOffsetAsync(streamName, startOffset, options, cancellationToken))
|
|
{
|
|
if (@event.StoredAt >= endTime)
|
|
{
|
|
_logger.LogInformation(
|
|
"Reached end time {EndTime:yyyy-MM-dd HH:mm:ss}. Stopping time range replay for stream {StreamName}",
|
|
endTime, streamName);
|
|
yield break;
|
|
}
|
|
|
|
yield return @event;
|
|
}
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public IAsyncEnumerable<StoredEvent> ReplayAllAsync(
|
|
string streamName,
|
|
ReplayOptions? options = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
_logger.LogInformation("Starting full replay of stream {StreamName}", streamName);
|
|
|
|
return ReplayFromOffsetAsync(streamName, 0, options, cancellationToken);
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<long> GetReplayCountAsync(
|
|
string streamName,
|
|
long? startOffset = null,
|
|
DateTimeOffset? startTime = null,
|
|
DateTimeOffset? endTime = null,
|
|
ReplayOptions? options = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = BuildCountQuery(startOffset, startTime, endTime, options?.EventTypeFilter);
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("streamName", streamName);
|
|
|
|
if (startOffset.HasValue)
|
|
command.Parameters.AddWithValue("startOffset", startOffset.Value);
|
|
if (startTime.HasValue)
|
|
command.Parameters.AddWithValue("startTime", startTime.Value.UtcDateTime);
|
|
if (endTime.HasValue)
|
|
command.Parameters.AddWithValue("endTime", endTime.Value.UtcDateTime);
|
|
if (options?.EventTypeFilter != null && options.EventTypeFilter.Count > 0)
|
|
command.Parameters.AddWithValue("eventTypes", options.EventTypeFilter.ToArray());
|
|
|
|
var result = await command.ExecuteScalarAsync(cancellationToken);
|
|
return result != null ? Convert.ToInt64(result) : 0;
|
|
}
|
|
|
|
private async Task<long> GetOffsetAtTimeAsync(
|
|
string streamName,
|
|
DateTimeOffset timestamp,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
SELECT COALESCE(MIN(sequence), 0)
|
|
FROM {SchemaQualifiedTable("event_store")}
|
|
WHERE stream_name = @streamName
|
|
AND stored_at >= @timestamp";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("streamName", streamName);
|
|
command.Parameters.AddWithValue("timestamp", timestamp.UtcDateTime);
|
|
|
|
var result = await command.ExecuteScalarAsync(cancellationToken);
|
|
return result != null && result != DBNull.Value ? Convert.ToInt64(result) : 0;
|
|
}
|
|
|
|
private string BuildReplayQuery(IReadOnlyList<string>? eventTypeFilter)
|
|
{
|
|
var baseQuery = $@"
|
|
SELECT event_id, correlation_id, event_type, sequence, data, metadata, occurred_at, stored_at, stream_name
|
|
FROM {SchemaQualifiedTable("event_store")}
|
|
WHERE stream_name = @streamName
|
|
AND sequence >= @startOffset";
|
|
|
|
if (eventTypeFilter != null && eventTypeFilter.Count > 0)
|
|
{
|
|
baseQuery += " AND event_type = ANY(@eventTypes)";
|
|
}
|
|
|
|
baseQuery += " ORDER BY sequence ASC LIMIT @batchSize";
|
|
|
|
return baseQuery;
|
|
}
|
|
|
|
private string BuildCountQuery(
|
|
long? startOffset,
|
|
DateTimeOffset? startTime,
|
|
DateTimeOffset? endTime,
|
|
IReadOnlyList<string>? eventTypeFilter)
|
|
{
|
|
var sql = $@"
|
|
SELECT COUNT(*)
|
|
FROM {SchemaQualifiedTable("event_store")}
|
|
WHERE stream_name = @streamName";
|
|
|
|
if (startOffset.HasValue)
|
|
sql += " AND sequence >= @startOffset";
|
|
if (startTime.HasValue)
|
|
sql += " AND stored_at >= @startTime";
|
|
if (endTime.HasValue)
|
|
sql += " AND stored_at < @endTime";
|
|
if (eventTypeFilter != null && eventTypeFilter.Count > 0)
|
|
sql += " AND event_type = ANY(@eventTypes)";
|
|
|
|
return sql;
|
|
}
|
|
|
|
private StoredEvent MapStoredEvent(NpgsqlDataReader reader)
|
|
{
|
|
// Note: We can't fully reconstruct the ICorrelatedEvent without deserialization
|
|
// For replay purposes, we return a simplified StoredEvent with JSON data
|
|
// Consumers will need to deserialize the data field themselves
|
|
|
|
return new StoredEvent
|
|
{
|
|
EventId = reader.GetString(0),
|
|
CorrelationId = reader.GetString(1),
|
|
EventType = reader.GetString(2),
|
|
Sequence = reader.GetInt64(3),
|
|
Event = null!, // Will be populated by consumer via deserialization
|
|
OccurredAt = reader.GetDateTime(6),
|
|
StoredAt = reader.GetDateTime(7)
|
|
};
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Rate limiter for controlling replay speed.
|
|
/// Uses token bucket algorithm for smooth rate limiting.
|
|
/// </summary>
|
|
internal class RateLimiter
|
|
{
|
|
private readonly int _eventsPerSecond;
|
|
private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
|
|
private long _eventsProcessed;
|
|
|
|
public RateLimiter(int eventsPerSecond)
|
|
{
|
|
if (eventsPerSecond <= 0)
|
|
throw new ArgumentException("Events per second must be positive", nameof(eventsPerSecond));
|
|
|
|
_eventsPerSecond = eventsPerSecond;
|
|
}
|
|
|
|
public async Task WaitAsync(CancellationToken cancellationToken)
|
|
{
|
|
_eventsProcessed++;
|
|
|
|
// Calculate how long we should have taken to process this many events
|
|
var expectedElapsedMs = (_eventsProcessed * 1000.0) / _eventsPerSecond;
|
|
var actualElapsedMs = _stopwatch.ElapsedMilliseconds;
|
|
var delayMs = (int)(expectedElapsedMs - actualElapsedMs);
|
|
|
|
// If we're ahead of schedule, delay to maintain the rate
|
|
if (delayMs > 0)
|
|
{
|
|
await Task.Delay(delayMs, cancellationToken);
|
|
}
|
|
}
|
|
}
|