using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Streaming;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Configuration;
///
/// PostgreSQL implementation of stream configuration provider.
/// Merges stream-specific configuration with global defaults.
///
public class PostgresStreamConfigurationProvider : IStreamConfigurationProvider
{
private readonly IStreamConfigurationStore _store;
private readonly PostgresEventStreamStoreOptions _options;
private readonly ILogger _logger;
public PostgresStreamConfigurationProvider(
IStreamConfigurationStore store,
IOptions options,
ILogger logger)
{
_store = store ?? throw new ArgumentNullException(nameof(store));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task GetEffectiveConfigurationAsync(
string streamName,
CancellationToken cancellationToken = default)
{
// Get stream-specific configuration if it exists
var streamConfig = await _store.GetConfigurationAsync(streamName, cancellationToken);
// If no stream-specific config exists, return defaults
if (streamConfig == null)
{
_logger.LogDebug("No stream-specific configuration for {StreamName}, using defaults", streamName);
return CreateDefaultConfiguration(streamName);
}
// Merge with defaults (stream-specific takes precedence)
_logger.LogDebug("Using stream-specific configuration for {StreamName}", streamName);
return streamConfig;
}
public async Task GetRetentionConfigurationAsync(
string streamName,
CancellationToken cancellationToken = default)
{
var config = await GetEffectiveConfigurationAsync(streamName, cancellationToken);
return config.Retention;
}
public async Task GetDeadLetterQueueConfigurationAsync(
string streamName,
CancellationToken cancellationToken = default)
{
var config = await GetEffectiveConfigurationAsync(streamName, cancellationToken);
return config.DeadLetterQueue;
}
public async Task GetLifecycleConfigurationAsync(
string streamName,
CancellationToken cancellationToken = default)
{
var config = await GetEffectiveConfigurationAsync(streamName, cancellationToken);
return config.Lifecycle;
}
private static StreamConfiguration CreateDefaultConfiguration(string streamName)
{
return new StreamConfiguration
{
StreamName = streamName,
Description = null,
Tags = null,
Retention = null,
DeadLetterQueue = new DeadLetterQueueConfiguration
{
Enabled = false
},
Lifecycle = new LifecycleConfiguration
{
AutoCreate = true,
AutoArchive = false,
AutoDelete = false
},
Performance = null,
AccessControl = new AccessControlConfiguration
{
PublicRead = false,
PublicWrite = false
},
CreatedAt = DateTimeOffset.UtcNow
};
}
}