291 lines
8.5 KiB
Markdown
291 lines
8.5 KiB
Markdown
# PostgreSQL Event Streaming - Migration Guide
|
|
|
|
## Overview
|
|
|
|
This guide explains how to migrate from in-memory event storage to PostgreSQL-backed persistence, and how to use both storage backends together.
|
|
|
|
## Automatic Migrations
|
|
|
|
The PostgreSQL package includes an automatic migration system that runs on application startup.
|
|
|
|
### How It Works
|
|
|
|
1. **Schema Versioning**: All migrations are tracked in `event_streaming.schema_version` table
|
|
2. **Idempotent**: Migrations can be run multiple times safely (already-applied migrations are skipped)
|
|
3. **Transactional**: Each migration runs in a transaction (all-or-nothing)
|
|
4. **Ordered**: Migrations execute in numerical order (001, 003, 004, etc.)
|
|
|
|
### Migration Files
|
|
|
|
The following migrations are included:
|
|
|
|
- **001_InitialSchema.sql**: Creates core tables (events, queue_events, consumer_offsets, etc.)
|
|
- **003_RetentionPolicies.sql**: Adds retention policy support
|
|
- **004_StreamConfiguration.sql**: Adds per-stream configuration
|
|
|
|
### Configuration
|
|
|
|
```csharp
|
|
builder.Services.AddPostgresEventStreaming(options =>
|
|
{
|
|
options.ConnectionString = "Host=localhost;Database=events;...";
|
|
options.AutoMigrate = true; // Default: true
|
|
});
|
|
```
|
|
|
|
**Auto-Migration Behavior:**
|
|
- `AutoMigrate = true` (default): Migrations run automatically on application startup
|
|
- `AutoMigrate = false`: Manual migration required (use DatabaseMigrator directly)
|
|
|
|
### Manual Migrations
|
|
|
|
For production environments, you may want to run migrations manually:
|
|
|
|
```csharp
|
|
// Disable auto-migration
|
|
builder.Services.AddPostgresEventStreaming(options =>
|
|
{
|
|
options.ConnectionString = "...";
|
|
options.AutoMigrate = false;
|
|
});
|
|
|
|
// Run migrations manually
|
|
var migrator = serviceProvider.GetRequiredService<DatabaseMigrator>();
|
|
await migrator.MigrateAsync();
|
|
|
|
// Check current version
|
|
var currentVersion = await migrator.GetCurrentVersionAsync();
|
|
Console.WriteLine($"Database version: {currentVersion}");
|
|
```
|
|
|
|
## Migration from In-Memory to PostgreSQL
|
|
|
|
### Step 1: Install PostgreSQL Package
|
|
|
|
```bash
|
|
dotnet add package Svrnty.CQRS.Events.PostgreSQL
|
|
```
|
|
|
|
### Step 2: Update Service Registration
|
|
|
|
**Before (In-Memory):**
|
|
```csharp
|
|
builder.Services.AddSvrntyCQRS();
|
|
builder.Services.AddInMemoryEventStorage();
|
|
```
|
|
|
|
**After (PostgreSQL):**
|
|
```csharp
|
|
builder.Services.AddSvrntyCQRS();
|
|
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
|
|
```
|
|
|
|
### Step 3: Configure PostgreSQL
|
|
|
|
```csharp
|
|
builder.Services.AddPostgresEventStreaming(options =>
|
|
{
|
|
options.ConnectionString = "Host=localhost;Database=events;Username=user;Password=pass";
|
|
options.SchemaName = "event_streaming"; // Default
|
|
options.AutoMigrate = true; // Run migrations on startup
|
|
options.ReadBatchSize = 1000; // Events per query
|
|
options.CommandTimeout = 30; // Seconds
|
|
});
|
|
```
|
|
|
|
### Step 4: Run Application
|
|
|
|
On first startup, the migration system will:
|
|
1. Create the `event_streaming` schema
|
|
2. Create all tables (events, queue_events, etc.)
|
|
3. Create indexes for performance
|
|
4. Record applied migrations in `schema_version` table
|
|
|
|
## Mixing In-Memory and PostgreSQL
|
|
|
|
You can use both storage backends in the same application for different purposes:
|
|
|
|
```csharp
|
|
// Option 1: Use PostgreSQL as primary, but keep in-memory for testing
|
|
builder.Services.AddPostgresEventStreaming("Host=localhost;...");
|
|
|
|
// Option 2: Runtime switching based on environment
|
|
if (builder.Environment.IsDevelopment())
|
|
{
|
|
builder.Services.AddInMemoryEventStorage();
|
|
}
|
|
else
|
|
{
|
|
builder.Services.AddPostgresEventStreaming(builder.Configuration.GetConnectionString("EventStore"));
|
|
}
|
|
```
|
|
|
|
## Persistent vs Ephemeral Streams
|
|
|
|
Both storage backends support **persistent** and **ephemeral** streams:
|
|
|
|
### Persistent Streams (Event Sourcing)
|
|
- Events stored permanently in `events` table
|
|
- Support replay from any offset
|
|
- Ordered by offset within stream
|
|
- Used for event sourcing, audit logs, analytics
|
|
|
|
```csharp
|
|
// Append to persistent stream
|
|
await eventStore.AppendAsync(streamName, @event, metadata, cancellationToken);
|
|
|
|
// Read from persistent stream
|
|
await foreach (var @event in eventStore.ReadStreamAsync(streamName, startOffset, cancellationToken))
|
|
{
|
|
// Process event
|
|
}
|
|
```
|
|
|
|
### Ephemeral Streams (Message Queue)
|
|
- Events stored temporarily in `queue_events` table
|
|
- Deleted after acknowledgment
|
|
- Support visibility timeout (for redelivery)
|
|
- Used for work queues, notifications, real-time events
|
|
|
|
```csharp
|
|
// Enqueue to ephemeral stream
|
|
await eventStore.EnqueueAsync(streamName, @event, cancellationToken);
|
|
|
|
// Dequeue from ephemeral stream
|
|
var dequeuedEvent = await eventStore.DequeueAsync(streamName, consumerId, visibilityTimeout, cancellationToken);
|
|
|
|
// Acknowledge (delete permanently)
|
|
await eventStore.AcknowledgeAsync(eventId, cancellationToken);
|
|
|
|
// Or negative acknowledge (requeue)
|
|
await eventStore.NackAsync(eventId, requeue: true, cancellationToken);
|
|
```
|
|
|
|
## Database Schema
|
|
|
|
### Tables Created
|
|
|
|
- `event_streaming.events` - Persistent event log
|
|
- `event_streaming.queue_events` - Ephemeral message queue
|
|
- `event_streaming.in_flight_events` - Visibility timeout tracking
|
|
- `event_streaming.dead_letter_queue` - Failed messages
|
|
- `event_streaming.consumer_offsets` - Consumer position tracking
|
|
- `event_streaming.retention_policies` - Retention configuration
|
|
- `event_streaming.stream_configurations` - Per-stream settings
|
|
- `event_streaming.schema_version` - Migration tracking
|
|
|
|
### Performance Considerations
|
|
|
|
The migration creates indexes for:
|
|
- Stream name lookups
|
|
- Correlation ID queries
|
|
- Event type filtering
|
|
- Time-based queries
|
|
- JSONB event data (GIN index)
|
|
|
|
## Troubleshooting
|
|
|
|
### Migration Failures
|
|
|
|
If a migration fails:
|
|
1. Check application logs for detailed error messages
|
|
2. Verify database connection string
|
|
3. Ensure database user has CREATE/ALTER permissions
|
|
4. Check `event_streaming.schema_version` to see which migrations succeeded
|
|
5. Fix the issue and restart (failed migrations are rolled back)
|
|
|
|
### Manual Migration Repair
|
|
|
|
If you need to manually fix migrations:
|
|
|
|
```sql
|
|
-- Check applied migrations
|
|
SELECT * FROM event_streaming.schema_version ORDER BY version;
|
|
|
|
-- Manually record a migration as applied (use with caution!)
|
|
INSERT INTO event_streaming.schema_version (version, description)
|
|
VALUES (3, 'Retention Policies');
|
|
```
|
|
|
|
### Connection Issues
|
|
|
|
```csharp
|
|
// Test connection
|
|
await using var connection = new NpgsqlConnection(connectionString);
|
|
await connection.OpenAsync();
|
|
Console.WriteLine($"Connected to: {connection.Database}");
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
1. **Development**: Use `AutoMigrate = true` for convenience
|
|
2. **Production**: Use `AutoMigrate = false` and run migrations during deployment
|
|
3. **Backup**: Always backup database before running migrations in production
|
|
4. **Testing**: Test migrations on staging environment first
|
|
5. **Rollback**: Keep SQL scripts for manual rollback if needed
|
|
|
|
## Example: Full PostgreSQL Setup
|
|
|
|
```csharp
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Configure PostgreSQL event streaming
|
|
builder.Services.AddPostgresEventStreaming(options =>
|
|
{
|
|
options.ConnectionString = builder.Configuration.GetConnectionString("EventStore")
|
|
?? throw new InvalidOperationException("EventStore connection string not configured");
|
|
|
|
options.SchemaName = "event_streaming";
|
|
options.AutoMigrate = builder.Environment.IsDevelopment(); // Auto in dev, manual in prod
|
|
options.ReadBatchSize = 1000;
|
|
options.CommandTimeout = 30;
|
|
options.MaxPoolSize = 100;
|
|
});
|
|
|
|
// Register other event streaming services
|
|
builder.Services.AddPostgresRetentionPolicies();
|
|
builder.Services.AddPostgresEventReplay();
|
|
builder.Services.AddPostgresStreamConfiguration();
|
|
|
|
var app = builder.Build();
|
|
|
|
// In production, run migrations manually before starting
|
|
if (!builder.Environment.IsDevelopment())
|
|
{
|
|
var migrator = app.Services.GetRequiredService<DatabaseMigrator>();
|
|
await migrator.MigrateAsync();
|
|
var version = await migrator.GetCurrentVersionAsync();
|
|
Console.WriteLine($"Database migrated to version {version}");
|
|
}
|
|
|
|
app.Run();
|
|
```
|
|
|
|
## Connection String Examples
|
|
|
|
### Local Development
|
|
```
|
|
Host=localhost;Database=events;Username=postgres;Password=postgres;
|
|
```
|
|
|
|
### Production with SSL
|
|
```
|
|
Host=prod-db.example.com;Database=events;Username=app_user;Password=secret;SSL Mode=Require;Trust Server Certificate=false;
|
|
```
|
|
|
|
### Connection Pooling
|
|
```
|
|
Host=localhost;Database=events;Username=app;Password=pass;Minimum Pool Size=5;Maximum Pool Size=100;
|
|
```
|
|
|
|
### With Timeout Settings
|
|
```
|
|
Host=localhost;Database=events;Username=app;Password=pass;Timeout=30;Command Timeout=60;
|
|
```
|
|
|
|
## See Also
|
|
|
|
- [PostgreSQL Testing Guide](POSTGRESQL-TESTING.md)
|
|
- [Event Streaming Examples](EVENT_STREAMING_EXAMPLES.md)
|
|
- [README.md](../README.md)
|