using System;
using Svrnty.CQRS.Events.PostgreSQL.Stores;
using Svrnty.CQRS.Events.Abstractions.Schema;
using Svrnty.CQRS.Events.Abstractions.Models;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
///
/// PostgreSQL implementation of .
///
///
/// Stores event schema information in a PostgreSQL table for centralized schema management.
///
public sealed class PostgresSchemaStore : ISchemaStore
{
private readonly string _connectionString;
private readonly string _schemaName;
private readonly ILogger _logger;
public PostgresSchemaStore(
string connectionString,
string schemaName,
ILogger logger)
{
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
_schemaName = schemaName ?? throw new ArgumentNullException(nameof(schemaName));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task StoreSchemaAsync(
SchemaInfo schema,
CancellationToken cancellationToken = default)
{
schema.Validate();
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken);
var sql = $@"
INSERT INTO {_schemaName}.event_schemas
(event_type, version, clr_type_name, json_schema, upcast_from_type, upcast_from_version, registered_at)
VALUES
(@EventType, @Version, @ClrTypeName, @JsonSchema, @UpcastFromType, @UpcastFromVersion, @RegisteredAt)";
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("@EventType", schema.EventType);
cmd.Parameters.AddWithValue("@Version", schema.Version);
cmd.Parameters.AddWithValue("@ClrTypeName", schema.ClrType.AssemblyQualifiedName ?? schema.ClrType.FullName ?? schema.ClrType.Name);
cmd.Parameters.AddWithValue("@JsonSchema", (object?)schema.JsonSchema ?? DBNull.Value);
cmd.Parameters.AddWithValue("@UpcastFromType", (object?)schema.UpcastFromType?.AssemblyQualifiedName ?? DBNull.Value);
cmd.Parameters.AddWithValue("@UpcastFromVersion", (object?)schema.UpcastFromVersion ?? DBNull.Value);
cmd.Parameters.AddWithValue("@RegisteredAt", schema.RegisteredAt);
try
{
await cmd.ExecuteNonQueryAsync(cancellationToken);
_logger.LogInformation(
"Stored schema {EventType} v{Version} in PostgreSQL",
schema.EventType,
schema.Version);
}
catch (PostgresException ex) when (ex.SqlState == "23505") // Unique violation
{
throw new InvalidOperationException(
$"Schema for {schema.EventType} v{schema.Version} already exists",
ex);
}
}
public async Task GetSchemaAsync(
string eventType,
int version,
CancellationToken cancellationToken = default)
{
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken);
var sql = $@"
SELECT event_type, version, clr_type_name, json_schema, upcast_from_type, upcast_from_version, registered_at
FROM {_schemaName}.event_schemas
WHERE event_type = @EventType AND version = @Version";
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("@EventType", eventType);
cmd.Parameters.AddWithValue("@Version", version);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken))
return null;
return ReadSchemaInfo(reader);
}
public async Task> GetSchemaHistoryAsync(
string eventType,
CancellationToken cancellationToken = default)
{
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken);
var sql = $@"
SELECT event_type, version, clr_type_name, json_schema, upcast_from_type, upcast_from_version, registered_at
FROM {_schemaName}.event_schemas
WHERE event_type = @EventType
ORDER BY version ASC";
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("@EventType", eventType);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
var schemas = new List();
while (await reader.ReadAsync(cancellationToken))
{
schemas.Add(ReadSchemaInfo(reader));
}
return schemas;
}
public async Task GetLatestVersionAsync(
string eventType,
CancellationToken cancellationToken = default)
{
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken);
var sql = $@"
SELECT MAX(version)
FROM {_schemaName}.event_schemas
WHERE event_type = @EventType";
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("@EventType", eventType);
var result = await cmd.ExecuteScalarAsync(cancellationToken);
return result != DBNull.Value && result != null ? (int)result : null;
}
public async Task> GetAllEventTypesAsync(
CancellationToken cancellationToken = default)
{
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken);
var sql = $@"
SELECT DISTINCT event_type
FROM {_schemaName}.event_schemas
ORDER BY event_type";
await using var cmd = new NpgsqlCommand(sql, conn);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
var eventTypes = new List();
while (await reader.ReadAsync(cancellationToken))
{
eventTypes.Add(reader.GetString(0));
}
return eventTypes;
}
public async Task SchemaExistsAsync(
string eventType,
int version,
CancellationToken cancellationToken = default)
{
await using var conn = new NpgsqlConnection(_connectionString);
await conn.OpenAsync(cancellationToken);
var sql = $@"
SELECT COUNT(*)
FROM {_schemaName}.event_schemas
WHERE event_type = @EventType AND version = @Version";
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("@EventType", eventType);
cmd.Parameters.AddWithValue("@Version", version);
var count = (long)(await cmd.ExecuteScalarAsync(cancellationToken) ?? 0L);
return count > 0;
}
private static SchemaInfo ReadSchemaInfo(NpgsqlDataReader reader)
{
var eventType = reader.GetString(0);
var version = reader.GetInt32(1);
var clrTypeName = reader.GetString(2);
var jsonSchema = reader.IsDBNull(3) ? null : reader.GetString(3);
var upcastFromTypeName = reader.IsDBNull(4) ? null : reader.GetString(4);
var upcastFromVersion = reader.IsDBNull(5) ? null : (int?)reader.GetInt32(5);
var registeredAt = reader.GetFieldValue(6);
// Resolve CLR types
var clrType = Type.GetType(clrTypeName)
?? throw new InvalidOperationException($"Could not resolve CLR type: {clrTypeName}");
Type? upcastFromType = null;
if (upcastFromTypeName != null)
{
upcastFromType = Type.GetType(upcastFromTypeName)
?? throw new InvalidOperationException($"Could not resolve upcast from type: {upcastFromTypeName}");
}
return new SchemaInfo(
eventType,
version,
clrType,
jsonSchema,
upcastFromType,
upcastFromVersion,
registeredAt);
}
}