> This project was originally initiated by [Powered Software Inc.](https://poweredsoft.com/) and was forked from the [PoweredSoft.CQRS](https://github.com/PoweredSoft/CQRS) repository.

# CQRS

Our implementation of Command Query Responsibility Segregation (CQRS).

## Getting Started

> Install the NuGet packages you need in your awesome project.

| Package Name | NuGet | NuGet Install |
|--------------|-------|--------------:|
| Svrnty.CQRS | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS/) | `dotnet add package Svrnty.CQRS` |
| Svrnty.CQRS.MinimalApi | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.MinimalApi/) | `dotnet add package Svrnty.CQRS.MinimalApi` |
| Svrnty.CQRS.FluentValidation | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.FluentValidation/) | `dotnet add package Svrnty.CQRS.FluentValidation` |
| Svrnty.CQRS.DynamicQuery | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.DynamicQuery/) | `dotnet add package Svrnty.CQRS.DynamicQuery` |
| Svrnty.CQRS.DynamicQuery.MinimalApi | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.DynamicQuery.MinimalApi/) | `dotnet add package Svrnty.CQRS.DynamicQuery.MinimalApi` |
| Svrnty.CQRS.Grpc | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Grpc/) | `dotnet add package Svrnty.CQRS.Grpc` |
| Svrnty.CQRS.Grpc.Generators | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Grpc.Generators/) | `dotnet add package Svrnty.CQRS.Grpc.Generators` |
| Svrnty.CQRS.Events | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Events/) | `dotnet add package Svrnty.CQRS.Events` |
| Svrnty.CQRS.Events.Grpc | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Events.Grpc/) | `dotnet add package Svrnty.CQRS.Events.Grpc` |
| Svrnty.CQRS.Events.PostgreSQL | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Events.PostgreSQL/) | `dotnet add package Svrnty.CQRS.Events.PostgreSQL` |
| Svrnty.CQRS.Events.ConsumerGroups | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Events.ConsumerGroups/) | `dotnet add package Svrnty.CQRS.Events.ConsumerGroups` |

> Abstractions packages.

| Package Name | NuGet | NuGet Install |
|--------------|-------|--------------:|
| Svrnty.CQRS.Abstractions | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Abstractions/) | `dotnet add package Svrnty.CQRS.Abstractions` |
| Svrnty.CQRS.DynamicQuery.Abstractions | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.DynamicQuery.Abstractions/) | `dotnet add package Svrnty.CQRS.DynamicQuery.Abstractions` |
| Svrnty.CQRS.Grpc.Abstractions | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Grpc.Abstractions/) | `dotnet add package Svrnty.CQRS.Grpc.Abstractions` |
| Svrnty.CQRS.Events.Abstractions | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Events.Abstractions/) | `dotnet add package Svrnty.CQRS.Events.Abstractions` |
| Svrnty.CQRS.Events.ConsumerGroups.Abstractions | [nuget.org](https://www.nuget.org/packages/Svrnty.CQRS.Events.ConsumerGroups.Abstractions/) | `dotnet add package Svrnty.CQRS.Events.ConsumerGroups.Abstractions` |

## Sample of startup code for gRPC (Recommended)

```csharp
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.Grpc;

var builder = WebApplication.CreateBuilder(args);

// Register your commands with validators
builder.Services.AddCommand<AddUserCommand, int, AddUserCommandHandler, AddUserCommandValidator>();
builder.Services.AddCommand<RemoveUserCommand, RemoveUserCommandHandler>();

// Register your queries
builder.Services.AddQuery<FetchUserQuery, User, FetchUserQueryHandler>();

// Configure CQRS with gRPC support
builder.Services.AddSvrntyCqrs(cqrs =>
{
    // Enable gRPC endpoints with reflection
    cqrs.AddGrpc(grpc =>
    {
        grpc.EnableReflection();
    });
});

var app = builder.Build();

// Map all configured CQRS endpoints
app.UseSvrntyCqrs();

app.Run();
```

### Important: gRPC Requirements

The gRPC implementation uses **Grpc.Tools** with `.proto` files and **source generators** for automatic service implementation:

#### 1. Install required packages:

```bash
dotnet add package Grpc.AspNetCore
dotnet add package Grpc.AspNetCore.Server.Reflection
dotnet add package Grpc.StatusProto  # For Rich Error Model validation
```

#### 2. Add the source generator as an analyzer:

```bash
dotnet add package Svrnty.CQRS.Grpc.Generators
```

The source generator is automatically configured as an analyzer when installed via NuGet and will generate both the `.proto` files and gRPC service implementations at compile time.

#### 3. Define your C# commands and queries:

```csharp
public record AddUserCommand
{
    public required string Name { get; init; }
    public required string Email { get; init; }
    public int Age { get; init; }
}

public record RemoveUserCommand
{
    public int UserId { get; init; }
}
```

**Notes:**
- The source generator automatically creates:
  - `.proto` files in the `Protos/` directory from your C# commands and queries
  - `CommandServiceImpl` and `QueryServiceImpl` implementations
- FluentValidation is automatically integrated with **Google Rich Error Model** for structured validation errors
- Validation errors return `google.rpc.Status` with `BadRequest` containing `FieldViolations`
- Use `record` types for commands/queries (immutable, value-based equality, more concise)
- No need for protobuf-net attributes - just define your C# types

## Sample of startup code for Minimal API (HTTP)

For HTTP scenarios (web browsers, public APIs), you can use the Minimal API approach:

```csharp
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.MinimalApi;

var builder = WebApplication.CreateBuilder(args);

// Register your commands with validators
builder.Services.AddCommand<CreatePersonCommand, CreatePersonCommandHandler, CreatePersonCommandValidator>();
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler, EchoCommandValidator>();

// Register your queries
builder.Services.AddQuery<PersonQuery, IQueryable<Person>, PersonQueryHandler>();

// Configure CQRS with Minimal API support
builder.Services.AddSvrntyCqrs(cqrs =>
{
    // Enable Minimal API endpoints
    cqrs.AddMinimalApi();
});

// Add Swagger (optional)
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Map all configured CQRS endpoints (automatically creates POST /api/command/* and POST/GET /api/query/*)
app.UseSvrntyCqrs();

app.Run();
```

**Notes:**
- FluentValidation is automatically integrated with **RFC 7807 Problem Details** for structured validation errors
- Use `record` types for commands/queries (immutable, value-based equality, more concise)
- Supports both POST and GET (for queries) endpoints
- Automatically generates Swagger/OpenAPI documentation

## Sample enabling both gRPC and HTTP

You can enable both gRPC and traditional HTTP endpoints simultaneously, allowing clients to choose their preferred protocol:

```csharp
using Svrnty.CQRS;
using Svrnty.CQRS.FluentValidation;
using Svrnty.CQRS.Grpc;
using Svrnty.CQRS.MinimalApi;

var builder = WebApplication.CreateBuilder(args);

// Register your commands with validators
builder.Services.AddCommand<AddUserCommand, int, AddUserCommandHandler, AddUserCommandValidator>();
builder.Services.AddCommand<RemoveUserCommand, RemoveUserCommandHandler>();

// Register your queries
builder.Services.AddQuery<FetchUserQuery, User, FetchUserQueryHandler>();

// Configure CQRS with both gRPC and Minimal API support
builder.Services.AddSvrntyCqrs(cqrs =>
{
    // Enable gRPC endpoints with reflection
    cqrs.AddGrpc(grpc =>
    {
        grpc.EnableReflection();
    });

    // Enable Minimal API endpoints
    cqrs.AddMinimalApi();
});

// Add HTTP support with Swagger
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Map all configured CQRS endpoints (both gRPC and HTTP)
app.UseSvrntyCqrs();

app.Run();
```

**Benefits:**
- Single codebase supports multiple protocols
- gRPC for high-performance, low-latency scenarios (microservices, internal APIs)
- HTTP for web browsers, legacy clients, and public APIs
- Same commands, queries, and validation logic for both protocols
- Swagger UI available for HTTP endpoints, gRPC reflection for gRPC clients

# Fluent Validation

FluentValidation is optional but recommended for command and query validation. The `Svrnty.CQRS.FluentValidation` package provides extension methods to simplify validator registration.

## With Svrnty.CQRS.FluentValidation (Recommended)

The package exposes extension method overloads that accept the validator as a generic parameter:

```bash
dotnet add package Svrnty.CQRS.FluentValidation
```

```csharp
using Svrnty.CQRS.FluentValidation; // Extension methods for validator registration

// Command with result - validator as last generic parameter
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler, EchoCommandValidator>();

// Command without result - validator included in generics
builder.Services.AddCommand<CreatePersonCommand, CreatePersonCommandHandler, CreatePersonCommandValidator>();
```

**Benefits:**
- **Single line registration** - Handler and validator registered together
- **Type safety** - Compiler ensures validator matches command type
- **Less boilerplate** - No need for separate `AddTransient<IValidator<T>>()` calls
- **Cleaner code** - Clear intent that validation is part of command pipeline

## Without Svrnty.CQRS.FluentValidation

If you prefer not to use the `Svrnty.CQRS.FluentValidation` package, register the command handler and its validator separately:

```csharp
using FluentValidation;
using Svrnty.CQRS;

// Register command handler
builder.Services.AddCommand<EchoCommand, string, EchoCommandHandler>();

// Manually register validator
builder.Services.AddTransient<IValidator<EchoCommand>, EchoCommandValidator>();
```

## Event Streaming

Svrnty.CQRS includes comprehensive event streaming support for building event-driven architectures with both **persistent** (event sourcing) and **ephemeral** (message queue) streams.

### Quick Start

```bash
# Install core event streaming packages
dotnet add package Svrnty.CQRS.Events
dotnet add package Svrnty.CQRS.Events.Grpc  # For gRPC bidirectional streaming

# Install storage backend
dotnet add package Svrnty.CQRS.Events.PostgreSQL  # PostgreSQL storage (recommended for production)
```

### Basic Setup

```csharp
using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.PostgreSQL;

var builder = WebApplication.CreateBuilder(args);

// Add event streaming support
builder.Services.AddSvrntyEvents();
builder.Services.AddDefaultEventDiscovery();

// Configure storage backend (PostgreSQL)
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetSection("EventStreaming:PostgreSQL"));

// Enable gRPC event streaming (optional)
builder.Services.AddSvrntyEventsGrpc();

var app = builder.Build();

// Map gRPC event streaming endpoints
app.MapGrpcService<EventStreamServiceImpl>();

app.Run();
```

### PostgreSQL Configuration

Add to `appsettings.json`:

```json
{
  "EventStreaming": {
    "PostgreSQL": {
      "ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;Username=postgres;Password=postgres",
      "SchemaName": "event_streaming",
      "AutoMigrate": true,
      "MaxPoolSize": 100,
      "MinPoolSize": 5
    }
  }
}
```

**Key Features:**
- **Auto-Migration**: Database schema created automatically on startup (when `AutoMigrate: true`)
- **Connection Pooling**: Configurable Npgsql connection pool (5-100 connections)
- **Schema Isolation**: All tables in dedicated schema (default: `event_streaming`)

### Storage Options

#### 1. PostgreSQL (Production-Ready) ✅

```csharp
builder.Services.AddPostgresEventStreaming(options => {
    options.ConnectionString = "Host=localhost;...";
    options.AutoMigrate = true;  // Create schema automatically
});
```

**Features:**
- Persistent and ephemeral stream support
- `SKIP LOCKED` for concurrent queue operations
- Visibility timeout for message processing
- Dead letter queue for failed messages
- Consumer offset tracking (Phase 2.3)
- Retention policies support (Phase 2.4)

#### 2. In-Memory (Development/Testing)

```csharp
builder.Services.AddInMemoryEventStorage();
```

**Features:**
- Fast in-memory storage
- Suitable for development and testing
- No external dependencies
- Data lost on restart

### Stream Types

#### Persistent Streams (Event Sourcing)

Append-only event logs for event sourcing patterns:

```csharp
// Append events to persistent stream
await eventStore.AppendAsync("user-123", new UserCreatedEvent
{
    UserId = 123,
    Name = "Alice",
    Email = "alice@example.com"
});

// Read stream from offset
var events = await eventStore.ReadStreamAsync("user-123", fromOffset: 0);

// Get stream length
var length = await eventStore.GetStreamLengthAsync("user-123");
```

#### Ephemeral Streams (Message Queue)

Message queue semantics with at-least-once delivery:

```csharp
// Enqueue messages
await eventStore.EnqueueAsync("notifications", new EmailNotificationEvent
{
    To = "user@example.com",
    Subject = "Welcome!",
    Body = "Thanks for signing up"
});

// Dequeue with visibility timeout
var @event = await eventStore.DequeueAsync(
    streamName: "notifications",
    consumerId: "worker-1",
    visibilityTimeout: TimeSpan.FromSeconds(30)
);

if (@event != null)
{
    try
    {
        // Process and acknowledge
        await ProcessEventAsync(@event);
        await eventStore.AcknowledgeAsync("notifications", @event.EventId, "worker-1");
    }
    catch (Exception)
    {
        // Or negative acknowledge (requeue or move to DLQ)
        await eventStore.NackAsync(
            streamName: "notifications",
            eventId: @event.EventId,
            consumerId: "worker-1",
            requeue: false  // Move to dead letter queue
        );
    }
}
```

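To make the visibility-timeout idea concrete, here is a minimal in-memory sketch. It is not the library's storage code: `VisibilityQueueSketch` and its members are hypothetical names, and the clock is passed in explicitly so the behaviour is easy to follow. A dequeued message stays in the queue but is hidden until the timeout elapses; acknowledging removes it, and an unacknowledged message becomes visible again, which is what gives at-least-once delivery.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical illustration of visibility-timeout semantics (not the Svrnty implementation).
public sealed class VisibilityQueueSketch
{
    private sealed class Entry
    {
        public Entry(string id) { Id = id; }
        public string Id { get; }
        // The message cannot be dequeued again before this instant.
        public DateTimeOffset InvisibleUntil { get; set; } = DateTimeOffset.MinValue;
    }

    private readonly List<Entry> _entries = new();

    public void Enqueue(string id) => _entries.Add(new Entry(id));

    // Returns the oldest visible message id and hides it for the timeout, or null if none is visible.
    public string? Dequeue(DateTimeOffset now, TimeSpan visibilityTimeout)
    {
        foreach (var entry in _entries)
        {
            if (entry.InvisibleUntil <= now)
            {
                entry.InvisibleUntil = now + visibilityTimeout;
                return entry.Id;
            }
        }
        return null;
    }

    // Acknowledging removes the message for good; returns false if it was not found.
    public bool Acknowledge(string id) => _entries.RemoveAll(e => e.Id == id) > 0;
}
```

If a worker crashes between `Dequeue` and `Acknowledge`, the message simply reappears after the timeout, so handlers should tolerate seeing the same message twice.
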
### gRPC Bidirectional Streaming

The framework provides gRPC bidirectional streaming for real-time event delivery:

```csharp
// Client-side subscription (in your client application)
var call = client.SubscribeToStream();

// Send subscription request
await call.RequestStream.WriteAsync(new SubscribeRequest
{
    StreamName = "user-events",
    ConsumerId = "client-123",
    SubscriptionMode = SubscriptionMode.Broadcast
});

// Receive events in real-time
await foreach (var @event in call.ResponseStream.ReadAllAsync())
{
    Console.WriteLine($"Received event: {@event.EventType}");
}
```

### Consumer Groups

Consumer groups let multiple consumers coordinate processing of a persistent stream so that work is shared rather than duplicated. This provides load balancing, fault tolerance, and at-least-once delivery guarantees.

#### Installation

```bash
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
```

#### Setup

```csharp
using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// Add consumer group support with PostgreSQL backend
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

var app = builder.Build();
app.Run();
```

Configuration in `appsettings.json`:

```json
{
  "EventStreaming": {
    "ConsumerGroups": {
      "ConnectionString": "Host=localhost;Port=5432;Database=myapp_events;...",
      "SchemaName": "event_streaming",
      "AutoMigrate": true
    }
  }
}
```

#### Basic Usage

```csharp
// Inject the consumer group reader
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();

// Consume events with automatic offset management
await foreach (var @event in reader.ConsumeAsync(
    streamName: "orders",
    groupId: "order-processors",
    consumerId: "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch,
        HeartbeatInterval = TimeSpan.FromSeconds(10),
        SessionTimeout = TimeSpan.FromSeconds(30)
    },
    cancellationToken))
{
    await ProcessOrderEventAsync(@event);
    // Offset automatically committed after batch
}
```

#### Offset Commit Strategies

**1. AfterEach** - Commit after each event (safest, highest overhead):

```csharp
options.CommitStrategy = OffsetCommitStrategy.AfterEach;
```

**2. AfterBatch** - Commit after each batch (balanced):

```csharp
options.CommitStrategy = OffsetCommitStrategy.AfterBatch;
options.BatchSize = 100;
```

**3. Periodic** - Commit at intervals (highest throughput):

```csharp
options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.PeriodicCommitInterval = TimeSpan.FromSeconds(5);
```

**4. Manual** - Full control over commits:

```csharp
options.CommitStrategy = OffsetCommitStrategy.Manual;

await foreach (var @event in reader.ConsumeAsync(...))
{
    try
    {
        await ProcessEventAsync(@event);

        // Manually commit after successful processing.
        // `currentOffset` is a placeholder for the offset of the event just processed.
        await reader.CommitOffsetAsync(
            streamName: "orders",
            groupId: "order-processors",
            consumerId: "worker-1",
            offset: currentOffset,
            cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Processing failed, will retry");
        // Don't commit - event will be reprocessed
    }
}
```

#### Monitoring Consumer Groups

```csharp
var offsetStore = serviceProvider.GetRequiredService<IConsumerOffsetStore>();

// Get all active consumers in a group
var consumers = await offsetStore.GetActiveConsumersAsync("order-processors");
foreach (var consumer in consumers)
{
    Console.WriteLine($"Consumer: {consumer.ConsumerId}");
    Console.WriteLine($"  Last Heartbeat: {consumer.LastHeartbeat}");
    Console.WriteLine($"  Registered: {consumer.RegisteredAt}");
}

// Get group offsets per consumer
var offsets = await offsetStore.GetGroupOffsetsAsync("order-processors", "orders");
foreach (var (consumerId, offset) in offsets)
{
    Console.WriteLine($"{consumerId}: offset {offset}");
}

// Get last committed offset for the group (minimum across all consumers)
var groupOffset = await offsetStore.GetCommittedOffsetAsync("order-processors", "orders");
Console.WriteLine($"Group safe offset: {groupOffset}");
```

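The comment in the sample describes the group's committed offset as the minimum across all consumers. A small sketch of that rule, assuming the per-consumer offsets are already in memory (`GroupOffsetSketch` and `SafeOffset` are hypothetical names, not part of the package):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper illustrating the "minimum across all consumers" rule.
public static class GroupOffsetSketch
{
    // Returns the lowest committed offset in the group, or null when no consumer has committed yet.
    public static long? SafeOffset(IReadOnlyDictionary<string, long> offsetsByConsumer)
    {
        if (offsetsByConsumer.Count == 0)
        {
            return null;
        }
        return offsetsByConsumer.Values.Min();
    }
}
```

Everything at or below that offset has been committed by every consumer in the group, which is why it is the value to trust before trimming or archiving a stream.
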
#### Key Features

- **Automatic Offset Management**: Tracks last processed position per consumer
- **Heartbeat Monitoring**: Detects and removes stale consumers automatically
- **Flexible Commit Strategies**: Choose when to commit offsets (manual, per-event, per-batch, periodic)
- **Load Balancing**: Multiple consumers can process the same stream
- **Fault Tolerance**: Consumers can resume from last committed offset after failure
- **At-Least-Once Delivery**: Events processed at least once, even with consumer failures

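At-least-once delivery means a handler can see the same event again after a crash or a missed commit, so handlers are usually written to be idempotent. The sketch below shows one common approach, deduplicating by event id. It is an illustration only: `IdempotentProcessorSketch` is a hypothetical name, and a real handler would keep the seen ids in durable storage rather than in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of an idempotent handler for at-least-once delivery.
public sealed class IdempotentProcessorSketch
{
    private readonly HashSet<string> _processedEventIds = new();

    // Counts how many times the side effect actually ran.
    public int SideEffectCount { get; private set; }

    // Returns true if the event was processed now, false if it was a redelivery and was skipped.
    public bool Process(string eventId)
    {
        if (!_processedEventIds.Add(eventId))
        {
            return false; // Already handled: skip the side effect.
        }

        SideEffectCount++; // Stand-in for the real work (e.g. updating a projection).
        return true;
    }
}
```
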
#### Health Monitoring

The `ConsumerHealthMonitor` background service automatically:
- Sends periodic heartbeats for registered consumers
- Detects stale consumers (no heartbeat within session timeout)
- Cleans up dead consumers from the registry
- Logs consumer group health metrics

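The stale-consumer rule above (no heartbeat within the session timeout) can be sketched as a pure function. This is only an illustration of the rule, not the `ConsumerHealthMonitor` code; `StaleConsumerSketch` and `FindStale` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of stale-consumer detection.
public static class StaleConsumerSketch
{
    // Returns the ids of consumers whose last heartbeat is older than the session timeout.
    public static List<string> FindStale(
        IReadOnlyDictionary<string, DateTimeOffset> lastHeartbeatByConsumer,
        DateTimeOffset now,
        TimeSpan sessionTimeout)
    {
        return lastHeartbeatByConsumer
            .Where(pair => now - pair.Value > sessionTimeout)
            .Select(pair => pair.Key)
            .OrderBy(id => id, StringComparer.Ordinal)
            .ToList();
    }
}
```
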
Configure health monitoring:

```csharp
builder.Services.AddPostgresConsumerGroups(
    storageConfig => { /* ... */ },
    healthConfig => {
        healthConfig.CleanupInterval = TimeSpan.FromSeconds(30);
        healthConfig.SessionTimeout = TimeSpan.FromSeconds(60);
        healthConfig.Enabled = true;
    });
```

## Retention Policies

Automatic retention policy enforcement cleans up old events according to configurable rules. This helps manage database size and comply with data retention requirements.

### Installation

```bash
dotnet add package Svrnty.CQRS.Events.PostgreSQL
```

The retention policy feature is included in the PostgreSQL event streaming package.

### Setup

Register retention policy services in your application:

```csharp
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

// Add retention policy background service
builder.Services.AddPostgresRetentionPolicies(options =>
{
    options.Enabled = true;
    options.CleanupInterval = TimeSpan.FromHours(1);
    options.CleanupWindowStart = TimeSpan.FromHours(2);  // 2 AM UTC
    options.CleanupWindowEnd = TimeSpan.FromHours(6);    // 6 AM UTC
    options.UseCleanupWindow = true;
});
```

Or use configuration:

```csharp
builder.Services.AddPostgresRetentionPolicies(
    builder.Configuration.GetSection("RetentionService"));
```

```json
{
  "RetentionService": {
    "Enabled": true,
    "CleanupInterval": "01:00:00",
    "CleanupWindowStart": "02:00:00",
    "CleanupWindowEnd": "06:00:00",
    "UseCleanupWindow": true
  }
}
```

### Usage

#### Setting Retention Policies

```csharp
var policyStore = serviceProvider.GetRequiredService<IRetentionPolicyStore>();

// Time-based retention: delete events older than 30 days
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "orders",
    MaxAge = TimeSpan.FromDays(30),
    Enabled = true
});

// Size-based retention: keep only last 10,000 events
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "analytics",
    MaxEventCount = 10000,
    Enabled = true
});

// Combined retention: use both time and size limits
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "logs",
    MaxAge = TimeSpan.FromDays(7),
    MaxEventCount = 50000,
    Enabled = true
});

// Default policy for all streams (use "*" as stream name)
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "*",
    MaxAge = TimeSpan.FromDays(90),
    Enabled = true
});
```

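How the time and size limits combine is not spelled out here. The sketch below shows one plausible reading, stated as an assumption: an event is eligible for deletion if it violates either limit (too old, or not among the newest `MaxEventCount` events). `RetentionSketch` and `CountExpired` are hypothetical names, and the real cleanup may behave differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of combined retention; assumes "either limit" semantics.
public static class RetentionSketch
{
    // timestampsOldestFirst: event timestamps in stream order (oldest first).
    // Returns how many events would be removed under the assumed rule.
    public static int CountExpired(
        IReadOnlyList<DateTimeOffset> timestampsOldestFirst,
        DateTimeOffset now,
        TimeSpan? maxAge,
        long? maxEventCount)
    {
        int total = timestampsOldestFirst.Count;
        int expired = 0;

        for (int i = 0; i < total; i++)
        {
            bool tooOld = maxAge.HasValue && now - timestampsOldestFirst[i] > maxAge.Value;
            // The event at index i is among the newest (total - i) events.
            bool overCount = maxEventCount.HasValue && (total - i) > maxEventCount.Value;

            if (tooOld || overCount)
            {
                expired++;
            }
        }

        return expired;
    }
}
```
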
#### Retrieving Policies

```csharp
// Get specific policy
var policy = await policyStore.GetPolicyAsync("orders");
if (policy != null)
{
    Console.WriteLine($"Stream: {policy.StreamName}");
    Console.WriteLine($"Max Age: {policy.MaxAge}");
    Console.WriteLine($"Max Event Count: {policy.MaxEventCount}");
    Console.WriteLine($"Enabled: {policy.Enabled}");
}

// Get all policies
var policies = await policyStore.GetAllPoliciesAsync();
foreach (var p in policies)
{
    Console.WriteLine($"{p.StreamName}: Age={p.MaxAge}, Count={p.MaxEventCount}");
}
```

#### Deleting Policies

```csharp
// Delete a specific policy
var deleted = await policyStore.DeletePolicyAsync("orders");

// Note: Cannot delete the default "*" policy
```

#### Manual Cleanup

While the background service runs automatically, you can also trigger cleanup manually:

```csharp
// Apply all enabled retention policies
var result = await policyStore.ApplyRetentionPoliciesAsync();

Console.WriteLine($"Streams Processed: {result.StreamsProcessed}");
Console.WriteLine($"Events Deleted: {result.EventsDeleted}");
Console.WriteLine($"Duration: {result.Duration}");

// Per-stream details
foreach (var (streamName, count) in result.EventsDeletedPerStream)
{
    Console.WriteLine($"  {streamName}: {count} events deleted");
}
```

### Key Features

- **Time-based Retention**: Delete events older than specified duration
- **Size-based Retention**: Keep only the most recent N events per stream
- **Wildcard Policies**: Apply default policies to all streams using "*"
- **Cleanup Windows**: Run cleanup only during specified time windows (e.g., off-peak hours)
- **Background Service**: Automatic periodic cleanup via `PeriodicTimer`
- **Statistics Tracking**: Detailed metrics about cleanup operations
- **Efficient Deletion**: PostgreSQL stored procedures for batch cleanup

### Cleanup Window

The cleanup window feature allows you to restrict when retention policies are enforced:

```csharp
options.UseCleanupWindow = true;
options.CleanupWindowStart = TimeSpan.FromHours(2);  // 2 AM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(6);    // 6 AM UTC
```

The window automatically handles midnight crossing:

```csharp
// Window that spans midnight (10 PM to 2 AM)
options.CleanupWindowStart = TimeSpan.FromHours(22);  // 10 PM UTC
options.CleanupWindowEnd = TimeSpan.FromHours(2);     // 2 AM UTC
```

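A window check that handles midnight crossing can be sketched as below. This is an illustration rather than the service's actual code: `CleanupWindowSketch` and `IsWithinWindow` are hypothetical names, and treating the end of the window as exclusive is an assumption.

```csharp
using System;

// Hypothetical illustration of a time-of-day window check that may span midnight.
public static class CleanupWindowSketch
{
    // timeOfDay, start and end are offsets from midnight UTC (e.g. TimeSpan.FromHours(22)).
    public static bool IsWithinWindow(TimeSpan timeOfDay, TimeSpan start, TimeSpan end)
    {
        if (start <= end)
        {
            // Same-day window, e.g. 02:00-06:00.
            return timeOfDay >= start && timeOfDay < end;
        }

        // Window spans midnight, e.g. 22:00-02:00: late evening OR early morning.
        return timeOfDay >= start || timeOfDay < end;
    }
}
```

With a 22:00-02:00 window, both 23:00 and 01:00 fall inside it, while 12:00 does not. A caller would typically pass `DateTimeOffset.UtcNow.TimeOfDay`.
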
Disable the window to run cleanup at any time:

```csharp
options.UseCleanupWindow = false;
```

### Monitoring

The background service logs cleanup operations:

```
[INF] Retention policy service started. Cleanup interval: 01:00:00, Window: 02:00:00-06:00:00 UTC
[INF] Starting retention policy enforcement cycle
[INF] Retention cleanup complete: 3 streams processed, 1,234 events deleted in 00:00:01.234
[DBG] Stream orders: 500 events deleted
[DBG] Stream analytics: 734 events deleted
[DBG] Stream logs: 0 events deleted
```

When outside the cleanup window:

```
[DBG] Outside cleanup window (02:00:00-06:00:00 UTC), skipping retention enforcement
```

## Event Replay API

The Event Replay API enables rebuilding projections, reprocessing events, and time-travel debugging by replaying historical events from persistent streams.

### Installation

```bash
dotnet add package Svrnty.CQRS.Events.PostgreSQL
```

The event replay feature is included in the PostgreSQL event streaming package.

### Setup

Register the event replay service:

```csharp
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

// Add event replay service
builder.Services.AddPostgresEventReplay();
```

### Usage

#### Replay from Offset

Replay events starting from a specific sequence number:

```csharp
var replayService = serviceProvider.GetRequiredService<IEventReplayService>();

await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        BatchSize = 100,
        MaxEventsPerSecond = 1000, // Rate limit to 1000 events/sec
        ProgressCallback = progress =>
        {
            Console.WriteLine($"Progress: {progress.EventsProcessed} events " +
                $"({progress.ProgressPercentage:F1}%) " +
                $"@ {progress.EventsPerSecond:F0} events/sec");
        }
    }))
{
    await ProcessEventAsync(@event);
}
```

#### Replay from Time

Replay events starting from a specific timestamp:

```csharp
var startTime = DateTimeOffset.UtcNow.AddDays(-7);

await foreach (var @event in replayService.ReplayFromTimeAsync(
    streamName: "orders",
    startTime: startTime,
    options: new ReplayOptions
    {
        MaxEvents = 10000,
        EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" }
    }))
{
    await RebuildProjectionAsync(@event);
}
```

#### Replay Time Range

Replay events within a specific time window:

```csharp
var startTime = DateTimeOffset.UtcNow.AddDays(-7);
var endTime = DateTimeOffset.UtcNow.AddDays(-6);

await foreach (var @event in replayService.ReplayTimeRangeAsync(
    streamName: "analytics",
    startTime: startTime,
    endTime: endTime,
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "PageView", "Click" },
        ProgressInterval = 500
    }))
{
    await ProcessAnalyticsEventAsync(@event);
}
```

#### Replay All Events

Replay the entire stream from the beginning:

```csharp
await foreach (var @event in replayService.ReplayAllAsync(
    streamName: "orders",
    options: new ReplayOptions
    {
        BatchSize = 1000,
        MaxEventsPerSecond = 5000
    }))
{
    await ProcessEventAsync(@event);
}
```

#### Get Replay Count

Get the total number of events that would be replayed:

```csharp
var count = await replayService.GetReplayCountAsync(
    streamName: "orders",
    startOffset: 1000,
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "OrderPlaced" }
    });

Console.WriteLine($"Will replay {count} OrderPlaced events");
```

### Replay Options

Control replay behavior with `ReplayOptions`:

```csharp
var options = new ReplayOptions
{
    // Batch size for reading from database (default: 100)
    BatchSize = 100,

    // Maximum events to replay (default: null = unlimited)
    MaxEvents = 10000,

    // Rate limiting in events/second (default: null = unlimited)
    MaxEventsPerSecond = 1000,

    // Filter by event types (default: null = all types)
    EventTypeFilter = new[] { "OrderPlaced", "OrderShipped" },

    // Include metadata in events (default: true)
    IncludeMetadata = true,

    // Progress callback (default: null)
    ProgressCallback = progress =>
    {
        Console.WriteLine($"{progress.EventsProcessed} events processed");
    },

    // How often to invoke progress callback (default: 1000)
    ProgressInterval = 1000
};
```

### Progress Tracking
|
|
|
|
Monitor replay progress with callbacks:
|
|
|
|
```csharp
|
|
await foreach (var @event in replayService.ReplayFromOffsetAsync(
|
|
streamName: "orders",
|
|
startOffset: 0,
|
|
options: new ReplayOptions
|
|
{
|
|
ProgressCallback = progress =>
|
|
{
|
|
Console.WriteLine($@"
|
|
Replay Progress:
|
|
Current Offset: {progress.CurrentOffset}
|
|
Events Processed: {progress.EventsProcessed:N0}
|
|
Estimated Total: {progress.EstimatedTotal:N0}
|
|
Progress: {progress.ProgressPercentage:F1}%
|
|
Rate: {progress.EventsPerSecond:F0} events/sec
|
|
Elapsed: {progress.Elapsed}
|
|
Current Event Time: {progress.CurrentTimestamp}");
|
|
},
|
|
ProgressInterval = 1000
|
|
}))
|
|
{
|
|
await ProcessEventAsync(@event);
|
|
}
|
|
```

### Rate Limiting

Control replay speed to avoid overwhelming consumers:

```csharp
// Replay at maximum 500 events per second
await foreach (var @event in replayService.ReplayFromOffsetAsync(
    streamName: "high-volume-stream",
    startOffset: 0,
    options: new ReplayOptions
    {
        MaxEventsPerSecond = 500
    }))
{
    await ProcessEventSlowlyAsync(@event);
}
```
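
For readers unfamiliar with token buckets, the following is a minimal sketch of the general technique: tokens accrue at a fixed rate up to a capacity, and each event consumes one. `TokenBucketSketch` is a hypothetical class written for illustration. It is not the library's internal rate limiter, and it is not thread-safe.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical usage: release at most ~500 events per second.
var bucket = new TokenBucketSketch(ratePerSecond: 500, capacity: 500);
for (var i = 0; i < 5; i++)
{
    await bucket.WaitAsync();
    Console.WriteLine($"event {i} released");
}

// Illustrative token bucket (single-threaded use only).
public sealed class TokenBucketSketch
{
    private readonly double _ratePerSecond;
    private readonly double _capacity;
    private double _tokens;
    private DateTimeOffset _lastRefill;

    public TokenBucketSketch(double ratePerSecond, double capacity, DateTimeOffset? start = null)
    {
        if (ratePerSecond <= 0) throw new ArgumentOutOfRangeException(nameof(ratePerSecond));
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));

        _ratePerSecond = ratePerSecond;
        _capacity = capacity;
        _tokens = capacity; // start full, so an initial burst up to capacity is allowed
        _lastRefill = start ?? DateTimeOffset.UtcNow;
    }

    // Refills based on elapsed time, then tries to take one token.
    // The clock is passed in so the logic can be tested deterministically.
    public bool TryTake(DateTimeOffset now)
    {
        var elapsedSeconds = (now - _lastRefill).TotalSeconds;
        if (elapsedSeconds > 0)
        {
            _tokens = Math.Min(_capacity, _tokens + elapsedSeconds * _ratePerSecond);
            _lastRefill = now;
        }

        if (_tokens < 1) return false;
        _tokens -= 1;
        return true;
    }

    // Waits until a token is available, polling at roughly one token interval.
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        while (!TryTake(DateTimeOffset.UtcNow))
        {
            await Task.Delay(TimeSpan.FromSeconds(1.0 / _ratePerSecond), cancellationToken);
        }
    }
}
```

A capacity equal to the rate permits a burst of about one second's worth of events before throttling starts; a smaller capacity smooths the output further.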

### Event Type Filtering

Replay only specific event types:

```csharp
// Only replay order-related events
await foreach (var @event in replayService.ReplayAllAsync(
    streamName: "orders",
    options: new ReplayOptions
    {
        EventTypeFilter = new[]
        {
            "OrderPlaced",
            "OrderShipped",
            "OrderDelivered"
        }
    }))
{
    await UpdateOrderProjectionAsync(@event);
}
```

### Key Features

- **Offset-based Replay**: Replay from specific sequence numbers
- **Time-based Replay**: Replay from specific timestamps
- **Time Range Replay**: Replay events within time windows
- **Event Type Filtering**: Replay only specific event types
- **Rate Limiting**: Control replay speed with a token bucket algorithm
- **Progress Tracking**: Monitor replay with callbacks and metrics
- **Batching**: Efficient streaming with configurable batch sizes
- **Cancellation Support**: Full `CancellationToken` support

### Common Use Cases

#### Rebuilding Projections

```csharp
// Rebuild entire read model from scratch
await foreach (var @event in replayService.ReplayAllAsync("orders"))
{
    await projectionUpdater.ApplyAsync(@event);
}
```

#### Reprocessing After Bug Fixes

```csharp
// Reprocess last 24 hours after fixing handler bug
var yesterday = DateTimeOffset.UtcNow.AddDays(-1);

await foreach (var @event in replayService.ReplayFromTimeAsync("orders", yesterday))
{
    await fixedHandler.HandleAsync(@event);
}
```

#### Creating New Projections

```csharp
// Build new analytics projection from historical data
await foreach (var @event in replayService.ReplayAllAsync(
    "user-activity",
    options: new ReplayOptions
    {
        EventTypeFilter = new[] { "PageView", "Click", "Purchase" },
        MaxEventsPerSecond = 10000 // Fast replay for batch processing
    }))
{
    await analyticsProjection.ApplyAsync(@event);
}
```

#### Time-Travel Debugging

```csharp
// Replay specific time period to debug issue
var bugStart = new DateTimeOffset(2025, 12, 10, 14, 30, 0, TimeSpan.Zero);
var bugEnd = new DateTimeOffset(2025, 12, 10, 15, 00, 0, TimeSpan.Zero);

await foreach (var @event in replayService.ReplayTimeRangeAsync(
    "orders",
    bugStart,
    bugEnd))
{
    await debugHandler.InspectAsync(@event);
}
```

### Testing Resources

A comprehensive testing guide is available in [POSTGRESQL-TESTING.md](POSTGRESQL-TESTING.md).

Topics covered:
- Docker PostgreSQL setup
- Persistent stream operations
- Ephemeral queue operations
- Visibility timeout testing
- Dead letter queue verification
- Performance testing
- Database schema inspection

## Stream Configuration

Stream Configuration gives each stream its own settings for retention policies, dead letter queues, lifecycle management, performance tuning, and access control. Stream-specific settings override the global defaults.

### Installation

Stream configuration is included in the PostgreSQL event streaming package:

```bash
dotnet add package Svrnty.CQRS.Events.PostgreSQL
```

### Setup

Register the stream configuration services:

```csharp
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=events;...");

// Add stream configuration support
builder.Services.AddPostgresStreamConfiguration();
```

### Usage

#### Basic Stream Configuration

Configure retention policies for a specific stream:

```csharp
var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();

var config = new StreamConfiguration
{
    StreamName = "orders",
    Description = "Order processing stream",
    Tags = new Dictionary<string, string>
    {
        ["domain"] = "orders",
        ["environment"] = "production"
    },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(90),
        MaxSizeBytes = 10L * 1024 * 1024 * 1024, // 10 GB
        EnablePartitioning = true,
        PartitionInterval = TimeSpan.FromDays(7)
    },
    CreatedAt = DateTimeOffset.UtcNow,
    CreatedBy = "admin"
};

await configStore.SetConfigurationAsync(config);
```

#### Dead Letter Queue Configuration

Configure error handling with dead letter queues:

```csharp
var config = new StreamConfiguration
{
    StreamName = "payment-processing",
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        DeadLetterStreamName = "payment-processing-dlq",
        MaxDeliveryAttempts = 5,
        RetryDelay = TimeSpan.FromMinutes(5),
        StoreOriginalEvent = true,
        StoreErrorDetails = true
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);
```

#### Lifecycle Management

Configure automatic archival and deletion:

```csharp
var config = new StreamConfiguration
{
    StreamName = "audit-logs",
    Lifecycle = new LifecycleConfiguration
    {
        AutoCreate = true,
        AutoArchive = true,
        ArchiveAfter = TimeSpan.FromDays(365),
        ArchiveLocation = "s3://archive-bucket/audit-logs",
        AutoDelete = false
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);
```

#### Performance Tuning

Configure performance-related settings:

```csharp
var config = new StreamConfiguration
{
    StreamName = "high-throughput-events",
    Performance = new PerformanceConfiguration
    {
        BatchSize = 1000,
        EnableCompression = true,
        CompressionAlgorithm = "gzip",
        EnableIndexing = true,
        IndexedFields = new List<string> { "userId", "tenantId", "eventType" },
        CacheSize = 10000
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);
```

#### Access Control

Configure stream permissions and quotas:

```csharp
var config = new StreamConfiguration
{
    StreamName = "sensitive-data",
    AccessControl = new AccessControlConfiguration
    {
        PublicRead = false,
        PublicWrite = false,
        AllowedReaders = new List<string> { "admin", "audit-service" },
        AllowedWriters = new List<string> { "admin", "data-ingestion-service" },
        MaxConsumerGroups = 5,
        MaxEventsPerSecond = 10000
    },
    CreatedAt = DateTimeOffset.UtcNow
};

await configStore.SetConfigurationAsync(config);
```

#### Getting Effective Configuration

Retrieve the effective configuration (stream-specific merged with defaults):

```csharp
var configProvider = serviceProvider.GetRequiredService<IStreamConfigurationProvider>();

// Gets merged configuration (stream-specific + global defaults)
var effectiveConfig = await configProvider.GetEffectiveConfigurationAsync("orders");

// Get specific configuration sections
var retention = await configProvider.GetRetentionConfigurationAsync("orders");
var dlq = await configProvider.GetDeadLetterQueueConfigurationAsync("orders");
var lifecycle = await configProvider.GetLifecycleConfigurationAsync("orders");
```
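
One plausible way to picture the merge is a property-by-property fallback: a value set on the stream wins, and anything left unset falls back to the global default. The sketch below illustrates that idea with a hypothetical `RetentionSketch` record. How the library actually merges settings is not specified here, so treat the per-property rule as an assumption.

```csharp
#nullable enable
using System;

// Assumed global defaults and a stream-specific override that only sets MaxAge.
var defaults = new RetentionSketch(MaxAge: TimeSpan.FromDays(30), MaxEventCount: 1_000_000);
var ordersOverride = new RetentionSketch(MaxAge: TimeSpan.FromDays(90), MaxEventCount: null);

var effective = RetentionSketch.Merge(ordersOverride, defaults);
Console.WriteLine($"MaxAge={effective.MaxAge}, MaxEventCount={effective.MaxEventCount}");

// Hypothetical stand-in for a configuration section; null means "not set".
public sealed record RetentionSketch(TimeSpan? MaxAge, long? MaxEventCount)
{
    // Stream-specific values take precedence; unset values fall back to defaults.
    public static RetentionSketch Merge(RetentionSketch? streamSpecific, RetentionSketch defaults) =>
        streamSpecific is null
            ? defaults
            : new RetentionSketch(
                MaxAge: streamSpecific.MaxAge ?? defaults.MaxAge,
                MaxEventCount: streamSpecific.MaxEventCount ?? defaults.MaxEventCount);
}
```

With these inputs the effective retention keeps the stream's 90-day `MaxAge` and inherits the default event count limit.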

#### Finding Configurations

Query configurations by criteria:

```csharp
// Find all streams with archiving enabled
var archivingStreams = await configStore.FindConfigurationsAsync(
    c => c.Lifecycle?.AutoArchive == true);

// Find all production streams
var productionStreams = await configStore.FindConfigurationsAsync(
    c => c.Tags?.ContainsKey("environment") == true &&
         c.Tags["environment"] == "production");

// Get all configurations
var allConfigs = await configStore.GetAllConfigurationsAsync();
```

#### Deleting Configuration

Remove stream-specific configuration (reverts to defaults):

```csharp
await configStore.DeleteConfigurationAsync("orders");
```

### Configuration Options

#### RetentionConfiguration

- **MaxAge**: Maximum age before cleanup (e.g., `TimeSpan.FromDays(90)`)
- **MaxSizeBytes**: Maximum storage size before cleanup
- **MaxEventCount**: Maximum number of events before cleanup
- **EnablePartitioning**: Enable table partitioning for better performance
- **PartitionInterval**: Partition interval (e.g., daily, weekly)
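
To make the three limits concrete, here is a minimal sketch that treats cleanup as due when any configured limit is exceeded. The types `RetentionLimitsSketch` and `StreamStatsSketch` and the "any limit triggers cleanup" rule are assumptions made for illustration; the library's cleanup logic may combine the limits differently.

```csharp
using System;

var now = DateTimeOffset.UtcNow;
var limits = new RetentionLimitsSketch(
    MaxAge: TimeSpan.FromDays(90),
    MaxSizeBytes: 10L * 1024 * 1024 * 1024, // 10 GB
    MaxEventCount: null);                   // null = no limit on event count

var stats = new StreamStatsSketch(
    OldestEventAt: now.AddDays(-120),
    SizeBytes: 2L * 1024 * 1024 * 1024,
    EventCount: 5_000_000);

// True: the oldest event (120 days) is older than MaxAge (90 days).
Console.WriteLine(RetentionCheck.NeedsCleanup(limits, stats, now));

// Hypothetical types for illustration only.
public sealed record RetentionLimitsSketch(TimeSpan? MaxAge, long? MaxSizeBytes, long? MaxEventCount);
public sealed record StreamStatsSketch(DateTimeOffset OldestEventAt, long SizeBytes, long EventCount);

public static class RetentionCheck
{
    // Cleanup is due when any configured (non-null) limit is exceeded.
    public static bool NeedsCleanup(RetentionLimitsSketch limits, StreamStatsSketch stats, DateTimeOffset now) =>
        (limits.MaxAge is { } maxAge && now - stats.OldestEventAt > maxAge)
        || (limits.MaxSizeBytes is { } maxSize && stats.SizeBytes > maxSize)
        || (limits.MaxEventCount is { } maxCount && stats.EventCount > maxCount);
}
```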

#### DeadLetterQueueConfiguration

- **Enabled**: Enable DLQ for this stream
- **DeadLetterStreamName**: Name of DLQ stream (defaults to `{StreamName}-dlq`)
- **MaxDeliveryAttempts**: Attempts before sending to DLQ (default: 3)
- **RetryDelay**: Delay between retry attempts
- **StoreOriginalEvent**: Store original event in DLQ
- **StoreErrorDetails**: Store error details in DLQ
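
The interaction between `MaxDeliveryAttempts` and the default DLQ name can be sketched as a small decision function. Everything below is hypothetical (`DlqSettingsSketch`, `DlqRouting`, `FailureAction`), and two behaviors are assumptions: an event is dead-lettered once its failed attempts reach `MaxDeliveryAttempts`, and processing simply gives up when the DLQ is disabled.

```csharp
#nullable enable
using System;

// DeadLetterStreamName left null to exercise the "{StreamName}-dlq" default.
var settings = new DlqSettingsSketch(Enabled: true, DeadLetterStreamName: null, MaxDeliveryAttempts: 3);

for (var failedAttempts = 1; failedAttempts <= 3; failedAttempts++)
{
    var decision = DlqRouting.Decide("payment-processing", settings, failedAttempts);
    Console.WriteLine($"{failedAttempts} failed attempt(s): {decision.Action} {decision.TargetStream}");
}

public enum FailureAction { Retry, DeadLetter, GiveUp }

public sealed record FailureDecision(FailureAction Action, string? TargetStream);

// Hypothetical stand-in for the DLQ settings described above.
public sealed record DlqSettingsSketch(bool Enabled, string? DeadLetterStreamName, int MaxDeliveryAttempts);

public static class DlqRouting
{
    public static FailureDecision Decide(string streamName, DlqSettingsSketch settings, int failedAttempts)
    {
        // Assumption: retry until the configured number of attempts is used up.
        if (failedAttempts < settings.MaxDeliveryAttempts)
            return new FailureDecision(FailureAction.Retry, null);

        // Assumption: with the DLQ disabled there is nowhere to route the event.
        if (!settings.Enabled)
            return new FailureDecision(FailureAction.GiveUp, null);

        // Fall back to the documented default name when none is configured.
        var target = settings.DeadLetterStreamName ?? $"{streamName}-dlq";
        return new FailureDecision(FailureAction.DeadLetter, target);
    }
}
```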

#### LifecycleConfiguration

- **AutoCreate**: Automatically create stream if it doesn't exist
- **AutoArchive**: Automatically archive old events
- **ArchiveAfter**: Age after which events are archived
- **ArchiveLocation**: Storage location for archived events
- **AutoDelete**: Automatically delete old events
- **DeleteAfter**: Age after which events are deleted

#### PerformanceConfiguration

- **BatchSize**: Batch size for bulk operations
- **EnableCompression**: Enable event compression
- **CompressionAlgorithm**: Compression algorithm (e.g., "gzip", "zstd")
- **EnableIndexing**: Enable metadata field indexing
- **IndexedFields**: List of fields to index
- **CacheSize**: Cache size for frequently accessed events

#### AccessControlConfiguration

- **PublicRead**: Allow public read access
- **PublicWrite**: Allow public write access
- **AllowedReaders**: List of authorized readers
- **AllowedWriters**: List of authorized writers
- **MaxConsumerGroups**: Maximum consumer groups allowed
- **MaxEventsPerSecond**: Rate limit for events per second
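
As a rough illustration of how `PublicRead` and `AllowedReaders` might combine, the sketch below grants read access when the stream is public or the caller is on the list. `AccessRulesSketch` and `AccessCheck` are hypothetical, and exact, case-sensitive name matching is an assumption rather than documented behavior.

```csharp
using System;
using System.Collections.Generic;

var rules = new AccessRulesSketch(
    PublicRead: false,
    AllowedReaders: new HashSet<string>(StringComparer.Ordinal) { "admin", "audit-service" });

Console.WriteLine(AccessCheck.CanRead(rules, "audit-service"));   // True: on the allow list
Console.WriteLine(AccessCheck.CanRead(rules, "billing-service")); // False: not listed, stream not public

// Hypothetical stand-in for the read-side access settings.
public sealed record AccessRulesSketch(bool PublicRead, ISet<string> AllowedReaders);

public static class AccessCheck
{
    // Public streams are readable by anyone; otherwise the caller must be listed.
    public static bool CanRead(AccessRulesSketch rules, string principal) =>
        rules.PublicRead || rules.AllowedReaders.Contains(principal);
}
```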

### Key Features

- **Per-Stream Configuration**: Override global settings per stream
- **Retention Policies**: Configure retention per stream
- **Dead Letter Queues**: Error handling with configurable retry logic
- **Lifecycle Management**: Automatic archival and deletion
- **Performance Tuning**: Optimize batch sizes, compression, and indexing
- **Access Control**: Stream-level permissions and quotas
- **Configuration Merging**: Stream-specific settings override global defaults
- **Tag-Based Filtering**: Categorize and query streams by tags

### Common Use Cases

#### Multi-Tenant Configuration

```csharp
// High-value tenant with extended retention
var premiumConfig = new StreamConfiguration
{
    StreamName = "tenant-acme-corp",
    Tags = new Dictionary<string, string> { ["tier"] = "premium" },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(365) // 1 year retention
    }
};

// Standard tenant with shorter retention
var standardConfig = new StreamConfiguration
{
    StreamName = "tenant-small-co",
    Tags = new Dictionary<string, string> { ["tier"] = "standard" },
    Retention = new RetentionConfiguration
    {
        MaxAge = TimeSpan.FromDays(30) // 30 days retention
    }
};
```

#### Environment-Specific Settings

```csharp
// Production: strict retention and DLQ
var prodConfig = new StreamConfiguration
{
    StreamName = "orders-prod",
    Tags = new Dictionary<string, string> { ["environment"] = "production" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90) },
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        MaxDeliveryAttempts = 5
    }
};

// Development: relaxed settings
var devConfig = new StreamConfiguration
{
    StreamName = "orders-dev",
    Tags = new Dictionary<string, string> { ["environment"] = "development" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(7) },
    DeadLetterQueue = new DeadLetterQueueConfiguration { Enabled = false }
};
```

#### Domain-Specific Configuration

```csharp
// Audit logs: long retention, auto-archive
var auditConfig = new StreamConfiguration
{
    StreamName = "audit-logs",
    Tags = new Dictionary<string, string> { ["domain"] = "security" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(2555) }, // 7 years
    Lifecycle = new LifecycleConfiguration
    {
        AutoArchive = true,
        ArchiveAfter = TimeSpan.FromDays(365),
        ArchiveLocation = "s3://archive/audit"
    }
};

// Analytics: high-throughput, short retention
var analyticsConfig = new StreamConfiguration
{
    StreamName = "page-views",
    Tags = new Dictionary<string, string> { ["domain"] = "analytics" },
    Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(30) },
    Performance = new PerformanceConfiguration
    {
        BatchSize = 10000,
        EnableCompression = true
    }
};
```

# 2024-2025 Roadmap

| Task | Description | Status |
|------|-------------|--------|
| Support .NET 8 | Ensure compatibility with .NET 8. | ✅ |
| Support .NET 10 | Upgrade to .NET 10 with C# 14 language support. | ✅ |
| Update FluentValidation | Upgrade FluentValidation to version 11.x for .NET 10 compatibility. | ✅ |
| Add gRPC Support with source generators | Implement gRPC endpoints with source generators and Google Rich Error Model for validation. | ✅ |
| Create a demo project (Svrnty.CQRS.Grpc.Sample) | Develop a comprehensive demo project showcasing gRPC and HTTP endpoints. | ✅ |
| Event Streaming - Phase 1: gRPC Bidirectional Streaming | Implement gRPC bidirectional streaming for real-time event delivery. | ✅ |
| Event Streaming - Phase 2.1: Storage Abstractions | Define IEventStreamStore interface for persistent and ephemeral streams. | ✅ |
| Event Streaming - Phase 2.2: PostgreSQL Storage | Implement PostgreSQL-backed storage with persistent streams, message queues, and DLQ. | ✅ |
| Event Streaming - Phase 2.3: Consumer Offset Tracking | Implement consumer group coordination and offset management for persistent streams. | ✅ |
| Event Streaming - Phase 2.4: Retention Policies | Add time-based and size-based retention with automatic cleanup and table partitioning. | ✅ |
| Event Streaming - Phase 2.5: Event Replay API | Add APIs for replaying events from specific offsets and time ranges. | ✅ |
| Event Streaming - Phase 2.6: Stream Configuration | Per-stream configuration for retention, DLQ, and lifecycle management. | ✅ |
| Create a website for the Framework | Develop a website to host comprehensive documentation for the framework. | ⬜️ |

# 2026 Roadmap

| Task | Description | Status |
|------|-------------|--------|
| gRPC Compression Support | Smart message compression with automatic threshold detection and per-handler control. | ⬜️ |
| gRPC Metadata & Authorization Support | Expose ServerCallContext to handlers and integrate authorization services for gRPC endpoints. | ⬜️ |