This project was originally initiated by Powered Software Inc. and was forked from the PoweredSoft.CQRS repository.
CQRS
Our implementation of Command Query Responsibility Segregation (CQRS).
Getting Started
Install the NuGet packages in your project.
Abstractions Packages.
Sample of startup code for gRPC (Recommended)
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.Grpc;
var builder = WebApplication.CreateBuilder(args);
// Register your commands with validators
builder.Services.AddCommand<AddUserCommand, int, AddUserCommandHandler, AddUserCommandValidator>();
builder.Services.AddCommand<RemoveUserCommand, RemoveUserCommandHandler>();
// Register your queries
builder.Services.AddQuery<FetchUserQuery, User, FetchUserQueryHandler>();
// Configure CQRS with gRPC support
builder.Services.AddSvrntyCqrs(cqrs =>
{
// Enable gRPC endpoints with reflection
cqrs.AddGrpc(grpc =>
{
grpc.EnableReflection();
});
});
var app = builder.Build();
// Map all configured CQRS endpoints
app.UseSvrntyCqrs();
app.Run();
Important: gRPC Requirements
The gRPC implementation uses Grpc.Tools with .proto files and source generators for automatic service implementation:
1. Install required packages:
dotnet add package Grpc.AspNetCore
dotnet add package Grpc.AspNetCore.Server.Reflection
dotnet add package Grpc.StatusProto # For Rich Error Model validation
2. Add the source generator as an analyzer:
dotnet add package Svrnty.CQRS.Grpc.Generators
The source generator is automatically configured as an analyzer when installed via NuGet and will generate both the .proto files and gRPC service implementations at compile time.
3. Define your C# commands and queries:
public record AddUserCommand
{
public required string Name { get; init; }
public required string Email { get; init; }
public int Age { get; init; }
}
public record RemoveUserCommand
{
public int UserId { get; init; }
}
Notes:
- The source generator automatically creates:
  - .proto files in the Protos/ directory from your C# commands and queries
  - CommandServiceImpl and QueryServiceImpl implementations
- FluentValidation is automatically integrated with Google Rich Error Model for structured validation errors
- Validation errors return google.rpc.Status with BadRequest containing FieldViolations
- Use record types for commands/queries (immutable, value-based equality, more concise)
- No need for protobuf-net attributes - just define your C# types
Sample of startup code for Minimal API (HTTP)
For HTTP scenarios (web browsers, public APIs), you can use the Minimal API approach:
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.MinimalApi;
var builder = WebApplication.CreateBuilder(args);
// Register your commands with validators
builder.Services.AddCommand<CreatePersonCommand, CreatePersonCommandHandler, CreatePersonCommandValidator>();
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler, EchoCommandValidator>();
// Register your queries
builder.Services.AddQuery<PersonQuery, IQueryable<Person>, PersonQueryHandler>();
// Configure CQRS with Minimal API support
builder.Services.AddSvrntyCqrs(cqrs =>
{
// Enable Minimal API endpoints
cqrs.AddMinimalApi();
});
// Add Swagger (optional)
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
// Map all configured CQRS endpoints (automatically creates POST /api/command/* and POST/GET /api/query/*)
app.UseSvrntyCqrs();
app.Run();
Notes:
- FluentValidation is automatically integrated with RFC 7807 Problem Details for structured validation errors
- Use record types for commands/queries (immutable, value-based equality, more concise)
- Supports both POST and GET (for queries) endpoints
- Automatically generates Swagger/OpenAPI documentation
Sample enabling both gRPC and HTTP
You can enable both gRPC and traditional HTTP endpoints simultaneously, allowing clients to choose their preferred protocol:
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.Grpc;
using Svrnty.CQRS.MinimalApi;
var builder = WebApplication.CreateBuilder(args);
// Register your commands with validators
builder.Services.AddCommand<AddUserCommand, int, AddUserCommandHandler, AddUserCommandValidator>();
builder.Services.AddCommand<RemoveUserCommand, RemoveUserCommandHandler>();
// Register your queries
builder.Services.AddQuery<FetchUserQuery, User, FetchUserQueryHandler>();
// Configure CQRS with both gRPC and Minimal API support
builder.Services.AddSvrntyCqrs(cqrs =>
{
// Enable gRPC endpoints with reflection
cqrs.AddGrpc(grpc =>
{
grpc.EnableReflection();
});
// Enable Minimal API endpoints
cqrs.AddMinimalApi();
});
// Add HTTP support with Swagger
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
// Map all configured CQRS endpoints (both gRPC and HTTP)
app.UseSvrntyCqrs();
app.Run();
Benefits:
- Single codebase supports multiple protocols
- gRPC for high-performance, low-latency scenarios (microservices, internal APIs)
- HTTP for web browsers, legacy clients, and public APIs
- Same commands, queries, and validation logic for both protocols
- Swagger UI available for HTTP endpoints, gRPC reflection for gRPC clients
Fluent Validation
FluentValidation is optional but recommended for command and query validation. The Svrnty.CQRS.FluentValidation package provides extension methods to simplify validator registration.
With Svrnty.CQRS.FluentValidation (Recommended)
The package exposes extension method overloads that accept the validator as a generic parameter:
dotnet add package Svrnty.CQRS.FluentValidation
using Svrnty.CQRS.FluentValidation; // Extension methods for validator registration
// Command with result - validator as last generic parameter
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler, EchoCommandValidator>();
// Command without result - validator included in generics
builder.Services.AddCommand<CreatePersonCommand, CreatePersonCommandHandler, CreatePersonCommandValidator>();
Benefits:
- Single line registration - Handler and validator registered together
- Type safety - Compiler ensures validator matches command type
- Less boilerplate - No need for separate AddTransient<IValidator<T>>() calls
- Cleaner code - Clear intent that validation is part of command pipeline
Without Svrnty.CQRS.FluentValidation
If you prefer not to use the FluentValidation package, you need to register commands and validators separately:
using FluentValidation;
using Svrnty.CQRS;
// Register command handler
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler>();
// Manually register validator
builder.Services.AddTransient<IValidator<EchoCommand>, EchoCommandValidator>();
Event Streaming
Svrnty.CQRS includes comprehensive event streaming support for building event-driven architectures with both persistent (event sourcing) and ephemeral (message queue) streams.
Quick Start
# Install core event streaming packages
dotnet add package Svrnty.CQRS.Events
dotnet add package Svrnty.CQRS.Events.Grpc # For gRPC bidirectional streaming
# Install storage backend
dotnet add package Svrnty.CQRS.Events.PostgreSQL # PostgreSQL storage (recommended for production)
Basic Setup
using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.PostgreSQL;
var builder = WebApplication.CreateBuilder(args);
// Add event streaming support
builder.Services.AddSvrntyEvents();
builder.Services.AddDefaultEventDiscovery();
// Configure storage backend (PostgreSQL)
builder.Services.AddPostgresEventStreaming(
builder.Configuration.GetSection("EventStreaming:PostgreSQL"));
// Enable gRPC event streaming (optional)
builder.Services.AddSvrntyEventsGrpc();
var app = builder.Build();
// Map gRPC event streaming endpoints
app.MapGrpcService<EventStreamServiceImpl>();
app.Run();
PostgreSQL Configuration
Add to appsettings.json:
{
"EventStreaming": {
"PostgreSQL": {
"ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;Username=postgres;Password=postgres",
"SchemaName": "event_streaming",
"AutoMigrate": true,
"MaxPoolSize": 100,
"MinPoolSize": 5
}
}
}
Key Features:
- Auto-Migration: Database schema created automatically on startup (when AutoMigrate: true)
- Connection Pooling: Configurable Npgsql connection pool (5-100 connections)
- Schema Isolation: All tables in dedicated schema (default: event_streaming)
Storage Options
1. PostgreSQL (Production-Ready) ✅
builder.Services.AddPostgresEventStreaming(options => {
options.ConnectionString = "Host=localhost;...";
options.AutoMigrate = true; // Create schema automatically
});
Features:
- Persistent and ephemeral stream support
- Row locking with SKIP LOCKED, so concurrent consumers do not block each other on queue operations
- Visibility timeout for message processing
- Dead letter queue for failed messages
- Consumer offset tracking (Phase 2.3)
- Retention policies support (Phase 2.4)
2. In-Memory (Development/Testing)
builder.Services.AddInMemoryEventStorage();
Features:
- Fast in-memory storage
- Suitable for development and testing
- No external dependencies
- Data lost on restart
Stream Types
Persistent Streams (Event Sourcing)
Append-only event logs for event sourcing patterns:
// Append events to persistent stream
await eventStore.AppendAsync("user-123", new UserCreatedEvent
{
UserId = 123,
Name = "Alice",
Email = "alice@example.com"
});
// Read stream from offset
var events = await eventStore.ReadStreamAsync("user-123", fromOffset: 0);
// Get stream length
var length = await eventStore.GetStreamLengthAsync("user-123");
Ephemeral Streams (Message Queue)
Message queue semantics with at-least-once delivery:
// Enqueue messages
await eventStore.EnqueueAsync("notifications", new EmailNotificationEvent
{
To = "user@example.com",
Subject = "Welcome!",
Body = "Thanks for signing up"
});
// Dequeue with visibility timeout
var @event = await eventStore.DequeueAsync(
streamName: "notifications",
consumerId: "worker-1",
visibilityTimeout: TimeSpan.FromSeconds(30)
);
// Process and acknowledge; on failure, negative-acknowledge instead
if (@event != null)
{
    try
    {
        await ProcessEventAsync(@event);
        await eventStore.AcknowledgeAsync("notifications", @event.EventId, "worker-1");
    }
    catch (Exception)
    {
        // Negative acknowledge (requeue or move to DLQ)
        await eventStore.NackAsync(
            streamName: "notifications",
            eventId: @event.EventId,
            consumerId: "worker-1",
            requeue: false // Move to dead letter queue
        );
    }
}
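The visibility timeout behind this dequeue/acknowledge cycle can be illustrated with a small in-memory model. This is only a sketch of the general technique, not the library's PostgreSQL implementation; VisibilityQueue and its members are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a queue with a visibility timeout.
public sealed class VisibilityQueue<T>
{
    private sealed class Entry
    {
        public Entry(Guid id, T payload, DateTimeOffset visibleAt)
        {
            Id = id;
            Payload = payload;
            VisibleAt = visibleAt;
        }

        public Guid Id { get; }
        public T Payload { get; }
        public DateTimeOffset VisibleAt { get; set; }
    }

    private readonly List<Entry> _entries = new();

    // New messages are visible immediately.
    public void Enqueue(T payload, DateTimeOffset now) =>
        _entries.Add(new Entry(Guid.NewGuid(), payload, now));

    // Returns the first visible message and hides it until now + visibilityTimeout.
    // If it is never acknowledged, it becomes visible again (at-least-once delivery).
    public (Guid Id, T Payload)? Dequeue(DateTimeOffset now, TimeSpan visibilityTimeout)
    {
        foreach (var entry in _entries)
        {
            if (entry.VisibleAt <= now)
            {
                entry.VisibleAt = now + visibilityTimeout;
                return (entry.Id, entry.Payload);
            }
        }

        return null;
    }

    // Acknowledging removes the message permanently.
    public bool Acknowledge(Guid id) => _entries.RemoveAll(e => e.Id == id) > 0;
}
```

A message that is dequeued but never acknowledged reappears once the timeout elapses, which is why consumers of ephemeral streams should tolerate redelivery.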
gRPC Bidirectional Streaming
The framework provides gRPC bidirectional streaming for real-time event delivery:
// Client-side subscription (in your client application)
var call = client.SubscribeToStream();
// Send subscription request
await call.RequestStream.WriteAsync(new SubscribeRequest
{
StreamName = "user-events",
ConsumerId = "client-123",
SubscriptionMode = SubscriptionMode.Broadcast
});
// Receive events in real-time
await foreach (var @event in call.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"Received event: {@event.EventType}");
}
Consumer Groups
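Consumer groups enable multiple consumers to coordinate processing of persistent streams without duplicating work during normal operation. This provides load balancing, fault tolerance, and at-least-once delivery: after a consumer failure, events since the last committed offset may be processed again, so handlers should tolerate redelivery.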
Installation
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
Setup
using Svrnty.CQRS.Events.ConsumerGroups;
var builder = WebApplication.CreateBuilder(args);
// Add consumer group support with PostgreSQL backend
builder.Services.AddPostgresConsumerGroups(
builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));
var app = builder.Build();
app.Run();
Configuration in appsettings.json:
{
"EventStreaming": {
"ConsumerGroups": {
"ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;...",
"SchemaName": "event_streaming",
"AutoMigrate": true
}
}
}
Basic Usage
// Inject the consumer group reader
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();
// Consume events with automatic offset management
await foreach (var @event in reader.ConsumeAsync(
streamName: "orders",
groupId: "order-processors",
consumerId: "worker-1",
options: new ConsumerGroupOptions
{
BatchSize = 100,
CommitStrategy = OffsetCommitStrategy.AfterBatch,
HeartbeatInterval = TimeSpan.FromSeconds(10),
SessionTimeout = TimeSpan.FromSeconds(30)
},
cancellationToken))
{
await ProcessOrderEventAsync(@event);
// Offset automatically committed after batch
}
Offset Commit Strategies
1. AfterEach - Commit after each event (safest, highest overhead):
options.CommitStrategy = OffsetCommitStrategy.AfterEach;
2. AfterBatch - Commit after each batch (balanced):
options.CommitStrategy = OffsetCommitStrategy.AfterBatch;
options.BatchSize = 100;
3. Periodic - Commit at intervals (highest throughput):
options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.PeriodicCommitInterval = TimeSpan.FromSeconds(5);
4. Manual - Full control over commits:
options.CommitStrategy = OffsetCommitStrategy.Manual;
await foreach (var @event in reader.ConsumeAsync(...))
{
try
{
await ProcessEventAsync(@event);
// Manually commit after successful processing
await reader.CommitOffsetAsync(
streamName: "orders",
groupId: "order-processors",
consumerId: "worker-1",
offset: currentOffset,
cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Processing failed, will retry");
// Don't commit - event will be reprocessed
}
}
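The trade-off between the strategies above comes down to how many already-processed events are delivered again after a crash. The sketch below is a back-of-the-envelope model, assuming offsets are committed once every fixed number of events; CommitMath is a hypothetical helper, not part of the library.

```csharp
using System;

// Hypothetical helper that models redelivery after a crash.
public static class CommitMath
{
    // Number of already-processed events that are redelivered after a crash
    // when the offset is committed once every commitEvery events.
    public static int RedeliveredAfterCrash(int processedBeforeCrash, int commitEvery)
    {
        if (processedBeforeCrash < 0)
            throw new ArgumentOutOfRangeException(nameof(processedBeforeCrash));
        if (commitEvery <= 0)
            throw new ArgumentOutOfRangeException(nameof(commitEvery));

        // Everything after the last full commit interval is uncommitted.
        return processedBeforeCrash % commitEvery;
    }
}
```

Under this model, a consumer that crashes after 250 events with a batch size of 100 reprocesses 50 events, while committing after each event reprocesses none at the cost of one commit per event.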
Monitoring Consumer Groups
var offsetStore = serviceProvider.GetRequiredService<IConsumerOffsetStore>();
// Get all active consumers in a group
var consumers = await offsetStore.GetActiveConsumersAsync("order-processors");
foreach (var consumer in consumers)
{
Console.WriteLine($"Consumer: {consumer.ConsumerId}");
Console.WriteLine($" Last Heartbeat: {consumer.LastHeartbeat}");
Console.WriteLine($" Registered: {consumer.RegisteredAt}");
}
// Get group offsets per consumer
var offsets = await offsetStore.GetGroupOffsetsAsync("order-processors", "orders");
foreach (var (consumerId, offset) in offsets)
{
Console.WriteLine($"{consumerId}: offset {offset}");
}
// Get last committed offset for the group (minimum across all consumers)
var groupOffset = await offsetStore.GetCommittedOffsetAsync("order-processors", "orders");
Console.WriteLine($"Group safe offset: {groupOffset}");
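The "minimum across all consumers" rule mentioned in the comment above can be sketched as follows. This is an illustration of the idea only; GroupOffsets is a hypothetical helper, and the real store may compute the value differently (for example, inside the database).

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: derives a group's safe offset from per-consumer commits.
public static class GroupOffsets
{
    // Returns the smallest committed offset across consumers,
    // or null when no consumer has committed anything yet.
    public static long? SafeOffset(IReadOnlyDictionary<string, long> committedByConsumer) =>
        committedByConsumer.Count == 0
            ? (long?)null
            : committedByConsumer.Values.Min();
}
```

Taking the minimum is the conservative choice: every event at or below that offset has been committed by every consumer in the group.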
Key Features
- Automatic Offset Management: Tracks last processed position per consumer
- Heartbeat Monitoring: Detects and removes stale consumers automatically
- Flexible Commit Strategies: Choose when to commit offsets (manual, per-event, per-batch, periodic)
- Load Balancing: Multiple consumers can process the same stream
- Fault Tolerance: Consumers can resume from last committed offset after failure
- At-Least-Once Delivery: Events processed at least once, even with consumer failures
Health Monitoring
The ConsumerHealthMonitor background service automatically:
- Sends periodic heartbeats for registered consumers
- Detects stale consumers (no heartbeat within session timeout)
- Cleans up dead consumers from the registry
- Logs consumer group health metrics
Configure health monitoring:
builder.Services.AddPostgresConsumerGroups(
storageConfig => { /* ... */ },
healthConfig => {
healthConfig.CleanupInterval = TimeSpan.FromSeconds(30);
healthConfig.SessionTimeout = TimeSpan.FromSeconds(60);
healthConfig.Enabled = true;
});
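Stale-consumer detection boils down to comparing each consumer's last heartbeat with the session timeout. The following is a minimal sketch of that check, assuming a consumer is stale once the time since its last heartbeat exceeds the timeout; ConsumerHealth is a hypothetical name and not the ConsumerHealthMonitor implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: finds consumers whose heartbeat is older than the session timeout.
public static class ConsumerHealth
{
    public static IReadOnlyList<string> FindStale(
        IReadOnlyDictionary<string, DateTimeOffset> lastHeartbeats,
        DateTimeOffset now,
        TimeSpan sessionTimeout) =>
        lastHeartbeats
            .Where(kv => now - kv.Value > sessionTimeout)   // no heartbeat within the timeout
            .Select(kv => kv.Key)
            .OrderBy(id => id, StringComparer.Ordinal)      // stable order for logging
            .ToList();
}
```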
Retention Policies
Automatic retention policy enforcement ensures that old events are cleaned up according to configurable rules. This helps manage database size and comply with data retention requirements.
Installation
dotnet add package Svrnty.CQRS.Events.PostgreSQL
The retention policy feature is included in the PostgreSQL event streaming package.
Setup
Register retention policy services in your application:
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
// Add retention policy background service
builder.Services.AddPostgresRetentionPolicies(options =>
{
options.Enabled = true;
options.CleanupInterval = TimeSpan.FromHours(1);
options.CleanupWindowStart = TimeSpan.FromHours(2); // 2 AM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(6); // 6 AM UTC
options.UseCleanupWindow = true;
});
Or use configuration:
builder.Services.AddPostgresRetentionPolicies(
builder.Configuration.GetSection("RetentionService"));
{
"RetentionService": {
"Enabled": true,
"CleanupInterval": "01:00:00",
"CleanupWindowStart": "02:00:00",
"CleanupWindowEnd": "06:00:00",
"UseCleanupWindow": true
}
}
Usage
Setting Retention Policies
var policyStore = serviceProvider.GetRequiredService<IRetentionPolicyStore>();
// Time-based retention: delete events older than 30 days
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
StreamName = "orders",
MaxAge = TimeSpan.FromDays(30),
Enabled = true
});
// Size-based retention: keep only last 10,000 events
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
StreamName = "analytics",
MaxEventCount = 10000,
Enabled = true
});
// Combined retention: use both time and size limits
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
StreamName = "logs",
MaxAge = TimeSpan.FromDays(7),
MaxEventCount = 50000,
Enabled = true
});
// Default policy for all streams (use "*" as stream name)
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
StreamName = "*",
MaxAge = TimeSpan.FromDays(90),
Enabled = true
});
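To make the combined time and size limits concrete, here is a sketch of how expired events could be selected. It assumes an event is removed when it violates either limit (older than MaxAge, or outside the most recent MaxEventCount events); that semantics, and the StoredEvent and RetentionSketch names, are assumptions for illustration rather than the behaviour of the PostgreSQL stored procedures.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape used only for this sketch.
public sealed record StoredEvent(long Offset, DateTimeOffset Timestamp);

public static class RetentionSketch
{
    // Returns the offsets that would be deleted under the given limits.
    public static IReadOnlyList<long> SelectExpired(
        IReadOnlyList<StoredEvent> events,
        DateTimeOffset now,
        TimeSpan? maxAge,
        long? maxEventCount)
    {
        var ordered = events.OrderBy(e => e.Offset).ToList();

        // How many of the oldest events exceed the size limit.
        long overflow = maxEventCount.HasValue
            ? Math.Max(0L, ordered.Count - maxEventCount.Value)
            : 0L;

        return ordered
            .Where((e, index) =>
                index < overflow ||
                (maxAge.HasValue && now - e.Timestamp > maxAge.Value))
            .Select(e => e.Offset)
            .ToList();
    }
}
```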
Retrieving Policies
// Get specific policy
var policy = await policyStore.GetPolicyAsync("orders");
if (policy != null)
{
Console.WriteLine($"Stream: {policy.StreamName}");
Console.WriteLine($"Max Age: {policy.MaxAge}");
Console.WriteLine($"Max Event Count: {policy.MaxEventCount}");
Console.WriteLine($"Enabled: {policy.Enabled}");
}
// Get all policies
var policies = await policyStore.GetAllPoliciesAsync();
foreach (var p in policies)
{
Console.WriteLine($"{p.StreamName}: Age={p.MaxAge}, Count={p.MaxEventCount}");
}
Deleting Policies
// Delete a specific policy
var deleted = await policyStore.DeletePolicyAsync("orders");
// Note: Cannot delete the default "*" policy
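One plausible way to resolve which policy applies to a stream is to prefer a stream-specific entry and fall back to the "*" default. The sketch below assumes that precedence; PolicySketch and PolicyResolver are hypothetical names, and the actual store may resolve policies differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical policy shape used only for this sketch.
public sealed record PolicySketch(string StreamName, TimeSpan? MaxAge, long? MaxEventCount);

public static class PolicyResolver
{
    public const string Wildcard = "*";

    // Stream-specific policy wins; otherwise the wildcard default applies, if any.
    public static PolicySketch? Resolve(
        IReadOnlyDictionary<string, PolicySketch> policies,
        string streamName)
    {
        if (policies.TryGetValue(streamName, out var specific))
        {
            return specific;
        }

        return policies.TryGetValue(Wildcard, out var fallback) ? fallback : null;
    }
}
```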
Manual Cleanup
While the background service runs automatically, you can also trigger cleanup manually:
// Apply all enabled retention policies
var result = await policyStore.ApplyRetentionPoliciesAsync();
Console.WriteLine($"Streams Processed: {result.StreamsProcessed}");
Console.WriteLine($"Events Deleted: {result.EventsDeleted}");
Console.WriteLine($"Duration: {result.Duration}");
// Per-stream details
foreach (var (streamName, count) in result.EventsDeletedPerStream)
{
Console.WriteLine($" {streamName}: {count} events deleted");
}
Key Features
- Time-based Retention: Delete events older than specified duration
- Size-based Retention: Keep only the most recent N events per stream
- Wildcard Policies: Apply default policies to all streams using "*"
- Cleanup Windows: Run cleanup only during specified time windows (e.g., off-peak hours)
- Background Service: Automatic periodic cleanup via PeriodicTimer
- Statistics Tracking: Detailed metrics about cleanup operations
- Efficient Deletion: PostgreSQL stored procedures for batch cleanup
Cleanup Window
The cleanup window feature allows you to restrict when retention policies are enforced:
options.UseCleanupWindow = true;
options.CleanupWindowStart = TimeSpan.FromHours(2); // 2 AM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(6); // 6 AM UTC
The window automatically handles midnight crossing:
// Window that spans midnight (10 PM to 2 AM)
options.CleanupWindowStart = TimeSpan.FromHours(22); // 10 PM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(2); // 2 AM UTC
Disable the window to run cleanup at any time:
options.UseCleanupWindow = false;
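The midnight-crossing rule can be expressed as a single comparison. This is a minimal sketch, assuming the start is inclusive and the end is exclusive; CleanupWindow is a hypothetical helper and the background service may treat the boundaries differently.

```csharp
using System;

// Hypothetical helper: checks whether a UTC time of day falls inside a cleanup window.
public static class CleanupWindow
{
    public static bool Contains(TimeSpan start, TimeSpan end, TimeSpan timeOfDayUtc) =>
        start <= end
            // Same-day window, e.g. 02:00-06:00
            ? timeOfDayUtc >= start && timeOfDayUtc < end
            // Window that wraps past midnight, e.g. 22:00-02:00
            : timeOfDayUtc >= start || timeOfDayUtc < end;
}
```

With a 22:00-02:00 window, both 23:00 and 01:00 are inside the window while 12:00 is not; a caller would typically pass DateTimeOffset.UtcNow.TimeOfDay.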
Monitoring
The background service logs cleanup operations:
[INF] Retention policy service started. Cleanup interval: 01:00:00, Window: 02:00:00-06:00:00 UTC
[INF] Starting retention policy enforcement cycle
[INF] Retention cleanup complete: 3 streams processed, 1,234 events deleted in 00:00:01.234
[DBG] Stream orders: 500 events deleted
[DBG] Stream analytics: 734 events deleted
[DBG] Stream logs: 0 events deleted
When outside the cleanup window:
[DBG] Outside cleanup window (02:00:00-06:00:00 UTC), skipping retention enforcement
Event Replay API
The Event Replay API enables rebuilding projections, reprocessing events, and time-travel debugging by replaying historical events from persistent streams.
Installation
dotnet add package Svrnty.CQRS.Events.PostgreSQL
The event replay feature is included in the PostgreSQL event streaming package.
Setup
Register the event replay service:
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
// Add event replay service
builder.Services.AddPostgresEventReplay();
Usage
Replay from Offset
Replay events starting from a specific sequence number:
var replayService = serviceProvider.GetRequiredService<IEventReplayService>();
await foreach (var @event in replayService.ReplayFromOffsetAsync(
streamName: "orders",
startOffset: 1000,
options: new ReplayOptions
{
BatchSize = 100,
MaxEventsPerSecond = 1000, // Rate limit to 1000 events/sec
ProgressCallback = progress =>
{
Console.WriteLine($"Progress: {progress.EventsProcessed} events " +
$"({progress.ProgressPercentage:F1}%) " +
$"@ {progress.EventsPerSecond:F0} events/sec");
}
}))
{
await ProcessEventAsync(@event);
}
Replay from Time
Replay events starting from a specific timestamp:
var startTime = DateTimeOffset.UtcNow.AddDays(-7);
await foreach (var @event in replayService.ReplayFromTimeAsync(
streamName: "orders",
startTime: startTime,
options: new ReplayOptions
{
MaxEvents = 10000,
EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" }
}))
{
await RebuildProjectionAsync(@event);
}
Replay Time Range
Replay events within a specific time window:
var startTime = DateTimeOffset.UtcNow.AddDays(-7);
var endTime = DateTimeOffset.UtcNow.AddDays(-6);
await foreach (var @event in replayService.ReplayTimeRangeAsync(
streamName: "analytics",
startTime: startTime,
endTime: endTime,
options: new ReplayOptions
{
EventTypeFilter = new[] { "PageView", "Click" },
ProgressInterval = 500
}))
{
await ProcessAnalyticsEventAsync(@event);
}
Replay All Events
Replay entire stream from the beginning:
await foreach (var @event in replayService.ReplayAllAsync(
streamName: "orders",
options: new ReplayOptions
{
BatchSize = 1000,
MaxEventsPerSecond = 5000
}))
{
await ProcessEventAsync(@event);
}
Get Replay Count
Get the total number of events that would be replayed:
var count = await replayService.GetReplayCountAsync(
streamName: "orders",
startOffset: 1000,
options: new ReplayOptions
{
EventTypeFilter = new[] { "OrderPlaced" }
});
Console.WriteLine($"Will replay {count} OrderPlaced events");
Replay Options
Control replay behavior with ReplayOptions:
var options = new ReplayOptions
{
// Batch size for reading from database (default: 100)
BatchSize = 100,
// Maximum events to replay (default: null = unlimited)
MaxEvents = 10000,
// Rate limiting in events/second (default: null = unlimited)
MaxEventsPerSecond = 1000,
// Filter by event types (default: null = all types)
EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" },
// Include metadata in events (default: true)
IncludeMetadata = true,
// Progress callback (default: null)
ProgressCallback = progress =>
{
Console.WriteLine($"{progress.EventsProcessed} events processed");
},
// How often to invoke progress callback (default: 1000)
ProgressInterval = 1000
};
Progress Tracking
Monitor replay progress with callbacks:
await foreach (var @event in replayService.ReplayFromOffsetAsync(
streamName: "orders",
startOffset: 0,
options: new ReplayOptions
{
ProgressCallback = progress =>
{
Console.WriteLine($@"
Replay Progress:
Current Offset: {progress.CurrentOffset}
Events Processed: {progress.EventsProcessed:N0}
Estimated Total: {progress.EstimatedTotal:N0}
Progress: {progress.ProgressPercentage:F1}%
Rate: {progress.EventsPerSecond:F0} events/sec
Elapsed: {progress.Elapsed}
Current Event Time: {progress.CurrentTimestamp}");
},
ProgressInterval = 1000
}))
{
await ProcessEventAsync(@event);
}
Rate Limiting
Control replay speed to avoid overwhelming consumers:
// Replay at maximum 500 events per second
await foreach (var @event in replayService.ReplayFromOffsetAsync(
streamName: "high-volume-stream",
startOffset: 0,
options: new ReplayOptions
{
MaxEventsPerSecond = 500
}))
{
await ProcessEventSlowlyAsync(@event);
}
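Rate limiting of this kind is commonly built on a token bucket. The sketch below shows the general algorithm under simple assumptions (bucket capacity equal to one second of budget, time injected by the caller); TokenBucket is a hypothetical class and not the replay service's internal limiter.

```csharp
using System;

// Hypothetical token bucket: each event consumes one token; tokens refill over time.
public sealed class TokenBucket
{
    private readonly double _capacity;
    private readonly double _refillPerSecond;
    private double _tokens;
    private DateTimeOffset _lastRefill;

    public TokenBucket(double eventsPerSecond, DateTimeOffset now)
    {
        if (eventsPerSecond <= 0)
            throw new ArgumentOutOfRangeException(nameof(eventsPerSecond));

        _capacity = eventsPerSecond;       // allow bursts of up to one second of budget
        _refillPerSecond = eventsPerSecond;
        _tokens = eventsPerSecond;         // start with a full bucket
        _lastRefill = now;
    }

    // Returns true when an event may be emitted at the given time.
    public bool TryTake(DateTimeOffset now)
    {
        var elapsedSeconds = (now - _lastRefill).TotalSeconds;
        if (elapsedSeconds > 0)
        {
            _tokens = Math.Min(_capacity, _tokens + elapsedSeconds * _refillPerSecond);
            _lastRefill = now;
        }

        if (_tokens < 1)
        {
            return false;
        }

        _tokens -= 1;
        return true;
    }
}
```

A caller that gets false would wait briefly before retrying, which keeps the long-run rate at or below the configured events per second while still allowing short bursts.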
Event Type Filtering
Replay only specific event types:
// Only replay order-related events
await foreach (var @event in replayService.ReplayAllAsync(
streamName: "orders",
options: new ReplayOptions
{
EventTypeFilter = new[]
{
"OrderPlaced",
"OrderShipped",
"OrderDelivered"
}
}))
{
await UpdateOrderProjectionAsync(@event);
}
Key Features
- Offset-based Replay: Replay from specific sequence numbers
- Time-based Replay: Replay from specific timestamps
- Time Range Replay: Replay events within time windows
- Event Type Filtering: Replay only specific event types
- Rate Limiting: Control replay speed with token bucket algorithm
- Progress Tracking: Monitor replay with callbacks and metrics
- Batching: Efficient streaming with configurable batch sizes
- Cancellation Support: Full CancellationToken support
Common Use Cases
Rebuilding Projections
// Rebuild entire read model from scratch
await foreach (var @event in replayService.ReplayAllAsync("orders"))
{
await projectionUpdater.ApplyAsync(@event);
}
Reprocessing After Bug Fixes
// Reprocess last 24 hours after fixing handler bug
var yesterday = DateTimeOffset.UtcNow.AddDays(-1);
await foreach (var @event in replayService.ReplayFromTimeAsync("orders", yesterday))
{
await fixedHandler.HandleAsync(@event);
}
Creating New Projections
// Build new analytics projection from historical data
await foreach (var @event in replayService.ReplayAllAsync(
"user-activity",
options: new ReplayOptions
{
EventTypeFilter = new[] { "PageView", "Click", "Purchase" },
MaxEventsPerSecond = 10000 // Fast replay for batch processing
}))
{
await analyticsProjection.ApplyAsync(@event);
}
Time-Travel Debugging
// Replay specific time period to debug issue
var bugStart = new DateTimeOffset(2025, 12, 10, 14, 30, 0, TimeSpan.Zero);
var bugEnd = new DateTimeOffset(2025, 12, 10, 15, 00, 0, TimeSpan.Zero);
await foreach (var @event in replayService.ReplayTimeRangeAsync(
"orders",
bugStart,
bugEnd))
{
await debugHandler.InspectAsync(@event);
}
Testing Resources
A comprehensive testing guide is available in POSTGRESQL-TESTING.md.
Topics covered:
- Docker PostgreSQL setup
- Persistent stream operations
- Ephemeral queue operations
- Visibility timeout testing
- Dead letter queue verification
- Performance testing
- Database schema inspection
Stream Configuration
The Stream Configuration feature provides per-stream configuration capabilities for fine-grained control over retention policies, dead letter queues, lifecycle management, performance tuning, and access control. Each stream can have its own settings that override global defaults.
Installation
Stream configuration is included in the PostgreSQL event streaming package:
dotnet add package Svrnty.CQRS.Events.PostgreSQL
Setup
Register the stream configuration services:
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");
// Add stream configuration support
builder.Services.AddPostgresStreamConfiguration();
Usage
Basic Stream Configuration
Configure retention policies for a specific stream:
var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();
var config = new StreamConfiguration
{
StreamName = "orders",
Description = "Order processing stream",
Tags = new Dictionary<string, string>
{
["domain"] = "orders",
["environment"] = "production"
},
Retention = new RetentionConfiguration
{
MaxAge = TimeSpan.FromDays(90),
MaxSizeBytes = 10L * 1024 * 1024 * 1024, // 10 GB
EnablePartitioning = true,
PartitionInterval = TimeSpan.FromDays(7)
},
CreatedAt = DateTimeOffset.UtcNow,
CreatedBy = "admin"
};
await configStore.SetConfigurationAsync(config);
Dead Letter Queue Configuration
Configure error handling with dead letter queues:
var config = new StreamConfiguration
{
StreamName = "payment-processing",
DeadLetterQueue = new DeadLetterQueueConfiguration
{
Enabled = true,
DeadLetterStreamName = "payment-processing-dlq",
MaxDeliveryAttempts = 5,
RetryDelay = TimeSpan.FromMinutes(5),
StoreOriginalEvent = true,
StoreErrorDetails = true
},
CreatedAt = DateTimeOffset.UtcNow
};
await configStore.SetConfigurationAsync(config);
Lifecycle Management
Configure automatic archival and deletion:
var config = new StreamConfiguration
{
StreamName = "audit-logs",
Lifecycle = new LifecycleConfiguration
{
AutoCreate = true,
AutoArchive = true,
ArchiveAfter = TimeSpan.FromDays(365),
ArchiveLocation = "s3://archive-bucket/audit-logs",
AutoDelete = false
},
CreatedAt = DateTimeOffset.UtcNow
};
await configStore.SetConfigurationAsync(config);
Performance Tuning
Configure performance-related settings:
var config = new StreamConfiguration
{
StreamName = "high-throughput-events",
Performance = new PerformanceConfiguration
{
BatchSize = 1000,
EnableCompression = true,
CompressionAlgorithm = "gzip",
EnableIndexing = true,
IndexedFields = new List<string> { "userId", "tenantId", "eventType" },
CacheSize = 10000
},
CreatedAt = DateTimeOffset.UtcNow
};
await configStore.SetConfigurationAsync(config);
Access Control
Configure stream permissions and quotas:
var config = new StreamConfiguration
{
StreamName = "sensitive-data",
AccessControl = new AccessControlConfiguration
{
PublicRead = false,
PublicWrite = false,
AllowedReaders = new List<string> { "admin", "audit-service" },
AllowedWriters = new List<string> { "admin", "data-ingestion-service" },
MaxConsumerGroups = 5,
MaxEventsPerSecond = 10000
},
CreatedAt = DateTimeOffset.UtcNow
};
await configStore.SetConfigurationAsync(config);
Getting Effective Configuration
Retrieve the effective configuration (stream-specific merged with defaults):
var configProvider = serviceProvider.GetRequiredService<IStreamConfigurationProvider>();
// Gets merged configuration (stream-specific + global defaults)
var effectiveConfig = await configProvider.GetEffectiveConfigurationAsync("orders");
// Get specific configuration sections
var retention = await configProvider.GetRetentionConfigurationAsync("orders");
var dlq = await configProvider.GetDeadLetterQueueConfigurationAsync("orders");
var lifecycle = await configProvider.GetLifecycleConfigurationAsync("orders");
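Merging stream-specific settings with global defaults can be pictured as a per-field fallback. The sketch below assumes that any value left unset on the stream falls back to the default; RetentionSettings and ConfigMerge are hypothetical stand-ins, not the library's RetentionConfiguration or provider.

```csharp
using System;

// Hypothetical, simplified settings type used only for this sketch.
public sealed record RetentionSettings(TimeSpan? MaxAge = null, long? MaxEventCount = null);

public static class ConfigMerge
{
    // Stream-specific values win; unset values fall back to the global defaults.
    public static RetentionSettings Merge(RetentionSettings? streamSpecific, RetentionSettings defaults) =>
        streamSpecific is null
            ? defaults
            : new RetentionSettings(
                MaxAge: streamSpecific.MaxAge ?? defaults.MaxAge,
                MaxEventCount: streamSpecific.MaxEventCount ?? defaults.MaxEventCount);
}
```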
Finding Configurations
Query configurations by criteria:
// Find all streams with archiving enabled
var archivingStreams = await configStore.FindConfigurationsAsync(
    c => c.Lifecycle?.AutoArchive == true);
// Find all production streams
var productionStreams = await configStore.FindConfigurationsAsync(
    c => c.Tags?.ContainsKey("environment") == true &&
         c.Tags["environment"] == "production");
// Get all configurations
var allConfigs = await configStore.GetAllConfigurationsAsync();
Deleting Configuration
Remove stream-specific configuration (reverts to defaults):
await configStore.DeleteConfigurationAsync("orders");
Configuration Options
RetentionConfiguration
- MaxAge: Maximum age before cleanup (e.g., TimeSpan.FromDays(90))
- MaxSizeBytes: Maximum storage size before cleanup
- MaxEventCount: Maximum number of events before cleanup
- EnablePartitioning: Enable table partitioning for better performance
- PartitionInterval: Partition interval (e.g., daily, weekly)
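To make the three limits concrete, here is a minimal sketch of how a cleanup check could combine them. It assumes that exceeding any one configured limit triggers cleanup and that an unset limit is ignored; the README does not state how the limits interact. `RetentionLimits` and `RetentionCheck` are hypothetical names, and the `long?` types for the size and count limits are assumptions.

```csharp
using System;

var limits = new RetentionLimits(MaxAge: TimeSpan.FromDays(90), MaxSizeBytes: null, MaxEventCount: 1_000_000);
var now = DateTimeOffset.UtcNow;

// The oldest event is 120 days old, so the age limit is exceeded.
Console.WriteLine(RetentionCheck.NeedsCleanup(limits, now.AddDays(-120), now, totalBytes: 5_000, eventCount: 10));

// Hypothetical stand-in for illustration; not the library's RetentionConfiguration.
public sealed record RetentionLimits(TimeSpan? MaxAge, long? MaxSizeBytes, long? MaxEventCount);

public static class RetentionCheck
{
    // Returns true when any configured limit is exceeded; unset (null) limits are skipped.
    public static bool NeedsCleanup(
        RetentionLimits limits, DateTimeOffset oldestEventAt, DateTimeOffset now, long totalBytes, long eventCount) =>
        (limits.MaxAge is { } maxAge && now - oldestEventAt > maxAge)
        || (limits.MaxSizeBytes is { } maxBytes && totalBytes > maxBytes)
        || (limits.MaxEventCount is { } maxCount && eventCount > maxCount);
}
```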
DeadLetterQueueConfiguration
- Enabled: Enable DLQ for this stream
- DeadLetterStreamName: Name of DLQ stream (defaults to {StreamName}-dlq)
- MaxDeliveryAttempts: Attempts before sending to DLQ (default: 3)
- RetryDelay: Delay between retry attempts
- StoreOriginalEvent: Store original event in DLQ
- StoreErrorDetails: Store error details in DLQ
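The interplay of MaxDeliveryAttempts and the default DLQ stream name can be sketched as a small routing decision. `DlqPolicy` and `DlqRouting` are hypothetical names used only for illustration. The "drop" outcome for a disabled DLQ is an assumption, since the README does not say what happens to exhausted events in that case.

```csharp
using System;

var policy = new DlqPolicy(Enabled: true, DeadLetterStreamName: null, MaxDeliveryAttempts: 3);

Console.WriteLine(DlqRouting.Decide(policy, "orders", failedAttempts: 2)); // still below the limit
Console.WriteLine(DlqRouting.Decide(policy, "orders", failedAttempts: 3)); // limit reached

// Hypothetical stand-in for illustration; not the library's DeadLetterQueueConfiguration.
public sealed record DlqPolicy(bool Enabled, string? DeadLetterStreamName, int MaxDeliveryAttempts);

public static class DlqRouting
{
    // Falls back to "{StreamName}-dlq" when no explicit DLQ stream name is configured.
    public static string ResolveDlqName(DlqPolicy policy, string streamName) =>
        string.IsNullOrWhiteSpace(policy.DeadLetterStreamName)
            ? $"{streamName}-dlq"
            : policy.DeadLetterStreamName;

    // Describes what to do with an event that has already failed `failedAttempts` times.
    public static string Decide(DlqPolicy policy, string streamName, int failedAttempts)
    {
        if (failedAttempts < policy.MaxDeliveryAttempts) return "retry";
        // Assumption: with the DLQ disabled, an exhausted event is dropped.
        return policy.Enabled ? $"dead-letter:{ResolveDlqName(policy, streamName)}" : "drop";
    }
}
```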
LifecycleConfiguration
- AutoCreate: Automatically create stream if it doesn't exist
- AutoArchive: Automatically archive old events
- ArchiveAfter: Age after which events are archived
- ArchiveLocation: Storage location for archived events
- AutoDelete: Automatically delete old events
- DeleteAfter: Age after which events are deleted
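As an illustration of how ArchiveAfter and DeleteAfter might be applied to a single event, the sketch below picks one action based on the event's age. `LifecycleRules`, `LifecycleAction`, and `LifecyclePlanner` are hypothetical names. Checking deletion before archival is an assumption about precedence that the README does not specify.

```csharp
using System;

var rules = new LifecycleRules(AutoArchive: true, ArchiveAfter: TimeSpan.FromDays(365), AutoDelete: false, DeleteAfter: null);

Console.WriteLine(LifecyclePlanner.Plan(rules, TimeSpan.FromDays(400))); // older than ArchiveAfter
Console.WriteLine(LifecyclePlanner.Plan(rules, TimeSpan.FromDays(10)));  // younger than every threshold

public enum LifecycleAction { Keep, Archive, Delete }

// Hypothetical stand-in for illustration; not the library's LifecycleConfiguration.
public sealed record LifecycleRules(bool AutoArchive, TimeSpan? ArchiveAfter, bool AutoDelete, TimeSpan? DeleteAfter);

public static class LifecyclePlanner
{
    // Deletion is checked first, so an event past both thresholds is deleted rather than archived (an assumption).
    public static LifecycleAction Plan(LifecycleRules rules, TimeSpan eventAge)
    {
        if (rules.AutoDelete && rules.DeleteAfter is { } deleteAfter && eventAge >= deleteAfter)
            return LifecycleAction.Delete;
        if (rules.AutoArchive && rules.ArchiveAfter is { } archiveAfter && eventAge >= archiveAfter)
            return LifecycleAction.Archive;
        return LifecycleAction.Keep;
    }
}
```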
PerformanceConfiguration
- BatchSize: Batch size for bulk operations
- EnableCompression: Enable event compression
- CompressionAlgorithm: Compression algorithm (e.g., "gzip", "zstd")
- EnableIndexing: Enable metadata field indexing
- IndexedFields: List of fields to index
- CacheSize: Cache size for frequently accessed events
AccessControlConfiguration
- PublicRead: Allow public read access
- PublicWrite: Allow public write access
- AllowedReaders: List of authorized readers
- AllowedWriters: List of authorized writers
- MaxConsumerGroups: Maximum consumer groups allowed
- MaxEventsPerSecond: Rate limit for events per second
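A read-permission check built on PublicRead and AllowedReaders could look like the following sketch. `StreamAcl` and `AccessCheck` are hypothetical names, and treating the entries of AllowedReaders as exact, case-sensitive principal names is an assumption.

```csharp
using System;
using System.Collections.Generic;

var acl = new StreamAcl(PublicRead: false, AllowedReaders: new HashSet<string> { "admin", "audit-service" });

Console.WriteLine(AccessCheck.CanRead(acl, "audit-service"));   // listed reader
Console.WriteLine(AccessCheck.CanRead(acl, "billing-service")); // not listed, and the stream is not public

// Hypothetical stand-in for illustration; not the library's AccessControlConfiguration.
public sealed record StreamAcl(bool PublicRead, IReadOnlySet<string> AllowedReaders);

public static class AccessCheck
{
    // A public stream is readable by anyone; otherwise the principal must be listed explicitly.
    public static bool CanRead(StreamAcl acl, string principal) =>
        acl.PublicRead || acl.AllowedReaders.Contains(principal);
}
```

A write check would follow the same shape with PublicWrite and AllowedWriters.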
Key Features
- Per-Stream Configuration: Override global settings per stream
- Retention Policies: Configure retention per stream
- Dead Letter Queues: Error handling with configurable retry logic
- Lifecycle Management: Automatic archival and deletion
- Performance Tuning: Optimize batch sizes, compression, and indexing
- Access Control: Stream-level permissions and quotas
- Configuration Merging: Stream-specific settings override global defaults
- Tag-Based Filtering: Categorize and query streams by tags
Common Use Cases
Multi-Tenant Configuration
// High-value tenant with extended retention
var premiumConfig = new StreamConfiguration
{
    StreamName = "tenant-acme-corp",
    Tags = new Dictionary<string, string> { ["tier"] = "premium" },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(365) // 1 year retention
    }
};
// Standard tenant with shorter retention
var standardConfig = new StreamConfiguration
{
    StreamName = "tenant-small-co",
    Tags = new Dictionary<string, string> { ["tier"] = "standard" },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(30) // 30 days retention
    }
};
Environment-Specific Settings
// Production: strict retention and DLQ
var prodConfig = new StreamConfiguration
{
    StreamName = "orders-prod",
    Tags = new Dictionary<string, string> { ["environment"] = "production" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90) },
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        MaxDeliveryAttempts = 5
    }
};
// Development: relaxed settings
var devConfig = new StreamConfiguration
{
    StreamName = "orders-dev",
    Tags = new Dictionary<string, string> { ["environment"] = "development" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(7) },
    DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = false }
};
Domain-Specific Configuration
// Audit logs: long retention, auto-archive
var auditConfig = new StreamConfiguration
{
    StreamName = "audit-logs",
    Tags = new Dictionary<string, string> { ["domain"] = "security" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(2555) }, // 7 years
    Lifecycle = new LifecycleConfiguration
    {
        AutoArchive = true,
        ArchiveAfter = TimeSpan.FromDays(365),
        ArchiveLocation = "s3://archive/audit"
    }
};
// Analytics: high-throughput, short retention
var analyticsConfig = new StreamConfiguration
{
    StreamName = "page-views",
    Tags = new Dictionary<string, string> { ["domain"] = "analytics" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(30) },
    Performance = new PerformanceConfiguration
    {
        BatchSize = 10000,
        EnableCompression = true
    }
};
2024-2025 Roadmap
| Task | Description | Status |
|---|---|---|
| Support .NET 8 | Ensure compatibility with .NET 8. | ✅ |
| Support .NET 10 | Upgrade to .NET 10 with C# 14 language support. | ✅ |
| Update FluentValidation | Upgrade FluentValidation to version 11.x for .NET 10 compatibility. | ✅ |
| Add gRPC Support with source generators | Implement gRPC endpoints with source generators and Google Rich Error Model for validation. | ✅ |
| Create a demo project (Svrnty.CQRS.Grpc.Sample) | Develop a comprehensive demo project showcasing gRPC and HTTP endpoints. | ✅ |
| Event Streaming - Phase 1: gRPC Bidirectional Streaming | Implement gRPC bidirectional streaming for real-time event delivery. | ✅ |
| Event Streaming - Phase 2.1: Storage Abstractions | Define IEventStreamStore interface for persistent and ephemeral streams. | ✅ |
| Event Streaming - Phase 2.2: PostgreSQL Storage | Implement PostgreSQL-backed storage with persistent streams, message queues, and DLQ. | ✅ |
| Event Streaming - Phase 2.3: Consumer Offset Tracking | Implement consumer group coordination and offset management for persistent streams. | ✅ |
| Event Streaming - Phase 2.4: Retention Policies | Add time-based and size-based retention with automatic cleanup and table partitioning. | ✅ |
| Event Streaming - Phase 2.5: Event Replay API | Add APIs for replaying events from specific offsets and time ranges. | ✅ |
| Event Streaming - Phase 2.6: Stream Configuration | Per-stream configuration for retention, DLQ, and lifecycle management. | ✅ |
| Create a website for the Framework | Develop a website to host comprehensive documentation for the framework. | ⬜️ |
2026 Roadmap
| Task | Description | Status |
|---|---|---|
| gRPC Compression Support | Smart message compression with automatic threshold detection and per-handler control. | ⬜️ |
| gRPC Metadata & Authorization Support | Expose ServerCallContext to handlers and integrate authorization services for gRPC endpoints. | ⬜️ |