
Storage

Storage backends for event streaming.

Overview

Svrnty.CQRS provides two storage implementations for event streams: PostgreSQL for production deployments and In-Memory for development and testing.

Storage Backends

Backend      Persistence   Use Case              Package
PostgreSQL   Durable       Production            Svrnty.CQRS.Events.PostgreSQL
In-Memory    Volatile      Development/Testing   Svrnty.CQRS.Events

Quick Comparison

PostgreSQL Storage

Pros:

  • Durable persistence
  • ACID transactions
  • Concurrent access
  • Consumer groups support
  • Retention policies
  • Event replay
  • Stream configuration
  • High performance (SKIP LOCKED)

Cons:

  • Requires PostgreSQL instance
  • Network latency
  • More complex setup

When to use:

  • Production deployments
  • Multi-instance scenarios
  • Long-term event storage
  • Consumer group coordination

In-Memory Storage

Pros:

  • Zero setup
  • Fast (no I/O)
  • Simple configuration
  • Great for testing

Cons:

  • No persistence (lost on restart)
  • Limited to single process
  • No consumer groups
  • No retention policies

When to use:

  • Unit testing
  • Local development
  • Prototyping
  • Learning the framework

Installation

PostgreSQL

dotnet add package Svrnty.CQRS.Events.PostgreSQL
dotnet add package Svrnty.CQRS.Events.ConsumerGroups

In-Memory

dotnet add package Svrnty.CQRS.Events

Configuration

PostgreSQL

using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

// Register PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

// Optional: Consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

// Optional: Retention policies
builder.Services.AddPostgresRetentionPolicies(options =>
{
    options.Enabled = true;
    options.CleanupInterval = TimeSpan.FromHours(1);
});

var app = builder.Build();
app.Run();

appsettings.json:

{
  "ConnectionStrings": {
    "EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
  },
  "EventStreaming": {
    "ConsumerGroups": {
      "HeartbeatInterval": "00:00:10",
      "SessionTimeout": "00:00:30",
      "CleanupInterval": "00:01:00"
    }
  }
}

In-Memory

using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);

// Register in-memory event streaming
builder.Services.AddInMemoryEventStreaming();

var app = builder.Build();
app.Run();
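
Because registration is a single call, the in-memory backend is also convenient inside unit tests. A minimal sketch, assuming xUnit, that IEventStreamStore and StoredEvent live in the Svrnty.CQRS.Events namespace, and an illustrative TestEvent record:

using Microsoft.Extensions.DependencyInjection;
using Svrnty.CQRS.Events;
using Xunit;

public record TestEvent(string Name);

public class InMemoryEventStreamingTests
{
    [Fact]
    public async Task Appended_events_can_be_read_back()
    {
        // Build a minimal container with only the in-memory backend registered
        var services = new ServiceCollection();
        services.AddInMemoryEventStreaming();
        await using var provider = services.BuildServiceProvider();

        var store = provider.GetRequiredService<IEventStreamStore>();
        await store.AppendAsync("test-stream", new object[] { new TestEvent("hello") });

        // Read back the first stored event; break in case the read is a live subscription
        StoredEvent? first = null;
        await foreach (var stored in store.ReadStreamAsync("test-stream", fromOffset: 0))
        {
            first = stored;
            break;
        }

        Assert.NotNull(first);
    }
}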

Features by Backend

Feature                PostgreSQL   In-Memory
Persistent Streams     Yes          Yes (not durable)
Ephemeral Streams      Yes          Yes
Consumer Groups        Yes          No
Retention Policies     Yes          No
Event Replay           Yes          Limited (current process only)
Stream Configuration   Yes
gRPC Streaming
Health Checks
Metrics
Durability             Yes          No
Multi-Instance         Yes          No

Storage Operations

Common Interface

Both backends implement the same interface:

public interface IEventStreamStore
{
    // Persistent streams
    Task AppendAsync(string streamName, object[] events, CancellationToken ct = default);
    IAsyncEnumerable<StoredEvent> ReadStreamAsync(string streamName, long fromOffset, CancellationToken ct = default);

    // Ephemeral streams
    Task EnqueueAsync(string streamName, object message, CancellationToken ct = default);
    Task<StoredMessage?> DequeueAsync(string streamName, TimeSpan visibilityTimeout, CancellationToken ct = default);
    Task AcknowledgeAsync(string streamName, string messageId, CancellationToken ct = default);
    Task NackAsync(string streamName, string messageId, TimeSpan redeliverAfter, CancellationToken ct = default);
}

Example Usage

// Works with both PostgreSQL and in-memory
public class OrderService
{
    private readonly IEventStreamStore _eventStore;

    public OrderService(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task PublishOrderPlacedAsync(int orderId, string customer, decimal amount)
    {
        await _eventStore.AppendAsync("orders", new[]
        {
            new OrderPlacedEvent
            {
                OrderId = orderId,
                CustomerName = customer,
                TotalAmount = amount
            }
        });
    }

    public async Task ProcessOrdersAsync()
    {
        await foreach (var @event in _eventStore.ReadStreamAsync("orders", fromOffset: 0))
        {
            Console.WriteLine($"Order event: {@event.EventType}");
        }
    }
}
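
The ephemeral side of the interface follows a classic queue pattern: enqueue, dequeue with a visibility timeout, then acknowledge (or nack to redeliver). A sketch of that flow; the MessageId property on StoredMessage is an assumption for illustration, since the exact shape of StoredMessage is defined by the package:

// Works with both backends; MessageId on StoredMessage is assumed here
public record NotificationRequested(string Text);

public class NotificationWorker
{
    private readonly IEventStreamStore _eventStore;

    public NotificationWorker(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public Task QueueAsync(string text, CancellationToken ct = default) =>
        _eventStore.EnqueueAsync("notifications", new NotificationRequested(text), ct);

    public async Task ProcessOneAsync(CancellationToken ct = default)
    {
        // Take one message; it stays hidden from other consumers for 30 seconds
        var message = await _eventStore.DequeueAsync("notifications", TimeSpan.FromSeconds(30), ct);
        if (message is null)
            return; // nothing queued

        try
        {
            // ... handle the message ...

            // Permanently remove the message once handled
            await _eventStore.AcknowledgeAsync("notifications", message.MessageId, ct);
        }
        catch
        {
            // Make the message visible again after 10 seconds so it can be retried
            await _eventStore.NackAsync("notifications", message.MessageId, TimeSpan.FromSeconds(10), ct);
            throw;
        }
    }
}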

Database Setup

PostgreSQL (Docker)

# Start PostgreSQL
docker run -d --name postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=eventstore \
  -p 5432:5432 \
  postgres:16

# Tables created automatically on first run
dotnet run

PostgreSQL (Production)

# Create database
createdb eventstore

# Run migrations (automatic)
# Tables created when application starts

# Or run migrations manually
dotnet ef database update

Performance

PostgreSQL Optimizations

  1. Connection pooling (see Connection Pooling)
  2. Batch operations for bulk inserts
  3. SKIP LOCKED for concurrent dequeue (see the sketch after this list)
  4. Indexes on stream_name, offset, timestamp
  5. Partitioning for large streams (optional)
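
To make the SKIP LOCKED point concrete, here is a minimal sketch of a concurrent dequeue query issued through Npgsql. The table and column names (ephemeral_messages, stream_name, visible_at, payload) are placeholders for illustration, not the package's actual schema; see Database Schema for the real layout.

using Npgsql;

// Illustrative only: FOR UPDATE SKIP LOCKED lets many consumers dequeue
// concurrently without blocking on each other's row locks.
public static class SkipLockedDequeueSample
{
    public static async Task<string?> DequeueAsync(NpgsqlDataSource dataSource, string streamName)
    {
        const string sql = """
            UPDATE ephemeral_messages
            SET visible_at = now() + interval '30 seconds'
            WHERE id = (
                SELECT id
                FROM ephemeral_messages
                WHERE stream_name = @stream
                  AND visible_at <= now()
                ORDER BY id
                FOR UPDATE SKIP LOCKED
                LIMIT 1)
            RETURNING payload;
            """;

        await using var command = dataSource.CreateCommand(sql);
        command.Parameters.AddWithValue("stream", streamName);

        // Returns the claimed payload, or null when no message is currently visible
        return await command.ExecuteScalarAsync() as string;
    }
}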

In-Memory Optimizations

  1. Thread-safe collections (ConcurrentQueue, ConcurrentDictionary); see the sketch after this list
  2. No I/O overhead
  3. Direct memory access
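
As a rough illustration of the first point, an in-memory ephemeral stream can be modeled as one queue per stream name inside a concurrent dictionary. This is a simplified sketch of the data-structure pattern only, not the package's implementation; it omits visibility timeouts, acknowledgements, and persistent streams.

using System.Collections.Concurrent;

public class SimpleInMemoryQueues
{
    // One lock-free queue per stream, created lazily on first use
    private readonly ConcurrentDictionary<string, ConcurrentQueue<object>> _streams = new();

    public void Enqueue(string streamName, object message) =>
        _streams.GetOrAdd(streamName, _ => new ConcurrentQueue<object>()).Enqueue(message);

    public object? Dequeue(string streamName) =>
        _streams.TryGetValue(streamName, out var queue) && queue.TryDequeue(out var message)
            ? message
            : null;
}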

Migration

Development to Production

Switch from in-memory to PostgreSQL:

Before (Development):

builder.Services.AddInMemoryEventStreaming();

After (Production):

builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

No code changes are needed; both backends implement the same IEventStreamStore interface.
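
A common way to apply this is to pick the backend from the host environment, keeping development on the in-memory store and everything else on PostgreSQL. A sketch reusing the registration calls shown earlier:

// Choose the storage backend per environment
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddInMemoryEventStreaming();
}
else
{
    builder.Services.AddPostgresEventStreaming(
        builder.Configuration.GetConnectionString("EventStore"));
}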

Monitoring

PostgreSQL

  • Query stream sizes: SELECT stream_name, COUNT(*) FROM events GROUP BY stream_name
  • Monitor consumer lag via consumer_offsets table
  • Use pg_stat_statements for query performance

In-Memory

  • Check memory usage
  • Monitor GC pressure
  • Use memory profiler for large streams

See Also

  • PostgreSQL Storage (postgresql-storage.md)
  • In-Memory Storage (in-memory-storage.md)
  • Database Schema (database-schema.md)
  • Connection Pooling (connection-pooling.md)