# Storage
Storage backends for event streaming.
## Overview
Svrnty.CQRS provides two storage implementations for event streams: **PostgreSQL** for production deployments and **In-Memory** for development and testing.
## Storage Backends
| Backend | Persistence | Use Case | Package |
|---------|-------------|----------|---------|
| **PostgreSQL** | Durable | Production | `Svrnty.CQRS.Events.PostgreSQL` |
| **In-Memory** | Volatile | Development/Testing | `Svrnty.CQRS.Events` |
## Quick Comparison
### PostgreSQL Storage
**Pros:**
- ✅ Durable persistence
- ✅ ACID transactions
- ✅ Concurrent access
- ✅ Consumer groups support
- ✅ Retention policies
- ✅ Event replay
- ✅ Stream configuration
- ✅ Efficient concurrent dequeue (`SKIP LOCKED`)
**Cons:**
- ❌ Requires PostgreSQL instance
- ❌ Network latency
- ❌ More complex setup
**When to use:**
- Production deployments
- Multi-instance scenarios
- Long-term event storage
- Consumer group coordination
### In-Memory Storage
**Pros:**
- ✅ Zero setup
- ✅ Fast (no I/O)
- ✅ Simple configuration
- ✅ Great for testing
**Cons:**
- ❌ No persistence (lost on restart)
- ❌ Limited to single process
- ❌ No consumer groups
- ❌ No retention policies
**When to use:**
- Unit testing
- Local development
- Prototyping
- Learning the framework
## Installation
### PostgreSQL
```bash
dotnet add package Svrnty.CQRS.Events.PostgreSQL
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
```
### In-Memory
```bash
dotnet add package Svrnty.CQRS.Events
```
## Configuration
### PostgreSQL
```csharp
using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

// Register PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

// Optional: Consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

// Optional: Retention policies
builder.Services.AddPostgresRetentionPolicies(options =>
{
    options.Enabled = true;
    options.CleanupInterval = TimeSpan.FromHours(1);
});

var app = builder.Build();
app.Run();
```
**appsettings.json:**
```json
{
  "ConnectionStrings": {
    "EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
  },
  "EventStreaming": {
    "ConsumerGroups": {
      "HeartbeatInterval": "00:00:10",
      "SessionTimeout": "00:00:30",
      "CleanupInterval": "00:01:00"
    }
  }
}
```
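
The interval values in this file use the standard .NET `TimeSpan` text format (`hh:mm:ss`), so `00:01:00` means one minute, not one hour. The sketch below checks that reading with plain `TimeSpan` parsing. `IntervalFormat` is a hypothetical helper for illustration only, and the sketch assumes the consumer group options bind these settings to `TimeSpan` properties.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of Svrnty.CQRS.
public static class IntervalFormat
{
    // Parses an interval written in the constant "c" format: [d.]hh:mm:ss
    public static TimeSpan Parse(string value) =>
        TimeSpan.ParseExact(value, "c", CultureInfo.InvariantCulture);
}
```

With this helper, `IntervalFormat.Parse("00:00:10")` yields ten seconds and `IntervalFormat.Parse("00:01:00")` yields one minute.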
### In-Memory
```csharp
using Svrnty.CQRS.Events;
var builder = WebApplication.CreateBuilder(args);
// Register in-memory event streaming
builder.Services.AddInMemoryEventStreaming();
var app = builder.Build();
app.Run();
```
## Features by Backend
| Feature | PostgreSQL | In-Memory |
|---------|-----------|-----------|
| Persistent Streams | ✅ | ✅ |
| Ephemeral Streams | ✅ | ✅ |
| Consumer Groups | ✅ | ❌ |
| Retention Policies | ✅ | ❌ |
| Event Replay | ✅ | ✅ |
| Stream Configuration | ✅ | ❌ |
| gRPC Streaming | ✅ | ✅ |
| Health Checks | ✅ | ❌ |
| Metrics | ✅ | ✅ |
| Durability | ✅ | ❌ |
| Multi-Instance | ✅ | ❌ |
## Storage Operations
### Common Interface
Both backends implement the same interface:
```csharp
public interface IEventStreamStore
{
    // Persistent streams
    Task AppendAsync(string streamName, object[] events, CancellationToken ct = default);
    IAsyncEnumerable<StoredEvent> ReadStreamAsync(string streamName, long fromOffset, CancellationToken ct = default);

    // Ephemeral streams
    Task EnqueueAsync(string streamName, object message, CancellationToken ct = default);
    Task<StoredMessage?> DequeueAsync(string streamName, TimeSpan visibilityTimeout, CancellationToken ct = default);
    Task AcknowledgeAsync(string streamName, string messageId, CancellationToken ct = default);
    Task NackAsync(string streamName, string messageId, TimeSpan redeliverAfter, CancellationToken ct = default);
}
```
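
The ephemeral-stream methods suggest a familiar queue pattern: a dequeued message is hidden for `visibilityTimeout`, is removed when acknowledged, and becomes visible again after `redeliverAfter` when nacked. The following is a toy, single-threaded model of that pattern, written to make the timing rules concrete. `ToyQueue` and all of its members are hypothetical. It is not the Svrnty.CQRS implementation, and the real backends may differ in detail.

```csharp
using System;
using System.Collections.Generic;

// Toy model of visibility-timeout semantics. Hypothetical; not the library's code.
// The caller passes the current time explicitly so the behavior is easy to follow.
public sealed class ToyQueue
{
    private sealed class Entry
    {
        public string Id = "";
        public object Payload = "";
        public DateTimeOffset VisibleAt;
    }

    private readonly List<Entry> _entries = new();
    private int _nextId;

    // Adds a message that is visible immediately and returns its id.
    public string Enqueue(object payload, DateTimeOffset now)
    {
        var entry = new Entry { Id = (++_nextId).ToString(), Payload = payload, VisibleAt = now };
        _entries.Add(entry);
        return entry.Id;
    }

    // Returns the oldest visible message and hides it until now + visibilityTimeout.
    public (string Id, object Payload)? Dequeue(TimeSpan visibilityTimeout, DateTimeOffset now)
    {
        foreach (var entry in _entries)
        {
            if (entry.VisibleAt <= now)
            {
                entry.VisibleAt = now + visibilityTimeout;
                return (entry.Id, entry.Payload);
            }
        }
        return null; // nothing is visible right now
    }

    // Acknowledge: the message was processed, so remove it permanently.
    public bool Acknowledge(string id) => _entries.RemoveAll(e => e.Id == id) > 0;

    // Nack: processing failed, so make the message visible again after a delay.
    public void Nack(string id, TimeSpan redeliverAfter, DateTimeOffset now)
    {
        var entry = _entries.Find(e => e.Id == id);
        if (entry != null)
        {
            entry.VisibleAt = now + redeliverAfter;
        }
    }
}
```

In this model, a message that is dequeued but never acknowledged reappears once its visibility timeout expires, which is what gives a queue of this kind at-least-once delivery.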
### Example Usage
```csharp
// Works with both PostgreSQL and in-memory
public class OrderService
{
    private readonly IEventStreamStore _eventStore;

    // The store is supplied by dependency injection
    public OrderService(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task PublishOrderPlacedAsync(int orderId, string customer, decimal amount)
    {
        // Append a single event to the "orders" stream
        await _eventStore.AppendAsync("orders", new object[]
        {
            new OrderPlacedEvent
            {
                OrderId = orderId,
                CustomerName = customer,
                TotalAmount = amount
            }
        });
    }

    public async Task ProcessOrdersAsync()
    {
        // Read the stream from the beginning
        await foreach (var @event in _eventStore.ReadStreamAsync("orders", fromOffset: 0))
        {
            Console.WriteLine($"Order event: {@event.EventType}");
        }
    }
}
```
## Database Setup
### PostgreSQL (Docker)
```bash
# Start PostgreSQL
docker run -d --name postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=eventstore \
  -p 5432:5432 \
  postgres:16

# Tables are created automatically on first run
dotnet run
```
### PostgreSQL (Production)
```bash
# Create database
createdb eventstore

# Tables are created automatically when the application starts.
# Alternatively, run migrations manually:
dotnet ef database update
```
## Performance
### PostgreSQL Optimizations
1. **Connection pooling** (see [Connection Pooling](connection-pooling.md))
2. **Batch operations** for bulk inserts
3. **SKIP LOCKED** for concurrent dequeue
4. **Indexes** on `stream_name`, `offset`, `timestamp`
5. **Partitioning** for large streams (optional)
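
To make the `SKIP LOCKED` item concrete, here is a sketch of the general PostgreSQL pattern for claiming one queued message without blocking other consumers. The table name `queue_messages` and the columns `id`, `stream_name`, `visible_at`, and `payload` are hypothetical, chosen only for illustration; the statement the library actually runs may look different, and [Database Schema](database-schema.md) describes the real tables. `@stream` and `@timeout` are named parameters in the style accepted by ADO.NET providers such as Npgsql.

```csharp
// Hypothetical query holder; not part of Svrnty.CQRS.
public static class DequeueSql
{
    // Claims the oldest visible message in a stream.
    // FOR UPDATE SKIP LOCKED makes concurrent consumers skip rows that another
    // transaction has already locked instead of waiting on them.
    public const string ClaimNext = @"
WITH candidate AS (
    SELECT id
    FROM queue_messages
    WHERE stream_name = @stream
      AND visible_at <= now()
    ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_messages AS m
SET visible_at = now() + @timeout
FROM candidate
WHERE m.id = candidate.id
RETURNING m.id, m.payload;";
}
```

Because locked rows are skipped rather than waited on, two consumers running this statement at the same time each receive a different message, or no row at all if nothing is visible.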
### In-Memory Optimizations
1. **Thread-safe collections** (`ConcurrentQueue`, `ConcurrentDictionary`)
2. **No I/O overhead**
3. **Direct memory access**
## Migration
### Development to Production
Switch from in-memory to PostgreSQL:
**Before (Development):**
```csharp
builder.Services.AddInMemoryEventStreaming();
```
**After (Production):**
```csharp
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));
```
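Only the registration call changes. Code that depends on `IEventStreamStore` stays the same, and features the in-memory backend lacks, such as consumer groups and retention policies, become available after the switch.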
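
One way to avoid editing the registration between environments is to choose the backend from the hosting environment. This is a minimal sketch, assuming the two registration methods shown in this document and the standard ASP.NET Core `IsDevelopment()` check; adapt the condition if your environments are named differently.

```csharp
using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

if (builder.Environment.IsDevelopment())
{
    // Local development: volatile, zero-setup storage
    builder.Services.AddInMemoryEventStreaming();
}
else
{
    // Any other environment: durable PostgreSQL storage
    builder.Services.AddPostgresEventStreaming(
        builder.Configuration.GetConnectionString("EventStore"));
}

var app = builder.Build();
app.Run();
```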
## Monitoring
### PostgreSQL
- Query stream sizes: `SELECT stream_name, COUNT(*) FROM events GROUP BY stream_name`
- Monitor consumer lag via the `consumer_offsets` table
- Use `pg_stat_statements` for query performance
### In-Memory
- Check memory usage
- Monitor GC pressure
- Use memory profiler for large streams
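
For a quick look at memory usage and GC pressure without a profiler, the built-in `GC` class exposes the managed heap size and per-generation collection counts. The sketch below captures both; `MemorySnapshot` is a hypothetical helper, not part of Svrnty.CQRS.

```csharp
using System;

// Hypothetical helper, not part of Svrnty.CQRS.
public static class MemorySnapshot
{
    // Captures the approximate managed heap size and the number of
    // collections that have run for each GC generation so far.
    public static (long HeapBytes, int Gen0, int Gen1, int Gen2) Capture() =>
        (GC.GetTotalMemory(forceFullCollection: false),
         GC.CollectionCount(0),
         GC.CollectionCount(1),
         GC.CollectionCount(2));

    // Formats a snapshot as a single log-friendly line.
    public static string Describe((long HeapBytes, int Gen0, int Gen1, int Gen2) s) =>
        $"heap={s.HeapBytes / 1024.0 / 1024.0:F1} MiB gen0={s.Gen0} gen1={s.Gen1} gen2={s.Gen2}";
}
```

Logging `MemorySnapshot.Describe(MemorySnapshot.Capture())` periodically gives a rough trend. A heap that keeps growing alongside a rising gen2 count may indicate that in-memory streams are retaining more events than intended.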
## Learn More
- [In-Memory Storage](in-memory-storage.md) - Development setup
- [PostgreSQL Storage](postgresql-storage.md) - Production deployment
- [Database Schema](database-schema.md) - PostgreSQL schema details
- [Connection Pooling](connection-pooling.md) - Performance tuning
## See Also
- [Event Streaming Overview](../README.md)
- [Getting Started](../fundamentals/getting-started.md)
- [Consumer Groups](../consumer-groups/README.md)
- [Retention Policies](../retention-policies/README.md)