using System; using Svrnty.CQRS.Events.PostgreSQL.Configuration; using Svrnty.CQRS.Events.Abstractions.Streaming; using Svrnty.CQRS.Events.Abstractions.Configuration; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.PostgreSQL.Configuration; /// /// PostgreSQL implementation of stream configuration provider. /// Merges stream-specific configuration with global defaults. /// public class PostgresStreamConfigurationProvider : IStreamConfigurationProvider { private readonly IStreamConfigurationStore _store; private readonly PostgresEventStreamStoreOptions _options; private readonly ILogger _logger; public PostgresStreamConfigurationProvider( IStreamConfigurationStore store, IOptions options, ILogger logger) { _store = store ?? throw new ArgumentNullException(nameof(store)); _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task GetEffectiveConfigurationAsync( string streamName, CancellationToken cancellationToken = default) { // Get stream-specific configuration if it exists var streamConfig = await _store.GetConfigurationAsync(streamName, cancellationToken); // If no stream-specific config exists, return defaults if (streamConfig == null) { _logger.LogDebug("No stream-specific configuration for {StreamName}, using defaults", streamName); return CreateDefaultConfiguration(streamName); } // Merge with defaults (stream-specific takes precedence) _logger.LogDebug("Using stream-specific configuration for {StreamName}", streamName); return streamConfig; } public async Task GetRetentionConfigurationAsync( string streamName, CancellationToken cancellationToken = default) { var config = await GetEffectiveConfigurationAsync(streamName, cancellationToken); return config.Retention; } public async Task GetDeadLetterQueueConfigurationAsync( string streamName, CancellationToken cancellationToken = default) { var config = await GetEffectiveConfigurationAsync(streamName, cancellationToken); return config.DeadLetterQueue; } public async Task GetLifecycleConfigurationAsync( string streamName, CancellationToken cancellationToken = default) { var config = await GetEffectiveConfigurationAsync(streamName, cancellationToken); return config.Lifecycle; } private static StreamConfiguration CreateDefaultConfiguration(string streamName) { return new StreamConfiguration { StreamName = streamName, Description = null, Tags = null, Retention = null, DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = false }, Lifecycle = new LifecycleConfiguration { AutoCreate = true, AutoArchive = false, AutoDelete = false }, Performance = null, AccessControl = new AccessControlConfiguration { PublicRead = false, PublicWrite = false }, CreatedAt = DateTimeOffset.UtcNow }; } }