PostgreSQL Event Streaming - Migration Guide
Overview
This guide explains how to migrate from in-memory event storage to PostgreSQL-backed persistence, and how to use both storage backends together.
Automatic Migrations
The PostgreSQL package includes an automatic migration system that runs on application startup.
How It Works
- Schema Versioning: All migrations are tracked in the event_streaming.schema_version table
- Idempotent: Migrations can be run multiple times safely (already-applied migrations are skipped)
- Transactional: Each migration runs in a transaction (all-or-nothing)
- Ordered: Migrations execute in numerical order (001, 003, 004, etc.)
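Conceptually, each migration combines its DDL and its version record in a single transaction, which is what makes a failed run safe to retry. A sketch of the shape (the actual DDL lives in the numbered .sql files; the applied_at column is an assumption):

```sql
-- Conceptual shape of one migration run, as performed internally by the migrator.
BEGIN;

-- ...DDL statements from 003_RetentionPolicies.sql...

-- Record the migration so future runs skip it (idempotency check).
INSERT INTO event_streaming.schema_version (version, description)
VALUES (3, 'Retention Policies');

COMMIT;
-- If any statement fails, the ROLLBACK leaves both the schema and
-- schema_version untouched, so the migration can simply be re-run.
```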
Migration Files
The following migrations are included:
- 001_InitialSchema.sql: Creates core tables (events, queue_events, consumer_offsets, etc.)
- 003_RetentionPolicies.sql: Adds retention policy support
- 004_StreamConfiguration.sql: Adds per-stream configuration
Configuration
builder.Services.AddPostgresEventStreaming(options =>
{
options.ConnectionString = "Host=localhost;Database=events;...";
options.AutoMigrate = true; // Default: true
});
Auto-Migration Behavior:
- AutoMigrate = true (default): Migrations run automatically on application startup
- AutoMigrate = false: Manual migration required (use DatabaseMigrator directly)
Manual Migrations
For production environments, you may want to run migrations manually:
// Disable auto-migration
builder.Services.AddPostgresEventStreaming(options =>
{
options.ConnectionString = "...";
options.AutoMigrate = false;
});
// Run migrations manually
var migrator = serviceProvider.GetRequiredService<DatabaseMigrator>();
await migrator.MigrateAsync();
// Check current version
var currentVersion = await migrator.GetCurrentVersionAsync();
Console.WriteLine($"Database version: {currentVersion}");
Migration from In-Memory to PostgreSQL
Step 1: Install PostgreSQL Package
dotnet add package Svrnty.CQRS.Events.PostgreSQL
Step 2: Update Service Registration
Before (In-Memory):
builder.Services.AddSvrntyCQRS();
builder.Services.AddInMemoryEventStorage();
After (PostgreSQL):
builder.Services.AddSvrntyCQRS();
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
Step 3: Configure PostgreSQL
builder.Services.AddPostgresEventStreaming(options =>
{
options.ConnectionString = "Host=localhost;Database=events;Username=user;Password=pass";
options.SchemaName = "event_streaming"; // Default
options.AutoMigrate = true; // Run migrations on startup
options.ReadBatchSize = 1000; // Events per query
options.CommandTimeout = 30; // Seconds
});
Step 4: Run Application
On first startup, the migration system will:
- Create the event_streaming schema
- Create all tables (events, queue_events, etc.)
- Create indexes for performance
- Record applied migrations in the schema_version table
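You can confirm the first startup succeeded by listing the tables the migrations created (this query uses the standard information_schema and needs no library-specific tooling):

```sql
-- Verify the schema after first startup
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'event_streaming'
ORDER BY table_name;
```

You should see the tables listed in the Database Schema section below, including schema_version.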
Mixing In-Memory and PostgreSQL
You can use both storage backends in the same application for different purposes:
// Option 1: Use PostgreSQL as primary, but keep in-memory for testing
builder.Services.AddPostgresEventStreaming("Host=localhost;...");
// Option 2: Runtime switching based on environment
if (builder.Environment.IsDevelopment())
{
builder.Services.AddInMemoryEventStorage();
}
else
{
builder.Services.AddPostgresEventStreaming(builder.Configuration.GetConnectionString("EventStore"));
}
Persistent vs Ephemeral Streams
Both storage backends support persistent and ephemeral streams:
Persistent Streams (Event Sourcing)
- Events stored permanently in the events table
- Support replay from any offset
- Ordered by offset within stream
- Used for event sourcing, audit logs, analytics
// Append to persistent stream
await eventStore.AppendAsync(streamName, @event, metadata, cancellationToken);
// Read from persistent stream
await foreach (var @event in eventStore.ReadStreamAsync(streamName, startOffset, cancellationToken))
{
// Process event
}
Ephemeral Streams (Message Queue)
- Events stored temporarily in the queue_events table
- Deleted after acknowledgment
- Support visibility timeout (for redelivery)
- Used for work queues, notifications, real-time events
// Enqueue to ephemeral stream
await eventStore.EnqueueAsync(streamName, @event, cancellationToken);
// Dequeue from ephemeral stream
var dequeuedEvent = await eventStore.DequeueAsync(streamName, consumerId, visibilityTimeout, cancellationToken);
// Acknowledge (delete permanently)
await eventStore.AcknowledgeAsync(eventId, cancellationToken);
// Or negative acknowledge (requeue)
await eventStore.NackAsync(eventId, requeue: true, cancellationToken);
Database Schema
Tables Created
- event_streaming.events - Persistent event log
- event_streaming.queue_events - Ephemeral message queue
- event_streaming.in_flight_events - Visibility timeout tracking
- event_streaming.dead_letter_queue - Failed messages
- event_streaming.consumer_offsets - Consumer position tracking
- event_streaming.retention_policies - Retention configuration
- event_streaming.stream_configurations - Per-stream settings
- event_streaming.schema_version - Migration tracking
Performance Considerations
The initial schema migration creates indexes for:
- Stream name lookups
- Correlation ID queries
- Event type filtering
- Time-based queries
- JSONB event data (GIN index)
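As an illustration only (the real index names and column names live in 001_InitialSchema.sql; idx_events_data_gin, event_data, and stream_name below are assumptions), indexes of the kinds listed above would look like this in PostgreSQL:

```sql
-- Illustrative sketch; consult 001_InitialSchema.sql for the actual definitions.

-- GIN index for querying into the JSONB event payload
CREATE INDEX IF NOT EXISTS idx_events_data_gin
    ON event_streaming.events USING GIN (event_data);

-- B-tree index for stream name lookups
CREATE INDEX IF NOT EXISTS idx_events_stream_name
    ON event_streaming.events (stream_name);
```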
Troubleshooting
Migration Failures
If a migration fails:
- Check application logs for detailed error messages
- Verify database connection string
- Ensure database user has CREATE/ALTER permissions
- Check event_streaming.schema_version to see which migrations succeeded
- Fix the issue and restart (failed migrations are rolled back)
Manual Migration Repair
If you need to manually fix migrations:
-- Check applied migrations
SELECT * FROM event_streaming.schema_version ORDER BY version;
-- Manually record a migration as applied (use with caution!)
INSERT INTO event_streaming.schema_version (version, description)
VALUES (3, 'Retention Policies');
Connection Issues
// Test connection
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
Console.WriteLine($"Connected to: {connection.Database}");
Best Practices
- Development: Use AutoMigrate = true for convenience
- Production: Use AutoMigrate = false and run migrations during deployment
- Backup: Always back up the database before running migrations in production
- Testing: Test migrations on staging environment first
- Rollback: Keep SQL scripts for manual rollback if needed
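A rollback script should undo both the migration's DDL and its schema_version entry. A hypothetical rollback for 003_RetentionPolicies.sql might look like this (adapt it to the migration's real DDL before use):

```sql
-- Hypothetical rollback sketch for migration 003 (use with caution!)
BEGIN;

-- Undo the DDL the migration applied
DROP TABLE IF EXISTS event_streaming.retention_policies;

-- Remove the version record so the migration can be re-applied later
DELETE FROM event_streaming.schema_version WHERE version = 3;

COMMIT;
```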
Example: Full PostgreSQL Setup
var builder = WebApplication.CreateBuilder(args);
// Configure PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(options =>
{
options.ConnectionString = builder.Configuration.GetConnectionString("EventStore")
?? throw new InvalidOperationException("EventStore connection string not configured");
options.SchemaName = "event_streaming";
options.AutoMigrate = builder.Environment.IsDevelopment(); // Auto in dev, manual in prod
options.ReadBatchSize = 1000;
options.CommandTimeout = 30;
options.MaxPoolSize = 100;
});
// Register other event streaming services
builder.Services.AddPostgresRetentionPolicies();
builder.Services.AddPostgresEventReplay();
builder.Services.AddPostgresStreamConfiguration();
var app = builder.Build();
// In production, run migrations manually before starting
if (!builder.Environment.IsDevelopment())
{
var migrator = app.Services.GetRequiredService<DatabaseMigrator>();
await migrator.MigrateAsync();
var version = await migrator.GetCurrentVersionAsync();
Console.WriteLine($"Database migrated to version {version}");
}
app.Run();
Connection String Examples
Local Development
Host=localhost;Database=events;Username=postgres;Password=postgres;
Production with SSL
Host=prod-db.example.com;Database=events;Username=app_user;Password=secret;SSL Mode=Require;Trust Server Certificate=false;
Connection Pooling
Host=localhost;Database=events;Username=app;Password=pass;Minimum Pool Size=5;Maximum Pool Size=100;
With Timeout Settings
Host=localhost;Database=events;Username=app;Password=pass;Timeout=30;Command Timeout=60;