using System;
using Svrnty.CQRS.Events.Abstractions.Storage;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Retention;
///
/// Background service that automatically enforces retention policies.
/// Periodically cleans up events that exceed configured retention limits.
///
public class RetentionPolicyService : BackgroundService
{
private readonly IRetentionPolicyStore _policyStore;
private readonly RetentionServiceOptions _options;
private readonly ILogger _logger;
public RetentionPolicyService(
IRetentionPolicyStore policyStore,
IOptions options,
ILogger logger)
{
_policyStore = policyStore ?? throw new ArgumentNullException(nameof(policyStore));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options.Validate();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Enabled)
{
_logger.LogInformation("Retention policy service is disabled");
return;
}
_logger.LogInformation(
"Retention policy service started. Cleanup interval: {CleanupInterval}, Window: {WindowStart}-{WindowEnd} UTC, UseWindow: {UseWindow}",
_options.CleanupInterval,
_options.CleanupWindowStart,
_options.CleanupWindowEnd,
_options.UseCleanupWindow);
using var timer = new PeriodicTimer(_options.CleanupInterval);
try
{
while (await timer.WaitForNextTickAsync(stoppingToken))
{
await RunCleanupCycleAsync(stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Retention policy service stopping");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Retention policy service encountered a fatal error");
throw;
}
}
private async Task RunCleanupCycleAsync(CancellationToken cancellationToken)
{
try
{
// Check if we're in the cleanup window
if (_options.UseCleanupWindow && !IsInCleanupWindow())
{
_logger.LogDebug(
"Outside cleanup window ({WindowStart}-{WindowEnd} UTC), skipping retention enforcement",
_options.CleanupWindowStart,
_options.CleanupWindowEnd);
return;
}
_logger.LogInformation("Starting retention policy enforcement cycle");
var result = await _policyStore.ApplyRetentionPoliciesAsync(cancellationToken);
if (result.EventsDeleted > 0)
{
_logger.LogInformation(
"Retention cleanup complete: {StreamsProcessed} streams processed, {EventsDeleted} events deleted in {Duration}",
result.StreamsProcessed,
result.EventsDeleted,
result.Duration);
// Log per-stream details at Debug level
if (result.EventsDeletedPerStream != null)
{
foreach (var (streamName, count) in result.EventsDeletedPerStream)
{
_logger.LogDebug(
"Stream {StreamName}: {EventsDeleted} events deleted",
streamName,
count);
}
}
}
else
{
_logger.LogDebug(
"Retention cleanup complete: No events needed cleanup (processed {StreamsProcessed} streams in {Duration})",
result.StreamsProcessed,
result.Duration);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during retention policy enforcement cycle");
// Don't rethrow - we want the service to continue running
}
}
private bool IsInCleanupWindow()
{
var now = DateTime.UtcNow.TimeOfDay;
// Handle window that crosses midnight
if (_options.CleanupWindowEnd < _options.CleanupWindowStart)
{
// Window spans midnight (e.g., 22:00 - 02:00)
return now >= _options.CleanupWindowStart || now <= _options.CleanupWindowEnd;
}
else
{
// Normal window (e.g., 02:00 - 06:00)
return now >= _options.CleanupWindowStart && now <= _options.CleanupWindowEnd;
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Retention policy service is stopping");
await base.StopAsync(cancellationToken);
}
}