dotnet-cqrs/README.md

48 KiB
Raw Permalink Blame History

This project was originally initiated by Powered Software Inc. and was forked from the PoweredSoft.CQRS Repository

CQRS

Our implementation of query and command responsibility segregation (CQRS).

Getting Started

Install nuget package to your awesome project.

Package Name NuGet NuGet Install
Svrnty.CQRS NuGet dotnet add package Svrnty.CQRS
Svrnty.CQRS.MinimalApi NuGet dotnet add package Svrnty.CQRS.MinimalApi
Svrnty.CQRS.FluentValidation NuGet dotnet add package Svrnty.CQRS.FluentValidation
Svrnty.CQRS.DynamicQuery NuGet dotnet add package Svrnty.CQRS.DynamicQuery
Svrnty.CQRS.DynamicQuery.MinimalApi NuGet dotnet add package Svrnty.CQRS.DynamicQuery.MinimalApi
Svrnty.CQRS.Grpc NuGet dotnet add package Svrnty.CQRS.Grpc
Svrnty.CQRS.Grpc.Generators NuGet dotnet add package Svrnty.CQRS.Grpc.Generators
Svrnty.CQRS.Events NuGet dotnet add package Svrnty.CQRS.Events
Svrnty.CQRS.Events.Grpc NuGet dotnet add package Svrnty.CQRS.Events.Grpc
Svrnty.CQRS.Events.PostgreSQL NuGet dotnet add package Svrnty.CQRS.Events.PostgreSQL
Svrnty.CQRS.Events.ConsumerGroups NuGet dotnet add package Svrnty.CQRS.Events.ConsumerGroups

Abstractions Packages.

Package Name NuGet NuGet Install
Svrnty.CQRS.Abstractions NuGet dotnet add package Svrnty.CQRS.Abstractions
Svrnty.CQRS.DynamicQuery.Abstractions NuGet dotnet add package Svrnty.CQRS.DynamicQuery.Abstractions
Svrnty.CQRS.Grpc.Abstractions NuGet dotnet add package Svrnty.CQRS.Grpc.Abstractions
Svrnty.CQRS.Events.Abstractions NuGet dotnet add package Svrnty.CQRS.Events.Abstractions
Svrnty.CQRS.Events.ConsumerGroups.Abstractions NuGet dotnet add package Svrnty.CQRS.Events.ConsumerGroups.Abstractions
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.Grpc;

var builder = WebApplication.CreateBuilder(args);

// Register your commands with validators
builder.Services.AddCommand<AddUserCommand, int, AddUserCommandHandler, AddUserCommandValidator>();
builder.Services.AddCommand<RemoveUserCommand, RemoveUserCommandHandler>();

// Register your queries
builder.Services.AddQuery<FetchUserQuery, User, FetchUserQueryHandler>();

// Configure CQRS with gRPC support
builder.Services.AddSvrntyCqrs(cqrs =>
{
    // Enable gRPC endpoints with reflection
    cqrs.AddGrpc(grpc =>
    {
        grpc.EnableReflection();
    });
});

var app = builder.Build();

// Map all configured CQRS endpoints
app.UseSvrntyCqrs();

app.Run();

Important: gRPC Requirements

The gRPC implementation uses Grpc.Tools with .proto files and source generators for automatic service implementation:

1. Install required packages:

dotnet add package Grpc.AspNetCore
dotnet add package Grpc.AspNetCore.Server.Reflection
dotnet add package Grpc.StatusProto  # For Rich Error Model validation

2. Add the source generator as an analyzer:

dotnet add package Svrnty.CQRS.Grpc.Generators

The source generator is automatically configured as an analyzer when installed via NuGet and will generate both the .proto files and gRPC service implementations at compile time.

3. Define your C# commands and queries:

public record AddUserCommand
{
    public required string Name { get; init; }
    public required string Email { get; init; }
    public int Age { get; init; }
}

public record RemoveUserCommand
{
    public int UserId { get; init; }
}

Notes:

  • The source generator automatically creates:
    • .proto files in the Protos/ directory from your C# commands and queries
    • CommandServiceImpl and QueryServiceImpl implementations
  • FluentValidation is automatically integrated with Google Rich Error Model for structured validation errors
  • Validation errors return google.rpc.Status with BadRequest containing FieldViolations
  • Use record types for commands/queries (immutable, value-based equality, more concise)
  • No need for protobuf-net attributes - just define your C# types

Sample of startup code for Minimal API (HTTP)

For HTTP scenarios (web browsers, public APIs), you can use the Minimal API approach:

using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.MinimalApi;

var builder = WebApplication.CreateBuilder(args);

// Register your commands with validators
builder.Services.AddCommand<CreatePersonCommand, CreatePersonCommandHandler, CreatePersonCommandValidator>();
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler, EchoCommandValidator>();

// Register your queries
builder.Services.AddQuery<PersonQuery, IQueryable<Person>, PersonQueryHandler>();

// Configure CQRS with Minimal API support
builder.Services.AddSvrntyCqrs(cqrs =>
{
    // Enable Minimal API endpoints
    cqrs.AddMinimalApi();
});

// Add Swagger (optional)
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Map all configured CQRS endpoints (automatically creates POST /api/command/* and POST/GET /api/query/*)
app.UseSvrntyCqrs();

app.Run();

Notes:

  • FluentValidation is automatically integrated with RFC 7807 Problem Details for structured validation errors
  • Use record types for commands/queries (immutable, value-based equality, more concise)
  • Supports both POST and GET (for queries) endpoints
  • Automatically generates Swagger/OpenAPI documentation

Sample enabling both gRPC and HTTP

You can enable both gRPC and traditional HTTP endpoints simultaneously, allowing clients to choose their preferred protocol:

using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.Grpc;
using Svrnty.CQRS.MinimalApi;

var builder = WebApplication.CreateBuilder(args);

// Register your commands with validators
builder.Services.AddCommand<AddUserCommand, int, AddUserCommandHandler, AddUserCommandValidator>();
builder.Services.AddCommand<RemoveUserCommand, RemoveUserCommandHandler>();

// Register your queries
builder.Services.AddQuery<FetchUserQuery, User, FetchUserQueryHandler>();

// Configure CQRS with both gRPC and Minimal API support
builder.Services.AddSvrntyCqrs(cqrs =>
{
    // Enable gRPC endpoints with reflection
    cqrs.AddGrpc(grpc =>
    {
        grpc.EnableReflection();
    });

    // Enable Minimal API endpoints
    cqrs.AddMinimalApi();
});

// Add HTTP support with Swagger
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Map all configured CQRS endpoints (both gRPC and HTTP)
app.UseSvrntyCqrs();

app.Run();

Benefits:

  • Single codebase supports multiple protocols
  • gRPC for high-performance, low-latency scenarios (microservices, internal APIs)
  • HTTP for web browsers, legacy clients, and public APIs
  • Same commands, queries, and validation logic for both protocols
  • Swagger UI available for HTTP endpoints, gRPC reflection for gRPC clients

Fluent Validation

FluentValidation is optional but recommended for command and query validation. The Svrnty.CQRS.FluentValidation package provides extension methods to simplify validator registration.

The package exposes extension method overloads that accept the validator as a generic parameter:

dotnet add package Svrnty.CQRS.FluentValidation
using Svrnty.CQRS.FluentValidation; // Extension methods for validator registration

// Command with result - validator as last generic parameter
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler, EchoCommandValidator>();

// Command without result - validator included in generics
builder.Services.AddCommand<CreatePersonCommand, CreatePersonCommandHandler, CreatePersonCommandValidator>();

Benefits:

  • Single line registration - Handler and validator registered together
  • Type safety - Compiler ensures validator matches command type
  • Less boilerplate - No need for separate AddTransient<IValidator<T>>() calls
  • Cleaner code - Clear intent that validation is part of command pipeline

Without Svrnty.CQRS.FluentValidation

If you prefer not to use the FluentValidation package, you need to register commands and validators separately:

using FluentValidation;
using Svrnty.CQRS;

// Register command handler
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler>();

// Manually register validator
builder.Services.AddTransient<IValidator<EchoCommand>, EchoCommandValidator>();

Event Streaming

Svrnty.CQRS includes comprehensive event streaming support for building event-driven architectures with both persistent (event sourcing) and ephemeral (message queue) streams.

Quick Start

# Install core event streaming packages
dotnet add package Svrnty.CQRS.Events
dotnet add package Svrnty.CQRS.Events.Grpc  # For gRPC bidirectional streaming

# Install storage backend
dotnet add package Svrnty.CQRS.Events.PostgreSQL  # PostgreSQL storage (recommended for production)

Basic Setup

using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

// Add event streaming support
builder.Services.AddSvrntyEvents();
builder.Services.AddDefaultEventDiscovery();

// Configure storage backend (PostgreSQL)
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetSection("EventStreaming:PostgreSQL"));

// Enable gRPC event streaming (optional)
builder.Services.AddSvrntyEventsGrpc();

var app = builder.Build();

// Map gRPC event streaming endpoints
app.MapGrpcService<EventStreamServiceImpl>();

app.Run();

PostgreSQL Configuration

Add to appsettings.json:

{
  "EventStreaming": {
    "PostgreSQL": {
      "ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;Username=postgres;Password=postgres",
      "SchemaName": "event_streaming",
      "AutoMigrate": true,
      "MaxPoolSize": 100,
      "MinPoolSize": 5
    }
  }
}

Key Features:

  • Auto-Migration: Database schema created automatically on startup (when AutoMigrate: true)
  • Connection Pooling: Configurable Npgsql connection pool (5-100 connections)
  • Schema Isolation: All tables in dedicated schema (default: event_streaming)

Storage Options

1. PostgreSQL (Production-Ready)

builder.Services.AddPostgresEventStreaming(options => {
    options.ConnectionString = "Host=localhost;...";
    options.AutoMigrate = true;  // Create schema automatically
});

Features:

  • Persistent and ephemeral stream support
  • SKIP LOCKED for concurrent queue operations
  • Visibility timeout for message processing
  • Dead letter queue for failed messages
  • Consumer offset tracking (Phase 2.3)
  • Retention policies support (Phase 2.4)

2. In-Memory (Development/Testing)

builder.Services.AddInMemoryEventStorage();

Features:

  • Fast in-memory storage
  • Suitable for development and testing
  • No external dependencies
  • Data lost on restart

Stream Types

Persistent Streams (Event Sourcing)

Append-only event logs for event sourcing patterns:

// Append events to persistent stream
await eventStore.AppendAsync("user-123", new UserCreatedEvent
{
    UserId = 123,
    Name = "Alice",
    Email = "alice@example.com"
});

// Read stream from offset
var events = await eventStore.ReadStreamAsync("user-123", fromOffset: 0);

// Get stream length
var length = await eventStore.GetStreamLengthAsync("user-123");

Ephemeral Streams (Message Queue)

Message queue semantics with at-least-once delivery:

// Enqueue messages
await eventStore.EnqueueAsync("notifications", new EmailNotificationEvent
{
    To = "user@example.com",
    Subject = "Welcome!",
    Body = "Thanks for signing up"
});

// Dequeue with visibility timeout
var @event = await eventStore.DequeueAsync(
    streamName: "notifications",
    consumerId: "worker-1",
    visibilityTimeout: TimeSpan.FromSeconds(30)
);

// Process and acknowledge
if (@event != null)
{
    await ProcessEventAsync(@event);
    await eventStore.AcknowledgeAsync("notifications", @event.EventId, "worker-1");
}

// Or negative acknowledge (requeue or move to DLQ)
await eventStore.NackAsync(
    streamName: "notifications",
    eventId: @event.EventId,
    consumerId: "worker-1",
    requeue: false  // Move to dead letter queue
);

gRPC Bidirectional Streaming

The framework provides gRPC bidirectional streaming for real-time event delivery:

// Client-side subscription (in your client application)
var call = client.SubscribeToStream();

// Send subscription request
await call.RequestStream.WriteAsync(new SubscribeRequest
{
    StreamName = "user-events",
    ConsumerId = "client-123",
    SubscriptionMode = SubscriptionMode.Broadcast
});

// Receive events in real-time
await foreach (var @event in call.ResponseStream.ReadAllAsync())
{
    Console.WriteLine($"Received event: {@event.EventType}");
}

Consumer Groups

Consumer groups enable multiple consumers to coordinate processing of persistent streams without duplicates. This provides load balancing, fault tolerance, and at-least-once delivery guarantees.

Installation

dotnet add package Svrnty.CQRS.Events.ConsumerGroups

Setup

using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// Add consumer group support with PostgreSQL backend
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

var app = builder.Build();
app.Run();

Configuration in appsettings.json:

{
  "EventStreaming": {
    "ConsumerGroups": {
      "ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;...",
      "SchemaName": "event_streaming",
      "AutoMigrate": true
    }
  }
}

Basic Usage

// Inject the consumer group reader
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();

// Consume events with automatic offset management
await foreach (var @event in reader.ConsumeAsync(
    streamName: "orders",
    groupId: "order-processors",
    consumerId: "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch,
        HeartbeatInterval = TimeSpan.FromSeconds(10),
        SessionTimeout = TimeSpan.FromSeconds(30)
    },
    cancellationToken))
{
    await ProcessOrderEventAsync(@event);
    // Offset automatically committed after batch
}

Offset Commit Strategies

1. AfterEach - Commit after each event (safest, highest overhead):

options.CommitStrategy = OffsetCommitStrategy.AfterEach;

2. AfterBatch - Commit after each batch (balanced):

options.CommitStrategy = OffsetCommitStrategy.AfterBatch;
options.BatchSize = 100;

3. Periodic - Commit at intervals (highest throughput):

options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.PeriodicCommitInterval = TimeSpan.FromSeconds(5);

4. Manual - Full control over commits:

options.CommitStrategy = OffsetCommitStrategy.Manual;

await foreach (var @event in reader.ConsumeAsync(...))
{
    try
    {
        await ProcessEventAsync(@event);

        // Manually commit after successful processing
        await reader.CommitOffsetAsync(
            streamName: "orders",
            groupId: "order-processors",
            consumerId: "worker-1",
            offset: currentOffset,
            cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Processing failed, will retry");
        // Don't commit - event will be reprocessed
    }
}

Monitoring Consumer Groups

var offsetStore = serviceProvider.GetRequiredService<IConsumerOffsetStore>();

// Get all active consumers in a group
var consumers = await offsetStore.GetActiveConsumersAsync("order-processors");
foreach (var consumer in consumers)
{
    Console.WriteLine($"Consumer: {consumer.ConsumerId}");
    Console.WriteLine($"  Last Heartbeat: {consumer.LastHeartbeat}");
    Console.WriteLine($"  Registered: {consumer.RegisteredAt}");
}

// Get group offsets per consumer
var offsets = await offsetStore.GetGroupOffsetsAsync("order-processors", "orders");
foreach (var (consumerId, offset) in offsets)
{
    Console.WriteLine($"{consumerId}: offset {offset}");
}

// Get last committed offset for the group (minimum across all consumers)
var groupOffset = await offsetStore.GetCommittedOffsetAsync("order-processors", "orders");
Console.WriteLine($"Group safe offset: {groupOffset}");

Key Features

  • Automatic Offset Management: Tracks last processed position per consumer
  • Heartbeat Monitoring: Detects and removes stale consumers automatically
  • Flexible Commit Strategies: Choose when to commit offsets (manual, per-event, per-batch, periodic)
  • Load Balancing: Multiple consumers can process the same stream
  • Fault Tolerance: Consumers can resume from last committed offset after failure
  • At-Least-Once Delivery: Events processed at least once, even with consumer failures

Health Monitoring

The ConsumerHealthMonitor background service automatically:

  • Sends periodic heartbeats for registered consumers
  • Detects stale consumers (no heartbeat within session timeout)
  • Cleans up dead consumers from the registry
  • Logs consumer group health metrics

Configure health monitoring:

builder.Services.AddPostgresConsumerGroups(
    storageConfig => { /* ... */ },
    healthConfig => {
        healthConfig.CleanupInterval = TimeSpan.FromSeconds(30);
        healthConfig.SessionTimeout = TimeSpan.FromSeconds(60);
        healthConfig.Enabled = true;
    });

Retention Policies

Automatic retention policy enforcement ensures that old events are cleaned up according to configurable rules. This helps manage database size and comply with data retention requirements.

Installation

dotnet add package Svrnty.CQRS.Events.PostgreSQL

The retention policy feature is included in the PostgreSQL event streaming package.

Setup

Register retention policy services in your application:

builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

// Add retention policy background service
builder.Services.AddPostgresRetentionPolicies(options =>
{
    options.Enabled = true;
    options.CleanupInterval = TimeSpan.FromHours(1);
    options.CleanupWindowStart = TimeSpan.FromHours(2);  // 2 AM UTC
    options.CleanupWindowEnd = TimeSpan.FromHours(6);    // 6 AM UTC
    options.UseCleanupWindow = true;
});

Or use configuration:

builder.Services.AddPostgresRetentionPolicies(
    builder.Configuration.GetSection("RetentionService"));
{
  "RetentionService": {
    "Enabled": true,
    "CleanupInterval": "01:00:00",
    "CleanupWindowStart": "02:00:00",
    "CleanupWindowEnd": "06:00:00",
    "UseCleanupWindow": true
  }
}

Usage

Setting Retention Policies

var policyStore = serviceProvider.GetRequiredService<IRetentionPolicyStore>();

// Time-based retention: delete events older than 30 days
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "orders",
    MaxAge = TimeSpan.FromDays(30),
    Enabled = true
});

// Size-based retention: keep only last 10,000 events
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "analytics",
    MaxEventCount = 10000,
    Enabled = true
});

// Combined retention: use both time and size limits
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "logs",
    MaxAge = TimeSpan.FromDays(7),
    MaxEventCount = 50000,
    Enabled = true
});

// Default policy for all streams (use "*" as stream name)
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "*",
    MaxAge = TimeSpan.FromDays(90),
    Enabled = true
});

Retrieving Policies

// Get specific policy
var policy = await policyStore.GetPolicyAsync("orders");
if (policy != null)
{
    Console.WriteLine($"Stream: {policy.StreamName}");
    Console.WriteLine($"Max Age: {policy.MaxAge}");
    Console.WriteLine($"Max Event Count: {policy.MaxEventCount}");
    Console.WriteLine($"Enabled: {policy.Enabled}");
}

// Get all policies
var policies = await policyStore.GetAllPoliciesAsync();
foreach (var p in policies)
{
    Console.WriteLine($"{p.StreamName}: Age={p.MaxAge}, Count={p.MaxEventCount}");
}

Deleting Policies

// Delete a specific policy
var deleted = await policyStore.DeletePolicyAsync("orders");

// Note: Cannot delete the default "*" policy

Manual Cleanup

While the background service runs automatically, you can also trigger cleanup manually:

// Apply all enabled retention policies
var result = await policyStore.ApplyRetentionPoliciesAsync();

Console.WriteLine($"Streams Processed: {result.StreamsProcessed}");
Console.WriteLine($"Events Deleted: {result.EventsDeleted}");
Console.WriteLine($"Duration: {result.Duration}");

// Per-stream details
foreach (var (streamName, count) in result.EventsDeletedPerStream)
{
    Console.WriteLine($"  {streamName}: {count} events deleted");
}

Key Features

  • Time-based Retention: Delete events older than specified duration
  • Size-based Retention: Keep only the most recent N events per stream
  • Wildcard Policies: Apply default policies to all streams using "*"
  • Cleanup Windows: Run cleanup only during specified time windows (e.g., off-peak hours)
  • Background Service: Automatic periodic cleanup via PeriodicTimer
  • Statistics Tracking: Detailed metrics about cleanup operations
  • Efficient Deletion: PostgreSQL stored procedures for batch cleanup

Cleanup Window

The cleanup window feature allows you to restrict when retention policies are enforced:

options.UseCleanupWindow = true;
options.CleanupWindowStart = TimeSpan.FromHours(2);   // 2 AM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(6);     // 6 AM UTC

The window automatically handles midnight crossing:

// Window that spans midnight (10 PM to 2 AM)
options.CleanupWindowStart = TimeSpan.FromHours(22);  // 10 PM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(2);     // 2 AM UTC

Disable the window to run cleanup at any time:

options.UseCleanupWindow = false;

Monitoring

The background service logs cleanup operations:

[INF] Retention policy service started. Cleanup interval: 01:00:00, Window: 02:00:00-06:00:00 UTC
[INF] Starting retention policy enforcement cycle
[INF] Retention cleanup complete: 3 streams processed, 1,234 events deleted in 00:00:01.234
[DBG] Stream orders: 500 events deleted
[DBG] Stream analytics: 734 events deleted
[DBG] Stream logs: 0 events deleted

When outside the cleanup window:

[DBG] Outside cleanup window (02:00:00-06:00:00 UTC), skipping retention enforcement

Event Replay API

The Event Replay API enables rebuilding projections, reprocessing events, and time-travel debugging by replaying historical events from persistent streams.

Installation

dotnet add package Svrnty.CQRS.Events.PostgreSQL

The event replay feature is included in the PostgreSQL event streaming package.

Setup

Register the event replay service:

builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

// Add event replay service
builder.Services.AddPostgresEventReplay();

Usage

Replay from Offset

Replay events starting from a specific sequence number:

var replayService = serviceProvider.GetRequiredService<IEventReplayService>();

await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        BatchSize = 100,
        MaxEventsPerSecond = 1000, // Rate limit to 1000 events/sec
        ProgressCallback = progress =>
        {
            Console.WriteLine($"Progress: {progress.EventsProcessed} events " +
                            $"({progress.ProgressPercentage:F1}%) " +
                            $"@ {progress.EventsPerSecond:F0} events/sec");
        }
    }))
{
    await ProcessEventAsync(@event);
}

Replay from Time

Replay events starting from a specific timestamp:

var startTime = DateTimeOffset.UtcNow.AddDays(-7);

await foreach (var @event in replayService.ReplayFromTimeAsync(
    streamName: "orders",
    startTime: startTime,
    options: new ReplayOptions
    {
        MaxEvents = 10000,
        EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" }
    }))
{
    await RebuildProjectionAsync(@event);
}

Replay Time Range

Replay events within a specific time window:

var startTime = DateTimeOffset.UtcNow.AddDays(-7);
var endTime = DateTimeOffset.UtcNow.AddDays(-6);

await foreach (var @event in replayService.ReplayTimeRangeAsync(
    streamName: "analytics",
    startTime: startTime,
    endTime: endTime,
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "PageView", "Click" },
        ProgressInterval = 500
    }))
{
    await ProcessAnalyticsEventAsync(@event);
}

Replay All Events

Replay entire stream from the beginning:

await foreach (var @event in replayService.ReplayAllAsync(
    streamName: "orders",
    options: new ReplayOptions
    {
        BatchSize = 1000,
        MaxEventsPerSecond = 5000
    }))
{
    await ProcessEventAsync(@event);
}

Get Replay Count

Get the total number of events that would be replayed:

var count = await replayService.GetReplayCountAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "OrderPlaced" }
    });

Console.WriteLine($"Will replay {count} OrderPlaced events");

Replay Options

Control replay behavior with ReplayOptions:

var options = new ReplayOptions
{
    // Batch size for reading from database (default: 100)
    BatchSize = 100,

    // Maximum events to replay (default: null = unlimited)
    MaxEvents = 10000,

    // Rate limiting in events/second (default: null = unlimited)
    MaxEventsPerSecond = 1000,

    // Filter by event types (default: null = all types)
    EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" },

    // Include metadata in events (default: true)
    IncludeMetadata = true,

    // Progress callback (default: null)
    ProgressCallback = progress =>
    {
        Console.WriteLine($"{progress.EventsProcessed} events processed");
    },

    // How often to invoke progress callback (default: 1000)
    ProgressInterval = 1000
};

Progress Tracking

Monitor replay progress with callbacks:

await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "orders",
    startOffset: 0,
    options: new ReplayOptions
    {
        ProgressCallback = progress =>
        {
            Console.WriteLine($@"
Replay Progress:
  Current Offset: {progress.CurrentOffset}
  Events Processed: {progress.EventsProcessed:N0}
  Estimated Total: {progress.EstimatedTotal:N0}
  Progress: {progress.ProgressPercentage:F1}%
  Rate: {progress.EventsPerSecond:F0} events/sec
  Elapsed: {progress.Elapsed}
  Current Event Time: {progress.CurrentTimestamp}");
        },
        ProgressInterval = 1000
    }))
{
    await ProcessEventAsync(@event);
}

Rate Limiting

Control replay speed to avoid overwhelming consumers:

// Replay at maximum 500 events per second
await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "high-volume-stream",
    startOffset: 0,
    options: new ReplayOptions
    {
        MaxEventsPerSecond = 500
    }))
{
    await ProcessEventSlowlyAsync(@event);
}

Event Type Filtering

Replay only specific event types:

// Only replay order-related events
await foreach (var @event in replayService.ReplayAllAsync(
    streamName: "orders",
    options: new ReplayOptions
    {
        EventTypeFilter = new[]
        {
            "OrderPlaced",
            "OrderShipped",
            "OrderDelivered"
        }
    }))
{
    await UpdateOrderProjectionAsync(@event);
}

Key Features

  • Offset-based Replay: Replay from specific sequence numbers
  • Time-based Replay: Replay from specific timestamps
  • Time Range Replay: Replay events within time windows
  • Event Type Filtering: Replay only specific event types
  • Rate Limiting: Control replay speed with token bucket algorithm
  • Progress Tracking: Monitor replay with callbacks and metrics
  • Batching: Efficient streaming with configurable batch sizes
  • Cancellation Support: Full CancellationToken support

Common Use Cases

Rebuilding Projections

// Rebuild entire read model from scratch
await foreach (var @event in replayService.ReplayAllAsync("orders"))
{
    await projectionUpdater.ApplyAsync(@event);
}

Reprocessing After Bug Fixes

// Reprocess last 24 hours after fixing handler bug
var yesterday = DateTimeOffset.UtcNow.AddDays(-1);

await foreach (var @event in replayService.ReplayFromTimeAsync("orders", yesterday))
{
    await fixedHandler.HandleAsync(@event);
}

Creating New Projections

// Build new analytics projection from historical data
await foreach (var @event in replayService.ReplayAllAsync(
    "user-activity",
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "PageView", "Click", "Purchase" },
        MaxEventsPerSecond = 10000 // Fast replay for batch processing
    }))
{
    await analyticsProjection.ApplyAsync(@event);
}

Time-Travel Debugging

// Replay specific time period to debug issue
var bugStart = new DateTimeOffset(2025, 12, 10, 14, 30, 0, TimeSpan.Zero);
var bugEnd = new DateTimeOffset(2025, 12, 10, 15, 00, 0, TimeSpan.Zero);

await foreach (var @event in replayService.ReplayTimeRangeAsync(
    "orders",
    bugStart,
    bugEnd))
{
    await debugHandler.InspectAsync(@event);
}

Testing Resources

Comprehensive testing guide available: POSTGRESQL-TESTING.md

Topics covered:

  • Docker PostgreSQL setup
  • Persistent stream operations
  • Ephemeral queue operations
  • Visibility timeout testing
  • Dead letter queue verification
  • Performance testing
  • Database schema inspection

Stream Configuration

The Stream Configuration feature provides per-stream configuration capabilities for fine-grained control over retention policies, dead letter queues, lifecycle management, performance tuning, and access control. Each stream can have its own settings that override global defaults.

Installation

Stream configuration is included in the PostgreSQL event streaming package:

dotnet add package Svrnty.CQRS.Events.PostgreSQL

Setup

Register the stream configuration services:

builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

// Add stream configuration support
builder.Services.AddPostgresStreamConfiguration();

Usage

Basic Stream Configuration

Configure retention policies for a specific stream:

var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();

var config = new StreamConfiguration
{
    StreamName = "orders",
    Description = "Order processing stream",
    Tags = new Dictionary<string, string>
    {
        ["domain"] = "orders",
        ["environment"] = "production"
    },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(90),
        MaxSizeBytes = 10L * 1024 * 1024 * 1024, // 10 GB
        EnablePartitioning = true,
        PartitionInterval = TimeSpan.FromDays(7)
    },
    CreatedAt = DateTimeOffset.UtcNow,
    CreatedBy = "admin"
};

await configStore.SetConfigurationAsync(config);

Dead Letter Queue Configuration

Configure error handling with dead letter queues:

var config = new StreamConfiguration
{
    StreamName = "payment-processing",
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        DeadLetterStreamName = "payment-processing-dlq",
        MaxDeliveryAttempts = 5,
        RetryDelay = TimeSpan.FromMinutes(5),
        StoreOriginalEvent = true,
        StoreErrorDetails = true
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);

Lifecycle Management

Configure automatic archival and deletion:

var config = new StreamConfiguration
{
    StreamName = "audit-logs",
    Lifecycle = new LifecycleConfiguration
    {
        AutoCreate = true,
        AutoArchive = true,
        ArchiveAfter = TimeSpan.FromDays(365),
        ArchiveLocation = "s3://archive-bucket/audit-logs",
        AutoDelete = false
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);

Performance Tuning

Configure performance-related settings:

var config = new StreamConfiguration
{
    StreamName = "high-throughput-events",
    Performance = new PerformanceConfiguration
    {
        BatchSize = 1000,
        EnableCompression = true,
        CompressionAlgorithm = "gzip",
        EnableIndexing = true,
        IndexedFields = new List<string> { "userId", "tenantId", "eventType" },
        CacheSize = 10000
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);

Access Control

Configure stream permissions and quotas:

var config = new StreamConfiguration
{
    StreamName = "sensitive-data",
    AccessControl = new AccessControlConfiguration
    {
        PublicRead = false,
        PublicWrite = false,
        AllowedReaders = new List<string> { "admin", "audit-service" },
        AllowedWriters = new List<string> { "admin", "data-ingestion-service" },
        MaxConsumerGroups = 5,
        MaxEventsPerSecond = 10000
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);

Getting Effective Configuration

Retrieve the effective configuration (stream-specific merged with defaults):

var configProvider = serviceProvider.GetRequiredService<IStreamConfigurationProvider>();

// Gets merged configuration (stream-specific + global defaults)
var effectiveConfig = await configProvider.GetEffectiveConfigurationAsync("orders");

// Get specific configuration sections
var retention = await configProvider.GetRetentionConfigurationAsync("orders");
var dlq = await configProvider.GetDeadLetterQueueConfigurationAsync("orders");
var lifecycle = await configProvider.GetLifecycleConfigurationAsync("orders");

Finding Configurations

Query configurations by criteria:

// Find all streams with archiving enabled
var archivingStreams = await configStore.FindConfigurationsAsync(
    c => c.Lifecycle?.AutoArchive == true);

// Find all production streams
var productionStreams = await configStore.FindConfigurationsAsync(
    c => c.Tags?.ContainsKey("environment") == true &&
         c.Tags["environment"] == "production");

// Get all configurations
var allConfigs = await configStore.GetAllConfigurationsAsync();

Deleting Configuration

Remove stream-specific configuration (reverts to defaults):

await configStore.DeleteConfigurationAsync("orders");

Configuration Options

RetentionConfiguration

  • MaxAge: Maximum age before cleanup (e.g., TimeSpan.FromDays(90))
  • MaxSizeBytes: Maximum storage size before cleanup
  • MaxEventCount: Maximum number of events before cleanup
  • EnablePartitioning: Enable table partitioning for better performance
  • PartitionInterval: Partition interval (e.g., daily, weekly)

DeadLetterQueueConfiguration

  • Enabled: Enable DLQ for this stream
  • DeadLetterStreamName: Name of DLQ stream (defaults to {StreamName}-dlq)
  • MaxDeliveryAttempts: Attempts before sending to DLQ (default: 3)
  • RetryDelay: Delay between retry attempts
  • StoreOriginalEvent: Store original event in DLQ
  • StoreErrorDetails: Store error details in DLQ

LifecycleConfiguration

  • AutoCreate: Automatically create stream if it doesn't exist
  • AutoArchive: Automatically archive old events
  • ArchiveAfter: Age after which events are archived
  • ArchiveLocation: Storage location for archived events
  • AutoDelete: Automatically delete old events
  • DeleteAfter: Age after which events are deleted

PerformanceConfiguration

  • BatchSize: Batch size for bulk operations
  • EnableCompression: Enable event compression
  • CompressionAlgorithm: Compression algorithm (e.g., "gzip", "zstd")
  • EnableIndexing: Enable metadata field indexing
  • IndexedFields: List of fields to index
  • CacheSize: Cache size for frequently accessed events

AccessControlConfiguration

  • PublicRead: Allow public read access
  • PublicWrite: Allow public write access
  • AllowedReaders: List of authorized readers
  • AllowedWriters: List of authorized writers
  • MaxConsumerGroups: Maximum consumer groups allowed
  • MaxEventsPerSecond: Rate limit for events per second

Key Features

  • Per-Stream Configuration: Override global settings per stream
  • Retention Policies: Configure retention per stream
  • Dead Letter Queues: Error handling with configurable retry logic
  • Lifecycle Management: Automatic archival and deletion
  • Performance Tuning: Optimize batch sizes, compression, and indexing
  • Access Control: Stream-level permissions and quotas
  • Configuration Merging: Stream-specific settings override global defaults
  • Tag-Based Filtering: Categorize and query streams by tags

Common Use Cases

Multi-Tenant Configuration

// High-value tenant with extended retention
var premiumConfig = new StreamConfiguration
{
    StreamName = "tenant-acme-corp",
    Tags = new Dictionary<string, string> { ["tier"] = "premium" },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(365) // 1 year retention
    }
};

// Standard tenant with shorter retention
var standardConfig = new StreamConfiguration
{
    StreamName = "tenant-small-co",
    Tags = new Dictionary<string, string> { ["tier"] = "standard" },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(30) // 30 days retention
    }
};

Environment-Specific Settings

// Production: strict retention and DLQ
var prodConfig = new StreamConfiguration
{
    StreamName = "orders-prod",
    Tags = new Dictionary<string, string> { ["environment"] = "production" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90) },
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        MaxDeliveryAttempts = 5
    }
};

// Development: relaxed settings
var devConfig = new StreamConfiguration
{
    StreamName = "orders-dev",
    Tags = new Dictionary<string, string> { ["environment"] = "development" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(7) },
    DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = false }
};

Domain-Specific Configuration

// Audit logs: long retention, auto-archive
var auditConfig = new StreamConfiguration
{
    StreamName = "audit-logs",
    Tags = new Dictionary<string, string> { ["domain"] = "security" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(2555) }, // 7 years
    Lifecycle = new LifecycleConfiguration
    {
        AutoArchive = true,
        ArchiveAfter = TimeSpan.FromDays(365),
        ArchiveLocation = "s3://archive/audit"
    }
};

// Analytics: high-throughput, short retention
var analyticsConfig = new StreamConfiguration
{
    StreamName = "page-views",
    Tags = new Dictionary<string, string> { ["domain"] = "analytics" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(30) },
    Performance = new PerformanceConfiguration
    {
        BatchSize = 10000,
        EnableCompression = true
    }
};

2024-2025 Roadmap

Task Description Status
Support .NET 8 Ensure compatibility with .NET 8.
Support .NET 10 Upgrade to .NET 10 with C# 14 language support.
Update FluentValidation Upgrade FluentValidation to version 11.x for .NET 10 compatibility.
Add gRPC Support with source generators Implement gRPC endpoints with source generators and Google Rich Error Model for validation.
Create a demo project (Svrnty.CQRS.Grpc.Sample) Develop a comprehensive demo project showcasing gRPC and HTTP endpoints.
Event Streaming - Phase 1: gRPC Bidirectional Streaming Implement gRPC bidirectional streaming for real-time event delivery.
Event Streaming - Phase 2.1: Storage Abstractions Define IEventStreamStore interface for persistent and ephemeral streams.
Event Streaming - Phase 2.2: PostgreSQL Storage Implement PostgreSQL-backed storage with persistent streams, message queues, and DLQ.
Event Streaming - Phase 2.3: Consumer Offset Tracking Implement consumer group coordination and offset management for persistent streams.
Event Streaming - Phase 2.4: Retention Policies Add time-based and size-based retention with automatic cleanup and table partitioning.
Event Streaming - Phase 2.5: Event Replay API Add APIs for replaying events from specific offsets and time ranges.
Event Streaming - Phase 2.6: Stream Configuration Per-stream configuration for retention, DLQ, and lifecycle management.
Create a website for the Framework Develop a website to host comprehensive documentation for the framework.

2026 Roadmap

Task Description Status
gRPC Compression Support Smart message compression with automatic threshold detection and per-handler control.
gRPC Metadata & Authorization Support Expose ServerCallContext to handlers and integrate authorization services for gRPC endpoints.