using System; using Svrnty.CQRS.Events.Abstractions.Configuration; using Svrnty.CQRS.Events.PostgreSQL.Configuration; using Svrnty.CQRS.Events.Abstractions.Storage; using Svrnty.CQRS.Events.Abstractions.Models; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Npgsql; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.PostgreSQL.Stores; /// /// PostgreSQL-based implementation of IRetentionPolicyStore. /// Manages retention policies and enforces automatic event cleanup. /// public class PostgresRetentionPolicyStore : IRetentionPolicyStore { private readonly PostgresEventStreamStoreOptions _options; private readonly ILogger _logger; public PostgresRetentionPolicyStore( IOptions options, ILogger logger) { _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } private string SchemaQualifiedTable(string tableName) => $"{_options.SchemaName}.{tableName}"; /// public async Task SetPolicyAsync( IRetentionPolicy policy, CancellationToken cancellationToken = default) { if (policy == null) throw new ArgumentNullException(nameof(policy)); // Validate if it's a RetentionPolicyConfig if (policy is RetentionPolicyConfig config) { config.Validate(); } await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); var sql = $@" INSERT INTO {SchemaQualifiedTable("retention_policies")} (stream_name, max_age_seconds, max_event_count, enabled, updated_at) VALUES (@streamName, @maxAgeSeconds, @maxEventCount, @enabled, NOW()) ON CONFLICT (stream_name) DO UPDATE SET max_age_seconds = @maxAgeSeconds, max_event_count = @maxEventCount, enabled = @enabled, updated_at = NOW()"; await using var command = new NpgsqlCommand(sql, connection); command.Parameters.AddWithValue("streamName", policy.StreamName); command.Parameters.AddWithValue("maxAgeSeconds", (object?)policy.MaxAge?.TotalSeconds ?? DBNull.Value); command.Parameters.AddWithValue("maxEventCount", (object?)policy.MaxEventCount ?? DBNull.Value); command.Parameters.AddWithValue("enabled", policy.Enabled); await command.ExecuteNonQueryAsync(cancellationToken); _logger.LogInformation( "Set retention policy for stream {StreamName}: MaxAge={MaxAge}, MaxEventCount={MaxEventCount}, Enabled={Enabled}", policy.StreamName, policy.MaxAge, policy.MaxEventCount, policy.Enabled); } /// public async Task GetPolicyAsync( string streamName, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName)); await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); var sql = $@" SELECT stream_name, max_age_seconds, max_event_count, enabled FROM {SchemaQualifiedTable("retention_policies")} WHERE stream_name = @streamName"; await using var command = new NpgsqlCommand(sql, connection); command.Parameters.AddWithValue("streamName", streamName); await using var reader = await command.ExecuteReaderAsync(cancellationToken); if (await reader.ReadAsync(cancellationToken)) { var maxAgeSeconds = reader.IsDBNull(1) ? (int?)null : reader.GetInt32(1); var maxEventCount = reader.IsDBNull(2) ? (long?)null : reader.GetInt64(2); var enabled = reader.GetBoolean(3); return new RetentionPolicyConfig { StreamName = streamName, MaxAge = maxAgeSeconds.HasValue ? TimeSpan.FromSeconds(maxAgeSeconds.Value) : null, MaxEventCount = maxEventCount, Enabled = enabled }; } return null; } /// public async Task> GetAllPoliciesAsync( CancellationToken cancellationToken = default) { await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); var sql = $@" SELECT stream_name, max_age_seconds, max_event_count, enabled FROM {SchemaQualifiedTable("retention_policies")} ORDER BY CASE WHEN stream_name = '*' THEN 0 ELSE 1 END, stream_name"; await using var command = new NpgsqlCommand(sql, connection); await using var reader = await command.ExecuteReaderAsync(cancellationToken); var policies = new List(); while (await reader.ReadAsync(cancellationToken)) { var streamName = reader.GetString(0); var maxAgeSeconds = reader.IsDBNull(1) ? (int?)null : reader.GetInt32(1); var maxEventCount = reader.IsDBNull(2) ? (long?)null : reader.GetInt64(2); var enabled = reader.GetBoolean(3); policies.Add(new RetentionPolicyConfig { StreamName = streamName, MaxAge = maxAgeSeconds.HasValue ? TimeSpan.FromSeconds(maxAgeSeconds.Value) : null, MaxEventCount = maxEventCount, Enabled = enabled }); } return policies; } /// public async Task DeletePolicyAsync( string streamName, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName)); // Cannot delete the default policy if (streamName == "*") { _logger.LogWarning("Attempted to delete default retention policy, which is not allowed"); return false; } await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); var sql = $@" DELETE FROM {SchemaQualifiedTable("retention_policies")} WHERE stream_name = @streamName"; await using var command = new NpgsqlCommand(sql, connection); command.Parameters.AddWithValue("streamName", streamName); var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken); if (rowsAffected > 0) { _logger.LogInformation("Deleted retention policy for stream {StreamName}", streamName); return true; } return false; } /// public async Task ApplyRetentionPoliciesAsync( CancellationToken cancellationToken = default) { var stopwatch = Stopwatch.StartNew(); var eventsDeletedPerStream = new Dictionary(); long totalEventsDeleted = 0; int streamsProcessed = 0; await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); _logger.LogInformation("Starting retention policy enforcement"); try { var sql = $"SELECT * FROM {_options.SchemaName}.apply_all_retention_policies()"; await using var command = new NpgsqlCommand(sql, connection); command.CommandTimeout = 300; // 5 minutes for cleanup await using var reader = await command.ExecuteReaderAsync(cancellationToken); while (await reader.ReadAsync(cancellationToken)) { var streamName = reader.GetString(0); var eventsDeleted = reader.GetInt64(1); eventsDeletedPerStream[streamName] = eventsDeleted; totalEventsDeleted += eventsDeleted; streamsProcessed++; _logger.LogInformation( "Retention cleanup for stream {StreamName}: {EventsDeleted} events deleted", streamName, eventsDeleted); } stopwatch.Stop(); _logger.LogInformation( "Retention policy enforcement complete: {StreamsProcessed} streams processed, {TotalEventsDeleted} total events deleted in {Duration}ms", streamsProcessed, totalEventsDeleted, stopwatch.ElapsedMilliseconds); return new RetentionCleanupResult { StreamsProcessed = streamsProcessed, EventsDeleted = totalEventsDeleted, Duration = stopwatch.Elapsed, CompletedAt = DateTimeOffset.UtcNow, EventsDeletedPerStream = eventsDeletedPerStream }; } catch (Exception ex) { stopwatch.Stop(); _logger.LogError(ex, "Error during retention policy enforcement after {Duration}ms", stopwatch.ElapsedMilliseconds); throw; } } }