using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Storage;

namespace Svrnty.CQRS.Events.PostgreSQL.Retention;

/// <summary>
/// Background service that automatically enforces retention policies.
/// Periodically cleans up events that exceed configured retention limits.
/// </summary>
/// <remarks>
/// One enforcement cycle is attempted per <c>CleanupInterval</c> tick. A cycle is skipped
/// when <c>UseCleanupWindow</c> is set and the current UTC time of day is outside the
/// configured window. A failing cycle is logged and does not stop the service.
/// </remarks>
public class RetentionPolicyService : BackgroundService
{
    private readonly IRetentionPolicyStore _policyStore;
    private readonly RetentionServiceOptions _options;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the service and validates its options.
    /// </summary>
    /// <param name="policyStore">Store used to apply the retention policies.</param>
    /// <param name="options">Service options; the wrapped value must not be null.</param>
    /// <param name="logger">Logger for lifecycle and cleanup messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="policyStore"/>, <paramref name="options"/> (or its value) or
    /// <paramref name="logger"/> is null.
    /// </exception>
    /// <remarks>
    /// Calls <c>Validate()</c> on the options; anything it throws propagates to the caller.
    /// </remarks>
    public RetentionPolicyService(
        IRetentionPolicyStore policyStore,
        IOptions<RetentionServiceOptions> options,
        ILogger<RetentionPolicyService> logger)
    {
        _policyStore = policyStore ?? throw new ArgumentNullException(nameof(policyStore));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        _options.Validate();
    }

    /// <summary>
    /// Runs the periodic cleanup loop until <paramref name="stoppingToken"/> is cancelled.
    /// Returns immediately when the service is disabled in the options.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Enabled)
        {
            _logger.LogInformation("Retention policy service is disabled");
            return;
        }

        _logger.LogInformation(
            "Retention policy service started. Cleanup interval: {CleanupInterval}, Window: {WindowStart}-{WindowEnd} UTC, UseWindow: {UseWindow}",
            _options.CleanupInterval,
            _options.CleanupWindowStart,
            _options.CleanupWindowEnd,
            _options.UseCleanupWindow);

        // The timer only ticks after a full interval has elapsed, so the first
        // cycle does not run at startup.
        using var timer = new PeriodicTimer(_options.CleanupInterval);

        try
        {
            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                await RunCleanupCycleAsync(stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Raised by the timer wait, or rethrown by a cycle interrupted by shutdown.
            _logger.LogInformation("Retention policy service stopping");
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Retention policy service encountered a fatal error");
            throw;
        }
    }

    /// <summary>
    /// Performs a single enforcement cycle: checks the cleanup window, applies the
    /// retention policies through the store and logs the outcome.
    /// </summary>
    /// <param name="cancellationToken">Token used to abort the store call on shutdown.</param>
    /// <remarks>
    /// Failures are logged and swallowed so the next tick can retry. Cancellation caused by
    /// <paramref name="cancellationToken"/> is rethrown so that shutdown is not reported
    /// as an error.
    /// </remarks>
    private async Task RunCleanupCycleAsync(CancellationToken cancellationToken)
    {
        try
        {
            // Check if we're in the cleanup window
            if (_options.UseCleanupWindow && !IsInCleanupWindow())
            {
                _logger.LogDebug(
                    "Outside cleanup window ({WindowStart}-{WindowEnd} UTC), skipping retention enforcement",
                    _options.CleanupWindowStart,
                    _options.CleanupWindowEnd);
                return;
            }

            _logger.LogInformation("Starting retention policy enforcement cycle");

            var result = await _policyStore.ApplyRetentionPoliciesAsync(cancellationToken);

            if (result.EventsDeleted > 0)
            {
                _logger.LogInformation(
                    "Retention cleanup complete: {StreamsProcessed} streams processed, {EventsDeleted} events deleted in {Duration}",
                    result.StreamsProcessed,
                    result.EventsDeleted,
                    result.Duration);

                // Log per-stream details at Debug level; skip the enumeration entirely
                // when Debug logging is turned off.
                if (result.EventsDeletedPerStream != null && _logger.IsEnabled(LogLevel.Debug))
                {
                    foreach (var (streamName, count) in result.EventsDeletedPerStream)
                    {
                        _logger.LogDebug(
                            "Stream {StreamName}: {EventsDeleted} events deleted",
                            streamName,
                            count);
                    }
                }
            }
            else
            {
                _logger.LogDebug(
                    "Retention cleanup complete: No events needed cleanup (processed {StreamsProcessed} streams in {Duration})",
                    result.StreamsProcessed,
                    result.Duration);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown was requested mid-cycle: let the caller handle it as a normal
            // stop instead of logging it as an enforcement error.
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during retention policy enforcement cycle");
            // Don't rethrow - we want the service to continue running
        }
    }

    /// <summary>
    /// Determines whether the current UTC time of day lies inside the configured
    /// cleanup window. Both window bounds are inclusive.
    /// </summary>
    /// <returns><c>true</c> when cleanup may run now; otherwise <c>false</c>.</returns>
    /// <remarks>
    /// An end earlier than the start is treated as a window that spans midnight.
    /// When start and end are equal, only that exact time of day matches.
    /// </remarks>
    private bool IsInCleanupWindow()
    {
        var now = DateTime.UtcNow.TimeOfDay;

        // Handle window that crosses midnight
        if (_options.CleanupWindowEnd < _options.CleanupWindowStart)
        {
            // Window spans midnight (e.g., 22:00 - 02:00)
            return now >= _options.CleanupWindowStart || now <= _options.CleanupWindowEnd;
        }

        // Normal window (e.g., 02:00 - 06:00)
        return now >= _options.CleanupWindowStart && now <= _options.CleanupWindowEnd;
    }

    /// <summary>
    /// Logs that the service is stopping, then delegates to the base implementation.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the base <c>StopAsync</c>.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Retention policy service is stopping");
        await base.StopAsync(cancellationToken);
    }
}