# PostgreSQL Event Streaming - Migration Guide

## Overview

This guide explains how to migrate from in-memory event storage to PostgreSQL-backed persistence, and how to use both storage backends together.

## Automatic Migrations

The PostgreSQL package includes an automatic migration system that runs on application startup.

### How It Works

1. **Schema Versioning**: All migrations are tracked in the `event_streaming.schema_version` table
2. **Idempotent**: Migrations can be run multiple times safely (already-applied migrations are skipped)
3. **Transactional**: Each migration runs in its own transaction (all-or-nothing)
4. **Ordered**: Migrations execute in numerical order (001, 003, 004, etc.)

### Migration Files

The following migrations are included:

- **001_InitialSchema.sql**: Creates core tables (events, queue_events, consumer_offsets, etc.)
- **003_RetentionPolicies.sql**: Adds retention policy support
- **004_StreamConfiguration.sql**: Adds per-stream configuration

### Configuration

```csharp
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "Host=localhost;Database=events;...";
    options.AutoMigrate = true; // Default: true
});
```

**Auto-Migration Behavior:**

- `AutoMigrate = true` (default): Migrations run automatically on application startup
- `AutoMigrate = false`: Manual migration required (use `DatabaseMigrator` directly)

### Manual Migrations

For production environments, you may want to run migrations manually:

```csharp
// Disable auto-migration
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "...";
    options.AutoMigrate = false;
});

// Run migrations manually
var migrator = serviceProvider.GetRequiredService<DatabaseMigrator>();
await migrator.MigrateAsync();

// Check current version
var currentVersion = await migrator.GetCurrentVersionAsync();
Console.WriteLine($"Database version: {currentVersion}");
```

## Migration from In-Memory to PostgreSQL

### Step 1: Install PostgreSQL Package

```bash
dotnet add package Svrnty.CQRS.Events.PostgreSQL
```

### Step 2: Update Service Registration

**Before (In-Memory):**

```csharp
builder.Services.AddSvrntyCQRS();
builder.Services.AddInMemoryEventStorage();
```

**After (PostgreSQL):**

```csharp
builder.Services.AddSvrntyCQRS();
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
```

### Step 3: Configure PostgreSQL

```csharp
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "Host=localhost;Database=events;Username=user;Password=pass";
    options.SchemaName = "event_streaming"; // Default
    options.AutoMigrate = true;             // Run migrations on startup
    options.ReadBatchSize = 1000;           // Events per query
    options.CommandTimeout = 30;            // Seconds
});
```

### Step 4: Run Application

On first startup, the migration system will:

1. Create the `event_streaming` schema
2. Create all tables (events, queue_events, etc.)
3. Create indexes for performance
4. Record applied migrations in the `schema_version` table

## Mixing In-Memory and PostgreSQL

You can use both storage backends in the same application for different purposes:

```csharp
// Option 1: Use PostgreSQL as primary, but keep in-memory for testing
builder.Services.AddPostgresEventStreaming("Host=localhost;...");

// Option 2: Runtime switching based on environment
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddInMemoryEventStorage();
}
else
{
    builder.Services.AddPostgresEventStreaming(builder.Configuration.GetConnectionString("EventStore"));
}
```

## Persistent vs Ephemeral Streams

Both storage backends support **persistent** and **ephemeral** streams:

### Persistent Streams (Event Sourcing)

- Events stored permanently in the `events` table
- Support replay from any offset
- Ordered by offset within a stream
- Used for event sourcing, audit logs, analytics

```csharp
// Append to persistent stream
await eventStore.AppendAsync(streamName, @event, metadata, cancellationToken);

// Read from persistent stream
await foreach (var @event in eventStore.ReadStreamAsync(streamName, startOffset, cancellationToken))
{
    // Process event
}
```

### Ephemeral Streams (Message Queue)

- Events stored temporarily in the `queue_events` table
- Deleted after acknowledgment
- Support visibility timeout (for redelivery)
- Used for work queues, notifications, real-time events

```csharp
// Enqueue to ephemeral stream
await eventStore.EnqueueAsync(streamName, @event, cancellationToken);

// Dequeue from ephemeral stream
var dequeuedEvent = await eventStore.DequeueAsync(streamName, consumerId, visibilityTimeout, cancellationToken);

// Acknowledge (delete permanently)
await eventStore.AcknowledgeAsync(eventId, cancellationToken);

// Or negative acknowledge (requeue)
await eventStore.NackAsync(eventId, requeue: true, cancellationToken);
```

## Database Schema

### Tables Created

- `event_streaming.events` - Persistent event log
- `event_streaming.queue_events` - Ephemeral message queue
- `event_streaming.in_flight_events` - Visibility timeout tracking
- `event_streaming.dead_letter_queue` - Failed messages
- `event_streaming.consumer_offsets` - Consumer position tracking
- `event_streaming.retention_policies` - Retention configuration
- `event_streaming.stream_configurations` - Per-stream settings
- `event_streaming.schema_version` - Migration tracking

### Performance Considerations

The migration creates indexes for:

- Stream name lookups
- Correlation ID queries
- Event type filtering
- Time-based queries
- JSONB event data (GIN index)

## Troubleshooting

### Migration Failures

If a migration fails:

1. Check application logs for detailed error messages
2. Verify the database connection string
3. Ensure the database user has CREATE/ALTER permissions
4. Check `event_streaming.schema_version` to see which migrations succeeded
5. Fix the issue and restart (failed migrations are rolled back)

### Manual Migration Repair

If you need to manually fix migrations:

```sql
-- Check applied migrations
SELECT * FROM event_streaming.schema_version ORDER BY version;

-- Manually record a migration as applied (use with caution!)
INSERT INTO event_streaming.schema_version (version, description)
VALUES (3, 'Retention Policies');
```

### Connection Issues

```csharp
using Npgsql;

// Test connection
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
Console.WriteLine($"Connected to: {connection.Database}");
```

## Best Practices

1. **Development**: Use `AutoMigrate = true` for convenience
2. **Production**: Use `AutoMigrate = false` and run migrations during deployment
3. **Backup**: Always back up the database before running migrations in production
4. **Testing**: Test migrations on a staging environment first
5. **Rollback**: Keep SQL scripts for manual rollback if needed

## Example: Full PostgreSQL Setup

```csharp
var builder = WebApplication.CreateBuilder(args);

// Configure PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = builder.Configuration.GetConnectionString("EventStore")
        ?? throw new InvalidOperationException("EventStore connection string not configured");
    options.SchemaName = "event_streaming";
    options.AutoMigrate = builder.Environment.IsDevelopment(); // Auto in dev, manual in prod
    options.ReadBatchSize = 1000;
    options.CommandTimeout = 30;
    options.MaxPoolSize = 100;
});

// Register other event streaming services
builder.Services.AddPostgresRetentionPolicies();
builder.Services.AddPostgresEventReplay();
builder.Services.AddPostgresStreamConfiguration();

var app = builder.Build();

// In production, run migrations manually before starting
if (!builder.Environment.IsDevelopment())
{
    var migrator = app.Services.GetRequiredService<DatabaseMigrator>();
    await migrator.MigrateAsync();

    var version = await migrator.GetCurrentVersionAsync();
    Console.WriteLine($"Database migrated to version {version}");
}

app.Run();
```

## Connection String Examples

### Local Development

```
Host=localhost;Database=events;Username=postgres;Password=postgres;
```

### Production with SSL

```
Host=prod-db.example.com;Database=events;Username=app_user;Password=secret;SSL Mode=Require;Trust Server Certificate=false;
```

### Connection Pooling

```
Host=localhost;Database=events;Username=app;Password=pass;Minimum Pool Size=5;Maximum Pool Size=100;
```

### With Timeout Settings

```
Host=localhost;Database=events;Username=app;Password=pass;Timeout=30;Command Timeout=60;
```

## See Also

- [PostgreSQL Testing Guide](POSTGRESQL-TESTING.md)
- [Event Streaming Examples](EVENT_STREAMING_EXAMPLES.md)
- [README.md](../README.md)
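
## Example: Selecting Pending Migrations

The idempotent, ordered behavior described under "How It Works" can be illustrated with a minimal sketch. It assumes the version number is the numeric prefix of each migration file name and that the set of applied versions has already been read from `event_streaming.schema_version`. `PendingMigrations` is a hypothetical helper written for this illustration; it is not part of the package, and the actual `DatabaseMigrator` may work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the package): returns the migration files
// whose version is not yet recorded as applied, in ascending version order.
static IReadOnlyList<string> PendingMigrations(IEnumerable<string> files, ISet<int> appliedVersions) =>
    files
        // Assumption: the version is the numeric prefix before the first underscore.
        .Select(f => (File: f, Version: int.Parse(f.Split('_')[0])))
        // Idempotent: anything already applied is skipped.
        .Where(m => !appliedVersions.Contains(m.Version))
        // Ordered: remaining migrations run in numerical order.
        .OrderBy(m => m.Version)
        .Select(m => m.File)
        .ToList();

var files = new[]
{
    "004_StreamConfiguration.sql",
    "001_InitialSchema.sql",
    "003_RetentionPolicies.sql",
};

// Pretend version 1 is already recorded in schema_version.
var applied = new HashSet<int> { 1 };

foreach (var file in PendingMigrations(files, applied))
{
    Console.WriteLine(file);
}
// Prints:
// 003_RetentionPolicies.sql
// 004_StreamConfiguration.sql
```

Running the same selection again after versions 3 and 4 are recorded yields an empty list, which is what makes repeated startup runs safe.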