using System; using Svrnty.CQRS.Events.PostgreSQL.Configuration; using Svrnty.CQRS.Events.Abstractions.Streaming; using Svrnty.CQRS.Events.Abstractions.Configuration; using System.Collections.Generic; using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Npgsql; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.PostgreSQL.Stores; /// /// PostgreSQL implementation of stream configuration store. /// public class PostgresStreamConfigurationStore : IStreamConfigurationStore { private readonly PostgresEventStreamStoreOptions _options; private readonly ILogger _logger; public PostgresStreamConfigurationStore( IOptions options, ILogger logger) { _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task GetConfigurationAsync( string streamName, CancellationToken cancellationToken = default) { const string sql = @" SELECT * FROM event_streaming.stream_configurations WHERE stream_name = @StreamName"; await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); await using var command = new NpgsqlCommand(sql, connection); command.Parameters.AddWithValue("@StreamName", streamName); await using var reader = await command.ExecuteReaderAsync(cancellationToken); if (await reader.ReadAsync(cancellationToken)) { return MapToStreamConfiguration(reader); } _logger.LogDebug("No configuration found for stream {StreamName}", streamName); return null; } public async Task> GetAllConfigurationsAsync( CancellationToken cancellationToken = default) { const string sql = "SELECT * FROM event_streaming.stream_configurations ORDER BY stream_name"; await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); await using var command = new NpgsqlCommand(sql, connection); await using var reader = await command.ExecuteReaderAsync(cancellationToken); var configurations = new List(); while (await reader.ReadAsync(cancellationToken)) { configurations.Add(MapToStreamConfiguration(reader)); } _logger.LogDebug("Retrieved {Count} stream configurations", configurations.Count); return configurations; } public async Task SetConfigurationAsync( StreamConfiguration configuration, CancellationToken cancellationToken = default) { if (configuration == null) throw new ArgumentNullException(nameof(configuration)); configuration.Validate(); const string sql = @" INSERT INTO event_streaming.stream_configurations ( stream_name, description, tags, retention_max_age_seconds, retention_max_size_bytes, retention_max_event_count, retention_enable_partitioning, retention_partition_interval_seconds, dlq_enabled, dlq_stream_name, dlq_max_delivery_attempts, dlq_retry_delay_seconds, dlq_store_original_event, dlq_store_error_details, lifecycle_auto_create, lifecycle_auto_archive, lifecycle_archive_after_seconds, lifecycle_archive_location, lifecycle_auto_delete, lifecycle_delete_after_seconds, performance_batch_size, performance_enable_compression, performance_compression_algorithm, performance_enable_indexing, performance_indexed_fields, performance_cache_size, access_public_read, access_public_write, access_allowed_readers, access_allowed_writers, access_max_consumer_groups, access_max_events_per_second, created_at, updated_at, created_by, updated_by ) VALUES ( @StreamName, @Description, @Tags::jsonb, @RetentionMaxAge, @RetentionMaxSize, @RetentionMaxCount, @RetentionPartitioning, @RetentionPartitionInterval, @DlqEnabled, @DlqStreamName, @DlqMaxAttempts, @DlqRetryDelay, @DlqStoreOriginal, @DlqStoreError, @LifecycleAutoCreate, @LifecycleAutoArchive, @LifecycleArchiveAfter, @LifecycleArchiveLocation, @LifecycleAutoDelete, @LifecycleDeleteAfter, @PerfBatchSize, @PerfCompression, @PerfCompressionAlgorithm, @PerfIndexing, @PerfIndexedFields::jsonb, @PerfCacheSize, @AccessPublicRead, @AccessPublicWrite, @AccessReaders::jsonb, @AccessWriters::jsonb, @AccessMaxConsumerGroups, @AccessMaxEventsPerSecond, @CreatedAt, @UpdatedAt, @CreatedBy, @UpdatedBy ) ON CONFLICT (stream_name) DO UPDATE SET description = EXCLUDED.description, tags = EXCLUDED.tags, retention_max_age_seconds = EXCLUDED.retention_max_age_seconds, retention_max_size_bytes = EXCLUDED.retention_max_size_bytes, retention_max_event_count = EXCLUDED.retention_max_event_count, retention_enable_partitioning = EXCLUDED.retention_enable_partitioning, retention_partition_interval_seconds = EXCLUDED.retention_partition_interval_seconds, dlq_enabled = EXCLUDED.dlq_enabled, dlq_stream_name = EXCLUDED.dlq_stream_name, dlq_max_delivery_attempts = EXCLUDED.dlq_max_delivery_attempts, dlq_retry_delay_seconds = EXCLUDED.dlq_retry_delay_seconds, dlq_store_original_event = EXCLUDED.dlq_store_original_event, dlq_store_error_details = EXCLUDED.dlq_store_error_details, lifecycle_auto_create = EXCLUDED.lifecycle_auto_create, lifecycle_auto_archive = EXCLUDED.lifecycle_auto_archive, lifecycle_archive_after_seconds = EXCLUDED.lifecycle_archive_after_seconds, lifecycle_archive_location = EXCLUDED.lifecycle_archive_location, lifecycle_auto_delete = EXCLUDED.lifecycle_auto_delete, lifecycle_delete_after_seconds = EXCLUDED.lifecycle_delete_after_seconds, performance_batch_size = EXCLUDED.performance_batch_size, performance_enable_compression = EXCLUDED.performance_enable_compression, performance_compression_algorithm = EXCLUDED.performance_compression_algorithm, performance_enable_indexing = EXCLUDED.performance_enable_indexing, performance_indexed_fields = EXCLUDED.performance_indexed_fields, performance_cache_size = EXCLUDED.performance_cache_size, access_public_read = EXCLUDED.access_public_read, access_public_write = EXCLUDED.access_public_write, access_allowed_readers = EXCLUDED.access_allowed_readers, access_allowed_writers = EXCLUDED.access_allowed_writers, access_max_consumer_groups = EXCLUDED.access_max_consumer_groups, access_max_events_per_second = EXCLUDED.access_max_events_per_second, updated_at = EXCLUDED.updated_at, updated_by = EXCLUDED.updated_by"; await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); await using var command = new NpgsqlCommand(sql, connection); // Basic fields command.Parameters.AddWithValue("@StreamName", configuration.StreamName); command.Parameters.AddWithValue("@Description", (object?)configuration.Description ?? DBNull.Value); command.Parameters.AddWithValue("@Tags", configuration.Tags != null ? JsonSerializer.Serialize(configuration.Tags) : DBNull.Value); // Retention var retention = configuration.Retention; command.Parameters.AddWithValue("@RetentionMaxAge", retention?.MaxAge?.TotalSeconds ?? (object)DBNull.Value); command.Parameters.AddWithValue("@RetentionMaxSize", retention?.MaxSizeBytes ?? (object)DBNull.Value); command.Parameters.AddWithValue("@RetentionMaxCount", retention?.MaxEventCount ?? (object)DBNull.Value); command.Parameters.AddWithValue("@RetentionPartitioning", retention?.EnablePartitioning ?? (object)DBNull.Value); command.Parameters.AddWithValue("@RetentionPartitionInterval", retention?.PartitionInterval?.TotalSeconds ?? (object)DBNull.Value); // DLQ var dlq = configuration.DeadLetterQueue; command.Parameters.AddWithValue("@DlqEnabled", dlq?.Enabled ?? false); command.Parameters.AddWithValue("@DlqStreamName", (object?)dlq?.DeadLetterStreamName ?? DBNull.Value); command.Parameters.AddWithValue("@DlqMaxAttempts", dlq?.MaxDeliveryAttempts ?? 3); command.Parameters.AddWithValue("@DlqRetryDelay", dlq?.RetryDelay?.TotalSeconds ?? (object)DBNull.Value); command.Parameters.AddWithValue("@DlqStoreOriginal", dlq?.StoreOriginalEvent ?? (object)DBNull.Value); command.Parameters.AddWithValue("@DlqStoreError", dlq?.StoreErrorDetails ?? (object)DBNull.Value); // Lifecycle var lifecycle = configuration.Lifecycle; command.Parameters.AddWithValue("@LifecycleAutoCreate", lifecycle?.AutoCreate ?? true); command.Parameters.AddWithValue("@LifecycleAutoArchive", lifecycle?.AutoArchive ?? false); command.Parameters.AddWithValue("@LifecycleArchiveAfter", lifecycle?.ArchiveAfter?.TotalSeconds ?? (object)DBNull.Value); command.Parameters.AddWithValue("@LifecycleArchiveLocation", (object?)lifecycle?.ArchiveLocation ?? DBNull.Value); command.Parameters.AddWithValue("@LifecycleAutoDelete", lifecycle?.AutoDelete ?? false); command.Parameters.AddWithValue("@LifecycleDeleteAfter", lifecycle?.DeleteAfter?.TotalSeconds ?? (object)DBNull.Value); // Performance var perf = configuration.Performance; command.Parameters.AddWithValue("@PerfBatchSize", perf?.BatchSize ?? (object)DBNull.Value); command.Parameters.AddWithValue("@PerfCompression", perf?.EnableCompression ?? (object)DBNull.Value); command.Parameters.AddWithValue("@PerfCompressionAlgorithm", (object?)perf?.CompressionAlgorithm ?? DBNull.Value); command.Parameters.AddWithValue("@PerfIndexing", perf?.EnableIndexing ?? (object)DBNull.Value); command.Parameters.AddWithValue("@PerfIndexedFields", perf?.IndexedFields != null ? JsonSerializer.Serialize(perf.IndexedFields) : DBNull.Value); command.Parameters.AddWithValue("@PerfCacheSize", perf?.CacheSize ?? (object)DBNull.Value); // Access Control var access = configuration.AccessControl; command.Parameters.AddWithValue("@AccessPublicRead", access?.PublicRead ?? false); command.Parameters.AddWithValue("@AccessPublicWrite", access?.PublicWrite ?? false); command.Parameters.AddWithValue("@AccessReaders", access?.AllowedReaders != null ? JsonSerializer.Serialize(access.AllowedReaders) : DBNull.Value); command.Parameters.AddWithValue("@AccessWriters", access?.AllowedWriters != null ? JsonSerializer.Serialize(access.AllowedWriters) : DBNull.Value); command.Parameters.AddWithValue("@AccessMaxConsumerGroups", access?.MaxConsumerGroups ?? (object)DBNull.Value); command.Parameters.AddWithValue("@AccessMaxEventsPerSecond", access?.MaxEventsPerSecond ?? (object)DBNull.Value); // Metadata command.Parameters.AddWithValue("@CreatedAt", configuration.CreatedAt != default ? configuration.CreatedAt : DateTimeOffset.UtcNow); command.Parameters.AddWithValue("@UpdatedAt", configuration.UpdatedAt ?? DateTimeOffset.UtcNow); command.Parameters.AddWithValue("@CreatedBy", (object?)configuration.CreatedBy ?? DBNull.Value); command.Parameters.AddWithValue("@UpdatedBy", (object?)configuration.UpdatedBy ?? DBNull.Value); await command.ExecuteNonQueryAsync(cancellationToken); _logger.LogInformation("Set configuration for stream {StreamName}", configuration.StreamName); } public async Task DeleteConfigurationAsync( string streamName, CancellationToken cancellationToken = default) { const string sql = @" DELETE FROM event_streaming.stream_configurations WHERE stream_name = @StreamName"; await using var connection = new NpgsqlConnection(_options.ConnectionString); await connection.OpenAsync(cancellationToken); await using var command = new NpgsqlCommand(sql, connection); command.Parameters.AddWithValue("@StreamName", streamName); var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken); if (rowsAffected > 0) { _logger.LogInformation("Deleted configuration for stream {StreamName}", streamName); } else { _logger.LogDebug("No configuration found to delete for stream {StreamName}", streamName); } } public async Task> FindConfigurationsAsync( Func predicate, CancellationToken cancellationToken = default) { var allConfigurations = await GetAllConfigurationsAsync(cancellationToken); return allConfigurations.Where(predicate).ToList(); } private static StreamConfiguration MapToStreamConfiguration(NpgsqlDataReader reader) { var config = new StreamConfiguration { StreamName = reader.GetString(reader.GetOrdinal("stream_name")), Description = reader.IsDBNull(reader.GetOrdinal("description")) ? null : reader.GetString(reader.GetOrdinal("description")), Tags = reader.IsDBNull(reader.GetOrdinal("tags")) ? null : JsonSerializer.Deserialize>( reader.GetString(reader.GetOrdinal("tags"))), CreatedAt = reader.GetFieldValue(reader.GetOrdinal("created_at")), UpdatedAt = reader.IsDBNull(reader.GetOrdinal("updated_at")) ? null : reader.GetFieldValue(reader.GetOrdinal("updated_at")), CreatedBy = reader.IsDBNull(reader.GetOrdinal("created_by")) ? null : reader.GetString(reader.GetOrdinal("created_by")), UpdatedBy = reader.IsDBNull(reader.GetOrdinal("updated_by")) ? null : reader.GetString(reader.GetOrdinal("updated_by")) }; // Map retention configuration if (!reader.IsDBNull(reader.GetOrdinal("retention_max_age_seconds")) || !reader.IsDBNull(reader.GetOrdinal("retention_max_size_bytes")) || !reader.IsDBNull(reader.GetOrdinal("retention_max_event_count"))) { config.Retention = new RetentionConfiguration { MaxAge = reader.IsDBNull(reader.GetOrdinal("retention_max_age_seconds")) ? null : TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("retention_max_age_seconds"))), MaxSizeBytes = reader.IsDBNull(reader.GetOrdinal("retention_max_size_bytes")) ? null : reader.GetInt64(reader.GetOrdinal("retention_max_size_bytes")), MaxEventCount = reader.IsDBNull(reader.GetOrdinal("retention_max_event_count")) ? null : reader.GetInt64(reader.GetOrdinal("retention_max_event_count")), EnablePartitioning = reader.IsDBNull(reader.GetOrdinal("retention_enable_partitioning")) ? null : reader.GetBoolean(reader.GetOrdinal("retention_enable_partitioning")), PartitionInterval = reader.IsDBNull(reader.GetOrdinal("retention_partition_interval_seconds")) ? null : TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("retention_partition_interval_seconds"))) }; } // Map DLQ configuration var dlqEnabled = !reader.IsDBNull(reader.GetOrdinal("dlq_enabled")) && reader.GetBoolean(reader.GetOrdinal("dlq_enabled")); if (dlqEnabled) { config.DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = true, DeadLetterStreamName = reader.IsDBNull(reader.GetOrdinal("dlq_stream_name")) ? null : reader.GetString(reader.GetOrdinal("dlq_stream_name")), MaxDeliveryAttempts = reader.GetInt32(reader.GetOrdinal("dlq_max_delivery_attempts")), RetryDelay = reader.IsDBNull(reader.GetOrdinal("dlq_retry_delay_seconds")) ? null : TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("dlq_retry_delay_seconds"))), StoreOriginalEvent = reader.IsDBNull(reader.GetOrdinal("dlq_store_original_event")) ? null : reader.GetBoolean(reader.GetOrdinal("dlq_store_original_event")), StoreErrorDetails = reader.IsDBNull(reader.GetOrdinal("dlq_store_error_details")) ? null : reader.GetBoolean(reader.GetOrdinal("dlq_store_error_details")) }; } // Map lifecycle configuration (always create since fields have defaults) config.Lifecycle = new LifecycleConfiguration { AutoCreate = !reader.IsDBNull(reader.GetOrdinal("lifecycle_auto_create")) && reader.GetBoolean(reader.GetOrdinal("lifecycle_auto_create")), AutoArchive = !reader.IsDBNull(reader.GetOrdinal("lifecycle_auto_archive")) && reader.GetBoolean(reader.GetOrdinal("lifecycle_auto_archive")), ArchiveAfter = reader.IsDBNull(reader.GetOrdinal("lifecycle_archive_after_seconds")) ? null : TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("lifecycle_archive_after_seconds"))), ArchiveLocation = reader.IsDBNull(reader.GetOrdinal("lifecycle_archive_location")) ? null : reader.GetString(reader.GetOrdinal("lifecycle_archive_location")), AutoDelete = !reader.IsDBNull(reader.GetOrdinal("lifecycle_auto_delete")) && reader.GetBoolean(reader.GetOrdinal("lifecycle_auto_delete")), DeleteAfter = reader.IsDBNull(reader.GetOrdinal("lifecycle_delete_after_seconds")) ? null : TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("lifecycle_delete_after_seconds"))) }; // Map performance configuration if (!reader.IsDBNull(reader.GetOrdinal("performance_batch_size")) || !reader.IsDBNull(reader.GetOrdinal("performance_enable_compression"))) { config.Performance = new PerformanceConfiguration { BatchSize = reader.IsDBNull(reader.GetOrdinal("performance_batch_size")) ? null : reader.GetInt32(reader.GetOrdinal("performance_batch_size")), EnableCompression = reader.IsDBNull(reader.GetOrdinal("performance_enable_compression")) ? null : reader.GetBoolean(reader.GetOrdinal("performance_enable_compression")), CompressionAlgorithm = reader.IsDBNull(reader.GetOrdinal("performance_compression_algorithm")) ? null : reader.GetString(reader.GetOrdinal("performance_compression_algorithm")), EnableIndexing = reader.IsDBNull(reader.GetOrdinal("performance_enable_indexing")) ? null : reader.GetBoolean(reader.GetOrdinal("performance_enable_indexing")), IndexedFields = reader.IsDBNull(reader.GetOrdinal("performance_indexed_fields")) ? null : JsonSerializer.Deserialize>( reader.GetString(reader.GetOrdinal("performance_indexed_fields"))), CacheSize = reader.IsDBNull(reader.GetOrdinal("performance_cache_size")) ? null : reader.GetInt32(reader.GetOrdinal("performance_cache_size")) }; } // Map access control configuration (always create since fields have defaults) config.AccessControl = new AccessControlConfiguration { PublicRead = !reader.IsDBNull(reader.GetOrdinal("access_public_read")) && reader.GetBoolean(reader.GetOrdinal("access_public_read")), PublicWrite = !reader.IsDBNull(reader.GetOrdinal("access_public_write")) && reader.GetBoolean(reader.GetOrdinal("access_public_write")), AllowedReaders = reader.IsDBNull(reader.GetOrdinal("access_allowed_readers")) ? null : JsonSerializer.Deserialize>( reader.GetString(reader.GetOrdinal("access_allowed_readers"))), AllowedWriters = reader.IsDBNull(reader.GetOrdinal("access_allowed_writers")) ? null : JsonSerializer.Deserialize>( reader.GetString(reader.GetOrdinal("access_allowed_writers"))), MaxConsumerGroups = reader.IsDBNull(reader.GetOrdinal("access_max_consumer_groups")) ? null : reader.GetInt32(reader.GetOrdinal("access_max_consumer_groups")), MaxEventsPerSecond = reader.IsDBNull(reader.GetOrdinal("access_max_events_per_second")) ? null : reader.GetInt64(reader.GetOrdinal("access_max_events_per_second")) }; return config; } }