using System;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Storage;
using Svrnty.CQRS.Events.Abstractions.Models;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
/// <summary>
/// PostgreSQL-based implementation of <see cref="IRetentionPolicyStore"/>.
/// Manages retention policies and enforces automatic event cleanup.
/// </summary>
public class PostgresRetentionPolicyStore : IRetentionPolicyStore
{
    /// <summary>
    /// Stream name that identifies the default retention policy. This entry cannot be deleted.
    /// </summary>
    private const string DefaultPolicyStreamName = "*";

    /// <summary>
    /// Command timeout, in seconds, for the bulk cleanup call (5 minutes).
    /// </summary>
    private const int CleanupCommandTimeoutSeconds = 300;

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresRetentionPolicyStore> _logger;

    /// <summary>
    /// Creates a new store using the supplied options and logger.
    /// </summary>
    /// <param name="options">Options providing the connection string and schema name.</param>
    /// <param name="logger">Logger used for informational, warning and error messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its <c>Value</c>) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresRetentionPolicyStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresRetentionPolicyStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Prefixes <paramref name="tableName"/> with the configured schema name.
    /// </summary>
    private string SchemaQualifiedTable(string tableName) => $"{_options.SchemaName}.{tableName}";

    /// <summary>
    /// Creates a connection from the configured connection string and opens it.
    /// The caller owns the returned connection and must dispose it.
    /// </summary>
    private async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    {
        var connection = new NpgsqlConnection(_options.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
            return connection;
        }
        catch
        {
            // Opening failed, so nobody else will ever dispose this instance.
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Maps the current row of a
    /// <c>stream_name, max_age_seconds, max_event_count, enabled</c> result set to a policy.
    /// </summary>
    /// <param name="reader">Reader positioned on the row to map.</param>
    /// <param name="streamName">Stream name to assign to the resulting policy.</param>
    private static RetentionPolicyConfig ReadPolicy(
        System.Data.Common.DbDataReader reader,
        string streamName)
    {
        // NOTE(review): SetPolicyAsync sends max_age_seconds as a double (TimeSpan.TotalSeconds)
        // while it is read back here as a 32-bit integer — confirm the column type and rounding.
        var maxAgeSeconds = reader.IsDBNull(1) ? (int?)null : reader.GetInt32(1);
        var maxEventCount = reader.IsDBNull(2) ? (long?)null : reader.GetInt64(2);
        var enabled = reader.GetBoolean(3);

        return new RetentionPolicyConfig
        {
            StreamName = streamName,
            MaxAge = maxAgeSeconds.HasValue ? TimeSpan.FromSeconds(maxAgeSeconds.Value) : null,
            MaxEventCount = maxEventCount,
            Enabled = enabled
        };
    }

    /// <summary>
    /// Inserts the retention policy for a stream, or updates it when a policy for the
    /// same stream name already exists.
    /// </summary>
    /// <param name="policy">The policy to store.</param>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="policy"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// The policy's stream name is null, empty or whitespace.
    /// </exception>
    public async Task SetPolicyAsync(
        IRetentionPolicy policy,
        CancellationToken cancellationToken = default)
    {
        if (policy is null)
        {
            throw new ArgumentNullException(nameof(policy));
        }

        // Validate if it's a RetentionPolicyConfig
        if (policy is RetentionPolicyConfig config)
        {
            config.Validate();
        }

        // Same rule GetPolicyAsync/DeletePolicyAsync enforce: a policy stored under a blank
        // name could never be fetched or removed through this store.
        if (string.IsNullOrWhiteSpace(policy.StreamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(policy));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            INSERT INTO {SchemaQualifiedTable("retention_policies")}
            (stream_name, max_age_seconds, max_event_count, enabled, updated_at)
            VALUES (@streamName, @maxAgeSeconds, @maxEventCount, @enabled, NOW())
            ON CONFLICT (stream_name)
            DO UPDATE SET
            max_age_seconds = @maxAgeSeconds,
            max_event_count = @maxEventCount,
            enabled = @enabled,
            updated_at = NOW()";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", policy.StreamName);
        // Absent limits are stored as SQL NULL.
        command.Parameters.AddWithValue("maxAgeSeconds", (object?)policy.MaxAge?.TotalSeconds ?? DBNull.Value);
        command.Parameters.AddWithValue("maxEventCount", (object?)policy.MaxEventCount ?? DBNull.Value);
        command.Parameters.AddWithValue("enabled", policy.Enabled);

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogInformation(
            "Set retention policy for stream {StreamName}: MaxAge={MaxAge}, MaxEventCount={MaxEventCount}, Enabled={Enabled}",
            policy.StreamName,
            policy.MaxAge,
            policy.MaxEventCount,
            policy.Enabled);
    }

    /// <summary>
    /// Looks up the retention policy stored for exactly the given stream name.
    /// </summary>
    /// <param name="streamName">Stream name to look up.</param>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <returns>The stored policy, or <c>null</c> when no row matches.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="streamName"/> is null, empty or whitespace.
    /// </exception>
    public async Task<IRetentionPolicy?> GetPolicyAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT stream_name, max_age_seconds, max_event_count, enabled
            FROM {SchemaQualifiedTable("retention_policies")}
            WHERE stream_name = @streamName";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        if (!await reader.ReadAsync(cancellationToken))
        {
            return null;
        }

        // The returned policy carries the requested name rather than the column value.
        return ReadPolicy(reader, streamName);
    }

    /// <summary>
    /// Returns every stored retention policy, with the default policy first and the
    /// remaining policies ordered by stream name.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <returns>All stored policies; empty when the table has no rows.</returns>
    // NOTE(review): the collection type in the return signature was reconstructed as
    // IReadOnlyList<IRetentionPolicy> — confirm it matches IRetentionPolicyStore.
    public async Task<IReadOnlyList<IRetentionPolicy>> GetAllPoliciesAsync(
        CancellationToken cancellationToken = default)
    {
        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT stream_name, max_age_seconds, max_event_count, enabled
            FROM {SchemaQualifiedTable("retention_policies")}
            ORDER BY
            CASE WHEN stream_name = '*' THEN 0 ELSE 1 END,
            stream_name";

        await using var command = new NpgsqlCommand(sql, connection);
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);

        var policies = new List<IRetentionPolicy>();
        while (await reader.ReadAsync(cancellationToken))
        {
            policies.Add(ReadPolicy(reader, reader.GetString(0)));
        }

        return policies;
    }

    /// <summary>
    /// Deletes the retention policy stored for the given stream name.
    /// The default policy is never deleted.
    /// </summary>
    /// <param name="streamName">Stream name whose policy should be removed.</param>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <returns>
    /// <c>true</c> when a row was deleted; <c>false</c> when no policy existed or the
    /// default policy was targeted.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="streamName"/> is null, empty or whitespace.
    /// </exception>
    public async Task<bool> DeletePolicyAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
        }

        // Cannot delete the default policy
        if (streamName == DefaultPolicyStreamName)
        {
            _logger.LogWarning("Attempted to delete default retention policy, which is not allowed");
            return false;
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            DELETE FROM {SchemaQualifiedTable("retention_policies")}
            WHERE stream_name = @streamName";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
        if (rowsAffected <= 0)
        {
            return false;
        }

        _logger.LogInformation("Deleted retention policy for stream {StreamName}", streamName);
        return true;
    }

    /// <summary>
    /// Runs the schema's <c>apply_all_retention_policies()</c> database function and
    /// aggregates the per-stream deletion counts it returns.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <returns>
    /// A summary containing the number of result rows (streams) processed, the total and
    /// per-stream number of deleted events, the elapsed time and the completion timestamp.
    /// </returns>
    /// <remarks>
    /// Failures while executing the function are logged and rethrown. Cancellation is
    /// rethrown without being logged as an error.
    /// </remarks>
    public async Task<RetentionCleanupResult> ApplyRetentionPoliciesAsync(
        CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        var eventsDeletedPerStream = new Dictionary<string, long>();
        long totalEventsDeleted = 0;
        int streamsProcessed = 0;

        await using var connection = await OpenConnectionAsync(cancellationToken);

        _logger.LogInformation("Starting retention policy enforcement");

        try
        {
            var sql = $"SELECT * FROM {_options.SchemaName}.apply_all_retention_policies()";

            await using var command = new NpgsqlCommand(sql, connection);
            command.CommandTimeout = CleanupCommandTimeoutSeconds;

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            // Each row is read as (stream name, number of events deleted).
            while (await reader.ReadAsync(cancellationToken))
            {
                var streamName = reader.GetString(0);
                var eventsDeleted = reader.GetInt64(1);

                eventsDeletedPerStream[streamName] = eventsDeleted;
                totalEventsDeleted += eventsDeleted;
                streamsProcessed++;

                _logger.LogInformation(
                    "Retention cleanup for stream {StreamName}: {EventsDeleted} events deleted",
                    streamName,
                    eventsDeleted);
            }

            stopwatch.Stop();

            _logger.LogInformation(
                "Retention policy enforcement complete: {StreamsProcessed} streams processed, {TotalEventsDeleted} total events deleted in {Duration}ms",
                streamsProcessed,
                totalEventsDeleted,
                stopwatch.ElapsedMilliseconds);

            return new RetentionCleanupResult
            {
                StreamsProcessed = streamsProcessed,
                EventsDeleted = totalEventsDeleted,
                Duration = stopwatch.Elapsed,
                CompletedAt = DateTimeOffset.UtcNow,
                EventsDeletedPerStream = eventsDeletedPerStream
            };
        }
        // Cancellation is an expected way to stop; let it propagate without an error entry.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            stopwatch.Stop();
            _logger.LogError(ex,
                "Error during retention policy enforcement after {Duration}ms",
                stopwatch.ElapsedMilliseconds);
            throw;
        }
    }
}