| .. | ||
| access-control.md | ||
| dead-letter-queues.md | ||
| lifecycle-config.md | ||
| performance-config.md | ||
| README.md | ||
| retention-config.md | ||
Stream Configuration
Per-stream configuration for fine-grained control over retention, dead letter queues, lifecycle, performance, and access control.
Overview
Stream configuration allows you to customize behavior on a per-stream basis, enabling:
- Retention Policies - Time, size, and count-based retention per stream
- Dead Letter Queues - Error handling and retry logic
- Lifecycle Management - Automatic archival and deletion
- Performance Tuning - Batch sizes, compression, indexing
- Access Control - Stream-level permissions and rate limits
Key Features:
- ✅ Per-Stream Settings - Override global configuration per stream
- ✅ Retention Control - MaxAge, MaxSizeBytes, MaxEventCount
- ✅ DLQ Configuration - Automatic retry and dead letter handling
- ✅ Auto-Archival - Move old events to cold storage
- ✅ Performance Options - Batching, compression, indexing
- ✅ Access Control - Stream-level permissions
- ✅ Tag-Based Filtering - Organize streams by tags
Quick Start
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.PostgreSQL;
var builder = WebApplication.CreateBuilder(args);
// Register stream configuration
builder.Services.AddPostgresStreamConfiguration();
var app = builder.Build();
// Configure stream
var configStore = app.Services.GetRequiredService<IStreamConfigurationStore>();
await configStore.SetConfigurationAsync(new StreamConfiguration
{
StreamName = "orders",
Retention = new RetentionConfiguration
{
MaxAge = TimeSpan.FromDays(90),
MaxEventCount = 1000000
},
DeadLetterQueue = new DeadLetterQueueConfiguration
{
Enabled = true,
MaxDeliveryAttempts = 5
}
});
app.Run();
Configuration Components
Retention Configuration
Control event retention per stream:
var retention = new RetentionConfiguration
{
MaxAge = TimeSpan.FromDays(30), // Keep 30 days
MaxSizeBytes = 10L * 1024 * 1024 * 1024, // 10 GB limit
MaxEventCount = 1000000, // Keep last 1M events
EnablePartitioning = true, // Partition by time
PartitionInterval = PartitionInterval.Daily
};
Learn more about Retention Configuration →
Dead Letter Queue Configuration
Handle failed events automatically:
var dlq = new DeadLetterQueueConfiguration
{
Enabled = true,
DeadLetterStreamName = "orders-dlq", // Custom DLQ stream
MaxDeliveryAttempts = 5, // Retry 5 times
RetryDelay = TimeSpan.FromMinutes(5), // Wait 5 min between retries
EnableExponentialBackoff = true // Exponential retry delays
};
Learn more about Dead Letter Queues →
Lifecycle Configuration
Automate archival and cleanup:
var lifecycle = new LifecycleConfiguration
{
AutoCreate = true, // Create stream on first append
AutoArchive = true, // Enable archival
ArchiveAfter = TimeSpan.FromDays(365), // Archive after 1 year
ArchiveLocation = "s3://archive/orders", // S3 bucket
AutoDelete = true, // Delete after archive
DeleteAfter = TimeSpan.FromDays(400) // Delete events older than 400 days
};
Learn more about Lifecycle Configuration →
Performance Configuration
Optimize stream performance:
var performance = new PerformanceConfiguration
{
BatchSize = 1000, // Read 1000 events per query
EnableCompression = true, // Compress event data
CompressionAlgorithm = "gzip",
EnableIndexing = true, // Index metadata fields
IndexedFields = new List<string> { "userId", "tenantId" },
CacheSize = 10000 // Cache last 10k events
};
Learn more about Performance Configuration →
Access Control Configuration
Control stream access:
var accessControl = new AccessControlConfiguration
{
PublicRead = false, // Require authentication
PublicWrite = false,
AllowedReaders = new List<string> { "admin", "order-service" },
AllowedWriters = new List<string> { "order-service" },
MaxConsumerGroups = 10, // Limit consumer groups
MaxEventsPerSecond = 10000 // Rate limit writes
};
Learn more about Access Control →
Complete Configuration Example
using Svrnty.CQRS.Events.Abstractions;
// High-volume production stream
var orderConfig = new StreamConfiguration
{
StreamName = "orders",
Description = "Production order events",
Tags = new List<string> { "production", "critical", "orders" },
Retention = new RetentionConfiguration
{
MaxAge = TimeSpan.FromDays(90),
MaxSizeBytes = 50L * 1024 * 1024 * 1024, // 50 GB
MaxEventCount = 10000000,
EnablePartitioning = true,
PartitionInterval = PartitionInterval.Daily
},
DeadLetterQueue = new DeadLetterQueueConfiguration
{
Enabled = true,
DeadLetterStreamName = "orders-dlq",
MaxDeliveryAttempts = 5,
RetryDelay = TimeSpan.FromMinutes(5),
EnableExponentialBackoff = true
},
Lifecycle = new LifecycleConfiguration
{
AutoCreate = true,
AutoArchive = true,
ArchiveAfter = TimeSpan.FromDays(90),
ArchiveLocation = "s3://prod-archive/orders",
AutoDelete = false // Keep in archive indefinitely
},
Performance = new PerformanceConfiguration
{
BatchSize = 1000,
EnableCompression = true,
CompressionAlgorithm = "gzip",
EnableIndexing = true,
IndexedFields = new List<string> { "userId", "orderId", "tenantId" },
CacheSize = 100000
},
AccessControl = new AccessControlConfiguration
{
PublicRead = false,
PublicWrite = false,
AllowedReaders = new List<string> { "admin", "order-service", "analytics-service" },
AllowedWriters = new List<string> { "order-service" },
MaxConsumerGroups = 20,
MaxEventsPerSecond = 50000
}
};
await configStore.SetConfigurationAsync(orderConfig);
Getting Effective Configuration
Retrieve the effective configuration for a stream, which merges its stream-specific settings with the global settings:
var configProvider = serviceProvider.GetRequiredService<IStreamConfigurationProvider>();
// Get effective configuration (stream-specific merged with defaults)
var effectiveConfig = await configProvider.GetEffectiveConfigurationAsync("orders");
Console.WriteLine($"Retention: {effectiveConfig.Retention.MaxAge}");
Console.WriteLine($"DLQ Enabled: {effectiveConfig.DeadLetterQueue.Enabled}");
Console.WriteLine($"Batch Size: {effectiveConfig.Performance.BatchSize}");
Configuration Precedence
Configuration is resolved in this order, from highest to lowest priority:
- Stream-Specific Configuration - Highest priority
- Global Configuration - Fallback for missing values
- Framework Defaults - Built-in defaults
// Example: Stream-specific overrides global
// Global: MaxAge = 30 days
// Stream "orders": MaxAge = 90 days
// Effective for "orders": MaxAge = 90 days
// Global: BatchSize = 100
// Stream "orders": BatchSize not set
// Effective for "orders": BatchSize = 100 (from global)
Managing Configuration
Set Configuration
var config = new StreamConfiguration
{
StreamName = "analytics",
Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(7) }
};
await configStore.SetConfigurationAsync(config);
Get Configuration
var config = await configStore.GetConfigurationAsync("analytics");
if (config == null)
{
Console.WriteLine("No configuration found, using defaults");
}
Update Configuration
var config = await configStore.GetConfigurationAsync("analytics");
if (config != null)
{
config.Retention.MaxAge = TimeSpan.FromDays(14); // Update retention
await configStore.SetConfigurationAsync(config);
}
Delete Configuration
await configStore.DeleteConfigurationAsync("analytics");
// Stream will now use global configuration
Configuration by Environment
Use different settings for development and production:
var environment = builder.Environment.EnvironmentName;
var retention = environment == "Production"
? new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90) }
: new RetentionConfiguration { MaxAge = TimeSpan.FromDays(7) };
var config = new StreamConfiguration
{
StreamName = "orders",
Retention = retention
};
await configStore.SetConfigurationAsync(config);
Multi-Tenant Configuration
Per-tenant stream configuration:
// Tenant A - high retention
await configStore.SetConfigurationAsync(new StreamConfiguration
{
StreamName = "tenant-a-orders",
Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(365) },
Tags = new List<string> { "tenant-a", "premium" }
});
// Tenant B - standard retention
await configStore.SetConfigurationAsync(new StreamConfiguration
{
StreamName = "tenant-b-orders",
Retention = new RetentionConfiguration { MaxAge = TimeSpan.FromDays(90) },
Tags = new List<string> { "tenant-b", "standard" }
});
Querying by Tags
// Find all production streams
var prodStreams = await configStore.GetStreamsByTagAsync("production");
foreach (var config in prodStreams)
{
Console.WriteLine($"Production stream: {config.StreamName}");
}
Best Practices
✅ DO
- Use tags to organize streams
- Set retention appropriate for the data type
- Enable DLQ for critical streams
- Configure archival for compliance
- Use compression for large events
- Index fields used in queries
- Limit consumer groups per stream
- Test configuration in development first
❌ DON'T
- Don't use the same configuration for all streams
- Don't set retention too short for audit logs
- Don't disable DLQ for critical streams
- Don't forget to configure an archival location
- Don't over-index (impacts write performance)
- Don't allow unlimited consumer groups
- Don't forget environment-specific settings