260 lines
9.6 KiB
C#
260 lines
9.6 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.Abstractions.Configuration;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Storage;
|
|
using Svrnty.CQRS.Events.Abstractions.Models;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Npgsql;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
|
|
|
|
/// <summary>
|
|
/// PostgreSQL-based implementation of IRetentionPolicyStore.
|
|
/// Manages retention policies and enforces automatic event cleanup.
|
|
/// </summary>
|
|
public class PostgresRetentionPolicyStore : IRetentionPolicyStore
|
|
{
|
|
private readonly PostgresEventStreamStoreOptions _options;
|
|
private readonly ILogger<PostgresRetentionPolicyStore> _logger;
|
|
|
|
public PostgresRetentionPolicyStore(
|
|
IOptions<PostgresEventStreamStoreOptions> options,
|
|
ILogger<PostgresRetentionPolicyStore> logger)
|
|
{
|
|
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
private string SchemaQualifiedTable(string tableName) => $"{_options.SchemaName}.{tableName}";
|
|
|
|
/// <inheritdoc />
|
|
public async Task SetPolicyAsync(
|
|
IRetentionPolicy policy,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (policy == null)
|
|
throw new ArgumentNullException(nameof(policy));
|
|
|
|
// Validate if it's a RetentionPolicyConfig
|
|
if (policy is RetentionPolicyConfig config)
|
|
{
|
|
config.Validate();
|
|
}
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
INSERT INTO {SchemaQualifiedTable("retention_policies")}
|
|
(stream_name, max_age_seconds, max_event_count, enabled, updated_at)
|
|
VALUES (@streamName, @maxAgeSeconds, @maxEventCount, @enabled, NOW())
|
|
ON CONFLICT (stream_name)
|
|
DO UPDATE SET
|
|
max_age_seconds = @maxAgeSeconds,
|
|
max_event_count = @maxEventCount,
|
|
enabled = @enabled,
|
|
updated_at = NOW()";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("streamName", policy.StreamName);
|
|
command.Parameters.AddWithValue("maxAgeSeconds", (object?)policy.MaxAge?.TotalSeconds ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("maxEventCount", (object?)policy.MaxEventCount ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("enabled", policy.Enabled);
|
|
|
|
await command.ExecuteNonQueryAsync(cancellationToken);
|
|
|
|
_logger.LogInformation(
|
|
"Set retention policy for stream {StreamName}: MaxAge={MaxAge}, MaxEventCount={MaxEventCount}, Enabled={Enabled}",
|
|
policy.StreamName,
|
|
policy.MaxAge,
|
|
policy.MaxEventCount,
|
|
policy.Enabled);
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<IRetentionPolicy?> GetPolicyAsync(
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
SELECT stream_name, max_age_seconds, max_event_count, enabled
|
|
FROM {SchemaQualifiedTable("retention_policies")}
|
|
WHERE stream_name = @streamName";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("streamName", streamName);
|
|
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
if (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
var maxAgeSeconds = reader.IsDBNull(1) ? (int?)null : reader.GetInt32(1);
|
|
var maxEventCount = reader.IsDBNull(2) ? (long?)null : reader.GetInt64(2);
|
|
var enabled = reader.GetBoolean(3);
|
|
|
|
return new RetentionPolicyConfig
|
|
{
|
|
StreamName = streamName,
|
|
MaxAge = maxAgeSeconds.HasValue ? TimeSpan.FromSeconds(maxAgeSeconds.Value) : null,
|
|
MaxEventCount = maxEventCount,
|
|
Enabled = enabled
|
|
};
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<IReadOnlyList<IRetentionPolicy>> GetAllPoliciesAsync(
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
SELECT stream_name, max_age_seconds, max_event_count, enabled
|
|
FROM {SchemaQualifiedTable("retention_policies")}
|
|
ORDER BY
|
|
CASE WHEN stream_name = '*' THEN 0 ELSE 1 END,
|
|
stream_name";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
var policies = new List<IRetentionPolicy>();
|
|
|
|
while (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
var streamName = reader.GetString(0);
|
|
var maxAgeSeconds = reader.IsDBNull(1) ? (int?)null : reader.GetInt32(1);
|
|
var maxEventCount = reader.IsDBNull(2) ? (long?)null : reader.GetInt64(2);
|
|
var enabled = reader.GetBoolean(3);
|
|
|
|
policies.Add(new RetentionPolicyConfig
|
|
{
|
|
StreamName = streamName,
|
|
MaxAge = maxAgeSeconds.HasValue ? TimeSpan.FromSeconds(maxAgeSeconds.Value) : null,
|
|
MaxEventCount = maxEventCount,
|
|
Enabled = enabled
|
|
});
|
|
}
|
|
|
|
return policies;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<bool> DeletePolicyAsync(
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName));
|
|
|
|
// Cannot delete the default policy
|
|
if (streamName == "*")
|
|
{
|
|
_logger.LogWarning("Attempted to delete default retention policy, which is not allowed");
|
|
return false;
|
|
}
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
DELETE FROM {SchemaQualifiedTable("retention_policies")}
|
|
WHERE stream_name = @streamName";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("streamName", streamName);
|
|
|
|
var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
|
|
|
|
if (rowsAffected > 0)
|
|
{
|
|
_logger.LogInformation("Deleted retention policy for stream {StreamName}", streamName);
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<RetentionCleanupResult> ApplyRetentionPoliciesAsync(
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
var stopwatch = Stopwatch.StartNew();
|
|
var eventsDeletedPerStream = new Dictionary<string, long>();
|
|
long totalEventsDeleted = 0;
|
|
int streamsProcessed = 0;
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
_logger.LogInformation("Starting retention policy enforcement");
|
|
|
|
try
|
|
{
|
|
var sql = $"SELECT * FROM {_options.SchemaName}.apply_all_retention_policies()";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.CommandTimeout = 300; // 5 minutes for cleanup
|
|
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
while (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
var streamName = reader.GetString(0);
|
|
var eventsDeleted = reader.GetInt64(1);
|
|
|
|
eventsDeletedPerStream[streamName] = eventsDeleted;
|
|
totalEventsDeleted += eventsDeleted;
|
|
streamsProcessed++;
|
|
|
|
_logger.LogInformation(
|
|
"Retention cleanup for stream {StreamName}: {EventsDeleted} events deleted",
|
|
streamName,
|
|
eventsDeleted);
|
|
}
|
|
|
|
stopwatch.Stop();
|
|
|
|
_logger.LogInformation(
|
|
"Retention policy enforcement complete: {StreamsProcessed} streams processed, {TotalEventsDeleted} total events deleted in {Duration}ms",
|
|
streamsProcessed,
|
|
totalEventsDeleted,
|
|
stopwatch.ElapsedMilliseconds);
|
|
|
|
return new RetentionCleanupResult
|
|
{
|
|
StreamsProcessed = streamsProcessed,
|
|
EventsDeleted = totalEventsDeleted,
|
|
Duration = stopwatch.Elapsed,
|
|
CompletedAt = DateTimeOffset.UtcNow,
|
|
EventsDeletedPerStream = eventsDeletedPerStream
|
|
};
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
stopwatch.Stop();
|
|
|
|
_logger.LogError(ex,
|
|
"Error during retention policy enforcement after {Duration}ms",
|
|
stopwatch.ElapsedMilliseconds);
|
|
|
|
throw;
|
|
}
|
|
}
|
|
}
|