dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Migration/DatabaseMigrator.cs

247 lines
9.2 KiB
C#

using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
namespace Svrnty.CQRS.Events.PostgreSQL.Migration;
/// <summary>
/// Applies versioned SQL migration scripts for the event streaming schema.
/// </summary>
/// <remarks>
/// Migration scripts are named <c>NNN_description.sql</c> (three-digit version prefix). They are
/// read from this assembly's embedded resources whose name contains <c>Migrations</c>; when no such
/// resource exists, a <c>Migrations</c> directory next to the assembly is used instead. Applied
/// versions are recorded in the <c>event_streaming.schema_version</c> table.
/// </remarks>
public class DatabaseMigrator
{
    /// <summary>Command timeout, in seconds, for a single migration script (5 minutes for large migrations).</summary>
    private const int MigrationCommandTimeoutSeconds = 300;

    /// <summary>Matches <c>NNN_description.sql</c>; group 1 is the version, group 2 the raw description.</summary>
    private static readonly Regex MigrationFilePattern = new Regex(@"^(\d{3})_(.+)\.sql$", RegexOptions.Compiled);

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<DatabaseMigrator> _logger;

    /// <summary>
    /// Creates a migrator bound to the configured event stream store options.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> supplies the connection string and the auto-migrate flag.</param>
    /// <param name="logger">Logger used to report migration progress and failures.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public DatabaseMigrator(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<DatabaseMigrator> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Executes all pending migrations in ascending version order, each in its own transaction.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <exception cref="InvalidOperationException">
    /// A migration script failed, or several pending scripts share the same version number.
    /// </exception>
    /// <exception cref="OperationCanceledException">The operation was cancelled through <paramref name="cancellationToken"/>.</exception>
    public async Task MigrateAsync(CancellationToken cancellationToken = default)
    {
        if (!_options.AutoMigrate)
        {
            _logger.LogInformation("Auto-migration is disabled. Skipping database migration.");
            return;
        }

        _logger.LogInformation("Starting database migration...");

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        // Ensure schema_version table exists
        await EnsureVersionTableExistsAsync(connection, cancellationToken);

        // Get applied versions; a set gives O(1) membership checks when filtering below.
        var appliedVersions = new HashSet<int>(await GetAppliedVersionsAsync(connection, cancellationToken));
        _logger.LogInformation("Found {Count} previously applied migrations", appliedVersions.Count);

        // Load all migration files
        var migrations = LoadMigrations();
        _logger.LogInformation("Found {Count} migration files", migrations.Count);

        // Execute pending migrations
        var pendingMigrations = migrations
            .Where(m => !appliedVersions.Contains(m.Version))
            .OrderBy(m => m.Version)
            .ToList();

        if (pendingMigrations.Count == 0)
        {
            _logger.LogInformation("Database is up to date. No migrations to apply.");
            return;
        }

        // Fail before touching the database: two scripts with one version would both run,
        // while only the first would be recorded in schema_version.
        EnsureUniqueVersions(pendingMigrations);

        _logger.LogInformation("Found {Count} pending migrations to apply", pendingMigrations.Count);

        foreach (var migration in pendingMigrations)
        {
            await ExecuteMigrationAsync(connection, migration, cancellationToken);
        }

        _logger.LogInformation("Database migration completed successfully");
    }

    /// <summary>
    /// Gets the current database schema version.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database operations.</param>
    /// <returns>The highest version recorded in <c>event_streaming.schema_version</c>, or 0 when none is recorded.</returns>
    /// <remarks>Creates the schema and version table first if they do not exist yet.</remarks>
    public async Task<int> GetCurrentVersionAsync(CancellationToken cancellationToken = default)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await EnsureVersionTableExistsAsync(connection, cancellationToken);

        var appliedVersions = await GetAppliedVersionsAsync(connection, cancellationToken);
        return appliedVersions.Count > 0 ? appliedVersions.Max() : 0;
    }

    /// <summary>
    /// Creates the <c>event_streaming</c> schema and its <c>schema_version</c> table when missing.
    /// </summary>
    private async Task EnsureVersionTableExistsAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
    {
        const string sql = @"
            CREATE SCHEMA IF NOT EXISTS event_streaming;
            CREATE TABLE IF NOT EXISTS event_streaming.schema_version (
                version INT PRIMARY KEY,
                applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                description TEXT NOT NULL
            );";

        await using var command = new NpgsqlCommand(sql, connection);
        await command.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Reads every version recorded in <c>event_streaming.schema_version</c>, in ascending order.
    /// </summary>
    private async Task<List<int>> GetAppliedVersionsAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
    {
        const string sql = "SELECT version FROM event_streaming.schema_version ORDER BY version";

        await using var command = new NpgsqlCommand(sql, connection);
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);

        var versions = new List<int>();
        while (await reader.ReadAsync(cancellationToken))
        {
            versions.Add(reader.GetInt32(0));
        }

        return versions;
    }

    /// <summary>
    /// Loads all migration scripts, preferring embedded resources over the file system.
    /// </summary>
    /// <returns>The discovered migrations sorted by version.</returns>
    private List<Migration> LoadMigrations()
    {
        var assembly = Assembly.GetExecutingAssembly();

        // Ordinal comparisons: resource names are identifiers, not linguistic text.
        var resourceNames = assembly.GetManifestResourceNames()
            .Where(r => r.Contains("Migrations", StringComparison.Ordinal)
                        && r.EndsWith(".sql", StringComparison.Ordinal))
            .ToList();

        // Fallback to file system if embedded resources not found
        var migrations = resourceNames.Count > 0
            ? LoadFromEmbeddedResources(assembly, resourceNames)
            : LoadFromFileSystem(assembly);

        return migrations.OrderBy(m => m.Version).ToList();
    }

    /// <summary>
    /// Reads migrations from the given embedded resources of <paramref name="assembly"/>.
    /// </summary>
    private List<Migration> LoadFromEmbeddedResources(Assembly assembly, IEnumerable<string> resourceNames)
    {
        var migrations = new List<Migration>();

        foreach (var resourceName in resourceNames)
        {
            // The file name is taken from the second-to-last dot-separated segment of the
            // resource name; the ".sql" filter guarantees at least two segments.
            var segments = resourceName.Split('.');
            var fileName = segments[^2] + ".sql";

            if (!TryParseFileName(fileName, out var version, out var description))
            {
                _logger.LogWarning(
                    "Skipping embedded resource {ResourceName}: it does not match the NNN_description.sql naming pattern",
                    resourceName);
                continue;
            }

            using var stream = assembly.GetManifestResourceStream(resourceName);
            if (stream is null)
            {
                _logger.LogWarning("Skipping embedded resource {ResourceName}: its stream could not be opened", resourceName);
                continue;
            }

            using var reader = new StreamReader(stream);
            migrations.Add(new Migration
            {
                Version = version,
                Description = description,
                Sql = reader.ReadToEnd(),
                FileName = fileName
            });
        }

        return migrations;
    }

    /// <summary>
    /// Reads migrations from the <c>Migrations</c> directory located next to <paramref name="assembly"/>.
    /// </summary>
    private List<Migration> LoadFromFileSystem(Assembly assembly)
    {
        var migrations = new List<Migration>();

        // Assembly.Location is empty for single-file deployments; use the application base
        // directory then, rather than silently resolving against the current working directory.
        var assemblyDirectory = Path.GetDirectoryName(assembly.Location);
        if (string.IsNullOrEmpty(assemblyDirectory))
        {
            assemblyDirectory = AppContext.BaseDirectory;
        }

        var migrationsPath = Path.Combine(assemblyDirectory, "Migrations");
        if (!Directory.Exists(migrationsPath))
        {
            return migrations;
        }

        foreach (var file in Directory.GetFiles(migrationsPath, "*.sql"))
        {
            var fileName = Path.GetFileName(file);

            if (!TryParseFileName(fileName, out var version, out var description))
            {
                _logger.LogWarning(
                    "Skipping file {FileName}: it does not match the NNN_description.sql naming pattern",
                    fileName);
                continue;
            }

            migrations.Add(new Migration
            {
                Version = version,
                Description = description,
                Sql = File.ReadAllText(file),
                FileName = fileName
            });
        }

        return migrations;
    }

    /// <summary>
    /// Extracts the version and human-readable description from a migration file name.
    /// </summary>
    /// <returns><c>true</c> when <paramref name="fileName"/> matches <c>NNN_description.sql</c>; otherwise <c>false</c>.</returns>
    private static bool TryParseFileName(string fileName, out int version, out string description)
    {
        version = 0;
        description = string.Empty;

        var match = MigrationFilePattern.Match(fileName);
        if (!match.Success)
        {
            return false;
        }

        // \d also matches non-ASCII Unicode digits, which integer parsing rejects; treat
        // those names as non-matching instead of throwing FormatException.
        if (!int.TryParse(
                match.Groups[1].Value,
                System.Globalization.NumberStyles.None,
                System.Globalization.CultureInfo.InvariantCulture,
                out version))
        {
            return false;
        }

        description = match.Groups[2].Value.Replace("_", " ");
        return true;
    }

    /// <summary>
    /// Throws when two or more of the given migrations carry the same version number.
    /// </summary>
    /// <exception cref="InvalidOperationException">A version number is used by more than one migration.</exception>
    private static void EnsureUniqueVersions(IEnumerable<Migration> migrations)
    {
        var duplicate = migrations
            .GroupBy(m => m.Version)
            .FirstOrDefault(g => g.Count() > 1);

        if (duplicate is not null)
        {
            var files = string.Join(", ", duplicate.Select(m => m.FileName));
            throw new InvalidOperationException(
                $"Multiple pending migrations share version {duplicate.Key}: {files}. Migration versions must be unique.");
        }
    }

    /// <summary>
    /// Runs one migration script and records its version, both inside a single transaction.
    /// </summary>
    /// <exception cref="InvalidOperationException">The script or the bookkeeping insert failed; the original error is the inner exception.</exception>
    /// <exception cref="OperationCanceledException">The operation was cancelled through <paramref name="cancellationToken"/>.</exception>
    private async Task ExecuteMigrationAsync(
        NpgsqlConnection connection,
        Migration migration,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Applying migration {Version}: {Description}", migration.Version, migration.Description);

        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
        try
        {
            // Execute migration SQL
            await using (var command = new NpgsqlCommand(migration.Sql, connection, transaction))
            {
                command.CommandTimeout = MigrationCommandTimeoutSeconds;
                await command.ExecuteNonQueryAsync(cancellationToken);
            }

            // Record migration as applied
            const string recordSql = @"
                INSERT INTO event_streaming.schema_version (version, description)
                VALUES (@Version, @Description)
                ON CONFLICT (version) DO NOTHING";

            await using (var command = new NpgsqlCommand(recordSql, connection, transaction))
            {
                command.Parameters.AddWithValue("@Version", migration.Version);
                command.Parameters.AddWithValue("@Description", migration.Description);
                await command.ExecuteNonQueryAsync(cancellationToken);
            }

            await transaction.CommitAsync(cancellationToken);
            _logger.LogInformation("Successfully applied migration {Version}", migration.Version);
        }
        catch (Exception ex)
        {
            await TryRollbackAsync(transaction, migration);

            // Cancellation is not a migration failure: surface it unchanged so callers can
            // tell a shutdown apart from a broken script.
            if (ex is OperationCanceledException && cancellationToken.IsCancellationRequested)
            {
                _logger.LogWarning("Migration {Version} was cancelled before it could be committed", migration.Version);
                throw;
            }

            _logger.LogError(ex, "Failed to apply migration {Version}: {Description}", migration.Version, migration.Description);
            throw new InvalidOperationException($"Migration {migration.Version} failed: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Rolls the transaction back without letting a rollback failure hide the error that caused it.
    /// </summary>
    private async Task TryRollbackAsync(NpgsqlTransaction transaction, Migration migration)
    {
        try
        {
            // Deliberately not the caller's token: it may already be cancelled, and the
            // rollback still has to be attempted.
            await transaction.RollbackAsync(CancellationToken.None);
        }
        catch (Exception rollbackEx)
        {
            _logger.LogWarning(rollbackEx, "Rollback of migration {Version} did not complete cleanly", migration.Version);
        }
    }

    /// <summary>
    /// A single migration script together with the metadata parsed from its file name.
    /// </summary>
    private sealed class Migration
    {
        /// <summary>Numeric version taken from the three-digit file name prefix.</summary>
        public required int Version { get; init; }

        /// <summary>Description taken from the file name, with underscores replaced by spaces.</summary>
        public required string Description { get; init; }

        /// <summary>Full SQL text of the script.</summary>
        public required string Sql { get; init; }

        /// <summary>File name in <c>NNN_description.sql</c> form.</summary>
        public required string FileName { get; init; }
    }
}