using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;

namespace Svrnty.CQRS.Events.PostgreSQL.Migration;

/// <summary>
/// Manages database migrations for event streaming schema.
/// </summary>
/// <remarks>
/// Migrations are SQL scripts named <c>NNN_description.sql</c> (three-digit version prefix).
/// They are read from embedded resources whose name contains "Migrations"; if the assembly
/// embeds none, a "Migrations" directory next to the assembly is used instead. Applied
/// versions are tracked in <c>event_streaming.schema_version</c>.
/// </remarks>
public class DatabaseMigrator
{
    /// <summary>Command timeout for a single migration script (5 minutes for large migrations).</summary>
    private const int MigrationCommandTimeoutSeconds = 300;

    private static readonly Regex MigrationFilePattern = new Regex(@"^(\d{3})_(.+)\.sql$", RegexOptions.Compiled);

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<DatabaseMigrator> _logger;

    /// <summary>
    /// Creates a migrator bound to the configured connection string.
    /// </summary>
    /// <param name="options">Store options; <c>ConnectionString</c> and <c>AutoMigrate</c> are read.</param>
    /// <param name="logger">Logger used for migration progress and failures.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public DatabaseMigrator(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<DatabaseMigrator> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Executes all pending migrations.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel database operations.</param>
    /// <exception cref="InvalidOperationException">A migration script failed; it has been rolled back.</exception>
    /// <exception cref="OperationCanceledException">The token was cancelled while a migration was running.</exception>
    public async Task MigrateAsync(CancellationToken cancellationToken = default)
    {
        if (!_options.AutoMigrate)
        {
            _logger.LogInformation("Auto-migration is disabled. Skipping database migration.");
            return;
        }

        _logger.LogInformation("Starting database migration...");

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        // Ensure schema_version table exists
        await EnsureVersionTableExistsAsync(connection, cancellationToken);

        // Get applied versions; a set gives O(1) membership checks when filtering below.
        var appliedVersions = new HashSet<int>(await GetAppliedVersionsAsync(connection, cancellationToken));
        _logger.LogInformation("Found {Count} previously applied migrations", appliedVersions.Count);

        // Load all migration files
        var migrations = LoadMigrations();
        _logger.LogInformation("Found {Count} migration files", migrations.Count);

        // Execute pending migrations in ascending version order
        var pendingMigrations = migrations
            .Where(m => !appliedVersions.Contains(m.Version))
            .OrderBy(m => m.Version)
            .ToList();

        if (pendingMigrations.Count == 0)
        {
            _logger.LogInformation("Database is up to date. No migrations to apply.");
            return;
        }

        _logger.LogInformation("Found {Count} pending migrations to apply", pendingMigrations.Count);

        foreach (var migration in pendingMigrations)
        {
            await ExecuteMigrationAsync(connection, migration, cancellationToken);
        }

        _logger.LogInformation("Database migration completed successfully");
    }

    /// <summary>
    /// Gets the current database schema version.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel database operations.</param>
    /// <returns>The highest applied version, or 0 when no migration has been recorded.</returns>
    /// <remarks>Creates the schema and version table if they do not exist yet.</remarks>
    public async Task<int> GetCurrentVersionAsync(CancellationToken cancellationToken = default)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await EnsureVersionTableExistsAsync(connection, cancellationToken);
        var appliedVersions = await GetAppliedVersionsAsync(connection, cancellationToken);

        return appliedVersions.Count > 0 ? appliedVersions.Max() : 0;
    }

    /// <summary>
    /// Creates the <c>event_streaming</c> schema and its <c>schema_version</c> table when missing.
    /// </summary>
    private async Task EnsureVersionTableExistsAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
    {
        const string sql = @"
            CREATE SCHEMA IF NOT EXISTS event_streaming;

            CREATE TABLE IF NOT EXISTS event_streaming.schema_version (
                version INT PRIMARY KEY,
                applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                description TEXT NOT NULL
            );";

        await using var command = new NpgsqlCommand(sql, connection);
        await command.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Reads every recorded version from <c>event_streaming.schema_version</c>, ascending.
    /// </summary>
    private async Task<List<int>> GetAppliedVersionsAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
    {
        const string sql = "SELECT version FROM event_streaming.schema_version ORDER BY version";

        await using var command = new NpgsqlCommand(sql, connection);
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);

        var versions = new List<int>();
        while (await reader.ReadAsync(cancellationToken))
        {
            versions.Add(reader.GetInt32(0));
        }

        return versions;
    }

    /// <summary>
    /// Loads all migration scripts, preferring embedded resources and falling back to the
    /// file system only when the assembly embeds no matching resource.
    /// </summary>
    /// <returns>Migrations sorted by ascending version.</returns>
    private List<Migration> LoadMigrations()
    {
        var assembly = Assembly.GetExecutingAssembly();
        var resourceNames = assembly.GetManifestResourceNames()
            .Where(r => r.Contains("Migrations", StringComparison.Ordinal)
                        && r.EndsWith(".sql", StringComparison.Ordinal))
            .ToList();

        var migrations = resourceNames.Count == 0
            ? LoadMigrationsFromDirectory(assembly)
            : LoadMigrationsFromResources(assembly, resourceNames);

        return migrations.OrderBy(m => m.Version).ToList();
    }

    /// <summary>
    /// Reads migration scripts from the "Migrations" directory next to the assembly.
    /// </summary>
    private static List<Migration> LoadMigrationsFromDirectory(Assembly assembly)
    {
        var migrations = new List<Migration>();

        // Assembly.Location can be empty (e.g. single-file publish); use the application base
        // directory then, rather than silently resolving against the current working directory.
        var assemblyDirectory = Path.GetDirectoryName(assembly.Location);
        if (string.IsNullOrEmpty(assemblyDirectory))
        {
            assemblyDirectory = AppContext.BaseDirectory;
        }

        var migrationsPath = Path.Combine(assemblyDirectory, "Migrations");
        if (!Directory.Exists(migrationsPath))
        {
            return migrations;
        }

        foreach (var file in Directory.GetFiles(migrationsPath, "*.sql"))
        {
            var fileName = Path.GetFileName(file);
            if (!TryParseFileName(fileName, out var version, out var description))
            {
                continue; // not named NNN_description.sql
            }

            migrations.Add(new Migration
            {
                Version = version,
                Description = description,
                Sql = File.ReadAllText(file),
                FileName = fileName
            });
        }

        return migrations;
    }

    /// <summary>
    /// Reads migration scripts from the given embedded resources of <paramref name="assembly"/>.
    /// </summary>
    private static List<Migration> LoadMigrationsFromResources(Assembly assembly, IEnumerable<string> resourceNames)
    {
        var migrations = new List<Migration>();

        foreach (var resourceName in resourceNames)
        {
            // Resource names are dot-separated ("Ns.Migrations.001_name.sql"); the file name
            // is the segment before the trailing "sql" extension. Names here always end in
            // ".sql", so at least two segments exist.
            var segments = resourceName.Split('.');
            var fileName = segments[^2] + ".sql";

            if (!TryParseFileName(fileName, out var version, out var description))
            {
                continue;
            }

            using var stream = assembly.GetManifestResourceStream(resourceName);
            if (stream == null)
            {
                continue;
            }

            using var reader = new StreamReader(stream);
            migrations.Add(new Migration
            {
                Version = version,
                Description = description,
                Sql = reader.ReadToEnd(),
                FileName = fileName
            });
        }

        return migrations;
    }

    /// <summary>
    /// Splits a migration file name of the form <c>NNN_description.sql</c> into its version
    /// and a human-readable description (underscores replaced by spaces).
    /// </summary>
    /// <returns><c>true</c> when <paramref name="fileName"/> matches the migration pattern.</returns>
    private static bool TryParseFileName(string fileName, out int version, out string description)
    {
        var match = MigrationFilePattern.Match(fileName);
        if (!match.Success)
        {
            version = 0;
            description = string.Empty;
            return false;
        }

        version = int.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture);
        description = match.Groups[2].Value.Replace("_", " ");
        return true;
    }

    /// <summary>
    /// Runs one migration script and records its version, both inside a single transaction.
    /// </summary>
    /// <exception cref="InvalidOperationException">The script or the bookkeeping insert failed.</exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while the migration was running.
    /// </exception>
    private async Task ExecuteMigrationAsync(
        NpgsqlConnection connection,
        Migration migration,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Applying migration {Version}: {Description}",
            migration.Version, migration.Description);

        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);

        try
        {
            // Execute migration SQL
            await using (var command = new NpgsqlCommand(migration.Sql, connection, transaction))
            {
                command.CommandTimeout = MigrationCommandTimeoutSeconds;
                await command.ExecuteNonQueryAsync(cancellationToken);
            }

            // Record migration as applied
            const string recordSql = @"
                INSERT INTO event_streaming.schema_version (version, description)
                VALUES (@Version, @Description)
                ON CONFLICT (version) DO NOTHING";

            await using (var command = new NpgsqlCommand(recordSql, connection, transaction))
            {
                command.Parameters.AddWithValue("@Version", migration.Version);
                command.Parameters.AddWithValue("@Description", migration.Description);
                await command.ExecuteNonQueryAsync(cancellationToken);
            }

            await transaction.CommitAsync(cancellationToken);

            _logger.LogInformation("Successfully applied migration {Version}", migration.Version);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation is not a migration failure: undo the work and let it propagate as-is.
            await TryRollbackAsync(transaction, migration);
            _logger.LogWarning("Migration {Version} was cancelled before completion", migration.Version);
            throw;
        }
        catch (Exception ex)
        {
            await TryRollbackAsync(transaction, migration);
            _logger.LogError(ex, "Failed to apply migration {Version}: {Description}",
                migration.Version, migration.Description);
            throw new InvalidOperationException($"Migration {migration.Version} failed: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Rolls back <paramref name="transaction"/> without letting a rollback failure replace
    /// the exception that triggered it.
    /// </summary>
    private async Task TryRollbackAsync(NpgsqlTransaction transaction, Migration migration)
    {
        try
        {
            // Deliberately not the caller's token: it may already be cancelled, which could
            // abort the rollback itself.
            await transaction.RollbackAsync(CancellationToken.None);
        }
        catch (Exception rollbackEx)
        {
            _logger.LogWarning(rollbackEx, "Failed to roll back migration {Version}", migration.Version);
        }
    }

    /// <summary>A single migration script together with the metadata parsed from its file name.</summary>
    private class Migration
    {
        public required int Version { get; init; }
        public required string Description { get; init; }
        public required string Sql { get; init; }
        public required string FileName { get; init; }
    }
}