> This project was originally initiated by [Powered Software Inc.](https://poweredsoft.com/) and was forked from the [PoweredSoft.CQRS](https://github.com/PoweredSoft/CQRS) Repository # CQRS Our implementation of query and command responsibility segregation (CQRS). ## Getting Started > Install nuget package to your awesome project. | Package Name | NuGet | NuGet Install | |-----------------------------------------| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |-----------------------------------------------------------------------:| | Svrnty.CQRS | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS/) | ```dotnet add package Svrnty.CQRS ``` | | Svrnty.CQRS.MinimalApi | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.MinimalApi.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.MinimalApi/) | ```dotnet add package Svrnty.CQRS.MinimalApi ``` | | Svrnty.CQRS.FluentValidation | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.FluentValidation.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.FluentValidation/) | ```dotnet add package Svrnty.CQRS.FluentValidation ``` | | Svrnty.CQRS.DynamicQuery | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.DynamicQuery.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.DynamicQuery/) | ```dotnet add package Svrnty.CQRS.DynamicQuery ``` | | Svrnty.CQRS.DynamicQuery.MinimalApi | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.DynamicQuery.MinimalApi.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.DynamicQuery.MinimalApi/) | ```dotnet add package Svrnty.CQRS.DynamicQuery.MinimalApi ``` | | Svrnty.CQRS.Grpc | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Grpc.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Grpc/) | ```dotnet add package Svrnty.CQRS.Grpc ``` | | Svrnty.CQRS.Grpc.Generators | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Grpc.Generators.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Grpc.Generators/) | ```dotnet add package Svrnty.CQRS.Grpc.Generators ``` | | Svrnty.CQRS.Events | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Events.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Events/) | ```dotnet add package Svrnty.CQRS.Events ``` | | Svrnty.CQRS.Events.Grpc | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Events.Grpc.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Events.Grpc/) | ```dotnet add package Svrnty.CQRS.Events.Grpc ``` | | Svrnty.CQRS.Events.PostgreSQL | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Events.PostgreSQL.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Events.PostgreSQL/) | ```dotnet add package Svrnty.CQRS.Events.PostgreSQL ``` | | Svrnty.CQRS.Events.ConsumerGroups | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Events.ConsumerGroups.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Events.ConsumerGroups/) | ```dotnet add package Svrnty.CQRS.Events.ConsumerGroups ``` | > Abstractions Packages. | Package Name | NuGet | NuGet Install | | ---------------------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -----------------------------------------------------: | | Svrnty.CQRS.Abstractions | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Abstractions.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Abstractions/) | ```dotnet add package Svrnty.CQRS.Abstractions ``` | | Svrnty.CQRS.DynamicQuery.Abstractions | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.DynamicQuery.Abstractions.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.DynamicQuery.Abstractions/) | ```dotnet add package Svrnty.CQRS.DynamicQuery.Abstractions ``` | | Svrnty.CQRS.Grpc.Abstractions | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Grpc.Abstractions.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Grpc.Abstractions/) | ```dotnet add package Svrnty.CQRS.Grpc.Abstractions ``` | | Svrnty.CQRS.Events.Abstractions | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Events.Abstractions.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Events.Abstractions/) | ```dotnet add package Svrnty.CQRS.Events.Abstractions ``` | | Svrnty.CQRS.Events.ConsumerGroups.Abstractions | [![NuGet](https://img.shields.io/nuget/v/Svrnty.CQRS.Events.ConsumerGroups.Abstractions.svg?style=flat-square&label=nuget)](https://www.nuget.org/packages/Svrnty.CQRS.Events.ConsumerGroups.Abstractions/) | ```dotnet add package Svrnty.CQRS.Events.ConsumerGroups.Abstractions ``` | ## Sample of startup code for gRPC (Recommended) ```csharp using Svrnty.CQRS; using Svrnty.CQRS.FluentValidation; using Svrnty.CQRS.Grpc; var builder = WebApplication.CreateBuilder(args); // Register your commands with validators builder.Services.AddCommand(); builder.Services.AddCommand(); // Register your queries builder.Services.AddQuery(); // Configure CQRS with gRPC support builder.Services.AddSvrntyCqrs(cqrs => { // Enable gRPC endpoints with reflection cqrs.AddGrpc(grpc => { grpc.EnableReflection(); }); }); var app = builder.Build(); // Map all configured CQRS endpoints app.UseSvrntyCqrs(); app.Run(); ``` ### Important: gRPC Requirements The gRPC implementation uses **Grpc.Tools** with `.proto` files and **source generators** for automatic service implementation: #### 1. Install required packages: ```bash dotnet add package Grpc.AspNetCore dotnet add package Grpc.AspNetCore.Server.Reflection dotnet add package Grpc.StatusProto # For Rich Error Model validation ``` #### 2. Add the source generator as an analyzer: ```bash dotnet add package Svrnty.CQRS.Grpc.Generators ``` The source generator is automatically configured as an analyzer when installed via NuGet and will generate both the `.proto` files and gRPC service implementations at compile time. #### 3. Define your C# commands and queries: ```csharp public record AddUserCommand { public required string Name { get; init; } public required string Email { get; init; } public int Age { get; init; } } public record RemoveUserCommand { public int UserId { get; init; } } ``` **Notes:** - The source generator automatically creates: - `.proto` files in the `Protos/` directory from your C# commands and queries - `CommandServiceImpl` and `QueryServiceImpl` implementations - FluentValidation is automatically integrated with **Google Rich Error Model** for structured validation errors - Validation errors return `google.rpc.Status` with `BadRequest` containing `FieldViolations` - Use `record` types for commands/queries (immutable, value-based equality, more concise) - No need for protobuf-net attributes - just define your C# types ## Sample of startup code for Minimal API (HTTP) For HTTP scenarios (web browsers, public APIs), you can use the Minimal API approach: ```csharp using Svrnty.CQRS; using Svrnty.CQRS.FluentValidation; using Svrnty.CQRS.MinimalApi; var builder = WebApplication.CreateBuilder(args); // Register your commands with validators builder.Services.AddCommand(); builder.Services.AddCommand(); // Register your queries builder.Services.AddQuery, PersonQueryHandler>(); // Configure CQRS with Minimal API support builder.Services.AddSvrntyCqrs(cqrs => { // Enable Minimal API endpoints cqrs.AddMinimalApi(); }); // Add Swagger (optional) builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); var app = builder.Build(); if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } // Map all configured CQRS endpoints (automatically creates POST /api/command/* and POST/GET /api/query/*) app.UseSvrntyCqrs(); app.Run(); ``` **Notes:** - FluentValidation is automatically integrated with **RFC 7807 Problem Details** for structured validation errors - Use `record` types for commands/queries (immutable, value-based equality, more concise) - Supports both POST and GET (for queries) endpoints - Automatically generates Swagger/OpenAPI documentation ## Sample enabling both gRPC and HTTP You can enable both gRPC and traditional HTTP endpoints simultaneously, allowing clients to choose their preferred protocol: ```csharp using Svrnty.CQRS; using Svrnty.CQRS.FluentValidation; using Svrnty.CQRS.Grpc; using Svrnty.CQRS.MinimalApi; var builder = WebApplication.CreateBuilder(args); // Register your commands with validators builder.Services.AddCommand(); builder.Services.AddCommand(); // Register your queries builder.Services.AddQuery(); // Configure CQRS with both gRPC and Minimal API support builder.Services.AddSvrntyCqrs(cqrs => { // Enable gRPC endpoints with reflection cqrs.AddGrpc(grpc => { grpc.EnableReflection(); }); // Enable Minimal API endpoints cqrs.AddMinimalApi(); }); // Add HTTP support with Swagger builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); var app = builder.Build(); if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } // Map all configured CQRS endpoints (both gRPC and HTTP) app.UseSvrntyCqrs(); app.Run(); ``` **Benefits:** - Single codebase supports multiple protocols - gRPC for high-performance, low-latency scenarios (microservices, internal APIs) - HTTP for web browsers, legacy clients, and public APIs - Same commands, queries, and validation logic for both protocols - Swagger UI available for HTTP endpoints, gRPC reflection for gRPC clients # Fluent Validation FluentValidation is optional but recommended for command and query validation. The `Svrnty.CQRS.FluentValidation` package provides extension methods to simplify validator registration. ## With Svrnty.CQRS.FluentValidation (Recommended) The package exposes extension method overloads that accept the validator as a generic parameter: ```bash dotnet add package Svrnty.CQRS.FluentValidation ``` ```csharp using Svrnty.CQRS.FluentValidation; // Extension methods for validator registration // Command with result - validator as last generic parameter builder.Services.AddCommand(); // Command without result - validator included in generics builder.Services.AddCommand(); ``` **Benefits:** - **Single line registration** - Handler and validator registered together - **Type safety** - Compiler ensures validator matches command type - **Less boilerplate** - No need for separate `AddTransient>()` calls - **Cleaner code** - Clear intent that validation is part of command pipeline ## Without Svrnty.CQRS.FluentValidation If you prefer not to use the FluentValidation package, you need to register commands and validators separately: ```csharp using FluentValidation; using Svrnty.CQRS; // Register command handler builder.Services.AddCommand(); // Manually register validator builder.Services.AddTransient, EchoCommandValidator>(); ``` ## Event Streaming Svrnty.CQRS includes comprehensive event streaming support for building event-driven architectures with both **persistent** (event sourcing) and **ephemeral** (message queue) streams. ### Quick Start ```bash # Install core event streaming packages dotnet add package Svrnty.CQRS.Events dotnet add package Svrnty.CQRS.Events.Grpc # For gRPC bidirectional streaming # Install storage backend dotnet add package Svrnty.CQRS.Events.PostgreSQL # PostgreSQL storage (recommended for production) ``` ### Basic Setup ```csharp using Svrnty.CQRS.Events; using Svrnty.CQRS.Events.PostgreSQL; var builder = WebApplication.CreateBuilder(args); // Add event streaming support builder.Services.AddSvrntyEvents(); builder.Services.AddDefaultEventDiscovery(); // Configure storage backend (PostgreSQL) builder.Services.AddPostgresEventStreaming( builder.Configuration.GetSection("EventStreaming:PostgreSQL")); // Enable gRPC event streaming (optional) builder.Services.AddSvrntyEventsGrpc(); var app = builder.Build(); // Map gRPC event streaming endpoints app.MapGrpcService(); app.Run(); ``` ### PostgreSQL Configuration Add to `appsettings.json`: ```json { "EventStreaming": { "PostgreSQL": { "ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;Username=postgres;Password=postgres", "SchemaName": "event_streaming", "AutoMigrate": true, "MaxPoolSize": 100, "MinPoolSize": 5 } } } ``` **Key Features:** - **Auto-Migration**: Database schema created automatically on startup (when `AutoMigrate: true`) - **Connection Pooling**: Configurable Npgsql connection pool (5-100 connections) - **Schema Isolation**: All tables in dedicated schema (default: `event_streaming`) ### Storage Options #### 1. PostgreSQL (Production-Ready) ✅ ```csharp builder.Services.AddPostgresEventStreaming(options => { options.ConnectionString = "Host=localhost;..."; options.AutoMigrate = true; // Create schema automatically }); ``` **Features:** - Persistent and ephemeral stream support - SKIP LOCKED for concurrent queue operations - Visibility timeout for message processing - Dead letter queue for failed messages - Consumer offset tracking (Phase 2.3) - Retention policies support (Phase 2.4) #### 2. In-Memory (Development/Testing) ```csharp builder.Services.AddInMemoryEventStorage(); ``` **Features:** - Fast in-memory storage - Suitable for development and testing - No external dependencies - Data lost on restart ### Stream Types #### Persistent Streams (Event Sourcing) Append-only event logs for event sourcing patterns: ```csharp // Append events to persistent stream await eventStore.AppendAsync("user-123", new UserCreatedEvent { UserId = 123, Name = "Alice", Email = "alice@example.com" }); // Read stream from offset var events = await eventStore.ReadStreamAsync("user-123", fromOffset: 0); // Get stream length var length = await eventStore.GetStreamLengthAsync("user-123"); ``` #### Ephemeral Streams (Message Queue) Message queue semantics with at-least-once delivery: ```csharp // Enqueue messages await eventStore.EnqueueAsync("notifications", new EmailNotificationEvent { To = "user@example.com", Subject = "Welcome!", Body = "Thanks for signing up" }); // Dequeue with visibility timeout var @event = await eventStore.DequeueAsync( streamName: "notifications", consumerId: "worker-1", visibilityTimeout: TimeSpan.FromSeconds(30) ); // Process and acknowledge if (@event != null) { await ProcessEventAsync(@event); await eventStore.AcknowledgeAsync("notifications", @event.EventId, "worker-1"); } // Or negative acknowledge (requeue or move to DLQ) await eventStore.NackAsync( streamName: "notifications", eventId: @event.EventId, consumerId: "worker-1", requeue: false // Move to dead letter queue ); ``` ### gRPC Bidirectional Streaming The framework provides gRPC bidirectional streaming for real-time event delivery: ```csharp // Client-side subscription (in your client application) var call = client.SubscribeToStream(); // Send subscription request await call.RequestStream.WriteAsync(new SubscribeRequest { StreamName = "user-events", ConsumerId = "client-123", SubscriptionMode = SubscriptionMode.Broadcast }); // Receive events in real-time await foreach (var @event in call.ResponseStream.ReadAllAsync()) { Console.WriteLine($"Received event: {@event.EventType}"); } ``` ### Consumer Groups Consumer groups enable multiple consumers to coordinate processing of persistent streams without duplicates. This provides load balancing, fault tolerance, and at-least-once delivery guarantees. #### Installation ```bash dotnet add package Svrnty.CQRS.Events.ConsumerGroups ``` #### Setup ```csharp using Svrnty.CQRS.Events.ConsumerGroups; var builder = WebApplication.CreateBuilder(args); // Add consumer group support with PostgreSQL backend builder.Services.AddPostgresConsumerGroups( builder.Configuration.GetSection("EventStreaming:ConsumerGroups")); var app = builder.Build(); app.Run(); ``` Configuration in `appsettings.json`: ```json { "EventStreaming": { "ConsumerGroups": { "ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;...", "SchemaName": "event_streaming", "AutoMigrate": true } } } ``` #### Basic Usage ```csharp // Inject the consumer group reader var reader = serviceProvider.GetRequiredService(); // Consume events with automatic offset management await foreach (var @event in reader.ConsumeAsync( streamName: "orders", groupId: "order-processors", consumerId: "worker-1", options: new ConsumerGroupOptions { BatchSize = 100, CommitStrategy = OffsetCommitStrategy.AfterBatch, HeartbeatInterval = TimeSpan.FromSeconds(10), SessionTimeout = TimeSpan.FromSeconds(30) }, cancellationToken)) { await ProcessOrderEventAsync(@event); // Offset automatically committed after batch } ``` #### Offset Commit Strategies **1. AfterEach** - Commit after each event (safest, highest overhead): ```csharp options.CommitStrategy = OffsetCommitStrategy.AfterEach; ``` **2. AfterBatch** - Commit after each batch (balanced): ```csharp options.CommitStrategy = OffsetCommitStrategy.AfterBatch; options.BatchSize = 100; ``` **3. Periodic** - Commit at intervals (highest throughput): ```csharp options.CommitStrategy = OffsetCommitStrategy.Periodic; options.PeriodicCommitInterval = TimeSpan.FromSeconds(5); ``` **4. Manual** - Full control over commits: ```csharp options.CommitStrategy = OffsetCommitStrategy.Manual; await foreach (var @event in reader.ConsumeAsync(...)) { try { await ProcessEventAsync(@event); // Manually commit after successful processing await reader.CommitOffsetAsync( streamName: "orders", groupId: "order-processors", consumerId: "worker-1", offset: currentOffset, cancellationToken); } catch (Exception ex) { _logger.LogError(ex, "Processing failed, will retry"); // Don't commit - event will be reprocessed } } ``` #### Monitoring Consumer Groups ```csharp var offsetStore = serviceProvider.GetRequiredService(); // Get all active consumers in a group var consumers = await offsetStore.GetActiveConsumersAsync("order-processors"); foreach (var consumer in consumers) { Console.WriteLine($"Consumer: {consumer.ConsumerId}"); Console.WriteLine($" Last Heartbeat: {consumer.LastHeartbeat}"); Console.WriteLine($" Registered: {consumer.RegisteredAt}"); } // Get group offsets per consumer var offsets = await offsetStore.GetGroupOffsetsAsync("order-processors", "orders"); foreach (var (consumerId, offset) in offsets) { Console.WriteLine($"{consumerId}: offset {offset}"); } // Get last committed offset for the group (minimum across all consumers) var groupOffset = await offsetStore.GetCommittedOffsetAsync("order-processors", "orders"); Console.WriteLine($"Group safe offset: {groupOffset}"); ``` #### Key Features - **Automatic Offset Management**: Tracks last processed position per consumer - **Heartbeat Monitoring**: Detects and removes stale consumers automatically - **Flexible Commit Strategies**: Choose when to commit offsets (manual, per-event, per-batch, periodic) - **Load Balancing**: Multiple consumers can process the same stream - **Fault Tolerance**: Consumers can resume from last committed offset after failure - **At-Least-Once Delivery**: Events processed at least once, even with consumer failures #### Health Monitoring The `ConsumerHealthMonitor` background service automatically: - Sends periodic heartbeats for registered consumers - Detects stale consumers (no heartbeat within session timeout) - Cleans up dead consumers from the registry - Logs consumer group health metrics Configure health monitoring: ```csharp builder.Services.AddPostgresConsumerGroups( storageConfig => { /* ... */ }, healthConfig => { healthConfig.CleanupInterval = TimeSpan.FromSeconds(30); healthConfig.SessionTimeout = TimeSpan.FromSeconds(60); healthConfig.Enabled = true; }); ``` ## Retention Policies Automatic retention policy enforcement ensures that old events are cleaned up according to configurable rules. This helps manage database size and comply with data retention requirements. ### Installation ```bash dotnet add package Svrnty.CQRS.Events.PostgreSQL ``` The retention policy feature is included in the PostgreSQL event streaming package. ### Setup Register retention policy services in your application: ```csharp builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;..."); // Add retention policy background service builder.Services.AddPostgresRetentionPolicies(options => { options.Enabled = true; options.CleanupInterval = TimeSpan.FromHours(1); options.CleanupWindowStart = TimeSpan.FromHours(2); // 2 AM UTC options.CleanupWindowEnd = TimeSpan.FromHours(6); // 6 AM UTC options.UseCleanupWindow = true; }); ``` Or use configuration: ```csharp builder.Services.AddPostgresRetentionPolicies( builder.Configuration.GetSection("RetentionService")); ``` ```json { "RetentionService": { "Enabled": true, "CleanupInterval": "01:00:00", "CleanupWindowStart": "02:00:00", "CleanupWindowEnd": "06:00:00", "UseCleanupWindow": true } } ``` ### Usage #### Setting Retention Policies ```csharp var policyStore = serviceProvider.GetRequiredService(); // Time-based retention: delete events older than 30 days await policyStore.SetPolicyAsync(new RetentionPolicyConfig { StreamName = "orders", MaxAge = TimeSpan.FromDays(30), Enabled = true }); // Size-based retention: keep only last 10,000 events await policyStore.SetPolicyAsync(new RetentionPolicyConfig { StreamName = "analytics", MaxEventCount = 10000, Enabled = true }); // Combined retention: use both time and size limits await policyStore.SetPolicyAsync(new RetentionPolicyConfig { StreamName = "logs", MaxAge = TimeSpan.FromDays(7), MaxEventCount = 50000, Enabled = true }); // Default policy for all streams (use "*" as stream name) await policyStore.SetPolicyAsync(new RetentionPolicyConfig { StreamName = "*", MaxAge = TimeSpan.FromDays(90), Enabled = true }); ``` #### Retrieving Policies ```csharp // Get specific policy var policy = await policyStore.GetPolicyAsync("orders"); if (policy != null) { Console.WriteLine($"Stream: {policy.StreamName}"); Console.WriteLine($"Max Age: {policy.MaxAge}"); Console.WriteLine($"Max Event Count: {policy.MaxEventCount}"); Console.WriteLine($"Enabled: {policy.Enabled}"); } // Get all policies var policies = await policyStore.GetAllPoliciesAsync(); foreach (var p in policies) { Console.WriteLine($"{p.StreamName}: Age={p.MaxAge}, Count={p.MaxEventCount}"); } ``` #### Deleting Policies ```csharp // Delete a specific policy var deleted = await policyStore.DeletePolicyAsync("orders"); // Note: Cannot delete the default "*" policy ``` #### Manual Cleanup While the background service runs automatically, you can also trigger cleanup manually: ```csharp // Apply all enabled retention policies var result = await policyStore.ApplyRetentionPoliciesAsync(); Console.WriteLine($"Streams Processed: {result.StreamsProcessed}"); Console.WriteLine($"Events Deleted: {result.EventsDeleted}"); Console.WriteLine($"Duration: {result.Duration}"); // Per-stream details foreach (var (streamName, count) in result.EventsDeletedPerStream) { Console.WriteLine($" {streamName}: {count} events deleted"); } ``` ### Key Features - **Time-based Retention**: Delete events older than specified duration - **Size-based Retention**: Keep only the most recent N events per stream - **Wildcard Policies**: Apply default policies to all streams using "*" - **Cleanup Windows**: Run cleanup only during specified time windows (e.g., off-peak hours) - **Background Service**: Automatic periodic cleanup via PeriodicTimer - **Statistics Tracking**: Detailed metrics about cleanup operations - **Efficient Deletion**: PostgreSQL stored procedures for batch cleanup ### Cleanup Window The cleanup window feature allows you to restrict when retention policies are enforced: ```csharp options.UseCleanupWindow = true; options.CleanupWindowStart = TimeSpan.FromHours(2); // 2 AM UTC options.CleanupWindowEnd = TimeSpan.FromHours(6); // 6 AM UTC ``` The window automatically handles midnight crossing: ```csharp // Window that spans midnight (10 PM to 2 AM) options.CleanupWindowStart = TimeSpan.FromHours(22); // 10 PM UTC options.CleanupWindowEnd = TimeSpan.FromHours(2); // 2 AM UTC ``` Disable the window to run cleanup at any time: ```csharp options.UseCleanupWindow = false; ``` ### Monitoring The background service logs cleanup operations: ``` [INF] Retention policy service started. Cleanup interval: 01:00:00, Window: 02:00:00-06:00:00 UTC [INF] Starting retention policy enforcement cycle [INF] Retention cleanup complete: 3 streams processed, 1,234 events deleted in 00:00:01.234 [DBG] Stream orders: 500 events deleted [DBG] Stream analytics: 734 events deleted [DBG] Stream logs: 0 events deleted ``` When outside the cleanup window: ``` [DBG] Outside cleanup window (02:00:00-06:00:00 UTC), skipping retention enforcement ``` ## Event Replay API The Event Replay API enables rebuilding projections, reprocessing events, and time-travel debugging by replaying historical events from persistent streams. ### Installation ```bash dotnet add package Svrnty.CQRS.Events.PostgreSQL ``` The event replay feature is included in the PostgreSQL event streaming package. ### Setup Register the event replay service: ```csharp builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;..."); // Add event replay service builder.Services.AddPostgresEventReplay(); ``` ### Usage #### Replay from Offset Replay events starting from a specific sequence number: ```csharp var replayService = serviceProvider.GetRequiredService(); await foreach (var @event in replayService.ReplayFromOffsetAsync( streamName: "orders", startOffset: 1000, options: new ReplayOptions { BatchSize = 100, MaxEventsPerSecond = 1000, // Rate limit to 1000 events/sec ProgressCallback = progress => { Console.WriteLine($"Progress: {progress.EventsProcessed} events " + $"({progress.ProgressPercentage:F1}%) " + $"@ {progress.EventsPerSecond:F0} events/sec"); } })) { await ProcessEventAsync(@event); } ``` #### Replay from Time Replay events starting from a specific timestamp: ```csharp var startTime = DateTimeOffset.UtcNow.AddDays(-7); await foreach (var @event in replayService.ReplayFromTimeAsync( streamName: "orders", startTime: startTime, options: new ReplayOptions { MaxEvents = 10000, EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" } })) { await RebuildProjectionAsync(@event); } ``` #### Replay Time Range Replay events within a specific time window: ```csharp var startTime = DateTimeOffset.UtcNow.AddDays(-7); var endTime = DateTimeOffset.UtcNow.AddDays(-6); await foreach (var @event in replayService.ReplayTimeRangeAsync( streamName: "analytics", startTime: startTime, endTime: endTime, options: new ReplayOptions { EventTypeFilter = new[] { "PageView", "Click" }, ProgressInterval = 500 })) { await ProcessAnalyticsEventAsync(@event); } ``` #### Replay All Events Replay entire stream from the beginning: ```csharp await foreach (var @event in replayService.ReplayAllAsync( streamName: "orders", options: new ReplayOptions { BatchSize = 1000, MaxEventsPerSecond = 5000 })) { await ProcessEventAsync(@event); } ``` #### Get Replay Count Get the total number of events that would be replayed: ```csharp var count = await replayService.GetReplayCountAsync( streamName: "orders", startOffset: 1000, options: new ReplayOptions { EventTypeFilter = new[] { "OrderPlaced" } }); Console.WriteLine($"Will replay {count} OrderPlaced events"); ``` ### Replay Options Control replay behavior with `ReplayOptions`: ```csharp var options = new ReplayOptions { // Batch size for reading from database (default: 100) BatchSize = 100, // Maximum events to replay (default: null = unlimited) MaxEvents = 10000, // Rate limiting in events/second (default: null = unlimited) MaxEventsPerSecond = 1000, // Filter by event types (default: null = all types) EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" }, // Include metadata in events (default: true) IncludeMetadata = true, // Progress callback (default: null) ProgressCallback = progress => { Console.WriteLine($"{progress.EventsProcessed} events processed"); }, // How often to invoke progress callback (default: 1000) ProgressInterval = 1000 }; ``` ### Progress Tracking Monitor replay progress with callbacks: ```csharp await foreach (var @event in replayService.ReplayFromOffsetAsync( streamName: "orders", startOffset: 0, options: new ReplayOptions { ProgressCallback = progress => { Console.WriteLine($@" Replay Progress: Current Offset: {progress.CurrentOffset} Events Processed: {progress.EventsProcessed:N0} Estimated Total: {progress.EstimatedTotal:N0} Progress: {progress.ProgressPercentage:F1}% Rate: {progress.EventsPerSecond:F0} events/sec Elapsed: {progress.Elapsed} Current Event Time: {progress.CurrentTimestamp}"); }, ProgressInterval = 1000 })) { await ProcessEventAsync(@event); } ``` ### Rate Limiting Control replay speed to avoid overwhelming consumers: ```csharp // Replay at maximum 500 events per second await foreach (var @event in replayService.ReplayFromOffsetAsync( streamName: "high-volume-stream", startOffset: 0, options: new ReplayOptions { MaxEventsPerSecond = 500 })) { await ProcessEventSlowlyAsync(@event); } ``` ### Event Type Filtering Replay only specific event types: ```csharp // Only replay order-related events await foreach (var @event in replayService.ReplayAllAsync( streamName: "orders", options: new ReplayOptions { EventTypeFilter = new[] { "OrderPlaced", "OrderShipped", "OrderDelivered" } })) { await UpdateOrderProjectionAsync(@event); } ``` ### Key Features - **Offset-based Replay**: Replay from specific sequence numbers - **Time-based Replay**: Replay from specific timestamps - **Time Range Replay**: Replay events within time windows - **Event Type Filtering**: Replay only specific event types - **Rate Limiting**: Control replay speed with token bucket algorithm - **Progress Tracking**: Monitor replay with callbacks and metrics - **Batching**: Efficient streaming with configurable batch sizes - **Cancellation Support**: Full CancellationToken support ### Common Use Cases #### Rebuilding Projections ```csharp // Rebuild entire read model from scratch await foreach (var @event in replayService.ReplayAllAsync("orders")) { await projectionUpdater.ApplyAsync(@event); } ``` #### Reprocessing After Bug Fixes ```csharp // Reprocess last 24 hours after fixing handler bug var yesterday = DateTimeOffset.UtcNow.AddDays(-1); await foreach (var @event in replayService.ReplayFromTimeAsync("orders", yesterday)) { await fixedHandler.HandleAsync(@event); } ``` #### Creating New Projections ```csharp // Build new analytics projection from historical data await foreach (var @event in replayService.ReplayAllAsync( "user-activity", options: new ReplayOptions { EventTypeFilter = new[] { "PageView", "Click", "Purchase" }, MaxEventsPerSecond = 10000 // Fast replay for batch processing })) { await analyticsProjection.ApplyAsync(@event); } ``` #### Time-Travel Debugging ```csharp // Replay specific time period to debug issue var bugStart = new DateTimeOffset(2025, 12, 10, 14, 30, 0, TimeSpan.Zero); var bugEnd = new DateTimeOffset(2025, 12, 10, 15, 00, 0, TimeSpan.Zero); await foreach (var @event in replayService.ReplayTimeRangeAsync( "orders", bugStart, bugEnd)) { await debugHandler.InspectAsync(@event); } ``` ### Testing Resources Comprehensive testing guide available: [POSTGRESQL-TESTING.md](POSTGRESQL-TESTING.md) Topics covered: - Docker PostgreSQL setup - Persistent stream operations - Ephemeral queue operations - Visibility timeout testing - Dead letter queue verification - Performance testing - Database schema inspection ## Stream Configuration The Stream Configuration feature provides per-stream configuration capabilities for fine-grained control over retention policies, dead letter queues, lifecycle management, performance tuning, and access control. Each stream can have its own settings that override global defaults. ### Installation Stream configuration is included in the PostgreSQL event streaming package: ```bash dotnet add package Svrnty.CQRS.Events.PostgreSQL ``` ### Setup Register the stream configuration services: ```csharp builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;..."); // Add stream configuration support builder.Services.AddPostgresStreamConfiguration(); ``` ### Usage #### Basic Stream Configuration Configure retention policies for a specific stream: ```csharp var configStore = serviceProvider.GetRequiredService(); var config = new StreamConfiguration { StreamName = "orders", Description = "Order processing stream", Tags = new Dictionary { ["domain"] = "orders", ["environment"] = "production" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90), MaxSizeBytes = 10L * 1024 * 1024 * 1024, // 10 GB EnablePartitioning = true, PartitionInterval = TimeSpan.FromDays(7) }, CreatedAt = DateTimeOffset.UtcNow, CreatedBy = "admin" }; await configStore.SetConfigurationAsync(config); ``` #### Dead Letter Queue Configuration Configure error handling with dead letter queues: ```csharp var config = new StreamConfiguration { StreamName = "payment-processing", DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = true, DeadLetterStreamName = "payment-processing-dlq", MaxDeliveryAttempts = 5, RetryDelay = TimeSpan.FromMinutes(5), StoreOriginalEvent = true, StoreErrorDetails = true }, CreatedAt = DateTimeOffset.UtcNow }; await configStore.SetConfigurationAsync(config); ``` #### Lifecycle Management Configure automatic archival and deletion: ```csharp var config = new StreamConfiguration { StreamName = "audit-logs", Lifecycle = new LifecycleConfiguration { AutoCreate = true, AutoArchive = true, ArchiveAfter = TimeSpan.FromDays(365), ArchiveLocation = "s3://archive-bucket/audit-logs", AutoDelete = false }, CreatedAt = DateTimeOffset.UtcNow }; await configStore.SetConfigurationAsync(config); ``` #### Performance Tuning Configure performance-related settings: ```csharp var config = new StreamConfiguration { StreamName = "high-throughput-events", Performance = new PerformanceConfiguration { BatchSize = 1000, EnableCompression = true, CompressionAlgorithm = "gzip", EnableIndexing = true, IndexedFields = new List { "userId", "tenantId", "eventType" }, CacheSize = 10000 }, CreatedAt = DateTimeOffset.UtcNow }; await configStore.SetConfigurationAsync(config); ``` #### Access Control Configure stream permissions and quotas: ```csharp var config = new StreamConfiguration { StreamName = "sensitive-data", AccessControl = new AccessControlConfiguration { PublicRead = false, PublicWrite = false, AllowedReaders = new List { "admin", "audit-service" }, AllowedWriters = new List { "admin", "data-ingestion-service" }, MaxConsumerGroups = 5, MaxEventsPerSecond = 10000 }, CreatedAt = DateTimeOffset.UtcNow }; await configStore.SetConfigurationAsync(config); ``` #### Getting Effective Configuration Retrieve the effective configuration (stream-specific merged with defaults): ```csharp var configProvider = serviceProvider.GetRequiredService(); // Gets merged configuration (stream-specific + global defaults) var effectiveConfig = await configProvider.GetEffectiveConfigurationAsync("orders"); // Get specific configuration sections var retention = await configProvider.GetRetentionConfigurationAsync("orders"); var dlq = await configProvider.GetDeadLetterQueueConfigurationAsync("orders"); var lifecycle = await configProvider.GetLifecycleConfigurationAsync("orders"); ``` #### Finding Configurations Query configurations by criteria: ```csharp // Find all streams with archiving enabled var archivingStreams = await configStore.FindConfigurationsAsync( c => c.Lifecycle?.AutoArchive == true); // Find all production streams var productionStreams = await configStore.FindConfigurationsAsync( c => c.Tags?.ContainsKey("environment") == true && c.Tags["environment"] == "production"); // Get all configurations var allConfigs = await configStore.GetAllConfigurationsAsync(); ``` #### Deleting Configuration Remove stream-specific configuration (reverts to defaults): ```csharp await configStore.DeleteConfigurationAsync("orders"); ``` ### Configuration Options #### RetentionConfiguration - **MaxAge**: Maximum age before cleanup (e.g., `TimeSpan.FromDays(90)`) - **MaxSizeBytes**: Maximum storage size before cleanup - **MaxEventCount**: Maximum number of events before cleanup - **EnablePartitioning**: Enable table partitioning for better performance - **PartitionInterval**: Partition interval (e.g., daily, weekly) #### DeadLetterQueueConfiguration - **Enabled**: Enable DLQ for this stream - **DeadLetterStreamName**: Name of DLQ stream (defaults to `{StreamName}-dlq`) - **MaxDeliveryAttempts**: Attempts before sending to DLQ (default: 3) - **RetryDelay**: Delay between retry attempts - **StoreOriginalEvent**: Store original event in DLQ - **StoreErrorDetails**: Store error details in DLQ #### LifecycleConfiguration - **AutoCreate**: Automatically create stream if it doesn't exist - **AutoArchive**: Automatically archive old events - **ArchiveAfter**: Age after which events are archived - **ArchiveLocation**: Storage location for archived events - **AutoDelete**: Automatically delete old events - **DeleteAfter**: Age after which events are deleted #### PerformanceConfiguration - **BatchSize**: Batch size for bulk operations - **EnableCompression**: Enable event compression - **CompressionAlgorithm**: Compression algorithm (e.g., "gzip", "zstd") - **EnableIndexing**: Enable metadata field indexing - **IndexedFields**: List of fields to index - **CacheSize**: Cache size for frequently accessed events #### AccessControlConfiguration - **PublicRead**: Allow public read access - **PublicWrite**: Allow public write access - **AllowedReaders**: List of authorized readers - **AllowedWriters**: List of authorized writers - **MaxConsumerGroups**: Maximum consumer groups allowed - **MaxEventsPerSecond**: Rate limit for events per second ### Key Features - **Per-Stream Configuration**: Override global settings per stream - **Retention Policies**: Configure retention per stream - **Dead Letter Queues**: Error handling with configurable retry logic - **Lifecycle Management**: Automatic archival and deletion - **Performance Tuning**: Optimize batch sizes, compression, and indexing - **Access Control**: Stream-level permissions and quotas - **Configuration Merging**: Stream-specific settings override global defaults - **Tag-Based Filtering**: Categorize and query streams by tags ### Common Use Cases #### Multi-Tenant Configuration ```csharp // High-value tenant with extended retention var premiumConfig = new StreamConfiguration { StreamName = "tenant-acme-corp", Tags = new Dictionary { ["tier"] = "premium" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(365) // 1 year retention } }; // Standard tenant with shorter retention var standardConfig = new StreamConfiguration { StreamName = "tenant-small-co", Tags = new Dictionary { ["tier"] = "standard" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(30) // 30 days retention } }; ``` #### Environment-Specific Settings ```csharp // Production: strict retention and DLQ var prodConfig = new StreamConfiguration { StreamName = "orders-prod", Tags = new Dictionary { ["environment"] = "production" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90) }, DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = true, MaxDeliveryAttempts = 5 } }; // Development: relaxed settings var devConfig = new StreamConfiguration { StreamName = "orders-dev", Tags = new Dictionary { ["environment"] = "development" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(7) }, DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = false } }; ``` #### Domain-Specific Configuration ```csharp // Audit logs: long retention, auto-archive var auditConfig = new StreamConfiguration { StreamName = "audit-logs", Tags = new Dictionary { ["domain"] = "security" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(2555) }, // 7 years Lifecycle = new LifecycleConfiguration { AutoArchive = true, ArchiveAfter = TimeSpan.FromDays(365), ArchiveLocation = "s3://archive/audit" } }; // Analytics: high-throughput, short retention var analyticsConfig = new StreamConfiguration { StreamName = "page-views", Tags = new Dictionary { ["domain"] = "analytics" }, Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(30) }, Performance = new PerformanceConfiguration { BatchSize = 10000, EnableCompression = true } }; ``` # 2024-2025 Roadmap | Task | Description | Status | |----------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|--------| | Support .NET 8 | Ensure compatibility with .NET 8. | ✅ | | Support .NET 10 | Upgrade to .NET 10 with C# 14 language support. | ✅ | | Update FluentValidation | Upgrade FluentValidation to version 11.x for .NET 10 compatibility. | ✅ | | Add gRPC Support with source generators | Implement gRPC endpoints with source generators and Google Rich Error Model for validation. | ✅ | | Create a demo project (Svrnty.CQRS.Grpc.Sample) | Develop a comprehensive demo project showcasing gRPC and HTTP endpoints. | ✅ | | Event Streaming - Phase 1: gRPC Bidirectional Streaming | Implement gRPC bidirectional streaming for real-time event delivery. | ✅ | | Event Streaming - Phase 2.1: Storage Abstractions | Define IEventStreamStore interface for persistent and ephemeral streams. | ✅ | | Event Streaming - Phase 2.2: PostgreSQL Storage | Implement PostgreSQL-backed storage with persistent streams, message queues, and DLQ. | ✅ | | Event Streaming - Phase 2.3: Consumer Offset Tracking | Implement consumer group coordination and offset management for persistent streams. | ✅ | | Event Streaming - Phase 2.4: Retention Policies | Add time-based and size-based retention with automatic cleanup and table partitioning. | ✅ | | Event Streaming - Phase 2.5: Event Replay API | Add APIs for replaying events from specific offsets and time ranges. | ✅ | | Event Streaming - Phase 2.6: Stream Configuration | Per-stream configuration for retention, DLQ, and lifecycle management. | ✅ | | Create a website for the Framework | Develop a website to host comprehensive documentation for the framework. | ⬜️ | # 2026 Roadmap | Task | Description | Status | |----------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|--------| | gRPC Compression Support | Smart message compression with automatic threshold detection and per-handler control. | ⬜️ | | gRPC Metadata & Authorization Support | Expose ServerCallContext to handlers and integrate authorization services for gRPC endpoints. | ⬜️ |