dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Stores/PostgresStreamConfigurationStore.cs

400 lines
22 KiB
C#

using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Streaming;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
/// <summary>
/// PostgreSQL implementation of stream configuration store.
/// </summary>
public class PostgresStreamConfigurationStore : IStreamConfigurationStore
{
private readonly PostgresEventStreamStoreOptions _options;
private readonly ILogger<PostgresStreamConfigurationStore> _logger;
/// <summary>
/// Creates the store from the configured PostgreSQL options and a logger.
/// </summary>
/// <param name="options">Options wrapper; its <c>Value</c> supplies the connection string used by every operation.</param>
/// <param name="logger">Logger used for debug and informational messages.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> (or its <c>Value</c>) is <c>null</c>, or <paramref name="logger"/> is <c>null</c>.
/// </exception>
public PostgresStreamConfigurationStore(
    IOptions<PostgresEventStreamStoreOptions> options,
    ILogger<PostgresStreamConfigurationStore> logger)
{
    // A null wrapper and a null wrapped value are both reported against "options".
    var resolvedOptions = options?.Value;
    if (resolvedOptions is null)
    {
        throw new ArgumentNullException(nameof(options));
    }

    if (logger is null)
    {
        throw new ArgumentNullException(nameof(logger));
    }

    _options = resolvedOptions;
    _logger = logger;
}
/// <summary>
/// Loads the stored configuration for a single stream.
/// </summary>
/// <param name="streamName">Name of the stream whose configuration row is looked up.</param>
/// <param name="cancellationToken">Token forwarded to every database call.</param>
/// <returns>
/// The mapped configuration, or <c>null</c> when no row matches <paramref name="streamName"/>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="streamName"/> is <c>null</c>.</exception>
public async Task<StreamConfiguration?> GetConfigurationAsync(
    string streamName,
    CancellationToken cancellationToken = default)
{
    // Fail fast: a null value cannot be bound to @StreamName, so reject it before opening a connection.
    if (streamName == null)
        throw new ArgumentNullException(nameof(streamName));

    const string sql = @"
SELECT * FROM event_streaming.stream_configurations
WHERE stream_name = @StreamName";

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    await using var command = new NpgsqlCommand(sql, connection);
    command.Parameters.AddWithValue("@StreamName", streamName);

    await using var reader = await command.ExecuteReaderAsync(cancellationToken);

    // Only the first row is mapped; any further rows are ignored.
    if (await reader.ReadAsync(cancellationToken))
    {
        return MapToStreamConfiguration(reader);
    }

    _logger.LogDebug("No configuration found for stream {StreamName}", streamName);
    return null;
}
/// <summary>
/// Reads every stored stream configuration, ordered by stream name.
/// </summary>
/// <param name="cancellationToken">Token forwarded to every database call.</param>
/// <returns>All configuration rows mapped to <see cref="StreamConfiguration"/>; empty when the table has no rows.</returns>
public async Task<IReadOnlyList<StreamConfiguration>> GetAllConfigurationsAsync(
    CancellationToken cancellationToken = default)
{
    const string selectAll = "SELECT * FROM event_streaming.stream_configurations ORDER BY stream_name";

    var results = new List<StreamConfiguration>();

    await using var db = new NpgsqlConnection(_options.ConnectionString);
    await db.OpenAsync(cancellationToken);

    await using var query = new NpgsqlCommand(selectAll, db);
    await using var rows = await query.ExecuteReaderAsync(cancellationToken);

    // Drain the reader one row at a time, mapping each row as it arrives.
    while (true)
    {
        var hasRow = await rows.ReadAsync(cancellationToken);
        if (!hasRow)
        {
            break;
        }

        results.Add(MapToStreamConfiguration(rows));
    }

    _logger.LogDebug("Retrieved {Count} stream configurations", results.Count);
    return results;
}
/// <summary>
/// Inserts or updates the configuration row for a stream.
/// </summary>
/// <remarks>
/// The statement is an upsert keyed on <c>stream_name</c>. When the row already exists every
/// column is overwritten except <c>stream_name</c>, <c>created_at</c> and <c>created_by</c>.
/// Optional values that are absent are written as SQL NULL; collection values are written as JSON.
/// </remarks>
/// <param name="configuration">Configuration to persist; <c>Validate()</c> is called on it before any database work.</param>
/// <param name="cancellationToken">Token forwarded to every database call.</param>
/// <exception cref="ArgumentNullException"><paramref name="configuration"/> is <c>null</c>.</exception>
public async Task SetConfigurationAsync(
    StreamConfiguration configuration,
    CancellationToken cancellationToken = default)
{
    if (configuration == null)
        throw new ArgumentNullException(nameof(configuration));
    configuration.Validate();

    const string sql = @"
INSERT INTO event_streaming.stream_configurations (
stream_name, description, tags,
retention_max_age_seconds, retention_max_size_bytes, retention_max_event_count,
retention_enable_partitioning, retention_partition_interval_seconds,
dlq_enabled, dlq_stream_name, dlq_max_delivery_attempts,
dlq_retry_delay_seconds, dlq_store_original_event, dlq_store_error_details,
lifecycle_auto_create, lifecycle_auto_archive, lifecycle_archive_after_seconds,
lifecycle_archive_location, lifecycle_auto_delete, lifecycle_delete_after_seconds,
performance_batch_size, performance_enable_compression, performance_compression_algorithm,
performance_enable_indexing, performance_indexed_fields, performance_cache_size,
access_public_read, access_public_write, access_allowed_readers, access_allowed_writers,
access_max_consumer_groups, access_max_events_per_second,
created_at, updated_at, created_by, updated_by
)
VALUES (
@StreamName, @Description, @Tags::jsonb,
@RetentionMaxAge, @RetentionMaxSize, @RetentionMaxCount,
@RetentionPartitioning, @RetentionPartitionInterval,
@DlqEnabled, @DlqStreamName, @DlqMaxAttempts,
@DlqRetryDelay, @DlqStoreOriginal, @DlqStoreError,
@LifecycleAutoCreate, @LifecycleAutoArchive, @LifecycleArchiveAfter,
@LifecycleArchiveLocation, @LifecycleAutoDelete, @LifecycleDeleteAfter,
@PerfBatchSize, @PerfCompression, @PerfCompressionAlgorithm,
@PerfIndexing, @PerfIndexedFields::jsonb, @PerfCacheSize,
@AccessPublicRead, @AccessPublicWrite, @AccessReaders::jsonb, @AccessWriters::jsonb,
@AccessMaxConsumerGroups, @AccessMaxEventsPerSecond,
@CreatedAt, @UpdatedAt, @CreatedBy, @UpdatedBy
)
ON CONFLICT (stream_name) DO UPDATE SET
description = EXCLUDED.description,
tags = EXCLUDED.tags,
retention_max_age_seconds = EXCLUDED.retention_max_age_seconds,
retention_max_size_bytes = EXCLUDED.retention_max_size_bytes,
retention_max_event_count = EXCLUDED.retention_max_event_count,
retention_enable_partitioning = EXCLUDED.retention_enable_partitioning,
retention_partition_interval_seconds = EXCLUDED.retention_partition_interval_seconds,
dlq_enabled = EXCLUDED.dlq_enabled,
dlq_stream_name = EXCLUDED.dlq_stream_name,
dlq_max_delivery_attempts = EXCLUDED.dlq_max_delivery_attempts,
dlq_retry_delay_seconds = EXCLUDED.dlq_retry_delay_seconds,
dlq_store_original_event = EXCLUDED.dlq_store_original_event,
dlq_store_error_details = EXCLUDED.dlq_store_error_details,
lifecycle_auto_create = EXCLUDED.lifecycle_auto_create,
lifecycle_auto_archive = EXCLUDED.lifecycle_auto_archive,
lifecycle_archive_after_seconds = EXCLUDED.lifecycle_archive_after_seconds,
lifecycle_archive_location = EXCLUDED.lifecycle_archive_location,
lifecycle_auto_delete = EXCLUDED.lifecycle_auto_delete,
lifecycle_delete_after_seconds = EXCLUDED.lifecycle_delete_after_seconds,
performance_batch_size = EXCLUDED.performance_batch_size,
performance_enable_compression = EXCLUDED.performance_enable_compression,
performance_compression_algorithm = EXCLUDED.performance_compression_algorithm,
performance_enable_indexing = EXCLUDED.performance_enable_indexing,
performance_indexed_fields = EXCLUDED.performance_indexed_fields,
performance_cache_size = EXCLUDED.performance_cache_size,
access_public_read = EXCLUDED.access_public_read,
access_public_write = EXCLUDED.access_public_write,
access_allowed_readers = EXCLUDED.access_allowed_readers,
access_allowed_writers = EXCLUDED.access_allowed_writers,
access_max_consumer_groups = EXCLUDED.access_max_consumer_groups,
access_max_events_per_second = EXCLUDED.access_max_events_per_second,
updated_at = EXCLUDED.updated_at,
updated_by = EXCLUDED.updated_by";

    // Maps an absent optional value to SQL NULL; nullable value types box to null when empty.
    static object OrDbNull(object? value) => value ?? DBNull.Value;

    // Serializes a collection to JSON text for a ::jsonb parameter, or SQL NULL when it is absent.
    static object JsonOrDbNull<T>(T value) =>
        value is null ? (object)DBNull.Value : JsonSerializer.Serialize(value);

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    await using var command = new NpgsqlCommand(sql, connection);
    var parameters = command.Parameters;

    // Basic fields
    parameters.AddWithValue("@StreamName", configuration.StreamName);
    parameters.AddWithValue("@Description", OrDbNull(configuration.Description));
    parameters.AddWithValue("@Tags", JsonOrDbNull(configuration.Tags));

    // Retention
    // NOTE(review): durations are sent as TotalSeconds (a double) while this class reads the same
    // columns back with GetInt64 — confirm that dropping fractional seconds is acceptable.
    var retention = configuration.Retention;
    parameters.AddWithValue("@RetentionMaxAge", OrDbNull(retention?.MaxAge?.TotalSeconds));
    parameters.AddWithValue("@RetentionMaxSize", OrDbNull(retention?.MaxSizeBytes));
    parameters.AddWithValue("@RetentionMaxCount", OrDbNull(retention?.MaxEventCount));
    parameters.AddWithValue("@RetentionPartitioning", OrDbNull(retention?.EnablePartitioning));
    parameters.AddWithValue("@RetentionPartitionInterval", OrDbNull(retention?.PartitionInterval?.TotalSeconds));

    // Dead letter queue: enabled flag and attempt count always get a concrete value.
    var dlq = configuration.DeadLetterQueue;
    parameters.AddWithValue("@DlqEnabled", dlq?.Enabled ?? false);
    parameters.AddWithValue("@DlqStreamName", OrDbNull(dlq?.DeadLetterStreamName));
    parameters.AddWithValue("@DlqMaxAttempts", dlq?.MaxDeliveryAttempts ?? 3);
    parameters.AddWithValue("@DlqRetryDelay", OrDbNull(dlq?.RetryDelay?.TotalSeconds));
    parameters.AddWithValue("@DlqStoreOriginal", OrDbNull(dlq?.StoreOriginalEvent));
    parameters.AddWithValue("@DlqStoreError", OrDbNull(dlq?.StoreErrorDetails));

    // Lifecycle: the three flags always get a concrete value.
    var lifecycle = configuration.Lifecycle;
    parameters.AddWithValue("@LifecycleAutoCreate", lifecycle?.AutoCreate ?? true);
    parameters.AddWithValue("@LifecycleAutoArchive", lifecycle?.AutoArchive ?? false);
    parameters.AddWithValue("@LifecycleArchiveAfter", OrDbNull(lifecycle?.ArchiveAfter?.TotalSeconds));
    parameters.AddWithValue("@LifecycleArchiveLocation", OrDbNull(lifecycle?.ArchiveLocation));
    parameters.AddWithValue("@LifecycleAutoDelete", lifecycle?.AutoDelete ?? false);
    parameters.AddWithValue("@LifecycleDeleteAfter", OrDbNull(lifecycle?.DeleteAfter?.TotalSeconds));

    // Performance
    var perf = configuration.Performance;
    parameters.AddWithValue("@PerfBatchSize", OrDbNull(perf?.BatchSize));
    parameters.AddWithValue("@PerfCompression", OrDbNull(perf?.EnableCompression));
    parameters.AddWithValue("@PerfCompressionAlgorithm", OrDbNull(perf?.CompressionAlgorithm));
    parameters.AddWithValue("@PerfIndexing", OrDbNull(perf?.EnableIndexing));
    parameters.AddWithValue("@PerfIndexedFields", JsonOrDbNull(perf?.IndexedFields));
    parameters.AddWithValue("@PerfCacheSize", OrDbNull(perf?.CacheSize));

    // Access control: the two public flags always get a concrete value.
    var access = configuration.AccessControl;
    parameters.AddWithValue("@AccessPublicRead", access?.PublicRead ?? false);
    parameters.AddWithValue("@AccessPublicWrite", access?.PublicWrite ?? false);
    parameters.AddWithValue("@AccessReaders", JsonOrDbNull(access?.AllowedReaders));
    parameters.AddWithValue("@AccessWriters", JsonOrDbNull(access?.AllowedWriters));
    parameters.AddWithValue("@AccessMaxConsumerGroups", OrDbNull(access?.MaxConsumerGroups));
    parameters.AddWithValue("@AccessMaxEventsPerSecond", OrDbNull(access?.MaxEventsPerSecond));

    // Metadata
    // Sample the clock once so both fallback timestamps agree, and normalise to UTC (offset 0):
    // recent Npgsql versions refuse to write a DateTimeOffset with a non-zero offset to timestamptz.
    var now = DateTimeOffset.UtcNow;
    var createdAt = (configuration.CreatedAt != default ? configuration.CreatedAt : now).ToUniversalTime();
    var updatedAt = (configuration.UpdatedAt ?? now).ToUniversalTime();
    parameters.AddWithValue("@CreatedAt", createdAt);
    parameters.AddWithValue("@UpdatedAt", updatedAt);
    parameters.AddWithValue("@CreatedBy", OrDbNull(configuration.CreatedBy));
    parameters.AddWithValue("@UpdatedBy", OrDbNull(configuration.UpdatedBy));

    await command.ExecuteNonQueryAsync(cancellationToken);
    _logger.LogInformation("Set configuration for stream {StreamName}", configuration.StreamName);
}
/// <summary>
/// Deletes the stored configuration for a stream, if one exists.
/// </summary>
/// <param name="streamName">Name of the stream whose configuration row is removed.</param>
/// <param name="cancellationToken">Token forwarded to every database call.</param>
/// <remarks>
/// A missing row is not an error: the outcome is only reflected in the log level
/// (information when a row was deleted, debug when nothing matched).
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="streamName"/> is <c>null</c>.</exception>
public async Task DeleteConfigurationAsync(
    string streamName,
    CancellationToken cancellationToken = default)
{
    // Fail fast: a null value cannot be bound to @StreamName, so reject it before opening a connection.
    if (streamName == null)
        throw new ArgumentNullException(nameof(streamName));

    const string sql = @"
DELETE FROM event_streaming.stream_configurations
WHERE stream_name = @StreamName";

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    await using var command = new NpgsqlCommand(sql, connection);
    command.Parameters.AddWithValue("@StreamName", streamName);

    var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
    if (rowsAffected > 0)
    {
        _logger.LogInformation("Deleted configuration for stream {StreamName}", streamName);
    }
    else
    {
        _logger.LogDebug("No configuration found to delete for stream {StreamName}", streamName);
    }
}
/// <summary>
/// Returns the stored configurations that satisfy a predicate.
/// </summary>
/// <remarks>
/// Filtering happens in memory: every configuration is loaded through
/// <see cref="GetAllConfigurationsAsync"/> and then tested against <paramref name="predicate"/>.
/// </remarks>
/// <param name="predicate">Test applied to each loaded configuration.</param>
/// <param name="cancellationToken">Token forwarded to the underlying load.</param>
/// <returns>The matching configurations, in the order they were loaded.</returns>
/// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <c>null</c>.</exception>
public async Task<IReadOnlyList<StreamConfiguration>> FindConfigurationsAsync(
    Func<StreamConfiguration, bool> predicate,
    CancellationToken cancellationToken = default)
{
    // Validate before loading so a bad argument does not cost a full table read.
    if (predicate == null)
        throw new ArgumentNullException(nameof(predicate));

    var allConfigurations = await GetAllConfigurationsAsync(cancellationToken);
    return allConfigurations.Where(predicate).ToList();
}
/// <summary>
/// Builds a <see cref="StreamConfiguration"/> from the row the reader is currently positioned on.
/// </summary>
/// <remarks>
/// Columns are looked up by name. Optional sections are materialised as follows:
/// retention and performance only when at least one of their columns is non-null,
/// the dead letter queue only when <c>dlq_enabled</c> is true, and lifecycle and access
/// control always (their NULL flags read as <c>false</c>).
/// </remarks>
/// <param name="reader">Reader positioned on a row of <c>event_streaming.stream_configurations</c>.</param>
/// <returns>The mapped configuration.</returns>
private static StreamConfiguration MapToStreamConfiguration(NpgsqlDataReader reader)
{
    // Nullable column readers: resolve the ordinal once, then translate SQL NULL to null.
    string? ReadString(string column)
    {
        var ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
    }

    bool? ReadBoolean(string column)
    {
        var ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? (bool?)null : reader.GetBoolean(ordinal);
    }

    int? ReadInt32(string column)
    {
        var ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? (int?)null : reader.GetInt32(ordinal);
    }

    long? ReadInt64(string column)
    {
        var ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? (long?)null : reader.GetInt64(ordinal);
    }

    // Durations are stored as a whole number of seconds.
    TimeSpan? ReadSeconds(string column)
    {
        var seconds = ReadInt64(column);
        return seconds.HasValue ? TimeSpan.FromSeconds(seconds.Value) : (TimeSpan?)null;
    }

    DateTimeOffset? ReadTimestamp(string column)
    {
        var ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? (DateTimeOffset?)null : reader.GetFieldValue<DateTimeOffset>(ordinal);
    }

    // JSON columns are read as text and deserialized into the requested type.
    T? ReadJson<T>(string column) where T : class
    {
        var ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? null : JsonSerializer.Deserialize<T>(reader.GetString(ordinal));
    }

    // True when at least one of the named columns holds a non-null value.
    bool HasAnyValue(params string[] columns) =>
        columns.Any(column => !reader.IsDBNull(reader.GetOrdinal(column)));

    var config = new StreamConfiguration
    {
        StreamName = reader.GetString(reader.GetOrdinal("stream_name")),
        Description = ReadString("description"),
        Tags = ReadJson<Dictionary<string, string>>("tags"),
        CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at")),
        UpdatedAt = ReadTimestamp("updated_at"),
        CreatedBy = ReadString("created_by"),
        UpdatedBy = ReadString("updated_by")
    };

    // Map retention configuration. Every retention column takes part in the presence check so a
    // configuration that only sets partitioning options is not dropped when it is read back.
    if (HasAnyValue(
            "retention_max_age_seconds",
            "retention_max_size_bytes",
            "retention_max_event_count",
            "retention_enable_partitioning",
            "retention_partition_interval_seconds"))
    {
        config.Retention = new RetentionConfiguration
        {
            MaxAge = ReadSeconds("retention_max_age_seconds"),
            MaxSizeBytes = ReadInt64("retention_max_size_bytes"),
            MaxEventCount = ReadInt64("retention_max_event_count"),
            EnablePartitioning = ReadBoolean("retention_enable_partitioning"),
            PartitionInterval = ReadSeconds("retention_partition_interval_seconds")
        };
    }

    // Map DLQ configuration: only materialised when the queue is enabled (NULL counts as disabled).
    var dlqEnabled = ReadBoolean("dlq_enabled") ?? false;
    if (dlqEnabled)
    {
        config.DeadLetterQueue = new DeadLetterQueueConfiguration
        {
            Enabled = true,
            DeadLetterStreamName = ReadString("dlq_stream_name"),
            MaxDeliveryAttempts = reader.GetInt32(reader.GetOrdinal("dlq_max_delivery_attempts")),
            RetryDelay = ReadSeconds("dlq_retry_delay_seconds"),
            StoreOriginalEvent = ReadBoolean("dlq_store_original_event"),
            StoreErrorDetails = ReadBoolean("dlq_store_error_details")
        };
    }

    // Map lifecycle configuration (always create since fields have defaults)
    config.Lifecycle = new LifecycleConfiguration
    {
        AutoCreate = ReadBoolean("lifecycle_auto_create") ?? false,
        AutoArchive = ReadBoolean("lifecycle_auto_archive") ?? false,
        ArchiveAfter = ReadSeconds("lifecycle_archive_after_seconds"),
        ArchiveLocation = ReadString("lifecycle_archive_location"),
        AutoDelete = ReadBoolean("lifecycle_auto_delete") ?? false,
        DeleteAfter = ReadSeconds("lifecycle_delete_after_seconds")
    };

    // Map performance configuration. As with retention, every performance column takes part in
    // the presence check so indexing-only or cache-only settings survive a round trip.
    if (HasAnyValue(
            "performance_batch_size",
            "performance_enable_compression",
            "performance_compression_algorithm",
            "performance_enable_indexing",
            "performance_indexed_fields",
            "performance_cache_size"))
    {
        config.Performance = new PerformanceConfiguration
        {
            BatchSize = ReadInt32("performance_batch_size"),
            EnableCompression = ReadBoolean("performance_enable_compression"),
            CompressionAlgorithm = ReadString("performance_compression_algorithm"),
            EnableIndexing = ReadBoolean("performance_enable_indexing"),
            IndexedFields = ReadJson<List<string>>("performance_indexed_fields"),
            CacheSize = ReadInt32("performance_cache_size")
        };
    }

    // Map access control configuration (always create since fields have defaults)
    config.AccessControl = new AccessControlConfiguration
    {
        PublicRead = ReadBoolean("access_public_read") ?? false,
        PublicWrite = ReadBoolean("access_public_write") ?? false,
        AllowedReaders = ReadJson<List<string>>("access_allowed_readers"),
        AllowedWriters = ReadJson<List<string>>("access_allowed_writers"),
        MaxConsumerGroups = ReadInt32("access_max_consumer_groups"),
        MaxEventsPerSecond = ReadInt64("access_max_events_per_second")
    };

    return config;
}
}