# PostgreSQL Event Streaming - Migration Guide
## Overview
This guide explains how to migrate from in-memory event storage to PostgreSQL-backed persistence, and how to use both storage backends together.
## Automatic Migrations
The PostgreSQL package includes an automatic migration system that runs on application startup.
### How It Works
1. **Schema Versioning**: All applied migrations are tracked in the `event_streaming.schema_version` table
2. **Idempotent**: Migrations can be run multiple times safely (already-applied migrations are skipped)
3. **Transactional**: Each migration runs in its own transaction (all-or-nothing)
4. **Ordered**: Migrations execute in ascending numerical order (001, 003, 004, etc.)
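The four properties above can be illustrated with a small in-memory sketch. This is not the package's `DatabaseMigrator`; the `appliedVersions` set is a hypothetical stand-in for the `schema_version` table, and `Migrate` is an illustrative name.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the schema_version table: the set of applied version numbers.
var appliedVersions = new SortedSet<int>();

// Stand-in for the bundled .sql files, deliberately listed out of order.
var migrations = new List<(int Version, string Name)>
{
    (4, "004_StreamConfiguration.sql"),
    (1, "001_InitialSchema.sql"),
    (3, "003_RetentionPolicies.sql"),
};

// Applies pending migrations in ascending version order and returns the ones it ran.
List<string> Migrate()
{
    var ran = new List<string>();
    foreach (var (version, name) in migrations.OrderBy(m => m.Version))
    {
        if (appliedVersions.Contains(version))
            continue; // Idempotent: already recorded, so skip it.

        // A real runner would execute the script and insert the schema_version
        // row inside one transaction, so a failure leaves neither behind.
        appliedVersions.Add(version);
        ran.Add(name);
    }
    return ran;
}

var firstRun = Migrate();
var secondRun = Migrate();

Console.WriteLine(string.Join(", ", firstRun)); // 001_InitialSchema.sql, 003_RetentionPolicies.sql, 004_StreamConfiguration.sql
Console.WriteLine(secondRun.Count);             // 0
```
The second call runs nothing because every version is already recorded, which is why restarting the application with `AutoMigrate = true` is safe.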
### Migration Files
The following migrations are included:
- **001_InitialSchema.sql**: Creates core tables (events, queue_events, consumer_offsets, etc.)
- **003_RetentionPolicies.sql**: Adds retention policy support
- **004_StreamConfiguration.sql**: Adds per-stream configuration
### Configuration
```csharp
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "Host=localhost;Database=events;...";
    options.AutoMigrate = true; // Default: true
});
```
**Auto-Migration Behavior:**
- `AutoMigrate = true` (default): Migrations run automatically on application startup
- `AutoMigrate = false`: Migrations must be run manually (call `DatabaseMigrator` directly)
### Manual Migrations
For production environments, you may want to run migrations manually:
```csharp
// Disable auto-migration
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "...";
    options.AutoMigrate = false;
});

// Run migrations manually
var migrator = serviceProvider.GetRequiredService<DatabaseMigrator>();
await migrator.MigrateAsync();

// Check current version
var currentVersion = await migrator.GetCurrentVersionAsync();
Console.WriteLine($"Database version: {currentVersion}");
```
## Migration from In-Memory to PostgreSQL
### Step 1: Install PostgreSQL Package
```bash
dotnet add package Svrnty.CQRS.Events.PostgreSQL
```
### Step 2: Update Service Registration
**Before (In-Memory):**
```csharp
builder.Services.AddSvrntyCQRS();
builder.Services.AddInMemoryEventStorage();
```
**After (PostgreSQL):**
```csharp
builder.Services.AddSvrntyCQRS();
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
```
### Step 3: Configure PostgreSQL
```csharp
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = "Host=localhost;Database=events;Username=user;Password=pass";
    options.SchemaName = "event_streaming"; // Default
    options.AutoMigrate = true;             // Run migrations on startup
    options.ReadBatchSize = 1000;           // Events per query
    options.CommandTimeout = 30;            // Seconds
});
```
### Step 4: Run Application
On first startup, the migration system will:
1. Create the `event_streaming` schema
2. Create all tables (events, queue_events, etc.)
3. Create indexes for performance
4. Record the applied migrations in the `schema_version` table
## Mixing In-Memory and PostgreSQL
You can support both storage backends in the same codebase and choose one at startup. The two options below are alternatives; register only one of them:
```csharp
// Option 1: Always use PostgreSQL (tests can register in-memory storage in their own setup)
builder.Services.AddPostgresEventStreaming("Host=localhost;...");

// Option 2 (instead of Option 1): Choose the backend by environment at startup
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddInMemoryEventStorage();
}
else
{
    builder.Services.AddPostgresEventStreaming(
        builder.Configuration.GetConnectionString("EventStore")
            ?? throw new InvalidOperationException("EventStore connection string not configured"));
}
```
## Persistent vs Ephemeral Streams
Both storage backends support **persistent** and **ephemeral** streams:
### Persistent Streams (Event Sourcing)
- Events stored permanently in `events` table
- Support replay from any offset
- Ordered by offset within stream
- Used for event sourcing, audit logs, analytics
```csharp
// Append to persistent stream
await eventStore.AppendAsync(streamName, @event, metadata, cancellationToken);

// Read from persistent stream
await foreach (var @event in eventStore.ReadStreamAsync(streamName, startOffset, cancellationToken))
{
    // Process event
}
```
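To make the replay semantics concrete, here is a minimal in-memory model of a persistent stream. It is a sketch, not the library's implementation: `Append` and `ReadStream` are hypothetical local functions, and zero-based offsets are an assumption rather than something the package guarantees.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the events table: one append-only list per stream.
var streams = new Dictionary<string, List<string>>();

// Appends a payload and returns its offset (zero-based here, which is an assumption).
long Append(string streamName, string payload)
{
    if (!streams.TryGetValue(streamName, out var log))
    {
        log = new List<string>();
        streams[streamName] = log;
    }
    log.Add(payload);
    return log.Count - 1;
}

// Replays events in offset order starting at startOffset; reading removes nothing.
List<(long Offset, string Payload)> ReadStream(string streamName, long startOffset)
{
    if (!streams.TryGetValue(streamName, out var log))
        return new List<(long Offset, string Payload)>();

    return log
        .Select((payload, index) => (Offset: (long)index, Payload: payload))
        .Where(e => e.Offset >= startOffset)
        .ToList();
}

Append("orders", "OrderPlaced");
Append("orders", "OrderPaid");
var lastOffset = Append("orders", "OrderShipped");

var replay = ReadStream("orders", startOffset: 1);
foreach (var (offset, payload) in replay)
    Console.WriteLine($"{offset}: {payload}");
// prints "1: OrderPaid" then "2: OrderShipped"
```
Because reads never delete, any consumer can start again from an earlier offset, which is the property event sourcing and audit logs rely on.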
### Ephemeral Streams (Message Queue)
- Events stored temporarily in `queue_events` table
- Deleted after acknowledgment
- Support visibility timeout (for redelivery)
- Used for work queues, notifications, real-time events
```csharp
// Enqueue to ephemeral stream
await eventStore.EnqueueAsync(streamName, @event, cancellationToken);

// Dequeue from ephemeral stream
var dequeuedEvent = await eventStore.DequeueAsync(streamName, consumerId, visibilityTimeout, cancellationToken);

// Acknowledge (delete permanently)
await eventStore.AcknowledgeAsync(eventId, cancellationToken);

// Or negative acknowledge (requeue)
await eventStore.NackAsync(eventId, requeue: true, cancellationToken);
```
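The visibility timeout is easier to follow with explicit timestamps. The sketch below models it in memory under stated assumptions: `queue` and `inFlight` are hypothetical stand-ins for the `queue_events` and `in_flight_events` tables, and the `Dequeue` and `Acknowledge` local functions are illustrative, not the library's API.
```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var queue = new Queue<string>();                         // stands in for queue_events
var inFlight = new Dictionary<string, DateTimeOffset>(); // event id -> invisible until

// Delivers the next visible event and hides it until now + visibilityTimeout.
string? Dequeue(DateTimeOffset now, TimeSpan visibilityTimeout)
{
    // Events whose timeout elapsed without an acknowledgment become visible again.
    foreach (var expired in inFlight.Where(e => e.Value <= now).ToList())
    {
        inFlight.Remove(expired.Key);
        queue.Enqueue(expired.Key);
    }

    if (queue.Count == 0)
        return null;

    var next = queue.Dequeue();
    inFlight[next] = now + visibilityTimeout;
    return next;
}

// Acknowledging removes the event permanently; returns false if it was not in flight.
bool Acknowledge(string eventId) => inFlight.Remove(eventId);

var t0 = DateTimeOffset.UtcNow;
var timeout = TimeSpan.FromSeconds(30);
queue.Enqueue("evt-1");

var first = Dequeue(t0, timeout);                      // "evt-1": delivered, now hidden
var hidden = Dequeue(t0.AddSeconds(10), timeout);      // null: still inside the timeout
var redelivered = Dequeue(t0.AddSeconds(31), timeout); // "evt-1": timeout elapsed, no ack
var acked = Acknowledge("evt-1");                      // true: deleted permanently
var afterAck = Dequeue(t0.AddSeconds(120), timeout);   // null: nothing left to deliver
```
Passing `now` in explicitly keeps the model deterministic. A consumer that crashes before acknowledging simply lets the timeout lapse, and the event is delivered again.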
## Database Schema
### Tables Created
- `event_streaming.events` - Persistent event log
- `event_streaming.queue_events` - Ephemeral message queue
- `event_streaming.in_flight_events` - Visibility timeout tracking
- `event_streaming.dead_letter_queue` - Failed messages
- `event_streaming.consumer_offsets` - Consumer position tracking
- `event_streaming.retention_policies` - Retention configuration
- `event_streaming.stream_configurations` - Per-stream settings
- `event_streaming.schema_version` - Migration tracking
### Performance Considerations
The migration creates indexes for:
- Stream name lookups
- Correlation ID queries
- Event type filtering
- Time-based queries
- JSONB event data (GIN index)
## Troubleshooting
### Migration Failures
If a migration fails:
1. Check application logs for detailed error messages
2. Verify database connection string
3. Ensure database user has CREATE/ALTER permissions
4. Check `event_streaming.schema_version` to see which migrations succeeded
5. Fix the issue and restart (failed migrations are rolled back)
### Manual Migration Repair
If you need to manually fix migrations:
```sql
-- Check applied migrations
SELECT * FROM event_streaming.schema_version ORDER BY version;
-- Manually record a migration as applied (use with caution!)
INSERT INTO event_streaming.schema_version (version, description)
VALUES (3, 'Retention Policies');
```
### Connection Issues
```csharp
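// Requires: using Npgsql;
// Test connection (connectionString is the same value passed to AddPostgresEventStreaming)
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
Console.WriteLine($"Connected to: {connection.Database}");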
```
## Best Practices
1. **Development**: Use `AutoMigrate = true` for convenience
2. **Production**: Use `AutoMigrate = false` and run migrations during deployment
3. **Backup**: Always back up the database before running migrations in production
4. **Testing**: Test migrations in a staging environment first
5. **Rollback**: Keep SQL scripts for manual rollback if needed
## Example: Full PostgreSQL Setup
```csharp
var builder = WebApplication.CreateBuilder(args);

// Configure PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(options =>
{
    options.ConnectionString = builder.Configuration.GetConnectionString("EventStore")
        ?? throw new InvalidOperationException("EventStore connection string not configured");
    options.SchemaName = "event_streaming";
    options.AutoMigrate = builder.Environment.IsDevelopment(); // Auto in dev, manual in prod
    options.ReadBatchSize = 1000;
    options.CommandTimeout = 30;
    options.MaxPoolSize = 100;
});

// Register other event streaming services
builder.Services.AddPostgresRetentionPolicies();
builder.Services.AddPostgresEventReplay();
builder.Services.AddPostgresStreamConfiguration();

var app = builder.Build();

// Outside development, AutoMigrate is off, so run migrations explicitly before serving requests
if (!app.Environment.IsDevelopment())
{
    var migrator = app.Services.GetRequiredService<DatabaseMigrator>();
    await migrator.MigrateAsync();
    var version = await migrator.GetCurrentVersionAsync();
    Console.WriteLine($"Database migrated to version {version}");
}

app.Run();
```
## Connection String Examples
### Local Development
```
Host=localhost;Database=events;Username=postgres;Password=postgres;
```
### Production with SSL
```
Host=prod-db.example.com;Database=events;Username=app_user;Password=secret;SSL Mode=Require;Trust Server Certificate=false;
```
### Connection Pooling
```
Host=localhost;Database=events;Username=app;Password=pass;Minimum Pool Size=5;Maximum Pool Size=100;
```
### With Timeout Settings
```
Host=localhost;Database=events;Username=app;Password=pass;Timeout=30;Command Timeout=60;
```
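Rather than concatenating these strings by hand, you can assemble them with a connection string builder and keep the password out of source control. The sketch below uses the base-class-library `DbConnectionStringBuilder` so that it runs without a package reference; Npgsql also ships a typed `NpgsqlConnectionStringBuilder` that derives from it. The `EVENTSTORE_PASSWORD` variable name and the `"pass"` fallback are assumptions for illustration only.
```csharp
using System;
using System.Data.Common;

// Start from the pooled connection string shown above, minus the password.
var connectionBuilder = new DbConnectionStringBuilder
{
    ConnectionString =
        "Host=localhost;Database=events;Username=app;Minimum Pool Size=5;Maximum Pool Size=100"
};

// EVENTSTORE_PASSWORD is a hypothetical environment variable name.
// The fallback is a local-development placeholder, not something to ship.
var password = Environment.GetEnvironmentVariable("EVENTSTORE_PASSWORD") ?? "pass";
connectionBuilder["Password"] = password;

// Keyword lookups are case-insensitive, so this reads the value parsed above.
var maxPoolSize = Convert.ToString(connectionBuilder["maximum pool size"]);
Console.WriteLine($"Max pool size: {maxPoolSize}");

// Pass this value to options.ConnectionString.
var connectionString = connectionBuilder.ConnectionString;
```
The builder quotes values that contain `;` or `=`, which avoids a malformed connection string when a password includes those characters.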
## See Also
- [PostgreSQL Testing Guide](POSTGRESQL-TESTING.md)
- [Event Streaming Examples](EVENT_STREAMING_EXAMPLES.md)
- [README.md](../README.md)