Storage
Storage backends for event streaming.
Overview
Svrnty.CQRS provides two storage implementations for event streams: PostgreSQL for production deployments and In-Memory for development and testing.
Storage Backends
| Backend | Persistence | Use Case | Package |
|---|---|---|---|
| PostgreSQL | Durable | Production | Svrnty.CQRS.Events.PostgreSQL |
| In-Memory | Volatile | Development/Testing | Svrnty.CQRS.Events |
Quick Comparison
PostgreSQL Storage
Pros:
- ✅ Durable persistence
- ✅ ACID transactions
- ✅ Concurrent access
- ✅ Consumer groups support
- ✅ Retention policies
- ✅ Event replay
- ✅ Stream configuration
- ✅ High-throughput concurrent dequeue (SKIP LOCKED)
Cons:
- ❌ Requires PostgreSQL instance
- ❌ Network latency
- ❌ More complex setup
When to use:
- Production deployments
- Multi-instance scenarios
- Long-term event storage
- Consumer group coordination
In-Memory Storage
Pros:
- ✅ Zero setup
- ✅ Fast (no I/O)
- ✅ Simple configuration
- ✅ Great for testing
Cons:
- ❌ No persistence (lost on restart)
- ❌ Limited to single process
- ❌ No consumer groups
- ❌ No retention policies
When to use:
- Unit testing
- Local development
- Prototyping
- Learning the framework
Installation
PostgreSQL
dotnet add package Svrnty.CQRS.Events.PostgreSQL
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
In-Memory
dotnet add package Svrnty.CQRS.Events
Configuration
PostgreSQL
using Svrnty.CQRS.Events.PostgreSQL;
var builder = WebApplication.CreateBuilder(args);
// Register PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(
builder.Configuration.GetConnectionString("EventStore"));
// Optional: Consumer groups
builder.Services.AddPostgresConsumerGroups(
builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));
// Optional: Retention policies
builder.Services.AddPostgresRetentionPolicies(options =>
{
options.Enabled = true;
options.CleanupInterval = TimeSpan.FromHours(1);
});
var app = builder.Build();
app.Run();
appsettings.json:
{
"ConnectionStrings": {
"EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
},
"EventStreaming": {
"ConsumerGroups": {
"HeartbeatInterval": "00:00:10",
"SessionTimeout": "00:00:30",
"CleanupInterval": "00:01:00"
}
}
}
In-Memory
using Svrnty.CQRS.Events;
var builder = WebApplication.CreateBuilder(args);
// Register in-memory event streaming
builder.Services.AddInMemoryEventStreaming();
var app = builder.Build();
app.Run();
Features by Backend
| Feature | PostgreSQL | In-Memory |
|---|---|---|
| Persistent Streams | ✅ | ✅ |
| Ephemeral Streams | ✅ | ✅ |
| Consumer Groups | ✅ | ❌ |
| Retention Policies | ✅ | ❌ |
| Event Replay | ✅ | ✅ |
| Stream Configuration | ✅ | ❌ |
| gRPC Streaming | ✅ | ✅ |
| Health Checks | ✅ | ❌ |
| Metrics | ✅ | ✅ |
| Durability | ✅ | ❌ |
| Multi-Instance | ✅ | ❌ |
Storage Operations
Common Interface
Both backends implement the same interface:
public interface IEventStreamStore
{
// Persistent streams
Task AppendAsync(string streamName, object[] events, CancellationToken ct = default);
IAsyncEnumerable<StoredEvent> ReadStreamAsync(string streamName, long fromOffset, CancellationToken ct = default);
// Ephemeral streams
Task EnqueueAsync(string streamName, object message, CancellationToken ct = default);
Task<StoredMessage?> DequeueAsync(string streamName, TimeSpan visibilityTimeout, CancellationToken ct = default);
Task AcknowledgeAsync(string streamName, string messageId, CancellationToken ct = default);
Task NackAsync(string streamName, string messageId, TimeSpan redeliverAfter, CancellationToken ct = default);
}
Example Usage
// Works with both PostgreSQL and in-memory
public class OrderService
{
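private readonly IEventStreamStore _eventStore;
// Injected by the DI container; resolves to whichever backend is registered
public OrderService(IEventStreamStore eventStore)
{
_eventStore = eventStore;
}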
public async Task PublishOrderPlacedAsync(int orderId, string customer, decimal amount)
{
await _eventStore.AppendAsync("orders", new[]
{
new OrderPlacedEvent
{
OrderId = orderId,
CustomerName = customer,
TotalAmount = amount
}
});
}
public async Task ProcessOrdersAsync()
{
await foreach (var @event in _eventStore.ReadStreamAsync("orders", fromOffset: 0))
{
Console.WriteLine($"Order event: {@event.EventType}");
}
}
}
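The example above covers persistent streams only. For ephemeral streams, the dequeue/acknowledge/nack cycle might look like the sketch below. To keep it compilable on its own, it declares a small stand-in interface, `IEphemeralStore`, that copies the three ephemeral method signatures from `IEventStreamStore`. `QueuedMessage`, its `MessageId` and `Payload` members, `QueueWorker`, and the timeout values are all assumptions made for illustration; the real `StoredMessage` type may expose different members.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for StoredMessage; the real type's members may differ.
public record QueuedMessage(string MessageId, object Payload);

// Stand-in that mirrors the ephemeral-stream methods of IEventStreamStore.
public interface IEphemeralStore
{
    Task<QueuedMessage?> DequeueAsync(string streamName, TimeSpan visibilityTimeout, CancellationToken ct = default);
    Task AcknowledgeAsync(string streamName, string messageId, CancellationToken ct = default);
    Task NackAsync(string streamName, string messageId, TimeSpan redeliverAfter, CancellationToken ct = default);
}

public class QueueWorker
{
    private readonly IEphemeralStore _store;
    private readonly string _streamName;

    public QueueWorker(IEphemeralStore store, string streamName)
    {
        _store = store;
        _streamName = streamName;
    }

    // Processes at most one message. Returns false when nothing was dequeued.
    public async Task<bool> ProcessNextAsync(Func<QueuedMessage, Task> handler, CancellationToken ct = default)
    {
        // Request a 30-second visibility timeout for the dequeued message.
        var message = await _store.DequeueAsync(_streamName, TimeSpan.FromSeconds(30), ct);
        if (message is null)
            return false;

        try
        {
            await handler(message);
            // Handler succeeded: acknowledge the message.
            await _store.AcknowledgeAsync(_streamName, message.MessageId, ct);
        }
        catch (Exception) when (!ct.IsCancellationRequested)
        {
            // Handler failed: request redelivery after 5 seconds.
            await _store.NackAsync(_streamName, message.MessageId, TimeSpan.FromSeconds(5), ct);
        }

        return true;
    }
}
```

In a real project the worker would take `IEventStreamStore` directly. Choose a visibility timeout longer than the slowest expected handler run, so that a message is not handed to a second consumer while the first is still working on it.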
Database Setup
PostgreSQL (Docker)
# Start PostgreSQL
docker run -d --name postgres \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=eventstore \
-p 5432:5432 \
postgres:16
# Tables created automatically on first run
dotnet run
PostgreSQL (Production)
# Create the database
createdb eventstore
# Tables are created automatically when the application starts.
# Alternatively, apply the migrations manually:
dotnet ef database update
Performance
PostgreSQL Optimizations
- Connection pooling (see Connection Pooling)
- Batch operations for bulk inserts
- SKIP LOCKED for concurrent dequeue
- Indexes on stream_name, offset, timestamp
- Partitioning for large streams (optional)
In-Memory Optimizations
- Thread-safe collections (ConcurrentQueue, ConcurrentDictionary)
- No I/O overhead
- Direct memory access
Migration
Development to Production
Switch from in-memory to PostgreSQL:
Before (Development):
builder.Services.AddInMemoryEventStreaming();
After (Production):
builder.Services.AddPostgresEventStreaming(
builder.Configuration.GetConnectionString("EventStore"));
Only the service registration changes. Code that depends on IEventStreamStore works unchanged with either backend.
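Rather than editing the registration by hand when deploying, one option is to pick the backend from the hosting environment. The sketch below uses only the two registration calls shown above plus ASP.NET Core's `IsDevelopment()` check. It assumes both packages are referenced by the project and that the `EventStore` connection string is present outside development.

```csharp
using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

if (builder.Environment.IsDevelopment())
{
    // Volatile store: events are lost when the process restarts.
    builder.Services.AddInMemoryEventStreaming();
}
else
{
    // Durable store backed by PostgreSQL.
    builder.Services.AddPostgresEventStreaming(
        builder.Configuration.GetConnectionString("EventStore"));
}

var app = builder.Build();
app.Run();
```

Consumer groups and retention policies are not available with the in-memory backend, so code paths that rely on them should still be exercised against PostgreSQL before release.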
Monitoring
PostgreSQL
- Query stream sizes: SELECT stream_name, COUNT(*) FROM events GROUP BY stream_name;
- Monitor consumer lag via the consumer_offsets table
- Use pg_stat_statements for query performance
In-Memory
- Check memory usage
- Monitor GC pressure
- Use memory profiler for large streams
Learn More
- In-Memory Storage - Development setup
- PostgreSQL Storage - Production deployment
- Database Schema - PostgreSQL schema details
- Connection Pooling - Performance tuning