PostgreSQL Event Streaming - Migration Guide

Overview

This guide explains how to migrate from in-memory event storage to PostgreSQL-backed persistence, and how to choose between the two storage backends.

Automatic Migrations

The PostgreSQL package includes an automatic migration system that runs on application startup.

How It Works

  1. Schema Versioning: All migrations are tracked in the event_streaming.schema_version table
  2. Idempotent: Migrations can be run multiple times safely; already-applied migrations are skipped (see the sketch after this list)
  3. Transactional: Each migration runs in a transaction (all-or-nothing)
  4. Ordered: Migrations execute in numerical order (001, 003, 004, etc.)
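
Conceptually, the apply loop behaves like the sketch below. This is a simplified illustration of the behavior just described, not the package's actual DatabaseMigrator; it assumes the SQL scripts are available keyed by version number, that the schema_version table already exists, and it uses plain Npgsql:

using Npgsql;

// Simplified illustration only -- not the package's DatabaseMigrator.
// Assumes event_streaming.schema_version already exists (the real migrator bootstraps it)
// and that 'migrations' maps each version number to the SQL text of its script.
async Task ApplyMigrationsAsync(string connectionString, IReadOnlyDictionary<int, string> migrations)
{
    await using var connection = new NpgsqlConnection(connectionString);
    await connection.OpenAsync();

    foreach (var (version, sql) in migrations.OrderBy(m => m.Key)) // ordered: 001, 003, 004, ...
    {
        // Idempotent: skip any version already recorded
        await using var check = new NpgsqlCommand(
            "SELECT EXISTS (SELECT 1 FROM event_streaming.schema_version WHERE version = @v)", connection);
        check.Parameters.AddWithValue("v", version);
        if ((bool?)await check.ExecuteScalarAsync() ?? false)
            continue;

        // Transactional: the script and its version record commit together or not at all
        await using var tx = await connection.BeginTransactionAsync();

        await using (var apply = new NpgsqlCommand(sql, connection, tx))
            await apply.ExecuteNonQueryAsync();

        await using (var record = new NpgsqlCommand(
            "INSERT INTO event_streaming.schema_version (version, description) VALUES (@v, @d)", connection, tx))
        {
            record.Parameters.AddWithValue("v", version);
            record.Parameters.AddWithValue("d", $"Migration {version:D3}"); // placeholder description
            await record.ExecuteNonQueryAsync();
        }

        await tx.CommitAsync();
    }
}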

Migration Files

The following migrations are included:

  • 001_InitialSchema.sql: Creates core tables (events, queue_events, consumer_offsets, etc.)
  • 003_RetentionPolicies.sql: Adds retention policy support
  • 004_StreamConfiguration.sql: Adds per-stream configuration

Configuration

builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "Host=localhost;Database=events;...";
    options.AutoMigrate = true; // Default: true
});

Auto-Migration Behavior:

  • AutoMigrate = true (default): Migrations run automatically on application startup
  • AutoMigrate = false: Manual migration required (use DatabaseMigrator directly)

Manual Migrations

For production environments, you may want to run migrations manually:

// Disable auto-migration
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "...";
    options.AutoMigrate = false;
});

// Run migrations manually
var migrator = serviceProvider.GetRequiredService<DatabaseMigrator>();
await migrator.MigrateAsync();

// Check current version
var currentVersion = await migrator.GetCurrentVersionAsync();
Console.WriteLine($"Database version: {currentVersion}");
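
For a deployment pipeline, the same two calls can live in a small migration-only console host that runs before the new application version is rolled out. A minimal sketch, assuming the same registration API and an "EventStore" connection string in configuration:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Standalone migration runner, executed as a deployment step
var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = builder.Configuration.GetConnectionString("EventStore")
        ?? throw new InvalidOperationException("EventStore connection string not configured");
    options.AutoMigrate = false; // the migrator is invoked explicitly below
});

using var host = builder.Build();

var migrator = host.Services.GetRequiredService<DatabaseMigrator>();
await migrator.MigrateAsync();
Console.WriteLine($"Database migrated to version {await migrator.GetCurrentVersionAsync()}");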

Migration from In-Memory to PostgreSQL

Step 1: Install PostgreSQL Package

dotnet add package Svrnty.CQRS.Events.PostgreSQL

Step 2: Update Service Registration

Before (In-Memory):

builder.Services.AddSvrntyCQRS();
builder.Services.AddInMemoryEventStorage();

After (PostgreSQL):

builder.Services.AddSvrntyCQRS();
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

Step 3: Configure PostgreSQL

builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "Host=localhost;Database=events;Username=user;Password=pass";
    options.SchemaName = "event_streaming"; // Default
    options.AutoMigrate = true; // Run migrations on startup
    options.ReadBatchSize = 1000; // Events per query
    options.CommandTimeout = 30; // Seconds
});

Step 4: Run Application

On first startup, the migration system will:

  1. Create the event_streaming schema
  2. Create all tables (events, queue_events, etc.)
  3. Create indexes for performance
  4. Record applied migrations in the schema_version table (you can verify this with the query sketched below)
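
To confirm what was recorded, query the version table directly (plain Npgsql here; psql works just as well, and the version column is assumed to be an integer, as in the repair example further below):

using Npgsql;

// List every migration the migrator has recorded
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();

await using var command = new NpgsqlCommand(
    "SELECT version, description FROM event_streaming.schema_version ORDER BY version", connection);
await using var reader = await command.ExecuteReaderAsync();

while (await reader.ReadAsync())
{
    Console.WriteLine($"{reader.GetInt32(0)}: {reader.GetString(1)}");
}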

Mixing In-Memory and PostgreSQL

You can choose a storage backend per environment, or keep the in-memory store for tests while PostgreSQL remains the primary store:

// Option 1: PostgreSQL as the primary store (test projects can keep using AddInMemoryEventStorage())
builder.Services.AddPostgresEventStreaming("Host=localhost;...");

// Option 2: Runtime switching based on environment
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddInMemoryEventStorage();
}
else
{
    builder.Services.AddPostgresEventStreaming(builder.Configuration.GetConnectionString("EventStore"));
}
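
If an environment check is too coarse, the same switch can be driven by configuration instead. A sketch assuming a hypothetical EventStorage:Provider setting (not a key defined by the package):

// "EventStorage:Provider" is a made-up key -- use whatever naming your app already has
var provider = builder.Configuration["EventStorage:Provider"] ?? "Postgres";

if (string.Equals(provider, "InMemory", StringComparison.OrdinalIgnoreCase))
{
    builder.Services.AddInMemoryEventStorage();
}
else
{
    builder.Services.AddPostgresEventStreaming(builder.Configuration.GetConnectionString("EventStore"));
}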

Persistent vs Ephemeral Streams

Both storage backends support persistent and ephemeral streams:

Persistent Streams (Event Sourcing)

  • Events stored permanently in events table
  • Support replay from any offset
  • Ordered by offset within stream
  • Used for event sourcing, audit logs, analytics

// Append to persistent stream
await eventStore.AppendAsync(streamName, @event, metadata, cancellationToken);

// Read from persistent stream
await foreach (var @event in eventStore.ReadStreamAsync(streamName, startOffset, cancellationToken))
{
    // Process event
}
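
Replay is just a read from an earlier offset. For example, a read model can be rebuilt by streaming from offset 0; the counting projection below is purely illustrative:

// Rebuild a simple count-per-event-type projection by replaying the whole stream
var countsByType = new Dictionary<string, long>();

await foreach (var @event in eventStore.ReadStreamAsync(streamName, 0, cancellationToken))
{
    var type = @event.GetType().Name;
    countsByType[type] = countsByType.GetValueOrDefault(type) + 1;
}

foreach (var (type, count) in countsByType)
{
    Console.WriteLine($"{type}: {count}");
}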

Ephemeral Streams (Message Queue)

  • Events stored temporarily in queue_events table
  • Deleted after acknowledgment
  • Support visibility timeout (for redelivery)
  • Used for work queues, notifications, real-time events

// Enqueue to ephemeral stream
await eventStore.EnqueueAsync(streamName, @event, cancellationToken);

// Dequeue from ephemeral stream
var dequeuedEvent = await eventStore.DequeueAsync(streamName, consumerId, visibilityTimeout, cancellationToken);

// Acknowledge (delete permanently)
await eventStore.AcknowledgeAsync(eventId, cancellationToken);

// Or negative acknowledge (requeue)
await eventStore.NackAsync(eventId, requeue: true, cancellationToken);
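
Put together, a worker usually runs a dequeue/process/acknowledge loop. The sketch below only uses the calls shown above; that DequeueAsync returns null when the queue is empty and that the dequeued item exposes an Id are assumptions about the package's return type:

// Worker loop: dequeue, process, ack on success, nack (requeue) on failure
while (!cancellationToken.IsCancellationRequested)
{
    var dequeued = await eventStore.DequeueAsync(streamName, consumerId, visibilityTimeout, cancellationToken);
    if (dequeued is null)
    {
        // Queue empty: back off briefly before polling again
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        continue;
    }

    try
    {
        // ... process the event payload here ...
        await eventStore.AcknowledgeAsync(dequeued.Id, cancellationToken); // delete permanently
    }
    catch
    {
        // Processing failed: requeue so it is redelivered after the visibility timeout
        await eventStore.NackAsync(dequeued.Id, requeue: true, cancellationToken);
    }
}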

Database Schema

Tables Created

  • event_streaming.events - Persistent event log
  • event_streaming.queue_events - Ephemeral message queue
  • event_streaming.in_flight_events - Visibility timeout tracking
  • event_streaming.dead_letter_queue - Failed messages
  • event_streaming.consumer_offsets - Consumer position tracking
  • event_streaming.retention_policies - Retention configuration
  • event_streaming.stream_configurations - Per-stream settings
  • event_streaming.schema_version - Migration tracking

Performance Considerations

The migration creates indexes for:

  • Stream name lookups
  • Correlation ID queries
  • Event type filtering
  • Time-based queries
  • JSONB event data (GIN index)

Troubleshooting

Migration Failures

If a migration fails:

  1. Check application logs for detailed error messages
  2. Verify database connection string
  3. Ensure database user has CREATE/ALTER permissions
  4. Check event_streaming.schema_version to see which migrations succeeded
  5. Fix the issue and restart (failed migrations are rolled back)

Manual Migration Repair

If you need to manually fix migrations:

-- Check applied migrations
SELECT * FROM event_streaming.schema_version ORDER BY version;

-- Manually record a migration as applied (use with caution!)
INSERT INTO event_streaming.schema_version (version, description)
VALUES (3, 'Retention Policies');

Connection Issues

using Npgsql;

// Test connection
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
Console.WriteLine($"Connected to: {connection.Database}");

Best Practices

  1. Development: Use AutoMigrate = true for convenience
  2. Production: Use AutoMigrate = false and run migrations during deployment
  3. Backup: Always backup database before running migrations in production
  4. Testing: Test migrations on staging environment first
  5. Rollback: Keep SQL scripts for manual rollback if needed

Example: Full PostgreSQL Setup

var builder = WebApplication.CreateBuilder(args);

// Configure PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = builder.Configuration.GetConnectionString("EventStore")
        ?? throw new InvalidOperationException("EventStore connection string not configured");

    options.SchemaName = "event_streaming";
    options.AutoMigrate = builder.Environment.IsDevelopment(); // Auto in dev, manual in prod
    options.ReadBatchSize = 1000;
    options.CommandTimeout = 30;
    options.MaxPoolSize = 100;
});

// Register other event streaming services
builder.Services.AddPostgresRetentionPolicies();
builder.Services.AddPostgresEventReplay();
builder.Services.AddPostgresStreamConfiguration();

var app = builder.Build();

// In production, run migrations manually before starting
if (!builder.Environment.IsDevelopment())
{
    var migrator = app.Services.GetRequiredService<DatabaseMigrator>();
    await migrator.MigrateAsync();
    var version = await migrator.GetCurrentVersionAsync();
    Console.WriteLine($"Database migrated to version {version}");
}

app.Run();

Connection String Examples

Local Development

Host=localhost;Database=events;Username=postgres;Password=postgres;

Production with SSL

Host=prod-db.example.com;Database=events;Username=app_user;Password=secret;SSL Mode=Require;Trust Server Certificate=false;

Connection Pooling

Host=localhost;Database=events;Username=app;Password=pass;Minimum Pool Size=5;Maximum Pool Size=100;

With Timeout Settings

Host=localhost;Database=events;Username=app;Password=pass;Timeout=30;Command Timeout=60;
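
The same settings can also be assembled with Npgsql's NpgsqlConnectionStringBuilder, which avoids typos in keyword names (values taken from the examples above):

using Npgsql;

var csb = new NpgsqlConnectionStringBuilder
{
    Host = "prod-db.example.com",
    Database = "events",
    Username = "app_user",
    Password = "secret",
    SslMode = SslMode.Require,
    MinPoolSize = 5,
    MaxPoolSize = 100,
    Timeout = 30,        // connection timeout, seconds
    CommandTimeout = 60  // per-command timeout, seconds
};

// e.g. inside the AddPostgresEventStreaming callback
options.ConnectionString = csb.ConnectionString;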

See Also