using System; using Svrnty.CQRS.Events.PostgreSQL.Stores; using Svrnty.CQRS.Events.Abstractions.Schema; using Svrnty.CQRS.Events.Abstractions.Models; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Npgsql; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.PostgreSQL.Stores; /// /// PostgreSQL implementation of . /// /// /// Stores event schema information in a PostgreSQL table for centralized schema management. /// public sealed class PostgresSchemaStore : ISchemaStore { private readonly string _connectionString; private readonly string _schemaName; private readonly ILogger _logger; public PostgresSchemaStore( string connectionString, string schemaName, ILogger logger) { _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); _schemaName = schemaName ?? throw new ArgumentNullException(nameof(schemaName)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task StoreSchemaAsync( SchemaInfo schema, CancellationToken cancellationToken = default) { schema.Validate(); await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(cancellationToken); var sql = $@" INSERT INTO {_schemaName}.event_schemas (event_type, version, clr_type_name, json_schema, upcast_from_type, upcast_from_version, registered_at) VALUES (@EventType, @Version, @ClrTypeName, @JsonSchema, @UpcastFromType, @UpcastFromVersion, @RegisteredAt)"; await using var cmd = new NpgsqlCommand(sql, conn); cmd.Parameters.AddWithValue("@EventType", schema.EventType); cmd.Parameters.AddWithValue("@Version", schema.Version); cmd.Parameters.AddWithValue("@ClrTypeName", schema.ClrType.AssemblyQualifiedName ?? schema.ClrType.FullName ?? schema.ClrType.Name); cmd.Parameters.AddWithValue("@JsonSchema", (object?)schema.JsonSchema ?? DBNull.Value); cmd.Parameters.AddWithValue("@UpcastFromType", (object?)schema.UpcastFromType?.AssemblyQualifiedName ?? DBNull.Value); cmd.Parameters.AddWithValue("@UpcastFromVersion", (object?)schema.UpcastFromVersion ?? DBNull.Value); cmd.Parameters.AddWithValue("@RegisteredAt", schema.RegisteredAt); try { await cmd.ExecuteNonQueryAsync(cancellationToken); _logger.LogInformation( "Stored schema {EventType} v{Version} in PostgreSQL", schema.EventType, schema.Version); } catch (PostgresException ex) when (ex.SqlState == "23505") // Unique violation { throw new InvalidOperationException( $"Schema for {schema.EventType} v{schema.Version} already exists", ex); } } public async Task GetSchemaAsync( string eventType, int version, CancellationToken cancellationToken = default) { await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(cancellationToken); var sql = $@" SELECT event_type, version, clr_type_name, json_schema, upcast_from_type, upcast_from_version, registered_at FROM {_schemaName}.event_schemas WHERE event_type = @EventType AND version = @Version"; await using var cmd = new NpgsqlCommand(sql, conn); cmd.Parameters.AddWithValue("@EventType", eventType); cmd.Parameters.AddWithValue("@Version", version); await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); if (!await reader.ReadAsync(cancellationToken)) return null; return ReadSchemaInfo(reader); } public async Task> GetSchemaHistoryAsync( string eventType, CancellationToken cancellationToken = default) { await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(cancellationToken); var sql = $@" SELECT event_type, version, clr_type_name, json_schema, upcast_from_type, upcast_from_version, registered_at FROM {_schemaName}.event_schemas WHERE event_type = @EventType ORDER BY version ASC"; await using var cmd = new NpgsqlCommand(sql, conn); cmd.Parameters.AddWithValue("@EventType", eventType); await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); var schemas = new List(); while (await reader.ReadAsync(cancellationToken)) { schemas.Add(ReadSchemaInfo(reader)); } return schemas; } public async Task GetLatestVersionAsync( string eventType, CancellationToken cancellationToken = default) { await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(cancellationToken); var sql = $@" SELECT MAX(version) FROM {_schemaName}.event_schemas WHERE event_type = @EventType"; await using var cmd = new NpgsqlCommand(sql, conn); cmd.Parameters.AddWithValue("@EventType", eventType); var result = await cmd.ExecuteScalarAsync(cancellationToken); return result != DBNull.Value && result != null ? (int)result : null; } public async Task> GetAllEventTypesAsync( CancellationToken cancellationToken = default) { await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(cancellationToken); var sql = $@" SELECT DISTINCT event_type FROM {_schemaName}.event_schemas ORDER BY event_type"; await using var cmd = new NpgsqlCommand(sql, conn); await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); var eventTypes = new List(); while (await reader.ReadAsync(cancellationToken)) { eventTypes.Add(reader.GetString(0)); } return eventTypes; } public async Task SchemaExistsAsync( string eventType, int version, CancellationToken cancellationToken = default) { await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(cancellationToken); var sql = $@" SELECT COUNT(*) FROM {_schemaName}.event_schemas WHERE event_type = @EventType AND version = @Version"; await using var cmd = new NpgsqlCommand(sql, conn); cmd.Parameters.AddWithValue("@EventType", eventType); cmd.Parameters.AddWithValue("@Version", version); var count = (long)(await cmd.ExecuteScalarAsync(cancellationToken) ?? 0L); return count > 0; } private static SchemaInfo ReadSchemaInfo(NpgsqlDataReader reader) { var eventType = reader.GetString(0); var version = reader.GetInt32(1); var clrTypeName = reader.GetString(2); var jsonSchema = reader.IsDBNull(3) ? null : reader.GetString(3); var upcastFromTypeName = reader.IsDBNull(4) ? null : reader.GetString(4); var upcastFromVersion = reader.IsDBNull(5) ? null : (int?)reader.GetInt32(5); var registeredAt = reader.GetFieldValue(6); // Resolve CLR types var clrType = Type.GetType(clrTypeName) ?? throw new InvalidOperationException($"Could not resolve CLR type: {clrTypeName}"); Type? upcastFromType = null; if (upcastFromTypeName != null) { upcastFromType = Type.GetType(upcastFromTypeName) ?? throw new InvalidOperationException($"Could not resolve upcast from type: {upcastFromTypeName}"); } return new SchemaInfo( eventType, version, clrType, jsonSchema, upcastFromType, upcastFromVersion, registeredAt); } }